RabbitMQ最原始实现方式_Java

说明:对于Rabbitmq的安装方式有很多种, 因为rabbitmq是使用Erlang语言进行编写的, 所以我们在安装的时候需要先搭建Erlang的一个运行环境
我们安装的rabbitmq的版本是3.7的,要求使用的Erlang版本必须是20.3.x的版本,我们可以去github下载对应的Erlang的对应版本https://github.com/rabbitmq/erlang-rpm
vi /etc/yum.repos.d/rabbitmq-erlang.repo
Erlang 20.3.x

相关参数解读:
durable
RabbitMQ默认将消息存储在内存中,若RabbitMQ宕机,那么整个队列会丢失.可以把这个属性设置成true,表示这个队列需要做持久化.这个属性只是声明队列是持久化的,RabbitMQ宕机或者重启之后,队列依然存在,但是里面的消息没有持久化,也会丢失.所以需要针对消息也做持久化.

channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());

exclusive
如果你想创建一个只有自己可见的队列,即不允许其它用户访问,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
该队列的特点是:
1.只对首次声明它的连接(Connection)可见
2.会在其连接断开的时候自动删除。
autoDelete
当所有消费客户端连接断开后,是否自动删除队列 true:删除false:不删除

消息签收机制
分为自动签收和手动签收
默认是自动签收机制

导入依赖

<!--rabbitMq客户端-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.6.0</version>
</dependency>

对于生产者和消费者来说都属于客户端,而服务端是消息服务中心server(也叫broker).
案例实现
生产者produce

/**
 * 生产消息的:把消息放入消息队列
 * java_RabbitMq :produce最原始
 */
public class Send {
    // 定义消息队列名称
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        //通过工厂创建连接,和JDBC类似
        ConnectionFactory factory = new ConnectionFactory();
        //设置rabbitMq服务器的主机地址
        factory.setHost("192.168.142.129");
        //创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //通过通道向服务器发送请求,声明一个队列,队列名词
            //channel.queueDeclare(队列名称, 队列是否需要持久化, 是否排他性的队列, false, null);
            //排他性的队列 这个队列的消息只能给当前connection所创建出来的通道所使用,别的connection是无法获取到此消息的
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //自定义字符串变量 把"Hello World"消息发送至消息队列
            String message = "Hello World!";
            //通过通道channel往服务器发送消息
            //channel.basicPublish("", "发送的队列名称", 消息的属性信息, 字符串的byte数组);
            //MessageProperties.PERSISTENT_TEXT_PLAIN:消息需要持久化
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
            //测试控制台查看消息的发送情况
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者Consumer

/**
 * 监听消息队列的消费者
 * java_RabbitMq :consumer最原始
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //如果队列在服务器中已经存在就不在创建,如果不存在就创建
        //如果服务器中已经有了这个队列,执行这句话,定义队列的参数需要和服务器上的队列参数保持一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (String consumerTag, Delivery delivery) -> {
            //处理消息的逻辑块儿
            //consumerTag 消费者的标识
            try {
                //int i=1/0
                //delivery.getBody()存放在消息中心的内容
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                //delivery.getEnvelope().getDeliveryTag()标识这条消息在通道中唯一标识Long数值
                //参数2:是否批量签收
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }catch (Exception ex){
                //消息放入队列失败重新放入队列(重试)
                ex.printStackTrace();
                //channel.basicNack(消息的唯一标识,是否批量处理,是否需要重新进入队列))
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,true);
            }
        };
        //通过通道定义消费的监听对象
        //channel.basicConsume(消息队列名称(要和生产者保持一致), 消息签收方式(true自动签收-false手动签收), 消息的监听事件处理器, 消息队列删除的时候会触发此方法consumerTag -> { });
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