前端时间系统改造,要使用MQ,开始MQ版本没定。自己就把ActiveMQ,RabbitMQ,RocketMQ的入门都粗略的学习一遍。现在老大已决定在给客户演示的时候用RocketMQ,然而上线的时候却要用非开源的,例如某东的JMQ,某Q的CMQ。现在版本已出,测试通过,无聊中,顺便整理一下前段时间自己的学习成果。

ActiveMQ入门

http://www.cnblogs.com/zhuxiaojie/p/5564187.html

1JMS

1.1JMS规范

连接工厂:连接工厂是客户用来创建连接的对象。

连接:封装了客户与JMS提供者之间的一个虚拟连接。

会话:生产和消费消息的一个单线程上下文。

目的地:客户用来指定生产消息的目标,和消费消息的数据来源。

消息生产者:由会话创建的一个对象,用于把消息发送到一个目的地。

消息发送者:由会话创建的一个对象,用于接收发送到目的地的对象。

消息:消息包含了消息头,消息属性,消息体。

1.2JMS的可靠性

ACK机制:只有在被确认之后,才能被认为成功的消费了。

持久性:ActiveMQ的持久性可以用,data logs,kaha ,jdbc

消息过期:

临时目的地:

持久订阅:

本地事物:

2.ActiveMQ

2.1ActiveMQ的通讯方式

2.2 ActiveMQ特色

2.2.1 独占消费

queue中的消息是按照顺序被分发到consumers中的,而独占消费就是在很多consumer种只选择一个consumer来处理queue中的所有消息。

2.2.2  消息分组

独占消费的增强,也就是保证了消息属性相同的JMSGroup ID的消息会发送到相同的consumer

2.2.3 JMS selectors

     消息过滤器,基于SQL语法。

2.2.4 Pending Message Limit Strategy

首先简要介绍一下prefetch机制。ActiveMQ通过prefetch机制来提高性能,这意味这客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控制。当某个

consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认。

慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。未来ActiveMQ可能会实现磁盘缓存,但是这也还

是会存在性能问题。目前ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到

来时会丢弃旧消息。通过在配置文件的destination map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。目前有以下两种:
• ConstantP

endingMessageLimitStrategy。这个策略使用常量限制。 例如:<constantPendingMessageLimitStrategy limit="50"/>
• PrefetchRatePendingMessageLimitStrategy。这个

策略使用prefetch size的倍数限制。 例如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消

息;如果设置-1意味着禁止丢弃消息。 此外,你还可以配置消息的丢弃策略,目前有以下两种:
• oldestMessageEvictionStrategy。这个策略丢弃最旧的消息。
• oldestMessag

eWithLowestPriorityEvictionStrategy。这个策略丢弃最旧的,而且具有最低优先级的消息。

2.2.5 虚拟队列

topic模式的增强。

它允许用一个虚拟的destination 代表多个destinations。例如你可以通过composite destinations在一个操作中同时向12queue发送消息。

2.2.6 镜像队列

每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。 ActiveMQ支持Mirrored QueuesBroker会把发送到某个que

ue的所有消息转发到一个名称类似的topic,因此监控程序可以订阅这个mirrored queue topic。为了启用Mirrored Queues,首先要将BrokerServiceuseMirroredQue

ues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省是"VirtualTopic.Mirror."

2.2.7 支持同步异步发送

2.2.8 分发策略

2.2.9 消息的游标

http://blog.csdn.net/neareast/article/details/7582947

2.2.10 批量确认

------------个人测试代码

