目录

 

1、冤孽的诞生

1.1 需求起因

1.2 策略之争

2、标准解决方案

2.1 延时双删策略

2.2 异步更新缓存(基于订阅binlog的同步机制)

3 、基于binlog订阅实现步骤

3.1 准备材料

3.2 代码实现


1、冤孽的诞生

1.1 需求起因

在高并发的业务场景下,数据库大多数情况都是用户并发访问最薄弱的环节。所以,就需要使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问MySQL等数据库 !

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

好了,我们现在引入缓存的概念,那么访问路程变成了如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

上面这个经典的读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容易出现缓存(Redis)和数据库间的数据一致性问题

有以下这些不一致的场景:

  • 当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后更新缓存,发现更新缓存失败了,这意味着数据库存的是99,而缓存还是100,这导致数据库和缓存不一致

  • 如果删除了缓存Redis记录,还没有来得及删除对应的数据库记录,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中为脏数据。

因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。

1.2 策略之争

我们讨论三种更新策略:

  • 先更新数据库,再更新缓存

    这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

    原因一(线程安全角度) 同时有请求A和请求B进行更新操作,那么会出现

    (1)、线程A更新了数据库

    (2)、线程B更新了数据库

    (3)、线程B更新了缓存

    (4)、线程A更新了缓存

    这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

    原因二(业务场景角度) 有如下两点:

    (1)、如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。

    (2)、如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

  • 先删除缓存,再更新数据库

    该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

    (1)、请求A进行写操作,删除缓存

    (2)、请求B查询发现缓存不存在

    (3)、请求B去数据库查询得到旧值

    (4)、请求B将旧值写入缓存

    (5)、请求A将新值写入数据库 上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

  • 先更新数据库,再删除缓存

    知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

    脸书很牛B,但是代表他用这套方案就没问题吗?不是的!

    假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

    (1)请求A查询数据库,得一个旧值

    (2)请求B将新值写入数据库

    (3)请求B删除缓存

    (4)请求A将查到的旧值写入缓存,如果发生上述情况,确实是会发生脏数据。

    然而,发生这种情况的概率又有多少呢?

    发生上述情况有一个先天性条件,就是步骤(2)的写数据库操作比步骤(1)的读数据库操作耗时更短,才有可能使得步骤(3)先于步骤(4)。

    可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(2)耗时比步骤(1)更短,这一情形很难出现。

    但是难出现不代表不出现!因为我们的数据库写跟表数据量无关,而读会随着数据量的增加而变慢!

    但是从业务层面讲,对于海量数据一般都不会放入缓存,毕竟内存相对于机械硬盘都要贵很多!并且就算你不缺钱,当数据量达到1千万左右时,由于内存中不能存储如此大量数目的数据,频繁同磁盘进行数据交换(持久化),导致数据查询、存储性能的急剧下降,将导致服务不可用。当前还没有好的产品可以实现key-value保证数据完整性,千万级条数量级的,高效存储和查询支持产品。

  • 总结

    对于上述三种方案,我们抛弃第一种,但是第二种和第三种还有一个共同的毛病,那就是更新数据库和删除缓存不是一个原子性操作!所以引入了以下两种业界普遍使用的标准方案

2、标准解决方案

2.1 延时双删策略

2.1.1 基本流程图

 

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

2.1.2 伪代码

public void write(String key,Object data){
​
        redis.delKey(key);
​
        db.updateData(data);
​
        Thread.sleep(1000);
​
        redis.delKey(key);
​
    }
