

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 捕获数据更改，以便从 SQL Server 进行持续复制
<a name="CHAP_Source.SQLServer.CDC"></a>

本主题介绍如何在 SQL Server 源上设置 CDC 复制。

**Topics**
+ [捕获本地或 Amazon EC2 上的自管理 SQL Server 的数据更改](#CHAP_Source.SQLServer.CDC.Selfmanaged)
+ [在云 SQL Server 数据库实例上设置持续复制](#CHAP_Source.SQLServer.Configuration)

## 捕获本地或 Amazon EC2 上的自管理 SQL Server 的数据更改
<a name="CHAP_Source.SQLServer.CDC.Selfmanaged"></a>

要从源 Microsoft SQL Server 数据库中捕获更改，请确保该数据库已配置为进行完整备份。将数据库配置为完全恢复模式或批量日志记录模式。

对于自行管理的 SQL Server 源，请 AWS DMS 使用以下内容：

**MS-Replication**  
用于捕获带主键的表的更改。您可以通过向源 SQL Server 实例上的 AWS DMS 端点用户授予系统管理员权限来自动进行配置。或者，您可以按照本节中的步骤准备源代码并使用对端点不具有 sysadmin 权限的 AWS DMS 用户。

**MS-CDC**  
用于捕获不带主键的表的更改。MS-CDC 必须在数据库级别上启用，并且为各个表分别启用。

设置 SQL Server 数据库进行持续复制（CDC）时，可以执行下列操作之一：
+ 使用 sysadmin 角色设置持续复制。
+ 将持续复制设置为不使用 sysadmin 角色。

**注意**  
您可以使用以下脚本查找所有没有主键或唯一键的表：  

```
USE [DBname]
SELECT SCHEMA_NAME(schema_id) AS schema_name, name AS table_name
FROM sys.tables
WHERE OBJECTPROPERTY(object_id, 'TableHasPrimaryKey') = 0
        AND  OBJECTPROPERTY(object_id, 'TableHasUniqueCnst') = 0
ORDER BY schema_name, table_name;
```

### 在自管理 SQL Server 上设置持续复制
<a name="CHAP_Source.SQLServer.CDC.MSCDC"></a>

本部分包含有关使用或不使用 sysadmin 角色在自管理 SQL Server 上设置持续复制的信息。

**Topics**
+ [在自管理 SQL Server 上设置持续复制：使用 sysadmin 角色](#CHAP_Source.SQLServer.CDC.MSCDC.Sysadmin)
+ [在独立 SQL Server 上设置持续复制：不使用 sysadmin 角色](#CHAP_SupportScripts.SQLServer.standalone)
+ [在可用性组环境中的 SQL Server 上设置持续复制：无 sysadmin 角色](#CHAP_SupportScripts.SQLServer.ag)

#### 在自管理 SQL Server 上设置持续复制：使用 sysadmin 角色
<a name="CHAP_Source.SQLServer.CDC.MSCDC.Sysadmin"></a>

AWS DMS SQL Server 的持续复制对带主键的表使用本机 SQL Server 复制，对没有主键的表使用更改数据捕获 (CDC)。

在设置持续复制之前，请参阅[在 SQL Server 源中使用持续复制（CDC）的先决条件](CHAP_Source.SQLServer.md#CHAP_Source.SQLServer.Prerequisites)。

对于带有主键的表，通常 AWS DMS 可以在源上配置所需的工件。但对于自管理 SQL Server 源数据库实例，必须先手动配置 SQL Server 分发。执行此操作后，具有 sysadmin 权限的 AWS DMS 源用户可以自动为具有主键的表创建发布。

要检查是否已配置分发，请运行以下命令。

```
sp_get_distributor
```

如果列分发的结果是 `NULL`，则未配置分发。请使用以下过程设置分发。<a name="CHAP_Source.SQLServer.CDC.MSCDC.Setup"></a>

**设置分发**

1. 使用 SQL Server Management Studio（SSMS）工具连接到 SQL Server 源数据库。

1. 打开**复制**文件夹的上下文（右键单击）菜单，然后选择**配置分发**。此时将显示“配置分发向导”。

1. 按照向导输入默认值并创建分发。<a name="CHAP_Source.SQLServer.CDC.MSCDC.Setup.CDC"></a>

**设置 CDC**

AWS DMS 如果您不使用只读副本，3.4.7 及更高版本可以自动为您的数据库和所有表设置 MS CDC。要使用此功能，请将 `SetUpMsCdcForTables` ECA 设置为 true。有关的信息 ECAs，请参见[端点设置](CHAP_Source.SQLServer.md#CHAP_Source.SQLServer.ConnectionAttrib)。

对于 3.4.7 AWS DMS 之前的版本或作为源的只读副本，请执行以下步骤：

1. 对于无主键的表，请为数据库设置 MS-CDC。要执行此操作，请使用分配了 sysadmin 角色的账户，然后运行以下命令。

   ```
   use [DBname]
   EXEC sys.sp_cdc_enable_db
   ```

1. 接下来，为每个源表设置 MS-CDC。对于具有唯一键但没有主键的每个表，运行以下查询来设置 MS-CDC。

   ```
   exec sys.sp_cdc_enable_table
   @source_schema = N'schema_name',
   @source_name = N'table_name',
   @index_name = N'unique_index_name',
   @role_name = NULL,
   @supports_net_changes = 1
   GO
   ```

1. 对于没有主键或唯一键的每个表，运行以下查询来设置 MS-CDC。

   ```
   exec sys.sp_cdc_enable_table
   @source_schema = N'schema_name',
   @source_name = N'table_name',
   @role_name = NULL
   GO
   ```

有关为特定表设置 MS-CDC 的更多信息，请参阅 [ SQL Server 文档](https://msdn.microsoft.com/en-us/library/cc627369.aspx)。

#### 在独立 SQL Server 上设置持续复制：不使用 sysadmin 角色
<a name="CHAP_SupportScripts.SQLServer.standalone"></a>

本节介绍如何针对 SQL Server 数据库源设置不需要用户账户具有 sysadmin 权限的持续复制。

**注意**  
运行本节中的步骤后，非 sysadmin DMS 用户将有权执行以下操作：  
从联机事务日志文件中读取更改
通过访问磁盘从事务日志备份文件中读取更改
添加或更改 DMS 使用的发布内容
向发布内容中添加文章

1. 按照[捕获数据更改，以便从 SQL Server 进行持续复制](#CHAP_Source.SQLServer.CDC)中所述设置用于复制的 Microsoft SQL Server。

1. 在源数据库上启用 MS-REPLICATION。这可以手动完成，也可以通过 sysadmin 用户身份运行一次任务来完成。

1. 使用以下脚本，在源数据库上创建 `awsdms` 架构：

   ```
   use master
   go
   create schema awsdms
   go
   
   
   -- Create the table valued function [awsdms].[split_partition_list] on the Master database, as follows:
   USE [master]
   GO
   
   set ansi_nulls on
   go
   
   set quoted_identifier on
   go
   
   if (object_id('[awsdms].[split_partition_list]','TF')) is not null
   
   drop function [awsdms].[split_partition_list];
   
   go
   
   create function [awsdms].[split_partition_list]
   
   (
   
   @plist varchar(8000), --A delimited list of partitions
   
   @dlm nvarchar(1) --Delimiting character
   
   )
   
   returns @partitionsTable table --Table holding the BIGINT values of the string fragments
   
   (
   
   pid bigint primary key
   
   )   
   
   as
   
   begin
   
   declare @partition_id bigint;
   
   declare @dlm_pos integer;
   
   declare @dlm_len integer;
   
   set @dlm_len = len(@dlm);
   
   while (charindex(@dlm,@plist)>0)
   
   begin
   
   set @dlm_pos = charindex(@dlm,@plist);
   
   set @partition_id = cast( ltrim(rtrim(substring(@plist,1,@dlm_pos-1))) as bigint);
   
   insert into @partitionsTable (pid) values (@partition_id)
   
   set @plist = substring(@plist,@dlm_pos+@dlm_len,len(@plist));
   
   end
   
   set @partition_id = cast (ltrim(rtrim(@plist)) as bigint);
   
   insert into @partitionsTable (pid) values ( @partition_id );
   
   return
   
   end
   
   GO
   ```

1. 使用以下脚本，在 Master 数据库上创建 `[awsdms].[rtm_dump_dblog]` 过程：

   ```
   use [MASTER]
   
   go
   
   if (object_id('[awsdms].[rtm_dump_dblog]','P')) is not null drop procedure [awsdms].[rtm_dump_dblog];
   go
   
   
   set ansi_nulls on
   go
   
   set quoted_identifier on
   GO
   
   
   
   CREATE procedure [awsdms].[rtm_dump_dblog]
   
   (
   
   @start_lsn varchar(32),
   
   @seqno integer,
   
   @filename varchar(260),
   
   @partition_list varchar(8000), -- A comma delimited list: P1,P2,... Pn
   
   @programmed_filtering integer,
   
   @minPartition bigint,
   
   @maxPartition bigint
   
   )
   
   as begin
   
   declare @start_lsn_cmp varchar(32); -- Stands against the GT comparator
   
   SET NOCOUNT ON -- – Disable "rows affected display"
   
   set @start_lsn_cmp = @start_lsn;
   
   if (@start_lsn_cmp) is null
   
   set @start_lsn_cmp = '00000000:00000000:0000';
   
   if (@partition_list is null)
   
   begin
   
   RAISERROR ('Null partition list waspassed',16,1);
   
   return
   
   end
   
   if (@start_lsn) is not null
   
   set @start_lsn = '0x'+@start_lsn;
   
   if (@programmed_filtering=0)
   
   
   
   SELECT
   
   [Current LSN],
   
   [operation],
   
   [Context],
   
   [Transaction ID],
   
   [Transaction Name],
   
   [Begin Time],
   
   [End Time],
   
   [Flag Bits],
   
   [PartitionID],
   
   [Page ID],
   
   [Slot ID],
   
   [RowLog Contents 0],
   
   [Log Record],
   
   [RowLog Contents 1]
   
   FROM
   
   fn_dump_dblog (
   
   @start_lsn, NULL, N'DISK', @seqno, @filename,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default)
   
   where [Current LSN] collate SQL_Latin1_General_CP1_CI_AS > @start_lsn_cmp collate SQL_Latin1_General_CP1_CI_AS
   
   and
   
   (
   
   ( [operation] in ('LOP_BEGIN_XACT','LOP_COMMIT_XACT','LOP_ABORT_XACT') )
   
   or
   
   ( [operation] in ('LOP_INSERT_ROWS','LOP_DELETE_ROWS','LOP_MODIFY_ROW')
   
   and
   
   ( ( [context] in ('LCX_HEAP','LCX_CLUSTERED','LCX_MARK_AS_GHOST') ) or ([context] = 'LCX_TEXT_MIX' and (datalength([RowLog Contents 0]) in (0,1))))
   
   and [PartitionID] in ( select * from master.awsdms.split_partition_list (@partition_list,','))
   
   )
   
   or
   
   ([operation] = 'LOP_HOBT_DDL')
   
   )
   
   
   else
   
   
   SELECT
   
   [Current LSN],
   
   [operation],
   
   [Context],
   
   [Transaction ID],
   
   [Transaction Name],
   
   [Begin Time],
   
   [End Time],
   
   [Flag Bits],
   
   [PartitionID],
   
   [Page ID],
   
   [Slot ID],
   
   [RowLog Contents 0],
   
   [Log Record],
   
   [RowLog Contents 1] -- After Image
   
   FROM
   
   fn_dump_dblog (
   
   @start_lsn, NULL, N'DISK', @seqno, @filename,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default,
   
   default, default, default, default, default, default, default)
   
   where [Current LSN] collate SQL_Latin1_General_CP1_CI_AS > @start_lsn_cmp collate SQL_Latin1_General_CP1_CI_AS
   
   and
   
   (
   
   ( [operation] in ('LOP_BEGIN_XACT','LOP_COMMIT_XACT','LOP_ABORT_XACT') )
   
   or
   
   ( [operation] in ('LOP_INSERT_ROWS','LOP_DELETE_ROWS','LOP_MODIFY_ROW')
   
   and
   
   ( ( [context] in ('LCX_HEAP','LCX_CLUSTERED','LCX_MARK_AS_GHOST') ) or ([context] = 'LCX_TEXT_MIX' and (datalength([RowLog Contents 0]) in (0,1))))
   
   and ([PartitionID] is not null) and ([PartitionID] >= @minPartition and [PartitionID]<=@maxPartition)
   
   )
   
   or
   
   ([operation] = 'LOP_HOBT_DDL')
   
   )
   
   
   
   SET NOCOUNT OFF -- Re-enable "rows affected display"
   
   end
   
   GO
   ```

1. 使用以下脚本，在 Master 数据库上创建证书：

   ```
   Use [master]
   Go
   
   CREATE CERTIFICATE [awsdms_rtm_dump_dblog_cert] ENCRYPTION BY PASSWORD = N'@5trongpassword'
   
   WITH SUBJECT = N'Certificate for FN_DUMP_DBLOG Permissions';
   ```

1. 使用以下脚本，从证书创建登录名：

   ```
   Use [master]
   Go
   
   CREATE LOGIN awsdms_rtm_dump_dblog_login FROM CERTIFICATE [awsdms_rtm_dump_dblog_cert];
   ```

1. 使用以下脚本，将登录名添加到 sysadmin 服务器角色：

   ```
   ALTER SERVER ROLE [sysadmin] ADD MEMBER [awsdms_rtm_dump_dblog_login];
   ```

1. 通过以下脚本，使用证书将签名添加到 [master].[awsdms].[rtm\$1dump\$1dblog]。

   ```
   Use [master]
   GO
   ADD SIGNATURE
   TO [master].[awsdms].[rtm_dump_dblog] BY CERTIFICATE [awsdms_rtm_dump_dblog_cert] WITH PASSWORD = '@5trongpassword';
   ```
**注意**  
如果重新创建存储过程，您需要再次添加签名。

1. 使用以下脚本在 Master 数据库上创建 [awsdms].[rtm\$1position\$11st\$1timestamp]：

   ```
   use [master]
       if object_id('[awsdms].[rtm_position_1st_timestamp]','P') is not null
       DROP PROCEDURE [awsdms].[rtm_position_1st_timestamp];
       go
       create procedure [awsdms].[rtm_position_1st_timestamp]
       (
       @dbname                sysname,      -- Database name
       @seqno                 integer,      -- Backup set sequence/position number within file
       @filename              varchar(260), -- The backup filename
       @1stTimeStamp          varchar(40)   -- The timestamp to position by
       ) 
       as begin
   
       SET NOCOUNT ON       -- Disable "rows affected display"
   
       declare @firstMatching table
       (
       cLsn varchar(32),
       bTim datetime
       )
   
       declare @sql nvarchar(4000)
       declare @nl                       char(2)
       declare @tb                       char(2)
       declare @fnameVar                 nvarchar(254) = 'NULL'
   
       set @nl  = char(10); -- New line
       set @tb  = char(9)   -- Tab separator
   
       if (@filename is not null)
       set @fnameVar = ''''+@filename +''''
   
       set @sql='use ['+@dbname+'];'+@nl+
       'select top 1 [Current LSN],[Begin Time]'+@nl+
       'FROM fn_dump_dblog (NULL, NULL, NULL, '+ cast(@seqno as varchar(10))+','+ @fnameVar+','+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default,'+@nl+
       @tb+'default, default, default, default, default, default, default)'+@nl+
       'where operation=''LOP_BEGIN_XACT''' +@nl+
       'and [Begin Time]>= cast('+''''+@1stTimeStamp+''''+' as datetime)'+@nl
   
       --print @sql
       delete from  @firstMatching 
       insert into @firstMatching  exec sp_executesql @sql    -- Get them all
   
       select top 1 cLsn as [matching LSN],convert(varchar,bTim,121) as [matching Timestamp] from @firstMatching;
   
       SET NOCOUNT OFF      -- Re-enable "rows affected display"
   
       end
       GO
   ```

1. 使用以下脚本，在 Master 数据库上创建证书：

   ```
   Use [master]
   Go
   CREATE CERTIFICATE [awsdms_rtm_position_1st_timestamp_cert]
   ENCRYPTION BY PASSWORD = '@5trongpassword'
   WITH SUBJECT = N'Certificate for FN_POSITION_1st_TIMESTAMP Permissions';
   ```

1. 使用以下脚本，从证书创建登录名：

   ```
   Use [master]
   Go
   CREATE LOGIN awsdms_rtm_position_1st_timestamp_login FROM CERTIFICATE [awsdms_rtm_position_1st_timestamp_cert];
   ```

1. 使用以下脚本，将登录名添加到 sysadmin 角色：

   ```
   ALTER SERVER ROLE [sysadmin] ADD MEMBER [awsdms_rtm_position_1st_timestamp_login];
   ```

1. 通过以下脚本，使用证书将签名添加到 [master].[awsdms].[rtm\$1position\$11st\$1timestamp]：

   ```
   Use [master]
       GO
       ADD SIGNATURE
       TO [master].[awsdms].[rtm_position_1st_timestamp]
       BY CERTIFICATE [awsdms_rtm_position_1st_timestamp_cert]
       WITH PASSWORD = '@5trongpassword';
   ```

1. 使用以下脚本向 DMS 用户授予对新存储过程的执行访问权限：

   ```
   use master
   go
   GRANT execute on [awsdms].[rtm_position_1st_timestamp] to dms_user;
   ```

1. 在以下每个数据库中，创建具有以下权限和角色的用户：
**注意**  
您应该在每个副本上使用相同的 SID 创建 dmsnosysadmin 用户账户。以下 SQL 查询可以帮助验证每个副本上的 dmsnosysadmin 账户 SID 值。有关创建用户的更多信息，请参阅 [Microsoft SQL Server 文档](https://learn.microsoft.com/en-us/sql/)中的 [CREATE USER（Transact-SQL）](https://learn.microsoft.com/en-us/sql/t-sql/statements/create-user-transact-sql)。有关为 Azure SQL 数据库创建 SQL 用户账户的更多信息，请参阅[活动异地复制](https://learn.microsoft.com/en-us/azure/azure-sql/database/active-geo-replication-overview)。

   ```
   use master
   go
   grant select on sys.fn_dblog to [DMS_user]
   grant view any definition to [DMS_user]
   grant view server state to [DMS_user]--(should be granted to the login).
   grant execute on sp_repldone to [DMS_user]
   grant execute on sp_replincrementlsn to [DMS_user]
   grant execute on sp_addpublication to [DMS_user]
   grant execute on sp_addarticle to [DMS_user]
   grant execute on sp_articlefilter to [DMS_user]
   grant select on [awsdms].[split_partition_list] to [DMS_user]
   grant execute on [awsdms].[rtm_dump_dblog] to [DMS_user]
   ```

   ```
   use msdb
   go
   grant select on msdb.dbo.backupset to self_managed_user
   grant select on msdb.dbo.backupmediafamily to self_managed_user
   grant select on msdb.dbo.backupfile to self_managed_user
   ```

   在源数据库上运行以下脚本：

   ```
   use Source_DB
       Go
       EXEC sp_addrolemember N'db_owner', N'DMS_user'
   ```

1. 最后，向源 SQL Server 端点添加额外连接属性（ECA）：

   ```
   enableNonSysadminWrapper=true;
   ```

#### 在可用性组环境中的 SQL Server 上设置持续复制：无 sysadmin 角色
<a name="CHAP_SupportScripts.SQLServer.ag"></a>

本节介绍如何在不需要用户账户具有 sysadmin 权限的可用性组环境中为 SQL Server 数据库源设置持续复制。

**注意**  
运行本节中的步骤后，非 sysadmin DMS 用户将有权执行以下操作：  
从联机事务日志文件中读取更改
通过访问磁盘从事务日志备份文件中读取更改
添加或更改 DMS 使用的发布内容
向发布内容中添加文章

**在可用性组环境中设置持续复制而不使用 sysadmin 用户**

1. 按照[捕获数据更改，以便从 SQL Server 进行持续复制](#CHAP_Source.SQLServer.CDC)中所述设置用于复制的 Microsoft SQL Server。

1. 在源数据库上启用 MS-REPLICATION。这可以手动完成，也可以使用 sysadmin 用户身份运行一次任务来完成。
**注意**  
您应该将 MS-REPLICATION 分发服务器配置为本地分发服务器，或者配置为允许非 sysadmin 用户通过关联的链接服务器进行访问。

1. 如果启用**在单个任务中独占使用 sp\$1repldone** 端点选项，请停止 MS-REPLICATION 日志读取器作业。

1. 在每个副本上，执行以下步骤：

   1. 在 Master 数据库中创建 `[awsdms]`[awsdms] 架构：

      ```
      CREATE SCHEMA [awsdms]
      ```

   1. 在 Master 数据库中创建 `[awsdms].[split_partition_list]` 表值函数：

      ```
      USE [master]
      GO
      
      SET ansi_nulls on
      GO
        
      SET quoted_identifier on
      GO
      
      IF (object_id('[awsdms].[split_partition_list]','TF')) is not null
        DROP FUNCTION [awsdms].[split_partition_list];
      GO
      
      CREATE FUNCTION [awsdms].[split_partition_list] 
      ( 
        @plist varchar(8000),    --A delimited list of partitions    
        @dlm nvarchar(1)    --Delimiting character
      ) 
      RETURNS @partitionsTable table --Table holding the BIGINT values of the string fragments
      (
        pid bigint primary key
      ) 
      AS 
      BEGIN
        DECLARE @partition_id bigint;
        DECLARE @dlm_pos integer;
        DECLARE @dlm_len integer;  
        SET @dlm_len = len(@dlm);
        WHILE (charindex(@dlm,@plist)>0)
        BEGIN 
          SET @dlm_pos = charindex(@dlm,@plist);
          SET @partition_id = cast( ltrim(rtrim(substring(@plist,1,@dlm_pos-1))) as bigint);
          INSERT into @partitionsTable (pid) values (@partition_id)
          SET @plist = substring(@plist,@dlm_pos+@dlm_len,len(@plist));
        END 
        SET @partition_id = cast (ltrim(rtrim(@plist)) as bigint);
        INSERT into @partitionsTable (pid) values (  @partition_id  );
        RETURN
      END
      GO
      ```

   1. 在 Master 数据库中创建 `[awsdms].[rtm_dump_dblog]` 过程：

      ```
      USE [MASTER] 
      GO
      
      IF (object_id('[awsdms].[rtm_dump_dblog]','P')) is not null
        DROP PROCEDURE [awsdms].[rtm_dump_dblog]; 
      GO
      
      SET ansi_nulls on
      GO 
      
      SET quoted_identifier on 
      GO
                                          
      CREATE PROCEDURE [awsdms].[rtm_dump_dblog]
      (
        @start_lsn            varchar(32),
        @seqno                integer,
        @filename             varchar(260),
        @partition_list       varchar(8000), -- A comma delimited list: P1,P2,... Pn
        @programmed_filtering integer,
        @minPartition         bigint,
        @maxPartition         bigint
      ) 
      AS 
      BEGIN
      
        DECLARE @start_lsn_cmp varchar(32); -- Stands against the GT comparator
      
        SET NOCOUNT ON  -- Disable "rows affected display"
      
        SET @start_lsn_cmp = @start_lsn;
        IF (@start_lsn_cmp) is null
          SET @start_lsn_cmp = '00000000:00000000:0000';
      
        IF (@partition_list is null)
          BEGIN
            RAISERROR ('Null partition list was passed',16,1);
            return
            --set @partition_list = '0,';    -- A dummy which is never matched
          END
      
        IF (@start_lsn) is not null
          SET @start_lsn = '0x'+@start_lsn;
      
        IF (@programmed_filtering=0)
          SELECT
            [Current LSN],
            [operation],
            [Context],
            [Transaction ID],
            [Transaction Name],
            [Begin Time],
            [End Time],
            [Flag Bits],
            [PartitionID],
            [Page ID],
            [Slot ID],
            [RowLog Contents 0],
            [Log Record],
            [RowLog Contents 1] -- After Image
          FROM
            fn_dump_dblog (
              @start_lsn, NULL, N'DISK', @seqno, @filename,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default,
              default, default, default, default, default, default, default)
          WHERE 
            [Current LSN] collate SQL_Latin1_General_CP1_CI_AS > @start_lsn_cmp collate SQL_Latin1_General_CP1_CI_AS -- This aims for implementing FN_DBLOG based on GT comparator.
            AND
            (
              (  [operation] in ('LOP_BEGIN_XACT','LOP_COMMIT_XACT','LOP_ABORT_XACT') )
              OR
              (  [operation] in ('LOP_INSERT_ROWS','LOP_DELETE_ROWS','LOP_MODIFY_ROW')
                AND
                ( ( [context]   in ('LCX_HEAP','LCX_CLUSTERED','LCX_MARK_AS_GHOST') ) or ([context] = 'LCX_TEXT_MIX') )
                AND       
                [PartitionID] in ( select * from master.awsdms.split_partition_list (@partition_list,','))
              )
            OR
            ([operation] = 'LOP_HOBT_DDL')
          )
          ELSE
            SELECT
              [Current LSN],
              [operation],
              [Context],
              [Transaction ID],
              [Transaction Name],
              [Begin Time],
              [End Time],
              [Flag Bits],
              [PartitionID],
              [Page ID],
              [Slot ID],
              [RowLog Contents 0],
              [Log Record],
              [RowLog Contents 1] -- After Image
            FROM
              fn_dump_dblog (
                @start_lsn, NULL, N'DISK', @seqno, @filename,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default,
                default, default, default, default, default, default, default)
            WHERE [Current LSN] collate SQL_Latin1_General_CP1_CI_AS > @start_lsn_cmp collate SQL_Latin1_General_CP1_CI_AS -- This aims for implementing FN_DBLOG based on GT comparator.
            AND
            (
              (  [operation] in ('LOP_BEGIN_XACT','LOP_COMMIT_XACT','LOP_ABORT_XACT') )
              OR
              (  [operation] in ('LOP_INSERT_ROWS','LOP_DELETE_ROWS','LOP_MODIFY_ROW')
                AND
                ( ( [context]   in ('LCX_HEAP','LCX_CLUSTERED','LCX_MARK_AS_GHOST') ) or ([context] = 'LCX_TEXT_MIX') )
                AND ([PartitionID] is not null) and ([PartitionID] >= @minPartition and [PartitionID]<=@maxPartition)
              )
              OR
              ([operation] = 'LOP_HOBT_DDL')
            )
            SET NOCOUNT OFF -- Re-enable "rows affected display"
      END
      GO
      ```

   1. 在 Master 数据库上创建证书：

      ```
      USE [master]
      GO
      CREATE CERTIFICATE [awsdms_rtm_dump_dblog_cert]
        ENCRYPTION BY PASSWORD = N'@hardpassword1'
        WITH SUBJECT = N'Certificate for FN_DUMP_DBLOG Permissions'
      ```

   1. 从证书创建登录名：

      ```
      USE [master]
      GO
      CREATE LOGIN awsdms_rtm_dump_dblog_login FROM CERTIFICATE
        [awsdms_rtm_dump_dblog_cert];
      ```

   1. 将登录名添加到 sysadmin 服务器角色：

      ```
      ALTER SERVER ROLE [sysadmin] ADD MEMBER [awsdms_rtm_dump_dblog_login];
      ```

   1. 使用证书将签名添加到 [master].[awsdms].[rtm\$1dump\$1dblog] 过程：

      ```
      USE [master]
      GO
      
      ADD SIGNATURE
        TO [master].[awsdms].[rtm_dump_dblog]
        BY CERTIFICATE [awsdms_rtm_dump_dblog_cert]
        WITH PASSWORD = '@hardpassword1';
      ```
**注意**  
如果重新创建存储过程，您需要再次添加签名。

   1. 在 Master 数据库中创建 `[awsdms].[rtm_position_1st_timestamp]` 过程：

      ```
      USE [master]
      IF object_id('[awsdms].[rtm_position_1st_timestamp]','P') is not null
        DROP PROCEDURE [awsdms].[rtm_position_1st_timestamp];
      GO
      CREATE PROCEDURE [awsdms].[rtm_position_1st_timestamp]
      (
        @dbname                sysname,      -- Database name
        @seqno                 integer,      -- Backup set sequence/position number within file
        @filename              varchar(260), -- The backup filename
        @1stTimeStamp          varchar(40)   -- The timestamp to position by
      ) 
      AS 
      BEGIN
        SET NOCOUNT ON       -- Disable "rows affected display"
      
        DECLARE @firstMatching table
        (
          cLsn varchar(32),
          bTim datetime
        )
        DECLARE @sql nvarchar(4000)
        DECLARE @nl                       char(2)
        DECLARE @tb                       char(2)
        DECLARE @fnameVar                 sysname = 'NULL'
      
        SET @nl  = char(10); -- New line
        SET @tb  = char(9)   -- Tab separator
      
        IF (@filename is not null)
          SET @fnameVar = ''''+@filename +''''
        SET @filename = ''''+@filename +''''
        SET @sql='use ['+@dbname+'];'+@nl+
          'SELECT TOP 1 [Current LSN],[Begin Time]'+@nl+
          'FROM fn_dump_dblog (NULL, NULL, NULL, '+ cast(@seqno as varchar(10))+','+ @filename +','+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default,'+@nl+
          @tb+'default, default, default, default, default, default, default)'+@nl+
          'WHERE operation=''LOP_BEGIN_XACT''' +@nl+
          'AND [Begin Time]>= cast('+''''+@1stTimeStamp+''''+' as datetime)'+@nl
      
          --print @sql
          DELETE FROM @firstMatching 
          INSERT INTO @firstMatching  exec sp_executesql @sql    -- Get them all
          SELECT TOP 1 cLsn as [matching LSN],convert(varchar,bTim,121) AS[matching Timestamp] FROM @firstMatching;
      
          SET NOCOUNT OFF      -- Re-enable "rows affected display"
      
      END
      GO
      ```

   1. 在 Master 数据库上创建证书：

      ```
      USE [master]
      GO
      CREATE CERTIFICATE [awsdms_rtm_position_1st_timestamp_cert]
        ENCRYPTION BY PASSWORD = N'@hardpassword1'
        WITH SUBJECT = N'Certificate for FN_POSITION_1st_TIMESTAMP Permissions';
      ```

   1. 从证书创建登录名：

      ```
      USE [master]
      GO
      CREATE LOGIN awsdms_rtm_position_1st_timestamp_login FROM CERTIFICATE
        [awsdms_rtm_position_1st_timestamp_cert];
      ```

   1. 将登录名添加到 sysadmin 服务器角色：

      ```
      ALTER SERVER ROLE [sysadmin] ADD MEMBER [awsdms_rtm_position_1st_timestamp_login];
      ```

   1. 使用证书将签名添加到 `[master].[awsdms].[rtm_position_1st_timestamp]` 过程：

      ```
      USE [master]
      GO
      ADD SIGNATURE
        TO [master].[awsdms].[rtm_position_1st_timestamp]
        BY CERTIFICATE [awsdms_rtm_position_1st_timestamp_cert]
        WITH PASSWORD = '@hardpassword1';
      ```
**注意**  
如果重新创建存储过程，您需要再次添加签名。

   1. 在以下每个数据库 permissions/roles 中创建具有以下内容的用户：
**注意**  
您应该在每个副本上使用相同的 SID 创建 dmsnosysadmin 用户账户。以下 SQL 查询可以帮助验证每个副本上的 dmsnosysadmin 账户 SID 值。有关创建用户的更多信息，请参阅 [Microsoft SQL Server 文档](https://learn.microsoft.com/en-us/sql/)中的 [CREATE USER（Transact-SQL）](https://learn.microsoft.com/en-us/sql/t-sql/statements/create-user-transact-sql)。有关为 Azure SQL 数据库创建 SQL 用户账户的更多信息，请参阅[活动异地复制](https://learn.microsoft.com/en-us/azure/azure-sql/database/active-geo-replication-overview)。

      ```
      SELECT @@servername servername, name, sid, create_date, modify_date
        FROM sys.server_principals
        WHERE name = 'dmsnosysadmin';
      ```

   1. 在每个副本上授予对 Master 数据库的权限：

      ```
      USE master
      GO 
      
      GRANT select on sys.fn_dblog to dmsnosysadmin;
      GRANT view any definition to dmsnosysadmin;
      GRANT view server state to dmsnosysadmin -- (should be granted to the login).
      GRANT execute on sp_repldone to dmsnosysadmin;
      GRANT execute on sp_replincrementlsn to dmsnosysadmin;
      GRANT execute on sp_addpublication to dmsnosysadmin;
      GRANT execute on sp_addarticle to dmsnosysadmin;
      GRANT execute on sp_articlefilter to dmsnosysadmin;
      GRANT select on [awsdms].[split_partition_list] to dmsnosysadmin;
      GRANT execute on [awsdms].[rtm_dump_dblog] to dmsnosysadmin;
      GRANT execute on [awsdms].[rtm_position_1st_timestamp] to dmsnosysadmin;
      ```

   1. 在每个副本上授予对 MSDB 数据库的权限：

      ```
      USE msdb
      GO
      GRANT select on msdb.dbo.backupset TO self_managed_user
      GRANT select on msdb.dbo.backupmediafamily TO self_managed_user
      GRANT select on msdb.dbo.backupfile TO self_managed_user
      ```

   1. 在源数据库上将 `db_owner` 角色添加到 `dmsnosysadmin`。由于数据库已同步，您只能在主副本上添加角色。

      ```
      use <source DB>
      GO 
      EXEC sp_addrolemember N'db_owner', N'dmsnosysadmin'
      ```

## 在云 SQL Server 数据库实例上设置持续复制
<a name="CHAP_Source.SQLServer.Configuration"></a>

此部分介绍如何在云托管 SQL Server 数据库实例上设置 CDC。云托管的 SQL 服务器实例是在 Amazon RDS for SQL Server、Azure SQL 托管实例或任何其他托管云 SQL Server 实例上运行的实例。有关每种数据库类型的持续复制限制的信息，请参阅[使用 SQL Server 作为源代码的限制 AWS DMS](CHAP_Source.SQLServer.md#CHAP_Source.SQLServer.Limitations)。

在设置持续复制之前，请参阅[在 SQL Server 源中使用持续复制（CDC）的先决条件](CHAP_Source.SQLServer.md#CHAP_Source.SQLServer.Prerequisites)。

不同于自管理 Microsoft SQL Server 源，Amazon RDS for SQL Server 不支持 MS-Replication。因此， AWS DMS 需要对带或不带主键的表使用 MS-CDC。

Amazon RDS 不授予系统管理员设置用于源 SQL Server 实例中持续更改的复制项目的权限。 AWS DMS 请确保在以下过程中使用主用户权限为 Amazon RDS 实例启用 MS-CDC。

**在云 SQL Server 数据库实例上启用 MS-CDC**

1. 在数据库级别运行以下查询之一。

   对于 RDS for SQL Server 数据库实例，请使用此查询。

   ```
   exec msdb.dbo.rds_cdc_enable_db 'DB_name'
   ```

   对于 Azure SQL 托管数据库实例，请使用此查询。

   ```
   USE DB_name 
   GO 
   EXEC sys.sp_cdc_enable_db 
   GO
   ```

1. 对于带主键的每个表，运行以下查询来启用 MS-CDC。

   ```
   exec sys.sp_cdc_enable_table
   @source_schema = N'schema_name',
   @source_name = N'table_name',
   @role_name = NULL,
   @supports_net_changes = 1
   GO
   ```

   对于有唯一键但没有主键的每个表，运行以下查询来启用 MS-CDC。

   ```
   exec sys.sp_cdc_enable_table
   @source_schema = N'schema_name',
   @source_name = N'table_name',
   @index_name = N'unique_index_name',
   @role_name = NULL,
   @supports_net_changes = 1
   GO
   ```

    对于既没有主键又没有唯一键的每个表，运行以下查询来启用 MS-CDC。

   ```
   exec sys.sp_cdc_enable_table
   @source_schema = N'schema_name',
   @source_name = N'table_name',
   @role_name = NULL
   GO
   ```

1. 设置保留期：
   + 对于使用 DMS 版本 3.5.3 及更高版本进行复制的 RDS for SQL Server 实例，请确保将保留期设置为 5 秒（默认值）。如果要从 DMS 3.5.2 及更低版本升级或迁移到 DMS 3.5.3 及更高版本，当任务在新实例或升级后的实例上运行后，请更改轮询间隔值。以下脚本将保留期设置为 5 秒：

     ```
     use dbname
     EXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 5
     exec sp_cdc_stop_job 'capture'
     exec sp_cdc_start_job 'capture'
     ```
   + 参数 `@pollinginterval` 按秒计，建议值设置为 86399。当该参数为 `@pollinginterval = 86399` 时，这意味着事务日志将更改保留 86399 秒（一天）。过程 `exec sp_cdc_start_job 'capture'` 启动这些设置。
**注意**  
对于某些 SQL Server 版本，如果 `pollinginterval` 的值设置为超过 3599 秒，该值将重置为默认的五秒。发生这种情况时，T-Log 条目会在读取之前 AWS DMS 被清除。要确定哪些 SQL Server 版本受到此已知问题的影响，请参阅[这篇 Microsoft 知识库文章](https://support.microsoft.com/en-us/topic/kb4459220-fix-incorrect-results-occur-when-you-convert-pollinginterval-parameter-from-seconds-to-hours-in-sys-sp-cdc-scan-in-sql-server-dac8aefe-b60b-7745-f987-582dda2cfa78)。

     如果您使用带有多可用区的 Amazon RDS，请确保也设置辅助项，以便在失效转移时具有正确的值。

     ```
     exec rdsadmin..rds_set_configuration 'cdc_capture_pollinginterval' , <5 or preferred value>
     ```

**在 AWS DMS 复制任务停止超过一小时时保持保留期**
**注意**  
使用 DMS 3.5.3 及更高版本复制 RDS for SQL Server 源时无需执行以下步骤。

1. 使用以下命令停止截断事务日志的作业。

   ```
   exec sp_cdc_stop_job 'capture'
   ```

1. 在 AWS DMS 控制台上找到您的任务并继续执行任务。

1. 选择**监控**选项卡，然后勾选 `CDCLatencySource` 指标。

1. 如果 `CDCLatencySource` 指标等于 0（零）并保持不变，请使用以下命令重新启动截断事务日志的作业。

   ```
   exec sp_cdc_start_job 'capture'
   ```

记得启动截断 SQL Server 事务日志的作业。否则，您的 SQL Server 实例上的存储空间可能会被填满。

### 使用适用于 SQL Server 的 RDS 作为来源时的推荐设置 AWS DMS
<a name="CHAP_Source.SQLServer.Configuration.Settings"></a>

#### 适用于 AWS DMS 3.5.3 及更高版本
<a name="CHAP_Source.SQLServer.Configuration.Settings.353"></a>

**注意**  
对于在 DMS 版本 3.5.3 发布之后创建或修改的端点，默认启用 RDS for SQL Server 日志备份特征的初始版本。要将此特征用于现有端点，请在不进行任何更改的情况下修改端点。

AWS DMS 版本 3.5.3 引入了对从日志备份中读取的支持。DMS 主要依靠从活动事务日志中读取数据来复制事件。如果在 DMS 可以从活动日志中读取事务之前对事务进行了备份，则该任务将按需访问 RDS 备份并从后续备份日志中读取数据，直到赶上活动事务日志的步伐。为确保 DMS 可以访问日志备份，请将 RDS 自动备份保留期设置为至少一天。有关设置自动备份保留期的信息，请参阅《Amazon RDS 用户指南》**中的[备份保留期](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_ManagingAutomatedBackups.html#USER_WorkingWithAutomatedBackups.BackupRetention)。

访问日志备份的 DMS 任务会使用 RDS 实例上的存储空间。请注意，该任务仅访问复制所需的日志备份。Amazon RDS 会在几小时内删除这些下载的备份。此删除不会影响保留在 Amazon S3 中的 Amazon RDS 备份，也不会影响 Amazon RDS `RESTORE DATABASE` 功能。如果您打算使用 DMS 进行复制，建议在 RDS for SQL Server 源上分配额外的存储空间。估算所需存储量的一种方法是确定 DMS 将从哪个备份开始或恢复复制，然后使用 RDS `tlog backup` 元数据函数将所有后续备份的文件大小相加。有关 `tlog backup` 函数的更多信息，请参阅《Amazon RDS 用户指南》**中的[列出可用的事务日志备份](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER.SQLServer.AddlFeat.TransactionLogAccess.html#USER.SQLServer.AddlFeat.TransactionLogAccess.Listing)。

或者，您可以根据您的 Amazon RDS 实例的 CloudWatch `FreeStorageSpace`指标选择启用存储自动扩展和/或触发存储扩展。

强烈建议您不要从事务日志备份中太远的位置开始或恢复，因为这可能会导致 SQL Server 实例上的存储空间被填满。在这种情况下，建议启动完全加载。从事务日志备份中复制比从活动事务日志中读取要慢。有关更多信息，请参阅 [针对 RDS for SQL Server 的事务日志备份处理](CHAP_Troubleshooting_Latency_Source_SQLServer.md#CHAP_Troubleshooting_Latency_Source_SQLServer_backup)。

请注意，访问日志备份需要额外的权限。有关更多信息，请参阅[从云 SQL Server 数据库进行持续复制所需的设置权限](CHAP_Source.SQLServer.md#CHAP_Source.SQLServer.Permissions.Cloud)中的详细说明。确保在任务开始复制之前授予这些权限。

#### 适用于 AWS DMS 3.5.2 及更低版本
<a name="CHAP_Source.SQLServer.Configuration.Settings.352"></a>

当您使用 Amazon RDS for SQL Server 作为源时，MS-CDC 捕获作业依赖于参数 `maxscans` 和 `maxtrans`。这些参数控制 MS-CDC 捕获对事务日志执行的最大扫描次数以及每次扫描处理的事务数。

对于事务数大于 `maxtrans*maxscans` 的数据库，增加其 `polling_interval` 值可能会导致活动事务日志记录的累积。这种累积反过来又会导致事务日志的大小增加。

请注意，这 AWS DMS 并不依赖于 MS-CDC 捕获作业。MS-CDC 捕获作业将事务日志条目标记为已处理。这样可允许事务日志备份作业从事务日志中删除条目。

我们建议您监控事务日志的大小和 MS-CDC 作业是否成功。如果 MS-CDC 作业失败，事务日志可能会过度增长并导致 AWS DMS 复制失败。您可以使用源数据库中的 `sys.dm_cdc_errors` 动态管理视图来监控 MS-CDC 捕获作业错误。您可以使用 `DBCC SQLPERF(LOGSPACE)` 管理命令来监控事务日志的大小。

**解决由 MS-CDC 引起的事务日志增加问题**

1. 检查数据库是否 AWS DMS 正在从中复制，并验证它是否持续增加。`Log Space Used %`

   ```
   DBCC SQLPERF(LOGSPACE)
   ```

1. 确定阻碍事务日志备份过程的原因。

   ```
   Select log_reuse_wait, log_reuse_wait_desc, name from sys.databases where name = db_name();
   ```

   如果 `log_reuse_wait_desc` 值等于 `REPLICATION`，则说明日志备份保留是 MS-CDC 中的延迟所导致。

1. 通过增加 `maxtrans` 和 `maxscans` 参数值来增加捕获作业处理的事件数。

   ```
   EXEC sys.sp_cdc_change_job @job_type = 'capture' ,@maxtrans = 5000, @maxscans = 20 
   exec sp_cdc_stop_job 'capture'
   exec sp_cdc_start_job 'capture'
   ```

要解决此问题，请将`maxscans`和`maxtrans`的值设置`maxtrans*maxscans`为等于每天为从源数据库 AWS DMS 复制的表生成的事件的平均数。

如果将这些参数设置为高于建议值，则捕获作业会处理事务日志中的所有事件。如果将这些参数设置为低于建议值，则 MS-CDC 延迟会增加，事务日志也会增加。

为 `maxscans` 和 `maxtrans` 确定合适的值可能很困难，因为工作负载的变化会产生不同数量的事件。在这种情况下，建议您对 MS-CDC 延迟设置监控。有关更多信息，请参阅 SQL Server 文档中的[监控流程](https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/administer-and-monitor-change-data-capture-sql-server?view=sql-server-ver15#Monitor)。然后根据监控结果动态配置 `maxtrans` 和 `maxscans`。

如果 AWS DMS 任务找不到恢复或继续任务所需的日志序列号 (LSNs)，则该任务可能会失败并需要完全重新加载。

**注意**  
使用 AWS DMS 从 RDS for SQL Server 源复制数据时，在 Amazon RDS 实例的停止启动事件之后尝试恢复复制时，您可能会遇到错误。这是由于 SQL Server Agent 进程在停止启动事件后重新启动时，会重新启动捕获作业进程。这绕过了 MS-CDC 轮询间隔。  
因此，在事务量低于 MS-CDC 捕获作业处理的数据库上，这可能会导致数据在从停止位置恢复之前 AWS DMS 被处理或标记为已复制和备份，从而导致以下错误：  

```
[SOURCE_CAPTURE ]E: Failed to access LSN '0000dbd9:0006f9ad:0003' in the backup log sets since BACKUP/LOG-s are not available. [1020465] (sqlserver_endpoint_capture.c:764)
```
要缓解此问题，请按照之前的建议设置 `maxtrans` 和 `maxscans` 值。