package com.czy.demo.ActiveDemo.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class Receiver {
	Connection connection = null;
	Session session = null;
	MessageConsumer consumer = null;

	@Before
	public void before() throws JMSException {
		// 1.创建ConnectionFactory工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory("czy", "czy", "tcp://localhost:61616");
		// 2.创建Connection
		connection = factory.createConnection();
		connection.setClientID("11");// 持久化订阅要设置ID,
		 //生产者创建的消息类型必须为持久化的
		connection.start();
	}

	@Test
	public void testQueue() throws JMSException {
		// 3.创建section
		// 第一个参数,是否开启事物 第二个参数 ACK的类型
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 4.创建destination
		Destination destination = session.createQueue("queue1");
		consumer = session.createConsumer(destination);
		receive();
		/*
		 * 分别运行receive和receive2的testQueue方法 发现俩个ActiveMQ会把队列中的消息依次发送给消费者,直到发送完毕
		 * 可能出现的结果 receive接受到消息0,2,4 receive2接受到结果1,3
		 */

	}
	
	@Test
	public void testTopic() throws JMSException {
		// 3.创建section
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 4.创建destination
		Destination destination = session.createTopic("topic1");
		consumer = session.createConsumer(destination);
		receive();
		/*
		 * 分别运行receive和receive2的testTopic方法 发现俩个ActiveMQ会把队列中的消息都发送给激活状态下的消费者
		 * 出现的结果 receive接受到消息0,1,2,3,4 receive2接受到结果0,1,2,3,4
		 * 如果消费者后运行,先运行生产者,消费者是接受不到消费的
		 */
	}

	@Test
	public void testAckNowledge() throws JMSException {
		// 3.创建section
		// Session.AUTO_ACKNOWLEDGE 自动提交
		// Session.CLIENT_ACKNOWLEDGE 客户端手动确认
		// Session.DUPS_OK_ACKNOWLEDGE 自动批量提交
		// Session.SESSION_TRANSACTED 在事物的环境中 这种ACK类型,前面的参数要为true
		session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
		// ssession=connection.createSession(Boolean.TRUE,
		// Session.SESSION_TRANSACTED);
		// 4.创建destination
		Destination destination = session.createQueue("queue1");
		consumer = session.createConsumer(destination);
		// receiveWithACK(false);
		// receiveWithACK(true);
		// receiveWithTransaction(null);
		// receiveWithTransaction(false);
		// receiveWithTransaction(true);
		/*
		 * 分别先用前三章种ACK类型,创建session,然后运行receiveWithACK 发现自动提交和自动批量提交,表现一致,
		 * 都是无论是否手动ACK,接受到的消息都会从未消费队列中删除 而手动提交只有提交之后才会在未消费队列中删除
		 * 用事物类型Ack创建session,然后运行receiveWithTransaction,只有显示提交才会从未消费队列中删除
		 * 回滚,或者不处理事物都不会从未消费队列中删除
		 */
	}
	@Test
	public void testDurableSubscriber() throws JMSException {
		// 3.创建session
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// ssession=connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
		// 4.创建destination
		Topic topic = session.createTopic("topic1");
		consumer = session.createDurableSubscriber(topic, "11");
		receive();
		//先运行生产者生产消息,在运行消费者仍然可以接受到消费者离线前的消息
		//而且如果是客户端,手动ACK,俩个持久化的订阅不会相互影响
		//比如11,手动确认了,22仍然可以接受到消息,直到22也手动确认,事物也是一样
	}
	@Test
	public void testExclusive() throws JMSException{
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("queue1?consumer.exclusive=true");
		consumer = session.createConsumer(destination);
		receive();
		/**
		 * 测试结果,会从多个consumers中挑选一个consumer来处理queue中所有的消息,
		 * 从而保证了消息的有序处理,而不是分发到各个消费者
		 */
	}
	
	@Test
	public void testSelecor() throws JMSException{
		// 3.创建section
		// 第一个参数,是否开启事物 第二个参数 ACK的类型
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 4.创建destination
		Destination destination = session.createQueue("queue1");
		consumer = session.createConsumer(destination,"number>=2");
		receive();
		/**
		 * 选择器,只会选择符合条件的消息,
		 * 制定注意的是表达式中的属性不会进行自动类型转换
		 */
	}

	public void receiveWithTransaction(Boolean isCommit) throws JMSException {
		receive(null, isCommit);
	}

	public void receiveWithACK(Boolean isAck) throws JMSException {
		receive(isAck, null);
	}

	public void receive() throws JMSException {
		receive(null, null);
	}

	public void receive(Boolean isAck, Boolean isCommit) throws JMSException {
		// 6.消费消息内容
		while (true) {
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println("");
			if (message == null)
				break;
			System.out.println("接受的内容" + message.getText());
			if (isAck!=null && isAck == true) {
				message.acknowledge();
			}
			if (isCommit == null) {

			} else if (isCommit == true) {
				session.commit();
			} else if (isCommit == false) {
				session.rollback();
			}
		}
	}

	@After
	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

}
package com.czy.demo.ActiveDemo.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class Receiver2 {
	Connection connection = null;
	Session session = null;
	MessageConsumer consumer = null;

	@Before
	public void before() throws JMSException {
		// 1.创建ConnectionFactory工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory("czy", "czy", "tcp://localhost:61616");
		// 2.创建Connection
		connection = factory.createConnection();
		connection.setClientID("22");// 持久化订阅要设置ID,
		 //生产者创建的消息类型必须为持久化的
		connection.start();
	}

	@Test
	public void testQueue() throws JMSException {
		// 3.创建section
		// 第一个参数,是否开启事物 第二个参数 ACK的类型
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 4.创建destination
		Destination destination = session.createQueue("queue1");
		consumer = session.createConsumer(destination);
		receive();
		/*
		 * 分别运行receive和receive2的testQueue方法 发现俩个ActiveMQ会把队列中的消息依次发送给消费者,直到发送完毕
		 * 可能出现的结果 receive接受到消息0,2,4 receive2接受到结果1,3
		 */

	}

	@Test
	public void testTopic() throws JMSException {
		// 3.创建section
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 4.创建destination
		Destination destination = session.createTopic("topic1");
		consumer = session.createConsumer(destination);
		receive();
		/*
		 * 分别运行receive和receive2的testTopic方法 发现俩个ActiveMQ会把队列中的消息都发送给激活状态下的消费者
		 * 出现的结果 receive接受到消息0,1,2,3,4 receive2接受到结果0,1,2,3,4
		 * 如果消费者后运行,先运行生产者,消费者是接受不到消费的
		 */
	}

	@Test
	public void testAckNowledge() throws JMSException {
		// 3.创建section
		// Session.AUTO_ACKNOWLEDGE 自动提交
		// Session.CLIENT_ACKNOWLEDGE 客户端手动确认
		// Session.DUPS_OK_ACKNOWLEDGE 自动批量提交
		// Session.SESSION_TRANSACTED 在事物的环境中 这种ACK类型,前面的参数要为true
		session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
		// ssession=connection.createSession(Boolean.TRUE,
		// Session.SESSION_TRANSACTED);
		// 4.创建destination
		Destination destination = session.createQueue("queue1");
		consumer = session.createConsumer(destination);
		// receiveWithACK(false);
		// receiveWithACK(true);
		// receiveWithTransaction(null);
		// receiveWithTransaction(false);
		// receiveWithTransaction(true);
		/*
		 * 分别先用前三章种ACK类型,创建session,然后运行receiveWithACK 发现自动提交和自动批量提交,表现一致,
		 * 都是无论是否手动ACK,接受到的消息都会从未消费队列中删除 而手动提交只有提交之后才会在未消费队列中删除
		 * 用事物类型Ack创建session,然后运行receiveWithTransaction,只有显示提交才会从未消费队列中删除
		 * 回滚,或者不处理事物都不会从未消费队列中删除
		 */
	}
	
	@Test
	public void testExclusive() throws JMSException{
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue("queue1?consumer.exclusive=true");
		consumer = session.createConsumer(destination);
		receive();
	}
	@Test
	public void testDurableSubscriber() throws JMSException {
		// 3.创建session
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// ssession=connection.createSession(Boolean.TRUE, Session.SESSION_TRANSACTED);
		// 4.创建destination
		Topic topic = session.createTopic("topic1");
		consumer = session.createDurableSubscriber(topic, "22");
		receive();
		//先运行生产者生产消息,在运行消费者仍然可以接受到消费者离线前的消息
		//而且如果是客户端,手动ACK,俩个持久化的订阅不会相互影响
		//比如11,手动确认了,22仍然可以接受到消息,直到22也手动确认,事物也是一样
	}
	

	public void receiveWithTransaction(Boolean isCommit) throws JMSException {
		receive(null, isCommit);
	}

	public void receiveWithACK(Boolean isAck) throws JMSException {
		receive(isAck, null);
	}

	public void receive() throws JMSException {
		receive(null, null);
	}

	public void receive(Boolean isAck, Boolean isCommit) throws JMSException {
		// 6.消费消息内容
		while (true) {
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println("");
			if (message == null)
				break;
			System.out.println("接受的内容" + message.getText());
			if (isAck!=null && isAck == true) {
				message.acknowledge();
			}
			if (isCommit == null) {

			} else if (isCommit == true) {
				session.commit();
			} else if (isCommit == false) {
				session.rollback();
			}
		}
	}

	@After
	public void close() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

}


