目录

一、消息队列

二、Rabbit MQ常用类、方法

三、简单消息队列模型

四、工作队列模型

1、轮询分发

2、公平分发

五、发布订阅模型(fanout)

六、路由模式(direct)

七、主题模式(topic)

八、rabbitMQ 生产者消息发送可达(事务机制)

九、rabbitMQ 生产者消息发送可达(confirm消息确认机制)

1、waitForConfirms【同步确认】

(1)消息消费者【这个消费者设置了topic模式后面的可以通用】

(2)消息生产者【单条发送】

(3)消息生产者【批量发送】

2、waitForConfirmsOrDie【同步确认】

(1)消息生产者【单条发送】

(2)消息生产者【批量发送】

3、addConfirmListener【异步监听确认】

(1)消息生产者【单条发送】

(2)消息生产者【批量发送】


一、消息队列

1、什么是消息队列MQ

MQ 是 Message Queue 的缩写,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个开源的,在AMQP基础上完成的,可复用的企业消息系统。

AMQP: 即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

AMQP核心概念

  • Server:又称Broker,接收客户端的连接,实现AMQP实体服务
  • Connection:连接,应用程序与Broker的网络连接
  • Channel:网络信道,几乎所有的操作都在Channel中进行,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同的Exchange和Queue
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列

RabbitMQ中有三种常用的交换机类型:
direct: 如果路由键匹配,消息就投递到对应的队列
fanout:投递消息给所有绑定在当前交换机上面的队列
topic:允许实现有趣的消息通信场景,使得5不同源头的消息能够达到同一个队列。topic队列名称有两个特殊的关键字。
* 可以替换一个单词
# 可以替换所有的单词

  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者,多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
  • Prefetch count:如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

2、为什么要使用消息队列?

(1)解耦

传统模式:

传统模式的缺点:

  • 系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

中间件模式:

中间件模式的的优点:

  • 将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

(2)异步

传统模式:

传统模式的缺点:

  • 一些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:

中间件模式的的优点:

  • 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

(3)削峰

传统模式

传统模式的缺点:

  • 并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:

中间件模式的的优点:

  • 系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

3、使用了消息队列会有什么缺点?

分析:一个使用了MQ的项目,如果连这个问题都没有考虑过,就把MQ引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑!
回答:回答也很容易,从以下两个个角度来答

  • 系统可用性降低:你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低
  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

4、RabbitMQ的几种消息队列模型

最基本的模型

工作队列模型

发布订阅模型

路由模型

主题模型

RPC模型

二、Rabbit MQ常用类、方法

1、连接对象

Connection【com.rabbitmq.client.Connection】

方法返回值描述
createChannel()Channel创建一个通道对象

2、通道对象

Channel【com.rabbitmq.client.Channel】

方法返回值描述
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)Queue.DeclareOk声明一个队列(创建一个队列)
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)void向队列中声明一个消息(发送消息)发送给队列或交换机
basicConsume(String queue, boolean autoAck, Consumer callback)String监听队列(消费端监听队列后才能时实获取队列的消息,立刻消费)
basicQos(int prefetchCount)void限制发送消息个数
basicAck(long deliveryTag, boolean multiple)void手动回执一个消息
exchangeDeclare(String exchange, String type)Exchange.DeclareOk声明交换机,常用类型为  fanout(发布订阅模式)、direct(路由模式)、topic(主题模式)
queueBind(String queue, String exchange, String routingKey)Queue.BindOk将队列绑定在交换机上
保证生产者消息可达MQ服务(避免丢失)
txSelect()Tx.SelectOk(事务机制)设置为transaction模式
txCommit()Tx.CommitOk(事务机制)提交事务
txRollback()Tx.RollbackOk(事务机制)回滚事务
confirmSelect()Confirm.SelectOk(消息确认机制)生产者调用confirmSelect 将channel设置为confirm模式

waitForConfirms()

waitForConfirms(long var1)

