

# Monitoreo de secuencias de actividades de la base de datos
<a name="DBActivityStreams.Monitoring"></a>

Los flujos de actividad de la base de datos monitorean y notifican las actividades. La secuencia de actividades se recopila y se transmite a Amazon Kinesis. Desde Kinesis, puede monitorear la secuencia de actividad, o bien otros servicios y aplicaciones pueden consumir la secuencia de actividades para un análisis posterior. Puede encontrar el nombre del flujo de Kinesis subyacente mediante el comando `describe-db-clusters` de la AWS CLI o la operación de la API de RDS `DescribeDBClusters`.

Aurora administra el flujo de Kinesis de la siguiente manera:
+ Aurora crea el flujo de Kinesis automáticamente con un periodo de retención de 24 horas. 
+  Aurora escala el flujo de Kinesis si es necesario. 
+  Si detiene el flujo de actividad de la base de datos o elimina el clúster de bases de datos, Aurora elimina el flujo de Kinesis. 

Las categorías de actividad siguientes se monitorizan y se ponen en el registro de auditoría de secuencias de actividades:
+ **Comandos SQL**: todos los comandos SQL se auditan, así como las instrucciones preparadas, las funciones integradas y las funciones en lenguaje de procedimientos para SQL (PL/SQL). Las llamadas a procedimientos almacenados se auditan. Cualquier instrucción SQL emitida dentro de procedimientos o funciones almacenados también se auditan.
+ **Otra información de la base de datos**: la actividad monitoreada incluye la instrucción SQL completa, el recuento de las filas afectadas de los comandos DML, los objetos a los que se accede y el nombre único de la base de datos. Para Aurora PostgreSQL, los flujos de actividad de la base de datos también monitorean las variables de enlace y los parámetros del procedimiento almacenados. 
**importante**  
El texto SQL completo de cada instrucción está visible en el registro de auditoría de secuencia de actividades, incluida la información confidencial. Sin embargo, las contraseñas de usuario de base de datos se redactan siAurora las puede determinar a partir del contexto, tal y como pasa con la siguiente instrucción SQL.   

  ```
  ALTER ROLE role-name WITH password
  ```
+ **Información de conexión**: la actividad monitorizada incluye la información de sesión y de red, el ID de proceso del servidor y los códigos de salida.

Si un flujo de actividad tiene un error mientras monitorea una instancia de base de datos, se lo notificará mediante eventos RDS.

En las siguientes secciones, puede acceder a los flujos de actividad de base de datos, auditarlos y procesarlos.

**Topics**
+ [Acceso a un flujo de actividad desde Amazon Kinesis](DBActivityStreams.KinesisAccess.md)
+ [Ejemplos y contenido sobre el registro de auditoría en flujos de actividad de bases de datos](DBActivityStreams.AuditLog.md)
+ [Matriz JSON databaseActivityEventList para flujos de actividad de base de datos](DBActivityStreams.AuditLog.databaseActivityEventList.md)
+ [Procesamiento de un flujo de actividad de la base de datos mediante SDK de AWS](DBActivityStreams.CodeExample.md)

# Acceso a un flujo de actividad desde Amazon Kinesis
<a name="DBActivityStreams.KinesisAccess"></a>

Cuando habilite un flujo de actividad para un clúster de bases de datos, se creará un flujo de Kinesis para usted. En Kinesis podrá monitorizar la actividad de la base de datos en tiempo real. Para profundizar en el análisis de la actividad de la base de datos, puede conectar su secuencia de Kinesis a aplicaciones de consumidor. También puede conectar el flujo de datos con aplicaciones de administración de conformidad como Security Guardium de IBM o SecureSphere Database Audit and Protection de Imperva.

Puede acceder a su transmisión de Kinesis desde la consola de RDS o la consola de Kinesis.

**Para acceder a una secuencia de actividades desde Kinesis utilizando la consola de RDS**

