Flink CDC-SQL Server CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表
Flink CDC-SQL Server CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表
·
文章目录
SQL Server CDC配置
第一步:启用指定数据库的CDC功能
- 查看SQL Server是否已启用CDC功能
-- 返回1表示已启用CDC功能 select is_cdc_enabled from db_name.sys.databases where name = 'db_name';
- 若未启用CDC功能,依次执行如下语句开启数据库的CDC
ALTER AUTHORIZATION ON DATABASE::db_name TO sa; USE db_name; EXEC sys.sp_cdc_enable_db;
第二步:创建数据库角色
- 查看数据库角色
select name from db_name.sys.database_principals where type_desc = 'DATABASE_ROLE' and name not like '##%'
- 新建数据库角色
USE db_name; create role cdc_role; grant select on schema::schema_name to cdc_role;
- 检查用户是否具有db_owner角色
SELECT r.name AS RoleName, m.name AS MemberName FROM sys.database_role_members drm JOIN sys.database_principals r ON drm.role_principal_id = r.principal_id JOIN sys.database_principals m ON drm.member_principal_id = m.principal_id --WHERE m.name = 'sa' AND r.name = 'db_owner';
第三步:创建文件组&文件
alter database db_name add filegroup cdc_filegroup;
alter database db_name add file (
name = 'cdc_data',
filename = 'D:\RDSDBDATA\DATA\cdc_data.mdf', -- 需查看SQLServer实际数据路径
size = 16MB
)
to filegroup cdc_filegroup;
-- select data_space_id,name,type_desc,t1.is_read_only,t1.type,t1.log_filegroup_id from db_name.sys.filegroups t1;
-- select file_id,name,physical_name,type_desc,data_space_id from db_name.sys.database_files;
第四步:启用指定表的CDC功能
-
对要采集的表启用CDC
- 检查表是否开启CDC
select is_tracked_by_cdc from db_name.sys.tables where name = 'table_name';
- 表开启CDC
USE db_name; EXEC sys.sp_cdc_enable_table @source_schema = N'schema_name', @source_name = N'table_name', @role_name = N'cdc_role_name', @filegroup_name = N'cdc_filegroup', @supports_net_changes = 0
- 检查表是否开启CDC
-
开启表CDC可能出现如下报错:
SQL 错误 [22832] [S0001]: 无法更新元数据来指示已对表 [schema_name].[table_name] 启用了变更数据捕获。 执行命令 '[sys].[sp_cdc_add_job] @job_type = N'capture'' 时失败。 返回的错误为 22836: '无法更新数据库 db_name 的元数据来指示已添加某变更数据捕获作业。 执行命令 'sp_add_jobstep_internal' 时失败。返回的错误为 14234: '指定的 '@server' 无效(有效值由 sp_helpserver 返回)。 '。请使用此操作和错误来确定失败的原因并重新提交请求。'。请使用此操作和错误来确定失败的原因并重新提交请求。
原因:
-- 检查当前服务器名称。 select @@servername; -- 检查实例名称 SELECT SERVERPROPERTY('ServerName'); -- SQLServer安装程序在安装时将服务器名设置为计算机名。 -- SERVERPROPERTY函数的ServerName属性和@@SERVERNAME返回相似的信息。 -- 1. ServerName属性提供Windows服务器和实例名称,两者共同构成唯一的服务器实例。(如果服务器的网络名称更改,此值会返回新的名称) -- 2. @@SERVERNAME 提供当前配置的本地服务器名称。(代表在安装或设置SQLServer实例时指定的服务器和实例名称。如果后续对服务器的网络名称进行了更改,此值不会自动更新) -- 如果安装时未更改默认服务器名称,则ServerName属性和@@SERVERNAME返回相同的信息。 可以通过执行以下过程配置本地服务器的名称: EXEC sp_dropserver 'current_server_name'; EXEC sp_addserver 'new_server_name', 'local';
查看SQLServer CDC自动清理周期
EXEC sys.sp_cdc_help_jobs;
-
验证用户是否有权限访问CDC表
USE db_name; EXEC sys.sp_cdc_help_change_data_capture;
SQLServer CDC DataStream API实现
所使用软件的版本
- java 1.8
- Scala 2.11
- Flink 1.14.2
- Flink CDC 2.3.0
- Source SQLServer2016
- Sink MySQL 5.7
- jackson 2.10.2
SQLServer CDC DataStream API可实现一个job监控采集一个数据库的多个表.
1. 定义SqlServerSource
//源数据库连接配置文件
Properties dbProps = DbConfigUtil.loadConfig("sqlserver.properties");
//Debezium配置
Properties debeziumProps = new Properties();
//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
//以double值来表示它们,这可能会到值精度丢失
//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode", "string");
//Time、date和timestamps可以以不同的精度表示,包括:
//adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。
//adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。
//connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
debeziumProps.setProperty("time.precision.mode", "connect");
//SQLServer CDC数据源
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
.hostname(dbProps.getProperty("host"))
.port(Integer.parseInt(dbProps.getProperty("port")))
.database(dbProps.getProperty("database"))
.tableList(dbProps.getProperty("table_list").split(","))
.username(dbProps.getProperty("username"))
.password(dbProps.getProperty("password"))
.debeziumProperties(debeziumProps)
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();
2. 数据处理
参考: MySQL CDC配置及DataStream API实现代码
3. Sink到MySQL
参考: MySQL CDC配置及DataStream API实现代码
参考
- https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html
- https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/sqlserver-cdc.html
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
- https://learn.microsoft.com/zh-cn/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-2016
更多推荐
已为社区贡献1条内容
所有评论(0)