boolean(消息确认机制)消息确认

waitForConfirmsOrDie()

waitForConfirmsOrDie(long var1)

void(消息确认机制)消息确认【抛异常,内部仍然调用waitForConfirms实现】
addConfirmListener(ConfirmListener var1)(消息确认机制)异步监听发送方确认模式

3、消息获取对象

Consumer【com.rabbitmq.client.Consumer】

DefaultConsumer【com.rabbitmq.client.DefaultConsumer】

new DefaultConsumer(Channel channel)定义消费者,消费队列消息(接收消息)这是一个对象,并且重写里面的 handleDelivery方法
handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)void重写方法。body是消息队列里面的消息

4、创建maven工程,引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.my.RabbitMQ</groupId>
  <artifactId>myrabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <dependencies>
  	<dependency>
  		<groupId>com.rabbitmq</groupId>
  		<artifactId>amqp-client</artifactId>
  		<version>4.0.2</version>
  	</dependency>
  	<dependency>
  		<groupId>org.slf4j</groupId>
  		<artifactId>slf4j-log4j12</artifactId>
  		<version>1.7.5</version>
  	</dependency>
  	<dependency>
  		<groupId>log4j</groupId>
  		<artifactId>log4j</artifactId>
  		<version>1.2.17</version>
  	</dependency>
  	<dependency>
  		<groupId>junit</groupId>
  		<artifactId>junit</artifactId>
  		<version>4.11</version>
  	</dependency>
  </dependencies>
</project>

5、创建连接工厂类

package com.mmr.rabbitmq.util;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtils {
	/**
	 * 获取MQ的连接
	 * @return
	 * @throws TimeoutException 
	 * @throws IOException 
	 */
	public static Connection getConnection() throws IOException, TimeoutException{
		//定义一个连接工厂
		ConnectionFactory factory=new ConnectionFactory();
		//设置服务地址
		factory.setHost("127.0.0.1");
		//AMQP 5672
		factory.setPort(5672);
		//vhost
		factory.setVirtualHost("/virtual_host");
		//用户名
		factory.setUsername("marvin");
		//密码
		factory.setPassword("123");
		
		return factory.newConnection();
	}
}

三、简单消息队列模型

1、创建消息生产者

package com.mmr.rabbitmq01.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
	//队列名
	private static final String QUEUE_NAME="test_simple_queue";

	public static void main(String[] args) throws IOException, TimeoutException {
		//获取连接
		Connection connection=ConnectionUtils.getConnection();
		//从连接中创建通道
		Channel channel=connection.createChannel();
		//声明(创建)队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//发送的消息
		String msg="hello simple";
		channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
		
		System.out.println("--send queue:"+msg);
		//关闭通道和连接
		channel.close();
		connection.close();
		
	}
}

2、消息消费者

消费者有两种方式消费消息队列

  • DefaultConsumer(推荐使用)
  • QueueingConsumer(旧的,不推荐使用)
package com.mmr.rabbitmq01.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * 消费者获取消息
 * @author Marvin
 *
 */
public class Recv {
	private static final String QUEUE_NAME="test_simple_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//		oldAPI();
		newAPI();
	}
	/**
	 * 新的方式(DefaultConsumer),推荐使用
	 */
	private static void newAPI() throws IOException, TimeoutException {
		//获取连接
		Connection connection=ConnectionUtils.getConnection();
		//创建频道
		Channel channel=connection.createChannel();
		//队列声明
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//定义消费者
		DefaultConsumer consumer=new DefaultConsumer(channel){
			//获取到达消息
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("new api recv:"+msg);
			}
		};
		//监听队列
		channel.basicConsume(QUEUE_NAME, true, consumer);
	}
	/**
	 * 旧的方式(QueueingConsumer),不推荐使用
	 */
	private static void oldAPI() throws IOException, TimeoutException, InterruptedException {
		//获取连接
		Connection connection=ConnectionUtils.getConnection();
		//创建频道
		Channel channel=connection.createChannel();
		//定义队列的消费者
		QueueingConsumer consumer=new QueueingConsumer(channel);
		//监听队列
		channel.basicConsume(QUEUE_NAME, true, consumer);
		while(true){
			Delivery delivery=consumer.nextDelivery();
			String msgString=new String(delivery.getBody());
			System.out.println("[recv] msg:"+msgString);
		}
	}
}

