Redis cluster 原理
Redis cluster 原理Redis cluster 设计文档可见:https://redis.io/topics/cluster-spec#configuration-handling-propagation-and-failovers相关资料https://chanjarster.github.io/post/redis-cluster-config-propagation/在当前Red
Redis cluster 原理
Redis cluster 设计文档可见:
https://redis.io/topics/cluster-spec#configuration-handling-propagation-and-failovers
相关资料
https://chanjarster.github.io/post/redis-cluster-config-propagation/
在当前Redis6.0的版本中,可以使用 redis-cli --cluster create
命令来规划一个集群,如下面的命令(前提是你已经启动了7001 7002…这些Redis服务并且cluster_enable开启)
redis-cli --cluster create 11.158.133.251:7001 11.158.133.251:7002 11.158.133.251:7003 11.158.133.251:7004 11.158.133.251:7005 11.158.133.251:7006 --cluster-replicas 1
实际上,这个--cluster create
在背后执行了诸多Redis的命令来完成集群的构建,这也是我们本文需要剖析的地方。
如何感知集群
首先, 假设你启动了3台redis机器A B C,期望构建Redis集群,构建Redis集群的前提条件是ABC三台机器要互相知道对方的存在。那如何快速的让其中一台机器,感知到另外两台机器?拍拍脑袋想当然的,那就是告诉A存在B、C ,告诉B存在A、C,告诉C存在A、B,逻辑上没问题,单Redis并没有这么做。Redis 是这么做的:
Redis 给B和C分别发送CLUSTER MEET A
,B收到CLUSTER MEET A
后,会和A进行交互,这样A、B就能互相知道对方的存在(cluster nodes命令就能看到),我们用"A.集群:A+B"表示A知道的集群信息,自然B的集群信息是"B.集群:A+B"。
接着C收到CLUSTER MEET A
后,C和A交互交换信息,因为A的信息里面包含了B,所以C就能知道A以及B,同样A也知道C的存在,此时A 和 C的状态就是"A.集群:A+B+C"、“C.集群:A+B+C”;剩下就是B了,A会定时广播自己的信息给B,所以在一段时间后,B也知道了A里面的集群信息新增了C,于是B更新自己的集群信息为"B.集群:A+B+C",最终达到一个稳定状态。
B收到CLUSTER MEET A
流程
....
} else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
/* CLUSTER MEET <ip> <port> [cport] */
long long port, cport;
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
if (c->argc == 5) {
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
(char*)c->argv[4]->ptr);
return;
}
} else {
cport = port + CLUSTER_PORT_INCR;
}
//核心是这个函数 他将 当前 需要meet的节点,加入到全局server.cluster->nodes 中
if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
B 和 A交互 是在 Redis后台线程中执行的
//clusterCron 函数
di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
//循环遍历本redis节点的 cluster node
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
//对为建立连接的集群其他node进行建立连接
if (node->link == NULL) {
clusterLink *link = createClusterLink(node);
link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
connSetPrivateData(link->conn, link);
//核心函数是 连接建立完成后的回调函数 clusterLinkConnectHandler
if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,
clusterLinkConnectHandler) == -1) {
/* We got a synchronous error from connect before
* clusterSendPing() had a chance to be called.
* If node->ping_sent is zero, failure detection can't work,
* so we claim we actually sent a ping now (that will
* be really sent as soon as the link is obtained). */
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
freeClusterLink(link);
continue;
}
node->link = link;
}
}
回调函数 clusterLinkConnectHandler
void clusterLinkConnectHandler(connection *conn) {
clusterLink *link = connGetPrivateData(conn);
clusterNode *node = link->node;
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
node->name, node->ip, node->cport,
connGetLastError(conn));
freeClusterLink(link);
return;
}
connSetReadHandler(conn, clusterReadHandler);
mstime_t old_ping_sent = node->ping_sent;
//发送 meet类型的ping信息,所谓的ping信息,就是包含本节已知点的其他节点信息
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
/* If there was an active ping before the link was
* disconnected, we want to restore the ping time, otherwise
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
总结下,如果B收到CLUSTER MEET A
消息,就会保存A的地址,然后在后台线程中,和A建立连接并且给A发送PING消息,PING中的类型的meet。然后再看 A收到 B发来的PING消息如何处理。
clusterReadHandler->clusterProcessPacket
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, slaveof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
//将发送方加为集群的node,很好解释,因为发送meet的人本身就是集群的节点
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port);
node->cport = ntohs(hdr->cport);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
//meet消息中携带了发送方中已经存在的节点,此时自己也需要更新这些节点到本地。
if (!sender && type == CLUSTERMSG_TYPE_MEET)
clusterProcessGossipSection(hdr,link);
/* Anyway reply with a PONG */
//回复pong,注意pong中也有当前自己本节点中已知的其他节点信息
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
至此,A和B同步了各自的节点信息,加入其他节点,同样的道理。
几个容易混淆的概念
1、PING PONG 有歧义,不是简单的心跳,而是包含了各自节点已知的集群信息。
2、CLUSTER_NODE_MEET 类的PING和普通的没多大区别,收到带有MEET标的PING的node强制认为sender是集群中的一部分。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)