​
/*转化为中文描述就是
(1)先淘汰缓存
(2)再写数据库
(3)休眠1秒(根据具体的读操作业务时间来定)
(4)再次淘汰缓存
​
这么做,可以将1秒内所造成的缓存脏数据,再次删除。
那么,这个1秒怎么确定的,具体该休眠多久呢?
针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
如果你用了mysql的读写分离架构怎么办?
ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。
(1)请求A进行写操作,删除缓存
(2)请求A将数据写入数据库了,
(3)请求B查询缓存发现,缓存没有值
(4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
(5)请求B将旧值写入缓存
(6)数据库完成主从同步,从库变为新值
(7)请求A将缓存中B写入的旧值数据删除
​
上述情形,如果休眠时间没有考虑数据同步时间消耗,那么第七步先于第五步执行了会造成数据不一致。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。
​
采用这种同步淘汰策略,吞吐量降低怎么办?
ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。
第二次删除,如果删除失败怎么办?
这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:
(1)请求A进行写操作,删除缓存
(2)请求B查询发现缓存不存在
(3)请求B去数据库查询得到旧值
(4)请求B将旧值写入缓存
(5)请求A将新值写入数据库
(6)请求A试图去删除请求B写入的缓存值,结果失败了。
​
ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。咋办?
们需要提供一个保障重试的方案:
    定时任务,这样会压力太大,并且一直阻塞会影响性能
    消息队列,异步处理,可以让性能提升,但是对业务代码造成大量的侵入
    
最后的最后,这个延时的时间,你真的好把握吗?而且就算你能把握好,更新数据库和删除缓存的操作不是原子性的,你怎么解决掉?
    1、你可以使用重试删除,但是这样做的结果是以牺牲响应时间为代价
    2、另起一个异步线程处理,这又是以系统资源消耗作为代价并且凭空造成代码的复杂度
​

2.1.3 模拟代码

    /*
        延时双删
     */
    @RequestMapping("update2")
    public void update2() throws InterruptedException {
        //将redis中该缓存删除
        redisTemplate.delete("20200101010101");
​
        //这个位置另外一个读操作进程进来了
        Store oldStore = storeMapper.getStore("20200101010101");
​
        //写入数据库
        Store newStore = new Store("20200101010101", 97);
        storeMapper.update(newStore);
        try {
            Thread.sleep(3000);//这个地方需要自己去评估项目的读数据业务逻辑的耗时,然后加几百ms,如果是主从同步,还应该加上同步时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //读操作在这个位置进行数据缓存
        redisTemplate.opsForValue().append(oldStore.getCode(),JSON.toJSONString(oldStore));
​
        //再次删除缓存数据
        //问题,如果该处删除失败,则缓存里面还是旧数据(脏数据)
        Boolean isDelete = false;
        while (!isDelete){
            isDelete = redisTemplate.delete("20200101010101");
        }
        System.out.println("缓存删除成功");
    }
​

 

2.2 异步更新缓存(基于订阅binlog的同步机制)

2.2.1 基本流程图

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

这个也是我们今天的主要讲解,也是业界用得最多的方案,该方案的核心在于使用队列对读写操作进行排队操作,保证了数据的最终一致性,当然,性能相对上一种要差,但是还是那句话,数据的准确性才是最重要的!

2.2.2 初识Canal

阿里Canal主要是听过伪装成mysql从节点来向主节点拉取binlog日志解析成消息推送到MQ消息队列。

Canal在双写一致性中所处的位置:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

  • canal将解析后的对象数据推送给监听的消息中间件(实时主动推送,rabbitmq需要是在线状态)

 

3 、基于binlog订阅实现步骤

3.1 准备材料

1、需要部署一个阿里巴巴的Canal服务端,用于订阅Mysql binlog日志并推送到MQ消息队列

#下载解压canal server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz  #下载canal部署包
mkdir canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C canal #解压
​
#编辑conf/canal.properties,修改MQ配置
canal.ip = 1 #canal服务器标识
canal.serverMode = rabbitmq # 指定rabbitmq
canal.mq.servers = 192.168.223.128 ## 注意不要加端口号,不然会报IPV6错误。
canal.mq.vhost=canal  #MQ虚拟机名称
canal.mq.exchange=exchange.trade #交换机名称,用于将消息发送到绑定的队列
canal.mq.username=guest #MQ登录账号,注意要有上面vhost的权限
canal.mq.password=guest #MQ密码
---------------------------------------------------------------------------------
    
#编辑conf/example/instance.properties实例配置,配置数据库信息
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.mysql.slaveId=1234 #不要与my.cnf中server_id相同,因为我要伪装为mysql的slave
canal.instance.master.address=192.168.223.128:3306 ## 数据库地址
canal.instance.defaultDatabaseName=test ## 数据库名
canal.mq.topic=example # 路由键,需要跟MQ中交换机队列的绑定路由key保持一致
​
​

2、安装RabbitMQ

mkdir /usr/local/rabbitmq;
cd /usr/local/rabbitmq;
​
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers;
​
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm;
​
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm;
​
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm;
​
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm;
​
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm;
​
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm;
​
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
#找到loopback_users  修改后台登录用户为[guest]
​
#启动rabbitmq:service rabbitmq-server start
#启动监控管理器:service rabbitmq-plugins enable rabbitmq_management
​
#开启端口:firewall-cmd --zone=public --add-port=15672/tcp --permanent 
#重启防火墙:firewall-cmd --reload

更多精彩请移步《RabbitMQ工作模型及Java编程》

3、安装mysql并开启binlog

rpm -Uvh http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm #下载
yum -y install mysql-community-server #rpm安装
​
#编辑my.cnf配置文件,开启binlog
vim /etc/my.cnf
​
#增加以下配置
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复(没有数据库主从不配也行)
​
#加入开机启动
systemctl enable mysqld
#启动mysql服务进程
systemctl start mysqld
#初始化,执行命令,重置密码 
mysql_secure_installation
#会依次出现以下问题。
    Set root password? [Y/n]
    是否设置root用户的密码 (y后【设置登录密码】)
    Remove anonymous users? [Y/n]
    是否删除匿名用户 (y)
    Disallow root login remotely? [Y/n]
    是否禁止root远程登录 (n)
    Remove test database and access to it? [Y/n]
    是否删除test数据库(y)
    Reload privilege tables now? [Y/n]
    是否重新加载授权信息 (y)
​
# 先进入mysql
mysql -u root -p
# 授权(root用户)远程连接权限(不建议)
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
FLUSH PRIVILEGES;
# 使用单独的远程登录用户(推荐)
GRANT ALL PRIVILEGES ON *.* TO '新用户名'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
FLUSH PRIVILEGES;
​
#查看是否已经开启了binlog日志
#登录mysql后输入如下命令:
show variables like '%log_bin%';
​
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON  
​
#查看binlog日志:
#1、查看第一个binlog文件的内容
mysql> show binlog events;
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| Log_name         | Pos | Event_type  | Server_id | End_log_pos | Info                                  |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
| mysql-bin.000001 |   4 | Format_desc |         1 |         120 | Server ver: 5.6.49-log, Binlog ver: 4 |
| mysql-bin.000001 | 120 | Query       |         1 |         192 | BEGIN                                 |
| mysql-bin.000001 | 192 | Table_map   |         1 |         249 | table_id: 70 (test.goods_store)       |
| mysql-bin.000001 | 249 | Delete_rows |         1 |         294 | table_id: 70 flags: STMT_END_F        |
| mysql-bin.000001 | 294 | Xid         |         1 |         325 | COMMIT /* xid=11 */                   |
+------------------+-----+-------------+-----------+-------------+---------------------------------------+
​

分别启动mysql,rabbitmq,canal

service mysql start #启动mysql
service rabbitmq-server start #启动rabbitmq
canal目录/bin/startup.sh #启动canal服务

问题:

 {"identity":{"slaveId":-1,"sourceAddress":{"address":"ydt1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000026","position":551,"serverId":1,"timestamp":1594283137000}}
2020-07-31 17:27:24.973 [destination = example , address = /192.168.223.128:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000026,position=551,serverId=1,gtid=,timestamp=1594283137000] cost : 617ms , the next step is binlog dump
2020-07-31 17:27:25.106 [destination = example , address = /192.168.223.128:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
​
#如果出现了以上问题,可能是mysql数据库的binlog日志位置不对,重新设置一下
#先找出当前mysql的binlog日志position,进入mysql客户端,输入如下命令:
show master status;
#找到当前binlog以及position,编辑canal目录/conf/example/meta.dat元数据脚本
vim /usr/local/canal/conf/example/meta.dat
将----》"journalName":"mysql-bin.000003","position":499改为自己查到的或者比查到的小即可
#或者直接将该文件删除,重新生成当前数据库执行命令的position位置对应的元数据脚本
​

3.2 代码实现

3.2.1 pom.xml

<dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.2</version>
        </dependency>
​
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
​
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.3</version>
        </dependency>

3.2.2 application.yml

spring:
  rabbitmq:
    virtual-host: canal
    host: 192.168.223.128
    publisher-confirms: true
  #数据源
  datasource:
    url: jdbc:mysql://192.168.223.128:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver
  redis:
    host: 192.168.223.128
​

 

3.2.3 消息队列配置类

package com.ydt.test.message;
​
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
​
@Configuration
public class DirectRabbitConfig {
 
    //队列 起名:exchange.trade.canal
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("exchange.trade.canal",true);
    }
 
    //Direct交换机 起名:exchange.trade
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("exchange.trade");
    }
 
    //绑定  将队列和交换机绑定, 并设置用于匹配键:example
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("example");
    }
}

