随着上篇Redis实现分布式锁的完结,Redis的核心知识点也就暂时告一段落了,后续会介绍Redis一些零碎的知识点,比如事务、发布订阅、stream结构等,还有一些基于Redis实现的功能,比如订单到期关闭、排行榜、分布式限流、类似微信的共同关注、推荐关注等功能。我们先从Redis的发布订阅讲起。

一、什么是发布和订阅

Redis发布/订阅是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息,而传递消息的通道为channel。如下图:

client1、2、3都订阅了channel频道,当向这个频道发布消息的时候,client1、2、3都能接收到。

二、发布和订阅相关命令

  • SUBSCRIBE channel [channel....]:订阅一个或多个频道,接收发布到这些频道的消息。

  • PUBLISH channel message:将消息发送到指定的频道,订阅该频道的订阅者会收到消息。

  • PSUBSCRIBE pattern [pattern....]:订阅一个或多个符合给定模式的频道,每个模式以*作为匹配符,比如news.*匹配所有以news.开头的频道(news.global等)。

  • PUNSUBSCRIBE [pattern [pattern...]]:对应PSUBSCRIBE,退订给定模式的频道。

  • PUBSUB subcommand [argument[argument...]]:用于检索有关发布订阅系统的信息,如订阅者数量、频道数量等。

    • PUBSUB CHANNELS [pattern]:列出当前活跃的频道,没pattern参数列出所有活跃频道,反之列出给定模式相匹配的频道。

    • PUBSUB NUMSUB [channel-1...channel-N]:返回给定频道的订阅者数量。

    • PUBSUB NUMPAT:返回订阅模式的数量,也就是使用PSUBSCRIBE订阅的数量。

  • UNSUBSCRIBE [channel [channel1...]]:取消订阅一个或多个频道,如果不指定频道,那么客户端使用SUBSCRIBE命令订阅的所有频道都会被退订。

三、Redis发布订阅与消息队列的区别

使用Redis发布订阅功能确实可以实现一个简易的消息队列,但和真正的消息队列还是有区别的:

  • 消息持久化:

    • 发布订阅:Redis的发布订阅不支持消息持久化,也就是如果订阅者没在线,它们就会错过发布的消息。

    • 消息队列:消息是持久化的,即使没有在线的消费者,消息也不会丢。

  • 通信模式:

    • 发布订阅:发布者将消息发送到特定频道,然后所有订阅了该频道的订阅者都会接收到消息。

    • 消息队列:消息队列通常是一种点对点的模式,每条消息只有一个消费者。

  • 可靠性:

    • 发布订阅:消息发布后即使没有订阅者也不会有影响,因为发布者和订阅者是解耦的,由于没有确认机制,消息丢失的可能性大。

    • 消息队列:消息队列都有确认和重试机制,确保消息的可靠传递,即使消费者离线或故障,消息也不会丢。

  • 使用场景:

    • 发布订阅:适用于实时消息推送、事件通知等场景,因为消息的及时性比可靠性更为重要。

    • 消息队列:适用于任务队列、异步处理、流量削峰等场景,消息的可靠性比较重要。

四、SpringBoot中使用Redis的发布订阅功能

4.1 引入依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

4.2 yaml配置

spring:
  redis:
    host: 127.0.0.1
    port: 6379

4.3 发布类(Publisher)

用于将消息发送到Redis指定的频道上。

@Component
public class RedisPublisher {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    public void publish(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

4.4 订阅者类(Subscriber)

订阅指定频道,并处理该频道的消息。

@Component
public class RedisSubscriber implements MessageListener {
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = new String(message.getBody());
        System.out.println("接收到来自:" + channel + "频道的消息体:" + body);
        //处理消息
}

4.5 配置订阅频道(Channel)

@Configuration
public class RedisConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter messageListenerAdapter) {
​
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 设置订阅的频道名
        container.addMessageListener(messageListenerAdapter, new ChannelTopic("橡皮人"));
        return container;
    }
​
    @Bean
    MessageListenerAdapter messageListenerAdapter(RedisSubscriber redisSubscriber) {
        return new MessageListenerAdapter(redisSubscriber);
    }
}

4.6 测试

@RestController
public class PublishController {
    @Autowired
    private RedisPublisher publisher;
    @PostMapping("/publish")
    public void publishMessage(@RequestBody String message) {
        publisher.publish("橡皮人", message);
    }
}

结果:

End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