摘要
Flink一般常用的集群模式有 flink on yarn 和standalone模式。
yarn模式需要搭建hadoop集群,该模式主要依靠hadoop的yarn资源调度来实现flink的高可用,达到资源的充分利用和合理分配。一般用于生产环境。
standalone模式主要利用flink自带的分布式集群来提交任务,该模式的优点是不借助其他外部组件,缺点是资源不足需要手动处理。
本文主要以 standalone集群模式为例。
觉得有帮助的话,传播给更多的小伙伴
提示:flinkcdc获取oracle date日期字段的值存在时差而且是long型
一种方法:改java代码new Timestamp((Long) 接收的值 – 8 * 60 * 60 * 1000)
另一种方法:flink-conf.yaml添加(未验证)
env.java.opts.taskmanager: -Duser.timezone=GMT+08
1.项目添加flink依赖
pom.xml
4.0.0com.testtest-cdc1.0-SNAPSHOT1.8${java.version}${java.version}1.2.681.2.81.14.32.3.02.1242.2.12org.postgresqlpostgresql${postgresql.version}com.alibabadruid-spring-boot-starter${druid.version}com.google.code.gsongson2.8.9org.apache.flinkflink-connector-kafka_${scala.version}${flink.version}kafka-clientsorg.apache.kafkacom.ververicaflink-connector-oracle-cdc${flinkcdc.vsersion}org.apache.flinkflink-connector-jdbc_${scala.version}${flink.version}org.apache.flinkflink-table-api-java-bridge_${scala.version}${flink.version} <!--org.apache.flinkflink-table-planner-blink_${scala.version}${flink.version} mysql mysql-connector-java 8.0.15com.oracle.database.jdbcojdbc1019.10.0.0-->org.apache.flinkflink-streaming-java_${scala.version}${flink.version}org.apache.flinkflink-clients_${scala.version}${flink.version}org.apache.flinkflink-cep_${scala.version}${flink.version}org.apache.flinkflink-java${flink.version}org.apache.flinkflink-json${flink.version}com.alibabafastjson${fastjson.vsersion}org.projectlomboklombok1.18.20providedorg.springframework.bootspring-boot-starter-logging2.1.5.RELEASE<!----><!--org.slf4j--><!--slf4j-api--><!--1.7.32--><!----><!----><!--org.slf4j--><!--slf4j-simple--><!--1.7.32--><!----><!----><!--ch.qos.logback--><!--logback-core--><!--1.2.11--><!----><!----><!--ch.qos.logback--><!--logback-classic--><!--1.2.11--><!---->org.springframeworkspring-context5.3.22compilecom.fasterxml.jackson.corejackson-databind2.12.7.1org.apache.maven.pluginsmaven-assembly-plugin3.0.0jar-with-dependenciesmake-assemblypackagesingle
2.oracle开启日志归档
sqlplus / as sysdba启用日志归档alter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;shutdown immediate;startup mount;alter database archivelog;alter database open;检查日志归档是否开启archive log list;为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;创建表空间CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;创建用户flinkcdc绑定表空间LOGMINER_TBSCREATE USER flinkcdc IDENTIFIED BY flinkcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;授予flinkcdc用户dba的权限 grant connect,resource,dba to flinkcdc;并授予权限GRANT CREATE SESSION TO flinkcdc;GRANT SELECT ON V_$DATABASE to flinkcdc;GRANT FLASHBACK ANY TABLE TO flinkcdc;GRANT SELECT ANY TABLE TO flinkcdc;GRANT SELECT_CATALOG_ROLE TO flinkcdc;GRANT EXECUTE_CATALOG_ROLE TO flinkcdc;GRANT SELECT ANY TRANSACTION TO flinkcdc;GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkcdc;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;GRANT CREATE TABLE TO flinkcdc;GRANT LOCK ANY TABLE TO flinkcdc;GRANT ALTER ANY TABLE TO flinkcdc;GRANT CREATE SEQUENCE TO flinkcdc;GRANT EXECUTE ON DBMS_LOGMNR TO flinkcdc;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkcdc;GRANT SELECT ON V_$LOG TO flinkcdc;GRANT SELECT ON V_$LOG_HISTORY TO flinkcdc;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkcdc;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkcdc;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkcdc;GRANT SELECT ON V_$LOGFILE TO flinkcdc;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkcdc;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkcdc;
修改以下表让其支持增量日志
ALTER TABLE test.table1 SUPPLEMENTAL LOG DATA (ALL) COLUMNS;ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;ALTER TABLE test.table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
3.Flink集群搭建
版本类型 | 版本号 |
---|---|
项目版本 | flink1.14.3、scala2.12、flinkoraclecdc2.3.0 |
flink集群版本 | flink1.14.3 |
hostname | ip | 配置 |
---|---|---|
yy1 | 10.201.1.1 | StandaloneSessionClusterEntrypoint、Taskmanager |
yy2 | 10.201.1.2 | Taskmanager |
yy3 | 10.201.1.3 | Taskmanager |
3.1 Flink下载安装并配置
1) 登录linux
2) cd /usr/local/
3) wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
4) tar –zxvf flink-1.14.3-bin-scala_2.12.tgz
5) cd flink-1.14.3/conf
6) vi flink-conf.yaml
注意:冒号后面有个空格
jobmanager.rpc.address: yy1
# 这个参数比较重要,这个是总内存jobmanager.memory.process.size: 10gb# taskmanager大小taskmanager.memory.process.size: 3gb# 打开注释,并修改保存点存储目录# 配置hdfs目录,一般用于搭建了hadoop集群#state.savepoint.dir: hdfs://yz1:9000/flink/cdc#存储目录设为服务器本地state.checkpoints.dir: file:///bigdata/checkpointsstate.savepoints.dir: file:///bigdata/savepoints#设置检查点保存的数据 默认是一个,增加下面#state.checkpoints.num-retained: 3# 修改slot的个数taskmanager.numberOfTaskSlots: 3#如果不想用flink默认目录/temp 可以自己配置如下并打开# io.tmp.dirs: /data1/flink/tmp# env.pid.dir: /data1/flink/env# web.tmpdir: /data1/flink/tmp#上传的jar包目录,这样不用每次都上传#web.upload.dir: /data1/flink/jar
7)修改masters和workers 文件
masters内容:
yy1:8081
workers内容:
yy1
yy2
yy3
8)复制到其他节点
scp -rq flink-1.14.3 yy2:/usr/local
scp -rq flink-1.14.3 yy3:/usr/local
9)每个节点上建立flink-1.14.3目录的链接(每个节点操作)
ln -s flink-1.14.3 flink
10)配置flink的环境变量(每个节点操作)
vi /etc/profile
#配置如下
export JAVA_HOME=/usr/local/jdk18export FLINK_HOME=/usr/local/flinkexport JRE_HOME=$JAVA_HOME/jreexport CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/libexport PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$FLINK_HOME/bin
11)使其修改生效(每个节点操作)
source /etc/profile
12)在master节点上启动flink集群
start-cluster.sh
13)打开flink任务管理界面
http://10.201.1.1:8081
14)在界面提交任务
15)效果图
4. Flink 提交任务的常用命令
4.1 stantalone模式
flink run –m [ip]:[端口] -p[并行数] -c[main方法所在类的全路径] [jar文件的绝对路径]
flink run -m 10.201.1.1:8090 -p 1 -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
stantalone 模式下savepoint,取消任务的同时savepoint
flink cancel -s 282c334dd9dc9ae04c3d6cbe1bfdf8f2
暂停任务的同时savepoint
flink savepoint 282c334dd9dc9ae04c3d6cbe1bfdf8f2
4.2 flink on yarn模式
flink run -t yarn-per-job -c com.test.TestStudent /bigdata/testCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
Flink on yarn 模式下savepoint
flink savepoint 8f1d21525dc3bebf22f9c3a617326142 hdfs:///flink/cdc -yid application_1657250519562_0007
从保存点恢复
$ bin/flink run -s :savepointPath [:runArgs]
flink run-s hdfs:///flink/cdc/savepoint-a4f769-58ee3095ee02
5.完成
6.问题汇总
1)报错信息:ERROR: Attempting to operate on hdfs namenode as root
ERROR: but there is no HDFS_NAMENODE_USER defined. Aborting operation.
Starting datanodes
ERROR: Attempting to operate on hdfs datanode as root
ERROR: but there is no HDFS_DATANODE_USER defined. Aborting operation.
Starting secondary namenodes [hadoop]
ERROR: Attempting to operate on hdfs secondarynamenode as root
ERROR: but there is no HDFS_SECONDARYNAMENODE_USER defined. Aborting operation.
2018-07-16 05:45:04,628 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
Starting resourcemanager
ERROR: Attempting to operate on yarn resourcemanager as root
ERROR: but there is no YARN_RESOURCEMANAGER_USER defined. Aborting operation.
Starting nodemanagers
ERROR: Attempting to operate on yarn nodemanager as root
ERROR: but there is no YARN_NODEMANAGER_USER defined. Aborting operation.
解决:
vi /etc/profile 加入以下信息,然后source /etc/profile
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HADOOP_CLASSPATH=hadoop classpath
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
2)报错信息:java.lang.IllegalStateException: Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
解决:
修改flink配置文件:vi flink-conf.yaml
增加:classloader.check-leaked-classloader: false
3)File /tmp/logs/root/logs-tfile/application_1656991740104_0001 does not exist.
File /tmp/logs/root/bucket-logs-tfile/0001/application_1656991740104_0001 does not exist.
Can not find any log file matching the pattern: [ALL] for the application: application_1656991740104_0001
Can not find the logs for the application: application_1656991740104_0001 with the appOwner: root
解决:
yarn-site.xml 增加以下
yarn.log-aggregation-enabletrue
4)报错信息:DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
解决:
ALTER TABLE 表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;