3.2.4 消息监听者类

package com.ydt.test.message;
​
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
​
import java.util.Map;
​
@Component
public class DirectReceiver {
​
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    @RabbitListener(queues = "exchange.trade.canal")
    public void process(Message message){
        String json = new String(message.getBody());
        System.out.println("消费的消息:" + json);
        Map map = JSON.parseObject(json,Map.class);
        JSONArray array = null;
        String sqlType = (String) map.get("type");
        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            array = JSONArray.parseArray((String)map.get("data"));
        }else{
            array = (JSONArray)map.get("data");
        }
        if(array == null){
            return;
        }
        JSONObject object = array.getJSONObject(0);
       /* if(StringUtils.endsWithIgnoreCase("UPDATE",sqlType)
                || StringUtils.endsWithIgnoreCase("INSERT",sqlType)
                || StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
        }else if(StringUtils.endsWithIgnoreCase("DELETE",sqlType)){
            redisTemplate.delete(object.get("code").toString());
        }*/
        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
        }else{
            redisTemplate.delete(object.get("code").toString());
        }
    }
}
​

 

3.2.5 Controller调用

package com.ydt.test.controller;
​
import com.alibaba.fastjson.JSON;
import com.ydt.test.domain.Store;
import com.ydt.test.mapper.StoreMapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
​
import java.util.HashMap;
import java.util.Map;
​
@RestController
public class MessageController {
​
​
    @Autowired
    private StoreMapper storeMapper;
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @Autowired
    private RedisTemplate redisTemplate;
​
    /**
     * 案例1
     */
    @RequestMapping("update1")
    public void update1(){
        Store store = new Store("20200101010101", 98);
        storeMapper.update(store);
        int i = 1/0;
        redisTemplate.delete("20200101010101");
    }
​
    @RequestMapping("get")
    public String getMessage(){
        //查询操作
        Store store = storeMapper.getStore("20200101010101");
        System.out.println("-----------我进行了查询,现在我要开始进行redis缓存了-----------");
        //同一数据源
        Map map = new HashMap();
        map.put("type", "SELECT");
        map.put("data", "[{'code':'20200101010101','store':"+store.getStore()+"}]");
        rabbitTemplate.convertAndSend("exchange.trade", "example", JSON.toJSONString(map));
        return  "";
    }
}
​

3.2.6 启动测试

1、先将库存表中库存修改为111,会通过canal伪slave拿到binlog日志,然后推送到rabbitmq

2、然后调用get方法,拿到数据库中数据,同时将数据推送到rabbitmq

3、重复1操作,将库存改为321

4、重复2操作

打开监听,启动服务可以看到我们的消费者会按照顺序消费队列中的数据!

DirectReceiver消费者收到消息  : {"data":[{"code":"20200101010101","store":"111"}],"database":"test","es":1596198062000,"id":13,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"123"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198062527,"type":"UPDATE"}
DirectReceiver消费者收到消息  : {"data":"[{'code':'20200101010101','store':111}]","type":"SELECT"}
DirectReceiver消费者收到消息  : {"data":[{"code":"20200101010101","store":"321"}],"database":"test","es":1596198075000,"id":14,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"111"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198075520,"type":"UPDATE"}
DirectReceiver消费者收到消息  : {"data":"[{'code':'20200101010101','store':321}]","type":"SELECT"}
​

 

 

 

 

 

 

 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