3、测试结果总结

运行java类:

可以看到 send 类一发出消息,recv 就立即消费了队列消息(因为消费者监听了这个队列)

我们的队列信息可以通过http://localhost:15672可视化平台看到我们定义的队列信息(队列名:test_simple_queue;虚拟主机:/virtual_host

四、工作队列模型

工作队列分为两种:

  1. 轮询分发:queue 向两个消费者交替分发消息,不考虑消费者处理消息的效率高低,是否完成。
  2. 公平分发:queue 向两个消费者每次发送一个消息,等到消费者处理完成后手动向队列回执一个确认信息,队列收到确认信息后再向消费者发送下一个消息。(消费者完成后才获取下一个消息,处理效率高的消费者相对来说会从队列中获取更多的消息,能者多劳)

1、轮询分发

(1)消息生产者

package com.mmr.rabbitmq02.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 轮询分发
 * 				|----clinet1
 * P----Queue---|
 * 				|----clinet2
 */
public class Send {
	private static final String QUEUE_NAME="test_work_queue";

	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		// TODO Auto-generated method stub
		//获取连接
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
		for(int i=0;i<10;i++){
			String s="hello:"+i;
			System.out.println("[work send] i="+i);
			channel.basicPublish("", QUEUE_NAME, null, s.getBytes());
			Thread.sleep(2*10);
		}
		channel.close();
		connection.close();
	}
}

(2)消费者1

3秒处理一个,处理效率低

package com.mmr.rabbitmq02.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv1 {
	private static final String QUEUE_NAME="test_work_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//定义一个消费者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到达触发这个方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("[1] Recv msg:"+msg);
				try {
					Thread.sleep(3*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
					System.out.println("[1] done");
				}
			}
		};
		//监听
		boolean autoAck =true;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

(3)消费者2

1秒处理一个,处理效率高

package com.mmr.rabbitmq02.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv2 {
	private static final String QUEUE_NAME="test_work_queue";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//定义一个消费者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到达触发这个方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("[2] Recv msg:"+msg);
				try {
					Thread.sleep(1*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
					System.out.println("[2] done");
				}
			}
		};
		//监听
		boolean autoAck =true;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

(3)测试结果总结

总结:两个消费者处理效率不同,但是队列仍会交替将消息发送给两个消费者,不管你是否处理完成。

2、公平分发

每个消费者发送确认消息之前,消息队列不发送下一个消息给消费者,一次只处理一个消息

  • basicQos():限制发送消息个数,限制发送给同一个消费者不得超过一条消息
  • basicAck():手动回执一个消息

注意:使用公平分发消费者一定要关闭自动应答,手动回执才有效

boolean autoAck =false;//关闭自动应答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

(1)消息生产者

package com.mmr.rabbitmq03.work.fair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 公平分发
 * 使用公平分发必须关闭自动应答
 * 				|----clinet1
 * P----Queue---|
 * 				|----clinet2
 */
public class Send {
	private static final String QUEUE_NAME="test_work_queue";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		// TODO Auto-generated method stub
		//获取连接
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		/**
		 * 每个消费者发送确认消息之前,消息队列不发送下一个消息给消费者,一次只处理一个消息
		 * 
		 * 限制发送给同一个消费者不得超过一条消息
		 * basicQos()限制发送消息个数
		 */
		int prefetchCount=1;
		channel.basicQos(prefetchCount);
		
