Flink 1.9.1: Writing Data to Phoenix with JDBCUpsertTableSink
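Goal
Use Flink to insert and update (upsert) data in Phoenix.
Implementation
The JDBCUpsertTableSink in Flink 1.9.1 currently supports only the following relational databases:
Derby
MySQL
PostgreSQL
So we need to define a dialect that describes how to write to Apache Phoenix: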
https://github.com/apache/flink/blob/release-1.9.1/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
JDBCDialects.java
    private static final List<JDBCDialect> DIALECTS = Arrays.asList(
        new DerbyDialect(),
        new MySQLDialect(),
        new PostgresDialect(),
        new PhoenixJDBCDialect()  // added: register the Phoenix dialect
    );
    ...
    ...
    private static class PhoenixJDBCDialect implements JDBCDialect {
        private static final long serialVersionUID = 1L;

        // Selected for any JDBC URL that starts with the Phoenix prefix.
        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:phoenix:");
        }

        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("org.apache.phoenix.jdbc.PhoenixDriver");
        }

        // Phoenix has a native UPSERT statement, so uniqueKeyFields is not needed here.
        @Override
        public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
            String columns = Arrays.stream(fieldNames)
                .collect(Collectors.joining(", "));
            String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
            return Optional.of("UPSERT INTO " + tableName +
                "(" + columns + ")" + " VALUES (" + placeholders + ")");
        }
    }
} // end of class JDBCDialects
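To make the effect of the dialect concrete, here is a minimal standalone sketch that does not depend on Flink. SimpleDialect, PhoenixDialect, find and upsertFor are hypothetical names invented for this illustration; they only imitate the idea of picking a dialect by URL prefix and building the statement the same way getUpsertStatement does above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class PhoenixDialectSketch {

    // Hypothetical, trimmed-down stand-in for Flink's JDBCDialect interface.
    interface SimpleDialect {
        boolean canHandle(String url);
        String upsertStatement(String tableName, String[] fieldNames);
    }

    // Imitates the Phoenix dialect: match on the URL prefix, emit UPSERT INTO.
    static class PhoenixDialect implements SimpleDialect {
        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:phoenix:");
        }

        @Override
        public String upsertStatement(String tableName, String[] fieldNames) {
            String columns = String.join(", ", fieldNames);
            String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
            return "UPSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")";
        }
    }

    // Only one dialect is registered in this sketch.
    static final List<SimpleDialect> DIALECTS = Arrays.asList(new PhoenixDialect());

    // Return the first registered dialect that accepts the URL, if any.
    static Optional<SimpleDialect> find(String url) {
        return DIALECTS.stream().filter(d -> d.canHandle(url)).findFirst();
    }

    // Build the upsert statement for a URL, or fail if no dialect matches.
    static String upsertFor(String url, String table, String... fields) {
        return find(url)
            .map(d -> d.upsertStatement(table, fields))
            .orElseThrow(() -> new IllegalArgumentException("No dialect for " + url));
    }

    public static void main(String[] args) {
        // prints: UPSERT INTO ACTION_LOG(id, cnt) VALUES (?, ?)
        System.out.println(upsertFor(
            "jdbc:phoenix:192.168.10.16:2181:/hbase-unsecure;autocommit=true",
            "ACTION_LOG", "id", "cnt"));
    }
}
```

Note that the column names are not quoted in the generated statement, so Phoenix treats them as case-insensitive identifiers.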
Rebuild and package the flink-jdbc module:
mvn package -Dmaven.test.skip=true -Dcheckstyle.skip=true
Test
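import org.apache.flink.api.java.io.jdbc.{JDBCOptions, JDBCUpsertTableSink}
import org.apache.flink.table.api.{DataTypes, TableSchema}

// Connection settings for the Phoenix table
val options = JDBCOptions.builder()
  .setDBUrl("jdbc:phoenix:192.168.10.16:2181:/hbase-unsecure;autocommit=true")
  .setDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
  .setTableName("ACTION_LOG")
  .build()

// Schema of the rows written to the sink
val tableSche: TableSchema = TableSchema.builder()
  .field("id", DataTypes.STRING().notNull())
  .field("cnt", DataTypes.BIGINT())
  .build()

val sink1 = JDBCUpsertTableSink.builder()
  .setOptions(options)
  .setMaxRetryTimes(5)
  .setFlushMaxSize(1000)
  .setFlushIntervalMills(1000)
  .setTableSchema(tableSche)
  .build()

// tblEnv (the table environment) and res (the result Table) are defined elsewhere
tblEnv.registerTableSink("phoenixOutputTable", sink1)
res.insertInto("phoenixOutputTable")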
The test passes.
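The test sets setFlushMaxSize(1000) and setFlushIntervalMills(1000) without explaining them. Judging by the option names, rows are buffered and written as a batch once the buffer reaches the maximum size or the interval elapses. The sketch below illustrates only the size-triggered part, under that assumption; FlushBufferSketch and all of its members are hypothetical and are not Flink's implementation. The timer-based flush is left out.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushBufferSketch {
    private final int flushMaxSize;
    private final List<String> pending = new ArrayList<>();
    private int flushCount = 0;

    FlushBufferSketch(int flushMaxSize) {
        this.flushMaxSize = flushMaxSize;
    }

    // Buffer one row; flush as soon as the buffer reaches the size limit.
    void write(String row) {
        pending.add(row);
        if (pending.size() >= flushMaxSize) {
            flush();
        }
    }

    // A real sink would execute the batched UPSERT statements here.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        pending.clear();
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        FlushBufferSketch buffer = new FlushBufferSketch(3);
        for (int i = 0; i < 7; i++) {
            buffer.write("row-" + i);
        }
        // prints: flushes=2, pending=1
        System.out.println("flushes=" + buffer.flushCount() + ", pending=" + buffer.pendingSize());
    }
}
```

With a small flush size, rows reach Phoenix sooner at the cost of more round trips; a larger size batches more rows per write.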