一、环境
- jdk8
- Flink 1.16.1(部署在远程服务器:192.168.137.99)
- Flink CDC 2.3.0
- MySQL 8.0(安装在本地:192.168.3.31)
(安装部署过程略)
二、准备
准备三个数据库:flink_source、flink_sink、flink_sink_second。
将flink_source.source_test表实时同步到flink_sink和flink_sink_second的sink_test表。
(建库建表过程略)
三、Flink SQL Client上开发SQL作业
开发过程可以参考Flink CDC官网的例子(譬如:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL)。
这里介绍的是MySQL到MySQL的同步,稍有不同,需要下载JDBC SQL连接器的依赖包(下载地址,使用方法),放到目录 {flink-1.16.1}/lib/ 下,用于使用JDBC SQL连接器连接MySQL的sink库。
✳注意下面两个jar的区别,是需要将flink-sql-connector-mysql-cdc-2.3.0.jar放到{flink-1.16.1}/lib/ 下。由于网络限制,本人最初是通过maven下载的flink-connector-mysql-cdc-2.3.0.jar,导致了各种找不到类的异常。
flink-sql-connector-mysql-cdc-2.3.0.jar(包含有依赖的jar)
flink-connector-mysql-cdc-2.3.0.jar(不包含依赖jar)
flink-sql-connector-mysql-cdc-2.3.0.jar:
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency>
flink-connector-mysql-cdc-2.3.0.jar:
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency>
使用下面的命令启动 Flink SQL CLI
./bin/sql-client.sh
然后使用下面的建表语句创建Flink表,以及将flink_source.source_test表实时同步到flink_sink、flink_sink_second的sink_test表。
Flink SQL> SET execution.checkpointing.interval = 3s;[INFO] Session property has been set.Flink SQL> CREATE TABLE source_test (> user_id STRING,> user_name STRING,> PRIMARY KEY (user_id) NOT ENFORCED> ) WITH (>'connector' = 'mysql-cdc',>'hostname' = '192.168.3.31',>'port' = '3306',>'username' = 'root',>'password' = '******',>'database-name' = 'flink_source',>'table-name' = 'source_test'> );[INFO] Execute statement succeed.Flink SQL> CREATE TABLE sink_test (> user_id STRING,> user_name STRING,> PRIMARY KEY (user_id) NOT ENFORCED> ) WITH (>'connector' = 'jdbc',>'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',>'driver' = 'com.mysql.cj.jdbc.Driver',>'username' = 'root',>'password' = '******',>'table-name' = 'sink_test'> );[INFO] Execute statement succeed.Flink SQL> CREATE TABLE sink_test_second (> user_id STRING,> user_name STRING,> PRIMARY KEY (user_id) NOT ENFORCED> ) WITH (>'connector' = 'jdbc',>'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',>'driver' = 'com.mysql.cj.jdbc.Driver',>'username' = 'root',>'password' = '******',>'table-name' = 'sink_test'> );[INFO] Execute statement succeed.Flink SQL> insert into sink_test select * from source_test;[INFO] Submitting SQL update statement to the cluster...[INFO] SQL update statement has been successfully submitted to the cluster:Job ID: 0c49758cc251699f0b4acd6c9f735e6eFlink SQL> insert into sink_test_second select * from source_test;[INFO] Submitting SQL update statement to the cluster...[INFO] SQL update statement has been successfully submitted to the cluster:Job ID: ecea685a715d7d40ee1a94aac3236c18Flink SQL>
访问Flink web:http://192.168.137.99:8081/#/job/running,可以看到已经新建了两个作业。
四、使用本机执行环境开发SQL作业
直接贴代码。
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkDemo</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.1</flink.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><!--com.ververicaflink-connector-mysql-cdc${flink-cdc.version}--></dependencies></project>
FlinkDemo.java:
package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = null;// 本机执行环境env = StreamExecutionEnvironment.getExecutionEnvironment();// 远程执行环境//env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.137.99", 8081);env.enableCheckpointing(3000l).setParallelism(2);EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// CREATE TABLE source_testtableEnv.executeSql(sourceDDL());// CREATE TABLE sink_testtableEnv.executeSql(sinkDDL());// CREATE TABLE sink_test_secondtableEnv.executeSql(sinkDDLOfSecondDb());// 将source_test同步到sink_test和sink_test_secondtableEnv.getConfig().set("pipeline.name", "Flink Demo - To sink_test"); // 设置job名称tableEnv.executeSql("insert into sink_test select * from source_test;");tableEnv.getConfig().set("pipeline.name", "Flink Demo - To sink_test_second");// 设置job名称tableEnv.executeSql("insert into sink_test_second select * from source_test;");}public static String sourceDDL() {String sourceDDL = "CREATE TABLE source_test (\n" +"user_id STRING,\n" +"user_name STRING,\n" +"PRIMARY KEY (user_id) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = '192.168.3.31',\n" +" 'port' = '3306',\n" +" 'username' = 'root',\n" +" 'password' = '******',\n" +" 'database-name' = 'flink_source',\n" +" 'table-name' = 'source_test'\n" +");";return sourceDDL;}public static String sinkDDL() {String sinkDDL = "CREATE TABLE sink_test (\n" +"user_id STRING,\n" +"user_name STRING,\n" +"PRIMARY KEY (user_id) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'username' = 'root',\n" +" 'password' = '******',\n" +" 'table-name' = 'sink_test'\n" +");";return sinkDDL;}public static String sinkDDLOfSecondDb() {String sinkDDL = "CREATE TABLE sink_test_second (\n" +"user_id STRING,\n" +"user_name STRING,\n" +"PRIMARY KEY (user_id) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'url' = 'jdbc:mysql://192.168.3.31:3306/flink_sink_second',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'username' = 'root',\n" +" 'password' = '******',\n" +" 'table-name' = 'sink_test'\n" +");";return sinkDDL;}}
五、远程执行环境开发SQL作业
提交SQL作业到远程Flink集群上执行,需要使用下面的方法创建执行环境:
StreamExecutionEnvironment.createRemoteEnvironment("192.168.137.99", 8081);
而本机执行是:
StreamExecutionEnvironment.getExecutionEnvironment();
上面的FlinkDemo.java,修改执行环境后,运行代码提交作业:
另外在远程执行的时候,必须使用flink-sql-connector-mysql-cdc,不能使用flink-connector-mysql-cdc。否则会报错:Caused by: java.io.StreamCorruptedException: unexpected block data
"C:\Program Files\Java\jdk1.8.0_351\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2022.3.3\lib\idea_rt.jar=53061:C:\Program Files\JetBrains\IntelliJ IDEA 2022.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_351\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_351\jre\lib\rt.jar;E:\java\code\FlinkDemo\target\classes;E:\java\.m2\repository\org\apache\flink\flink-java\1.16.1\flink-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-core\1.16.1\flink-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-annotations\1.16.1\flink-annotations-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-metrics-core\1.16.1\flink-metrics-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-asm-9\9.2-15.0\flink-shaded-asm-9-9.2-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-jackson\2.12.4-15.0\flink-shaded-jackson-2.12.4-15.0.jar;E:\java\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;E:\java\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;E:\java\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;E:\java\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;E:\java\.m2\repository\org\apache\commons\commons-compress\1.21\commons-compress-1.21.jar;E:\java\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;E:\java\.m2\repository\org\apache\commons\commons-math3\3.6.1\commons-math3-3.6.1.jar;E:\java\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;E:\java\.m2\repository\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;E:\java\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-force-shading\15.0\flink-shaded-force-shading-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-clients\1.16.1\flink-clients-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-runtime\1.16.1\flink-runtime-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-rpc-core\1.16.1\flink-rpc-core-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-rpc-akka-loader\1.16.1\flink-rpc-akka-loader-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.16.1\flink-queryable-state-client-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-hadoop-fs\1.16.1\flink-hadoop-fs-1.16.1.jar;E:\java\.m2\repository\commons-io\commons-io\2.11.0\commons-io-2.11.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.70.Final-15.0\flink-shaded-netty-4.1.70.Final-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.5.9-15.0\flink-shaded-zookeeper-3-3.5.9-15.0.jar;E:\java\.m2\repository\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;E:\java\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;E:\java\.m2\repository\org\lz4\lz4-java\1.8.0\lz4-java-1.8.0.jar;E:\java\.m2\repository\org\apache\flink\flink-optimizer\1.16.1\flink-optimizer-1.16.1.jar;E:\java\.m2\repository\commons-cli\commons-cli\1.5.0\commons-cli-1.5.0.jar;E:\java\.m2\repository\org\apache\flink\flink-streaming-java\1.16.1\flink-streaming-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-file-sink-common\1.16.1\flink-file-sink-common-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-shaded-guava\30.1.1-jre-15.0\flink-shaded-guava-30.1.1-jre-15.0.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-java-bridge\1.16.1\flink-table-api-java-bridge-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-java\1.16.1\flink-table-api-java-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-api-bridge-base\1.16.1\flink-table-api-bridge-base-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-planner-loader\1.16.1\flink-table-planner-loader-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-runtime\1.16.1\flink-table-runtime-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-table-common\1.16.1\flink-table-common-1.16.1.jar;E:\java\.m2\repository\com\ibm\icu\icu4j\67.1\icu4j-67.1.jar;E:\java\.m2\repository\org\apache\flink\flink-cep\1.16.1\flink-cep-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-connector-base\1.16.1\flink-connector-base-1.16.1.jar;E:\java\.m2\repository\org\apache\flink\flink-connector-jdbc\1.16.1\flink-connector-jdbc-1.16.1.jar;E:\java\.m2\repository\com\ververica\flink-connector-mysql-cdc\2.3.0\flink-connector-mysql-cdc-2.3.0.jar;E:\java\.m2\repository\com\ververica\flink-connector-debezium\2.3.0\flink-connector-debezium-2.3.0.jar;E:\java\.m2\repository\io\debezium\debezium-api\1.6.4.Final\debezium-api-1.6.4.Final.jar;E:\java\.m2\repository\io\debezium\debezium-embedded\1.6.4.Final\debezium-embedded-1.6.4.Final.jar;E:\java\.m2\repository\org\apache\kafka\connect-api\2.7.1\connect-api-2.7.1.jar;E:\java\.m2\repository\org\apache\kafka\kafka-clients\2.7.1\kafka-clients-2.7.1.jar;E:\java\.m2\repository\com\github\luben\zstd-jni\1.4.5-6\zstd-jni-1.4.5-6.jar;E:\java\.m2\repository\javax\ws\rs\javax.ws.rs-api\2.1.1\javax.ws.rs-api-2.1.1.jar;E:\java\.m2\repository\org\apache\kafka\connect-runtime\2.7.1\connect-runtime-2.7.1.jar;E:\java\.m2\repository\org\apache\kafka\kafka-tools\2.7.1\kafka-tools-2.7.1.jar;E:\java\.m2\repository\net\sourceforge\argparse4j\argparse4j\0.7.0\argparse4j-0.7.0.jar;E:\java\.m2\repository\org\apache\kafka\connect-transforms\2.7.1\connect-transforms-2.7.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\jaxrs\jackson-jaxrs-json-provider\2.10.5\jackson-jaxrs-json-provider-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\jaxrs\jackson-jaxrs-base\2.10.5\jackson-jaxrs-base-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\module\jackson-module-jaxb-annotations\2.10.5\jackson-module-jaxb-annotations-2.10.5.jar;E:\java\.m2\repository\jakarta\xml\bind\jakarta.xml.bind-api\2.3.2\jakarta.xml.bind-api-2.3.2.jar;E:\java\.m2\repository\jakarta\activation\jakarta.activation-api\1.2.1\jakarta.activation-api-1.2.1.jar;E:\java\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet\2.31\jersey-container-servlet-2.31.jar;E:\java\.m2\repository\org\glassfish\jersey\containers\jersey-container-servlet-core\2.31\jersey-container-servlet-core-2.31.jar;E:\java\.m2\repository\org\glassfish\hk2\external\jakarta.inject\2.6.1\jakarta.inject-2.6.1.jar;E:\java\.m2\repository\jakarta\ws\rs\jakarta.ws.rs-api\2.1.6\jakarta.ws.rs-api-2.1.6.jar;E:\java\.m2\repository\org\glassfish\jersey\inject\jersey-hk2\2.31\jersey-hk2-2.31.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-locator\2.6.1\hk2-locator-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\external\aopalliance-repackaged\2.6.1\aopalliance-repackaged-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-api\2.6.1\hk2-api-2.6.1.jar;E:\java\.m2\repository\org\glassfish\hk2\hk2-utils\2.6.1\hk2-utils-2.6.1.jar;E:\java\.m2\repository\javax\xml\bind\jaxb-api\2.3.0\jaxb-api-2.3.0.jar;E:\java\.m2\repository\javax\activation\activation\1.1.1\activation-1.1.1.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-server\9.4.38.v20210224\jetty-server-9.4.38.v20210224.jar;E:\java\.m2\repository\javax\servlet\javax.servlet-api\3.1.0\javax.servlet-api-3.1.0.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-http\9.4.38.v20210224\jetty-http-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-io\9.4.38.v20210224\jetty-io-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-servlet\9.4.38.v20210224\jetty-servlet-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-security\9.4.38.v20210224\jetty-security-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-util-ajax\9.4.38.v20210224\jetty-util-ajax-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-servlets\9.4.38.v20210224\jetty-servlets-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-continuation\9.4.38.v20210224\jetty-continuation-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-util\9.4.38.v20210224\jetty-util-9.4.38.v20210224.jar;E:\java\.m2\repository\org\eclipse\jetty\jetty-client\9.4.38.v20210224\jetty-client-9.4.38.v20210224.jar;E:\java\.m2\repository\org\reflections\reflections\0.9.12\reflections-0.9.12.jar;E:\java\.m2\repository\org\apache\maven\maven-artifact\3.6.3\maven-artifact-3.6.3.jar;E:\java\.m2\repository\org\codehaus\plexus\plexus-utils\3.2.1\plexus-utils-3.2.1.jar;E:\java\.m2\repository\org\apache\kafka\connect-json\2.7.1\connect-json-2.7.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.10.5\jackson-datatype-jdk8-2.10.5.jar;E:\java\.m2\repository\org\apache\kafka\connect-file\2.7.1\connect-file-2.7.1.jar;E:\java\.m2\repository\io\debezium\debezium-connector-mysql\1.6.4.Final\debezium-connector-mysql-1.6.4.Final.jar;E:\java\.m2\repository\io\debezium\debezium-core\1.6.4.Final\debezium-core-1.6.4.Final.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.10.5\jackson-core-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.10.5.1\jackson-databind-2.10.5.1.jar;E:\java\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.10.5\jackson-annotations-2.10.5.jar;E:\java\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.10.5\jackson-datatype-jsr310-2.10.5.jar;E:\java\.m2\repository\com\google\guava\guava\30.0-jre\guava-30.0-jre.jar;E:\java\.m2\repository\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;E:\java\.m2\repository\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;E:\java\.m2\repository\io\debezium\debezium-ddl-parser\1.6.4.Final\debezium-ddl-parser-1.6.4.Final.jar;E:\java\.m2\repository\org\antlr\antlr4-runtime\4.8\antlr4-runtime-4.8.jar;E:\java\.m2\repository\com\zendesk\mysql-binlog-connector-java\0.25.3\mysql-binlog-connector-java-0.25.3.jar;E:\java\.m2\repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar;E:\java\.m2\repository\com\esri\geometry\esri-geometry-api\2.2.0\esri-geometry-api-2.2.0.jar;E:\java\.m2\repository\com\zaxxer\HikariCP\4.0.3\HikariCP-4.0.3.jar;E:\java\.m2\repository\org\awaitility\awaitility\4.0.1\awaitility-4.0.1.jar;E:\java\.m2\repository\org\hamcrest\hamcrest\2.1\hamcrest-2.1.jar" org.example.FlinkDemoSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.Exception in thread "main" org.apache.flink.table.api.TableException: Failed to execute sqlat org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)at org.example.FlinkDemo.main(FlinkDemo.java:29)Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink Demo - To sink_test'.at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)... 4 moreCaused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:750)Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)... 3 moreCaused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)... 3 moreCaused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: source_test[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> Sink: sink_test[3]at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)... 4 moreCaused by: java.io.StreamCorruptedException: unexpected block dataat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1685)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2310)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2118)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1656)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460)at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)... 20 moreProcess finished with exit code 1