文章目录

  • MySQL CDC配置
    • 第一步: 启用binlog
      • 1. 检查MySQL的binlog是否已启用
      • 2. 若未启用binlog
    • 第二步: 设置binlog格式为row
      • 1. 确保MySQL的binlog格式设置为ROW
      • 2. 若未设置为row
    • 第三步: 创建CDC用户
  • MySQL CDC DataStream API实现
    • 1. 定义MySqlSource
    • 2. 数据处理
    • 3. sink到MySQL
  • 参考

MySQL CDC配置

第一步: 启用binlog

1. 检查MySQL的binlog是否已启用

show variables like '%log_bin%';

2. 若未启用binlog

  1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
  2. 找到[mysqld]部分,添加如下配置
    log-bin=mysql-bin# 指定二进制日志文件的名称前缀server-id=1# 唯一标识MySQL服务器的数字expire_logs_days=30# binlog日志过期时间(按实际情况配置)
  3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
    sudo systemctl restart mysqld

第二步: 设置binlog格式为row

因为要监控表记录变更前后的具体数据, 需要将binlog格式设置为row.

1. 确保MySQL的binlog格式设置为ROW

show variables like '%binlog_format%';

2. 若未设置为row

  1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
  2. 找到[mysqld]部分,添加如下配置
    binlog_format=ROW
  3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
    sudo systemctl restart mysqld

第三步: 创建CDC用户

创建一个具备合适权限的MySQL用户, 使得Debezium MySQL connector可以监控数据库的变化.

  • 创建MySQL用户, 用于Flink CDC连接到MySQL数据库

    CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';
  • 授予该用户适当的权限以访问要采集的数据库和表。

    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';
  • 使权限生效

    FLUSH PRIVILEGES;

MySQL CDC DataStream API实现

所使用软件的版本

  • java 1.8
  • Scala 2.11
  • Flink 1.14.2
  • Flink CDC 2.3.0
  • Source MySQL 5.7
  • Sink MySQL 5.7
  • jackson 2.10.2

MySQL CDC DataStream API可实现一个job监控采集多个数据库、多个表.

1. 定义MySqlSource

//源数据库连接配置文件Properties dbProps = DbConfigUtil.loadConfig("mysql.properties");//Debezium配置Properties debeziumProps = new Properties();//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)//以double值来表示它们,这可能会到值精度丢失//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)debeziumProps.setProperty("decimal.handling.mode", "string");//Time、date和timestamps可以以不同的精度表示,包括://adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。debeziumProps.setProperty("time.precision.mode", "connect");//MySQL CDC数据源MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(dbProps.getProperty("host")).port(Integer.parseInt(dbProps.getProperty("port"))).databaseList(dbProps.getProperty("database_list").split(",")).tableList(dbProps.getProperty("table_list").split(",")).username(dbProps.getProperty("username")).password(dbProps.getProperty("password")).connectionPoolSize(2).serverTimeZone("Asia/Shanghai").debeziumProperties(debeziumProps).deserializer(new JsonDebeziumDeserializationSchema()).serverId("6001").startupOptions(StartupOptions.initial()).build();

2. 数据处理

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 启用Checkpointenv.enableCheckpointing(60000);// 默认即为EXACTLY_ONCE。设置Checkpoint模式为EXACTLY_ONCE,每条记录在恢复的时候都是精确一次地处理的env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置状态后端env.setStateBackend(new HashMapStateBackend());// 设置Checkpoint状态存储系统及目录env.getCheckpointConfig().setCheckpointStorage("hdfs://ns/flink/checkpoint/mysql_cdc");// 两次Checkpoint之间的最小暂停时间是500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// checkpoints必须在指定的时间内完成,否则被丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);//只允许checkpoint连续失败两次env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置最大并行运行的Checkpoint数量env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 在作业取消时保留外部检查点env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 启用非对齐Checkpoint,可以极大减少背压情况的下Checkpoint次数env.getCheckpointConfig().enableUnalignedCheckpoints();//获取数据源SingleOutputStreamOperator<String> dataStreamSource = env.addSource(sourceFunction).uid("source-01").name("read-from-source");ObjectMapper mapper = new ObjectMapper();mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);//JSON字符串转JsonNodeSingleOutputStreamOperator<JsonNode> dataStreamJsonNode = dataStreamSource.map(line -> mapper.readTree(line)).uid("map-01").name("source-to-JsonNode");// 从监控的多个表中过滤出'订单表', 并解析Json的after数据SingleOutputStreamOperator<OrderInfo> orderOperator = dataStreamJsonNode.filter(line -> "order_info".equalsIgnoreCase(line.get("source").get("table").asText())).uid("order-info-filter-01").name("filter-order-info").map(line -> line.get("after").toString()).uid("order-info-map-01").name("parse-order-info-after").map(line -> mapper.readValue(line, OrderInfo.class)).uid("order-info-map-02").name("order-info-to-pojo");

3. sink到MySQL

// 定义JdbcSinkSinkFunction<OrderInfo> orderInfoSink = JdbcSink.sink(UPSERT_SQL,(JdbcStatementBuilder<OrderInfo>) (ps, order) -> new OrderInfoPreparedStatementSetter().setParams(ps, order),JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(2000).withMaxRetries(3).build(),JdbcSinkConnUtil.getConnOptions("sink-mysql.properties"));orderOperator.addSink(orderInfoSink);

参考

  1. https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-property-decimal-handling-mode
  2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc.html
  3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/