		for(int i=0;i<10;i++){
			String s="hello:"+i;
			System.out.println("[work send] i="+i);
			channel.basicPublish("", QUEUE_NAME, null, s.getBytes());
			Thread.sleep(2*10);
		}
		channel.close();
		connection.close();
	}
}

(2)消费者1(关闭自动应答)

3秒处理一个,处理效率

package com.mmr.rabbitmq03.work.fair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv1 {
	private static final String QUEUE_NAME="test_work_queue";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		final Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//限制发送消息个数,保证一次 只发一个
		channel.basicQos(1);
		//定义一个消费者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到达触发这个方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("[1] Recv msg:"+msg);
				try {
					Thread.sleep(3*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
					System.out.println("[1] done");
					//手动回执一个消息
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		//监听,一定要关闭自动应答
		boolean autoAck =false;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

(3)消费者2(关闭自动应答)

1秒处理一个,处理效率高

package com.mmr.rabbitmq03.work.fair;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv2 {
	private static final String QUEUE_NAME="test_work_queue";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		//获取channel
		final Channel channel=connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.basicQos(1);
		//定义一个消费者
		Consumer consumer=new DefaultConsumer(channel){
			//消息到达触发这个方法
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("[2] Recv msg:"+msg);
				try {
					Thread.sleep(1*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
					System.out.println("[2] done");
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		//监听,一定要关闭自动应答
		boolean autoAck =false;
		channel.basicConsume(QUEUE_NAME, autoAck, consumer);
	}
}

(3)测试结果总结

总结:两个消费者处理效率不同,队列一次只向消费者发送一个消息,直到的收到消息回执后才发送下一个消息。

所以可以看到执行效率高的Recv2处理的消息更多。

五、发布订阅模型(fanout)

解读:

  • 一个生产者,多个消费者。
  • 每一个消费者都有自己的队列
  • 生产者没有直接把消息发送到队列,而是发到了交换机(转发器)exchange
  • 每个队列都要绑定到交换机上
  • 生产者发送的消息,经过交换机,到达队列,就能实现一个消息被多个消费者消费

1、消息生产者(声明交换机 fanout 模式

消息发送至 exchange(交换机) 而非先前的 queue(队列)

//声明交换机 fanout 模式
channel.exchangeDeclare(EXCHANGE_NANE, "fanout");//分发
//发送消息,发送到交换机(basicPublish第二个参数""是空字符串,表示发送给所有绑定的队列)
String msg="hello ps";
channel.basicPublish(EXCHANGE_NANE, "", null, msg.getBytes());

package com.mmr.rabbitmq04.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 发布订阅模式
 * channel.exchangeDeclare()交换机
 * 一个生产者消息发送到交换机,交换机分两个消息队列分别发送给两个消费者
 * 实现了一个任务后同时发送邮件和短信的功能
 */
public class Send {
	//交换机名
	private static final String EXCHANGE_NANE="test_exchange_fanout";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//声明交换机
		channel.exchangeDeclare(EXCHANGE_NANE, "fanout");//分发
		
		//发送消息,发送到交换机
		String msg="hello ps";
		channel.basicPublish(EXCHANGE_NANE, "", null, msg.getBytes());
		System.out.println("send "+msg);
		
		channel.close();
		connection.close();
	}
}

2、消费者SMS(绑定队列和交换机)

//消费者绑定交换机(转发器),将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NANE, "");

package com.mmr.rabbitmq04.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv1SMS {
	//交换机名
	private static final String EXCHANGE_NANE="test_exchange_fanout";
	//队列名
	private static final String QUEUE_NAME="test_fanout_sms";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		//先声明交换机
		channel.exchangeDeclare(EXCHANGE_NANE, "fanout");//分发
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//绑定交换机(转发器)将队列绑定到交换机
		channel.queueBind(QUEUE_NAME, EXCHANGE_NANE, "");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[2] Recv SMS msg:"+s);
				//手动回执一个消息
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

3、消费者 Email(绑定队列和交换机)

package com.mmr.rabbitmq04.ps;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv1Email {

	private static final String EXCHANGE_NANE="test_exchange_fanout";
	private static final String QUEUE_NAME="test_fanout_email";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		channel.exchangeDeclare(EXCHANGE_NANE, "fanout");
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//绑定交换机(转发器)
		channel.queueBind(QUEUE_NAME, EXCHANGE_NANE, "");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[1] Recv Email msg:"+s);
				//手动回执一个消息
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

4、测试结果总结

总结:生产者将消息发送到交换机,而非先前的队列。

每个消费者都有一个自己的队列,并将队列和交换机进行绑定。

这样发送到交换机的消息就会同时发送给msm 和 email的队列,两个队列都拿到了这个消息,并通过消费者消费消息。

六、路由模式(direct)

1、消息生产者(声明交换机 direct 模式)

//声明路由 direct 模式
channel.exchangeDeclare(EXCHANGE_NANE, "direct");

//error路由(通过routingKey定义,消费者通过key接收特定的消息)
String msgerr="hello direct err";
String
routingKeyErr="error";
channel.basicPublish(EXCHANGE_NANE, routingKeyErr, null, msgerr.getBytes());

package com.mmr.rabbitmq05.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * direct
 * 路由模式,通过routingKey定义,消费者通过key接收特定的消息
 */
public class Send {
	private static final String EXCHANGE_NANE="test_exchange_direct";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//声明路由
		channel.exchangeDeclare(EXCHANGE_NANE, "direct");
		//error路由(通过routingKey定义,消费者通过key接收特定的消息)
		String msgerr="hello direct err";
		String routingKeyErr="error";
		channel.basicPublish(EXCHANGE_NANE, routingKeyErr, null, msgerr.getBytes());
		//info路由
		String msginfo="hello direct info";
		String routingKeyInfo="info";
		channel.basicPublish(EXCHANGE_NANE, routingKeyInfo, null, msginfo.getBytes());
		//warning路由
		String msgwarning="hello direct warning";
		String routingKeyWarning="warning";
		channel.basicPublish(EXCHANGE_NANE, routingKeyWarning, null, msgwarning.getBytes());
		
		channel.close();
		connection.close();
	}
}

2、消费者1(只接收error消息)

//绑定
channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "error");

package com.mmr.rabbitmq05.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv1 {
	private static final String EXCHANGE_NANE="test_exchange_direct";
	private static final String QUEUE_NANE="test_direct_queue_1";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		channel.basicQos(1);
		//声明路由
		channel.exchangeDeclare(EXCHANGE_NANE, "direct");
		//声明队列
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		//绑定
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "error");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[1 err] Recv msg:"+s);
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NANE, false, consumer);
	}
}

3、消费者2(接收info_err_warning消息)

package com.mmr.rabbitmq05.routing;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv2 {
	private static final String EXCHANGE_NANE="test_exchange_direct";
	private static final String QUEUE_NANE="test_direct_queue_2";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		channel.basicQos(1);
		//声明路由
		channel.exchangeDeclare(EXCHANGE_NANE, "direct");
		//声明队列
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "error");
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "info");
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "warning");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[2 info_err_warning] Recv msg:"+s);
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NANE, false, consumer);
	}
}

