几天前,canal发布了1.1.5版本,通过该版本可以将mysql热迁移到es7,1.1.5之前的版本是不支持es7的。
官方地址
1.下载,canal-1.1.5,下载下面几个:
在这里插入图片描述
2.配置mysql和deployer,参考文档https://github.com/alibaba/canal/wiki/QuickStart
3.配置adapter,参考文档https://github.com/alibaba/canal/wiki/ClientAdapterhttps://github.com/alibaba/canal/wiki/Sync-ES,或者参考下面配置
vi conf/application.yml
(1)修改端口

server:
  port: 9091

(2)指定前面的deployer的ip和端口

canal.conf:
  consumerProperties:
    canal.tcp.server.host: 192.168.23.128:11111

(3)指定数据源

canal.conf:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.23.128:3339/test2?useUnicode=true
      username: canal
      password: canal

(4)指定es,没密码的话如下

canal.conf:
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7 #名字不要输错了,对应目录es7
        hosts: 192.168.23.128:9300 # 127.0.0.1:9200 for rest mode
        properties:
          #          mode: transport # or rest
          #          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch #名字可以浏览器访问es所在服务器:9200查看,如192.168.23.128:9200

有密码,比如admin,123456的话如下

canal.conf:
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7 #名字不要输错了,对应目录es7
        hosts: http://192.168.23.128:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          security.auth: admin:123456 #  only used for rest mode
          cluster.name: elasticsearch

其它配置不用改,完整配置如下:

server:
  port: 9091
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 192.168.23.128:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.23.128:3339/test2?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        hosts: 192.168.23.128:9300 # 127.0.0.1:9200 for rest mode
        properties:
          #          mode: transport # or rest
          #          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

然后 cd es7,在这里可以配置要迁移数据的表,比如迁移user表的数据,需要先在es创建对应的索引,然后在es7目录创建一个配置文件。
vim user.yml

dataSourceKey: defaultDS #对应application.yml指定的数据库
destination: example #对应application.yml的instance
groupId: g1 #对应application.yml的groupId
esMapping:
  _index: user #数据库对应的索引
  _id: _id #下面sql查询出来的主键
  upsert: true #更新时es没有对应数据则会在更新时顺便插入
  sql: "select id as _id, id,username from user"
#  etlCondition: "where t.c_time>={}"
  commitBatch: 3000 #每次批量插入条数

这里有个小点:
假如是这样
_id: id
sql: “select id,username from user”
这时候如果es的mapping有id字段,这个id字段是没有值的,如果需要id字段有值可以这样:
_id: _id
sql: “select id as _id, id,username from user”
之后启动adapter
sh bin/startup.sh
查看日志,没有报异常并提示以下信息则启动成功
tail -100f logs/adapter/adapter.log

2021-04-23 16:12:03.362 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-23 16:12:03.368 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2021-04-23 16:12:03.384 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2021-04-23 16:12:03.409 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2021-04-23 16:12:03.430 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 9.075 seconds (JVM running for 9.746)
2021-04-23 16:12:03.500 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

不过按照当前的版本会报如下错误

 java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

需要修改源码解决,打开刚刚下载的源码,修改canal-canal-1.1.5\client-adapter\es7x\pom.xml

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>client-adapter.common</artifactId>
            <version>${project.version}</version>
            <!--增加下面这段-->
            <scope>provided</scope>
        </dependency>

然后修改canal-canal-1.1.5\pom.xml,主要是打包跳过测试,避免报错

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire.version}</version>
                <configuration>
                    <useSystemClassLoader>true</useSystemClassLoader>
                    <forkMode>once</forkMode>
                    <argLine>${argline} ${jacocoArgLine}</argLine>
                    <systemProperties>
                        <!-- common shared -->
                    </systemProperties>
                    <skipTests>true</skipTests>    <!--默认关掉单元测试 -->
                </configuration>
            </plugin>

