【redis源码学习】从源码角度看主从复制(2):主从之间的“三次握手”
时间:2023-09-14 09:15:26
主从握手流程
建立主从关系有两种方式:
1、向某个服务端发送 REPLICAOF 命令,要求它成为指定服务器的从节点;
2、在配置文件中写明主从关系。
下面我们从从节点的视角来看主从握手环节:
一次握手
从节点使用replicaofCommand 函数处理 REPLICAOF 命令,该命令主要逻辑为:
1、如果处理的命令是:REPLICAOF NO ONE,则将当前服务器转换为主节点,取消原来的主从复制关系。
2、否则,调用 replicationSetMaster 函数,与指定服务器建立主从复制关系。
/* Point this instance at the master reachable at ip:port.
 *
 * Records the new master address, tears down the existing master link and
 * the links of our own replicas, and leaves server.repl_state set to
 * REPL_STATE_CONNECT. */
void replicationSetMaster(char *ip, int port) {
    /* Capture the previous role before masterhost is overwritten below. */
    int had_no_master = (server.masterhost == NULL);

    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    /* Already attached to a master: release that client first. */
    if (server.master) {
        freeClient(server.master);
    }

    /* Clients blocked in master, now slave. */
    disconnectAllBlockedClients();

    /* Drop every replica connected to this node so that they start their
     * synchronization over again. */
    disconnectSlaves();
    cancelReplicationHandshake();

    /* We were a master until now: before that state is gone, build a cached
     * master from our own parameters, to later PSYNC with the new master. */
    if (had_no_master) {
        replicationDiscardCachedMaster();
        replicationCacheMasterUsingMyself();
    }

    /* Notify modules about the role change. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                          NULL);

    /* Notify modules that the master link went down, if it was up. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
                              NULL);
    }

    /* From here on the replication flow starts from the connect step. */
    server.repl_state = REPL_STATE_CONNECT;
}
二次握手
serverCron 时间事件负责处理 REPL_STATE_CONNECT 状态:它(经由 replicationCron)调用 connectWithMaster 函数,由该函数建立主从网络连接:
/* Start a connection to the configured master.
 *
 * Returns C_OK when the connect was started, in which case
 * server.repl_state becomes REPL_STATE_CONNECTING; returns C_ERR when
 * connConnect() fails, after closing the connection and clearing
 * server.repl_transfer_s. */
int connectWithMaster(void) {
    /* Pick the transport: TLS when tls_replication is set, plain socket
     * otherwise. */
    if (server.tls_replication) {
        server.repl_transfer_s = connCreateTLS();
    } else {
        server.repl_transfer_s = connCreateSocket();
    }

    /* syncWithMaster is handed over as the handler for the connect. */
    if (connConnect(server.repl_transfer_s, server.masterhost,
                    server.masterport, NET_FIRST_BIND_ADDR,
                    syncWithMaster) != C_ERR)
    {
        server.repl_transfer_lastio = server.unixtime;
        server.repl_state = REPL_STATE_CONNECTING;
        return C_OK;
    }

    /* Connect failed: log, release the connection and report the error. */
    serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
              connGetLastError(server.repl_transfer_s));
    connClose(server.repl_transfer_s);
    server.repl_transfer_s = NULL;
    return C_ERR;
}
三次握手
网络连接成功后,从节点调用 syncWithMaster 函数,进入握手阶段
/* This handler fires when the non blocking connect was able to
 * establish a connection with the master.
 *
 * It drives the replica side of the handshake as a state machine keyed on
 * server.repl_state. It is passed to connConnect() by connectWithMaster()
 * and then installs itself as the read handler, so it is entered again for
 * each reply: every SEND_* step writes one command and returns, every
 * RECEIVE_* step reads the reply and falls through to the next SEND_* step.
 *
 * After the PSYNC reply it either returns (partial resync accepted, or reply
 * not yet available) or prepares the bulk transfer: optional temp file,
 * readSyncBulkPayload as read handler, state REPL_STATE_TRANSFER.
 *
 * On any failure the connection is closed, the transfer temp-file state is
 * released and server.repl_state goes back to REPL_STATE_CONNECT. */
