

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用資料庫活動串流來監控 Amazon Aurora
<a name="DBActivityStreams"></a><a name="das"></a>

透過使用資料庫活動串流，您就可以監控資料庫活動的近乎即時的串流。

**Topics**
+ [資料庫活動串流概觀](#DBActivityStreams.Overview)
+ [Aurora MySQL 資料庫活動串流的聯網先決條件](DBActivityStreams.Prereqs.md)
+ [開始資料庫活動串流](DBActivityStreams.Enabling.md)
+ [取得資料庫活動串流的狀態](DBActivityStreams.Status.md)
+ [停用資料庫活動串流](DBActivityStreams.Disabling.md)
+ [監控資料庫活動串流](DBActivityStreams.Monitoring.md)
+ [資料庫活動串流的 IAM 政策範例](DBActivityStreams.ManagingAccess.md)

## 資料庫活動串流概觀
<a name="DBActivityStreams.Overview"></a>

作為 Amazon Aurora 資料庫管理員，您需要保護資料庫並遵守合規與法規要求。其中一項策略是整合資料庫活動串流與監控工具。透過此方式，您可以監控 Amazon Aurora 叢集中的稽核活動，並設定警示。

外部和內部都有安全威脅。若要防範內部威脅，您可以設定資料庫活動串流功能來控制管理員對資料串流的存取。 資料庫管理員沒有存取收集、傳輸、儲存和處理串流的權限。

**Contents**
+ [資料庫活動串流運作方式](#DBActivityStreams.Overview.how-they-work)
+ [資料庫活動串流的非同步和同步模式](#DBActivityStreams.Overview.sync-mode)
+ [資料庫活動串流的要求與限制](#DBActivityStreams.Overview.requirements)
+ [區域和版本可用性](#DBActivityStreams.Overview.Availability)
+ [支援資料庫活動串流的資料庫執行個體類別](#DBActivityStreams.Overview.requirements.classes)

### 資料庫活動串流運作方式
<a name="DBActivityStreams.Overview.how-they-work"></a>

於 Amazon Aurora 中，您可在叢集層級啟動資料庫活動串流。叢集中的所有資料庫執行個體皆啟用了資料庫活動串流。

您的 Aurora 資料庫叢集 會近乎即時地將活動推送至 Amazon Kinesis 資料串流。系統會自動建立 Kinesis 串流。從 Kinesis，您可以設定 Amazon Data Firehose 和 等 AWS 服務 AWS Lambda ，以取用串流並存放資料。

**重要**  
在 Amazon Aurora 中使用資料庫活動串流功能是一項免費功能，但 Amazon Kinesis 會收取資料串流費用。如需詳細資訊，請參閱 [Amazon Kinesis Data Streams 定價](https://aws.amazon.com/kinesis/data-streams/pricing/)。

若您使用 Aurora 全域資料庫，請在每個資料庫叢集上分別啟動資料庫活動串流。每個叢集在其自己的 AWS 區域中將稽核資料傳送至其自己的 Kinesis 串流。在容錯移轉期間，活動串流的作業方式不會有所不同。其繼續像往常一樣稽核您的全域資料庫。

您可為合規管理設定應用程式以使用資料庫活動串流。若為 Aurora PostgreSQL，合規應用程式包括 IBM 的 Security Guardium 和 Imperva 的 SecureSphere Database Audit and Protection。這些應用程式可以使用串流來產生警示，並稽核您的 Aurora 資料庫叢集。

下圖展示使用 Amazon Data Firehose 設定的 Aurora 資料庫叢集。

![\[架構圖顯示 Firehose 使用之 Aurora 資料庫叢集的資料庫活動串流\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/images/aurora-das.png)


### 資料庫活動串流的非同步和同步模式
<a name="DBActivityStreams.Overview.sync-mode"></a>

您可以選擇讓資料庫工作階段使用以下任一模式來處理資料庫活動事件：
+ **非同步模式** – 在資料庫工作階段產生活動串流事件時，工作階段會立即傳回正常的活動。系統會在背景中將活動串流事件變成耐久的記錄。如果背景任務中發生錯誤，就會傳送 RDS 事件。此事件會指出活動串流事件記錄可能遺失的任何時段的開頭和結尾。

  非同步模式對資料庫效能的幫助較大，對活動串流精準度的幫助較小。
**注意**  
 非同步模式適用於 Aurora PostgreSQL 和 Aurora MySQL。
+ **同步模式** – 在資料庫工作階段產生活動串流事件時，在該事件變得耐久前工作階段都會封鎖。如果事件因故而無法變得耐久，資料庫工作階段會傳回正常的活動。然而，系統會傳送 RDS 事件，指出活動串流記錄可能已遺失一段時間。在系統回到運作良好的狀態後，就會傳送第二個 RDS 事件。

  同步模式對活動串流精準度的幫助較大，對資料庫效能的幫助較小。
**注意**  
 同步模式可用於 Aurora PostgreSQL。您不能搭配 Aurora MySQL 使用同步模式。

### 資料庫活動串流的要求與限制
<a name="DBActivityStreams.Overview.requirements"></a>

在 Aurora 中，資料庫活動串流具有以下要求和限制：
+ 資料庫活動串流需要使用 Amazon Kinesis。
+ AWS Key Management Service 資料庫活動串流需要 (AWS KMS)，因為它們一律會加密。
+ 將其他加密套用至 Amazon Kinesis 資料串流與已使用 AWS KMS 金鑰加密的資料庫活動串流不相容。
+ 在資料庫叢集層級啟動資料庫活動串流。若您將資料庫執行個體新增至叢集，則無需在執行個體上啟動活動串流：其會自動進行稽核。
+ 在 Aurora 全域資料庫中，請確保個別在每個資料庫叢集上啟動活動串流。每個叢集在其自己的 AWS 區域中將稽核資料傳送至其自己的 Kinesis 串流。
+ 在 Aurora PostgreSQL 中，請務必在主要版本升級之前停止資料庫活動串流。升級完成後，您可以啟動資料庫活動串流。

### 區域和版本可用性
<a name="DBActivityStreams.Overview.Availability"></a>

功能可用性和支援會因每個 Aurora 資料庫引擎的特定版本以及 AWS 區域而有所不同。如需有關 Aurora 和資料庫活動串流版本和區域可用性的詳細資訊，請參閱 [資料庫活動串流的支援區域和 Aurora 資料庫引擎](Concepts.Aurora_Fea_Regions_DB-eng.Feature.DBActivityStreams.md)。

### 支援資料庫活動串流的資料庫執行個體類別
<a name="DBActivityStreams.Overview.requirements.classes"></a>

針對 Aurora MySQL,您可以搭配下列資料庫執行個體類別來使用資料庫活動串流：
+ db.r8g.\$1large
+ db.r7g.\$1large
+ db.r7i.\$1large
+ db.r6g.\$1large
+ db.r6i.\$1large
+ db.r5.\$1large
+ db.x2g.\$1

針對 Aurora PostgreSQL,您可以搭配下列資料庫執行個體類別來使用資料庫活動串流：
+ db.r8g.\$1large
+ db.r7i.\$1large
+ db.r7g.\$1large
+ db.r6g.\$1large
+ db.r6i.\$1large
+ db.r6id.\$1large
+ db.r5.\$1large
+ db.r4.\$1large
+ db.x2g.\$1

# Aurora MySQL 資料庫活動串流的聯網先決條件
<a name="DBActivityStreams.Prereqs"></a>

在下文中，您可以了解如何設定 Virtual Private Cloud (VPC) 以與資料庫活動串流搭配使用。

**注意**  
Aurora MySQL 網路先決條件適用於下列引擎版本：  
Aurora MySQL 第 2 版，最高至 2.11.3
Aurora MySQL 第 2.12.0 版
Aurora MySQL 第 3 版，最高至 3.04.2

**Topics**
+ [AWS KMS 端點的先決條件](#DBActivityStreams.Prereqs.KMS)
+ [公開可用性的先決條件](#DBActivityStreams.Prereqs.Public)
+ [私有可用性的先決條件](#DBActivityStreams.Prereqs.Private)

## AWS KMS 端點的先決條件
<a name="DBActivityStreams.Prereqs.KMS"></a>

Aurora MySQL 叢集中使用活動串流的執行個體必須能夠存取 AWS KMS 端點。在啟用 Aurora MySQL 叢集的資料庫活動串流之前，請確定已滿足此需求。如果 Aurora 叢集可公開使用，則會自動滿足此要求。

**重要**  
如果 Aurora MySQL 資料庫叢集無法存取 AWS KMS 端點，活動串流便會停止。在這種情況下，Aurora 會使用 RDS 事件通知您有關此問題。

## 公開可用性的先決條件
<a name="DBActivityStreams.Prereqs.Public"></a>

Aurora 資料庫叢集必須符合下列條件才能公開：
+ AWS 管理主控台叢集詳細資訊頁面中的 **Publicly Accessible** (可公開存取) 為 **Yes** (是)。
+ 資料庫叢集位於 Amazon VPC 公有子網路中。如需可公開存取之資料庫執行個體的詳細資訊，請參閱[在 VPC 中使用資料庫叢集](USER_VPC.WorkingWithRDSInstanceinaVPC.md)。如需有關公有 Amazon VPC 子網路的更多資訊，請參閱[您的 VPC 與子網路](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Subnets.html)。

## 私有可用性的先決條件
<a name="DBActivityStreams.Prereqs.Private"></a>

如果您的 Aurora 資料庫叢集位於 VPC 公有子網路中且不可公開存取，則為私有。若要將您的叢集保持為私有狀態，並將其與資料庫活動串流一起使用，您有下列選項：
+ 在 VPC 中設定網路位址轉譯 (NAT)。如需更多詳細資訊，請參閱 [NAT 閘道](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)。
+ 在 VPC 中建立 AWS KMS 端點。建議使用此選項，因為設定更容易。

**在 VPC 中建立 AWS KMS 端點**

1. 在 [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) 開啟 Amazon VPC 主控台。

1. 在導覽窗格中選擇 **Endpoints (端點)**。

1. 選擇**建立端點**。

   **Create Endpoint** (建立端點) 頁面隨即出現。

1. 請執行下列操作：
   + 在 **Service category** (服務類別) 中，選擇​ **AWS services** ( 服務)。
   + 在 **Service Name** (服務名稱) 中，選擇 **com.amazonaws.*region*.kms**，其中 *region* 是叢集所在的 AWS 區域。
   + 在 **VPC** 中，選擇叢集所在的 VPC。

1. 選擇**建立端點**。

如需設定 VPC 端點的詳細資訊，請參閱 [VPC 端點](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-endpoints.html)。

# 開始資料庫活動串流
<a name="DBActivityStreams.Enabling"></a>

如要監控您 Aurora 資料庫叢集中所有執行個體的資料庫活動，請在叢集層級開始活動串流。您新增至該叢集的任何資料庫執行個體也會自動受到監控。若您使用 Aurora 全域資料庫，請在每個資料庫叢集上分別啟動資料庫活動串流。每個叢集在其自己的 AWS 區域 中將稽核資料傳送至其自己的 Kinesis 串流。

在開啟的活動串流時，您在稽核政策中設定的每個資料庫活動事件都會產生活動串流事件。`CONNECT` 和 `SELECT` 之類的 SQL 命令會產生存取事件。`CREATE` 和 `INSERT` 之類的 SQL 命令會產生變更事件。

------
#### [ Console ]

**若要開始資料庫活動串流**

1. 前往 [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)，開啟 Amazon RDS 主控台。

1. 在導覽窗格中，選擇 **Databases** (資料庫)。

1. 選擇您要在其上啟動活動串流的資料庫叢集 。

1. 針對 **Actions** (動作)，選擇 **Start activity stream** (啟動活動串流)。

   **Start database activity stream: ***name* (開始資料庫活動串流：name) 視窗隨即出現，其中 *name* 是您的 資料庫叢集。

1. 輸入以下設定：
   + 對於 **AWS KMS key**，請從 AWS KMS keys清單中選擇一個金鑰。
**注意**  
 如果您的 Aurora MySQL 叢集無法存取 KMS 金鑰，請依照 [Aurora MySQL 資料庫活動串流的聯網先決條件](DBActivityStreams.Prereqs.md) 中的指示，先啟用此類存取。

     Aurora 會使用 KMS 金鑰來加密金鑰，此金鑰會依序加密資料庫活動。請選擇預設金鑰以外的 KMS 金鑰。如需更多有關加密金鑰和 AWS KMS 的資訊，請參閱 *AWS Key Management Service 開發人員指南*中的[什麼是 AWS Key Management Service ?](https://docs.aws.amazon.com/kms/latest/developerguide/overview.html)。
   + 對於 **Database activity stream mode (資料庫活動串流模式)**，選擇 **Asynchronous (非同步)** 或 **Synchronous (同步)**。
**注意**  
此選項僅適用於 Aurora PostgreSQL。針對 Aurora MySQL，您只能使用非同步模式。
   + 選擇 **Immediately** (立即)。

     在您選擇 **Immediately** (立即) 時，資料庫叢集會立即重新啟動。如果您選擇 **During the next maintenance window** (下個維護時段期間)，則資料庫叢集不會立即重新啟動。在這種情況下直到下一個維護時段前，資料庫活動串流都不會啟動。

1. 選擇 **Start database activity stream** (啟動資料庫活動串流)。

   資料庫叢集的狀態會顯示活動串流正在開始。
**注意**  
如果收到錯誤訊息 `You can't start a database activity stream in this configuration`，請檢查 [支援資料庫活動串流的資料庫執行個體類別](DBActivityStreams.md#DBActivityStreams.Overview.requirements.classes) 以查看 資料庫叢集是否使用支援的執行個體類別。

------
#### [ AWS CLI ]

若要開始資料庫叢集的資料庫活動串流，請使用 [start-activity-stream](https://docs.aws.amazon.com/cli/latest/reference/rds/start-activity-stream.html) AWS CLI 命令來設定資料庫叢集。
+ `--resource-arn arn`– 指定資料庫的 Amazon 資源名稱 (ARN) 叢集。
+ `--mode sync-or-async` – 指定同步 (`sync`) 或非同步 (`async`) 模式。對於 Aurora PostgreSQL，您可以選擇其中一個值。針對 Aurora MySQL，請指定 `async`。
+ `--kms-key-id key` – 指定用於加密資料庫活動串流中訊息的 KMS 金鑰識別碼。AWS KMS 金鑰識別碼是 AWS KMS key的金鑰 ARN、金鑰 ID、別名 ARN 或別名。

下列範例會在非同步模式下開始資料庫叢集的活動串流。

對於 Linux、macOS 或 Unix：

```
aws rds start-activity-stream \
    --mode async \
    --kms-key-id my-kms-key-arn \
    --resource-arn my-cluster-arn \
    --apply-immediately
```

在 Windows 中：

```
aws rds start-activity-stream ^
    --mode async ^
    --kms-key-id my-kms-key-arn ^
    --resource-arn my-cluster-arn ^
    --apply-immediately
```

------
#### [ Amazon RDS API ]

若要開始資料庫叢集的資料庫活動串流，請使用 [StartActivityStream](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_StartActivityStream.html) 操作來設定叢集。

使用以下參數呼叫動作：
+ `Region`
+ `KmsKeyId`
+ `ResourceArn`
+ `Mode`

------

**注意**  
如果您收到錯誤，指出無法使用目前版本的 Aurora PostgreSQL 資料庫啟動資料庫活動串流，請在啟動資料庫活動串流之前套用 Aurora PostgreSQL 的最新修補程式。如需升級 Aurora PostgreSQL 資料庫的詳細資訊，請參閱[升級 Amazon Aurora 資料庫叢集](Aurora.VersionPolicy.Upgrading.md)。  
以下是使用 Aurora PostgreSQL 啟動資料庫活動串流的最低修補程式版本。  
3.4.15 (11.9.15)、11.21.10
12.9.15、12.15.9、12.16.10、12.17.7、12.18.5、12.19.4、12.20.3、12.22.3
13.9.12、13.11.9、13.12.10、13.13.7、13.14.5、13.15.4、13.16.3、13.18.3
14.6.12、14.8.9、14.9.10、14.10.7、14.11.5、、14.12.4、14.13.3、14.15.3
15.3.9、15.4.10、15.5.7、15.6.5、、15.7.4、15.8.3、、15.10.3
16.1.7、16.2.5、16.3.4、16.4.3、、16.6.3

# 取得資料庫活動串流的狀態
<a name="DBActivityStreams.Status"></a>

您可以使用主控台或 AWS CLI 來取得 的活動串流狀態。

## 主控台
<a name="DBActivityStreams.Status-collapsible-section-S1"></a>

**取得資料庫活動串流的狀態**

1. 前往 [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)，開啟 Amazon RDS 主控台。

1. 在導覽窗格中，選擇 **Databases** (資料庫)，然後選擇 DB cluster (資料庫叢集) (資料庫執行個體) 連結。

1. 選擇 **Configuration (組態)** 標籤，然後確認 **Database activity stream (資料庫活動串流)** 的狀態。

## AWS CLI
<a name="DBActivityStreams.Status-collapsible-section-S2"></a>

您可以取得資料庫叢集的活動串流組態，作為對 [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) CLI 請求的回應。

以下範例描述 *my-cluster*。

```
aws rds --region my-region describe-db-clusters --db-cluster-identifier my-cluster
```

JSON 回應如以下範例所示。下列欄位會顯示：
+ `ActivityStreamKinesisStreamName`
+ `ActivityStreamKmsKeyId`
+ `ActivityStreamStatus`
+ `ActivityStreamMode`
+ 

這些欄位對於 Aurora PostgreSQL 和 Aurora MySQL 是相同的，除非 `ActivityStreamMode` 對於 Aurora MySQL 一律是 `async`，而對於 Aurora PostgreSQL，它可能是 `sync` 或 `async`。

```
{
    "DBClusters": [
        {
      "DBClusterIdentifier": "my-cluster",
            ...
            "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-A6TSYXITZCZXJHIRVFUBZ5LTWY",
            "ActivityStreamStatus": "starting",
            "ActivityStreamKmsKeyId": "12345678-abcd-efgh-ijkl-bd041f170262",
            "ActivityStreamMode": "async",
            "DbClusterResourceId": "cluster-ABCD123456"
            ...
        }
    ]
}
```

## RDS API
<a name="DBActivityStreams.Status-collapsible-section-S3"></a>

您可以取得資料庫叢集的活動串流組態，作為對 [DescribeDBClusters](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_DescribeDBClusters.html) 操作的回應。

# 停用資料庫活動串流
<a name="DBActivityStreams.Disabling"></a>

您可以使用主控台或 AWS CLI 來停止活動串流。

若您刪除資料庫叢集 ，則會停止活動串流，並自動刪除底層的 Amazon Kinesis 串流。

## 主控台
<a name="DBActivityStreams.Disabling-collapsible-section-D1"></a>

**關閉活動串流**

1. 前往 [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)，開啟 Amazon RDS 主控台。

1. 在導覽窗格中，選擇 **Databases** (資料庫)。

1. 選擇您要停止資料庫活動串流的資料庫叢集。

1. 針對 **Actions** (動作)，選擇 **Stop activity stream** (停止活動串流)。**Database Activity Stream (資料庫活動串流)** 即會顯示。

   1. 選擇 **Immediately** (立即)。

      在您選擇 **Immediately** (立即) 時，資料庫叢集會立即重新啟動。如果您選擇 **During the next maintenance window** (下個維護時段期間)，則資料庫叢集 不會立即重新啟動。在此情況下，直到下一個維護時段前，資料庫活動串流都不會被停用。

   1. 選擇 **Continue (繼續)**。

## AWS CLI
<a name="DBActivityStreams.Disabling-collapsible-section-D2"></a>

若要停止您資料庫叢集的資料庫活動串流，請使用 AWS CLI 命令 [stop-activity-stream](https://docs.aws.amazon.com/cli/latest/reference/rds/stop-activity-stream.html) 來設定資料庫叢集。使用 `--region` 參數來識別資料庫叢集的 AWS 區域。`--apply-immediately` 為選用參數。

對於 Linux、macOS 或 Unix：

```
aws rds --region MY_REGION \
    stop-activity-stream \
    --resource-arn MY_CLUSTER_ARN \
    --apply-immediately
```

在 Windows 中：

```
aws rds --region MY_REGION ^
    stop-activity-stream ^
    --resource-arn MY_CLUSTER_ARN ^
    --apply-immediately
```

## RDS API
<a name="DBActivityStreams.Disabling-collapsible-section-D3"></a>

若要停止您的資料庫叢集的資料庫活動串流，請使用 [StopActivityStream](https://docs.aws.amazon.com/AmazonRDS/latest/APIReference/API_StopActivityStream.html) 操作來設定叢集。使用 `Region` 參數來識別資料庫叢集的 AWS 區域。`ApplyImmediately` 為選用參數。

# 監控資料庫活動串流
<a name="DBActivityStreams.Monitoring"></a>

資料庫活動串流會監控和報告活動。活動資料串流會收集並傳送至 Amazon Kinesis。從 Kinesis 中，您可以監視活動串流，或者其他服務和應用程式可以使用活動串流以供進一步分析。您可以使用 AWS CLI 命令 `describe-db-clusters` 或 RDS API `DescribeDBClusters` 操作來尋找基礎 Kinesis 串流名稱。

Aurora 會為您管理 Kinesis 串流，如下所示：
+ Aurora 會自動建立保留時間為 24 小時的 Kinesis 串流。
+  如有必要，Aurora 可擴展 Kinesis 串流。
+  如果您停止資料庫活動串流或刪除資料庫叢集，則 Aurora 會刪除 Kinesis 串流。

系統會監控以下類別的活動並將其放在活動串流稽核日誌中：
+ **SQL 命令** – 所有 SQL 命令都經稽核，也是預備陳述式、內建函數和 PL/SQL 函數。對預存程序的呼叫會進行稽核。在儲存的程序或函數中發出的任何 SQL 語句也被稽核。
+ **其他資料庫資訊** – 監控的活動包含完整的 SQL 陳述式、因 DML 命令而受影響的資料列數、經存取的物件和唯一的資料庫名稱。若為 Aurora PostgreSQL，資料庫活動串流還會監控連結變數和已儲存的程序參數。
**重要**  
活動資料串流稽核記錄中會顯示每個陳述式的完整 SQL 文字，包括任何敏感資料。但是，如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼，則該密碼將會被修訂。  

  ```
  ALTER ROLE role-name WITH password
  ```
+ **連線資訊** – 監控的活動包含工作階段和網路資訊、伺服器程序 ID 和結束代碼。

如果活動串流在監控資料庫執行個體時失敗，系統會透過 RDS 事件來通知您。

在下列各節中，您可以存取、稽核和處理資料庫活動串流。

**Topics**
+ [透過 Amazon Kinesis 存取活動串流](DBActivityStreams.KinesisAccess.md)
+ [資料庫活動串流的稽核日誌內容和範例](DBActivityStreams.AuditLog.md)
+ [適用於資料庫活動串流的 databaseActivityEventList JSON 陣列](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [使用 AWS SDK 來處理資料庫活動串流](DBActivityStreams.CodeExample.md)

# 透過 Amazon Kinesis 存取活動串流
<a name="DBActivityStreams.KinesisAccess"></a>

在您啟用資料庫叢集的活動串流時，系統就會為您建立 Kinesis 串流。透過 Kinesis，您就可以即時監控資料庫活動。若要進一步分析資料庫活動，您可以將 Kinesis 串流連線至消費者應用程式。您也可以將串流連線至合規性管理應用程式，例如 IBM 的 Security Guardium 或 Imperva 的 SecureSphere Database Audit and Protection。

您可以從 RDS 主控台或 Kinesis 主控台存取 Kinesis 串流。

**使用 RDS 主控台從 Kinesis 存取活動串流**

1. 前往 [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)，開啟 Amazon RDS 主控台。

1. 在導覽窗格中，選擇 **Databases** (資料庫)。

1. 選擇已在其上啟動活動串流的資料庫叢集。

1. 選擇 **Configuration (組態)**。

1. 在 **Database activity stream** (資料庫活動串流) 下，選擇 **Kinesis stream** (Kinesis 串流) 下的連結。

1. 在 Kinesis 主控台中，選擇 **Monitoring** (監控) 來開始觀察資料庫活動。

**使用 Kinesis 主控台從 Kinesis 存取活動串流**

1. 在 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 上開啟 Kinesis 主控台。

1. 從 Kinesis 串流清單中選擇活動串流。

   活動串流的名稱包含字首 `aws-rds-das-cluster-`，後接資料庫叢集的資源 ID。以下是範例。

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   若要使用 Amazon RDS 主控台來尋找資料庫叢集的資源 ID，請從資料庫清單中選擇您的資料庫叢集，然後選擇**Configuration (組態)** 索引標籤。

   若要使用 AWS CLI 來尋找活動串流的完整 Kinesis 串流名稱，請使用 [describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) CLI 請求，並記下回應中的 `ActivityStreamKinesisStreamName` 值。

1. 選擇 **Monitoring (監控)** 來開始觀察資料庫活動。

如需使用 Amazon Kinesis 的詳細資訊，請參閱[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)。

# 資料庫活動串流的稽核日誌內容和範例
<a name="DBActivityStreams.AuditLog"></a>

受監控的事件會以 JSON 字串的形式在資料庫活動串流中顯示。此結構包含 JSON 物件，內含的 `DatabaseActivityMonitoringRecord` 會依序包含活動事件的 `databaseActivityEventList` 陣列。

**注意**  
對於資料庫活動串流，`paramList` JSON 陣列不包含休眠應用程式的 null 值。

**Topics**
+ [活動串流的稽核記錄範例](#DBActivityStreams.AuditLog.Examples)
+ [DatabaseActivityMonitoringRecords JSON 物件](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [databaseActivityEvents JSON 物件](#DBActivityStreams.AuditLog.databaseActivityEvents)

## 活動串流的稽核記錄範例
<a name="DBActivityStreams.AuditLog.Examples"></a>

以下是活動事件記錄的範例解密 JSON 稽核日誌。

**Example Aurora PostgreSQL CONNECT SQL 陳述式的活動事件記錄**  
以下活動事件記錄顯示 psql 用戶端 (`clientApplication`) 使用 `CONNECT` SQL 陳述式 (`command`) 登入。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CONNECT SQL 陳述式的活動事件記錄**  
以下是 mysql 用戶端 (`clientApplication`) 使用 `CONNECT` SQL 陳述式 (`command`) 登入的活動事件記錄。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Aurora PostgreSQL CREATE TABLE 陳述式的活動事件記錄**  
以下是 Aurora PostgreSQL 的 `CREATE TABLE` 事件範例。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CREATE TABLE 陳述式的活動事件記錄**  
以下範例顯示 Aurora MySQL 的 `CREATE TABLE` 陳述式。該操作會以兩個不同的事件記錄表示。一個活動具有 `"class":"MAIN"`。另一個活動具有 `"class":"AUX"`。這些訊息可能以任何順序到達。`logTime` 事件的 `MAIN` 欄位永遠早於任何對應 `logTime` 事件的 `AUX` 欄位。  
下列範例會顯示 `class` 值為 `MAIN` 的事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 下列範例會顯示 `class` 值為 `AUX` 的對應事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Aurora PostgreSQLSELECT 陳述式的活動事件記錄**  
下列範例顯示 的 `SELECT` 事件。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Aurora MySQL SELECT 陳述式的活動事件記錄**  
下列範例顯示 `SELECT` 事件。  
 下列範例會顯示 `class` 值為 `MAIN` 的事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 下列範例會顯示 `class` 值為 `AUX` 的對應事件。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## DatabaseActivityMonitoringRecords JSON 物件
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

資料庫活動事件記錄位於 JSON 物件中，其中包含下列資訊。


****  

| JSON 欄位 | 資料類型 | 描述 | 
| --- | --- | --- | 
|  `type`  | string |  JSON 記錄類型。值為 `DatabaseActivityMonitoringRecords`。  | 
| version | string |  資料庫活動監控記錄的版本。產生的資料庫活動記錄版本取決於資料庫叢集的引擎版本： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)除非特別註明，否則下列所有欄位都在 1.0 版和 1.1 版中。 | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | string |  包含活動事件的 JSON 物件。  | 
| 金鑰 | string | 您用來解密 [databaseActivityEventList JSON 陣列](DBActivityStreams.AuditLog.databaseActivityEventList.md) 的加密金鑰  | 

## databaseActivityEvents JSON 物件
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

`databaseActivityEvents` JSON 物件包含以下資訊。

### JSON 記錄中的最上層欄位
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 稽核記錄檔中的每個事件都會包裝在 JSON 格式的記錄中。此記錄包含下列欄位。

**type**  
 此欄位永遠具有值 `DatabaseActivityMonitoringRecords`。

**version**  
 此欄位代表資料庫活動串流資料通訊協定或合約的版本。其會定義哪些欄位可用。  
1.0 版代表 Aurora PostgreSQL 10.7 和 11.4 版的原始資料活動串流支援。1.1 版代表 Aurora PostgreSQL 10.10 及更高版本和 Aurora PostgreSQL 11.5 及更高版本的資料活動串流支援。1.1 版包含其他欄位 `errorMessage` 和 `startTime`。1.2 版代表 Aurora MySQL 2.08 及更高版本的資料活動串流支援。1.2 版包含其他欄位 `endTime` 和 `transactionId`。

**databaseActivityEvents**  
 代表一或多個活動事件的加密字串。它被表示為一個 base64 位元組陣列。在您解密字串時，結果會是 JSON 格式的記錄，其中包含欄位，如本節範例所示。

**金鑰**  
 用來加密 `databaseActivityEvents` 字串的加密資料金鑰。這與您啟動資料庫活動串流時 AWS KMS key 提供的相同。

 下列範例顯示此記錄的格式。

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

採取下列步驟來解密 `databaseActivityEvents` 欄位的內容：

1.  使用您在啟動資料庫活動串流時提供的 KMS 金鑰，解密 `key` JSON 欄位中的值。這麼做會以純文字傳回資料加密金鑰。

1.  Base64 解碼 `databaseActivityEvents` JSON 欄位中的值，以取得稽核承載的二進位格式的加密文字。

1.  使用您在第一個步驟中解碼的資料加密金鑰來解密二進位密文。

1.  解壓縮已解密的承載。
   +  加密的承載在 `databaseActivityEvents` 欄位。
   +  該 `databaseActivityEventList` 欄位包含稽核記錄的陣列。陣列中的 `type` 欄位可以是 `record` 或 `heartbeat`。

稽核日誌活動事件記錄是包含以下資訊的 JSON 物件。


****  

| JSON 欄位 | 資料類型 | 描述 | 
| --- | --- | --- | 
|  `type`  | string |  JSON 記錄類型。值為 `DatabaseActivityMonitoringRecord`。  | 
| clusterId | string | 資料庫叢集資源識別符。其對應於資料庫叢集屬性 DbClusterResourceId。 | 
| instanceId | string | 資料庫執行個體資源識別符。它對應於資料庫執行個體屬性 DbiResourceId。 | 
|  [databaseActivityEventList JSON 陣列](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | string |  活動稽核記錄或活動訊號訊息的陣列。  | 

# 適用於資料庫活動串流的 databaseActivityEventList JSON 陣列
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

稽核日誌承載是加密的 `databaseActivityEventList` JSON 陣列。以下資料表列出稽核記錄中已解密 `DatabaseActivityEventList` 陣列中，每個活動事件的欄位 (按英文字母順序列出)。這些欄位會根據您使用 Aurora PostgreSQL 或 Aurora MySQL 而有所不同。請參閱適用於資料庫引擎的表格。

**重要**  
事件結構可能會改變。Aurora 可能會在未來將新的欄位新增至活動事件。在剖析 JSON 資料的應用程式中，請確定程式碼可以忽略，或針對未知欄位名稱採取適當的動作。

## Aurora PostgreSQL 的 databaseActivityEventList 欄位
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

以下是 Aurora PostgreSQL 的 `databaseActivityEventList` 欄位。


| 欄位 | 資料類型 | 描述 | 
| --- | --- | --- | 
| class | string |  活動事件的類別。Aurora PostgreSQL 的有效值如下： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | 用戶端報告用來連接的應用程式。用戶端不需提供此資訊，因此此值可以是 Null。 | 
| command | string | SQL 命令名稱，其中不含任何命令詳細資訊。 | 
| commandText | string |  使用者傳遞的實際 SQL 陳述式。針對 Aurora PostgreSQL，該值與原始 SQL 陳述式相同。此欄位可用於連接或中斷連接記錄以外的所有記錄類型，在前述兩種例外類型中值為 Null。  活動資料串流稽核記錄中會顯示每個陳述式的完整 SQL 文字，包括任何敏感資料。但是，如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼，則會編寫密文。 <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | string | 使用者連接的資料庫。 | 
| dbProtocol | string | 資料庫通訊協定，例如 Postgres 3.0。 | 
| dbUserName | string | 用戶端驗證所用的資料庫使用者。 | 
| errorMessage(僅限 1.1 版資料庫活動記錄) | string |  如果有任何錯誤，該欄位會填入由資料庫伺服器產生的錯誤訊息。對於未導致錯誤的正常陳述式，此 `errorMessage` 值為 null。 錯誤是定義為任何會產生用戶端可見 PostgreSQL 錯誤日誌事件的活動，其嚴重性層級為 `ERROR` 或更高。如需詳細資訊，請參閱 [PostgreSQL 訊息嚴重性等級](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS)。例如，語法錯誤和查詢取消會產生錯誤訊息。 內部 PostgreSQL 伺服器錯誤，例如背景檢查點指標處理程序錯誤，並不會產生錯誤訊息。不過，無論日誌嚴重性層級的設定為何，仍會發出這類事件的記錄。這可以防止攻擊者關閉記錄日誌以嘗試避免偵測。 另請參閱 `exitCode` 欄位。  | 
| exitCode | int | 工作階段結束記錄所用的值。清除結束時，此會包含結束代碼。在某些失敗狀況下可能無法隨時取得結束代碼。範例為 PostgreSQL 執行 exit() 或運算子執行 kill -9 之類的命令。如果發生任何錯誤，此 `exitCode` 欄位會顯示 SQL 錯誤代碼 `SQLSTATE`，如 [PostgreSQL 錯誤代碼](https://www.postgresql.org/docs/current/errcodes-appendix.html)中所列。另請參閱 `errorMessage` 欄位。 | 
| logTime | string | 在稽核程式碼路徑中記錄的時間戳記。這代表 SQL 陳述式執行結束時間。另請參閱 startTime 欄位。 | 
| netProtocol | string | 網路通訊協定。 | 
| objectName | string | SQL 陳述式在其中操作的資料庫物件名稱。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作，則此值為 Null。 | 
| objectType | string | 資料表、索引、檢視等之類的資料庫物件類型。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作，則此值為 Null。有效值包括以下項目：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | string | 傳遞至 SQL 陳述式的參數陣列 (以逗號分隔)。如果 SQL 陳述式沒有任何參數，此值會是空的陣列。 | 
| pid | int | 後端程序的程序 ID，此程序的配置是用來提供用戶端連線。 | 
| remoteHost | string | 用戶端 IP 地址或主機名稱。針對 Aurora PostgreSQL，使用哪一個取決於資料庫的 log\$1hostname 參數設定。此 remoteHost 值也包含 [local]，且 localhost 表示來自 rdsadmin 使用者的活動。 | 
| remotePort | string | 用戶端連接埠號碼。 | 
| rowCount | int | SQL 陳述式影響或擷取的資料列數。僅會對資料處理語言 (DML) 陳述式的 SQL 陳述式使用此欄位。如果 SQL 陳述式不是 DML 陳述式，則此值為 Null。 | 
| serverHost | string | 資料庫伺服器主機 IP 地址。此 serverHost 值也包含 [local]，且 localhost 表示來自 rdsadmin 使用者的活動。 | 
| serverType | string | 資料庫伺服器類型，例如 PostgreSQL。 | 
| serverVersion | string | 資料庫伺服器版本，例如 Aurora PostgreSQL 的 2.3.1。 | 
| serviceName | string | 服務名稱，例如 Amazon Aurora PostgreSQL-Compatible edition。 | 
| sessionId | int | 虛擬唯一的工作階段識別符。 | 
| sessionId | int | 虛擬唯一的工作階段識別符。 | 
| startTime(僅限 1.1 版資料庫活動記錄) | string |  SQL 陳述式開始執行的時間。 若要計算 SQL 陳述式大約的執行時間，請使用 `logTime - startTime`。另請參閱 `logTime` 欄位。  | 
| statementId | int | 用戶端 SQL 陳述式的識別符。記數器是使用工作階段層級，且會隨著用戶端輸入的每個 SQL 陳述式遞增。 | 
| substatementId | int | SQL 子陳述式的識別符。此值會計算由 statementId 欄位識別的每個 SQL 陳述式包含的子陳述式。 | 
| type | string | 事件類型。有效值為 record 或 heartbeat。 | 

## Aurora MySQL 的 databaseActivityEventList 欄位
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

以下是 Aurora MySQL 的 `databaseActivityEventList` 欄位。


| 欄位 | 資料類型 | 描述 | 
| --- | --- | --- | 
| class | string |  活動事件的類別。 Aurora MySQL 的有效值如下： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | 用戶端報告用來連接的應用程式。用戶端不需提供此資訊，因此此值可以是 Null。 | 
| command | string |  SQL 陳述式的一般類別。此欄位的值取決於 `class` 的值。 當 `class` 為 `MAIN` 時的值包括以下內容： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) 當 `class` 為 `AUX` 時的值包括以下內容： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | string |  對於 `class` 值為 `MAIN` 的事件，此欄位代表使用者傳入的實際 SQL 陳述式。此欄位可用於連接或中斷連接記錄以外的所有記錄類型，在前述兩種例外類型中值為 Null。  對於 `class` 值為 `AUX` 的事件，此欄位包含有關事件涉及之物件的補充資訊。 針對 Aurora MySQL，引號之類的字元前面加上反斜線，代表逸出字元。  稽核記錄中會顯示每個陳述式的完整 SQL 文字，包括任何敏感資料。但是，如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼，則會編寫密文。 <pre>mysql> SET PASSWORD = 'my-password';</pre> 指定此處所顯示提示以外的密碼，作為安全最佳實務。   | 
| databaseName | string | 使用者連接的資料庫。 | 
| dbProtocol | string | 資料庫通訊協定。目前，Aurora MySQL 的這個值永遠是 MySQL。 | 
| dbUserName | string | 用戶端驗證所用的資料庫使用者。 | 
| endTime(僅限 1.2 版資料庫活動記錄) | string |  SQL 陳述式結束執行的時間。以國際標準時間 (UTC) 格式表示。 若要計算 SQL 陳述式的執行時間，請使用 `endTime - startTime`。另請參閱 `startTime` 欄位。  | 
| errorMessage(僅限 1.1 版資料庫活動記錄) | string |  如果有任何錯誤，該欄位會填入由資料庫伺服器產生的錯誤訊息。對於未導致錯誤的正常陳述式，此 `errorMessage` 值為 null。 錯誤是定義為任何會產生用戶端可見 MySQL 錯誤日誌事件的活動，其嚴重性層級為 `ERROR` 或更高。如需更多資訊，請參閱 *MySQL 參考手冊*中的[錯誤記錄](https://dev.mysql.com/doc/refman/5.7/en/error-log.html)。例如，語法錯誤和查詢取消會產生錯誤訊息。 內部 MySQL 伺服器錯誤，例如背景檢查點指標處理程序錯誤，並不會產生錯誤訊息。不過，無論日誌嚴重性層級的設定為何，仍會發出這類事件的記錄。這可以防止攻擊者關閉記錄日誌以嘗試避免偵測。 另請參閱 `exitCode` 欄位。  | 
| exitCode | int | 工作階段結束記錄所用的值。清除結束時，此會包含結束代碼。在某些失敗狀況下可能無法隨時取得結束代碼。在這種情況下，此值可能是零，也可能是空白。 | 
| logTime | string | 在稽核程式碼路徑中記錄的時間戳記。以國際標準時間 (UTC) 格式表示。如需計算陳述式持續時間的最準確方式，請參閱 startTime 和 endTime 欄位。 | 
| netProtocol | string | 網路通訊協定。目前，Aurora MySQL 的這個值永遠是 TCP。 | 
| objectName | string | SQL 陳述式在其中操作的資料庫物件名稱。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作，則此值為空白。若要建構物件的完整名稱，請結合 databaseName 和 objectName。如果查詢涉及多個物件，則此欄位可以是以逗號分隔的名稱清單。 | 
| objectType | string |  資料表、索引、檢視等之類的資料庫物件類型。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作，則此值為 Null。 Aurora MySQL 的有效值包括以下項目： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | string | 此欄位不會用於 Aurora MySQL 且一律為空。 | 
| pid | int | 後端程序的程序 ID，此程序的配置是用來提供用戶端連線。重新啟動資料庫伺服器時，pid 變更和 statementId 欄位的計數器會重新啟動。 | 
| remoteHost | string | 發出 SQL 陳述式之用戶端的 IP 地址或主機名稱。針對 Aurora MySQL，使用哪一個取決於資料庫的 skip\$1name\$1resolve 參數設定。localhost 的值表示來自 rdsadmin 特殊使用者的活動。 | 
| remotePort | string | 用戶端連接埠號碼。 | 
| rowCount | int | SQL 陳述式傳回的資料列數目。例如，如果一個 SELECT 陳述式傳回 10 個資料列，則 rowCount 為 10。對於 INSERT 或 UPDATE 陳述式，rowCount 為 0。 | 
| serverHost | string | 資料庫伺服器執行個體識別符。 | 
| serverType | string | 資料庫伺服器類型，例如 MySQL。 | 
| serverVersion | string | 資料庫伺服器版本。目前，Aurora MySQL 的這個值永遠是 MySQL 5.7.12。 | 
| serviceName | string | 服務的名稱。目前，Aurora MySQL 的這個值永遠是 Amazon Aurora MySQL。 | 
| sessionId | int | 虛擬唯一的工作階段識別符。 | 
| startTime(僅限 1.1 版資料庫活動記錄) | string |  SQL 陳述式開始執行的時間。以國際標準時間 (UTC) 格式表示。 若要計算 SQL 陳述式的執行時間，請使用 `endTime - startTime`。另請參閱 `endTime` 欄位。  | 
| statementId | int | 用戶端 SQL 陳述式的識別符。計數器會隨著用戶端輸入的每個 SQL 陳述式而增加。當資料庫執行個體重新啟動時，計數器會重設。 | 
| substatementId | int | SQL 子陳述式的識別符。對於具有類別 MAIN 的事件此值是 1，對於具有類別 AUX 的事件則為 2。使用此 statementId 欄位可識別由相同陳述式產生的所有事件。 | 
| transactionId(僅限 1.2 版資料庫活動記錄) | int | 交易的識別符。 | 
| type | string | 事件類型。有效值為 record 或 heartbeat。 | 

# 使用 AWS SDK 來處理資料庫活動串流
<a name="DBActivityStreams.CodeExample"></a>

您可以透過使用 AWS 開發套件來以程式設計的方式處理活動串流。以下是功能完整的 Java 和 Python 範例，其示範您可以如何處理 Kinesis 資料串流。

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------

# 資料庫活動串流的 IAM 政策範例
<a name="DBActivityStreams.ManagingAccess"></a>

具有資料庫活動串流適當 AWS Identity and Access Management(IAM) 角色權限的任何使用者可以建立、啟動、停止和修改資料庫叢集的活動串流設定。這些動作會包含在串流的稽核日誌中。為了達到最佳合規實務，我們建議您不要將這些權限提供給 DBA。

您可以使用 IAM 政策來設定資料庫活動串流的存取。如需更多有關 Aurora 身分驗證的資訊，請參閱 [Amazon Aurora 的 Identity and access management](UsingWithRDS.IAM.md)。如需建立 IAM 政策的詳細資訊，請參閱 [建立並使用 IAM 政策進行 IAM 資料庫存取](UsingWithRDS.IAMDBAuth.IAMPolicy.md)。

**Example 允許資料庫活動串流設定的原則**  
若要提供使用者更精細的存取權來修改活動串流，請在 IAM 政策中使用服務特定操作內容金鑰 `rds:StartActivityStream` 與 `rds:StopActivityStream` 。以下 IAM 政策範例會允許使用者或角色設定活動串流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "ConfigureActivityStreams",
            "Effect": "Allow",
            "Action": [
                "rds:StartActivityStream",
                "rds:StopActivityStream"
            ],
            "Resource": "*"
        }
    ]
}
```

**Example 允許資料庫活動串流開始的原則**  
以下 IAM 政策範例會允許使用者或角色開始活動串流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"AllowStartActivityStreams",
            "Effect":"Allow",
            "Action":"rds:StartActivityStream",
            "Resource":"*"
        }
    ]
}
```

**Example 允許資料庫活動串流停用的原則**  
以下 IAM 政策範例會允許使用者或角色停止活動串流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"AllowStopActivityStreams",
            "Effect":"Allow",
            "Action":"rds:StopActivityStream",
            "Resource":"*"
        }
     ]
}
```

**Example 拒絕資料庫活動串流開始的原則**  
以下 IAM 政策範例會防止使用者或角色開始活動串流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"DenyStartActivityStreams",
            "Effect":"Deny",
            "Action":"rds:StartActivityStream",
            "Resource":"*"
        }
     ]
}
```

**Example 拒絕資料庫活動串流停用的原則**  
以下 IAM 政策範例會防止使用者或角色停止活動串流。    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement":[
        {
            "Sid":"DenyStopActivityStreams",
            "Effect":"Deny",
            "Action":"rds:StopActivityStream",
            "Resource":"*"
        }
    ]
}
```