

# データベースアクティビティストリーミングのモニタリング
<a name="DBActivityStreams.Monitoring"></a>

データベースアクティビティストリーミングは、アクティビティをモニタリングして報告します。アクティビティのストリーミングは、収集後、Amazon Kinesis に送信されます。Kinesis から、アクティビティストリーミングをモニタリングしたり、他のサービスやアプリケーションがアクティビティストリーミングを使用して詳細な分析を行うことができます。基礎となる Kinesis ストリーム名は、AWS CLI コマンドの `describe-db-clusters` または RDS API `DescribeDBClusters` オペレーションを使用して検索できます。

Aurora は、Kinesis ストリーミングを次のように管理します。
+ Aurora は、24 時間の保存期間の Kinesis ストリーミングを自動的に作成します。
+  Aurora は、必要に応じて Kinesis ストリーミングをスケールします。
+  データベースアクティビティストリーミングを停止したり、DB クラスターを削除したりすると、Aurora によって Kinesis ストリームが削除されます。

以下のカテゴリのアクティビティがモニタリングされ、アクティビティストリーミングの監査ログに追加されます。
+ **SQL コマンド** - すべての SQL コマンドに加えて、準備済みステートメント、組み込み関数、および PL/SQL の関数も監査されます。ストアドプロシージャへの呼び出しが監査されます。ストアドプロシージャまたは関数内で発行された SQL ステートメントも監査されます。
+ **他のデータベース情報** - モニタリングされるアクティビティには、完全な SQL ステートメント、DML コマンドから影響を受ける行の行数、アクセスされたオブジェクト、および一意のデータベース名が含まれます。Aurora PostgreSQL の場合、データベースアクティビティストリーミングは、バインド可変とストアドプロシージャパラメータもモニタリングします。
**重要**  
各ステートメントの完全な SQL テキストは、機密データを含むアクティビティストリーミング監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは訂正されます。  

  ```
  ALTER ROLE role-name WITH password
  ```
+ **接続情報** - モニタリングされるアクティビティには、セッションとネットワークの情報、サーバープロセス ID、および終了コードなどがあります。

DB インスタンスのモニタリング中にアクティビティストリーミングに障害が発生した場合は、RDS イベントを通じて通知されます。

以下のセクションでは、データベースアクティビティストリームへのアクセス、監査、処理を行うことができます。