4、测试结果总结

总结:通过队列绑定key来确定该队列具体接收的消息, Recv1里面只绑定了err信息,而Recv2里面绑定了info err warning三个信息

七、主题模式(topic)

1、消息生产者(声明交换机 topic 模式)

//声明交换机 topic 模式
channel.exchangeDeclare(EXCHANGE_NANE, "topic");

package com.mmr.rabbitmq06.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * topic
 * 路由匹配
 */
public class Send {
	private static final String EXCHANGE_NANE="test_exchange_topic";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//声明交换机 topic 模式
		channel.exchangeDeclare(EXCHANGE_NANE, "topic");
		
		String msg="商品....添加";
		String routingKey="goods.add";
		channel.basicPublish(EXCHANGE_NANE, routingKey, null, msg.getBytes("utf-8"));
		
		String msgd="商品....删除";
		String routingKeyd="goods.delete";
		channel.basicPublish(EXCHANGE_NANE, routingKeyd, null, msgd.getBytes("utf-8"));
		System.out.println("---send:"+msg);
		
		channel.close();
		connection.close();
	}
}

2、消费者1(只接收goods.add消息,和原先一样)

//绑定
channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "goods.add");

package com.mmr.rabbitmq06.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv1 {
	private static final String EXCHANGE_NANE="test_exchange_topic";
	private static final String QUEUE_NANE="test_topic_queue_1";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		channel.basicQos(1);
		//声明路由
		channel.exchangeDeclare(EXCHANGE_NANE, "topic");
		//声明队列
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		//绑定
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "goods.add");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[1 goods.add] Recv msg:"+s);
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NANE, false, consumer);
	}
}

