Monday, June 18, 2012

CDC (Change Data Capture)

Change data capture is designed to capture insert, update, and delete activity applied to SQL Server tables, and to make the details of the changes available in an easily consumed relational format. The change tables used by change data capture contain columns that mirror the column structure of a tracked source table, along with the metadata needed to understand the changes that have occurred. 

As of SQL Server 2012, change data capture is available only on the Enterprise, Developer, and Evaluation editions of SQL Server.

The first step in setting up CDC for this package is to create a logging table, which the package uses to track the extraction interval.
--Example: the database name is "Work" and the table name is "Sales"

CREATE TABLE [dbo].[CDC_Logging](
[StartDateTime] [datetime] NULL,
[EndDateTime] [datetime] NULL,
[ExtractTime] [datetime] NULL,
[Status] [char](10) NULL,
[Type] [varchar](50) NULL
) ON [PRIMARY]

To enable CDC on a database

USE [Work]  --DB Name
GO
EXEC sys.sp_cdc_enable_db
GO
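
To confirm that the database is now enabled, the sys.databases catalog view can be checked. This is a minimal sketch that assumes the example database name "Work" used in this post:

```sql
-- Check whether CDC is enabled for the example database.
-- is_cdc_enabled = 1 means sys.sp_cdc_enable_db has succeeded.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = N'Work';
```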

To enable CDC on a table

EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'sales' ,     --source table name
@role_name = 'db_reader',
@supports_net_changes = 1
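
A quick way to check the result is sketched below. It assumes the example names from this post (database "Work", table dbo.sales). Note that @supports_net_changes = 1 requires the source table to have a primary key, or a unique index passed through @index_name.

```sql
USE [Work];
GO
-- List the tables in the current database that CDC is tracking.
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE t.is_tracked_by_cdc = 1;

-- Show the capture instance(s) configured for dbo.sales.
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'dbo',
    @source_name   = N'sales';

-- Show the capture and cleanup jobs (they run under SQL Server Agent).
EXEC sys.sp_cdc_help_jobs;
```

If the capture job is not running (for example because SQL Server Agent is stopped), the change table stays empty even though the table is enabled.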

To view the changes captured by the system

--dbo_sales is the capture instance name
SELECT * FROM cdc.dbo_sales_CT
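
For ad hoc inspection it can help to see when each change was committed. The sketch below reads the same change table and maps each row's LSN to a commit time; it assumes the default capture instance name dbo_sales. For regular extraction, the cdc.fn_cdc_get_* query functions are generally preferred over reading the change table directly.

```sql
-- Read the change table directly, oldest change first.
-- __$start_lsn is the commit LSN of the transaction that made the change;
-- mapping it to a time makes the rows easier to read.
SELECT
    sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS commit_time,
    __$operation,
    *
FROM cdc.dbo_sales_CT
ORDER BY __$start_lsn, __$seqval, __$operation;
```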

To disable CDC on a table

USE [Work]
GO
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name = N'sales',
@capture_instance = N'dbo_sales'
GO

To disable CDC on the database

USE [Work]
GO
EXEC sys.sp_cdc_disable_db
GO

Create the following variables in the SSIS package

Variable Name    Data Type   Value
BatchNumber      Int32       0
CDCStatus        String      (no value)
DataReady        Int32       (no value)
Delay            Int32       5
EndDateTime      String      1900-01-01
IsInitialLoad    Byte        0
LastModDate      String      (no value)
RowsAffected     Int32       0
SQLDataQuery     String      SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_sales(null,null,'all with merge')
StartDateTime    String      (no value)
Status           String      Completed
TableName        String      Sales
TempDateTime     String      (no value)

In the Control Flow tab

Add an Execute SQL Task, name it "SQL insert CDC_Logging", and enter the script below:

if NOT exists (select 1 from [dbo].[CDC_Logging] where type=?)
INSERT into CDC_Logging(
[StartDateTime]
,[EndDateTime]
,[ExtractTime]
,[Status]
,[Type])
values (
NULL,
NULL,
NULL,
NULL,
?
)
;

Then open the Parameter Mapping page and add the variables as below

Variable Name     Direction   DataType  ParameterName  ParameterSize

User::TableName   Input       varchar            0                   -1
User::TableName   Input       varchar            1                   -1

Add another Execute SQL Task, name it "SQL GetDateTime Imports_CDC_Logging", and enter the script below:

select
?=[status],
? = case when [status] is null or [status] = '' then 1 else 0 end,
? = case
when status is null then Convert(varchar(30),'1900-01-01 00:00:00.000',121)
when status ='Ready' then Convert(varchar(30),StartDateTime,121)
else Convert(varchar(30), EndDateTime,121)
end
from [dbo].[CDC_Logging] where type=?

Then open the Parameter Mapping page and map the variables as below

variable Name            Direction   DataType   ParameterName   ParameterSize

User::CDCStatus         Output       varchar             0                      -1
User::IsInitialLoad      Output         Byte               1                      -1
User::TempDateTime   Output       varchar             2                      -1
User::TableName         Input        varchar             3                      -1

Add another Execute SQL Task, name it "SQL Calculate Interval", and enter the script below:

declare @startDateTime varchar(30),
@endDateTime varchar(30),
@tempDateTime varchar(30),
@minDateTime varchar(30),
@startLSN binary(10),
@status char(10),
@isInitialLoad bit
--Initialize
set @tempDateTime=?
set @isInitialLoad =?
set @status=?

--Get the MinLSN and map it to datetime type

--If it is the first time to load
if(@isInitialLoad =1)
begin
select @startLSN =sys.fn_cdc_get_min_lsn('dbo_sales')
select @startDateTime= Convert(varchar(30),sys.fn_cdc_map_lsn_to_time(@startLSN),121)
end
else select @startDateTime =@tempDateTime

-- Get the last LSN captured so far and map it to the end datetime
select @endDateTime = Convert(varchar(30),sys.fn_cdc_map_lsn_to_time(sys.fn_cdc_get_max_lsn()),121)

select ? = Convert(varchar(30),@startDateTime,121)
select ? = Convert(varchar(30),@endDateTime,121)

Then open the Parameter Mapping page and add the variables as below

variable Name           Direction    DataType   ParameterName   ParameterSize

User::StartDateTime    Output      varchar          3                       -1
User::EndDateTime     Output       varchar          4                       -1
User::TempDateTime   Input        varchar          0                       -1
User::IsInitialLoad      Input         Byte             1                       -1
User::CDCStatus         Input        varchar           2                      -1
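
The task above turns LSNs into datetimes. Going the other way, from a datetime interval back to the LSN range that the cdc.fn_cdc_get_net_changes_* functions expect, can be sketched with sys.fn_cdc_map_time_to_lsn. The interval values below are made up for illustration; in the package they would come from User::StartDateTime and User::EndDateTime.

```sql
-- Example interval (assumed values, for illustration only).
DECLARE @startDateTime datetime = '2012-06-18 00:00:00',
        @endDateTime   datetime = '2012-06-18 12:00:00';

DECLARE @from_lsn binary(10), @to_lsn binary(10);

-- First LSN committed at or after the start time.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @startDateTime);
-- Last LSN committed at or before the end time.
SET @to_lsn   = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @endDateTime);

-- Query only when both bounds were found; the function raises an error
-- if a bound is NULL or outside the capture instance's validity interval.
IF @from_lsn IS NOT NULL AND @to_lsn IS NOT NULL AND @from_lsn <= @to_lsn
    SELECT *
    FROM cdc.fn_cdc_get_net_changes_dbo_sales(@from_lsn, @to_lsn, N'all with merge');
```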


Add a For Loop Container and name it 'Wait for capture process extraction interval'.
In the For Loop Properties section,
set InitExpression to '@DataReady = 0'
and EvalExpression to '@DataReady == 0'

Inside the container, add a new Execute SQL Task, name it 'SQL Check for Data', and enter the script below.

declare @startDateTime datetime,
@endDateTime DateTime,
@DataReady int

set @startDateTime =?
set @endDateTime = ?

select @DataReady = 1

if not exists (select tran_end_time from cdc.lsn_time_mapping where tran_end_time <= @startDateTime ) select @DataReady = 0

if not exists (select tran_end_time from cdc.lsn_time_mapping where tran_end_time >= @endDateTime ) select @DataReady = 0

select ?=@DataReady

Then open the Parameter Mapping page and map the variables as below

Variable Name         Direction     DataType       ParameterName    ParameterSize

User::StartDateTime   Input         varchar             0                    -1
User::EndDateTime     Input         varchar             1                    -1
User::DataReady       Output        Long                2                    -1

Also inside the container, add a Script Task named 'SRC Delay'. Set 'ReadOnlyVariables' to 'User::Delay'.

Then click Edit Script and enter the code below. It pauses the loop for the number of seconds held in User::Delay.

{
// TODO: Add your code here
System.Threading.Thread.Sleep((int)Dts.Variables["User::Delay"].Value * 1000);
Dts.TaskResult = (int)ScriptResults.Success;
}
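
The For Loop container and the delay script together amount to a polling loop. For testing outside SSIS, roughly the same wait can be sketched in T-SQL with WAITFOR DELAY. This sketch checks only the end-of-interval condition, and the @attempts cap is a hypothetical addition so that the loop cannot run forever.

```sql
DECLARE @endDateTime datetime = '2012-06-18 12:00:00';  -- example value (assumed)
DECLARE @attempts int = 0;

-- Poll until the capture job has processed log records up to @endDateTime,
-- sleeping 5 seconds between checks (the same value as User::Delay).
WHILE NOT EXISTS (SELECT 1
                  FROM cdc.lsn_time_mapping
                  WHERE tran_end_time >= @endDateTime)
      AND @attempts < 60   -- hypothetical cap: give up after about 5 minutes
BEGIN
    WAITFOR DELAY '00:00:05';
    SET @attempts += 1;
END;
```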


Add another Execute SQL Task, name it "SQL Truncate Sales", and enter the script below:

Truncate table [dbo].[StgSales]

Add another Script Task and name it "SRC Generate SQL Script". In the ReadOnlyVariables option, select "User::EndDateTime,User::StartDateTime", and in the ReadWriteVariables option, select "User::SQLDataQuery".


Then click Edit Script and enter the code below

{
// Read the interval bounds and build the net-changes query for the data flow.
Variable varStartDateTime = Dts.Variables["User::StartDateTime"];
Variable varEndDateTime = Dts.Variables["User::EndDateTime"];
Variable varSQLDataQuery = Dts.Variables["User::SQLDataQuery"];

string sStartDateTime = null;
string sEndDateTime = null;

Dts.VariableDispenser.LockForRead("User::StartDateTime");
Dts.VariableDispenser.LockForRead("User::EndDateTime");
Dts.VariableDispenser.LockForWrite("User::SQLDataQuery");

sStartDateTime = varStartDateTime.Value.ToString();
sEndDateTime = varEndDateTime.Value.ToString();
// The function name must match the wrapper function for the dbo_sales capture instance.
varSQLDataQuery.Value = " Select * FROM [dbo].fn_net_changes_dbo_Sales('" + sStartDateTime + "', '" + sEndDateTime + "','all with merge') ";
Dts.Variables["User::SQLDataQuery"].Value = varSQLDataQuery.Value;
Dts.Variables.Unlock();

Dts.TaskResult = (int)ScriptResults.Success;
}
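
The query built above calls a datetime-based wrapper function in the dbo schema. Such wrappers are not created when CDC is enabled; sys.sp_cdc_generate_wrapper_function returns the CREATE scripts for them, which then have to be executed. The following sketch creates the wrappers for every capture instance in the current database, assuming the default naming (fn_all_changes_<capture_instance> and fn_net_changes_<capture_instance>).

```sql
USE [Work];
GO
-- Collect the generated CREATE FUNCTION scripts.
DECLARE @wrapper_functions TABLE (
    function_name sysname,
    create_script nvarchar(max));

INSERT INTO @wrapper_functions
EXEC sys.sp_cdc_generate_wrapper_function;

-- Execute each script to create the wrapper functions.
DECLARE @create_script nvarchar(max);
DECLARE wrapper_cursor CURSOR LOCAL FAST_FORWARD FOR
    SELECT create_script FROM @wrapper_functions;

OPEN wrapper_cursor;
FETCH NEXT FROM wrapper_cursor INTO @create_script;
WHILE @@FETCH_STATUS = 0
BEGIN
    EXEC sp_executesql @create_script;
    FETCH NEXT FROM wrapper_cursor INTO @create_script;
END;

CLOSE wrapper_cursor;
DEALLOCATE wrapper_cursor;
```

With the dbo_sales capture instance from this post, this should produce dbo.fn_net_changes_dbo_sales, which takes a start time, an end time, and a row filter option such as 'all with merge'.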

From "SRC Generate SQL Script", draw two precedence constraints, one to each of two Data Flow Tasks (DFT):

"DFT Load Sales Initial Load" and "DFT Load CDC Sales"

For connector I, set the Evaluation operation to "Expression and Constraint",
the Value to "Success", and the Expression to "@IsInitialLoad==1".

Under Multiple constraints, select the "Logical AND" option.

Connect this line to "DFT Load Sales Initial Load" (DFT task).

Inside this data flow, configure the components as below:

--In the OLE DB Source, select the database and the source table in the Connection Manager, and select the columns.

--In the Derived Column, add a derived column named Operation with the expression (DT_STR,1,1252)("M")

--In the Row Count, open Custom Properties and set VariableName to "User::RowsAffected".

--In the OLE DB Destination, select the destination table and map the columns.

For connector II, set the Evaluation operation to "Expression and Constraint",
the Value to "Success", and the Expression to "@IsInitialLoad==0".

Under Multiple constraints, select the "Logical AND" option.

Connect this line to "DFT Load CDC Sales" (DFT task).

Inside this data flow, configure the components as below:

--In the OLE DB Source, select the database, set the data access mode to "SQL command from variable", and select "User::SQLDataQuery" as the variable name.

In the Variable Value field, set the script as below:

Select * FROM [dbo].fn_net_changes_dbo_Sales(null,null,'all with merge')  

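--In the Derived Column, add a derived column named Operation with the expression (DT_STR,1,1252)__$operation (if the source query calls the generated wrapper function, the operation is exposed as __CDC_OPERATION instead)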

--In the Row Count, open Custom Properties and set VariableName to "User::RowsAffected".

--In the OLE DB Destination, select the destination table and map the columns.


Now connect both DFT tasks to another Execute SQL Task named 'SQL Maintain Sales_CDC_Logging'.

From the DFT task on the IsInitialLoad=1 branch, set the Evaluation operation to "Constraint" and the Value to "Success".

Under Multiple constraints, select the "Logical OR" option.

Connect this line to "SQL Maintain Sales_CDC_Logging".

From the DFT task on the IsInitialLoad=0 branch, set the Evaluation operation to "Constraint" and the Value to "Success".

Under Multiple constraints, select the "Logical OR" option.

Connect this line to "SQL Maintain Sales_CDC_Logging".

Now open the 'SQL Maintain Sales_CDC_Logging' task and enter the script given below.

merge dbo.cdc_logging as myTarget
using (
select
? as StartDateTime,
? as EndDateTime,
GETDATE() as ExtractTime,
'Ready' as Status,
? as Type
) AS mySource
ON (mySource.Type = myTarget.[Type] )
WHEN MATCHED THEN UPDATE
SET
[EndDateTime] = mySource.EndDateTime
,[StartDateTime]=mySource.StartDateTime
,[ExtractTime] = mySource.ExtractTime
,Status = mySource.[Status]
WHEN NOT MATCHED THEN
INSERT
([StartDateTime]
,[EndDateTime]
,[ExtractTime]
,[Status]
,[Type])
values (
mySource.StartDateTime,
mySource.EndDateTime,
mySource.ExtractTime,
mySource.Status,
mySource.Type
)
;
SELECT ?= @@ROWCOUNT;

Then open the Parameter Mapping page and map the variables as below

Variable Name         Direction   DataType   ParameterName   ParameterSize

User::StartDateTime   Input       varchar    0               -1
User::EndDateTime     Input       varchar    1               -1
User::TableName       Input       varchar    2               -1
User::RowsAffected    Output      Long       3               -1


CDC select query to run manually

DECLARE @begin_lsn BINARY(10), @end_lsn BINARY(10)
SELECT @begin_lsn = sys.fn_cdc_get_min_lsn('dbo_sales')
SELECT @end_lsn = sys.fn_cdc_get_max_lsn()

SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_sales(@begin_lsn, @end_lsn, 'all with merge');

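The __$operation column identifies the data manipulation language (DML) operation associated with the change. In a change table it can be one of the following:
1 = delete
2 = insert
3 = update (old values): column data has the row values before the update statement was executed.
4 = update (new values): column data has the row values after the update statement was executed.

With the 'all with merge' row filter used in the query above, the net changes function returns only 1 (delete) or 5 (insert or update).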
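
To make the result easier to read, the operation code can be decoded in the query itself. This is a small sketch on top of the net changes query, assuming the dbo_sales capture instance; the operation_name column alias is chosen here for illustration.

```sql
DECLARE @begin_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_sales'),
        @end_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

SELECT
    -- Translate the numeric operation code into a label.
    CASE __$operation
        WHEN 1 THEN 'delete'
        WHEN 2 THEN 'insert'
        WHEN 3 THEN 'update (old values)'
        WHEN 4 THEN 'update (new values)'
        WHEN 5 THEN 'insert or update (merge)'
    END AS operation_name,
    *
FROM cdc.fn_cdc_get_net_changes_dbo_sales(@begin_lsn, @end_lsn, N'all with merge');
```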
