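Summary

The two Flink cluster modes in common use are Flink on YARN and standalone.
YARN mode requires a Hadoop cluster. It relies on Hadoop's YARN resource scheduler to provide Flink high availability and to allocate resources efficiently, and it is the usual choice for production.
Standalone mode submits jobs to Flink's own built-in distributed cluster. Its advantage is that it needs no external components; its drawback is that resource shortages must be handled manually.
This article uses the standalone cluster mode as its example.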

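Note: Flink CDC returns Oracle DATE column values as a long (epoch milliseconds), and the value is shifted by the time zone offset (8 hours in a GMT+8 environment).
Option 1: adjust the value in Java code, for example:
preparedStatement.setObject(i, new Timestamp((Long) o - 8 * 60 * 60 * 1000));
Option 2: add the following to flink-conf.yaml (not verified):
env.java.opts.taskmanager: -Duser.timezone=GMT+08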
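
The hard-coded 8-hour subtraction in option 1 can be written with java.time so that the zone is explicit. The following is a minimal sketch, not the author's code. It assumes the long delivered by the connector encodes the Oracle wall-clock time as if it were UTC, which is what the 8-hour correction above suggests. The class and method names (CdcDateFix, toZoneMillis, toTimestamp) are hypothetical.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class CdcDateFix {

    /**
     * Assumption: cdcMillis holds the database wall-clock time encoded as if it were UTC.
     * Reads that wall-clock time back and re-interprets it in the given zone.
     */
    static long toZoneMillis(long cdcMillis, ZoneId zone) {
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(cdcMillis), ZoneOffset.UTC);
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    /** Convenience wrapper producing the value to hand to PreparedStatement.setObject. */
    static Timestamp toTimestamp(long cdcMillis, ZoneId zone) {
        return new Timestamp(toZoneMillis(cdcMillis, zone));
    }

    public static void main(String[] args) {
        long raw = 1_700_000_000_000L; // sample value, not taken from a real table
        long fixed = toZoneMillis(raw, ZoneId.of("Asia/Shanghai"));
        // For a fixed +08:00 offset this is the same as raw - 8 * 60 * 60 * 1000
        System.out.println(fixed - raw);
    }
}
```

With such a helper, the line in option 1 would become preparedStatement.setObject(i, CdcDateFix.toTimestamp((Long) o, ZoneId.of("Asia/Shanghai"))).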

1. Add the Flink dependencies to the project

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>test-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <fastjson.vsersion>1.2.68</fastjson.vsersion>
        <druid.version>1.2.8</druid.version>
        <flink.version>1.13.6</flink.version>
        <flinkcdc.vsersion>3.0.0</flinkcdc.vsersion>
        <scala.version>2.12</scala.version>
        <postgresql.version>42.2.12</postgresql.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgresql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flinkcdc.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

       <!--  <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
                    <version>${flink.version}</version>
                </dependency>
              <dependency>
                       <groupId>mysql</groupId>
                       <artifactId>mysql-connector-java</artifactId>
                       <version>8.0.15</version>
                </dependency>
                <dependency>
                    <groupId>com.oracle.database.jdbc</groupId>
                    <artifactId>ojdbc10</artifactId>
                    <version>19.10.0.0</version>
                </dependency>
          -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.slf4j</groupId>-->
        <!--            <artifactId>slf4j-api</artifactId>-->
        <!--            <version>1.7.32</version>-->
        <!--        </dependency>-->
        <!--  Simple implementation bundled with slf4j      -->
        <!--                <dependency>-->
        <!--                    <groupId>org.slf4j</groupId>-->
        <!--                    <artifactId>slf4j-simple</artifactId>-->
        <!--                    <version>1.7.32</version>-->
        <!--                </dependency>-->
        <!--                <dependency>-->
        <!--            <groupId>ch.qos.logback</groupId>-->
        <!--            <artifactId>logback-core</artifactId>-->
        <!--            <version>1.2.11</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>ch.qos.logback</groupId>-->
        <!--            <artifactId>logback-classic</artifactId>-->
        <!--            <version>1.2.11</version>-->
        <!--        </dependency>-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Enable log archiving in Oracle

sqlplus / as sysdba

Enable log archiving
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

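Check whether log archiving is enabled
archive log list;

Enable supplemental logging for the captured database so that change data capture can record the state of a row before it was changed. The statement below configures this at the database level.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Create the user flinkcdc and bind it to the tablespace LOGMINER_TBS
CREATE USER flinkcdc IDENTIFIED BY flinkcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

Grant the DBA role to the flinkcdc user
  grant connect,resource,dba to flinkcdc;

Then grant the following privileges
  GRANT CREATE SESSION TO flinkcdc;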
  GRANT SELECT ON V_$DATABASE to flinkcdc;
  GRANT FLASHBACK ANY TABLE TO flinkcdc;
  GRANT SELECT ANY TABLE TO flinkcdc;
  GRANT SELECT_CATALOG_ROLE TO flinkcdc;
  GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;
  GRANT SELECT ANY TRANSACTION TO flinkcdc;
  GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT CREATE TABLE TO flinkcdc;
  GRANT LOCK ANY TABLE TO flinkcdc;
  GRANT ALTER ANY TABLE TO flinkcdc;
  GRANT CREATE SEQUENCE TO flinkcdc;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;

  GRANT SELECT ON V_$LOG TO flinkcdc;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;
  GRANT SELECT ON V_$LOGFILE TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;

Alter the following tables so that they support supplemental (incremental) logging

ALTER TABLE test.table1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE test.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
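
When many tables are captured, the per-table statements can be generated instead of typed by hand. This is a small sketch of that idea; the class name SupplementalLogSql is hypothetical, and the output is meant to be reviewed and then run in sqlplus.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SupplementalLogSql {

    /** Builds one ALTER TABLE statement per fully qualified table name (schema.table). */
    static List<String> statementsFor(List<String> tables) {
        List<String> sql = new ArrayList<>();
        for (String table : tables) {
            sql.add("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;");
        }
        return sql;
    }

    public static void main(String[] args) {
        // Table names taken from the statements above
        for (String s : statementsFor(Arrays.asList("test.table1", "test.table2", "test.table3"))) {
            System.out.println(s);
        }
    }
}
```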

3. Set up the Flink cluster

Version type     Version
Project          Flink 1.13.6, Scala 2.12, Flink Oracle CDC 3.0.0
Flink cluster    Flink 1.15.3

Hostname  IP          Roles                                            Cores / memory     Slots
yy1       10.201.1.1  StandaloneSessionClusterEntrypoint, TaskManager  64 cores, 512 GB   2
yy2       10.201.1.2  TaskManager                                      112 cores, 512 GB  10
yy3       10.201.1.3  TaskManager                                      112 cores, 512 GB  10
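
The slot counts in the table bound the parallelism a job can request. The sketch below adds them up and checks a requested parallelism against the total. It assumes Flink's default slot sharing, under which a job needs as many slots as its highest operator parallelism. The class name SlotCapacity is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SlotCapacity {

    /** Sums the task slots offered by all TaskManagers. */
    static int totalSlots(Map<String, Integer> slotsPerHost) {
        int total = 0;
        for (int slots : slotsPerHost.values()) {
            total += slots;
        }
        return total;
    }

    /** True if a job with the given parallelism fits into the cluster (default slot sharing assumed). */
    static boolean fits(int parallelism, Map<String, Integer> slotsPerHost) {
        return parallelism <= totalSlots(slotsPerHost);
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = new LinkedHashMap<>();
        cluster.put("yy1", 2);
        cluster.put("yy2", 10);
        cluster.put("yy3", 10);
        System.out.println(totalSlots(cluster)); // prints 22
        System.out.println(fits(24, cluster));   // prints false
    }
}
```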

3.1 Download, install, and configure Flink
1) Log in to the Linux host
2) cd /usr/local/
3) wget https://archive.apache.org/dist/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
4) tar -zxvf flink-1.15.3-bin-scala_2.12.tgz
5) cd flink-1.15.3/conf
6) vi flink-conf.yaml
Note: each colon must be followed by a space
jobmanager.rpc.address: yy1

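# Important: this is the total process memory of the JobManager
jobmanager.memory.process.size: 10gb

# Total process memory of each TaskManager
taskmanager.memory.process.size: 3gb

# Uncomment and set the savepoint directory
# Use an HDFS directory if a Hadoop cluster is available
#state.savepoints.dir: hdfs://yy1:9000/flink/cdc

# Store checkpoints and savepoints on the server's local filesystem
state.checkpoints.dir: file:///bigdata/checkpoints

state.savepoints.dir: file:///bigdata/savepoints

# Number of completed checkpoints to retain; the default is 1. Uncomment the line below to keep more
#state.checkpoints.num-retained: 3

# Number of task slots per TaskManager
taskmanager.numberOfTaskSlots: 2

# To avoid Flink's default temporary directory (/tmp), uncomment and adjust the following
# io.tmp.dirs: /data1/flink/tmp
# env.pid.dir: /data1/flink/env
# web.tmpdir: /data1/flink/tmp
# Directory for uploaded JAR files, so they do not have to be uploaded every time
#web.upload.dir: /data1/flink/jar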

7) Edit the masters and workers files
Contents of masters:
yy1:8081

Contents of workers:
yy1
yy2
yy3

8) Copy the installation to the other nodes
scp -rq flink-1.15.3 yy2:/usr/local
scp -rq flink-1.15.3 yy3:/usr/local

9) Create a symbolic link to the flink-1.15.3 directory (run on every node)
ln -s flink-1.15.3 flink

10) Configure the Flink environment variables (run on every node)
vi /etc/profile
# Add the following

export JAVA_HOME=/usr/local/jdk18
export FLINK_HOME=/usr/local/flink
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLINK_HOME/bin

11) Apply the changes (run on every node)
source /etc/profile
12) Start the Flink cluster on the master node
start-cluster.sh
13) Open the Flink web UI

http://10.201.1.1:8081

14) Submit the job in the web UI

15) Result

4. Common commands for submitting Flink jobs

4.1 Standalone mode
flink run -m [ip]:[port] -p [parallelism] -c [fully qualified name of the class containing main] [absolute path of the JAR file]

flink run -m 10.201.1.1:8090 -p 1 -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
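
When job submission is scripted from Java, the command can be assembled as an argument list, which avoids quoting problems with paths. This is a sketch under that assumption; the class name FlinkRunCommand is hypothetical, and only the flags shown in the command above (-m, -p, -c) are used.

```java
import java.util.ArrayList;
import java.util.List;

class FlinkRunCommand {

    /** Builds the argument list for: flink run -m host:port -p parallelism -c mainClass jar */
    static List<String> build(String jobManager, int parallelism, String mainClass, String jarPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add("flink");
        cmd.add("run");
        cmd.add("-m");
        cmd.add(jobManager);
        cmd.add("-p");
        cmd.add(String.valueOf(parallelism));
        cmd.add("-c");
        cmd.add(mainClass);
        cmd.add(jarPath);
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build("10.201.1.1:8090", 1, "com.test.TestStudent",
                "/bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar");
        // Print the command; to execute it, the list could be passed to new ProcessBuilder(cmd)
        System.out.println(String.join(" ", cmd));
    }
}
```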

Savepoints in standalone mode: cancel the job and take a savepoint at the same time

flink cancel -s 282c334dd9dc9ae04c3d6cbe1bfdf8f2

Trigger a savepoint for the job (the job itself keeps running)

flink savepoint 282c334dd9dc9ae04c3d6cbe1bfdf8f2

4.2 Flink on YARN mode

flink run -t yarn-per-job -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

Savepoints in Flink on YARN mode

flink savepoint 8f1d21525dc3bebf22f9c3a617326142 hdfs:///flink/cdc -yid application_1657250519562_0007

Restore from a savepoint
$ bin/flink run -s :savepointPath [:runArgs]

flink run -s hdfs:///flink/cdc/savepoint-a4f769-58ee3095ee02

5. Done

6. Troubleshooting

1) Error message: ERROR: Attempting to operate on hdfs namenode as root
ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation.
Starting datanodes
ERROR: Attempting to operate on hdfs datanode as root
ERROR: but there is no HDFS_DATANODE_USER defined. Aborting operation.
Starting secondary namenodes [hadoop]
ERROR: Attempting to operate on hdfs secondarynamenode as root
ERROR: but there is no HDFS_SECONDARYNAMENODE_USER defined. Aborting operation.
2018-07-16 05:45:04,628 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting resourcemanager
ERROR: Attempting to operate on yarn resourcemanager as root
ERROR: but there is no YARN_RESOURCEMANAGER_USER defined. Aborting operation.
Starting nodemanagers
ERROR: Attempting to operate on yarn nodemanager as root
ERROR: but there is no YARN_NODEMANAGER_USER defined. Aborting operation.

Solution:
Edit /etc/profile (vi /etc/profile), add the following, then run source /etc/profile
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

2) Error message: java.lang.IllegalStateException: Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Solution:
Edit the Flink configuration file: vi flink-conf.yaml
Add: classloader.check-leaked-classloader: false

3) Error message: File /tmp/logs/root/logs-tfile/application_1656991740104_0001 does not exist.
File /tmp/logs/root/bucket-logs-tfile/0001/application_1656991740104_0001 does not exist.

Can not find any log file matching the pattern: [ALL] for the application: application_1656991740104_0001
Can not find the logs for the application: application_1656991740104_0001 with the appOwner: root

Solution:
Add the following to yarn-site.xml

<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

4) Error message: DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Solution:
ALTER TABLE table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

5) With Oracle 19c, the Flink web UI reports an error
Error message: Caused by: java.sql.SQLException: ORA-44609: CONTINOUS_MINE is desupported for use with DBMS_LOGMNR.START_LOGMNR.
ORA-06512: at "SYS.DBMS_LOGMNR", line 72
Solution:
Comment out the following setting
// proper.setProperty("log.mining.continuous.mine", "true");
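
Instead of commenting the line in and out by hand, the property can be set conditionally. This is a sketch, not the author's code. The class name OracleCdcProperties and the oracleMajorVersion parameter are hypothetical, and the threshold of 19 rests on the assumption that only Oracle 19c and later reject the option, as the error above suggests.

```java
import java.util.Properties;

class OracleCdcProperties {

    /**
     * Builds the Debezium properties for the Oracle CDC source.
     * Assumption: releases before 19c still accept CONTINUOUS_MINE; 19c and later do not.
     */
    static Properties forOracle(int oracleMajorVersion) {
        Properties proper = new Properties();
        if (oracleMajorVersion < 19) {
            proper.setProperty("log.mining.continuous.mine", "true");
        }
        return proper;
    }

    public static void main(String[] args) {
        // On 19c the option is left out, which avoids ORA-44609
        System.out.println(forOracle(19).containsKey("log.mining.continuous.mine")); // prints false
        System.out.println(forOracle(12).getProperty("log.mining.continuous.mine")); // prints true
    }
}
```

The resulting Properties object would then be handed to the CDC source wherever the job currently passes proper.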
