1、MysqlCDC配置指南
1.1说明
当前操作针对的是mysql5.8的版本,其他版本可能会有所差别。开启CDC需要更改mysql的配置文件,这就需要用户拥有服务器文件的操作权限。支持的版本:
参考地址:
https://gitee.com/heartape/flink-cdc-connectors/blob/master/docs/content/connectors/mysql-cdc.md
1.2开启CDC操作
Linux下开启Mysql-CDC的方法
第一步:查询MySQL数据库是否开启了Binlog, 若变量log_bin的值为“OFF”,则说明Binlog未开启。
show variables like ‘log_bin’;
第二步:编辑MySQL配置文件
vim /mysql config file path/my.cnf; — 根据实际配置文件的路径设定
第三步:在配置文件中增加如下配置,开启Binlog
server-id = 123 -- 对主从机器,每台机器必须唯一,建议设置大于100log_bin = mysql-binbinlog_format = rowbinlog_row_image = fullexpire_logs_days = 7 -- 日志过期时间,建议7天gtid_mode = onenforce_gtid_consistency = on
第四步:重启MySQL数据库
service mysqld restart
1.3权限配置
CREATE USER 'user'@'localhost' IDENTIFIED BY 'password'; GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';FLUSH PRIVILEGES;
2、SqlserverCDC配置指南
2.1说明
当前操作基于SQLServer2019的版本,要求SQLSERVER 代理服务必须保持运行状态,否则无法获取日志文件。开启SQLSERVER代理服务方式:SQL SERVER代理右键点击启动。SQL SERVER日志清理是由代理自动执行。
SQLServer支持版本
参考网址:
https://gitee.com/heartape/flink-cdc-connectors/blob/master/docs/content/connectors/sqlserver-cdc.md
2.2开启CDC操作
第一步:验证SQLServer库级别CDC是否启用
select is_cdc_enabled from sys.databases where name=''//name为您的数据库名 返回1表示已启
用库级别的CDC功能。
第二步:启用SQLServer库级别CDC功能
USEGOEXEC sys.sp_cdc_enable_db
如上执行完毕之后,会在数据库下的“系统表”中创建如下六个系统表
第三步:启用SQLServer表级别CDC功能(可选,后续在任务开启CDC时也会自动开启)
EXEC sys.sp_cdc_enable_table@source_schema =N'dbo',@source_name = N'your_table_name', //你需要开启cdc的表名称@capture_instance = NULL,@role_name = NULL,@supports_net_changes = 0go
第四步:验证SQLServer表级别是否启用(可选)
SELECT is_tracked_by_cdc FROM sys.tables WHERE name=”; //你的表名
查询语句的执行结果返回1表示该表已经开启表级别CDC
3、oracleCDC配置指南
3.1说明
当前操作针对的是oracle19c的版本,其他版本会有所差别。开启CDC需要在命令行里进行操作,用sysdba进行连接。参考网址:
https://gitee.com/heartape/flink-cdc-connectors/blob/master/docs/content/connectors/oracle-cdc.md
3.2开启CDC配置指南
Linux下开启Oracle-CDC的方法
第一步:在命令行工具中以sys用户连接到数据库
sys/manager as sysdba;
第二步:检查日志归档是否已开启
archive log list;
回显打印“Database log mode: No Archive Mode”,说明日志归档未开启
第三步:执行以下命令配置归档日志参数(顺序执行)
alter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest =‘log_file_path’ scope=spfile; //修改目录--可不设置shutdown immediate; //关闭数据库服务,时间较长请耐心等待startup mount; //启动实例alter database archivelog; //开启归档alter database open; //启动数据库
第四步:开启表的补充日志(可选,任务开启cdc时自动开启)
ALTER TABLE 模式.表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
第五步:查询表的补充日志开启状态(可选)
SELECT count(0) AS OPEN_STATUS FROM DBA_LOG_GROUPS WHERE OWNER='模式' AND TABLE_NAME='表名'
3.3权限配置
创建用户并赋权
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT CREATE TABLE TO flinkuser; -- need not to execute if set scan.incremental.snapshot.enabled=true(default) GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
- postgresqlCDC配置指南
4.1说明
Postgresql开启CDC需要修改数据库配置文件,所以需要有服务器的权限。支持的版本:
4.2开启cdc配置指南
第一步:修改PostgreSQL数据库配置postgresql.conf文件
wal_level = logical
生效需重启pg服务
第二步:为需要同步的数据库(db)创建逻辑复制插槽(replication slot)(10+版本)
SELECT pg_create_logical_replication_slot('you slot name', 'pgoutput');
注意:现在只支持pgoutput输出插件
第三步:为数据库中的所有表或指定表创建一个发布(可选,已在任务中定义)
DROP PUBLICATION IF EXISTS pub01;CREATE PUBLICATION pub01 FOR TABLE test01,test02; --你可以指定表
第四步:验证指定的表是否在发布中
SELECT * FROM pg_publication_tables WHERE tablename ='test';
第五步:给用户复制流权限
ALTER ROLE you-user-name replication;