3、消费者2(接收匹配符绑定)

package com.mmr.rabbitmq06.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class Recv2 {
	private static final String EXCHANGE_NANE="test_exchange_topic";
	private static final String QUEUE_NANE="test_topic_queue_2";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		final Channel channel=connection.createChannel();
		channel.basicQos(1);
		//声明路由(修改类型为topic)
		channel.exchangeDeclare(EXCHANGE_NANE, "topic");
		//声明队列
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		//绑定方式(变为匹配符绑定)
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, "goods.#");
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[1 goods.#] Recv msg:"+s);
				channel.basicAck(envelope.getDeliveryTag(), false);
			};
		};
		channel.basicConsume(QUEUE_NANE, false, consumer);
	}
}

4、测试结果总结

总结:交换机设置为 topic 模式后 Recv1 还可以使用老方法 key 全名,Recv2 可以使用通配符匹配,绑定消息

八、rabbitMQ 生产者消息发送可达(事务机制)

在rabbitmq中,我们可以通过持久化数据 解决rabbitmq 服务器异常 的数据丢失问题

问题: 生产者将消息发送出去之后,消息到底有没有到达rabbitmq 服务器,默认的情况是不知道的

两种方式:

  • AMQP实现了事务机制
  • Confirm模式

事务机制(txSelect、txCommit、txRollback)

  • txSelect 用户将当前 channel 设置成 transaction 模式
  • txCommit用于提交事务
  • txRollback回滚事务

1、消息生产者

package com.mmr.rabbitmq07.transaction;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class TxSend {
	private static final String QUEUE_NANE="test_queue_Transaction";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		String msg="hello transaction message";
		try {
			//设置为transaction模式
			channel.txSelect();
			channel.basicPublish("", QUEUE_NANE, null, msg.getBytes());
			//提交事务
			channel.txCommit();
			System.out.println("commit");
		} catch (Exception e) {
			//回滚事务
			channel.txRollback();
			System.out.println("rollback");
		}
		channel.close();
		connection.close();
	}
}

2、消息消费者

package com.mmr.rabbitmq07.transaction;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

public class TxRecv1 {
	private static final String QUEUE_NANE="test_queue_Transaction";
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		Consumer consumer=new DefaultConsumer(channel){
			public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
				String s=new String(body,"utf-8");
				System.out.println("[TX] Recv msg:"+s);
			};
		};
		channel.basicConsume(QUEUE_NANE, true, consumer);
	}
}

九、rabbitMQ 生产者消息发送可达(confirm消息确认机制)

开启confirm模式:channel.confirmSelect()

confirm的三种实现方式

方法描述
waitForConfirms()同步确认模式,支持单条和批量确认,等待所有消息确认,如果所有的消息都被服务端成功接收 返回true,只要有一条没有被成功接收就 返回false
waitForConfirmsOrDie()同步确认模式,支持单条和批量确认也是等待所有消息确认,区别在于该方法没有返回值(Void),如果有任意一条消息没有被成功接收,该方法会立即抛出一个IOException类型异常,并且在抛异常前会关闭信道channel.close();【此方法内部其实仍然是调用waitForConfirms实现】
addConfirmListener()异步监听发送方确认模式

