第1章 Canal 简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

第2章 Canal 快速开始

环境准备

主机环境:Windows 11

数据库版本:MySQL-8数据库

Canal版本:canal.deployer-1.1.6

MySQL8准备

(1)检查MySQL 的binlog功能是否有开启

-- 是否开启binlogshow VARIABLES like 'log_bin';

(2)如果显示状态为OFF表示该功能未开启,开启binlog功能

1,修改 mysql 的配置文件 my.cnf**/**/my.cnf末尾追加内容:#binlog文件名log-bin=mysql-bin#选择row模式binlog_format=ROW#mysql实例id,不能和canal的slaveId重复server_id=12,windows 重启 mysql

(3)在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO'canal'@'%';FLUSH PRIVILEGES;

Windows 安装Canal

下载地址

Canal Git 地址:https://github.com/alibaba/canal/releases

解压及配置

解压canal.deployer-1.1.6.tar.gz,我们可以看到里面有四个文件夹:

Canal 启动配置

(1)打开配置文件conf/example/instance.properties

################################################### v1.0.26版本后会自动生成slaveId,所以可以不用配置# canal.instance.mysql.slaveId=0# enable gtid use true/falsecanal.instance.gtidon=false# 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称canal.instance.master.journal.name=mysql-bin.000001# binlog偏移量canal.instance.master.position=913canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq configcanal.mq.topic=example# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.enableDynamicQueuePartition=false#canal.mq.partitionsNum=3#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################

知识点拓展:查看MySQL 的binlog日志名称 和binlog 偏移量

# 查看当前服务器使用的biglog文件及大小show binary logs;# 查看最新一个binlog日志文件名称和Positionshow master status;# 查看 binlog 日志列表show master logs;

(2)Canal 启动

切换至Canal项目bin 文件夹(D:\Canal\canal.deployer-1.1.6\bin),双击启动startup.bat

(3)查看Canal server 日志

切换至Canal项目logs/canal 文件夹

查看logs/canal/canal.log 日志内容

2023-02-06 15:45:55.188 [main] INFOcom.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler2023-02-06 15:45:55.193 [main] INFOcom.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations2023-02-06 15:45:55.198 [main] INFOcom.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.2023-02-06 15:45:55.358 [main] INFOcom.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.43.80(192.168.43.80):11111]2023-02-06 15:45:56.260 [main] INFOcom.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

(4)查看Canal instance 日志

切换至Canal项目logs/example文件夹

2023-02-06 17:06:18.146 [main] INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2023-02-06 17:06:18.148 [main] WARNc.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$2023-02-06 17:06:18.148 [main] WARNc.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$2023-02-06 17:06:18.148 [main] INFOc.a.otter.canal.instance.core.AbstractCanalInstance - start successful....2023-02-06 17:06:18.201 [destination = example , address = /127.0.0.1:3306 , EventParser] WARNc.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position2023-02-06 17:06:18.316 [destination = example , address = /127.0.0.1:3306 , EventParser] WARNc.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000001:4:16756662120002023-02-06 17:06:18.809 [destination = example , address = /127.0.0.1:3306 , EventParser] WARNc.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=,timestamp=1675666212000] cost : 608ms , the next step is binlog dump2023-02-06 17:06:33.829 [MultiStageCoprocessor-other-example-0] WARNcom.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:1572023-02-06 17:06:48.835 [MultiStageCoprocessor-other-example-0] WARNcom.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157

(5)Canal 停止

直接关闭Canal 服务运行窗口即可。

第3章 Docker 安装Canal

第一步:查看本地镜像、检查Canal镜像和下载Canal 镜像
# 查看本地镜像docker images# 检索Kafka镜像docker search canal# 下载Kafka 镜像指定版本docker pull canal/canal-server:latest
第二步:docker 启动Canal
docker run --name canal -d canal/canal-server

知识点拓展:拷贝Canal 容器内部配置文件拷贝到外部

语法:docker cp [容器索引]:[内部路径] [外部路径]

