博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CDC 实现数据同步,增量更新
阅读量:5994 次
发布时间:2019-06-20

本文共 3788 字,大约阅读时间需要 12 分钟。

在Sqlserver2008上利用CDC实现了数据更新的跟踪,比以往的利用时间戳,触发器实现更加方便快捷.

参考资料:

实现步骤如下:

1.配置cdc

-- 开启cdc

USE db1
GO
EXEC sys.sp_cdc_enable_db
--验证
--0 :未开启cdc 1:开启cdc
SELECT is_cdc_enabled FROM sys.databases WHERE database_id=DB_ID()
--表开启cdc
USE db1;
GO
EXEC sys.sp_cdc_enable_table
@source_schema ='dbo'
,@source_name='t_cdc_ta'
,@role_name=null
,@capture_instance=NULL
,@supports_net_changes=1
,@index_name=null
,@captured_column_list=null
,@filegroup_name=default
,@allow_partition_switch=1
/*
开启之后会生成cdc构架,并生成查询函数和变更数据表
cdc.captured_columns
cdc.change_tables
cdc.ddl_history
cdc.index_columns
cdc.lsn_time_mapping
dbo.systranschemas
cdc.dbo_t_cdc_ta_CT 以构架名和表名组合的变更数据表
*/
--表结构
CREATE TABLE [t_cdc_ta]
(
[id] [int] IDENTITY(1,1) PRIMARY KEY NOT NULL,
[name] [varchar](20) NULL,
[addr] [varchar](20) NULL,
[ttime] [datetime] NULL
)

2.跟踪变更数据

当往源表t_cdc_ta中新增,插入,删除数据时,可以在变更数据表[cdc].[dbo_t_cdc_ta_CT]中看到如下数据

__$operation:

1-删除 ,2-新增,4-更新

3.根据变更数据,利用ETL可以实现数据的增量更新

脚本如下:

USE [db1]

GO
CREATE TABLE [dbo].[cdc_capture_log](
[cdc_capture_log_id] [int] IDENTITY(1,1) NOT NULL,
[capture_instance] [nvarchar](50) NOT NULL,
[start_time] [datetime] NOT NULL,
[min_lsn] [binary](10) NOT NULL,
[max_lsn] [binary](10) NOT NULL,
[end_time] [datetime] NULL,
[status_code] [int] NOT NULL DEFAULT(0)
)
CREATE PROCEDURE [dbo].[usp_init_cdc_capture_log]
@capture_instance NVARCHAR(50)
AS
BEGIN
SET nocount ON ;
DECLARE @start_lsn BINARY(10),
@end_lsn BINARY(10),
@prev_max_lsn BINARY(10)
--get the max LSN for the capture instance from --the last extract
SELECT @prev_max_lsn = MAX(max_lsn)
FROM dbo.cdc_capture_log
WHERE capture_instance = @capture_instance
-- if no row found in cdc_capture_log get the min lsn -- for the capture instance
IF @prev_max_lsn IS NULL
SET @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
ELSE
SET @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
-- get the max lsn
SET @end_lsn = sys.fn_cdc_get_max_lsn()
IF @start_lsn>=@end_lsn
SET @start_lsn=@end_lsn
INSERT INTO dbo.cdc_capture_log
(
capture_instance,
start_time,
min_lsn,
max_lsn
)
VALUES (
@capture_instance,
GETDATE(),
@start_lsn,
@end_lsn
)
SELECT CAST(SCOPE_IDENTITY() AS INT) cdc_capture_log_id
END
GO
create procedure [dbo].[usp_extract_userm_capture_log]
@cdc_capture_log_id INT
AS
BEGIN
set nocount on;
DECLARE @start_lsn binary(10),@end_lsn binary(10)-- get the lsn range to process
SELECT @start_lsn = min_lsn,@end_lsn = max_lsn from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id
-- extract and return the changes
select m.tran_end_time modified_ts,
x.*
from cdc.fn_cdc_get_all_changes_dbo_t_cdc_ta(@start_lsn, @end_lsn, 'all') x
join cdc.lsn_time_mapping m on m.start_lsn = x.__$start_lsn ;
end
GO
CREATE PROCEDURE [dbo].[usp_end_cdc_capture_log]
@cdc_capture_log_id INT
AS
BEGIN
SET nocount ON ;
UPDATE dbo.cdc_capture_log
SET end_time = GETDATE(),
status_code = 1
WHERE cdc_capture_log_id = @cdc_capture_log_id
END
GO
--在另一个库上建一个相同的结构的表作为同步数据测试用表
USE montior
GO
CREATE TABLE [dbo].[t_cdc_ta](
[id] [int] PRIMARY KEY NOT NULL,
[name] [varchar](20) NULL,
[addr] [varchar](20) NULL,
[ttime] [datetime] NULL
)
GO
CREATE PROC [dbo].[p_merge]
@oper INT,
@id INT,
@name VARCHAR(20),
@addr VARCHAR(20),
@ttime DATETIME
AS
-- 删除
IF @oper=1
BEGIN
DELETE FROM dbo.t_cdc_ta
WHERE id=@id
END
ELSE IF @oper=2 -- 新增
BEGIN
INSERT INTO dbo.t_cdc_ta(id,NAME,addr,ttime)
VALUES(@id,@name,@addr,@ttime)
END
ELSE IF @oper=4 -- 更新
BEGIN
UPDATE dbo.t_cdc_ta
SET NAME=@name,addr=@addr,ttime=@ttime
WHERE id=@id
END
GO

停用cdc

EXEC sp_cdc_disable_table

EXEC sp_cdc_disable_db

这样能实现一个定时的同步更新,利用作业来不断的读取新增加的lsn来更新目的数据表,当然同步的时间一定要大于数据变更的清理作业的时间,

默认配置cdc的时候会配置两个job

cdc.db1_capture :捕获变更的作业

cdc.db1_cleanup : 数据清理作业 ,每天凌晨两天清理

之前看到一个哥们在同步数据的时候用的 SSIS的条件拆分组件,我测试了下这个数据变更是有先后顺序的,不能直接拆分数据集直接执行,

这里我时显得方式是利用循环组件一条一条数据处理,希望能有更好的办法。。。

转载地址:http://eiqlx.baihongyu.com/

你可能感兴趣的文章
Kafka如何实现每秒上百万的高并发写入
查看>>
css ::selection 的妙用
查看>>
JAVA高并发秒杀API项目的学习笔记
查看>>
web测试之功能测试总结
查看>>
开发工程师人生之路
查看>>
用python实现把数字人民币金额转换成大写的脚本程序
查看>>
Python实现网页截图(PyQT5)
查看>>
【Android】15.3 Notification基础知识
查看>>
ES6学习笔记(五)—— 编程风格
查看>>
Chapter 8. 面向对象(类、对象、字段、方法、属性、构造函数)
查看>>
[BZOJ 3680]吊打XXX(模拟退火)
查看>>
多线程基础知识总结
查看>>
Android 项目隐藏标题栏
查看>>
(九)Maven坐标详解
查看>>
正在与拖延症病魔抗争中
查看>>
[转载]100盏灯泡的开关问题
查看>>
Linux 下执行Mysql的 sql文件
查看>>
node.js中使用 http-proxy 创建代理服务器
查看>>
SDUT 最终排名
查看>>
线性动态规划
查看>>