目录

 一、消息队列

1、定义

2、为什么会出现消息队列

案例1:

案例2:

案例3:Email邮件案例分析

3、常见通信方式

4、消息队列的特点

5、消息队列相关

       1、AMQP      

       2、技术选型

二、RabbitMQ结构

三、在Docker中部署

        1、拉取镜像

        2、挂载目录

        3、容器启动后,可以通过 docker logs 容器 查看日志

        4、进入管理后台

        5、springboot连接配置(切记需要授权)

四、搭建RabbitMQ项目

        1、创建maven项目作为父项目,只提供依赖

        2、创建消费者和生产者(依赖父项目)

        3、导入所需依赖

        4、yml文件配置 

五、生产者与消费者实例

        1、案列一、接收一个string类型的数据

        2、案列二、接收一个对象

        2、案列三、接收一个json 


 一、消息队列

1、定义

          消息队列中间件是分布式系统中重要的组件,主要用于:异步处理,应用解耦,流量削锋,消息通讯等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

2、为什么会出现消息队列

用身边最常见的案例来阐述这个问题

案例1:

1、案例:假设我们在淘宝下了一笔订单后,淘宝后台需要做这些事情

1. 消息通知系统:通知商家,你有一笔新的订单,请及时发货
2. 推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
3. 会员系统:更新用户的积分和等级信息

代码:

