1、部署
1.1、修改flink-conf.yaml
1.1.1、flink-17
jobmanager.rpc.address: boshi-122jobmanager.rpc.port: 6123jobmanager.memory.process.size: 2048mtaskmanager.memory.process.size: 4096mbtaskmanager.memory.task.heap.size: 3072mtaskmanager.memory.framework.heap.size: 128mtaskmanager.memory.managed.size: 128mtaskmanager.memory.framework.off-heap.size: 128mtaskmanager.memory.network.max: 128mtaskmanager.memory.jvm-metaspace.size: 256mtaskmanager.memory.jvm-overhead.max: 256mtaskmanager.numberOfTaskSlots: 8parallelism.default: 1jobmanager.execution.failover-strategy: regionclassloader.check-leaked-classloader: falseakka.ask.timeout: 50sweb.timeout: 50000heartbeat.timeout: 180000taskmanager.network.request-backoff.max: 240000state.savepoints.dir: file:///data/flink/savepoint/state.checkpoints.dir: file:///data/flink/checkpoint/env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
1.1.2、flink-1-13
jobmanager.rpc.address: boshi-146jobmanager.rpc.port: 6123jobmanager.memory.process.size: 4096mtaskmanager.memory.process.size: 16384mbtaskmanager.memory.task.heap.size: 15360mtaskmanager.memory.framework.heap.size: 128mtaskmanager.memory.managed.size: 128mtaskmanager.memory.framework.off-heap.size: 128mtaskmanager.memory.network.max: 128mtaskmanager.memory.jvm-metaspace.size: 256mtaskmanager.memory.jvm-overhead.max: 256mtaskmanager.numberOfTaskSlots: 2parallelism.default: 2jobmanager.execution.failover-strategy: regionclassloader.check-leaked-classloader: falseakka.ask.timeout: 100sweb.timeout: 100000heartbeat.timeout: 180000taskmanager.network.request-backoff.max: 240000state.savepoints.dir: hdfs://hdfs-ha/flink/savepoint/state.checkpoints.dir: hdfs://hdfs-ha/flink/checkpoint/env.java.opts: -server -XX:+UseG1GC -Xloggc:/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=100M
1.2、masters
boshi-122:8081
1.3、workers
boshi-129boshi-137boshi-144boshi-166
2、提交任务
2.1、mysql-to-kafka-starrocks
CREATE TABLE mysql_crawl_enterprise_website (`id` int,`eid` varchar,`enterprise_name` varchar,`website` varchar,`html` varchar, PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'ip','port' = '3306','username' = 'root','password' = '','database-name' = 'db_enterprise_outer_resource','table-name' = 'crawl_enterprise_website','scan.incremental.snapshot.enabled' = 'false');CREATE TABLE kafka_crawl_enterprise_website (`id` int,`eid` varchar,`enterprise_name` varchar,`website` varchar,`html` varchar,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'upsert-kafka','topic' = 'ods_crawl_enterprise_website','properties.bootstrap.servers' = 'ip:6667,ip:6667,ip:6667','properties.group.id' = 'source_province','properties.max.request.size' = '512000000','properties.session.timeout.ms' = '60000','properties.request.timeout.ms' = '40000','key.format' = 'json','value.format' = 'json');CREATE TABLE starrock_ods_crawl_enterprise_website (`id` int,`eid` varchar,`enterprise_name` varchar,`website` varchar,`html` varchar, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ('connector' = 'starrocks','jdbc-url' = 'jdbc:mysql://ip:9030','load-url' = 'ip:8030','database-name' = 'ods','table-name' = 'ods_crawl_enterprise_website','username' = 'starrocks','password' = '','sink.max-retries' = '5','sink.buffer-flush.max-bytes' = '256000000','sink.buffer-flush.interval-ms' = '3000','sink.properties.format' = 'json','sink.properties.strip_outer_array' = 'true','sink.properties.ignore_json_size' = 'true' );insert into kafka_crawl_enterprise_website select * from mysql_crawl_enterprise_website;insert into starrock_ods_crawl_enterprise_website select * from kafka_crawl_enterprise_website;
2.2、提交参数
jobmanager.memory.process.size=4096mtaskmanager.memory.process.size=8192mtaskmanager.memory.task.heap.size=7168mtaskmanager.memory.framework.heap.size=128mtaskmanager.memory.framework.off-heap.size=128mtaskmanager.memory.managed.size=128mtaskmanager.memory.network.max=128mtaskmanager.memory.jvm-metaspace.size=256mtaskmanager.memory.jvm-overhead.max=256mparallelism.default=3taskmanager.numberOfTaskSlots=1yarn.containers.vcores=1