本文章基于Redis 6.0.9版本,Lettuce 6.0.1.RELEASE版本

目录

1.Redis Cluster

1.1.命令路由

1.2.跨槽命令执行和所选命令的全集群执行

例子1.使用NodeSelection API从所有复制节点中读取所有键

1.3.刷新集群拓扑视图

1.4.Redis群集连接对象的连接计数

1.5.客户端选项

1.6.例子

例子2.连接到Redis集群

示例3.连接到具有多个种子节点的Redis群集

例子4.启用定期集群拓扑视图更新

例子5.启用自适应集群拓扑视图更新

例子6.获得一个节点连接


1.Redis Cluster

Lettuce通过以下方式支持Redis Cluster:

  • 支持所有CLUSTER命令
  • 基于命令键的哈希槽(hash slot)的命令路由
  • 所选集群命令的高级抽象
  • 在多个群集节点上执行命令
  • MOVED和ASK重定向处理
  • 通过插槽(slot)和host/port获得与群集节点的直接连接(自3.3起)
  • SSL和身份验证(从4.2开始)
  • 定期和自适应群集拓扑更新
  • 发布/订阅

连接到Redis集群需要一个或多个初始种子节点。 完整的集群拓扑视图(topology)(分区)是在第一个连接上获得的,因此你无需指定所有集群节点。 指定多个种子节点有助于提高弹性,因为即使种子节点不可用,Lettuce仍可以连接群集。 Lettuce拥有多个连接,可根据需要打开。 你可以自由操作这些连接。

可以将连接绑定到特定的host或nodeId。 即使nodeId由其他host处理,绑定到nodeId的连接也将始终坚持该nodeId。 对不属于群集的未知nodeId或host/ports的请求将被拒绝。 不要关闭连接。 否则,将发生不可预测的行为。 还请记住,群集连接本身会使用节点连接来执行群集操作:如果阻止一个连接,则群集连接的所有其他用户可能会受到影响。

1.1.命令路由

Redis Cluster的概念基于分片。 群集中的每个上游(upstream)节点都处理一个或多个插槽(slots)。 插槽是分片的单位,使用CRC16 MOD 16384根据命令的键计算得出。还可以使用哈希标签(例如{user:1000}.foo)指定哈希插槽。

包含至少一个键的每个请求都基于其哈希槽路由到相应的节点。没有键的命令在默认连接上执行,该默认连接最有可能指向第一个提供的RedisURI。 相同的规则适用于对多个键进行操作的命令,但限制是所有键必须位于同一插槽中。在多个插槽上运行的命令将以CROSSSLOT 错误终止。

1.2.跨槽命令执行和所选命令的全集群执行

常规Redis Cluster命令仅限于单槽键操作(共享同一哈希槽的单键命令或多键命令)。

可以通过对一组选定的多键命令使用高级群集API来缓解跨槽限制。 对具有不同插槽的键进行操作的命令将分解为多个命令。 单个命令以fork/join方式触发。 同时发出命令以避免同步链接。 结果在命令完成之前已同步。

跨槽命令执行支持以下命令:

  • DEL: 删除键。 返回已删除的键的数量。
  • EXISTS: 计算负责特定键的上游(upstream)节点中存在的键数量。
  • MGET: 获取所有给定键的值。 按键顺序返回值。
  • MSET: 为所有给定的键设置多个key/value对。 始终返回OK。
  • TOUCH: 更改所有给定键的最后访问时间。 返回被触摸的键数。
  • UNLINK: 删除键并在其他线程中回收内存。 返回已删除的键的数量。

在多个群集节点操作上执行以下命令:

  • CLIENT SETNAME: 在所有已知的群集节点连接上设置客户端名称。 始终返回OK。
  • KEYS: Return/Stream存储在所有上游节点(upstreams)的所有键。
  • DBSIZE: 返回所有上游节点(upstreams)存储的键数。
  • FLUSHALL: 刷新集群上游节点(upstreams)的所有数据。 始终返回OK。
  • FLUSHDB: 刷新集群上节点(upstreams)的所有数据。 始终返回OK。
  • RANDOMKEY: Return a random key from a random upstream.从随机上游节点(upstream)返回一个随机键。
  • SCAN: 根据ReadFrom 设置在整个集群中扫描键空间。
  • SCRIPT FLUSH: 从所有群集节点上的脚本缓存中删除所有脚本。
  • SCRIPT LOAD: 将脚本加载到所有节点上的Lua脚本缓存中。
  • SCRIPT KILL: 杀死所有集群节点上当前正在执行的脚本。 即使没有脚本在运行,此调用也不会失败。
  • SHUTDOWN: 同步将数据集保存到磁盘,然后关闭群集的所有节点。

以下API可以执行跨槽命令:

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

一个或多个群集节点上执行命令

有时必须在多个群集节点上执行命令。 先进的群集API允许选择一组节点(例如,所有上游节点(upstreams),所有复制节点(replicas))并在该节点上触发命令。

