Desarrollar un consumidor de Kinesis Client Library en Python - Amazon Kinesis Data Streams

Desarrollar un consumidor de Kinesis Client Library en Python

importante

Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x dejará de recibir asistencia el 30 de enero de 2026. Recomendamos que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de Amazon Kinesis Client Library en GitHub. Para obtener información sobre las versiones más recientes de KCL, consulte Uso de Kinesis Client Library. Para obtener información sobre cómo migrar de KCL 1.x a KCL 3.x, consulte Migración de KCL 1.x a KCL 3.x.

Puede utilizar Kinesis Client Library (KCL) para crear aplicaciones que procesen datos de los flujos de datos de Kinesis. Kinesis Client Library está disponible en varios idiomas. En este tema se habla de Python.

KCL es una biblioteca de Java; el soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada MultiLangDaemon. Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por tanto, si instala KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando tener Java instalado en su sistema debido al MultiLangDaemon. Además, el MultiLangDaemon tiene algunos ajustes predeterminados que podría tener que personalizar para su caso de uso; por ejemplo, la región de AWS a la que se conecta. Para más información sobre MultiLangDaemon en GitHub, vaya a la página del proyecto MultiLangDaemon de KCL.

Para descargar KCL para Python desde GitHub, vaya a Kinesis Client Library (Python). Para descargar código de muestra para una aplicación de consumo de KCL para Python, vaya a la página del proyecto de muestra de KCL para Python en GitHub.

Debe completar las siguientes tareas a la hora de implementar una aplicación de consumo de KCL en Python:

Implementar los métodos de clase RecordProcessor

La clase RecordProcess debe ampliar la RecordProcessorBase para implementar los siguientes métodos. En la muestra se presentan implementaciones que puede utilizar como punto de partida (consulte sample_kclpy_app.py).

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
inicializar

KCL llama al método initialize cuando se crea una instancia del procesador de registros y pasa un ID de partición específico como parámetro. Este procesador de registros procesa solo este fragmento, y normalmente también se produce la situación contraria (este fragmento solo es procesado por este procesador de registros). Sin embargo, el consumidor debe contar con la posibilidad de que un registro de datos pueda ser procesado más de una vez. Esto se debe a que Kinesis Data Streams tiene una semántica de al menos una vez, lo que significa que cada registro de datos de una partición se procesa al menos una vez por parte de un proceso de trabajo del consumidor. Para obtener más información sobre los casos en los que un fragmento en particular puede ser procesado por más de un proceso de trabajo, consulte Utilizar la nueva partición, el escalado y el procesamiento paralelo para cambiar el número de particiones.

def initialize(self, shard_id)
process_records

KCL llama a este método y pasa una lista de registros de datos de la partición especificada por el método initialize. El procesador de registros que implemente procesa los datos en estos registros según la semántica del consumidor. Por ejemplo, el proceso de trabajo podría realizar una transformación de los datos y, a continuación, almacenar el resultado en un bucket de Amazon Simple Storage Service (Amazon S3).

def process_records(self, records, checkpointer)

Además de los datos en sí, el registro contiene un número secuencial y una clave de partición. El proceso de trabajo puede utilizar estos valores al procesar los datos. Por ejemplo, el proceso de trabajo podría elegir el bucket de S3 en el que almacenar los datos en función del valor de la clave de partición. El diccionario record expone los siguientes pares clave-valor para obtener acceso a los datos, el número secuencial y la clave de partición del registro:

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

Tenga en cuenta que los datos se codifican en Base64.

En el ejemplo, el método process_records tiene un código que muestra cómo un proceso de trabajo puede obtener acceso a los datos, el número secuencial y la clave de partición del registro.

Kinesis Data Streams requiere que el procesador de registros realice un seguimiento de los registros que ya se han procesado en una partición. KCL se ocupa de este seguimiento, pasando un objeto Checkpointer a process_records. El procesador de registros llama al método checkpoint en este objeto para informar a KCL de su avance en el procesamiento de los registros de la partición. Si se produce un error en el proceso de trabajo, KCL utiliza esta información para reiniciar el procesamiento de la partición en el último registro procesado conocido.