package com.czy.demo.ActiveDemo.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * ActiveMQ 版本 5.14.2
 * 
 * @author chenzhiyuan
 *
 */
public class Sender {
	Session session = null;
	MessageProducer producer = null;
	Connection connection = null;

	@Before
	public void before() throws JMSException {
		// 1.创建ConnectionFactory工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory("czy", "czy", "tcp://localhost:61616");
		// 2.创建Connection
		connection = factory.createConnection();
		connection.start();
	}

	@Test
	public void testQueue() throws JMSException {
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.发送消息
		send();
		// 测试结果在消费者里描述
	}

	@Test
	public void testTopic() throws JMSException {
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 4.创建目的地
		Destination des = session.createTopic("topic1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.发送消息
		send();
		// 测试结果在消费者里描述
	}

	@Test
	public void testTransaction() throws JMSException {
		// 3.创建session true开启事物,AUTO_ACKNOWLEDGE 自动提交
		// 这俩种方式创建Session,不影响下面发送消息的结果
		// session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		session = connection.createSession(true, Session.SESSION_TRANSACTED);

		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.生产消息
		// sendWithTransaction(true);
		// sendWithTransaction(false);
		// sendWithTransaction(null);
		/*
		 * 依次取消上面注释,分别执行,得到测试结果 在开启事物的时候,只有显示提交时才会有消息进入队列
		 */

	}

	@Test
	public void testNoTransaction() throws JMSException {
		// 3.创建session false不开启事物,AUTO_ACKNOWLEDGE 自动提交
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 这样创建session会报错
		// session = connection.createSession(false,
		// Session.SESSION_TRANSACTED);

		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.生产消息
		sendWithTransaction(true);
		// sendWithTransaction(false);
		// sendWithTransaction(null);
		/*
		 * 依次取消上面注释,分别执行,得到测试结果 在未开启事物的时候,显示提交或回滚会报错, 但是提交或者回滚前生产的那条记录,会入队成功。
		 */

	}

	@Test
	public void testAcknowledgeType() throws JMSException {

		// 3.创建session false不开启事物,CLIENT_ACKNOWLEDGE 自动提交
		// Session.AUTO_ACKNOWLEDGE 自动提交
		// Session.CLIENT_ACKNOWLEDGE 客户端手动确认
		// Session.DUPS_OK_ACKNOWLEDGE 自动批量提交
		// Session.SESSION_TRANSACTED 在事物的环境中
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// SendWithAck(false);
		// SendWithAck(true);
		/*
		 * 测试结果 无论哪种ACK类型,无论是否显示的进行却仍,都会入队成功, ACK的类型不影响生产者生产消息,或者说ACK机制就是针对客户端的
		 */
	}

	@Test
	public void testNonPersistent() throws JMSException {
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 非持久化
		// producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		// 发送消息
		send();
		/**
		 * 测试结果 如果没有设置持久化,重启ActiveServer消息会丢失, 消费者接受不到消息,
		 * 如果设置了持久化,重启之后消费者仍然能接受到消息 默认的持久化策略是kahadb,当然mysql,oracle等传统关系型数据库也可以
		 * 想不到什么情况下会有关系型数据库当成持久化策略,不演示
		 * 
		 */
	}
	
	@Test
	public void testMessageGroupId() throws JMSException {
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.发送消息
		for (int i = 0; i < 5; i++) {
			TextMessage message = session.createTextMessage();
			message.setText("我是消息内容i=" + i);
			message.setStringProperty("JMSXGroupID", "groupId11");
			producer.send(message);
			/*
			 * 跟所有的消息都由唯一的consumer处理不同, JMS 消息属性JMSXGroupID 被用来区分message group。
			 * Message Groups特性保证所有具有相同JMSXGroupID 的消息
			 * 会被分发到相同的consumer(只要这个consumer保持active)。 另外一方面,Message
			 * Groups特性也是一种负载均衡的机制。
			 * 在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。 如果存在,那么broker
			 * 会检查是否有某个consumer拥有这个message group。
			 * 如果没有,那么broker会选择一个consumer,并将它关联到这个message group。
			 * 此后,这个consumer会接收这个message group的所有消息,直到Consumer被关闭。
			 */ 
			}
	}
	
	@Test
	public void testMessageSelector() throws JMSException {
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 4.创建目的地
		Destination des = session.createQueue("queue1");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.发送消息
		for (int i = 0; i < 5; i++) {
			TextMessage message = session.createTextMessage();
			message.setIntProperty("number",i);
			message.setText("我是消息内容i=" + i);
			System.out.println("这是生产者:" + message.getText());
			producer.send(message);
			
			}
	}
	@Test
	public void testCompositeDestination() throws JMSException {
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		
		// 4.创建目的地
		Destination des = session.createQueue("queue1,queue2,queue3,queue4,queue5");
		// 5.创建生产者
		producer = session.createProducer(des);
		// 6.发送消息
		send();
		//CompositeDestination它允许用一个虚拟的destination 代表多个destinations。
		//例如你可以通过composite destinations在一个操作中同时向12个queue发送消息。
		//在composite destinations中,多个destination之间采用","分割
		//如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://
	}

	public void send() throws JMSException {
		send(null, false);
	}

	public void sendWithTransaction(Boolean isCommit) throws JMSException {
		send(isCommit, false);
	}

	public void SendWithAck(Boolean isAck) throws JMSException {
		send(null, isAck);
	}

	public void send(Boolean isCommit, Boolean isAck) throws JMSException {
		// 发送消息
		for (int i = 0; i < 5; i++) {
			TextMessage message = session.createTextMessage();
			message.setText("我是消息内容i=" + i);
			producer.send(message);
			if (isAck == true) {
				message.acknowledge();
			}
			if (isCommit == null) {
				System.out.println("这是生产者:" + message.getText());
			} else if (isCommit == true) {
				session.commit();
				System.out.println("commit---这是生产者:" + message.getText());
			} else if (isCommit == false) {
				session.rollback();
				System.out.println("rollback---这是生产者:" + message.getText());
			}
		}
	}

	@After
	public void after() throws JMSException {
		if (connection != null) {
			connection.close();
		}
	}

}



Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