1. Abra la consola de Amazon RDS en [https://console.aws.amazon.com/rds/](https://console.aws.amazon.com/rds/).

1. En el panel de navegación, elija **Databases** (Bases de datos).

1. Elija el clúster de base de datos en la que desea iniciar un flujo de actividad.

1. Elija **Configuration (Configuración)**.

1. En **Database activity stream** (Secuencia de actividad de base de datos), seleccione el enlace en **Kinesis stream** (Secuencia de Kinesis).

1. En la colsola de Kinesis, elija **Monitoring** (Supervisión) para empezar a observar la actividad de la base de datos.

**Para acceder a una secuencia de actividades desde Kinesis utilizando la consola de Kinesis**

1. Abra la consola de Kinesis en [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Elija la secuencia de actividades en la lista de secuencias de Kinesis.

   El nombre de un flujo de actividad consta de un prefijo `aws-rds-das-cluster-` seguido del ID de recurso del clúster de bases de datos. A continuación se muestra un ejemplo. 

   ```
   aws-rds-das-cluster-NHVOV4PCLWHGF52NP
   ```

   Para utilizar la consola de Amazon RDS para encontrar el ID de recurso del clúster de bases de datos, elija su clúster de bases de datos en la lista de bases de datos y, luego, elija la pestaña **Configuration (Configuración)**.

   Para utilizar la AWS CLI para encontrar el nombre completo del flujo de Kinesis de un flujo de actividad, utilice una solicitud [describe-db-clústers](https://docs.aws.amazon.com/cli/latest/reference/rds/describe-db-clusters.html) de la CLI y anote el valor de `ActivityStreamKinesisStreamName` en la respuesta.

1. Elija **Monitoring (Monitorización)** para empezar a observar la actividad de la base de datos.

Para obtener más información acerca del uso de Amazon Kinesis, consulte [¿Qué es Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html).

# Ejemplos y contenido sobre el registro de auditoría en flujos de actividad de bases de datos
<a name="DBActivityStreams.AuditLog"></a>

Los eventos monitoreados se representan en el flujo de actividad de la base de datos como cadenas JSON. La estructura está formada por un objeto JSON que contiene un `DatabaseActivityMonitoringRecord`, el cual, a su vez, contiene una matriz `databaseActivityEventList` de eventos de actividad. 

**nota**  
Para los flujos de actividad de la base de datos, la matriz JSON de `paramList` no incluye valores nulos de las aplicaciones de Hibernate.

**Topics**
+ [Ejemplos de un registro de auditoría de flujo de actividad de la base de datos](#DBActivityStreams.AuditLog.Examples)
+ [Objeto JSON DatabaseActivityMonitoringRecords](#DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords)
+ [Objeto JSON databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)

## Ejemplos de un registro de auditoría de flujo de actividad de la base de datos
<a name="DBActivityStreams.AuditLog.Examples"></a>

A continuación mostramos registros de auditoría JSON descifrados de muestra de registros de eventos de actividad.

**Example Registro de eventos de actividad de una instrucción Aurora PostgreSQL**  
El siguiente registro de eventos de actividad muestra un inicio de sesión con el uso de una instrucción SQL `CONNECT` (`command`) por parte de un cliente psql (`clientApplication`).  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-10-30 00:39:49.940668+00",
          "logTime": "2019-10-30 00:39:49.990579+00",
          "statementId": 1,
          "substatementId": 1,
          "objectType": null,
          "command": "CONNECT",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "49804",
          "sessionId": "5ce5f7f0.474b",
          "rowCount": null,
          "commandText": null,
          "paramList": [],
          "pid": 18251,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "MISC",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Registro de evento de actividad de una instrucción SQL CONNECT de Aurora MySQL**  
El siguiente registro de eventos de actividad muestra un inicio de sesión donde un cliente mysql (`clientApplication`) usa una instrucción SQL `CONNECT` (`command`).   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:13.267214+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"rdsadmin",
      "databaseName":"",
      "remoteHost":"localhost",
      "remotePort":"11053",
      "command":"CONNECT",
      "commandText":"",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"",
      "statementId":0,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725121",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:13.267207+00",
      "endTime":"2020-05-22 18:07:13.267213+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```

**Example Registro de evento de actividad de una instrucción CREATE TABLE de Aurora PostgreSQL**  
En el siguiente ejemplo, se muestra un evento `CREATE TABLE` para Aurora PostgreSQL.  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:36:54.403455+00",
          "logTime": "2019-05-24 00:36:54.494235+00",
          "statementId": 2,
          "substatementId": 1,
          "objectType": null,
          "command": "CREATE TABLE",
          "objectName": null,
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": null,
          "commandText": "create table my_table (id serial primary key, name varchar(32));",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "DDL",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

**Example Registro de evento de actividad de una instrucción CREATE TABLE de Aurora MySQL**  
En el siguiente ejemplo, se muestra una instrucción `CREATE TABLE` para Aurora MySQL. La operación se representa como dos registros de eventos independientes. Un evento tiene `"class":"MAIN"`. El otro evento tiene `"class":"AUX"`. Los mensajes pueden llegar en cualquier orden. El campo `logTime` del evento `MAIN` es siempre anterior a los campos `logTime` de cualquier evento `AUX` correspondiente.  
En el ejemplo siguiente se muestra el evento con un valor `class` de `MAIN`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.250221+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"CREATE TABLE test1 (id INT)",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.250222+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 El siguiente ejemplo muestra el evento correspondiente con un valor `class` de `AUX`.  

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:07:12.247182+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"CREATE",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65459278,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"725118",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:07:12.226384+00",
      "endTime":"2020-05-22 18:07:12.247182+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

**Example Registro de evento de actividad de una instrucción SELECT**  
En el siguiente ejemplo, se muestra un evento `SELECT` .  

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents": 
    {
      "type":"DatabaseActivityMonitoringRecord",
      "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ",
      "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM",
      "databaseActivityEventList":[
        {
          "startTime": "2019-05-24 00:39:49.920564+00",
          "logTime": "2019-05-24 00:39:49.940668+00",
          "statementId": 6,
          "substatementId": 1,
          "objectType": "TABLE",
          "command": "SELECT",
          "objectName": "public.my_table",
          "databaseName": "postgres",
          "dbUserName": "rdsadmin",
          "remoteHost": "172.31.3.195",
          "remotePort": "34534",
          "sessionId": "5ce73c6f.7e64",
          "rowCount": 10,
          "commandText": "select * from my_table;",
          "paramList": [],
          "pid": 32356,
          "clientApplication": "psql",
          "exitCode": null,
          "class": "READ",
          "serverVersion": "2.3.1",
          "serverType": "PostgreSQL",
          "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
          "serverHost": "172.31.3.192",
          "netProtocol": "TCP",
          "dbProtocol": "Postgres 3.0",
          "type": "record",
          "errorMessage": null
        }
      ]
    },
   "key":"decryption-key"
}
```

```
{
    "type": "DatabaseActivityMonitoringRecord",
    "clusterId": "",
    "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q",
    "databaseActivityEventList": [
        {
            "class": "TABLE",
            "clientApplication": "Microsoft SQL Server Management Studio - Query",
            "command": "SELECT",
            "commandText": "select * from [testDB].[dbo].[TestTable]",
            "databaseName": "testDB",
            "dbProtocol": "SQLSERVER",
            "dbUserName": "test",
            "endTime": null,
            "errorMessage": null,
            "exitCode": 1,
            "logTime": "2022-10-06 21:24:59.9422268+00",
            "netProtocol": null,
            "objectName": "TestTable",
            "objectType": "TABLE",
            "paramList": null,
            "pid": null,
            "remoteHost": "local machine",
            "remotePort": null,
            "rowCount": 0,
            "serverHost": "172.31.30.159",
            "serverType": "SQLSERVER",
            "serverVersion": "15.00.4073.23.v1.R1",
            "serviceName": "sqlserver-ee",
            "sessionId": 62,
            "startTime": null,
            "statementId": "0x03baed90412f564fad640ebe51f89b99",
            "substatementId": 1,
            "transactionId": "4532935",
            "type": "record",
            "engineNativeAuditFields": {
                "target_database_principal_id": 0,
                "target_server_principal_id": 0,
                "target_database_principal_name": "",
                "server_principal_id": 2,
                "user_defined_information": "",
                "response_rows": 0,
                "database_principal_name": "dbo",
                "target_server_principal_name": "",
                "schema_name": "dbo",
                "is_column_permission": true,
                "object_id": 581577110,
                "server_instance_name": "EC2AMAZ-NFUJJNO",
                "target_server_principal_sid": null,
                "additional_information": "",
                "duration_milliseconds": 0,
                "permission_bitmask": "0x00000000000000000000000000000001",
                "data_sensitivity_information": "",
                "session_server_principal_name": "test",
                "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109",
                "audit_schema_version": 1,
                "database_principal_id": 1,
                "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000",
                "user_defined_event_id": 0,
                "host_name": "EC2AMAZ-NFUJJNO"
            }
        }
    ]
}
```

**Example Registro de evento de actividad de una instrucción SELECT de Aurora MySQL**  
En el siguiente ejemplo, se muestra un evento `SELECT`.  
 En el ejemplo siguiente se muestra el evento con un valor `class` de `MAIN`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "clusterId":"cluster-some_id",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986467+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"QUERY",
      "commandText":"SELECT * FROM test1 WHERE id < 28",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":1,
      "exitCode":"0",
      "sessionId":"726571",
      "rowCount":2,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986467+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"MAIN"
    }
  ]
}
```
 El siguiente ejemplo muestra el evento correspondiente con un valor `class` de `AUX`.   

```
{
  "type":"DatabaseActivityMonitoringRecord",
  "instanceId":"db-some_id",
  "databaseActivityEventList":[
    {
      "logTime":"2020-05-22 18:29:57.986399+00",
      "type":"record",
      "clientApplication":null,
      "pid":2830,
      "dbUserName":"master",
      "databaseName":"test",
      "remoteHost":"localhost",
      "remotePort":"11054",
      "command":"READ",
      "commandText":"test1",
      "paramList":null,
      "objectType":"TABLE",
      "objectName":"test1",
      "statementId":65469218,
      "substatementId":2,
      "exitCode":"",
      "sessionId":"726571",
      "rowCount":0,
      "serverHost":"master",
      "serverType":"MySQL",
      "serviceName":"Amazon Aurora MySQL",
      "serverVersion":"MySQL 5.7.12",
      "startTime":"2020-05-22 18:29:57.986364+00",
      "endTime":"2020-05-22 18:29:57.986399+00",
      "transactionId":"0",
      "dbProtocol":"MySQL",
      "netProtocol":"TCP",
      "errorMessage":"",
      "class":"AUX"
    }
  ]
}
```

## Objeto JSON DatabaseActivityMonitoringRecords
<a name="DBActivityStreams.AuditLog.DatabaseActivityMonitoringRecords"></a>

Los registros de eventos de actividad de la base de datos se encuentran en un objeto JSON que contiene la siguiente información.


****  

| Campo JSON | Tipo de datos | Descripción | 
| --- | --- | --- | 
|  `type`  | string |  Tipo de registro JSON. El valor es `DatabaseActivityMonitoringRecords`.  | 
| version | string |  La versión de los registros de monitoreo de actividad de la base de datos. La versión de los registros de actividad de la base de datos generados dependen de la versión del motor del clúster de bases de datos. [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.html)Todos los campos siguientes están en la versión 1.0 y en la versión 1.1, excepto donde se indique específicamente. | 
|  [databaseActivityEvents](#DBActivityStreams.AuditLog.databaseActivityEvents)  | cadena |  Un objeto JSON que contiene los eventos de actividad.  | 
| key | cadena | Una clave de cifrado que se utiliza para descifrar el [Matriz de JSON databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md).  | 

## Objeto JSON databaseActivityEvents
<a name="DBActivityStreams.AuditLog.databaseActivityEvents"></a>

El objeto JSON `databaseActivityEvents` contiene la siguiente información.

### Campos de nivel superior en el registro JSON
<a name="DBActivityStreams.AuditLog.topLevel"></a>

 Cada evento del registro de auditoría se envuelve dentro de un registro en formato JSON. Este registro contiene los siguientes campos. 

**type**  
 Este campo siempre tiene el valor `DatabaseActivityMonitoringRecords`. 

**version**  
 Este campo representa la versión del protocolo o contrato de datos del flujo de actividad de la base de datos. Define los campos que están disponibles.  
La versión 1.0 representa el soporte de secuencias de actividades de datos originales para Aurora PostgreSQL versiones 10.7 y 11.4. La versión 1.1 representa el soporte de secuencias de actividades de datos para Aurora PostgreSQL versiones 10.10 y posteriores y Aurora PostgreSQL 11.5 y posteriores. La versión 1.1 incluye los campos adicionales `errorMessage` y `startTime`. La versión 1.2 representa el soporte de secuencias de actividad de datos para Aurora MySQL 2.08 y superior. La versión 1.2 incluye los campos adicionales `endTime` y `transactionId`.

**databaseActivityEvents**  
 Una cadena cifrada que representa uno o más eventos de actividad. Se representa como una matriz de bytes base64. Al descifrar la cadena, el resultado es un registro en formato JSON con campos, tal y como se muestra en los ejemplos de esta sección.

**key**  
 Clave de datos cifrada utilizada para cifrar la cadena `databaseActivityEvents`. Esta es la misma AWS KMS key que proporcionó cuando inició la secuencia de actividades de la base de datos.

 En el ejemplo siguiente se muestra el formato de este registro.

```
{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}
```

Siga estos pasos para descifrar el contenido del campo:`databaseActivityEvents`

1.  Descifrar el valor en el campo JSON `key` mediante la clave de KMS que proporcionó al iniciar la secuencia de actividades de la base de datos. Al hacerlo, se devuelve la clave de cifrado de datos en texto sin cifrar. 

1.  Decodifique en base64 el valor en el campo JSON `databaseActivityEvents` para obtener el texto cifrado, en formato binario, de la carga útil de auditoría. 

1.  Descifrar el texto cifrado binario con la clave de cifrado de datos que decodificó en el primer paso. 

1.  Descomprimir la carga útil descifrada. 
   +  La carga cifrada está en el campo `databaseActivityEvents`. 
   +  El campo `databaseActivityEventList` contiene una matriz de registros de auditoría. Los campos `type` de la matriz pueden ser `record` o `heartbeat`. 

Un registro de evento de actividad de registro de auditoría es un objeto JSON que contiene la información siguiente.


****  

| Campo JSON | Tipo de datos | Descripción | 
| --- | --- | --- | 
|  `type`  | string |  Tipo de registro JSON. El valor es `DatabaseActivityMonitoringRecord`.  | 
| clusterId | string | Identificador del recurso del clúster de bases de datos. Corresponde al atributo del clúster de bases de datos DbClusterResourceId. | 
| instanceId | string | El identificador del recurso de instancia de base de datos. Corresponde al atributo de instancia de base de datos DbiResourceId. | 
|  [Matriz de JSON databaseActivityEventList](DBActivityStreams.AuditLog.databaseActivityEventList.md)   | string |  Una matriz de registros de auditoría de actividad o mensajes de latido.  | 

# Matriz JSON databaseActivityEventList para flujos de actividad de base de datos
<a name="DBActivityStreams.AuditLog.databaseActivityEventList"></a>

La carga de registro de auditoría es una matriz JSON `databaseActivityEventList` cifrada. En las tablas siguientes, se enumeran alfabéticamente los campos de cada evento de actividad de la matriz `DatabaseActivityEventList` descifrada de un registro de auditoría. Los campos son diferentes en función de si utiliza Aurora PostgreSQL o Aurora MySQL. Consulte la tabla que se aplica al motor de base de datos.

**importante**  
La estructura de los eventos está sujeta a cambio. Aurora podría agregar nuevos campos a eventos de actividad en el futuro. En las aplicaciones que analizan los datos JSON, asegúrese de que el código puede ignorar o tomar las acciones adecuadas para nombres de campo desconocidos. 

## Campos de databaseActivityEventList para Aurora PostgreSQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.apg"></a>

A continuación, encontrará campos `databaseActivityEventList` para Aurora PostgreSQL.


| Campo | Tipo de datos | Descripción | 
| --- | --- | --- | 
| class | string |  Clase de evento de actividad. Los valores válidos para Aurora PostgreSQL son los siguientes: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | Aplicación que el cliente ha usado para establecer conexión según notificación del cliente. No es obligatorio que el cliente notifique esta información, por lo que el valor puede ser nulo. | 
| command | string | Nombre del comando SQL sin ningún detalle del comando. | 
| commandText | string |  Instrucción SQL real que el usuario ha pasado. Para Aurora PostgreSQL, el valor es idéntico a la instrucción SQL original. Este campo se usa para todo tipo de registros, salvo para registros de conexión o desconexión, en cuyo caso el valor es nulo.  El texto SQL completo de cada instrucción está visible en el registro de auditoría de secuencia de actividades, incluida la información confidencial. Sin embargo, las contraseñas de usuario de la base de datos se redactan si se Aurora puede determinar a partir del contexto, tal y como pasa con la siguiente instrucción SQL.  <pre>ALTER ROLE role-name WITH password</pre>   | 
| databaseName | string | Base de datos a la que se ha conectado el usuario. | 
| dbProtocol | string | El protocolo de base de datos, por ejemplo Postgres 3.0. | 
| dbUserName | string | Usuario de base de datos que el cliente ha usado para autenticarse. | 
| errorMessage(solo registros de actividad de la base de datos de la versión 1.1) | string |  Si hubo algún error, este campo se rellena con el mensaje de error que habría generado el servidor de base de datos. El valor `errorMessage` es nulo para las declaraciones normales que no dieron lugar a un error.  Un error se define como cualquier actividad que produzca un evento de registro de errores de PostgreSQL visible para el cliente en un nivel de gravedad de `ERROR` o superior. Para obtener más información, consulte [Niveles de gravedad de mensajes de PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-SEVERITY-LEVELS). Por ejemplo, los errores de sintaxis y las cancelaciones de consultas generan un mensaje de error.  Los errores internos del servidor PostgreSQL, como los errores de proceso del generador de puntos de comprobación en segundo plano, no generan un mensaje de error. Sin embargo, los registros de tales eventos se siguen emitiendo independientemente de la configuración del nivel de gravedad del registro. Esto evita que los atacantes desactiven el registro para intentar evitar la detección. Consulte también el campo `exitCode`.  | 
| exitCode | int | Valor usado para un registro de salida de sesión. Si la salida es limpia, contiene el código de salida. No siempre se puede obtener un código de salida en algunos escenarios de error. Por ejemplo, este es el caso cuando PostgreSQL genera un exit() o si un operador ejecuta un comando kill -9.Si se produjo algún error, el campo `exitCode` muestra el código de error SQL, `SQLSTATE`, como se muestra en [Códigos de error de PostgreSQL](https://www.postgresql.org/docs/current/errcodes-appendix.html). Consulte también el campo `errorMessage`. | 
| logTime | string | Marca temporal según se ha registrado en la ruta del código de auditoría. Esto representa la hora de finalización de la ejecución de la instrucción SQL. Consulte también el campo startTime. | 
| netProtocol | string | Protocolo de comunicación de red | 
| objectName | string | Nombre del objeto de base de datos si la instrucción SQL opera en uno. Este campo solo se usa cuando la instrucción SQL funciona en un objeto de base de datos. Si la instrucción SQL no opera en un objeto, este valor es nulo. | 
| objectType | string | Tipo de objeto de base de datos como mesa, índice, vista, etc. Este campo solo se usa cuando la instrucción SQL funciona en un objeto de base de datos. Si la instrucción SQL no opera en un objeto, este valor es nulo. Entre los valores válidos se incluyen los siguientes:[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) | 
| paramList | string | Matriz de parámetros separados por comas que se transfiere a la instrucción SQL. Si la instrucción SQL no tiene parámetros, este valor es una matriz vacía. | 
| pid | int | ID del proceso de backend que se asigna para atender la conexión cliente. | 
| remoteHost | string | La dirección IP del cliente o el nombre de host. Para Aurora PostgreSQL, el que se utiliza depende de la configuración del parámetro log\$1hostname de la base de datos. El valor remoteHost también incluye [local] y localhost, que indican actividad del usuario rdsadmin. | 
| remotePort | cadena | Número de puerto del cliente. | 
| rowCount | int | Número de filas de tabla que la instrucción SQL recupera o a las que afecta. Este campo se usa únicamente para instrucciones SQL que son instrucciones de lenguaje de manipulación de datos (DML). Si la instrucción SQL no es una instrucción DML, este valor es nulo. | 
| serverHost | cadena | Dirección IP del host de servidor de la base de datos. El valor serverHost también incluye [local] y localhost, que indican actividad del usuario rdsadmin. | 
| serverType | cadena | Tipo de servidor de base de datos; por ejemplo, PostgreSQL. | 
| serverVersion | string | La versión del servidor de base de datos, por ejemplo 2.3.1 para Aurora PostgreSQL. | 
| serviceName | string | Nombre del servicio, por ejemplo Amazon Aurora PostgreSQL-Compatible edition.  | 
| sessionId | int | Identificador de sesión pseudoúnico. | 
| sessionId | int | Identificador de sesión pseudoúnico. | 
| startTime(solo registros de actividad de la base de datos de la versión 1.1) | string |  La hora a la que comenzó la ejecución de la instrucción SQL.  Para calcular el tiempo de ejecución aproximado de la instrucción SQL, utilice `logTime - startTime`. Consulte también el campo `logTime`.  | 
| statementId | int | Un identificador de la instrucción SQL del cliente. El contador está en el nivel de sesión y aumenta con cada instrucción SQL que el cliente introduce.  | 
| substatementId | int | Un identificador de una subinstrucción SQL. Este valor cuenta las subinstrucciones contenidas por cada instrucción SQL que el campo statementId identifica. | 
| type | string | Tipo de evento. Los valores válidos son record o heartbeat. | 

## Campos de databaseActivityEventList para Aurora MySQL
<a name="DBActivityStreams.AuditLog.databaseActivityEventList.ams"></a>

A continuación, encontrará campos `databaseActivityEventList` para Aurora MySQL.


| Campo | Tipo de datos | Descripción | 
| --- | --- | --- | 
| class | string |  Clase de evento de actividad. Los valores válidos para Aurora MySQL son los siguientes: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| clientApplication | string | Aplicación que el cliente ha usado para establecer conexión según notificación del cliente. No es obligatorio que el cliente notifique esta información, por lo que el valor puede ser nulo. | 
| command | string |  La categoría general de la instrucción SQL. Los valores de este campo dependen del valor de `class`. Los valores cuando `class` es `MAIN` incluyen lo siguiente: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html) Los valores cuando `class` es `AUX` incluyen lo siguiente: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| commandText | string |  Para los eventos con un valor `class` de `MAIN`, este campo representa la instrucción SQL real que el usuario ha pasado. Este campo se usa para todo tipo de registros, salvo para registros de conexión o desconexión, en cuyo caso el valor es nulo.  Para los eventos con un valor `class` de `AUX`, este campo contiene información adicional sobre los objetos involucrados en el evento.  Para Aurora MySQL, los caracteres como comillas van precedidos de una barra diagonal inversa, que representa un carácter de escape.  El texto SQL completo de cada instrucción está visible en el registro de auditoría, incluida la información confidencial. Sin embargo, las contraseñas de usuario de la base de datos se redactan si se Aurora puede determinar a partir del contexto, tal y como pasa con la siguiente instrucción SQL.  <pre>mysql> SET PASSWORD = 'my-password';</pre> Especifique una contraseña distinta de la que se muestra aquí como práctica recomendada de seguridad.   | 
| databaseName | cadena | Base de datos a la que se ha conectado el usuario. | 
| dbProtocol | string | Protocolo de la base de datos. Actualmente, este valor es siempre MySQL para Aurora MySQL. | 
| dbUserName | string | Usuario de base de datos que el cliente ha usado para autenticarse. | 
| endTime(solo registros de actividad de la base de datos de la versión 1.2) | string |  La hora a la que finalizó la ejecución de la instrucción SQL. Se representa en formato de hora universal coordinada (UTC). Para calcular el tiempo de ejecución de la instrucción SQL, utilice `endTime - startTime`. Consulte también el campo `startTime`.  | 
| errorMessage(solo registros de actividad de la base de datos de la versión 1.1) | string |  Si hubo algún error, este campo se rellena con el mensaje de error que habría generado el servidor de base de datos. El valor `errorMessage` es nulo para las declaraciones normales que no dieron lugar a un error.  Un error se define como cualquier actividad que produciría un evento de registro de errores de MySQL visible para el cliente en un nivel de gravedad de `ERROR` o superior. Para obtener más información, consulte [The Error Log](https://dev.mysql.com/doc/refman/5.7/en/error-log.html) en el *Manual de referencia de MySQL*. Por ejemplo, los errores de sintaxis y las cancelaciones de consultas generan un mensaje de error.  Los errores internos del servidor MySQL, como los errores de proceso del generador de puntos de comprobación en segundo plano, no generan un mensaje de error. Sin embargo, los registros de tales eventos se siguen emitiendo independientemente de la configuración del nivel de gravedad del registro. Esto evita que los atacantes desactiven el registro para intentar evitar la detección. Consulte también el campo `exitCode`.  | 
| exitCode | int | Valor usado para un registro de salida de sesión. Si la salida es limpia, contiene el código de salida. No siempre se puede obtener un código de salida en algunos escenarios de error. En tales casos, este valor puede ser cero o puede estar en blanco. | 
| logTime | string | Marca temporal según se ha registrado en la ruta del código de auditoría. Se representa en formato de hora universal coordinada (UTC). Para obtener la forma más precisa de calcular la duración de la instrucción, consulte los campos startTime y endTime. | 
| netProtocol | string | Protocolo de comunicación de red Actualmente, este valor es siempre TCP para Aurora MySQL. | 
| objectName | string | Nombre del objeto de base de datos si la instrucción SQL opera en uno. Este campo solo se usa cuando la instrucción SQL funciona en un objeto de base de datos. Si la instrucción SQL no opera en un objeto, este valor es nulo. Para crear el nombre completo del objeto, combine databaseName y objectName. Si la consulta implica a varios objetos, este campo puede ser una lista de nombres separados por comas. | 
| objectType | string |  El tipo de objeto de base de datos como tabla, índice, vista, etc. Este campo solo se usa cuando la instrucción SQL funciona en un objeto de base de datos. Si la instrucción SQL no opera en un objeto, este valor es nulo. Los valores válidos de Aurora MySQL incluyen lo siguiente: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.AuditLog.databaseActivityEventList.html)  | 
| paramList | string | Este campo no se utiliza para Aurora MySQL y siempre es nulo. | 
| pid | int | ID del proceso de backend que se asigna para atender la conexión cliente. Cuando se reinicia el servidor de base de datos, los cambios pid y el contador para el campo statementId se inicia de nuevo. | 
| remoteHost | string | La dirección IP o el nombre de host del cliente que emitió la instrucción SQL. Para Aurora MySQL, el que se utiliza depende de la configuración del parámetro skip\$1name\$1resolve de la base de datos. El valor localhost indica la actividad del usuario especial rdsadmin.  | 
| remotePort | string | Número de puerto del cliente. | 
| rowCount | int | La cantidad de filas devueltas por la instrucción SQL. Por ejemplo, si una instrucción SELECT devuelve 10 filas, rowCount es 10. Para las instrucciones INSERT o UPDATE, rowCount es 0. | 
| serverHost | cadena | El identificador de instancia del servidor de base de datos. | 
| serverType | cadena | Tipo de servidor de base de datos; por ejemplo, MySQL. | 
| serverVersion | string | Versión del servidor de base de datos. Actualmente, este valor es siempre MySQL 5.7.12 para Aurora MySQL. | 
| serviceName | string | Nombre del servicio de Actualmente, este valor es siempre Amazon Aurora MySQL para Aurora MySQL. | 
| sessionId | int | Identificador de sesión pseudoúnico. | 
| startTime(solo registros de actividad de la base de datos de la versión 1.1) | string |  La hora a la que comenzó la ejecución de la instrucción SQL. Se representa en formato de hora universal coordinada (UTC). Para calcular el tiempo de ejecución de la instrucción SQL, utilice `endTime - startTime`. Consulte también el campo `endTime`.  | 
| statementId | int | Un identificador de la instrucción SQL del cliente. El contador aumenta con cada instrucción SQL introducida por el cliente. El contador se restablece cuando se reinicia la instancia de base de datos. | 
| substatementId | int | Un identificador de una subinstrucción SQL. Este valor es 1 para eventos con clase MAIN y 2 para eventos con clase AUX. Utilice el campo statementId para identificar todos los eventos generados por la misma instrucción. | 
| transactionId(solo registros de actividad de la base de datos de la versión 1.2) | int | Un identificador de una transacción. | 
| type | string | Tipo de evento. Los valores válidos son record o heartbeat. | 

# Procesamiento de un flujo de actividad de la base de datos mediante SDK de AWS
<a name="DBActivityStreams.CodeExample"></a>

Puede procesar una secuencia de actividades mediante programación con AWS SDK. A continuación, mostramos ejemplos de Java y Python totalmente funcionales sobre cómo puede procesar el flujo de datos de Kinesis.

------
#### [ Java ]

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;

public class DemoConsumer {

    private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]";
    private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
    private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
    private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
    private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]";
    private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
    private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
    private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);

    private static final AwsCrypto CRYPTO = new AwsCrypto();
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
            .withRegion(REGION_NAME)
            .withCredentials(CREDENTIALS_PROVIDER).build();

    class Activity {
        String type;
        String version;
        String databaseActivityEvents;
        String key;
    }

    class ActivityEvent {
        @SerializedName("class") String _class;
        String clientApplication;
        String command;
        String commandText;
        String databaseName;
        String dbProtocol;
        String dbUserName;
        String endTime;
        String errorMessage;
        String exitCode;
        String logTime;
        String netProtocol;
        String objectName;
        String objectType;
        List<String> paramList;
        String pid;
        String remoteHost;
        String remotePort;
        String rowCount;
        String serverHost;
        String serverType;
        String serverVersion;
        String serviceName;
        String sessionId;
        String startTime;
        String statementId;
        String substatementId;
        String transactionId;
        String type;
    }

    class ActivityRecords {
        String type;
        String clusterId;
        String instanceId;
        List<ActivityEvent> databaseActivityEventList;
    }

    static class RecordProcessorFactory implements IRecordProcessorFactory {
        @Override
        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    static class RecordProcessor implements IRecordProcessor {

        private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
        private static final int PROCESSING_RETRIES_MAX = 10;
        private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
        private static final Gson GSON = new GsonBuilder().serializeNulls().create();

        private static final Cipher CIPHER;
        static {
            Security.insertProviderAt(new BouncyCastleProvider(), 1);
            try {
                CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
            } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private long nextCheckpointTimeInMillis;

        @Override
        public void initialize(String shardId) {
        }

        @Override
        public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
            for (final Record record : records) {
                processSingleBlob(record.getData());
            }

            if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
                checkpoint(checkpointer);
                nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            if (reason == ShutdownReason.TERMINATE) {
                checkpoint(checkpointer);
            }
        }

        private void processSingleBlob(final ByteBuffer bytes) {
            try {
                // JSON $Activity
                final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);

                // Base64.Decode
                final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
                final byte[] decodedDataKey = Base64.decode(activity.key);

                Map<String, String> context = new HashMap<>();
                context.put("aws:rds:dbc-id", DBC_RESOURCE_ID);

                // Decrypt
                final DecryptRequest decryptRequest = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context);
                final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
                final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));

                // GZip Decompress
                final byte[] decompressed = decompress(decrypted);
                // JSON $ActivityRecords
                final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);

                // Iterate throught $ActivityEvents
                for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
                    System.out.println(GSON.toJson(event));
                }
            } catch (Exception e) {
                // Handle error.
                e.printStackTrace();
            }
        }

        private static byte[] decompress(final byte[] src) throws IOException {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
            GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
            return IOUtils.toByteArray(gzipInputStream);
        }

        private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
            for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
                try {
                    checkpointer.checkpoint();
                    break;
                } catch (ShutdownException se) {
                    // Ignore checkpoint if the processor instance has been shutdown (fail over).
                    System.out.println("Caught shutdown exception, skipping checkpoint." + se);
                    break;
                } catch (ThrottlingException e) {
                    // Backoff and re-attempt checkpoint upon transient failures
                    if (i >= (PROCESSING_RETRIES_MAX - 1)) {
                        System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
                        break;
                    } else {
                        System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
                    }
                } catch (InvalidStateException e) {
                    // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
                    System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
                    break;
                }
                try {
                    Thread.sleep(BACKOFF_TIME_IN_MILLIS);
                } catch (InterruptedException e) {
                    System.out.println("Interrupted sleep" + e);
                }
            }
        }
    }

    private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
        // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
        final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
                "BC", "DataKey", "AES/GCM/NoPadding");
        try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
             final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            IOUtils.copy(decryptingStream, out);
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        final KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId);
        kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST);
        kinesisClientLibConfiguration.withRegionName(REGION_NAME);
        final Worker worker = new Builder()
                .recordProcessorFactory(new RecordProcessorFactory())
                .config(kinesisClientLibConfiguration)
                .build();

        System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);

        try {
            worker.run();
        } catch (Throwable t) {
            System.err.println("Caught throwable while processing data.");
            t.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static byte[] getByteArray(final ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return byteArray;
    }
}
```

------
#### [ Python ]

```
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

REGION_NAME = '<region>'                    # us-east-1
RESOURCE_ID = '<external-resource-id>'      # cluster-ABCD123456
STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID  # aws-rds-das-cluster-ABCD123456

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)


def main():
    session = boto3.session.Session()
    kms = session.client('kms', region_name=REGION_NAME)
    kinesis = session.client('kinesis', region_name=REGION_NAME)

    response = kinesis.describe_stream(StreamName=STREAM_NAME)
    shard_iters = []
    for shard in response['StreamDescription']['Shards']:
        shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'],
                                                         ShardIteratorType='LATEST')
        shard_iters.append(shard_iter_response['ShardIterator'])

    while len(shard_iters) > 0:
        next_shard_iters = []
        for shard_iter in shard_iters:
            response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000)
            for record in response['Records']:
                record_data = record['Data']
                record_data = json.loads(record_data)
                payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                data_key_decoded = base64.b64decode(record_data['key'])
                data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                                      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext']))
            if 'NextShardIterator' in response:
                next_shard_iters.append(response['NextShardIterator'])
        shard_iters = next_shard_iters


if __name__ == '__main__':
    main()
```

------