1、部署
1.1、修改flink-conf.yaml
1.1.1、flink-17
# flink-conf.yaml for the cluster whose JobManager runs on boshi-122.
# One option per line; the inline "#" comments must sit on their own lines,
# otherwise everything after the first "#" is treated as a comment.

jobmanager.rpc.address: boshi-122
jobmanager.rpc.port: 6123

# Total JobManager process memory
jobmanager.memory.process.size: 2048m

# Total TaskManager process memory.
# The components below are sized so that they add up to this total:
# 3072m task heap + 4 x 128m (framework heap, managed, framework off-heap,
# network max) + 2 x 256m (metaspace, JVM overhead max) = 4096m.
taskmanager.memory.process.size: 4096mb
# Task heap: memory available to user code
taskmanager.memory.task.heap.size: 3072m
# Flink framework heap memory
taskmanager.memory.framework.heap.size: 128m
# Managed memory
taskmanager.memory.managed.size: 128m
# Framework off-heap memory
taskmanager.memory.framework.off-heap.size: 128m
# Network buffer memory (upper bound)
taskmanager.memory.network.max: 128m
# JVM memory: metaspace and JVM overhead (upper bound)
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false

# Timeouts
akka.ask.timeout: 50s
web.timeout: 50000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# Savepoints and checkpoints are written to the local filesystem.
# NOTE(review): with several workers a file:// path presumably has to be a
# shared mount to be usable for recovery - confirm.
state.savepoints.dir: file:///data/flink/savepoint/
state.checkpoints.dir: file:///data/flink/checkpoint/

# JVM options.
# NOTE(review): the CMS collector was removed in JDK 14, so these flags
# assume an older JVM - confirm the Java version in use.
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1-13
# flink-conf.yaml for the cluster whose JobManager runs on boshi-146.
# One option per line; the inline "#" comments must sit on their own lines,
# otherwise everything after the first "#" is treated as a comment.

jobmanager.rpc.address: boshi-146
jobmanager.rpc.port: 6123

# Total JobManager process memory
jobmanager.memory.process.size: 4096m

# Total TaskManager process memory.
# The components below are sized so that they add up to this total:
# 15360m task heap + 4 x 128m (framework heap, managed, framework off-heap,
# network max) + 2 x 256m (metaspace, JVM overhead max) = 16384m.
taskmanager.memory.process.size: 16384mb
# Task heap: memory available to user code
taskmanager.memory.task.heap.size: 15360m
# Flink framework heap memory
taskmanager.memory.framework.heap.size: 128m
# Managed memory
taskmanager.memory.managed.size: 128m
# Framework off-heap memory
taskmanager.memory.framework.off-heap.size: 128m
# Network buffer memory (upper bound)
taskmanager.memory.network.max: 128m
# JVM memory: metaspace and JVM overhead (upper bound)
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 256m

taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
jobmanager.execution.failover-strategy: region
classloader.check-leaked-classloader: false

# Timeouts
akka.ask.timeout: 100s
web.timeout: 100000
heartbeat.timeout: 180000
taskmanager.network.request-backoff.max: 240000

# Savepoints and checkpoints are written to HDFS.
state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/
state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/

# JVM options: G1 collector with rotating GC logs (20 files x 100M).
# NOTE(review): "-Xloggc:/gc.log" points at the filesystem root; a log-dir
# placeholder in front of "/gc.log" may have been lost - confirm the path.
# NOTE(review): the PrintGC*/UseGCLogFileRotation flags are JDK 8-style GC
# logging options - confirm the Java version in use.
env.java.opts: -server -XX:+UseG1GC -Xloggc:/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M
1.2、masters
boshi-122:8081
1.3、workers
boshi-129
boshi-137
boshi-144
boshi-166
2、提交任务
2.1、mysql-to-kafka-starrocks
-- Flink SQL job: MySQL (CDC) -> Kafka (upsert) -> StarRocks.
-- Every statement and every "--" comment sits on its own line; when the
-- script is collapsed onto one line the first "--" comments out all of it.
--
-- Text columns are declared as STRING: in Flink SQL a bare VARCHAR means
-- VARCHAR(1), which does not describe columns such as `html`.
-- 'ip' and the empty passwords are placeholders to fill in per environment.

-- MySQL source table
CREATE TABLE mysql_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'db_enterprise_outer_resource',
    'table-name' = 'crawl_enterprise_website',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- Kafka table (written by the first INSERT, read by the second)
CREATE TABLE kafka_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_crawl_enterprise_website',
    'properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667',
    'properties.group.id' = 'source_province',
    'properties.max.request.size' = '512000000',
    'properties.session.timeout.ms' = '60000',
    'properties.request.timeout.ms' = '40000',
    -- Optional settings, disabled in the original script:
    -- 'properties.max.poll.records' = '100',
    -- 'properties.auto.offset.reset' = 'latest',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- StarRocks sink table
CREATE TABLE starrock_ods_crawl_enterprise_website (
    `id` INT,
    `eid` STRING,
    `enterprise_name` STRING,
    `website` STRING,
    `html` STRING,
    -- If the table being synchronised defines a primary key, it must be
    -- declared here as well.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://ip:9030',
    'load-url' = 'ip:8030',
    'database-name' = 'ods',
    'table-name' = 'ods_crawl_enterprise_website',
    'username' = 'starrocks',
    'password' = '',
    'sink.max-retries' = '5',
    -- Optional settings, disabled in the original script:
    -- 'sink.parallelism' = '2',
    -- 'sink.version' = 'V1',
    -- 'sink.buffer-flush.max-rows' = '64000',
    'sink.buffer-flush.max-bytes' = '256000000',
    'sink.buffer-flush.interval-ms' = '3000',
    -- 'sink.properties.label' ='ods_crawl_enterprise_website',
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true',
    -- Skip the size check on the JSON body
    'sink.properties.ignore_json_size' = 'true'
);

-- Sync MySQL data to Kafka
INSERT INTO kafka_crawl_enterprise_website (
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
)
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM mysql_crawl_enterprise_website;

-- Sync Kafka data to StarRocks
INSERT INTO starrock_ods_crawl_enterprise_website (
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
)
SELECT
    `id`,
    `eid`,
    `enterprise_name`,
    `website`,
    `html`
FROM kafka_crawl_enterprise_website;
2.2、提交参数
jobmanager.memory.process.size=4096m
taskmanager.memory.process.size=8192m
taskmanager.memory.task.heap.size=7168m
taskmanager.memory.framework.heap.size=128m
taskmanager.memory.framework.off-heap.size=128m
taskmanager.memory.managed.size=128m
taskmanager.memory.network.max=128m
taskmanager.memory.jvm-metaspace.size=256m
taskmanager.memory.jvm-overhead.max=256m
parallelism.default=3
taskmanager.numberOfTaskSlots=1
yarn.containers.vcores=1
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END