实例:

docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canaldocker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal
第三步:修改Canal 配置文件instance.properties
# 编辑配置文件vi/home/canal/instance.properties

编辑内容:

################################################### v1.0.26版本后会自动生成slaveId,所以可以不用配置# canal.instance.mysql.slaveId=0# enable gtid use true/falsecanal.instance.gtidon=false# 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称canal.instance.master.journal.name=mysql-bin.000001# binlog偏移量canal.instance.master.position=913canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=rootcanal.instance.dbPassword=123456canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\\..*# table black regexcanal.instance.filter.black.regex=mysql\\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq configcanal.mq.topic=example# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.enableDynamicQueuePartition=false#canal.mq.partitionsNum=3#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################
第四步:基于Canal 外部配置文件,重新Canal 容器实例
docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server
可选指令:
  • 关闭Canal 容器

docker stop canal
  • 移除Canal 容器

docker rm canal

第4章 基于Canal 和Kafka,实现MySQL的Binlog 近实时同步

搭建一套可以用的组件需要部署MySQL、Zookeeper、Kafka和Canal四个中间件的实例。

Docker 安装MySQL

请参考:Docker 安装MySQL

CentOS-7安装ZooKeeper

Canal和Kafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。

midkr /data/zk# 创建数据目录midkr /data/zk/datacd /data/zkwget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gztar -zxvf apache-zookeeper-3.6.0-bin.tar.gzcd apache-zookeeper-3.6.0-bin/confcp zoo_sample.cfg zoo.cfg && vim zoo.cfg

把zoo.cfg文件中的dataDir设置为/data/zk/data,然后启动Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start/usr/bin/javaZooKeeper JMX enabled by defaultUsing config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfgStarting zookeeper ... STARTED

注意一点,要启动此版本的Zookeeper服务必须本地安装好JDK8+。启动的默认端口是2181,启动成功后的日志如下:

CentOS-7 安装Kafka

Kafka是一个高性能分布式消息队列中间件,它的部署依赖于Zookeeper。笔者在此选用2.4.0并且Scala版本为2.13的安装包:

mkdir /data/kafkamkdir /data/kafka/datawget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgztar -zxvf kafka_2.13-2.4.0.tgz

解压后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中对应的zookeeper.connect=localhost:2181已经符合需要,不必修改,需要修改日志文件的存放目录log.dirs为/data/kafka/data。然后启动Kafka服务:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

知识拓展:kafka 后台进行运行设置

Kafka启动后一旦退出控制台就会结束Kafka进程,可以添加-daemon参数用于控制Kafka进程后台不挂断运行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties

CentOS-7 安装Canal

CentOS 安装Canal 核心步骤

mkdir /data/canalcd /data/canal# 这里注意一点,Github在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gztar -zxvf canal.deployer-1.1.6.tar.gz

Canal 解压后目录说明:

- bin # 运维脚本- conf# 配置文件canal_local.properties# canal本地配置,一般不需要动canal.properties# canal服务配置logback.xml # logback日志配置metrics # 度量统计配置spring# spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹instance.properties # 实例配置,一般指单个数据库的配置- lib # 服务依赖包- logs# 日志文件输出目录

在开发和测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题。这里需要关注canal.properties和instance.properties两个配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。

  • canal.serverMode配置项指定为kafka,可选值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以选用rabbitmq),默认是kafka。

  • canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意义。kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.serversrocketmq模式下,指NameServer列表rabbitmq模式下,指RabbitMQ服务的Host和Port

本文Kafka实例配置

找到canal.deployer-1.1.6/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式canal.serverMode = kafka# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况canal.instance.parser.parallelThreadSize = 16# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录下要有example同名的目录,可以配置多个canal.destinations = example

其他配置项可以参考下面两个官方Wiki的链接:

  • Canal-Kafka-RocketMQ-QuickStart

  • AdminGuide

instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties需要修改的配置项主要包括:

  • canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321。

  • 配置数据源实例,包括地址、用户、密码和目标数据库:

  • canal.instance.master.address,这里指定为127.0.0.1:3306。

  • canal.instance.dbUsername,这里指定为canal。

  • canal.instance.dbPassword,这里指定为QWqw12!@。

  • 新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程)。

  • Kafka相关配置,这里暂时使用静态topic和单个partition:

  • canal.mq.topic,这里指定为test,也就是解析完的binlog结构化数据会发送到Kafka的命名为testtopic

  • canal.mq.partition,这里指定为0。

本文MySQL8实例配置

配置instance,找到/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0

配置工作做好之后,可以启动Canal服务:

sh /data/canal/bin/startup.sh # 查看服务日志tail -100f /data/canal/logs/canal/canal# 查看实例日志-- 一般情况下,关注实例日志即可tail -100f /data/canal/logs/example/example.log

启动正常后,见实例日志如下:

数据演示

在test数据库创建一个订单表

use `test`;CREATE TABLE `order`(idBIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',order_idVARCHAR(64)NOT NULL COMMENT '订单ID',amountDECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',UNIQUE uniq_order_id (`order_id`)) COMMENT '订单表';INSERT INTO `order`(order_id, amount) VALUES ('20230207093012', 1999);UPDATE `order` SET amount = 2000 WHERE order_id = '20230207093012';DELETEFROM `order` WHERE order_id = '20230207093012';

利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

第5章 基于Canal 和RabbitMQ,实现MySQL的Binlog 近实时同步

Docker 安装MySQL

请参考:Docker 安装MySQL

Docker 安装RabbitMQ

请参考:Docker 安装RabbitMQ

RabbitMQ 增加交换机和队列
  • 添加交换机 canal_exchange

  • 添加队列 canal_queue

  • 队列绑定交换机

CentOS-7 安装Canal

centos-7 解压安装Canal与上 一章节一致,顾不在做详细描述***。本章节重点讲解Canal 配置RabbitMQ 参数配置。

Canal Server配置

需要配置的东西就两项,一个是监听数据库配置,另一个是 RabbitMQ 连接配置。

instance.properties

监听数据库配置

cd /example 目录下

canal.properties

配置 Canal 服务方式为 RabbitMQ 和连接配置

进入到conf文件,打开canal.properties

serverMode(服务模式)修改为rabbitMQ,默认TCP.

RabbitMQ 服务相关参数设置。

第6章 Canal API

快速开始

第一步:maven 添加相关jar包依赖

com.alibaba.ottercanal.client1.1.0

第二步:编写main方法测试

import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "update=" + column.getUpdated());}}}

第三步:启动Canal 服务

第四步: 运行Canal 客户端的main 方法,控制台输出如下信息:

empty count : 1empty count : 2empty count : 3empty count : 4

含义:数据库无变更记录。

第五步:模拟数据库变更操作

mysql> use test;Database changedmysql> CREATE TABLE `xdual` (-> `ID` int(11) NOT NULL AUTO_INCREMENT,-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,-> PRIMARY KEY (`ID`)-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;Query OK, 0 rows affected (0.06 sec)mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

再次查看Canal 客户端,控制台输出信息:

empty count : 1empty count : 2empty count : 3empty count : 4================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERTID : 4update=trueX : 2013-02-05 23:29:46update=true

Canal API 文档说明

Canal 类设计

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)

  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

javadoc查看:

  • CanalConnector :http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html

Canal server/client交互协议

具体的网络协议格式,可参见:CanalProtocol.proto

Canal get/ack/rollback协议

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍

  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize

  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作

  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式api设计的好处:
  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)

  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性

  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取

  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor

  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

数据对象格式简单介绍:EntryProtocol.proto