void syncWithMaster(connection *conn) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int psync_result;

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        connClose(conn);
        return;
    }

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state. */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
                  connGetLastError(conn));
        goto error;
    }

    /* From here on, act according to server.repl_state: each block below
     * handles one step of the handshake. */

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        connSetReadHandler(conn, syncWithMaster);
        connSetWriteHandler(conn, NULL);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING. A write failure aborts the handshake right away;
         * a reply that never arrives is left to the replication timeout
         * (handled outside this function). */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masteruser && server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
                                         server.masteruser,server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            /* No credentials configured: skip straight to REPLCONF. */
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        int port;
        if (server.slave_announce_port) port = server.slave_announce_port;
        else if (server.tls_replication && server.tls_port) port = server.tls_port;
        else port = server.port;
        sds portstr = sdsfromlonglong(port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "listening-port",portstr, NULL);
        sdsfree(portstr);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF ip-address. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* Reaching this point means every handshake command up to and including
     * PSYNC has been sent; what is left is handling the PSYNC reply. */

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    psync_result = slaveTryPartialResynchronization(conn,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in an transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
            redisCommunicateSystemd("READY=1\n");
        }
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any sub-slaves. The master may transfer us an
     * entirely different data set and we have no way to incrementally feed
     * our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.master_replid and master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    if (!useDisklessLoad()) {
        /* O_EXCL makes open() fail if the name already exists; retry a few
         * times, one second apart, so that server.unixtime (part of the
         * name) has a chance to change. */
        while(maxtries--) {
            snprintf(tmpfile,sizeof(tmpfile),
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
            goto error;
        }
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        server.repl_transfer_fd = dfd;
        /* Ownership of the descriptor now belongs to server.repl_transfer_fd.
         * Forget the local copy so the error path below does not close the
         * same descriptor twice. */
        dfd = -1;
    }

    /* Setup the non blocking download of the bulk file. */
    if (connSetReadHandler(conn, readSyncBulkPayload)
            == C_ERR)
    {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (%s)",
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;

error:
    /* dfd is only still set here when the descriptor was never handed over
     * to server.repl_transfer_fd. */
    if (dfd != -1) close(dfd);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_transfer_fd != -1)
        close(server.repl_transfer_fd);
    if (server.repl_transfer_tmpfile)
        zfree(server.repl_transfer_tmpfile);
    server.repl_transfer_tmpfile = NULL;
    server.repl_transfer_fd = -1;
    /* Back to the initial state so that the connection is attempted again. */
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}
/* Slave replication state. Used in server.repl_state for slaves to remember
 * what to do next. syncWithMaster() walks the handshake states in the
 * numeric order listed here. */
#define REPL_STATE_NONE 0 /* No active replication */
#define REPL_STATE_CONNECT 1 /* Must connect to master */
#define REPL_STATE_CONNECTING 2 /* Connecting to master */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF listening-port reply */
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF ip-address reply */
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF capa reply */
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 15 /* Connected to master */
从主节点视角来看,主节点不断处理来自从节点的命令。
相关文章
- Redis入门教程:学习Redis的基础技能(redis教程)
- 学习Redis队列——看视频讲解(redis队列视频讲解)
- 缓存之旅:从 Redis 迁移到新世界(redis缓存迁移)
- 群落中的Redis学习之旅(redis学习群)
- Redis:学习如何使用它(redis 使用教程)
- 慕课网Redis视频教程学习Redis必不可少(慕课网redis视频下载)
- 全新上线慕课网Redis教程(慕课网 redis教程)
- 慕课网学习Redis,提升技能(慕课网redis教学)
- 极速安装:手把手教你源码安装Redis(源码安装redis)
- 查看Redis重启记录实现快速简便可靠的方案(查看redis 重启记录)
- 使用Redis存储集合提高数据存储性能(用redis存集合)
- 学习Redis利用其魅力掌控数据(有关redis的电子书)
- 是否该安装Redis(是否安装了redis)
- 京东云Redis使用更高性能的非凡之选(京东云redis)
- 多终端同步利用云Redis实现多终端数据同步(云redis实现)
- WSL下轻松安装Redis服务Windows Subsystem学习入门(wsl安装redis)
- 简单步骤,快速搭建web端Redis服务(web安装redis)
- 实现单项目缓存管理基于Redis的技术探索(单项目redis)
- 快速学习使用 Redis(刚使用redis怎么使用)
- Redis使用它改善你的应用程序(哪些地方用redis)
- 千峰Redis学习笔记(千峰redis笔记)
- 保障稳定运行Redis集群持久化配置(redis集群持久化配置)
- 深入探索Redis集群的源码分析(redis集群分析源码)
- Redis长期学习体系让你一次性通关(redis长文教学)
- 学习Redis,面试超级提升(redis超级面试课)
- 中的数据Redis灵活提取所有数据库资源(redis 获取所有db)
- Redis实现自增主键解决方案(redis自增主键策略)