之后maven插件执行下面
在这里插入图片描述
执行完后用canal-canal-1.1.5\client-adapter\es7x\target\client-adapter.es7x-1.1.5-jar-with-dependencies.jar下面包替换conf/plugins/client-adapter.es7x-1.1.5-jar-with-dependencies.jar
之后重启

cd bin
sh restart.sh

全量更新,更新时会停止消费binlog,待更新完后继续消费,全量更新时,es已有的数据会被覆盖,不会重复插入,但是es多出来的数据不会被删除,比如es有id为1001的数据,然后mysql没有,这是全量迁移后,这条1001的数据还在。

curl http://127.0.0.1:9091/etl/es7/user.yml -X POST

问题:插入或更新的时候有些字段不变。
1.可能字段包含关键字,比如group

select id as _id,id,name,group from test

含有关键字group,会在全量迁移时报错。需要改为如下

select id as _id,id,name,`group` from test

但是这样子,在增量插入或更新时,group字段更新不到
最终还是用别名好,既可以全量也可以增量

select id as _id,id,name,t.group from test t

2.可能日志格式不对
mysql的binlog_row_image默认为FULL,但也有例外,比如我们公司为MINIMAL
两者区别
FULL:会记录更新前后所有列的数据
MINIMAL:没有主键,会记录更新前所有列,有主键,只会记录更新前的主键,更新后的列只会记录变化的。
而canal_adapter获取的是后置镜像,也就是更新后的列,这时MINIMAL后置镜像没有主键,所以更新失败。可以通过修改源码解决(针对单一主键的情况,无主键或者多主键可以自己改)
在这里插入图片描述

else if (eventType == CanalEntry.EventType.UPDATE) {
                        columns = rowData.getAfterColumnsList();
                        if (rowData.getBeforeColumnsCount() == 1) {
                            //当binlog的binlog_row_image为MINIMAL时,前镜像只记录主键,后镜像只记录修改的列,需要把主键加进来
                            CanalEntry.Column pkColumn = rowData.getBeforeColumns(0);
                            List<CanalEntry.Column> list = new ArrayList<>(columns.size() + 1);
                            list.add(pkColumn);
                            list.addAll(columns);
                            columns = list;
                        }
                    }

在这里插入图片描述

 if (eventType == CanalEntry.EventType.UPDATE) {
                        Map<String, Object> rowOld = new LinkedHashMap<>();
                        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                        if (rowData.getBeforeColumnsCount() == 1) {
                            beforeColumnsList = columns;
                        }
                        for (CanalEntry.Column column : beforeColumnsList) {
                            if (updateSet.contains(column.getName())) {
                                if (column.getIsNull()) {
                                    rowOld.put(column.getName(), null);
                                } else {
                                    rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(msg.getTable(),
                                            column.getName(),
                                            rowData.getBeforeColumnsCount() == 1 ? "unknown" : column.getValue(),
                                            column.getSqlType(),
                                            column.getMysqlType()));
                                }
                            }
                        }

在这里插入图片描述
然后打包为connector.core-1.1.5.jar,再把lib/connector.core-1.1.5.jar替换掉就可以。

一些其它的配置(不是必要配置):

白名单黑名单,可以设置哪些需要关注的数据库或者不需要关注的数据库。在deployer/conf/example/instance.properties配置,参考http://blog.sina.com.cn/s/blog_6dd718930102xevp.html

# 白名单,比如关注前缀为test_的数据库下所有表
canal.instance.filter.regex=test_.*\\..*
# 黑名单
canal.instance.filter.black.regex=mysql\\.slave_.*

配置tsdb,如果是单机不用改,如果是canal集群需要将信息保存在数据库
先创建一个数据库,比如 canal_config,然后执行deployer/conf/spring/tsdb/sql/create_table.sql语句,创建两张表。修改deployer/conf/canal/properties

canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:mysql://192.168.23.128:3306/canal_config?useSSL=false
canal.instance.tsdb.dbUsername = canal_config #需要有写权限的账号
canal.instance.tsdb.dbPassword = 123456
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

然后重启deployer和adapter就可以。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