场景1:单发送单接收

使用场景:简单的发送与接收,没有特别的处理。

一个P向queue发送一个message,一个C从该queue接收message并打印。producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。 

场景2:单发送多接收

使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息

场景3:Publish/Subscribe(发布、订阅模式)

发送端发送广播消息,多个接收端接收。应用场景挺多的。比如客户下单后,会发送消息告诉客户下单成功并同时通知仓库出货等。

使用"fanout"方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收 。

场景4:路由 Routing (按路线发送接收)

使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

在这个场景中,我们可以看到直连交换机 X 和两个队列进行了绑定。第一个队列使用 orange 作为绑定键,第二个队列有两个绑定,一个使用 black 作为绑定键,另外一个使用 green。这样以来,当路由键为 orange 的消息发布到交换机,就会被路由到队列 Q1。路由键为 black 或者 green 的消息就会路由到 Q2。其他的所有消息都将会被丢弃。

多个绑定(Multiple bindings)

多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个 X 和 Q1 之间的绑定,使用 black 绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有 black 路由键的消息会同时发送到 Q1 和 Q2。

场景5:主题交换机Topics (按topic发送接收)

使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。

  • 模型组成相较前几种没有什么变化,一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C

  • 在Exchange派发消息到消息队列Queue所用的规则不同,我们看到了有符号"*"以及"#",可以认为是通配符

  • "*"用于匹配一个单词,比如"a","abc"等;"#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等

如下图所示,三个绑定键(binding key)的匹配规则分别是“*.orange.*”“*.*.rabbit ”和 “lazy.#”,则quick.orange.rabbit会被发送到C2,lazy.orange.elephant会被发送给C1

主题交换机是很强大的,它可以表现出跟其他交换机类似的行为,当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息(相当于fanout exchange)。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

场景6:远程过程调用 PRC

尽管 RPC 在计算领域是一个常用模式,但它也经常被诟病。当一个问题被抛出的时候,程序员往往意识不到这到底是由本地调用还是由较慢的 RPC 调用引起的

我们的 RPC 如此工作:

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。

  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。

  • 将请求发送到一个 rpc_queue 队列中。

  • RPC 工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。

  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。 整合到一起

场景7:发布者确认

Publisher Confirms是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着它们已在服务器端处理。

生产者与broker之间的消息确认称为public confirms,public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功

  • 生产者与broker之间的消息可靠性保证的基本思路就是当消息发送到broker的时候,会执行监听的回调函数,其中deliveryTag是消息id(deliveryTag投递的标识,当Channel设置成confirm模式时,发布的每一条消息都会获得一个唯一的deliveryTag,任何channel上发布的第一条消息的deliveryTag为1,此后的每一条消息都会加1,deliveryTag在channel范围内是唯一的。)在同一个channel中这个数值是递增的,而multiple表示是否批量确认消息。
  • 在生产端要维护一个消息发送的表,消息发送的时候记录消息id,在消息成功落地broker磁盘并且进行回调确认(ack)的时候,根据本地消息表和回调确认的消息id进行对比,这样可以确保生产端的消息表中的没有进行回调确认(或者回调确认时网络问题)的消息进行补救式的重发,当然不可避免的就会在消息端可能会造成消息的重复消息。针对消费端重复消息,在消费端进行幂等处理。(丢消息和重复消息是不可避免的二个极端,比起丢消息,重复消息还有补救措施,而消息丢失就真的丢失了

四种交换机介绍

  • 直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列

  • 扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快

  • 主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,*代表一个单词,#代表多个单词

  • 首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则

Rabbitmq如何保证消息不丢失

  • 生产者提交给消息服务器时,使用确认机制
  • 消息服务器对应的队列、交换机等都持久化,保证数据的不丢失
  • 消费者采用消息确认机制,保证数据的不丢失

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