canal_adapter1.1.5将mysql数据同步至elasticsearch7,可以少走一些坑
几天前,canal发布了1.1.5版本,通过该版本可以将mysql热迁移到es7,1.1.5之前的版本是不支持es7的。官方地址1.下载,canal-1.1.5,下载下面几个:2.配置mysql和deployer,参考文档https://github.com/alibaba/canal/wiki/QuickStart3.配置adapter,参考文档https://github.com/alibab
几天前,canal发布了1.1.5版本,通过该版本可以将mysql热迁移到es7,1.1.5之前的版本是不支持es7的。
官方地址
1.下载,canal-1.1.5,下载下面几个:
2.配置mysql和deployer,参考文档https://github.com/alibaba/canal/wiki/QuickStart
3.配置adapter,参考文档https://github.com/alibaba/canal/wiki/ClientAdapter和https://github.com/alibaba/canal/wiki/Sync-ES,或者参考下面配置
vi conf/application.yml
(1)修改端口
server:
port: 9091
(2)指定前面的deployer的ip和端口
canal.conf:
consumerProperties:
canal.tcp.server.host: 192.168.23.128:11111
(3)指定数据源
canal.conf:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.23.128:3339/test2?useUnicode=true
username: canal
password: canal
(4)指定es,没密码的话如下
canal.conf:
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7 #名字不要输错了,对应目录es7
hosts: 192.168.23.128:9300 # 127.0.0.1:9200 for rest mode
properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch #名字可以浏览器访问es所在服务器:9200查看,如192.168.23.128:9200
有密码,比如admin,123456的话如下
canal.conf:
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7 #名字不要输错了,对应目录es7
hosts: http://192.168.23.128:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
security.auth: admin:123456 # only used for rest mode
cluster.name: elasticsearch
其它配置不用改,完整配置如下:
server:
port: 9091
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 192.168.23.128:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.23.128:3339/test2?useUnicode=true
username: canal
password: canal
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es7
hosts: 192.168.23.128:9300 # 127.0.0.1:9200 for rest mode
properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
然后 cd es7,在这里可以配置要迁移数据的表,比如迁移user表的数据,需要先在es创建对应的索引,然后在es7目录创建一个配置文件。
vim user.yml
dataSourceKey: defaultDS #对应application.yml指定的数据库
destination: example #对应application.yml的instance
groupId: g1 #对应application.yml的groupId
esMapping:
_index: user #数据库对应的索引
_id: _id #下面sql查询出来的主键
upsert: true #更新时es没有对应数据则会在更新时顺便插入
sql: "select id as _id, id,username from user"
# etlCondition: "where t.c_time>={}"
commitBatch: 3000 #每次批量插入条数
这里有个小点:
假如是这样
_id: id
sql: “select id,username from user”
这时候如果es的mapping有id字段,这个id字段是没有值的,如果需要id字段有值可以这样:
_id: _id
sql: “select id as _id, id,username from user”
之后启动adapter
sh bin/startup.sh
查看日志,没有报异常并提示以下信息则启动成功
tail -100f logs/adapter/adapter.log
2021-04-23 16:12:03.362 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-04-23 16:12:03.368 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2021-04-23 16:12:03.384 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2021-04-23 16:12:03.409 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2021-04-23 16:12:03.430 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 9.075 seconds (JVM running for 9.746)
2021-04-23 16:12:03.500 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
不过按照当前的版本会报如下错误
java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
需要修改源码解决,打开刚刚下载的源码,修改canal-canal-1.1.5\client-adapter\es7x\pom.xml
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>client-adapter.common</artifactId>
<version>${project.version}</version>
<!--增加下面这段-->
<scope>provided</scope>
</dependency>
然后修改canal-canal-1.1.5\pom.xml,主要是打包跳过测试,避免报错
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire.version}</version>
<configuration>
<useSystemClassLoader>true</useSystemClassLoader>
<forkMode>once</forkMode>
<argLine>${argline} ${jacocoArgLine}</argLine>
<systemProperties>
<!-- common shared -->
</systemProperties>
<skipTests>true</skipTests> <!--默认关掉单元测试 -->
</configuration>
</plugin>
之后maven插件执行下面
执行完后用canal-canal-1.1.5\client-adapter\es7x\target\client-adapter.es7x-1.1.5-jar-with-dependencies.jar下面包替换conf/plugins/client-adapter.es7x-1.1.5-jar-with-dependencies.jar
之后重启
cd bin
sh restart.sh
全量更新,更新时会停止消费binlog,待更新完后继续消费,全量更新时,es已有的数据会被覆盖,不会重复插入,但是es多出来的数据不会被删除,比如es有id为1001的数据,然后mysql没有,这是全量迁移后,这条1001的数据还在。
curl http://127.0.0.1:9091/etl/es7/user.yml -X POST
问题:插入或更新的时候有些字段不变。
1.可能字段包含关键字,比如group
如
select id as _id,id,name,group from test
含有关键字group,会在全量迁移时报错。需要改为如下
select id as _id,id,name,`group` from test
但是这样子,在增量插入或更新时,group字段更新不到
最终还是用别名好,既可以全量也可以增量
select id as _id,id,name,t.group from test t
2.可能日志格式不对
mysql的binlog_row_image默认为FULL,但也有例外,比如我们公司为MINIMAL
两者区别
FULL:会记录更新前后所有列的数据
MINIMAL:没有主键,会记录更新前所有列,有主键,只会记录更新前的主键,更新后的列只会记录变化的。
而canal_adapter获取的是后置镜像,也就是更新后的列,这时MINIMAL后置镜像没有主键,所以更新失败。可以通过修改源码解决(针对单一主键的情况,无主键或者多主键可以自己改)
else if (eventType == CanalEntry.EventType.UPDATE) {
columns = rowData.getAfterColumnsList();
if (rowData.getBeforeColumnsCount() == 1) {
//当binlog的binlog_row_image为MINIMAL时,前镜像只记录主键,后镜像只记录修改的列,需要把主键加进来
CanalEntry.Column pkColumn = rowData.getBeforeColumns(0);
List<CanalEntry.Column> list = new ArrayList<>(columns.size() + 1);
list.add(pkColumn);
list.addAll(columns);
columns = list;
}
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, Object> rowOld = new LinkedHashMap<>();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
if (rowData.getBeforeColumnsCount() == 1) {
beforeColumnsList = columns;
}
for (CanalEntry.Column column : beforeColumnsList) {
if (updateSet.contains(column.getName())) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
rowOld.put(column.getName(), JdbcTypeUtil.typeConvert(msg.getTable(),
column.getName(),
rowData.getBeforeColumnsCount() == 1 ? "unknown" : column.getValue(),
column.getSqlType(),
column.getMysqlType()));
}
}
}
然后打包为connector.core-1.1.5.jar,再把lib/connector.core-1.1.5.jar替换掉就可以。
一些其它的配置(不是必要配置):
白名单黑名单,可以设置哪些需要关注的数据库或者不需要关注的数据库。在deployer/conf/example/instance.properties配置,参考http://blog.sina.com.cn/s/blog_6dd718930102xevp.html
# 白名单,比如关注前缀为test_的数据库下所有表
canal.instance.filter.regex=test_.*\\..*
# 黑名单
canal.instance.filter.black.regex=mysql\\.slave_.*
配置tsdb,如果是单机不用改,如果是canal集群需要将信息保存在数据库
先创建一个数据库,比如 canal_config,然后执行deployer/conf/spring/tsdb/sql/create_table.sql语句,创建两张表。修改deployer/conf/canal/properties
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:mysql://192.168.23.128:3306/canal_config?useSSL=false
canal.instance.tsdb.dbUsername = canal_config #需要有写权限的账号
canal.instance.tsdb.dbPassword = 123456
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
然后重启deployer和adapter就可以。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)