前言 :项目中如果想要实时监听Mysql 表中数据的实时状态(插入,更新,删除),并根据不同的状态做出相应的动作,应该怎么办;

1 BinaryLogClient 介绍:

BinaryLogClient是一个Java库,用于解析和读取MySQL数据库的二进制日志。它基于MySQL的复制协议,允许应用程序在MySQL数据库实例上实时订阅和消费二进制日志事件。它提供了一种方便、高效的方式来跟踪和监控MySQL中的数据变化,帮助应用程序实时处理和响应这些变化。

2 BinaryLogClient 的使用:

2.1 mysql启用binlog:

show variables like 'log_bin';

如果为值为OFF,表示没有启用,那么需要首先启用binlog,修改配置文件:
找到mysql 服务的my.ini 文件,添加:

log_bin=mysql-bin
binlog-format=ROW
server-id=1
  • 在配置文件中加入了log_bin配置项后,表示启用了binlog;
  • binlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们在这里使用ROW模式;
  • server-id用于标识一个sql语句是从哪一个server写入的,这里一定要进行设置,否则我们在后面的代码中会无法正常监听到事件;

在更改完配置文件后,重启mysql服务。再次查看是否启用binlog,返回为ON,表示已经开启成功。

2.2 pom 引入jar:

<dependency>
<groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mysql-->
<dependency>
   <groupId>com.baomidou</groupId>
   <artifactId>mybatis-plus-boot-starter</artifactId>
   <version>3.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.21</version>
</dependency>
<!-- mysql-->
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
</dependency>
<!-- binlog-->
<dependency>
   <groupId>com.github.shyiko</groupId>
   <artifactId>mysql-binlog-connector-java</artifactId>
   <version>0.21.0</version>
</dependency>
<!-- binlog-->
<!--json -->
<dependency>
   <groupId>com.alibaba.fastjson2</groupId>
   <artifactId>fastjson2</artifactId>
   <version>2.0.31</version>
</dependency>
<!--json -->

2.3 mysql 配置文件:
HikariBaseConfig.java:

package com.example.mysqlbinarylog.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Configuration;

/**
 * 线程池
 */
@Configuration
public class HikariBaseConfig {

    @Value("${spring.datasource.hikari.pool-name}")
    private String poolName;
    @Value("${spring.datasource.hikari.maximum-pool-size}")
    private Integer maximumPoolSize;
    @Value("${spring.datasource.hikari.connection-timeout}")
    private Long connectionTimeout;
    @Value("${spring.datasource.hikari.minimum-idle}")
    private Integer minimumIdle;
    @Value("${spring.datasource.hikari.max-lifetime}")
    private Long maxLifetime;
    @Value("${spring.datasource.hikari.connection-test-query}")
    private String connectionTestQuery;


    public HikariDataSource getDataSource(String driverClassName, String url, String username, String password){
        HikariDataSource hikariDataSource = DataSourceBuilder.create().type(HikariDataSource.class).driverClassName(driverClassName).username(username).url(url).password(password).build();
        hikariDataSource.setConnectionTestQuery(connectionTestQuery);
        hikariDataSource.setMaxLifetime(maxLifetime);
        hikariDataSource.setMinimumIdle(minimumIdle);
        hikariDataSource.setConnectionTimeout(connectionTimeout);
        hikariDataSource.setPoolName(poolName);
        hikariDataSource.setMaximumPoolSize(maximumPoolSize);
        return hikariDataSource;
    }


}

UidDataSourceConfig.java:

package com.example.mysqlbinarylog.config;

import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;

/**
 * @Description TODO
 * @Date 2021/11/18 11:00
 * @Author lgx
 * @Version 1.0
 */
// // 标明注解
@Configuration
// 开启事务支持后,然后在访问数据库的Service方法上添加注解 @Transactional 便可
@EnableTransactionManagement
// 配置xml 扫描文件位置
@MapperScan(basePackages ={"com.example.mapper"}, sqlSessionFactoryRef = "uidSqlSessionFactory")
public class UidDataSourceConfig {
    @Value("${spring.datasource.uid.jdbc-url}")
    private String url;

    @Value("${spring.datasource.uid.driver-class-name}")
    private String driverClassName;

