flinksql postgres到mysql
最近在学flink用ddl语句写任务,postgres到mysql总是报错,中间遇到了很多坑,自己做完总结一下。
1.flink版本我用的是1.13.6版本,注意flink版本要与驱动包版本相对应,版本不对应会导致任务启动失败,1.13.6版本对应驱动如下。
2.postgres数据库和mysql数据库我都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客
搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。
mysql表结构sql
CREATE TABLE `sync_test_1` (
`total_gmv` bigint(20) DEFAULT NULL,
`day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
postgres表结构
CREATE TABLE "public"."ball" (
"total_gmv" int8,
"day_time" varchar(32) COLLATE "pg_catalog"."default"
);ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");
Flink ddl语句postgres实时同步到mysql如下
--源表
CREATE TABLE pgtest (
day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
'connector' = 'postgres-cdc', -- 必须为 'postgres-cdc'
'hostname' = 'localhost', -- 数据库的 IP
'port' = '5432', -- 数据库的访问端口
'username' = 'postgres', -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)
'password' = '123456', -- 数据库访问使用的密码
'database-name' = 'postgres', -- 需要同步的数据库名
'schema-name' = 'public', -- 需要同步的数据库模式 (Schema)
'table-name' = 'ball' , -- 需要同步的数据表名
'decoding.plugin.name' = 'pgoutput', -- pgoutput,必须
'debezium.slot.name' = 'pgtestflink' -- 指定slot名称,必须
);
--结果表
create table mysqltest ( day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED ) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'sync_test_1',
'username' = 'root',
'password' = '123456'
);
INSERT INTO mysqltest
SELECT day_time,total_gmv
FROM pgtest ;
这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表连接用的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。
反过来mysql到postgres
--源表
create table mysqltest ( day_time VARCHAR,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED ) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.18.11.224',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flinktest',
'table-name' = 'sync_test_1'
);
//结果表
create table pgtest ( day_time VARCHAR,
total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'public.ball',
'username' = 'postgres',
'password' = '123456'
);
INSERT INTO pgtest
SELECT day_time,total_gmv
FROM mysqltest ;
注意:必须指定主键PRIMARY KEY (主键字段) NOT ENFORCED,与数据表中的主键要对应。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)