1、waitForConfirms【同步确认】

编程模式

  1. 普通 发一条 waitForConfirms()
  2. 批量的 发一批 waitForConfirms()

官方文档描述:等待直到上次调用之后发布的所有消息被broker ack或nack。注意,当在非confirm通道上调用waitforconfirm时,会抛出一个IllegalStateException。 

(1)消息消费者【这个消费者设置了topic模式后面的可以通用】

package com.mmr.rabbitmq08.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RecvConfirm {
	public static final String EXCHANGE_NANE = "test_exchange_confirm";
	public static final String QUEUE_NANE="test_queue_confirm";
	public static final String ROUTING_KEY = "confirm.#";

	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();

		//声明交换机和队列,然后进行绑定设置路由Key
		channel.exchangeDeclare(EXCHANGE_NANE, "topic", true);
		channel.queueDeclare(QUEUE_NANE, false, false, false, null);
		channel.queueBind(QUEUE_NANE, EXCHANGE_NANE, ROUTING_KEY);

		Consumer consumer=new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				String msg=new String(body,"utf-8");
				System.out.println("【confirm】 RecvConfirm msg:"+msg);
			}
		};
		channel.basicConsume(QUEUE_NANE, true, consumer);
	}
}

(2)消息生产者【单条发送】

package com.mmr.rabbitmq08.confirm.waitForConfirms;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 单条发送,waitForConfirms确认
 */
public class SendSingle {
	private static final String ROUTING_KEY = "confirm.single_waitForConfirms";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		//发送消息
		String msg="hello confrom message【single waitForConfirms】";
		channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		//确认消息(可以同时确认多条消息)是否发送成功
		if(channel.waitForConfirms()){
			System.out.println("message send success【single waitForConfirms】");
		}else{
			System.out.println("message send failed【single waitForConfirms】");
		}
		channel.close();
		connection.close();
	}
}

(3)消息生产者【批量发送】

package com.mmr.rabbitmq08.confirm.waitForConfirms;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 批量发送,waitForConfirms确认
 */
public class SendBatch {
	private static final String ROUTING_KEY = "confirm.batch_waitForConfirms";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		//批量发送
		for(int i=0;i<10;i++){
			String msg="hello confrom message【batch waitForConfirms】=="+i;
			channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		}
		//确认消息(可以同时确认多条消息)是否发送成功
		if(channel.waitForConfirms()){
			System.out.println("message send success【batch waitForConfirms】");
		}else{
			System.out.println("message send failed【batch waitForConfirms】");
		}
		channel.close();
		connection.close();
	}
}

2、waitForConfirmsOrDie【同步确认】

编程模式

  1. 普通 发一条 waitForConfirmsOrDie()
  2. 批量的 发一批 waitForConfirmsOrDie()

源码内部调用了 waitForConfirm()

官方文档描述:等待直到上次调用之后发布的所有消息被broker ack或nack。如果任何消息被锁定,waitForConfirmsOrDie将抛出IOException。当在非confirm通道上调用时,它将抛出一个IllegalStateException。

(1)消息生产者【单条发送】

package com.mmr.rabbitmq08.confirm.waitForConfirmsOrDie;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 单条发送,waitForConfirmsOrDie确认
 */
public class SendSingle {
	private static final String ROUTING_KEY = "confirm.single_waitForConfirmsOrDie";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		//发送消息
		String msg="hello confrom message【single waitForConfirmsOrDie】";
		channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		//确认消息(可以同时确认多条消息)是否发送成功【通过异常来判断,源码是通过调用waitForConfirms】
		try {
			channel.waitForConfirmsOrDie();
			System.out.println("message send success【single waitForConfirmsOrDie】");
		} catch (IOException e) {
			e.printStackTrace();
			System.out.println("message send failed【single waitForConfirmsOrDie】");
		}

