

# 使用数据库活动流监控 Amazon Aurora
<a name="DBActivityStreams"></a><a name="das"></a>

通过使用数据库活动流，您可以监控近乎实时的数据库活动流。

**Topics**
+ [数据库活动流概览](#DBActivityStreams.Overview)
+ [Aurora MySQL 数据库活动流的网络先决条件](DBActivityStreams.Prereqs.md)
+ [启动数据库活动流](DBActivityStreams.Enabling.md)
+ [获取数据库活动流的状态](DBActivityStreams.Status.md)
+ [停止数据库活动流](DBActivityStreams.Disabling.md)
+ [监控数据库活动流](DBActivityStreams.Monitoring.md)
+ [数据库活动流的 IAM 策略示例](DBActivityStreams.ManagingAccess.md)

## 数据库活动流概览
<a name="DBActivityStreams.Overview"></a>

作为 Amazon Aurora 数据库管理员，您需要保障数据库的安全，并满足合规性和法规要求。一种策略是集成数据库活动流与监控工具。通过这种方式，您可以在 Amazon Aurora 集群中监控审计活动并相应设置警报。

安全威胁既可以来自外部，也可以来自内部。要防范内部威胁，您可以通过配置数据库活动流功能控制管理员对数据流的访问。 数据管理员无权收集、传输、存储和处理流。

**Contents**
+ [数据库活动流的工作原理](#DBActivityStreams.Overview.how-they-work)
+ [数据库活动流的异步和同步模式](#DBActivityStreams.Overview.sync-mode)
+ [数据库活动流的要求和限制](#DBActivityStreams.Overview.requirements)
+ [区域和版本可用性](#DBActivityStreams.Overview.Availability)
+ [数据库活动流支持的数据库实例类](#DBActivityStreams.Overview.requirements.classes)

### 数据库活动流的工作原理
<a name="DBActivityStreams.Overview.how-they-work"></a>

在 Amazon Aurora 中，您可以在集群级别启动数据库活动流。集群中的所有数据库实例都启用了数据库活动流。

您的 Aurora 数据库集群会近乎实时地将活动推送到 Amazon Kinesis 数据流。系统将自动创建 Kinesis 流。在 Kinesis 中，您可以配置 AWS 服务（如 Amazon Data Firehose）和 AWS Lambda 来使用 Kinesis 流并存储数据。

**重要**  
使用 中的 Amazon Aurora数据库活动流功能是免费的，但 Amazon Kinesis 会针对数据流收费。有关更多信息，请参阅 [Amazon Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。

如果您使用 Aurora 全局数据库，请分别在每个数据库集群上启动数据库活动流。每个集群在其自己的 AWS 区域中将审计数据传送到其自己的 Kinesis 流。活动流在失效转移期间不会以不同方式运行。它们继续像往常一样审计您的全局数据库。

您可以为合规性管理配置应用程序，以使用数据库活动流。对于 Aurora PostgreSQL，合规性应用程序包括 IBM 的 Security Guardium 和 Imperva 的 SecureSphere Database Audit and Protection。这些应用程序可以使用流生成警报，并审计 Aurora 数据库集群上的活动。

下图显示了使用 Amazon Data Firehose 配置的 Aurora 数据库集群。

![\[架构图显示了来自 Firehose 使用的 Aurora 数据库集群的数据库活动流\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/images/aurora-das.png)


### 数据库活动流的异步和同步模式
<a name="DBActivityStreams.Overview.sync-mode"></a>

您可以选择让数据库会话按以下任一模式处理数据库活动事件：
+ **异步模式** – 当数据库会话生成活动流事件时，会话将立即返回到正常活动。在后台，活动流事件将成为持久记录。如果后台任务出错，则将发送 RDS 事件。此事件指示活动流事件记录可能已丢失的任何时间段的开始和结束时间。

  异步模式可提高数据库性能，而不是活动流的准确性。
**注意**  
 异步模式适用于 Aurora PostgreSQL 和 Aurora MySQL。
+ **同步模式** – 当数据库会话生成活动流事件时，会话将阻止其他活动，直到该事件变为持久事件。如果因某个原因无法使事件持久，数据库会话将返回到正常活动。但会发送一个 RDS 事件来指示活动流记录可能会丢失一段时间。在系统恢复到正常运行状态后发送第二个 RDS 事件。

  同步模式可提高活动流的准确性，而不是数据库性能。
**注意**  
 同步模式适用于 Aurora PostgreSQL。您不能将同步模式用于 Aurora MySQL。

### 数据库活动流的要求和限制
<a name="DBActivityStreams.Overview.requirements"></a>

在 Aurora 中，数据库活动流具有以下要求和限制：
+ 数据库活动流需要使用 Amazon Kinesis。
+ 数据库活动流需要使用 AWS Key Management Service (AWS KMS)，因为这些活动流始终是加密的。
+ 对 Amazon Kinesis 数据流应用额外加密与数据库活动流不兼容，因为数据库活动流已使用 AWS KMS 密钥进行了加密。
+ 在数据库集群级别启动数据库活动流。如果您向集群添加数据库实例，则无需在该实例上启动活动流：它会自动进行审计。
+ 在 Aurora Global Database 中，请确保分别在每个数据库集群上启动活动流。每个集群在其自己的 AWS 区域中将审计数据传送到其自己的 Kinesis 流。
+ 在 Aurora PostgreSQL 中，确保在主要版本升级之前停止数据库活动流。升级完成后，您可以启动数据库活动流。

### 区域和版本可用性
<a name="DBActivityStreams.Overview.Availability"></a>

功能可用性和支持因每个 Aurora 数据库引擎的特定版本以及 AWS 区域而异。有关适用于 Aurora 和数据库活动流的版本和区域可用性的更多信息，请参阅 [支持数据库活动流的区域和 Aurora 数据库引擎](Concepts.Aurora_Fea_Regions_DB-eng.Feature.DBActivityStreams.md)。

### 数据库活动流支持的数据库实例类
<a name="DBActivityStreams.Overview.requirements.classes"></a>

对于 Aurora MySQL，您可以将数据库活动流与以下数据库实例类一起使用：
+ db.r8g.\$1large
+ db.r7g.\$1large
+ db.r7i.\$1large
+ db.r6g.\$1large
+ db.r6i.\$1large
+ db.r5.\$1large
+ db.x2g.\$1

对于 Aurora PostgreSQL，您可以将数据库活动流与以下数据库实例类一起使用：
+ db.r8g.\$1large
+ db.r7i.\$1large
+ db.r7g.\$1large
+ db.r6g.\$1large
+ db.r6i.\$1large
+ db.r6id.\$1large
+ db.r5.\$1large
+ db.r4.\$1large
+ db.x2g.\$1

# Aurora MySQL 数据库活动流的网络先决条件
<a name="DBActivityStreams.Prereqs"></a>

在以下部分中，您可以了解如何将 Virtual Private Cloud (VPC) 配置为与数据库活动流结合使用。

**注意**  
Aurora MySQL 网络先决条件适用于以下引擎版本：  
Aurora MySQL 版本 2，最高 2.11.3
Aurora MySQL 版本 2.12.0
Aurora MySQL 版本 3，最高 3.04.2

**Topics**
+ [AWS KMS 端点的先决条件](#DBActivityStreams.Prereqs.KMS)
+ [公开提供的先决条件](#DBActivityStreams.Prereqs.Public)
+ [私有提供的先决条件](#DBActivityStreams.Prereqs.Private)

## AWS KMS 端点的先决条件
<a name="DBActivityStreams.Prereqs.KMS"></a>

Aurora MySQL 集群中使用活动流的实例必须能够访问 AWS KMS 端点。在为 Aurora MySQL 集群启用数据库活动流之前，请确保满足此要求。如果 Aurora 集群公开可用，则会自动满足此要求。

**重要**  
如果 Aurora MySQL 数据库集群无法访问 AWS KMS 端点，活动流将停止运行。在这种情况下，Aurora 会使用 RDS 事件通知您此问题。

## 公开提供的先决条件
<a name="DBActivityStreams.Prereqs.Public"></a>

对于要设为公有的 Aurora 数据库集群，必须满足以下要求：
+ 在 AWS 管理控制台 集群详细信息页面中，**公开访问**为**是**。
+ 数据库集群在 Amazon VPC 公有子网中。有关可公开访问的数据库实例的更多信息，请参阅 [在 VPC 中使用数据库集群](USER_VPC.WorkingWithRDSInstanceinaVPC.md)。有关公有 Amazon VPC 子网的更多信息，请参阅[您的 VPC 和子网](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Subnets.html)。

## 私有提供的先决条件
<a name="DBActivityStreams.Prereqs.Private"></a>

如果您的 Aurora 数据库集群位于 VPC 公有子网中且不可公开访问，说明它是私有的。要保持集群是私有的，并将其与数据库活动流结合使用，您可使用以下选项：
+ 在 VPC 中配置网络地址转换 (NAT)。有关更多信息，请参阅 [NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)。
+ 在 VPC 中创建 AWS KMS 端点。建议使用此选项，因为它更易于配置。

**在 VPC 中创建 AWS KMS 端点**

1. 通过以下网址打开 Amazon VPC 控制台：[https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)。

1. 在导航窗格中，选择 **Endpoints**（端点）。

1. 选择 **Create Endpoint**（创建端点）。

   **创建端点**页面显示。

1. 执行以下操作：
   + 在**服务类别**中，选择 **AWS 服务**。
   + 在 **Service Name**（服务名称）中，选择 **com.amazonaws.*region*.kms**，其中 *region* 是集群所在的 AWS 区域。
   + 对于 **VPC**，选择集群所在的 VPC。

1. 选择 **Create Endpoint**（创建端点）。

有关配置 VPC 端点的更多信息，请参阅 [VPC 端点](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-endpoints.html)。

# 启动数据库活动流
<a name="DBActivityStreams.Enabling"></a>

要监控 Aurora 数据库集群所有实例的数据库活动，请在集群级别启动活动流。还将自动监控添加到集群的任何数据库实例。如果您使用 Aurora 全局数据库，请分别在每个数据库集群上启动数据库活动流。每个集群在其自己的 AWS 区域中将审计数据传送到其自己的 Kinesis 流。

在启动活动流时，在审计策略中配置的每个数据库活动事件都会生成一个活动流事件。SQL 命令（例如 `CONNECT` 和 `SELECT`）可生成访问事件。SQL 命令（例如 `CREATE` 和 `INSERT`）可生成更改事件。

------
#### [ Console ]

**要启动数据库活动流**

1. 通过以下网址打开 Amazon RDS 控制台：[https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)。

1. 在导航窗格中，选择 **Databases (数据库)**。

1. 选择要对其启动活动流的数据库集群。

1. 对于 **Actions (操作)**，选择 **Start activity stream (启动活动流)**。

   **启动数据库活动流：***名称*窗口出现，其中*名称*是您的 数据库实例。

1. 输入以下设置：
   + 对于 **AWS KMS key**，从 AWS KMS keys 列表中选择一个密钥。
**注意**  
 如果您的 Aurora MySQL 集群无法访问 KMS 密钥，请先按照 [Aurora MySQL 数据库活动流的网络先决条件](DBActivityStreams.Prereqs.md) 中的说明启用此类访问。

     Aurora 使用 KMS 密钥加密密钥，从而加密数据库活动。请选择原定设置密钥以外的 KMS 密钥。有关加密密钥和 AWS KMS 的更多信息，请参阅 *AWS Key Management Service 开发人员指南*中的[什么是 AWS Key Management Service？](https://docs.aws.amazon.com/kms/latest/developerguide/overview.html)。
   + 对于 **Database activity stream mode (数据库活动流模式)**，选择 **Asynchronous (异步)** 或 **Synchronous (同步)**。
**注意**  
此选项仅适用于 Aurora PostgreSQL。对于 Aurora MySQL，您只能使用异步模式。
   + 选择 **Immediately (立即)**。

     当您选择 **Immediately (立即)** 时，数据库集群会立即重新启动。如果选择 **During the next maintenance window (在下一维护时段内)**，数据库集群不会立即重新启动。在这种情况下，数据库活动流不会启动，直到下一个维护时段。

1. 选择 **Start database activity stream**（启动数据库活动流）。

   数据库集群的状态显示活动流正在启动。
**注意**  
如果您收到错误 `You can't start a database activity stream in this configuration`，请检查 [数据库活动流支持的数据库实例类](DBActivityStreams.md#DBActivityStreams.Overview.requirements.classes)，以了解您的数据库集群是否正在使用受支持的实例类。

------
#### [ AWS CLI ]

要为数据库集群启动数据库活动流，请使用 AWS CLI 命令 start-activity-stream 配置[数据库](https://docs.aws.amazon.com/cli/latest/reference/rds/start-activity-stream.html)。
+ `--resource-arn arn` – 指定数据库集群的 Amazon Resource Name (ARN)。
+ `--mode sync-or-async` – 指定同步 (`sync`) 或异步 (`async`) 模式。对于 Aurora PostgreSQL，您可以选择任一值。对于 Aurora MySQL，请指定 `async`。
+ `--kms-key-id key` – 指定用于加密数据库活动流中的消息的 KMS 密钥标识符。AWS KMS 密钥标识符是密钥 ARN、密钥 ID、别名 ARN 或者 AWS KMS key 的别名。

以下示例以异步模式启动数据库集群的数据库活动流。

对于 Linux、macOS 或 Unix：

```
aws rds start-activity-stream \
    --mode async \
    --kms-key-id my-kms-key-arn \
    --resource-arn my-cluster-arn \
    --apply-immediately
```

对于 Windows：

```
aws rds start-activity-stream ^
    --mode async ^
    --kms-key-id my-kms-key-arn ^
    --resource-arn my-cluster-arn ^
    --apply-immediately
```

------
#### [ Amazon RDS API ]

要为数据库集群启动数据库活动流，请使用 [StartActivityStream](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_StartActivityStream.html) 操作配置集群。

使用以下参数调用操作：
+ `Region`
+ `KmsKeyId`
+ `ResourceArn`
+ `Mode`

------

**注意**  
如果您收到一条错误，指出您无法使用当前版本的 Aurora PostgreSQL 数据库启动数据库活动流，请先应用 Aurora PostgreSQL 的最新补丁，然后再启动数据库活动流。有关升级 Aurora PostgreSQL 数据库的信息，请参阅[升级 Amazon Aurora 数据库集群](Aurora.VersionPolicy.Upgrading.md)。  
以下是用于使用 Aurora PostgreSQL 启动数据库活动流的最低补丁版本。  
3.4.15（11.9.15）、11.21.10
12.9.15、12.15.9、12.16.10、12.17.7、12.18.5、12.19.4、12.20.3、12.22.3
13.9.12、13.11.9、13.12.10、13.13.7、13.14.5、13.15.4、13.16.3、13.18.3
14.6.12、14.8.9、14.9.10、14.10.7、14.11.5、14.12.4、14.13.3、14.15.3
15.3.9、15.4.10、15.5.7、15.6.5、15.7.4、15.8.3、15.10.3
16.1.7、16.2.5、16.3.4、16.4.3、16.6.3

# 获取数据库活动流的状态
<a name="DBActivityStreams.Status"></a>

您可以使用控制台或 AWS CLI 获取活动流的状态。

## 控制台
<a name="DBActivityStreams.Status-collapsible-section-S1"></a>

**要获取数据库活动流的状态，请执行以下操作**

1. 通过以下网址打开 Amazon RDS 控制台：[https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)。

1. 在导航窗格中，选择 **Databases (数据库)**，然后选择数据库集群链接。

1. 选择 **Configuration (配置)** 选项卡，并选择 **Database activity stream (数据库活动流)** 以查看状态。

## AWS CLI
<a name="DBActivityStreams.Status-collapsible-section-S2"></a>

您可以获取数据库集群的活动流配置，作为对 [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) CLI 请求的响应。

以下示例描述 *my-cluster*。

```
aws rds --region my-region describe-db-clusters --db-cluster-identifier my-cluster
```

下面的示例介绍一个 JSON 响应。显示了以下字段：
+ `ActivityStreamKinesisStreamName`
+ `ActivityStreamKmsKeyId`
+ `ActivityStreamStatus`
+ `ActivityStreamMode`
+ 

这些字段对于 Aurora PostgreSQL 和 Aurora MySQL 是相同的，只是对于 Aurora MySQL，`ActivityStreamMode` 总是 `async`，而对于 Aurora PostgreSQL，它可能是 `sync` 或 `async`。

```
{
    "DBClusters": [
        {
      "DBClusterIdentifier": "my-cluster",
            ...
            "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-A6TSYXITZCZXJHIRVFUBZ5LTWY",
            "ActivityStreamStatus": "starting",
            "ActivityStreamKmsKeyId": "12345678-abcd-efgh-ijkl-bd041f170262",
            "ActivityStreamMode": "async",
            "DbClusterResourceId": "cluster-ABCD123456"
            ...
        }
    ]
}
```

## RDS API
<a name="DBActivityStreams.Status-collapsible-section-S3"></a>

您可以获取数据库集群的活动流配置，作为对 [DescribeDBClusters](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_DescribeDBClusters.html) 操作的响应。

# 停止数据库活动流
<a name="DBActivityStreams.Disabling"></a>

您可以使用控制台或 AWS CLI 停止活动流。

如果您删除数据库集群，活动流将停止，并且会自动删除底层 Amazon Kinesis 流。

## 控制台
<a name="DBActivityStreams.Disabling-collapsible-section-D1"></a>

**关闭活动流**

1. 通过以下网址打开 Amazon RDS 控制台：[https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)。

1. 在导航窗格中，选择 **Databases (数据库)**。

1. 选择要为其停止数据库活动流的数据库集群。

1. 对于 **Actions (操作)**，选择 **Stop activity stream (停止活动流)**。此时将显示 **Database Activity Stream (数据库活动流)** 窗口。

   1. 选择 **Immediately (立即)**。

      当您选择 **Immediately (立即)** 时，数据库集群会立即重新启动。如果选择 **During the next maintenance window (在下一维护时段内)**，数据库集群不会立即重新启动。在这种情况下，数据库活动流直到下一个维护时段才会停止。

   1. 选择 **Continue (继续)**。

## AWS CLI
<a name="DBActivityStreams.Disabling-collapsible-section-D2"></a>

要为数据库集群停止数据库活动流，请使用 AWS CLI 命令 [stop-activity-stream](https://docs.aws.amazon.com/cli/latest/reference/rds/stop-activity-stream.html) 配置数据库集群。使用 `--region` 参数标识数据库集群的AWS区域。`--apply-immediately` 参数是可选的。

对于 Linux、macOS 或 Unix：

```
aws rds --region MY_REGION \
    stop-activity-stream \
    --resource-arn MY_CLUSTER_ARN \
    --apply-immediately
```

对于 Windows：

```
aws rds --region MY_REGION ^
    stop-activity-stream ^
    --resource-arn MY_CLUSTER_ARN ^
    --apply-immediately
```

## RDS API
<a name="DBActivityStreams.Disabling-collapsible-section-D3"></a>

要为数据库集群停止数据库活动流，请使用 [StopActivityStream](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_StopActivityStream.html) 操作配置集群。使用 `Region` 参数标识数据库集群的AWS区域。`ApplyImmediately` 参数是可选的。

# 监控数据库活动流
<a name="DBActivityStreams.Monitoring"></a>

数据库活动流监控并报告活动。收集活动流并将其传输到 Amazon Kinesis。从 Kinesis 中，您可以监控活动流，或者其他服务和应用程序可以使用活动流进行进一步分析。您可以使用 AWS CLI 命令 `describe-db-clusters` 或 RDS API `DescribeDBClusters` 操作查找底层 Kinesis 流名称。

Aurora 为您管理 Kinesis 流，如下所示：
+ Aurora 自动创建具有 24 小时保留期的 Kinesis 流。
+  如有必要，Aurora 会扩展 Kinesis 流。
+  如果停止了数据库活动流或删除了数据库集群，Aurora 将删除 Kinesis 流。

监控以下类别的活动并将其放入活动流审核日志中：
+ **SQL 命令** – 将审计所有 SQL 命令，以及预编译语句、内置函数和采用 PL/SQL 的函数。将审核对存储过程的调用。还将审核在存储过程或函数中发出的任何 SQL 语句。
+ **其他数据库信息** – 受监控的活动包括完整的 SQL 语句、来自 DML 命令的受影响行的行数、访问的对象以及唯一的数据库名称。对于 Aurora PostgreSQL，数据库活动流还监控绑定变量和存储过程参数。
**重要**  
每个语句的完整 SQL 文本在活动流审核日志中可见，包括任何敏感数据。但是，如果 Aurora 可以从上下文中确定数据库用户密码，则会对该密码进行修订，如下面的 SQL 语句所示。  

  ```
  ALTER ROLE role-name WITH password
  ```
+ **连接信息** – 受监控的活动包括会话和网络信息、服务器进程 ID 和退出代码。

如果在监控数据库实例时活动流发生故障，则会使用 RDS 事件通知您。

在以下各节中，您可以访问、审计和处理数据库活动流。

**Topics**
+ [从 Amazon Kinesis 访问活动流](DBActivityStreams.KinesisAccess.md)
+ [数据库活动流的审计日志内容和示例](DBActivityStreams.AuditLog.md)
+ [数据库活动流的 databaseActivityEventList JSON 数组](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [使用 AWS 开发工具包处理数据库活动流](DBActivityStreams.CodeExample.md)

# 从 Amazon Kinesis 访问活动流
<a name="DBActivityStreams.KinesisAccess"></a>

在为数据库集群启用活动流时，将会为您创建一个 Kinesis 流。您可以从 Kinesis 实时监控数据库活动。要进一步分析数据库活动，您可以将 Kinesis 流连接到使用者应用程序。您还可以将流连接到合规性管理应用程序，例如 IBM 的 Security Guardium 或 Imperva 的 SecureSphere Database Audit and Protection。

您可以从 RDS 控制台或 Kinesis 控制台访问您的 Kinesis 流。

**使用 RDS 控制台从 Kinesis 中访问活动流**

1. 通过以下网址打开 Amazon RDS 控制台：[https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)。

1. 在导航窗格中，选择 **Databases**（数据库）。

1. 选择已启动活动流的数据库集群。

1. 选择**配置**。

1. 在 **Database activity stream**（数据库活动流）下，选择 **Kinesis stream**（Kinesis 流）下的链接。

1. 在 Kinesis 控制台中，选择 **Monitoring**（监控）以开始观察数据库活动。

**使用 Kinesis 控制台从 Kinesis 中访问活动流**

1. 打开 Kinesis 控制台，网址为：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 从 Kinesis 流列表中选择您的活动流。

   活动流的名称包含前缀 `aws-rds-das-cluster-`，后跟数据库集群的资源 ID。以下是示例。

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   要使用 Amazon RDS 控制台查找数据库集群的资源 ID，请从数据库列表中选择数据库集群，然后选择 **Configuration**（配置）选项卡。

   要使用 AWS CLI 查找活动流的完整 Kinesis 流名称，请使用 [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) CLI 请求并记下响应中的 `ActivityStreamKinesisStreamName` 值。

1. 选择**监控**以开始观察数据库活动。

有关使用 Amazon Kinesis 的更多信息，请参阅[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)

# 数据库活动流的审计日志内容和示例
<a name="DBActivityStreams.AuditLog"></a>

受监控的事件在数据库活动流中表示为 JSON 字符串。结构包含一个 JSON 对象，该对象包含一个 `DatabaseActivityMonitoringRecord`，后者反过来包含活动事件的 `databaseActivityEventList` 阵列。

**注意**  
对于数据库活动流，`paramList` JSON 数组不包含来自 Hibernate 应用程序的空值。

**Topics**
+ [活动流的审计日志示例](#DBActivityStreams.AuditLog.Examples)
+ [DatabaseActivityMonitoringRecords JSON 对象](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [databaseActivityEvents JSON 对象](#DBActivityStreams.AuditLog.databaseActivityEvents)

## 活动流的审计日志示例
<a name="DBActivityStreams.AuditLog.Examples"></a>

以下是活动事件记录的已解密 JSON 审核日志示例。

**Example Aurora PostgreSQLCONNECT SQL 语句的活动事件记录**  
以下活动事件记录显示 psql 客户端（`clientApplication`）使用 `CONNECT` SQL 语句（`command`）进行的登录。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CONNECT SQL 语句的活动事件记录**  
以下活动事件记录显示 mysql 客户端（`clientApplication`）使用 `CONNECT` SQL 语句（`command`）进行的登录。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Aurora PostgreSQL的活动事件记录**  
以下示例显示 Aurora PostgreSQL的 `CREATE TABLE` 事件。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CREATE TABLE 语句的活动事件记录**  
以下示例显示 Aurora MySQL 的 `CREATE TABLE` 语句。操作表示为两个单独的事件记录。一个事件具有 `"class":"MAIN"`。另一个事件具有 `"class":"AUX"`。消息可能按任何顺序到达。`logTime` 事件的 `MAIN` 字段始终早于任何对应 `logTime` 事件的 `AUX` 字段。  
以下示例显示 `class` 值为 `MAIN` 的事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 以下示例显示 `class` 值为 `AUX` 的相应事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Aurora PostgreSQL的活动事件记录**  
以下示例显示 `SELECT` 事件。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Aurora MySQL SELECT 语句的活动事件记录**  
以下示例显示 `SELECT` 事件。  
 以下示例显示 `class` 值为 `MAIN` 的事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 以下示例显示 `class` 值为 `AUX` 的相应事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## DatabaseActivityMonitoringRecords JSON 对象
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

数据库活动事件记录位于包含以下信息的 JSON 对象中。


****  

| JSON 字段 | 数据类型 | 描述 | 
| --- | --- | --- | 
|  `type`  | 字符串 |  JSON 记录的类型。值为 `DatabaseActivityMonitoringRecords`。  | 
| version | 字符串 |  数据库活动监控记录的版本。所生成数据库活动记录的版本取决于数据库集群的引擎版本： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)除非另有注明，否则版本 1.0 和版本 1.1 中均包含以下所有字段。 | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | 字符串 |  包含活动事件的 JSON 对象。  | 
| 键 | 字符串 | 用于解密 [databaseActivityEventList JSON 数组](DBActivityStreams.AuditLog.databaseActivityEventList.md) 的加密密钥  | 

## databaseActivityEvents JSON 对象
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

`databaseActivityEvents` JSON 对象包含以下信息。

### JSON 记录中的顶级字段
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 审核日志中的每个事件都包装在 JSON 格式的记录中。此记录包含以下字段。

**类型**  
 此字段始终具有值 `DatabaseActivityMonitoringRecords`。

**版本**  
 此字段表示数据库活动流数据协议或合同的版本。它定义了哪些字段可用。  
版本 1.0 表示对 Aurora PostgreSQL 10.7 和 11.4 版的原始数据活动流支持。版本 1.1 表示对 Aurora PostgreSQL 10.10 版及更高版本和 Aurora PostgreSQL 11.5 版及更高版本的数据活动流支持。版本 1.1 包括其他字段 `errorMessage` 和 `startTime`。版本 1.2 表示对 Aurora MySQL 2.08 及更高版本的数据活动流支持。版本 1.2 包括其他字段 `endTime` 和 `transactionId`。

**databaseActivityEvents**  
 表示一个或多个活动事件的加密字符串。它表示为 base64 字节数组。解密字符串时，结果是 JSON 格式的记录，其中包含字段，如本节中的示例所示。

**key**  
 用于加密 `databaseActivityEvents` 字符串的加密数据密钥。这与您在启动数据库活动流时提供的 AWS KMS key 密钥相同。

 以下示例显示了此记录的格式。

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

执行以下步骤来解密 `databaseActivityEvents` 字段的内容：

1.  使用您在启动数据库活动流时提供的 KMS 密钥解密 `key` JSON 字段中的值。这样做将以明文形式返回数据加密密钥。

1.  对 `databaseActivityEvents` JSON 字段中的值进行 Base64 解码，以获取审核负载的二进制格式的密文。

1.  使用您在第一步中解码的数据加密密钥解密二进制密文。

1.  解压解已解密的负载。
   +  已加密的负载在 `databaseActivityEvents` 字段中。
   +  `databaseActivityEventList` 字段包含审核记录数组。此数组中的 `type` 字段可以是 `record` 或 `heartbeat`。

审核日志活动事件记录是包含以下信息的 JSON 对象。


****  

| JSON 字段 | 数据类型 | 描述 | 
| --- | --- | --- | 
|  `type`  | 字符串 |  JSON 记录的类型。值为 `DatabaseActivityMonitoringRecord`。  | 
| clusterId | 字符串 | 数据库集群资源标识符。它对应于数据库集群属性 DbClusterResourceId。 | 
| instanceId | 字符串 | 数据库实例资源标识符。它对应于数据库实例属性 DbiResourceId。 | 
|  [databaseActivityEventList JSON 数组](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | 字符串 |  活动审核记录或检测信号消息的数组。  | 

# 数据库活动流的 databaseActivityEventList JSON 数组
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

审核日志负载是解密的 `databaseActivityEventList` JSON 数组。以下表列表按字母顺序列出了审计日志的解密 `DatabaseActivityEventList` 数组中每个活动事件的字段。这些字段会因您使用的是 Aurora PostgreSQL 还是 Aurora MySQL 而有所不同。请参阅适用于您的数据库引擎的表。

**重要**  
事件结构可能会发生变化。Aurora 可能会将新字段添加到未来的活动事件中。在解析 JSON 数据的应用程序中，请确保您的代码可以忽略未知字段名称或对其采取适当操作。

## Aurora PostgreSQL 的 databaseActivityEventList 字段
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

以下是 Aurora PostgreSQL 的 `databaseActivityEventList` 字段。


| 字段 | 数据类型 | 描述 | 
| --- | --- | --- | 
| class | 字符串 |  活动事件的类。Aurora PostgreSQL 的有效值如下所示： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | 字符串 | 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息，因此值可以为 null。 | 
| command | 字符串 | 不带任何命令详细信息的 SQL 命令的名称。 | 
| commandText | 字符串 |  用户传入的实际 SQL 语句。对于 Aurora PostgreSQL，该值与原始 SQL 语句相同。此字段用于除连接或断开连接记录之外的所有类型的记录，在这种情况下，该值为 null。  每个语句的完整 SQL 文本在活动流审核日志中可见，包括任何敏感数据。但是，如果 Aurora 可以从上下文中确定数据库用户密码，则会对该密码进行修订，如下面的 SQL 语句所示。 <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | 字符串 | 用户连接到的数据库。 | 
| dbProtocol | 字符串 | 数据库协议，例如 Postgres 3.0。 | 
| dbUserName | 字符串 | 客户端对其进行身份验证的数据库用户。 | 
| errorMessage（仅版本 1.1 数据库活动记录） | 字符串 |  如果出现任何错误，则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句，`errorMessage` 值为 null。 错误定义为生成客户端可见 PostgreSQL 错误日志事件（其严重性级别为 `ERROR` 或更高）的任何活动。有关更多信息，请参阅 [PostgreSQL 消息严重性级别](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS)。例如，语法错误和查询取消会生成错误消息。 内部 PostgreSQL 服务器错误（例如后台检查指针进程错误）不会生成错误消息。但是，无论如何设置日志严重性级别，此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。 另请参阅 `exitCode` 字段。  | 
| exitCode | int | 用于会话退出记录的值。在干净的出口，这包含退出代码。在某些故障场景中，无法始终获得退出代码。例如，如果 PostgreSQL 执行 exit() 或操作者执行 kill -9 等命令。如果存在任何错误，则 `exitCode` 字段将显示 SQL 错误代码 `SQLSTATE`，如 [PostgreSQL 错误代码](https://www.postgresql.org/docs/current/errcodes-appendix.html)中所列。另请参阅 `errorMessage` 字段。 | 
| logTime | 字符串 | 审核代码路径中记录的时间戳。这表示 SQL 语句执行结束时间。另请参阅 startTime 字段。 | 
| netProtocol | 字符串 | 网络通信协议。 | 
| objectName | 字符串 | 数据库对象的名称（如果正在对一个数据库对象运行 SQL 语句）。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句，则此值为 null。 | 
| objectType | 字符串 | 数据库对象类型，例如表、索引、视图等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句，则此值为 null。包括下列有效值：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | 字符串 | 传递给 SQL 语句的逗号分隔的参数数组。如果 SQL 语句没有参数，则此值为空数组。 | 
| pid | int | 为向客户端连接提供服务而分配的后端进程的进程 ID。 | 
| remoteHost | 字符串 | 客户端 IP 地址或主机名。对于 Aurora PostgreSQL，使用哪一个取决于数据库的 log\$1hostname 参数设置。remoteHost 值还包括指示活动来自 rdsadmin 用户的 [local] 和 localhost。 | 
| remotePort | 字符串 | 客户端的端口号。 | 
| rowCount | int | SQL 语句所影响或检索的表行的数目。此字段仅用于作为数据操作语言 (DML) 语句的 SQL 语句。如果 SQL 语句不是 DML 语句，则此值为 null。 | 
| serverHost | 字符串 | 数据库服务器主机 IP 地址。serverHost 值还包括指示活动来自 rdsadmin 用户的 [local] 和 localhost。 | 
| serverType | 字符串 | 数据库服务器类型，例如 PostgreSQL。 | 
| serverVersion | 字符串 | 数据库服务器版本，例如对于 Aurora PostgreSQL 为 2.3.1。 | 
| serviceName | 字符串 | 服务名称，例如 Amazon Aurora PostgreSQL-Compatible edition。 | 
| sessionId | int | 伪唯一会话标识符。 | 
| sessionId | int | 伪唯一会话标识符。 | 
| startTime（仅版本 1.1 数据库活动记录） | 字符串 |  SQL 语句开始执行的时间。 要计算 SQL 语句的近似执行时间，请使用 `logTime - startTime`。另请参阅 `logTime` 字段。  | 
| statementId | int | 客户端的 SQL 语句的标识符。计数器处于会话级别，并随客户端输入的每个 SQL 语句递增。 | 
| substatementId | int | SQL 子语句的标识符。此值计算 statementId 字段标识的每个 SQL 语句的包含的子语句。 | 
| type | 字符串 | 事件类型。有效值为 record 或 heartbeat。 | 

## Aurora MySQL 的 databaseActivityEventList 字段
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

以下是 Aurora MySQL 的 `databaseActivityEventList` 字段。


| 字段 | 数据类型 | 描述 | 
| --- | --- | --- | 
| class | 字符串 |  活动事件的类。 Aurora MySQL 的有效值如下所示： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | 字符串 | 客户端报告的其用于连接的应用程序。由于客户端不必提供此信息，因此值可以为 null。 | 
| command | 字符串 |  SQL 语句的常规类别。此字段的值取决于 `class` 的值。 `class` 为 `MAIN` 时的值包括以下值： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) `class` 为 `AUX` 时的值包括以下值： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | 字符串 |  对于 `class` 值为 `MAIN` 的事件，此字段表示用户传入的实际 SQL 语句。此字段用于除连接或断开连接记录之外的所有类型的记录，在这种情况下，该值为 null。  对于 `class` 值为 `AUX` 的事件，此字段包含有关事件中涉及的对象的补充信息。 对于 Aurora MySQL，字符（如引号）前面有反斜杠，表示转义字符。  每个语句的完整 SQL 文本在审核日志中可见，包括任何敏感数据。但是，如果 Aurora 可以从上下文中确定数据库用户密码，则会对该密码进行修订，如下面的 SQL 语句所示。 <pre>mysql> SET PASSWORD = 'my-password';</pre> 作为安全最佳实践，请指定除此处所示提示以外的密码。   | 
| databaseName | 字符串 | 用户连接到的数据库。 | 
| dbProtocol | 字符串 | 数据库协议。目前，对于 Aurora MySQL，此值始终为 MySQL。 | 
| dbUserName | 字符串 | 客户端对其进行身份验证的数据库用户。 | 
| endTime（仅版本 1.2 数据库活动记录） | 字符串 |  SQL 语句执行结束的时间。它以协调世界时 (UTC) 格式表示。 要计算 SQL 语句的执行时间，请使用 `endTime - startTime`。另请参阅 `startTime` 字段。  | 
| errorMessage（仅版本 1.1 数据库活动记录） | 字符串 |  如果出现任何错误，则使用数据库服务器生成的错误消息填充此字段。对于未导致错误的普通语句，`errorMessage` 值为 null。 错误定义为生成客户端可见 MySQL 错误日志事件（其严重性级别为 `ERROR` 或更高）的任何活动。有关更多信息，请参阅 *MySQL 参考手册*中的[错误日志](https://dev.mysql.com/doc/refman/5.7/en/error-log.html)。例如，语法错误和查询取消会生成错误消息。 内部 MySQL 服务器错误（例如后台检查指针进程错误）不会生成错误消息。但是，无论如何设置日志严重性级别，此类事件的记录仍会发出。这样可防止攻击者关闭日志记录以尝试避开检测。 另请参阅 `exitCode` 字段。  | 
| exitCode | int | 用于会话退出记录的值。在干净的出口，这包含退出代码。在某些故障场景中，无法始终获得退出代码。在此类情况下，此值可能为零，也可能为空。 | 
| logTime | 字符串 | 审核代码路径中记录的时间戳。它以协调世界时 (UTC) 格式表示。有关计算语句持续时间的最准确方法，请参阅 startTime 和 endTime 字段。 | 
| netProtocol | 字符串 | 网络通信协议。目前，对于 Aurora MySQL，此值始终为 TCP。 | 
| objectName | 字符串 | 数据库对象的名称（如果正在对一个数据库对象运行 SQL 语句）。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句，则此值为空。要构造对象的完全限定名称，请将 databaseName 和 objectName 组合起来。如果查询涉及多个对象，则此字段可以是名称的逗号分隔列表。 | 
| objectType | 字符串 |  数据库对象类型，例如表、索引等。此字段仅在对数据库对象运行 SQL 语句时使用。如果未对一个对象运行 SQL 语句，则此值为 null。 Aurora MySQL 的有效值包括： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | 字符串 | 此字段不用于 Aurora MySQL 且始终为 null。 | 
| pid | int | 为向客户端连接提供服务而分配的后端进程的进程 ID。当数据库服务器重新启动时，pid 会发生更改，并且 statementId 字段的计数器会重新开始。 | 
| remoteHost | 字符串 | 发出 SQL 语句的客户端的 IP 地址或主机名。对于 Aurora MySQL，使用哪一个取决于数据库的 skip\$1name\$1resolve 参数设置。值 localhost 指示来自 rdsadmin 特殊用户的活动。 | 
| remotePort | 字符串 | 客户端的端口号。 | 
| rowCount | int | SQL 语句返回的行数。例如，如果 SELECT 语句返回 10 行，则 rowCount 为 10。对于 INSERT 或 UPDATE 语句，则 rowCount 为 0。 | 
| serverHost | 字符串 | 数据库服务器实例标识符。 | 
| serverType | 字符串 | 数据库服务器类型，例如 MySQL。 | 
| serverVersion | 字符串 | 数据库服务器版本。目前，对于 Aurora MySQL，此值始终为 MySQL 5.7.12。 | 
| serviceName | 字符串 | 服务的名称。目前，对于 Aurora MySQL，此值始终为 Amazon Aurora MySQL。 | 
| sessionId | int | 伪唯一会话标识符。 | 
| startTime（仅版本 1.1 数据库活动记录） | 字符串 |  SQL 语句开始执行的时间。它以协调世界时 (UTC) 格式表示。 要计算 SQL 语句的执行时间，请使用 `endTime - startTime`。另请参阅 `endTime` 字段。  | 
| statementId | int | 客户端的 SQL 语句的标识符。计数器随客户端输入的每个 SQL 语句递增。在重新启动数据库实例时，将重置计数器。 | 
| substatementId | int | SQL 子语句的标识符。对于类为 MAIN 的事件，此值为 1；对于类为 AUX 的事件，此值为 2。使用 statementId 字段标识同一语句生成的所有事件。 | 
| transactionId（仅版本 1.2 数据库活动记录） | int | 事务的标识符。 | 
| type | 字符串 | 事件类型。有效值为 record 或 heartbeat。 | 

# 使用 AWS 开发工具包处理数据库活动流
<a name="DBActivityStreams.CodeExample"></a>

您可以使用 AWS 开发工具包以编程方式处理活动流。以下功能完善的 Java 和 Python 示例是关于可能会如何处理 Kinesis 数据流。

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------

# 数据库活动流的 IAM 策略示例
<a name="DBActivityStreams.ManagingAccess"></a>

具有数据库活动流的相应 AWS Identity and Access Management（IAM）角色权限的任何用户均可以创建、启动、停止和修改数据库集群的活动流设置。这些操作包含在流的审核日志中。对于最佳合规性实践，我们建议您不要向 DBA 提供这些权限。

您可以使用 IAM 策略设置对数据库活动流的访问权限。有关 Aurora 身份验证的更多信息，请参阅[Amazon Aurora 的 Identity and Access Management](UsingWithRDS.IAM.md)。有关创建 IAM 策略的更多信息，请参阅 [创建和使用适用于 IAM 数据库访问的 IAM 策略](UsingWithRDS.IAMDBAuth.IAMPolicy.md)。

**Example 允许配置数据库活动流的策略**  
要为用户提供精细访问权限以修改活动流，请在 IAM 策略中使用服务特定的操作上下文密钥 `rds:StartActivityStream` 和 `rds:StopActivityStream`。以下 IAM 策略示例允许用户或角色配置活动流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "ConfigureActivityStreams",
            "Effect": "Allow",
            "Action": [
                "rds:StartActivityStream",
                "rds:StopActivityStream"
            ],
            "Resource": "*"
        }
    ]
}
```

**Example 允许启动数据库活动流的策略**  
以下 IAM 策略示例允许用户或角色启动活动流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"AllowStartActivityStreams",
            "Effect":"Allow",
            "Action":"rds:StartActivityStream",
            "Resource":"*"
        }
    ]
}
```

**Example 允许停止数据库活动流的策略**  
以下 IAM 策略示例允许用户或角色停止活动流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"AllowStopActivityStreams",
            "Effect":"Allow",
            "Action":"rds:StopActivityStream",
            "Resource":"*"
        }
     ]
}
```

**Example 拒绝启动数据库活动流的策略**  
以下 IAM 策略示例阻止用户或角色启动活动流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"DenyStartActivityStreams",
            "Effect":"Deny",
            "Action":"rds:StartActivityStream",
            "Resource":"*"
        }
     ]
}
```

**Example 拒绝停止数据库活动流的策略**  
以下 IAM 策略示例阻止用户或角色停止活动流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"DenyStopActivityStreams",
            "Effect":"Deny",
            "Action":"rds:StopActivityStream",
            "Resource":"*"
        }
    ]
}
```