    @Value("${spring.datasource.uid.username}")
    private String username;

    @Value("${spring.datasource.uid.password}")
    private String password;

    @Autowired
    private HikariBaseConfig hikariBaseConfig;

    @Primary
    @Bean(name = "uidDataSource")
    public DataSource masterDataSource() {
        return hikariBaseConfig.getDataSource(driverClassName, url, username, password);
    }

    @Primary
    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("uidDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Primary
    @Bean(name = "uidSqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("uidDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mapper/**/*.xml"));
        // 全局字段创建人/更新人/创建时间/更新时间 字段的填充
        GlobalConfig globalConfig = new GlobalConfig();
        sessionFactoryBean.setGlobalConfig(globalConfig);
        // sessionFactoryBean.setPlugins(new Interceptor[]{pageHelper()});
//        Interceptor[] plugins = {paginationInterceptor()};
//        sessionFactoryBean.setPlugins(plugins);
        return sessionFactoryBean.getObject();
    }

    @Primary
    @Bean(name = "uidSqlSessionTemplate")
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("uidSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    /**
     * 事务管理
     *
     * @param dataSource
     * @return
     */
    @Primary
    @Bean(name = "uidTransactionManager")
    public DataSourceTransactionManager transactionManager(@Qualifier("uidDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }


}

application.properties 配置文件:


server.port=9083
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=KevinHikariPool
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.connection-timeout=60000
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.idle-timeout=500000
spring.datasource.hikari.max-lifetime=600000
spring.datasource.hikari.connection-test-query=SELECT 1


spring.datasource.uid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.uid.jadbc-url=jdbc:mysql://localhost:3306/biglog-cpa?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useAffectedRows=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.uid.username=root
spring.datasource.uid.password=123


binlog.datasource.host=127.0.0.1
binlog.datasource.port=3406
binlog.datasource.username=root
binlog.datasource.passwd=ddsoft
binlog.datasource.table=biglog.user,biglog.student
binlog.datasource.serverId=3


2.4 binlog 日志监听:

Tool.java 工具类:

public class Tool {

    public final static String COMMA = ",";
    public final static String POINT = ".";

    public static boolean isNotNull(List records){
        return (null != records && records.size() > 0);
    }
    public static boolean isNull(List records){
        return !isNotNull(records);
    }

}

Event.java 事件类型类:

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum Event {
    UPDATE("UPDATE"),DELETE("DELETE"),WRITE("WRITE");
    private String key;
}

BinLogConstants.java 配置读取类:

@Data
@Component
@ConfigurationProperties(prefix = "binlog.datasource")
public class BinLogConstants {

    private String host;

    private int port;


    private String username;

    private String passwd;

    private String table;
    private List<String> tables;


    private Integer serverId;

    public static final int consumerThreads = 5;

    public static final long queueSleep = 1000;

    public List<String> getTables() {
        if (StringUtils.hasText(table)){
            tables = Arrays.asList(table.split(","));
        }
        return tables;
    }
}

BingLogMetadata.java 数据库下的表字段读取工具类:

package com.example.mysqlbinarylog.binlog;


import com.example.mysqlbinarylog.util.Tool;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 监听获取元数据
 *
 * @author xulinglin
 * @since 2022/7/21
 **/
@Component
public class BingLogMetadata {

    private String driver = "com.mysql.cj.jdbc.Driver";

    public static Map<String,Map<Integer,String>> database(BinLogConstants binLog) throws Exception{
        Map<String,Map<Integer,String>> metadata = new ConcurrentHashMap<>();
        List<String> table = binLog.getTables();
        if(Tool.isNotNull(table)){
            Map<String,List<String>> group = new ConcurrentHashMap<>();
            for (int i = 0; i < table.size(); i++) {
                String key = table.get(i);
                String[] split = key.split("\\"+Tool.POINT);
                if(null == split || split.length != 2){
                    throw new Exception("BinLog配置同步,类型错误 [库名.表名]。请正确配置:"+key);
                }
                String database = split[0];
                String tableName = split[1];
                List<String> list = group.get(database);
                if(null == list){
                    group.put(database, list = new ArrayList());
                }
                list.add(tableName);
            }
            Iterator<Map.Entry<String, List<String>>> iterator = group.entrySet().iterator();
            while (iterator.hasNext()){
                Map.Entry<String, List<String>> next = iterator.next();
                String key = next.getKey();
                List<String> value = next.getValue();
                Properties props = new Properties();
                props.setProperty("user", binLog.getUsername());
                props.setProperty("password", binLog.getPasswd());
                props.setProperty("remarks", "true");
                props.setProperty("useInformationSchema", "true");
                String url = "jdbc:mysql://"+binLog.getHost()+":"+binLog.getPort()+"/"+key+"?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
                Connection connection = DriverManager.getConnection(url, props);
                DatabaseMetaData metaData = connection.getMetaData();
                ResultSet tableRs = metaData.getTables(connection.getCatalog(), connection.getCatalog(), "%", new String[]{"TABLE"});
                while (tableRs.next()) {
                    String tableName = tableRs.getString("TABLE_NAME");
                    if(value.contains(tableName)){
                        Map<Integer,String> map = new HashMap<>();
                        metadata.put(key+"."+tableName,map);
                        ResultSet columnRs = metaData.getColumns(connection.getCatalog(), connection.getCatalog(), tableName, "%");
                        int i = 0;
                        while (columnRs.next()) {
                          map.put(Integer.valueOf(i),columnRs.getString("COLUMN_NAME"));
                          i++;
                        }
                    }
                }
            }
        }
        return metadata;
    }

}


BinaryLogClientRunner.java 日志监听类:

package com.example.mysqlbinarylog.binlog;

import com.alibaba.fastjson2.JSONObject;
import com.example.mysqlbinarylog.util.Tool;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@Order(1)

public class BinaryLogClientRunner implements CommandLineRunner {
    @Autowired
    private BinLogConstants binLogConstants;
    private static volatile Map<String, TableData> map = new ConcurrentHashMap<>();

    @Override
    public void run(String... args) throws Exception {
        String host = binLogConstants.getHost();
        int port = binLogConstants.getPort();
        String username = binLogConstants.getUsername();
        String passwd = binLogConstants.getPasswd();
        Integer serverId = binLogConstants.getServerId();
        List<String> tableList = binLogConstants.getTables();
//        // 数据库下的表字段
        Map<String, Map<Integer, String>> metadata = BingLogMetadata.database(binLogConstants);
        /**
         * BinaryLogClient 类是 MySQL 提供的一个 Java 客户端,用于监听 MySQL 数据库的二进制日志(Binary Log),并实现实时的数据同步。
         * BinaryLogClient 库底层使用了 MySQL 原生协议和通信机制,能够准确地捕获到 MySQL 数据库中的变化事件,
         * 并将这些事件以异步回调的方式传递给应用程序进行处理
         */
        BinaryLogClient client = new BinaryLogClient(host, port, username, passwd);
        /**
         * EventDeserializer 是一个接口,主要用于将 Event 数据从序列化格式反序列化为对象格式
         */
        // 数据序列化为java 对象
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                // 此模式下,时间戳表示为自 Unix 纪元(1970 年 1 月 1 日 UTC)以来的毫秒数。这与其他兼容模式不同,后者可能使用不同的格式来表示时间戳
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                // 在此模式下,通过使用字节数组来表示字符和二进制数据,同时保留其编码和格式信息。这使得在读取时更容易处理这些数据,并将它们重新还原为原始格式
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        // 在BinaryLogClient中,serverId属性是一个客户端与MySQL实例之间的唯一标识符
        // 如果有多个客户端同时订阅同一个MySQL实例上的二进制日志,那么每个客户端的serverId应该是唯一的,以避免冲突
        client.setServerId(serverId);

        // 监听db 库中的数据变化
        client.registerEventListener(event -> {
            EventHeader header = event.getHeader();
            EventType eventType = header.getEventType();
            TableData tableData = null;
            /**
             * TableMapEventData,通过它可以获取操作的数据库名称、表名称以及表的id。之所以我们要监听这个事件,
             * 是因为之后监听的实际操作中返回数据中包含了表的id,而没有表名等信息,
             * 所以如果我们想知道具体的操作是在哪一张表的话,就要先维护一个id与表的对应关系
             */
            if (eventType == EventType.TABLE_MAP) {
                TableMapEventData eventData = event.getData();
                long tableId = eventData.getTableId();
                String database = eventData.getDatabase();//库名
                String table = eventData.getTable();//表名称
                StringBuilder builder = new StringBuilder();
                builder.append(database).append(Tool.POINT).append(table);
                if (tableList.contains(builder.toString())) {
                    map.put(String.valueOf(tableId), TableData.builder().database(database).table(table).build());
                }
            }
            List<JSONObject> lists = null;
            if (EventType.isWrite(eventType)) {
                WriteRowsEventData data = event.getData();
                if (null != (tableData = isListener(data.getTableId(), tableList))) {
                    log.info("--------------新增--------------");
                    List<Serializable[]> rows = data.getRows();
                    lists = parseListenerList(rows, tableData, metadata, Event.WRITE);
                }
            } else if (EventType.isUpdate(eventType)) {
                UpdateRowsEventData data = event.getData();
                if (null != (tableData = isListener(data.getTableId(), tableList))) {
                    log.info("--------------修改--------------");
                    List<Map.Entry<Serializable[], Serializable[]>> rows = data.getRows();
                    lists = parseListener(rows, tableData, metadata, Event.UPDATE);
                }
            } else if (EventType.isDelete(eventType)) {
                DeleteRowsEventData data = event.getData();
                if (null != (tableData = isListener(data.getTableId(), tableList))) {
                    log.info("--------------删除--------------");
                    List<Serializable[]> rows = data.getRows();
                    lists = parseListenerList(rows, tableData, metadata, Event.DELETE);
                }
            }
            if (!CollectionUtils.isEmpty(lists)) {
                lists.stream().forEach(e -> log.info(e.toString()));
            }
        });
        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Data
    @Builder
    @AllArgsConstructor
    public static class TableData {
        private String database;
        private String table;
        private String databaseTable;
    }

    private TableData isListener(long tableId, List<String> tableList) {
        TableData tableData = map.get(String.valueOf(tableId));
        if (null == tableData || Tool.isNull(tableList))
            return null;
        String database = tableData.getDatabase();
        String table = tableData.getTable();
        StringBuilder builder = new StringBuilder();
        builder.append(database).append(Tool.POINT).append(table);
        if (tableList.contains(builder.toString())) {
            tableData.setDatabaseTable(builder.toString());
            return tableData;
        }
        return null;
    }

    private List<JSONObject> parseListenerList(List<Serializable[]> rows, TableData tableData, Map<String, Map<Integer, String>> metadata, Event event) {
        Map<Integer, String> map = metadata.get(tableData.getDatabaseTable());
        if (CollectionUtils.isEmpty(map)) {
            return new ArrayList<>(0);
        }
        List<JSONObject> lists = new ArrayList<>(rows.size());
        for (int i = 0; i < rows.size(); i++) {
            Serializable[] serializables = rows.get(i);
            JSONObject resultObject = new JSONObject();
            for (int j = 0; j < serializables.length; j++) {
                Serializable serializable = serializables[j];
                if (null != serializable) {
                    Object value = null;
                    Object valueObject = serializable;
                    if (null != valueObject) {
                        Class<?> aClass = valueObject.getClass();
                        if (null != aClass && aClass.getName().equals("[B")) {
                            value = new String((byte[]) valueObject);
                        } else {
                            value = valueObject;
                        }
                        resultObject.put(map.get(Integer.valueOf(j)), value);
                    }
                }
            }
            resultObject.put("binlog_event", event.getKey());
            resultObject.put("binlog_table_Name", tableData.getTable());
            lists.add(resultObject);
        }
        return lists;
    }

    private List<JSONObject> parseListener(List<Map.Entry<Serializable[], Serializable[]>> rows, TableData tableData, Map<String, Map<Integer, String>> metadata, Event event) {
        Map<Integer, String> map = metadata.get(tableData.getDatabaseTable());
        if (CollectionUtils.isEmpty(map)) {
            return new ArrayList<>(0);
        }

        List<JSONObject> lists = new ArrayList<>(rows.size());
        for (Map.Entry<Serializable[], Serializable[]> row : rows) {
            List<Serializable> entriesBefore = Arrays.asList(row.getKey());
            List<Serializable> entriesAfter = Arrays.asList(row.getValue());
            // before
            JSONObject dataObjectBefore = getDataObject(entriesBefore, map);
            // after
            JSONObject dataObject = getDataObject(entriesAfter, map);
            dataObject.put("before", dataObjectBefore);
            dataObject.put("binlog_event", event.getKey());
            dataObject.put("binlog_table_Name", tableData.getTable());
            lists.add(dataObject);
        }
        return lists;
    }

    private JSONObject getDataObject(List message, Map<Integer, String> metadata) {
        JSONObject resultObject = new JSONObject();
        for (int i = 0; i < message.size(); i++) {
            Object value = null;
            Object valueObject = message.get(i);
            String key = metadata.get(Integer.valueOf(i));
            if (null != valueObject) {
                Class<?> aClass = valueObject.getClass();
                if (null != aClass && aClass.getName().equals("[B")) {
                    value = new String((byte[]) valueObject);
                } else {
                    value = valueObject;
                }

                if (key.equals("create_time") || key.equals("update_time")) {
                    value = (long) value - 8 * 60 * 60 * 1000;
                }

                resultObject.put(key, value);
            }
        }
        return resultObject;
    }
}

3 扩展:

3.1 MySQL的binlog-forma:
MySQL的binlog-format是用于指定二进制日志的格式的设置,它决定了MySQL服务器记录的事件的详细程度和内容。在MySQL中,有以下三种binlog-format可供选择:

  • STATEMENT格式:此格式是最基本的格式,它记录每个执行的SQL语句和执行结果。这种格式的好处在于它可以减少日志大小,因为它只记录SQL语句并不记录执行过程的详细信息。但是,当SQL语句涉及到随机函数或NOW()等动态函数时,日志复制时可能会导致不一致性。

  • . ROW格式:此格式会记录每行数据更新的情况,包括更新前的数据和更新后的数据。这种格式记录的细节比STATEMENT格式更多,因此它会产生更多的日志数据。但是,由于binlog记录的是更改数据的内容,所以在slave节点上进行重放时不需要再次解析SQL语句。

  • MIXED格式:此格式会在STATEMENT格式和ROW格式之间自动切换,以便在记录每个SQL语句的同时还记录更改的行数据。这种格式在大多数情况下都是最好的选择,因为它可以充分利用STATEMENT和ROW格式的优点。

可以通过配置MySQL的my.cnf文件中在[mysqld]节下增加binlog_format=ROW(或STATEMENT、MIXED)来设置。注意,修改binlog_format设置需要重新启动MySQL服务器以使其生效。

4 总结:

BinaryLogClient是一个Java库,用于解析和读取MySQL数据库的二进制日志。它基于MySQL的复制协议,允许应用程序在MySQL数据库实例上实时订阅和消费二进制日志事件。

下面是BinaryLogClient的工作原理:

  • 首先,应用程序创建一个BinaryLogClient实例,并设置必要的连接参数,例如主机名、端口号、用户名、密码等。

  • 客户端连接到MySQL数据库实例,并发送一个特殊的请求,称为dump请求,来指示MySQL开始从当前日志位置发送日志事件。 在请求中,可以指定日志的起始位置和结束位置,以及需要订阅的事件类型、数据库名称或表名称等过滤条件。

  • MySQL服务器将二进制日志事件发送到客户端。BinaryLogClient接收到事件后,解析并转换为Java对象,并尝试回调一个或多个事件监听器。每个监听器可以实现自己的逻辑,例如将事件记录到文件、将事件写入队列或执行自定义代码等。

  • 应用程序可以随时关闭客户端连接,MySQL将停止向客户端发送事件,并且客户端将在退出之前自动处理所有未处理的事件。

BinaryLogClient通过与MySQL复制协议进行交互来实现了从MySQL实例中实时读取和解析二进制日志事件的功能。 它提供了一种方便、高效的方式来跟踪和监控MySQL中的数据变化,帮助应用程序实时处理和响应这些变化。

5 参考:

5.1 java实现mysql的binlog监听;

6 git 地址:

https://codeup.aliyun.com/61cd21816112fe9819da8d9c/mysql-binarylog.git

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