En el caso de una operación de división o fusión, KCL no comenzará a procesar las particiones nuevas hasta que los procesadores de las particiones originales hayan llamado a checkpoint para indicar que se ha completado el procesamiento en las particiones originales.

Si no se pasa un parámetro, KCL supone que la llamada a checkpoint significa que todos los registros se han procesado, hasta el último registro pasado al procesador de registros. Por tanto, el procesador de registros debería llamar a checkpoint solo después de haber procesado todos los registros de la lista que se le ha pasado. Los procesadores de registros no necesitan llamar a checkpoint en cada llamada a process_records. Un procesador podría, por ejemplo, llamar a checkpoint cada tercera vez que llame. Puede especificar opcionalmente el número secuencial exacto de un registro como un parámetro para checkpoint. En este caso, KCL supone que todos los registros se han procesado exclusivamente hasta ese registro.

En el ejemplo, el método privado checkpoint muestra cómo llamar al método Checkpointer.checkpoint mediante la administración de excepciones y la lógica de reintentos apropiadas.

KCL depende de process_records para administrar cualquier excepción que surja del procesamiento de los registros de datos. Si process_records genera una excepción, KCL omite los registros de datos que se pasaron a process_records antes de la excepción. Es decir, estos registros no se reenviarán al procesador de registros que generó la excepción ni a ningún otro procesador de registros en el consumidor.

shutdown

KCL llama al método shutdown cuando finaliza el procesamiento (el motivo del cierre es TERMINATE) o cuando el proceso de trabajo ya no responde (el reason del cierre es ZOMBIE).

def shutdown(self, checkpointer, reason)

El procesamiento finaliza cuando el procesador de registros no recibe más registros desde el fragmento, ya sea porque el fragmento se ha dividido o fusionado o porque la secuencia se ha eliminado.

KCL también pasa un objeto Checkpointer a shutdown. Si el reason del shutdown es TERMINATE, el procesador de registros debería terminar de procesar los registros de datos y llamar al método checkpoint en esta interfaz.

Modificar las propiedades de configuración

En la muestra se proporcionan valores predeterminados para las propiedades de configuración. Puede sobrescribir cualquiera de estas propiedades con sus propios valores (consulte sample.properties).

Nombre de la aplicación

KCL requiere un nombre de aplicación que sea único entre las aplicaciones y en las tablas de Amazon DynamoDB de la misma región. La biblioteca utiliza el valor del nombre de la aplicación de las siguientes formas:

  • Se supone que los procesos de trabajo que están asociados a este nombre de aplicación operan de forma conjunta en la misma secuencia. Estos procesos de trabajo pueden distribuirse en varias instancias. Si ejecuta otra instancia del mismo código de aplicación, pero con otro nombre de aplicación, KCL considera que la segunda instancia es una aplicación completamente independiente de la otra que opera en el mismo flujo.

  • KCL crea una tabla de DynamoDB con el nombre de la aplicación y utiliza la tabla para actualizar la información de estado (como los puntos de verificación y el mapeo procesos de trabajo-particiones) para la aplicación. Cada aplicación tiene su propia tabla de DynamoDB. Para obtener más información, consulte Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL.

Configuración de credenciales

Debe poner sus credenciales de AWS a disposición de uno de los proveedores de credenciales en la cadena de proveedores de credenciales predeterminada. Puede usar la propiedad AWSCredentialsProvider para configurar un proveedor de credenciales. Las propiedades de muestra deben poner sus credenciales a disposición de uno de los proveedores de credenciales de la cadena de proveedores de credenciales predeterminada. Si ejecuta su aplicación de consumo en una instancia de Amazon EC2, se recomienda que configure la instancia con un rol de IAM. Las credenciales de AWS que reflejan los permisos asociados a este rol de IAM se ponen a disposición de las aplicaciones de la instancia a través de los metadatos de esta. Esta es la forma más segura de administrar las credenciales para una aplicación consumidora que se ejecute en una instancia EC2.

El archivo de propiedades de ejemplo configura KCL para procesar un flujo de datos de Kinesis llamado “words” utilizando el procesador de registros facilitado en sample_kclpy_app.py.