1. 介绍:

apache基金会开源的一个消息中间件,由java语言开发,基于JMS(Java Message Service)协议的实现,支持JMS1.1和J2EE 1.4规范。流行,开源,性能强劲。支持多种语言客户端,如:Java、C、C++、C#、Ruby、Perl、PHP、Python等。

2. 消息模式:
  • 点对点:每一条生产者发送的消息,只能被一个消费者消费一次,其他消费者不能再次消费。
  • 主题(Topic):多个消费者订阅队列,每一条生产者发送的消息,所有订阅者都会接收进行消费。
    [外链图片转存失败(img-H28BVveW-1568181460859)image
    image
3. 特点:
3.1 支持的消息类型

在javax.jms包Message接口中有定义:

    TextMessage     //文本消息
    MapMessage      //键值对消息
    ObjectMessage   //对象消息(需要序列化)
    BytesMessage    //字节消息
    StreamMessage   //stream流消息
3.2 消息权重:
  • 概述:生产者在往队列发送消息时候,可以指定消息的权重,权重越高越被优先消费。权重的范围:0~9,默认都为0,即默认消息消费顺序就是发送时的顺序,FIFO。
  • 实现方式:

1.修改activemq.xml配置文件,指定某个队列是否开启权重消费:

        <destinationPolicy>
            <policyMap>
              <policyEntries>  
                <policyEntry topic=">" >
                 <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <!-- 指定开启消息权重的队列 -->
                <policyEntry queue="test" prioritizedMessages="true"/>
             </policyEntries>
            </policyMap>
        </destinationPolicy> 

2.producer端程序发送消息时,指定消息权重。

//设定消息权重为6,优先消费
producer.send(message, DeliveryMode.PERSISTENT, 6, 0);
3.3 消费者权重
  • 概述:ActiveMQ支持消费者优先级设置,Consumer的Priority划分为0~127个级别,127为最高,0最低,0也是ActiveMQ默认的级别。Broker通过消费者的权重进行排序,优先将消息分发给最高优先级的Consumer,直到该优先的consumer的prefetch buffer满了,Broker才把消息分发给优先级次高的。
  • 使用场景:通常对网络较好、机器配置较高、处理速度较快、业务较简单的消费者设置较高的权重。
  • 实现方式:
    只需在创建目的地时候,目的地参数上加个权重属性。
//5.创建目的地
destination = session.createQueue("test?consumer.priority=30&consumer.prefetchSize=5");
  • 备注:权重可以影响负载均衡,权重大的负载也大。
3.4 消息最大消费时间
  • 概述:发送消息时,可以设置最大存活时间T,消费者可以在T时间内正常的消费该消息,一旦超过时间T则无法再消费。
  • 实现方式:

1.producer发送消息前设置,然后所有发送的消息都按这个存活时间。

//设置消息最大消费时间,后续发送的消息都按这个存活时间,超时未正常消费进入死信队列
producer.setTimeToLive(10000l);
producer.send(message);

2.发送单条消息时,指定该消息的最大消费时间。

//设置消息最大消费时间,超时未正常消费进入死信队列
producer.send(message, DeliveryMode.PERSISTENT, 0, 10000);
  • 备注:超过最大消费时间还未被消费的消息,会进入死信队列。
3.5 死信队列
  • 概述:由于某些原因一些消息未能被消费者处理,activemq会将这些消息持久化到死信队列中,不再处理这些消息。ActiveMQ默认的死信队列名为:ActiveMQ.DLQ
  • 作用:死信队列是MQ服务可靠性保障手段之一,未被处理的消息放到死信队列等待特殊处理,总比消息丢失要强得多。
  • 什么情况下消息会进入死信队列?

1.消费者在最大存活时间T内未消费掉该消息
2.异步接收时,默认重试6次都无法消费掉该消息
3.默认broker端重发6次该消息,消费者都无法消费掉时

4. 事务:
  • 概述:事务是MQ消息投递可靠性保障的一种手段,传统的数据库事务主要包括:A(原子性)、C(一致性)、I(隔离性)、D(持久性),在MQ当中主要满足"一致性"的要求。
  • 举例:比如我们有多个producer,或要发送多条message,如果在业务上要求这些消息要么同时都发送成功,要么都发送失败,此时就需要放在同一事物中。
  • 实现:
/**
     * 开启事务--模拟多次发送消息,中间有业务异常导致事务提交失败,这时已发送的消息也不会在MQ中看到。
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.113:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //生产者接口
        MessageProducer producer = null;
        //消息接口
        Message message = null;

        String[] msgs = new String[]{"zhangsan", "lisi", "wangwu", "james", "wade", "kobe"};
        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            //当开启事务时,ack方式只能设为0,跟随事务提交ack。
            session = connection.createSession(true, 0);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息生产者
            producer = session.createProducer(destination);
            //8.发送消息
            for(String m:msgs) {
                message = session.createTextMessage(m+" hit the boll!");
                producer.send(message);
            }
            //模拟出现业务异常,未正常提交事务,消息发送全都不会成功。
            if(0==0) {
                int s = 100/0;
            }
            //9.提交事务(开启事务时执行)
            session.commit();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //10.关闭资源
            try {
                if (producer!=null) {producer.close();}
                if (session!=null) {session.close();}
                if (connection!=null) {connection.close();}
                System.out.printf("资源关闭...");
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
5. ACK机制:
  • 为何会有ACK机制?
    消息投放过程中会存在以下问题:
  1. prudocer生产的消息,如何确保已成功发送到了MQ中?(消息发送不丢失)
  2. consumer消费消息后,如何及时通知MQ把已消费的消息从服务器清除掉?(消息消费不重复)
  • 概述:ACK即acknowleage,应答机制,也是MQ消息投递可靠性保障的一种手段。MQ服务器通过客户端的ack消息,确定消息是否要从服务器清除。
  • ack机制的策略:
    在javax.jms包Session接口中有定义:
    int SESSION_TRANSACTED = 0; //事务提交时自动发送ack
    int AUTO_ACKNOWLEDGE = 1;   //自动确认,一旦消息接收到就附带发送ack
    int CLIENT_ACKNOWLEDGE = 2; //手动确认,客户端代码手动ack
    int DUPS_OK_ACKNOWLEDGE = 3;//重复副本的确认
  • 实现方式:
    这里主要针对消费端举例ack的实现方式:
  1. 开启事务情况下:
    a.同步消费:
/**
     * 开启事务+同步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;
        //消息接口
        Message message = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当手动确认时,事务参数需要设为true,第二个参数只能事务提交时自动ack。
            // ack的时机在执行session.commit()时触发
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);
            //7.消费消息(同步)
            message = consumer.receive();
            String text = ((TextMessage)message).getText();
            System.out.println(text);
            //8.提交事务
            //注意:开启事务时,ack在commit()时同时提交
            //      如果此时未正常commit(),ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
//            session.commit();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //9.关闭资源
            try {
                if (consumer!=null) {consumer.close();}
                if (session!=null) {session.close();}
                if (connection!=null) {connection.close();}
                System.out.printf("资源关闭...");
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }

b.异步消费:

/**
     * 开启事务+异步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当手动确认时,事务参数需要设为true,第二个参数只能事务提交时自动ack。
            // ack的时机在执行session.commit()时触发
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);
            //7.消费消息(异步),需要实现MessageListener接口
            Session finalSession = session;
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //消费消息
                    try {
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //8.提交事务
                        //注意:开启事务时,ack在commit()时同时提交
                        //      如果此时未正常commit(),ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
//                        finalSession.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
  1. 自动ack情况下:
    a.同步消费:
/**
     * 手动确认+同步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;
        //消息接口
        Message message = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当手动确认时,事务参数需要设为false,第二个参数选客户端手动确认。
            // ack的时机由代码调用处触发
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);
            //7.消费消息(同步)
            message = consumer.receive();
            String text = ((TextMessage)message).getText();
            System.out.println(text);
            // 注意:此时需要调用acknowledge()手动确认
            // 如未显示调用该方法,ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
//            message.acknowledge();

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //9.关闭资源
            try {
                if (consumer!=null) {consumer.close();}
                if (session!=null) {session.close();}
                if (connection!=null) {connection.close();}
                System.out.printf("资源关闭...");
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }

b.异步消费:

/**
     * 开启事务+异步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;
        //消息接口
        Message message = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当手动确认时,事务参数需要设为false,第二个参数选客户端手动确认。
            // ack的时机由代码调用处触发
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);

            //7.消费消息(异步),需要实现MessageListener接口
            Session finalSession = session;
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        // 注意:此时需要调用acknowledge()手动确认
                        // 如未显示调用该方法,ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
//                         message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
  1. 手动ack情况下:
    a.同步消费:
/**
     * 自动确认+同步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;
        //消息接口
        Message message = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当自动确认时,事务参数需要设为false,第二个参数选自动确认。
            // ack的时机由consumer.receive()执行完时触发
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);
            //7.消费消息(同步)
            // 注意:当执行.receive()方法时,同时就执行了ack,此时MQ会将该消息在服务器中清除。
            message = consumer.receive();
            String text = ((TextMessage)message).getText();
            System.out.println(text);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //9.关闭资源
            try {
                if (consumer!=null) {consumer.close();}
                if (session!=null) {session.close();}
                if (connection!=null) {connection.close();}
                System.out.printf("资源关闭...");
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }

b.异步消费:

/**
     * 自动确认+异步消费时的ack
     */
    public static void main(String[] args) {
        //mq服务地址
        String broker = "tcp://192.168.1.111:61616";
        //链接工厂接口
        ConnectionFactory connectionFactory = null;
        //链接接口
        Connection connection = null;
        //会话接口
        Session session = null;
        //目的地接口
        Destination destination = null;
        //消费者接口
        MessageConsumer consumer = null;
        //消息接口
        Message message = null;

        try {
            //1.创建链接工厂
            connectionFactory = new ActiveMQConnectionFactory(broker);
            //2.获取链接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session,第一个参数为是否开启事务,第二个为ack方式。
            // 当自动确认时,事务参数需要设为false,第二个参数选自动确认。
            // ack的时机由异步处理的onMessage()方法执行完时触发
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建目的地
            destination = session.createQueue("test");
            //6.创建消息消费者
            consumer = session.createConsumer(destination);

            //7.消费消息(异步),需要实现MessageListener接口
            consumer.setMessageListener(new MessageListener() {
                // 该方法执行完毕时会自动ack
                // 注意:如果该方法内部出现异常,导致方法未执行完,那么就不会自动ack。
                //      此时该消息不会在MQ服务器清除,导致该消息再次发送。
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //以下模拟消息消费后发生异常,未正常执行ack的情景,ActiveMQ会重复推送该消息,直到超过次数进入死信队列。
                        int sss = 10/0;
                    } catch (JMSException e) {
                        throw new RuntimeException(e);
                    }
                }
            });

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
6. 消息重发:
  • 概述:这里指的是MQ服务器到consumer客户端的又一次消息发送,也是MQ一种保障手段。
  • 说明:
  1. consumer调用session.recover()方法可以触发MQ消息重发
  2. ActiveMQ会对所有"之前已发送过但未正常ack确认"的消息进行重发
  3. 如果一条消息,默认被重发6次后还未正常ack,该消息会进入死信队列不再被重发
  4. 重发跟同步消费或异步消费没有关系,结论都一样
  • 实现方式:
if("james hit the boll!".equals(s)) {
    // 调用session.recover()触发消息重发
    // 消息重发的范围:ActiveMQ会对所有"之前已发送过但未正常ack确认"的消息进行重发
    // 如果重发6次后还未正常ack的消息,会进入死信队列不再被重发。
    session.recover();
} else {
    //message.acknowledge();
}
7. 持久化方式:
  • LevelDB,从V5.8版本后支持
  • KahaDB,从V5.3版本后支持
  • JDBC数据库,从V4.X版本支持
  1. 集群架构及特点:
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