文章目录

目的

Flink插入跟新数据到Phoenix

实现

目前使用的JDBCUpsertTableSink支持如下几种RDB:
Derby
Mysql
Postgresql
在这里插入图片描述
所以需要定义一下Apache Phoenix的写入形式:

https://github.com/apache/flink/blob/release-1.9.1/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java

JDBCDialects.java

    private static final List<JDBCDialect> DIALECTS = Arrays.asList(
            new DerbyDialect(),
            new MySQLDialect(),
            new PostgresDialect(),
            new PhoenixJDBCDialect()
    );
    ...
    ...
    private static class PhoenixJDBCDialect implements JDBCDialect {

        private static final long serialVersionUID = 1L;

        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:phoenix:");
        }

        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");
        }

        @Override
        public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
            String columns = Arrays.stream(fieldNames)
                    .collect(Collectors.joining(", "));
            String placeholders = Arrays.stream(fieldNames)
                    .map(f -> "?")
                    .collect(Collectors.joining(", "));
            return Optional.of("UPSERT INTO " + tableName +
                    "(" + columns + ")" + " VALUES (" + placeholders + ")");
        }

    }
}

重新编译打包flink-jdbc模块:

mvn package -Dmaven.test.skip=true -Dcheckstyle.skip=true

在这里插入图片描述

测试

val options = JDBCOptions.builder()
  .setDBUrl("jdbc:phoenix:192.168.10.16:2181:/hbase-unsecure;autocommit=true")
  .setDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
  .setTableName("ACTION_LOG")
  .build

val tableSche:TableSchema = TableSchema.builder()
  .field("id",DataTypes.STRING().notNull())
  .field("cnt",DataTypes.BIGINT()).build()

val sink1 = JDBCUpsertTableSink.builder()
  .setOptions(options)
  .setMaxRetryTimes(5)
  .setFlushMaxSize(1000)
  .setFlushIntervalMills(1000)
  .setTableSchema(tableSche)
  .build()

tblEnv.registerTableSink("phoenixOutputTable",sink1)

res.insertInto("phoenixOutputTable")

在这里插入图片描述
测试通过!

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