		channel.close();
		connection.close();
	}
}

(2)消息生产者【批量发送】

package com.mmr.rabbitmq08.confirm.waitForConfirmsOrDie;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 批量发送,waitForConfirmsOrDie确认
 */
public class SendBatch {
	private static final String ROUTING_KEY = "confirm.batch_waitForConfirmsOrDie";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		//批量发送
		for(int i=0;i<10;i++){
			String msg="hello confrom message【batch waitForConfirmsOrDie】=="+i;
			channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		}
		//确认消息(可以同时确认多条消息)是否发送成功【通过异常来判断,源码是通过调用waitForConfirms】
		try {
			channel.waitForConfirmsOrDie();
			System.out.println("message send success【batch waitForConfirmsOrDie】");
		} catch (IOException e) {
			e.printStackTrace();
			System.out.println("message send failed【batch waitForConfirmsOrDie】");
		}

		channel.close();
		connection.close();
	}
}

3、addConfirmListener【异步监听确认】

(1)消息生产者【单条发送】

package com.mmr.rabbitmq08.confirm.addConfirmListener;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 单条发送,addConfirmListener异步监听确认
 */
public class SendSingle {
	private static final String ROUTING_KEY = "confirm.addConfirmListener";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		String msg="hello confrom message【single addConfirmListener】";
		channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		//异步监听确认是否发送成功
		channel.addConfirmListener(new ConfirmListener() {
			/**
			 * 消息成功处理
			 * @param deliveryTag:唯一消息标签
			 * @param multiple:是否批量
			 * @throws IOException
			 */
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("message send success【single addConfirmListener】:deliveryTag【"+ deliveryTag +"】multiple【"+ multiple +"】");
			}

			/**
			 * 消息失败处理
			 * @param deliveryTag:唯一消息标签
			 * @param multiple:是否批量
			 * @throws IOException
			 */
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("message send failed【single addConfirmListener】:deliveryTag【"+ deliveryTag +"】multiple【"+ multiple +"】");
			}
		});

		//给监听一点时间再关闭
		Thread.sleep(5000);
		channel.close();
		connection.close();
	}
}

(2)消息生产者【批量发送】

package com.mmr.rabbitmq08.confirm.addConfirmListener;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.mmr.rabbitmq08.confirm.RecvConfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 批量发送,addConfirmListener异步监听确认
 */
public class SendBatch {
	private static final String ROUTING_KEY = "confirm.addConfirmListener";
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection=ConnectionUtils.getConnection();
		Channel channel=connection.createChannel();
		//生产者调用confirmSelect 将channel设置为confirm模式
		channel.confirmSelect();
		//批量发送
		for(int i=0;i<50;i++){
			String msg="hello confrom message【batch addConfirmListener】=="+i;
			channel.basicPublish(RecvConfirm.EXCHANGE_NANE, ROUTING_KEY, null, msg.getBytes());
		}

		//异步监听确认是否发送成功
		channel.addConfirmListener(new ConfirmListener() {
			/**
			 * 消息成功处理
			 * @param deliveryTag:唯一消息标签
			 * @param multiple:是否批量
			 * @throws IOException
			 */
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("message send success【batch addConfirmListener】:deliveryTag【"+ deliveryTag +"】multiple【"+ multiple +"】");
			}

			/**
			 * 消息失败处理
			 * @param deliveryTag:唯一消息标签
			 * @param multiple:是否批量
			 * @throws IOException
			 */
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.out.println("message send failed【batch addConfirmListener】:deliveryTag【"+ deliveryTag +"】multiple【"+ multiple +"】");
			}
		});


		//给监听一点时间再关闭
		Thread.sleep(10000);
		channel.close();
		connection.close();
	}
}

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