1.flink版本我用的是1.13.6版本,注意flink版本要与驱动包版本相对应,版本不对应会导致任务启动失败,1.13.6版本对应驱动如下。

2.postgres数据库和mysql数据库我都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客

搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。

 mysql表结构sql

CREATE TABLE `sync_test_1` (
  `total_gmv` bigint(20) DEFAULT NULL,
  `day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

postgres表结构

CREATE TABLE "public"."ball" (
  "total_gmv" int8,
  "day_time" varchar(32) COLLATE "pg_catalog"."default"
);

ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");

Flink ddl语句postgres实时同步到mysql如下

--源表

CREATE TABLE pgtest (

  day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义

) WITH (

  'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'

  'hostname' = 'localhost',     -- 数据库的 IP

  'port' = '5432',     -- 数据库的访问端口

  'username' = 'postgres',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)

  'password' = '123456',    -- 数据库访问使用的密码

  'database-name' = 'postgres',  -- 需要同步的数据库名

  'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)

  'table-name' = 'ball' ,      -- 需要同步的数据表名

  'decoding.plugin.name' = 'pgoutput',  -- pgoutput,必须

   'debezium.slot.name' = 'pgtestflink'  -- 指定slot名称,必须

);

--结果表

create table mysqltest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:mysql://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',

    'table-name' = 'sync_test_1',

    'username' = 'root',

    'password' = '123456'

);

INSERT INTO mysqltest  

SELECT day_time,total_gmv

FROM pgtest ;

这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表连接用的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。 

反过来mysql到postgres

--源表

create table mysqltest ( day_time VARCHAR,

    total_gmv bigint,

    PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

       'connector' = 'mysql-cdc',

       'hostname' = '172.18.11.224',

       'port' = '3306',

       'username' = 'root',

       'password' = '123456',

       'database-name' = 'flinktest',

       'table-name' = 'sync_test_1'

);

//结果表

create table pgtest ( day_time VARCHAR,

    total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (

    'connector' = 'jdbc',

    'url' = 'jdbc:postgresql://localhost:5432/postgres',

    'table-name' = 'public.ball',

    'username' = 'postgres',

    'password' = '123456'

);

INSERT INTO pgtest  

SELECT day_time,total_gmv

FROM mysqltest ;

注意:必须指定主键PRIMARY KEY (主键字段) NOT ENFORCED,与数据表中的主键要对应。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