**Topics**
+ [Amazon Kinesis からのアクティビティストリーミングへのアクセス](DBActivityStreams.KinesisAccess.md)
+ [データベースアクティビティストリームの監査ログコンテンツおよび例](DBActivityStreams.AuditLog.md)
+ [データベースアクティビティストリームの databaseActivityEventList JSON 配列](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [AWS SDK を使用したデータベースアクティビティストリーミングの処理](DBActivityStreams.CodeExample.md)

# Amazon Kinesis からのアクティビティストリーミングへのアクセス
<a name="DBActivityStreams.KinesisAccess"></a>

DB クラスターのアクティビティストリーミングを有効にすると、Kinesis ストリーミングが作成されます。データベースのアクティビティは、Kinesis からリアルタイムでモニタリングできます。データベースのアクティビティを詳細に分析するには、Kinesis ストリーミングをコンシューマーアプリケーションに接続します。また、IBM の Security Guardium または Imperva の SecureSphere Database Audit and Protection などのコンプライアンス管理アプリケーションにストリーミングを接続することもできます。

Kinesis ストリームには、RDS コンソールまたは Kinesis コンソールからアクセスできます。

**RDS コンソールを使用して、Kinesis からアクティビティストリーミングにアクセスするには**

1. Amazon RDS コンソール ([https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/)) を開きます。

1. ナビゲーションペインで、[**データベース**] を選択します。

1. アクティビティストリームを開始する DB クラスターを選択します。

1. **[設定]** を選択します。

1. **[Database activity stream]** (データベースアクティビティストリーム) で、**[Kinesis stream]** (Kinesis ストリーム) の下のリンクを選択します。

1. データベースアクティビティの観察を開始するには、Kinesis コンソールで **[Monitoring]** (モニタリング) を選択します。

**Kinesis コンソールを使用して、Kinesis からアクティビティストリーミングにアクセスするには**

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. Kinesis ストリーミングのリストからアクティビティストリーミングを選択します。

   アクティビティストリーミングの名前には、プレフィックス `aws-rds-das-cluster-` が付き、その後に DB クラスターのリソース ID が続きます。次に例を示します。

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   Amazon RDS コンソールを使用して DB クラスターのリソース ID を検索するには、データベースのリストから DB クラスターを選択した上で、[**設定**] タブを選択します。

   AWS CLI を使用して、アクティビティストリーミングの Kinesis ストリームの完全な名前を検索するには、[describe-db-clusters](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) CLI リクエストを使用し、そのレスポンスに含まれる `ActivityStreamKinesisStreamName` の値を書き留めます。

1. データベースアクティビティの観察をスタートするには、[**モニタリング**] を選択します。

Amazon Kinesis の使用の詳細については、「[Amazon Kinesis Data Streams とは](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)」を参照してください。

# データベースアクティビティストリームの監査ログコンテンツおよび例
<a name="DBActivityStreams.AuditLog"></a>

モニタリングされるイベントは、データベースアクティビティストリーミングでは JSON 文字列として表されます。この構造は、`DatabaseActivityMonitoringRecord` を含む JSON オブジェクトで構成されます。このオブジェクトには、アクティビティイベントの `databaseActivityEventList` 配列が含まれます。

**注記**  
データベースアクティビティストリームの場合、`paramList` JSON 配列には Hibernate アプリケーションからの null 値は含まれません。

**Topics**
+ [アクティビティストリーミングの監査ログの例](#DBActivityStreams.AuditLog.Examples)
+ [DatabaseActivityMonitoringRecords JSON オブジェクト](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [databaseActivityEvents JSON オブジェクト](#DBActivityStreams.AuditLog.databaseActivityEvents)

## アクティビティストリーミングの監査ログの例
<a name="DBActivityStreams.AuditLog.Examples"></a>

以下に、アクティビティイベントレコードの復号されたサンプルの JSON 監査ログを示します。

**Example Aurora PostgreSQL のアクティビティイベントレコード**  
次のアクティビティイベントレコードは、psql クライアント (`clientApplication`) による `CONNECT` SQL ステートメント (`command`) を使用したログインを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CONNECT SQL ステートメントのアクティビティイベントレコード**  
次のアクティビティイベントレコードは、mysql クライアント (`CONNECT`) による `command` SQL ステートメント (`clientApplication`) を使用したログインを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Aurora PostgreSQL CREATE TABLE ステートメントのアクティビティイベントレコード**  
次の例は、Aurora PostgreSQL の `CREATE TABLE` イベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Aurora MySQL CREATE TABLE ステートメントのアクティビティイベントレコード**  
次の例は、Aurora MySQL の `CREATE TABLE` ステートメントを示しています。オペレーションは、2 つの個別のイベントレコードとして表されます。1 つのイベントに `"class":"MAIN"` があります。他方のイベントには、`"class":"AUX"` があります。メッセージは任意の順序で到着する可能性があります。`logTime` イベントの `MAIN` フィールドは、常に対応する `logTime` イベントの `AUX` フィールドよりも前にあります。  
次の例は、`class` の値が `MAIN` のイベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 次の例は、`class` の値 が `AUX` を持つ対応するイベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Aurora PostgreSQL SELECT ステートメントのアクティビティイベントレコード**  
次の例は、 `SELECT` イベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Aurora MySQL SELECT ステートメントのアクティビティイベントレコード**  
次の例は、`SELECT` イベントを示しています。  
 次の例は、`class` の値が `MAIN` のイベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 次の例は、`class` の値 が `AUX` を持つ対応するイベントを示しています。  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## DatabaseActivityMonitoringRecords JSON オブジェクト
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

データベースアクティビティイベントレコードは、次の情報を含む JSON オブジェクトにあります。


****  

| JSON フィールド | データ型 | 説明 | 
| --- | --- | --- | 
|  `type`  | 文字列 |  JSON レコードのタイプ。値は `DatabaseActivityMonitoringRecords` です。  | 
| version | 文字列 |  データベースアクティビティモニタリングレコードのバージョン。生成されたデータベースアクティビティレコードのバージョンは、次のように、DB クラスターのエンジンのバージョンによって異なります。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)次のフィールドは、すべてバージョン 1.0 とバージョン 1.1 の両方にあります。ただし、明記されている場合を除きます。 | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | string |  アクティビティイベントを含む JSON オブジェクト。  | 
| key | string | [databaseActivityEventList JSON 配列](DBActivityStreams.AuditLog.databaseActivityEventList.md) の復号に使用する暗号化キー。 | 

## databaseActivityEvents JSON オブジェクト
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

`databaseActivityEvents` JSON オブジェクトには、次の情報が含まれています。

### JSON レコードの最上位フィールド
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 監査ログの各イベントは、JSON 形式のレコード内にラップされます。このレコードには、次のフィールドが含まれます。

**type**  
 このフィールドは常に値 `DatabaseActivityMonitoringRecords` を持ちます。

**バージョン**  
 このフィールドは、データベースアクティビティストリーミングデータプロトコルまたはコントラクトのバージョンを表します。これは、使用可能なフィールドを定義します。  
バージョン 1.0 は、Aurora PostgreSQL バージョン 10.7 および 11.4 の元のデータアクティビティストリーミングのサポートを表します。バージョン 1.1 は、Aurora PostgreSQL バージョン 10.10 以降、および Aurora PostgreSQL 11.5 以降のデータアクティビティストリーミングのサポートを表します。バージョン 1.1 には、追加のフィールド `errorMessage` と `startTime` が含まれています。バージョン 1.2 は、Aurora MySQL 2.08 以降のデータアクティビティストリーミングのサポートを表します。バージョン 1.2 には、追加のフィールド `endTime` と `transactionId` が含まれています。

**databaseActivityEvents**  
 1 つ以上のアクティビティイベントを表す暗号化された文字列。これは、base64 バイト配列として表されます。文字列を復号すると、このセクションの例に示すフィールドを持つ JSON 形式のレコードが生成されます。

**key**  
 `databaseActivityEvents` 文字列の暗号化に使用される暗号化されたデータキー。これは、データベースアクティビティストリーミングをスタートしたときに指定した AWS KMS key と同じです。

 以下の例は、このレコードの形式を示しています。

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

`databaseActivityEvents` フィールドの内容を復号化するには、次のステップを実行します。

1.  データベースアクティビティストリーミングをスタートするときに指定した KMS キーを使用して、`key` JSON フィールドの値を復号します。これにより、データ暗号化キーがクリアテキストで返されます。

1.  `databaseActivityEvents` JSON フィールドの値を Base64 デコードして、監査ペイロードの暗号化テキストをバイナリ形式で取得します。

1.  初期のステップでデコードしたデータ暗号化キーを使用して、バイナリ暗号文を復号化します。

1.  復号化されたペイロードを解凍します。
   +  暗号化されたペイロードは、`databaseActivityEvents` フィールドにあります。
   +  `databaseActivityEventList` フィールドには、監査レコードの配列が含まれます。配列内の `type` フィールドには、 `record` または `heartbeat` を使用できます。

監査ログのアクティビティイベントレコードは、次の情報を含む JSON オブジェクトです。


****  

| JSON フィールド | データ型 | 説明 | 
| --- | --- | --- | 
|  `type`  | 文字列 |  JSON レコードのタイプ。値は `DatabaseActivityMonitoringRecord` です。  | 
| clusterId | 文字列 | DB クラスターリソース識別子。これは DB クラスター属性 DbClusterResourceId に対応します。 | 
| instanceId | 文字列 | DB インスタンスのリソース識別子。DB インスタンス属性 DbiResourceId に対応します。 | 
|  [databaseActivityEventList JSON 配列](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | 文字列 |  アクティビティ監査レコードまたはハートビートメッセージの配列。  | 

# データベースアクティビティストリームの databaseActivityEventList JSON 配列
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

監査ログのペイロードは、暗号化された `databaseActivityEventList` JSON 配列です。以下の表に、監査ログの復号された `DatabaseActivityEventList` 配列内の各アクティビティイベントのフィールドをアルファベット順に示します。Aurora PostgreSQL または Aurora MySQL を使用するかどうかによって、フィールドは異なります。データベースエンジンに適用される表を参照してください。

**重要**  
イベントの構造は変わる場合があります。Aurora では、将来、アクティビティイベントに新しいフィールドが追加される可能性があります。JSON データを分析するアプリケーションでは、コードが未知のフィールド名に対して無視または適切なアクションを実行できることを確認します。

## Aurora PostgreSQL の databaseActivityEventList フィールド
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

Aurora PostgreSQL の `databaseActivityEventList` フィールドは次のとおりです。


| フィールド | データ型 | 説明 | 
| --- | --- | --- | 
| class | 文字列 |  アクティビティイベントのクラス。Aurora PostgreSQL の有効な値は以下のとおりです。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | 文字列 | クライアントのレポートどおりにクライアントが接続に使用していたアプリケーション。クライアントはこの情報を指定する必要はないため、値は null でも問題ありません。 | 
| command | 文字列 | SQL コマンドの名前 (コマンドの詳細は含まない)。 | 
| commandText | 文字列 |  ユーザーによって渡された実際の SQL ステートメント。Aurora PostgreSQL の場合、値は元の SQL ステートメントと同じです。このフィールドは、接続レコードまたは切断レコードを除くすべてのタイプのレコードに使用されます。この場合、値は null です。  各ステートメントの完全な SQL テキストは、機密データを含むアクティビティストリーミング監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは編集されます。 <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | 文字列 | ユーザーが接続したデータベース。 | 
| dbProtocol | 文字列 | データベースプロトコル (例: Postgres 3.0)。 | 
| dbUserName | 文字列 | クライアントが認証したデータベースユーザー。 | 
| errorMessage(バージョン 1.1 のデータベースアクティビティレコードのみ) | 文字列 |  エラーがあった場合、このフィールドには DB サーバーによって生成されるはずのエラーメッセージが表示されます。エラーにならなかった通常のステートメントの場合、`errorMessage` の値は null です。 エラーは、重要度が `ERROR` 以上の、クライアントで表示される PostgreSQL エラーログイベントを生成するアクティビティとして定義されます。詳細については、「[PostgreSQL メッセージの重要度](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS)」を参照してください。例えば、構文エラーやクエリのキャンセルは、エラーメッセージを生成します。 バックグラウンドのチェックポインタープロセスエラーなどの内部の PostgreSQL サーバーエラーは、エラーメッセージを生成しません。ただし、ログの重要度レベルの設定に関係なく、このようなイベントのレコードは引き続き出力されます。これにより、攻撃者がログをオフにして検出を回避することを防ぎます。 `exitCode` フィールドも参照してください。  | 
| exitCode | int | セッション終了レコードに使用される値。clean exit では、終了コードが含まれます。エラーシナリオによっては、終了コードが常に得られるとは限りません。例えば、PostgreSQL で exit() を実行している場合や、オペレーターが kill -9 などのコマンドを実行している場合があります。エラーが発生した場合は、[PostgreSQL エラーコード](https://www.postgresql.org/docs/current/errcodes-appendix.html)にリストされている SQL エラーコード (`SQLSTATE`) が `exitCode` フィールドに表示されます。`errorMessage` フィールドも参照してください。 | 
| logTime | 文字列 | 監査コードパスに記録されているタイムスタンプ。これは、SQL ステートメントの実行終了時刻を表します。startTime フィールドも参照してください。 | 
| netProtocol | 文字列 | ネットワーク通信プロトコル。 | 
| objectName | 文字列 | データベースオブジェクトの名前 (SQL ステートメントを使用している場合)。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。 | 
| objectType | 文字列 | テーブル、インデックス、ビューなどのデータベースオブジェクトタイプ。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。有効な値には次のようなものがあります。[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | 文字列 | SQL ステートメントに渡されるカンマ区切りのパラメータの配列。SQL ステートメントにパラメータがない場合、この値は空の配列です。 | 
| pid | int | クライアント接続を処理するために割り当てられているバックエンドプロセスのプロセス ID。 | 
| remoteHost | 文字列 | クライアントの IP アドレスまたはホスト名。Aurora PostgreSQL では、どちらが使用されるかは、データベースの log\$1hostname パラメータ設定によって異なります。remoteHost 値には、rdsadmin ユーザーからのアクティビティを示す [local] と localhost も含まれます。 | 
| remotePort | 文字列 | クライアントのポート番号。 | 
| rowCount | int | SQL ステートメントによって影響を受けた、または取得されたテーブルの行数。このフィールドは、データ操作言語 (DML) ステートメントである SQL ステートメントでのみ使用されます。SQL ステートメントが DML ステートメントではない場合、この値は null です。 | 
| serverHost | 文字列 | データベースサーバーのホスト IP アドレス。serverHost 値には、rdsadmin ユーザーからのアクティビティを示す [local] と localhost も含まれます。 | 
| serverType | 文字列 | データベースサーバーのタイプ (例: PostgreSQL)。 | 
| serverVersion | 文字列 | データベースサーバーのバージョン (例: Aurora PostgreSQL の 2.3.1)。 | 
| serviceName | 文字列 | サービスの名前 (例: Amazon Aurora PostgreSQL-Compatible edition)。 | 
| sessionId | int | 一意の疑似セッション識別子。 | 
| sessionId | int | 一意の疑似セッション識別子。 | 
| startTime(バージョン 1.1 のデータベースアクティビティレコードのみ) | 文字列 |  SQL ステートメントの実行がスタートされた時刻。 SQL ステートメントのおおよその実行時間を計算するには、`logTime - startTime` を使用します。`logTime` フィールドも参照してください。  | 
| statementId | int | クライアントの SQL ステートメントの識別子。カウンターはセッションレベルであり、クライアントによって SQL ステートメントが入力される度に増加します。 | 
| substatementId | int | SQL サブステートメントの識別子。この値は、statementId フィールドで識別された各 SQL ステートメントに含まれるサブステートメントをカウントします。 | 
| type | 文字列 | イベントタイプ。有効な値は record または heartbeat です。 | 

## Aurora MySQL の databaseActivityEventList フィールド
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

Aurora MySQL の `databaseActivityEventList` フィールドは次のとおりです。


| フィールド | データ型 | 説明 | 
| --- | --- | --- | 
| class | 文字列 |  アクティビティイベントのクラス。 Aurora MySQL の有効な値は以下のとおりです。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | 文字列 | クライアントのレポートどおりにクライアントが接続に使用していたアプリケーション。クライアントはこの情報を指定する必要はないため、値は null でも問題ありません。 | 
| command | 文字列 |  SQL ステートメントの一般的なカテゴリ。このフィールドの値は `class` の値によって異なります。 `class` が `MAIN` の場合の値には、次の値が含まれます。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) `class` が `AUX` の場合の値には、次の値が含まれます。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | 文字列 |  `class` の `MAIN` 値を持つイベントの場合、このフィールドはユーザーが渡した実際の SQL ステートメントを表します。このフィールドは、接続レコードまたは切断レコードを除くすべてのタイプのレコードに使用されます。この場合、値は null です。  `class` の `AUX` 値を持つイベントの場合、このフィールドには、イベントに関係するオブジェクトに関する補足情報が含まれます。 Aurora MySQL では、引用符などの文字の前には、エスケープ文字を表すバックスラッシュが付きます。  各ステートメントの SQL テキスト全体が、機密データを含む監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは編集されます。 <pre>mysql> SET PASSWORD = 'my-password';</pre> セキュリティ上のベストプラクティスとして、ここに示されているプロンプト以外のパスワードを指定してください。   | 
| databaseName | 文字列 | ユーザーが接続したデータベース。 | 
| dbProtocol | 文字列 | データベースプロトコル。現在、この値は常に Aurora MySQL の MySQL です。 | 
| dbUserName | 文字列 | クライアントが認証したデータベースユーザー。 | 
| endTime(バージョン 1.2 のデータベースアクティビティレコードのみ) | 文字列 |  SQL ステートメントの実行が終了した時刻。これは、協定世界時 (UTC) 形式で表されます。 SQL ステートメントの実行時間を計算するには、`endTime - startTime` を使用します。`startTime` フィールドも参照してください。  | 
| errorMessage(バージョン 1.1 のデータベースアクティビティレコードのみ) | 文字列 |  エラーがあった場合、このフィールドには DB サーバーによって生成されるはずのエラーメッセージが表示されます。エラーにならなかった通常のステートメントの場合、`errorMessage` の値は null です。 エラーは、重要度が `ERROR` 以上の、クライアントで表示される MySQL エラーログイベントを生成するアクティビティとして定義されます。詳細については、*MySQL リファレンスマニュアル*の「[エラーログ](https://dev.mysql.com/doc/refman/5.7/en/error-log.html)」を参照してください。例えば、構文エラーやクエリのキャンセルは、エラーメッセージを生成します。 バックグラウンドのチェックポインタープロセスエラーなどの内部の MySQL サーバーエラーは、エラーメッセージを生成しません。ただし、ログの重要度レベルの設定に関係なく、このようなイベントのレコードは引き続き出力されます。これにより、攻撃者がログをオフにして検出を回避することを防ぎます。 `exitCode` フィールドも参照してください。  | 
| exitCode | int | セッション終了レコードに使用される値。clean exit では、終了コードが含まれます。エラーシナリオによっては、終了コードが常に得られるとは限りません。このような場合、この値はゼロになるか、空白になる可能性があります。 | 
| logTime | 文字列 | 監査コードパスに記録されているタイムスタンプ。これは、協定世界時 (UTC) 形式で表されます。ステートメントの期間を計算する最も正確な方法については、startTime および endTime フィールドを参照してください。 | 
| netProtocol | 文字列 | ネットワーク通信プロトコル。現在、この値は常に Aurora MySQL の TCP です。 | 
| objectName | 文字列 | データベースオブジェクトの名前 (SQL ステートメントを使用している場合)。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は空白になります。オブジェクトの完全修飾名を作成するには、databaseName と objectName を結合します。クエリに複数のオブジェクトが含まれる場合、このフィールドはカンマで区切られた名前のリストにすることができます。 | 
| objectType | 文字列 |  テーブル、インデックスなどのデータベースオブジェクトタイプ。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。 Aurora MySQL の有効な値には次のようなものがあります。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | 文字列 | このフィールドは Aurora MySQL には使用されず、常に null です。 | 
| pid | int | クライアント接続を処理するために割り当てられているバックエンドプロセスのプロセス ID。データベースサーバー再起動すると、pid が変更され、statementId フィールドのカウンターが初期からやり直されます。 | 
| remoteHost | 文字列 | SQL ステートメントを発行したクライアントの IP アドレスまたはホスト名。Aurora MySQL では、どちらが使用されるかは、データベースの skip\$1name\$1resolve パラメータ設定によって異なります。この値 localhost は、rdsadmin スペシャルユーザーからのアクティビティを示します。 | 
| remotePort | 文字列 | クライアントのポート番号。 | 
| rowCount | int | SQL 文によって返された行数。例えば、SELECT ステートメントが 10 行を返す場合、rowCount は 10 になります。INSERT ステートメントまたは UPDATE ステートメントの場合、RowCount は 0 です。 | 
| serverHost | 文字列 | データベースサーバーインスタンス識別子。 | 
| serverType | 文字列 | データベースサーバーのタイプ (例: MySQL)。 | 
| serverVersion | 文字列 | データベースサーバーのバージョン。現在、この値は常に Aurora MySQL の MySQL 5.7.12 です。 | 
| serviceName | 文字列 | サービスの名前。現在、この値は常に Aurora MySQL の Amazon Aurora MySQL です。 | 
| sessionId | int | 一意の疑似セッション識別子。 | 
| startTime(バージョン 1.1 のデータベースアクティビティレコードのみ) | 文字列 |  SQL ステートメントの実行がスタートされた時刻。これは、協定世界時 (UTC) 形式で表されます。 SQL ステートメントの実行時間を計算するには、`endTime - startTime` を使用します。`endTime` フィールドも参照してください。  | 
| statementId | int | クライアントの SQL ステートメントの識別子。カウンターは、クライアントによって SQL ステートメントが入力される度に増加します。DB インスタンスが再起動されると、カウンターがリセットされます。 | 
| substatementId | int | SQL サブステートメントの識別子。この値は、クラス MAIN を持つイベントの場合は 1、クラス AUX を持つイベントの場合は 2 です。statementId フィールドを使用して、同じステートメントによって生成されたすべてのイベントを識別します。 | 
| transactionId(バージョン 1.2 のデータベースアクティビティレコードのみ) | int | トランザクションの識別子。 | 
| type | 文字列 | イベントタイプ。有効な値は record または heartbeat です。 | 

# AWS SDK を使用したデータベースアクティビティストリーミングの処理
<a name="DBActivityStreams.CodeExample"></a>

アクティビティストリーミングをプログラムで処理するには、AWS SDK を使用します。以下は、Kinesis データストリームを処理する方法で完全に機能する Java および Python の例です。

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------