createOrder(...) {
  // 完成订单服务
  doCreateOrder(...);
  // 调用其他服务接口
  sendMsg(...);
  updateUserInterestedGoods(...);
  updateMemberCreditInfo(...);

现在模式的案列图: 

2、中间存在的问题

1、过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,再追加一行代码
2、缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这时更新会员信息就会失败,我们需要一个地方,来暂时存放无法被消费的消息

3、优化方案
我们需要一个消息中间件,来实现解耦和缓冲的功能

案例2:

小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走.久而久之,两人都觉得麻烦.

1、优化方案

小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看.

书架就是一个消息队列,小红是生产者,小明是消费者.

2、带来的好处

1. 小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了.这样小红小明的时间都更自由.
2. 小红相信小明的读书自觉和读书能力,不必亲眼观察小明的读书过程,小红只要做一个放书的动作,很节省时间.
3. 当明天有另一个爱读书的小伙伴小强加入,小红仍旧只需要把书放到书架上,小明和小强从书架上取书即可
4. 书架上的书放在那里,小明阅读速度快就早点看完,阅读速度慢就晚点看完,没关系,比起小红把书递给小明并监督小明读完的方式,小明的压力会小一些

案例3:Email邮件案例分析

有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题.
例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况.
导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了

面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能.

3、常见通信方式

服务之间最常见的通信方式是直接调用彼此来通信,消息从一端发出后立即就可以达到另一端,称为即时消息通讯(同步通信)
消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端,称为延迟消息通讯(异步通信)

4、消息队列的特点

1. 解耦:每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系.
2. 提速:小红选只要做一个放书的动作,为自己节省了大量时间.
3. 广播:小红只需要劳动一次,就可以让多个小伙伴有书可读,这大大地节省了她的时间,也让新的小的加伙伴入成本很低.
4. 错峰与流控:小红给书的频率不稳定,如果今明两天连给了五本,之后隔三个月才又给一本,那小明只要在三个月内从书架上陆续取走五本书读完就行了,压力就不那么大了.

5、消息队列相关

       1、AMQP      

一个提供统一消息服务的应用层标准高级消息队列协议,是一个通用的应用层协议
消息发送与接受的双方遵守这个协议可以实现异步通讯.这个协议约定了消息的格式和工作方式.

       2、技术选型

二、RabbitMQ结构

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言

 Server(Broker):接收客户端连接,实现AMQP协议的消息队列和路由功能的进程.
Virtual Host:虚拟主机的概念,类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue.
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue.
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic.
Message Queue:消息队列,用于存储还未被消费者消费的消息.
Message:由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等.body是真正需要发送的数据内容.
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来

三、在Docker中部署

        1、拉取镜像

docker pull rabbitmq:management

注意获取镜像的时候要获取management版本的,不要获取last版本的,management版本的才带有管理界面

        2、挂载目录

docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management

注意:

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量:
RABBITMQ_DEFAULT_VHOST:默认虚拟机名
RABBITMQ_DEFAULT_USER:默认的用户名
RABBITMQ_DEFAULT_PASS:默认用户名的密码

        3、容器启动后,可以通过 docker logs 容器 查看日志

docker logs my-rabbitmq

        4、进入管理后台

http://ip:15672(注意是15672不是5672)

在这里需要进行配置一个springBoot账号,以便登录,最后在IDEA进行实战的时候运行成功

        5、springboot连接配置(切记需要授权

四、搭建RabbitMQ项目

        1、创建maven项目作为父项目,只提供依赖

        2、创建消费者和生产者(依赖父项目)

创建provider只要继承Lombok依赖即可,但是consumer还要继承SpringWeb

        3、导入所需依赖

放入到父项目中的依赖中

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

        4、yml文件配置 

server:
 port: 8081(2)
spring:
 application:
  name: provider(consumer)
 rabbitmq:
  host: 虚拟机端口
  password: 123456(登录虚拟机的密码)
  port: 5672
  username: springboot
  virtual-host: my_vhost(这个不能改变)

五、生产者与消费者实例

        1、案列一、接收一个string类型的数据

生产者:

RabbitConfig:
package com.zj.provider;


import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@SuppressWarnings("all")
public class RabbitConfig {


    @Bean
    public Queue firstQueue(){
        return new Queue("firstQueue");
        }
    }
Sender:用来发送信息
package com.zj.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProviderApplicationTests {


    @Autowired
    private Sender sender;

    @Test
    @SneakyThrows
    void contextLoads() {
//         sender.sendFirst();
       sender.sendFirst(new User("aa","bb"));


//        User user=new User("aa","bb");
//        ObjectMapper mapper=new ObjectMapper();
//        sender.sendFirst(mapper.writeValueAsString(user));
    }

}
User:
package com.zj.provider;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@SuppressWarnings("all")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class User{
 private String username;
 private String userpwd;
}

消费者:

Receiver:
package com.zj.consumer;


import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@SuppressWarnings("all")
@RabbitListener(queues="firstQueue")
public class Receiver {


    /**
     *rabbit处理器
     */
  //   @RabbitHandler
  //  public void process(String msg){
 //        log.warn("接收到:"+msg);
  //   }

    /**
     * rabbit处理器
     */
    @RabbitHandler
    public void process(User user){
        log.warn("接收到:"+user);
   }
//    @RabbitHandler
//    @SneakyThrows
//    public void process(String json) {
//        log.warn("接收到:" + json);
//        ObjectMapper mapper = new ObjectMapper();
//        log.warn("接收到:"+mapper.readValue(json,User.class));
//
//    }

}

其中的注解需要注意:

消费中的注解:

@RabbitListener(queues="firstQueue"),监听名为firstQueue队列

生产者中的注解:

测试:注意顺序,先运行生产者,再运行消费者

 消费者已接收:

        2、案列二、接收一个对象

生产者:

Sender:用来发送信息

package com.zj.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProviderApplicationTests {


    @Autowired
    private Sender sender;

    @Test
    @SneakyThrows
//     void contextLoads() {
//         sender.sendFirst();
//        sender.sendFirst(new User("aa","bb"));


        User user=new User("aa","bb");
        ObjectMapper mapper=new ObjectMapper();
        sender.sendFirst(mapper.writeValueAsString(user));
    }

}

User类:

消费者:

Receiver:

package com.zj.consumer;


import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@SuppressWarnings("all")
@RabbitListener(queues="firstQueue")
public class Receiver {


    /**
     *rabbit处理器
     */
  //   @RabbitHandler
  //  public void process(String msg){
 //        log.warn("接收到:"+msg);
  //   }

    /**
     * rabbit处理器
     */
     @RabbitHandler
      public void process(User user){
         log.warn("接收到:"+user);
    }
  // //   @RabbitHandler
   //  @SneakyThrows
 //      public void process(String json) {
  //       log.warn("接收到:" + json);
 //        ObjectMapper mapper = new ObjectMapper();
  //       log.warn("接收到:"+mapper.readValue(json,User.class));

  //   }

}

这边会报错,因为本来就不会接收一个user,必需要用ObjectMapper类来解决 

        2、案列三、接收一个json 

生产者:

Sender:用来发送信息

package com.zj.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProviderApplicationTests {


    @Autowired
    private Sender sender;

    @Test
    @SneakyThrows
//     void contextLoads() {
//         sender.sendFirst();
//        sender.sendFirst(new User("aa","bb"));


        User user=new User("aa","bb");
        ObjectMapper mapper=new ObjectMapper();
        sender.sendFirst(mapper.writeValueAsString(user));
    }

}

消费者:

Receiver:

package com.zj.consumer;


import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@SuppressWarnings("all")
@RabbitListener(queues="firstQueue")
public class Receiver {


    /**
     *rabbit处理器
     */
  //   @RabbitHandler
  //  public void process(String msg){
 //        log.warn("接收到:"+msg);
  //   }

    /**
     * rabbit处理器
     */
 //    @RabbitHandler
 //    public void process(User user){
  //       log.warn("接收到:"+user);
  //  }
    @RabbitHandler
    @SneakyThrows
      public void process(String json) {
        log.warn("接收到:" + json);
        ObjectMapper mapper = new ObjectMapper();
        log.warn("接收到:"+mapper.readValue(json,User.class));

    }

}

生产者已成功发送信息,消费者才能收取到信息

今天的知识就分享到这了,希望能够帮助到你!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