前言
最近遇到一个需求,需要监听数据库中的数据变化,并及时通知后端服务做出相应的处理。本文将介绍如何使用四种方式实现监听MySQL数据库中的数据变化并通知后端服务的功能,包括:
- 轮询方式
- 使用触发器方式
MySQL
自带的Binlog
方式- 使用开源的Canal工具
一、轮询方式
轮询方式是指定时查询MySQL数据库中的某个表,然后与上一次查询结果进行比较,从而得知是否有数据发生变化。
它通过定期查询MySQL数据库的方式来检测数据变化。我们可以在后端服务中使用定时器,在一定时间间隔内轮询MySQL数据库,从而检测数据变化并做出相应的动作。
具体实现步骤如下:
- 在后端服务中使用定时器,定时轮询MySQL数据库。
- 定时器触发时,执行SQL语句查询MySQL数据库中的数据,比较查询结果与之前保存的结果,判断是否有变化。
- 如果数据发生变化,则通知后端服务做出相应的动作。
轮询方法的优点是实现简单,不需要额外的插件或配置,适用于数据变化频率较低的场景。但是存在以下几个问题:
- 浪费资源:需要不断地查询数据库,即使数据库中没有数据变化,也会一直消耗资源。
- 延迟高:轮询的时间间隔一般比较大,因此可能会存在较大的延迟。
对于高并发场景下实时性要求较高的情况不适用,轮询方法存在一定的性能问题,轮询间隔过短会增加数据库负担,轮询间隔过长又可能会错过数据变化。
二、使用触发器方式
这种方法是在MySQL数据库中使用触发器,触发器是一种数据库对象,在数据插入或更新时触发相应的事件,并将事件信息传递给后端服务。这种方法可以保证实时性,但是需要在MySQL数据库中添加触发器,增加了系统复杂度。此外,触发器的数量和复杂度也会对数据库性能造成影响。
这种方法的优点是可以减少对数据库的轮询次数,提高了性能,同时也可以更及时地获得数据变化的通知。但是,它的缺点是比较难以维护,容易导致性能问题,因为触发器的开销很大。
具体实现步骤如下:
- 在MySQL数据库中创建触发器,定义触发器的类型和事件,以及后续操作。
CREATE TRIGGER my_trigger AFTER INSERT ON my_tableFOR EACH ROWBEGIN-- 在此处编写通知后端服务的代码END;
- 当触发器中定义的事件发生时,MySQL数据库会自动触发后续操作,向后端服务发送通知。
基于触发器的方法的优点是能够实时检测数据变化,并且不需要在后端服务中进行轮询操作。但是,该方法需要在MySQL数据库中创建触发器,需要对MySQL数据库有一定的了解和操作权限,并且在高并发的场景下可能存在性能问题。
三、MySQL自带的Binlog方式
使用MySQL的binlog日志:MySQL的binlog日志记录了数据库的所有修改操作,可以通过读取binlog日志来获取数据库中的数据变化,并发送通知到后端服务。
这种方法是通过解析MySQL Binlog日志文件中的数据变更事件,识别和提取感兴趣的事件,并通知后端服务。这种方法可以保证实时性,且不需要在MySQL数据库中添加额外的触发器或表,但需要对MySQL Binlog日志文件进行解析,实现起来较为复杂。
该方式具有以下优点:
- 实时性高:可以实时获取数据变化。
- 精度高:可以记录到具体的数据变化操作。
- 避免对数据库的轮询操作
具体实现步骤如下:
1. 开启binlog:在MySQL配置文件中,将`log_bin`参数设置为ON。2. 使用MySQL的`mysqlbinlog`命令行工具将binlog中的内容读取出来,并进行解析。3. 解析binlog中的内容,判断数据变更的类型,如果是插入、更新或删除,则触发对应的回调函数,通知后端服务进行相应的处理。
需要注意的是,使用binlog监听MySQL数据变更需要注意以下问题:
- 对MySQL的性能有一定影响,因为binlog记录了所有的数据变更操作。
- binlog中的内容是以二进制形式存储的,需要进行解析。解析过程需要一定的技术功底。
- binlog只记录了数据变更操作,没有记录查询操作,如果需要监听查询操作,需要使用其他方式。
- binlog只记录了当前MySQL实例中的数据变更操作,如果需要监听多个MySQL实例,需要分别监听每个实例的binlog。
- 不稳定:MySQL的Binlog并不稳定,可能会出现丢失或损坏的情况。
综上所述,使用binlog实现MySQL数据变更的监听需要一定的技术功底和额外的开销,但是可以实现较为精确的数据变更监听,并且支持对数据变更进行回滚等操作。在一些对数据一致性要求较高的场景中,可以考虑使用这种方式。
四、使用开源的Canal工具
Canal是阿里巴巴开源的一款基于MySQL数据库的增量数据订阅和消费组件,可以将MySQL数据库的数据变更事件以消息的形式通知到后端服务,支持多种协议和多种编程语言。且相比从MySQL Binlog中解析法来说实现起来更为简便。
Canal通过读取MySQL的binlog日志来获取数据库中数据的变化情况,提供了高吞吐、低延迟、低侵入性的数据库增量订阅&消费解决方案。
Canal的工作原理如下图所示:
Canal架构图中,MySQL的binlog日志是Canal的数据源,Canal Server是数据消费者,将解析后的数据发送给下游的消费者进行处理。Canal Client是在业务系统中部署的,用于连接Canal Server,订阅指定的数据变更事件,接收Canal Server发送的数据变更事件,并将数据变更事件转换为Java对象。
Canal的优点:
- Canal能够对MySQL的数据进行精确的增量订阅,对MySQL的性能影响较小。
- Canal能够将MySQL的数据变更事件以消息的形式进行订阅,适用于分布式架构。
- Canal提供了多种消息传递协议和序列化协议,具有较高的可扩展性。
- Canal具有较高的稳定性和可靠性,支持HA和负载均衡。
- Canal是开源的,具有较高的社区活跃度和技术支持。
Canal的缺点:
- Canal需要在业务系统中部署Canal Client,增加了部署和维护的难度。
- Canal对MySQL的binlog日志的解析需要占用一定的资源,对MySQL的性能有一定的影响。
- Canal的部署和维护需要一定的技术水平,对于小型项目来说可能过于复杂。
Canal 的工作原理是模拟 MySQL 自身的复制机制。当有数据发生变化时,MySQL 会将该变化以二进制格式写入 binlog 日志中,Canal 会监听这个 binlog 日志,将其中的增量数据解析成对象,并发送给订阅者。
Canal 支持多种方式接入后端服务,包括 Kafka、RocketMQ、ActiveMQ 等,也可以通过 HTTP 接口直接获取数据。使用 Canal 监听 MySQL 数据库的数据变化,需要进行如下步骤:
1、下载和安装 Canal Server。
首先需要从 Canal 的 GitHub 上下载 Canal 的 Server 和 Client,地址为 https://github.com/alibaba/canal/releases。
选择最新版本,然后下载 Server 和 Client 压缩包,解压缩。
2、配置 Canal Server,包括 MySQL 数据库连接信息、Canal 监听的 binlog 位置等信息。
在解压缩后的 Server 目录中,找到 conf 目录,编辑 instance.properties 文件,进行以下配置:
################################################### mysql serverId 实例名称canal.instance.mysql.slaveId = 1234# position info # MySQL 地址和端口号canal.instance.master.address = 127.0.0.1:3306canal.instance.master.journal.name =canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password# MySQL 用户名和密码canal.instance.dbUsername = canalcanal.instance.dbPassword = canalcanal.instance.defaultDatabaseName = testcanal.instance.connectionCharset = UTF-8# table regex# 指定需要监听的数据库和表canal.instance.filter.regex = .*\\..*# table black regexcanal.instance.filter.black.regex =#################################################
address设置为mysql的连接地址,defaultDatabaseName设置为自己要监听的库名,如test。
在mysql命令行,创建一个新用户,作为slave
CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;
对应配置文件里的canal用户。到此配置完毕。
执行bin目录下的startup.sh
启动后可以在logs目录下查看日志。在example目录下的example.log,如果没有报错,说明启动成功。
3、编写 Canal 客户端,订阅 Canal Server 发送的增量数据,并处理数据。
服务端启动完毕后,在客户端即可监听test库的变化。
新建一个java maven项目,pom.xml里添加依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.0.12</version></dependency>
import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;import java.util.List;/** * A Camel Application */public class MainApp {/** * A main() so we can easily run these routing rules in our IDE */public static void main(String... args) throws Exception {// 创建链接// 设置canal server的ip和端口,端口默认为11111。// example是和conf目录下的相对应CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) { // 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("--> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "update=" + column.getUpdated());}}}
总结
以上四种方法都可以实现对 MySQL 数据库中的数据变化进行监听和通知,不同的方法适用于不同的场景,可以根据实际情况选择最合适的方法。
轮询方式和使用触发器方式相对简单,适合对数据变化的及时性和精确性要求不高的场景;Binlog
方式和Canal工具则更加强大和灵活,适用于对数据变化的及时性和精确性要求比较高的场景。
在实际应用中,需要根据业务需求和技术实现难度进行选择。同时,为了确保系统的稳定性和数据安全,需要对监听和通知的过程进行充分的测试和安全评估。