EntryHeaderlogfileName [binlog文件名]logfileOffset [binlog position]executeTime [binlog里记录变更发生的时间戳,精确到秒]schemaName tableNameeventType [insert/update/delete类型]entryType [事务头BEGIN/事务尾END/数据ROWDATA]storeValue[byte数据,可展开,对应的类型为RowChange]RowChangeisDdl [是否是ddl变更操作,比如create table/drop table]sql [具体的ddl sql]rowDatas[具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]beforeColumns [Column类型的数组,变更前的数据字段]afterColumns [Column类型的数组,变更后的数据字段]ColumnindexsqlType [jdbc type]name[column name]isKey [是否为主键]updated [是否发生过变更]isNull[值是否为null]value [具体的内容,注意为string文本]

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全

  • 可以提供ddl的变更语句

  • insert只有after columns, delete只有before columns,而update则会有before / after columns数据.

快速开始代码剖析

1. 创建Connector

a. 创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b. 创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");

c. 创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制

CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");

2. get/ack/rollback使用

3. RowData数据处理

第7章 Canal 适配器

基本说明

canal 1.1.1版本之后,提供了适配器功能,可将canal server的数据直接输出到目的地,不需要用户编写客户端。

温馨提示:特殊功能需求,还需要用户编写客户端实现

适配器整体结构

client-adapter分为适配器和启动器两部分,每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载。

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

- binrestart.shstartup.batstartup.shstop.sh- lib ...- plugin client-adapter.logger-1.1.1-jar-with-dependencies.jarclient-adapter.hbase-1.1.1-jar-with-dependencies.jar...- confapplication.yml- hbasemytest_person2.yml- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

源码结构解析

launcher:启动器

logger:日志适配器

rdb:支持jdbc的关系型数据库适配器(mysql、oracle、postgress、sqlserver等)

hbase:hbase适配器

kudu:kudu适配器

Canal适配器之启动

Canal 适配器启动之配置文件application.yml

server:port: 8081spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQ #客户端消费模式,对应下面的consumerPropertiesflatMessage: true#是否以json字符串传递数据,仅对mq生效zookeeperHosts:#canal server集群部署时,创建curator客户端 #tcp mode需要在consumerProperties tcp中设置syncBatchSize: 1000#每次同步的批数量retries: -1#重试次数,-1为无限次timeout:accessKey:secretKey:consumerProperties:# canal tcp consumer #canal adapter连接的canal servercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumer#canal adapter连接的kafkakafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumer#canal adapter连接的rocketmqrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumer #canal adapter连接的rabbitmqrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:#srcDataSources:#defaultDS:#url: jdbc:mysql://127.0.0.1:3306/mytest" />Canal适配器实例

业务需求:商品表新增商品数据实时同步ES搜索引擎。

解决方案:基于Canal监听MySQL-binlog 日志信息变化,通过Canal-Adapter 读取Canal 数据变更记录,同时写入ES搜索引擎。

1、软件版本

MySQL:8.xcanal:1.1.6adapter:1.1.6elasticsearch:7.4.2

2、MySQL开启binlog

[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# 是否开启binlogshow variables like 'log_bin';# 结果log_binON# binlog模式show variables like 'binlog_format';# 结果binlog_formatROW
# 创建用户canal及密码设置CREATE USER canal IDENTIFIED BY 'canal';# 赋权GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新生效FLUSH PRIVILEGES;

3、下载canal及adapter

# canal-serverhttps://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz# canal-adapterhttps://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz

4、编辑canal配置文件

vi conf/example/instance.properties

此处只展示修改的配置

# 伪装成从库的slaveId,不能与MySQL重复canal.instance.mysql.slaveId=1234# 数据库的ip:端口canal.instance.master.address=127.0.0.1:3306# 数据库用户名密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canal

5、启动canal

cd binsh startup.sh

如遇到如下报错

OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.The stack size specified is too small, Specify at least 384kError: Could not create the Java Virtual Machine.Error: A fatal exception has occurred. Program will exit.

解决办法:调整startup.sh脚本的-Xss参数

vi bin/startup.sh
# 我这里调整到-Xss512k
if [ -n "$str" ]; then
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss512k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "

6、编辑adapter配置文件

cd confvi application.yml

此处只展示修改的配置

canal.conf:consumerProperties:# 单机配置属性# canal.tcp.server.host: 127.0.0.1:11111# 此配置数据库信息与canal-server配置的数据库信息相同srcDataSources:defaultDs:url: jbdc:mysql://127.0.0.1:3306/canal_testusername: canalpassword: canal# 配置 ES信息canalAdapters:groups:outerAdapters: - name: logger- name: es7hosts: http://127.0.0.1:9200properties:mode: restsecurity.auth: es账号:es密码cluster.name: es的名字

7、编辑es7 索引配置文件

cd conf/es7/cp mytest_user.yml canal_test_order.ymlrm biz_order.yml customer.yml mytest_user.ymlvi canal_test_order.ymldataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:_index: canal_test_order_id: _id# 这个必须要加,源文件没有_type: _docupsert: true#pk: idsql: "selecta.id as _id,a.order_no as orderNo,a.order_name as orderNamefrom t_order a"#objFields:#_labels: array:;etlCondition: "where a.c_time>={}"commitBatch: 3000

8、启动Canal adapter

cd binsh startup.sh

查看Adapter 日志记录出现如下错误时:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSourceat com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_322]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_322]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_322]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_322]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]