例子1.使用NodeSelection API从所有复制节点中读取所有键

RedisAdvancedClusterAsyncCommands<String, String> async = clusterClient.connect().async();
AsyncNodeSelection<String, String> replicas = connection.slaves();

AsyncExecutions<List<String>> executions = replicas.commands().keys("*");
executions.forEach(result -> result.thenAccept(keys -> System.out.println(keys)));

这些命令是同时触发的。 该API当前仅适用于异步命令。 命令被分派到选择中的节点,结果(CompletionStage)可通过AsyncExecutions获得。

节点选择可以是动态的也可以是静态的。 动态节点选择会在集群拓扑视图刷新时更新其节点集。 可以通过以下预设来构造节点选择:

  • 上游节点(upstreams)
  • 复制节点(replicas )(在激活的READONLY模式下进行连接)
  • 所有节点(all nodes)

通过实现自定义谓词或lambda,可以自定义选择节点。

特定结果映射到节点选择中涉及的群集节点(RedisClusterNode)。 你可以从AsyncExecutions获取涉及的RedisClusterNodes 集合和所有结果作为CompletableFuture 。

节点选择API是技术预览,可以随时更改。 该方法允许强大的操作,但需要用户的进一步反馈。 因此,随时贡献自己的力量。

1.3.刷新集群拓扑视图

Redis群集配置可能会在运行时更改。 可以添加新节点,可以更改特定插槽的上游节点(upstream)。 Lettuce透明地处理MOVED和ASK重定向,但是如果有太多命令运行到重定向中,则应刷新集群拓扑(topology)视图。 拓扑绑定到RedisClusterClient实例。 一个RedisClusterClient实例创建的所有集群连接共享同一集群拓扑视图。 该视图可以通过三种方式更新:

  • 通过调用 RedisClusterClient.reloadPartitions
  • 根据时间间隔在后台进行定期更新
  • 基于持久断开连接和MOVED/ASK重定向的后台自适应更新

缺省情况下,命令遵循-ASK-MOVED最多重定向5次,直到命令执行失败为止。 后台拓扑更新从通过RedisClusterClient获得的第一个连接开始。

1.4.Redis群集连接对象的连接计数

使用独立Redis,单个连接对象与单个传输连接相关。 Redis Cluster的工作原理有所不同:Redis Cluster的连接对象由多个传输连接组成。 这些是:

  • 默认连接对象(用于key-less命令和发布/订阅消息发布)
  • 每个节点的连接(用于与单个群集节点通信的读/写连接)
  • 使用ReadFrom时:每个只读复制节点的只读连接(只读连接,用于从只读复制节点读取数据)

连接是按需分配的,而不是从最少量的连接开始就预先分配的。 计算单个连接对象的最大传输连接数的公式:

1 + (N * 2)

其中N是群集节点的数量。

除连接对象外,RedisClusterClient使用其他连接进行拓扑刷新。 这些在拓扑刷新时创建,并在获取拓扑后关闭:

  • 集群拓扑刷新的连接集(每个集群节点的连接)

1.5.客户端选项

参见L6.客户端选项

1.6.例子

例子2.连接到Redis集群

RedisURI redisUri = RedisURI.Builder.redis("localhost").withPassword("authentication").build();

RedisClusterClient clusterClient = RedisClusterClient.create(redisUri);
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync();

...

connection.close();
clusterClient.shutdown();

示例3.连接到具有多个种子节点的Redis群集

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);

RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection.sync();

...

connection.close();
clusterClient.shutdown();

例子4.启用定期集群拓扑视图更新

RedisClusterClient clusterClient = RedisClusterClient.create(RedisURI.create("localhost", 6379));

ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                                .enablePeriodicRefresh(10, TimeUnit.MINUTES)
                                .build();

clusterClient.setOptions(ClusterClientOptions.builder()
                                .topologyRefreshOptions(topologyRefreshOptions)
                                .build());
...

clusterClient.shutdown();

例子5.启用自适应集群拓扑视图更新

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);

RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));

ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                                .enableAdaptiveRefreshTrigger(RefreshTrigger.MOVED_REDIRECT, RefreshTrigger.PERSISTENT_RECONNECTS)
                                .adaptiveRefreshTriggersTimeout(30, TimeUnit.SECONDS)
                                .build();

clusterClient.setOptions(ClusterClientOptions.builder()
                                .topologyRefreshOptions(topologyRefreshOptions)
                                .build());
...

clusterClient.shutdown();

例子6.获得一个节点连接

RedisURI node1 = RedisURI.create("node1", 6379);
RedisURI node2 = RedisURI.create("node2", 6379);

RedisClusterClient clusterClient = RedisClusterClient.create(Arrays.asList(node1, node2));
StatefulRedisClusterConnection<String, String> connection = clusterClient.connect();

RedisClusterCommands<String, String> node1 = connection.getConnection("host", 7379).sync();

...
// do not close node1

connection.close();
clusterClient.shutdown();

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