解决办法,更改源码

下载canal-adapter源码

修改client-adapter/escore/pom.xml为

原com.alibabadruid改成com.alibabadruidprovided

重新打包编译生成client-adapter.es7x-1.1.5-jar-with-dependencies.jar

放入canal-adapter的plugin目录下,替换原jar

重新启动,Canal Adapter 日志如下

9、测试

在MySQL手动插入一条数据

adapter.log打印日志如下
2023-02-09 15:20:25.519 [pool-2-thread-1] INFOc.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"}2023-02-09 15:20:25.520 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":7,"order_no":1122,"order_name":"2211"}],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"} Affected indexes: canal_test_order 
查看elasticsearch数据
# get 127.0.0.1:9200/canal_test_order/_search{"took": 0,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": "eq"},"max_score": 1.0,"hits": [{"_index": "canal_test_order","_type": "_doc","_id": "1","_score": 1.0,"_source": {"orderNo": 111111,"orderName": "11111"}}]}}

第8章 Canal 管理平台搭建

背景

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

准备

canal-admin的限定依赖:

  1. MySQL,用于存储配置和节点等相关数据

  1. canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

部署

  1. 下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.6 版本为例

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.admin-1.1.6.tar.gz
  1. 解压缩

mkdir /tmp/canal-admintar zxvf canal.admin-$version.tar.gz-C /tmp/canal-admin

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 6 agapplestaff 204B8 31 15:37 bindrwxr-xr-x 8 agapplestaff 272B8 31 15:37 confdrwxr-xr-x90 agapplestaff 3.0K8 31 15:37 libdrwxr-xr-x 2 agapplestaff68B8 31 15:26 logs
  1. 配置修改

vi conf/application.yml
server:port: 8089spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin
  1. 初始化元数据库

mysql -h127.1 -uroot -p# 导入初始化SQL> source conf/canal_manager.sql

a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下。

  1. 启动

sh bin/startup.sh
  1. 日志查看

vi logs/admin.log2023-02-09 15:43:38.162 [main] INFOo.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)2023-02-09 15:43:38.180 [main] INFOorg.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]2023-02-09 15:43:38.191 [main] INFOorg.apache.catalina.core.StandardService - Starting service [Tomcat]2023-02-09 15:43:38.194 [main] INFOorg.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29....2023-02-09 15:43:39.789 [main] INFOo.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler2023-02-09 15:43:39.825 [main] INFOo.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

  1. 关闭

sh bin/stop.sh
  1. canal-server端配置

使用canal_local.properties的配置覆盖canal.properties

# register ipcanal.register.ip =# canal admin configcanal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# admin auto registercanal.admin.register.auto = truecanal.admin.register.cluster =

启动admin-server即可。

第9章 Canal 管理平台操作指南

请参考:Canal-Admin-指南