Información sobre KCL 1.x y 2.x
importante
Las versiones 1.x y 2.x de la biblioteca de clientes de Amazon Kinesis (KCL) están obsoletas. KCL 1.x dejará de recibir asistencia el 30 de enero de 2026. Recomendamos que migre las aplicaciones de KCL que utilizan la versión 1.x a la última versión de KCL antes del 30 de enero de 2026. Para encontrar la versión más reciente de KCL, consulte la página de Amazon Kinesis Client Library en GitHub
Uno de los métodos para desarrollar aplicaciones de consumo personalizadas que puedan procesar datos de flujos de datos de KDS consiste en utilizar Kinesis Client Library (KCL).
nota
Se recomienda actualizar a la última versión tanto KCL 1.x como KCL 2.x, según el escenario de uso. Tanto KCL 1.x como KCL 2.x se actualizan periódicamente con versiones más recientes que incluyen las últimas revisiones de dependencia y seguridad, correcciones de errores y nuevas características compatibles con versiones anteriores. Para obtener más información, consulte https://github.com/awslabs/amazon-kinesis-client/releases
Acerca de KCL (versiones anteriores)
KCL ayuda a consumir y procesar los datos de un flujo de datos de Kinesis, ya que se encarga de muchas de las tareas complejas asociadas a la computación distribuida. Estas incluyen equilibrar la carga entre varias instancias de aplicaciones de consumo, responder a los errores de las instancias de aplicaciones de consumo, comprobar los registros procesados y reaccionar ante la repartición. KCL se encarga de todas estas subtareas para que pueda centrar sus esfuerzos en escribir una lógica de procesamiento de registros personalizada.
KCL es diferente de las API de Kinesis Data Streams que están disponibles en los SDK de AWS. La API de Kinesis Data Streams le ayuda a administrar numerosos aspectos de Kinesis Data Streams (como la creación de flujos, la repartición y la inserción y obtención de registros). KCL proporciona una capa de abstracción en torno a todas estas subtareas, específicamente para que pueda centrarse en la lógica de procesamiento de datos personalizada de su aplicación de consumo. Para obtener información sobre la API de Kinesis Data Streams, consulte la referencia de la API de Amazon Kinesis.
importante
KCL es una biblioteca de Java. El soporte para lenguajes distintos de Java se proporciona mediante una interfaz multilingüe llamada MultiLangDaemon. Este daemon está basado en Java y se ejecuta en segundo plano cuando se utiliza un lenguaje de KCL distinto de Java. Por ejemplo, si instala KCL para Python y escribe su aplicación de consumo completamente en Python, seguirá necesitando tener Java instalado en su sistema debido al MultiLangDaemon. Además, el MultiLangDaemon tiene algunos ajustes predeterminados que podría tener que personalizar para su caso de uso, como por ejemplo, la región de AWS a la que se conecta. Para obtener más información sobre MultiLangDaemon en GitHub, visite la página del proyecto MultiLangDaemon de KCL
KCL ejerce de intermediaria entre su lógica de procesamiento de registros y Kinesis Data Streams.
Versiones anteriores de KCL
Actualmente, puede utilizar cualquiera de las siguientes versiones compatibles de KCL para crear sus aplicaciones de consumo personalizadas:
-
KCL 1.x
Para obtener más información, consulte Desarrollar consumidores de KCL 1.x
-
KCL 2.x
Para obtener más información, consulte Desarrollar consumidores de KCL 2.x
Puede usar KCL 1.x o KCL 2.x para crear aplicaciones de consumo que utilicen un rendimiento compartido. Para obtener más información, consulte Desarrollar consumidores personalizados con rendimiento compartido mediante KCL.
Para crear aplicaciones de consumo que utilicen un rendimiento dedicado (consumidores con distribución mejorada), solo puede utilizar KCL 2.x. Para obtener más información, consulte Desarrollo de consumidores de distribución ramificada mejorada con rendimiento dedicado.
Para obtener información sobre las diferencias entre KCL 1.x y KCL 2.x e instrucciones sobre cómo migrar de KCL 1.x a KCL 2.x, consulte Migrar consumidores de KCL 1.x a KCL 2.x.
Conceptos de KCL (versiones anteriores)
-
Aplicación para consumidores de KCL: una aplicación creada a medida con KCL y diseñada para leer y procesar registros de flujos de datos.
-
Instancia de aplicación de consumo: las aplicaciones de consumo de KCL suelen estar distribuidas y una o más instancias de aplicación se ejecutan simultáneamente para coordinar los fallos y equilibrar la carga de forma dinámica del procesamiento de registro de datos.
-
Proceso de trabajo: clase de alto nivel que utiliza una instancia de aplicación de consumo de KCL para empezar a procesar datos.
importante
Cada instancia de aplicación de consumo de KCL tiene un proceso de trabajo.
El proceso de trabajo inicializa y supervisa diversas tareas, como la sincronización de la información sobre los arrendamientos y las particiones, el seguimiento de las asignaciones de las particiones y el procesamiento de los datos de las particiones. Un proceso de trabajo proporciona a KCL la información de configuración de la aplicación de consumo, como el nombre del flujo de datos cuyos registros de datos va a procesar la aplicación de consumo de KCL y las credenciales de AWS necesarias para acceder a este flujo de datos. El proceso de trabajo también pone en marcha esa instancia específica de la aplicación de consumo de KCL para entregar los registros de datos del flujo de datos a los procesadores de registros.
importante
En KCL 1.x, esta clase se denomina Proceso de trabajo. Para obtener más información (estos son los repositorios de KCL de Java), consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
. En KCL 2.x, esta clase se denomina Programador. El propósito del programador en KCL 2.x es idéntico al propósito del proceso de trabajo en KCL 1.x. Para obtener más información sobre la clase Programador en KCL 2.x, consulte https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java . -
Arrendamiento: datos que definen el enlace entre un proceso de trabajo y una partición. Las aplicaciones de consumo distribuidas de KCL utilizan los arrendamientos para dividir el procesamiento de registros de datos entre una flota de procesos de trabajo. En un momento dado, cada partición de registros de datos está vinculada a un proceso de trabajo en particular mediante un arrendamiento identificado por la variable leaseKey.
De forma predeterminada, un proceso de trabajo puede tener uno o más arrendamientos (sujetos al valor de la variable maxLeasesForWorker) al mismo tiempo.
importante
Cada proceso de trabajo competirá por tener todos los arrendamientos disponibles para todas las particiones disponibles en un flujo de datos. Sin embargo, solo un proceso de trabajo podrá mantener satisfactoriamente cada arrendamiento a la vez.
Por ejemplo, si tiene una instancia de aplicación de consumo A con el proceso de trabajo A que procesa un flujo de datos con 4 particiones, el proceso de trabajo A puede retener los arrendamientos de las particiones 1, 2, 3 y 4 al mismo tiempo. Sin embargo, si tiene dos instancias de aplicaciones de consumo: A y B con el proceso de trabajo A y el proceso de trabajo B, y estas instancias procesan un flujo de datos con 4 particiones, el proceso de trabajo A y el proceso de trabajo B no pueden retener el arrendamiento de la partición 1 al mismo tiempo. Un proceso de trabajo retiene el arrendamiento de una partición concreta hasta que esté listo para dejar de procesar los registros de datos de esta partición o hasta que falle. Cuando un proceso de trabajo deja de ser titular del arrendamiento, otro proceso de trabajo lo acepta y lo retiene.
Para obtener más información (estos son los repositorios de KCL de Java), consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java
para KCL 1.x y https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java para KCL 2.x. -
Tabla de arrendamiento: una tabla exclusiva de Amazon DynamoDB que se utiliza para realizar un seguimiento de las particiones de un flujo de datos de KDS que los procesos de trabajo de la aplicación de consumo de KCL están arrendando y procesando. La tabla de arrendamiento debe permanecer sincronizada (dentro de un proceso de trabajo y entre todos los procesos de trabajo) con la información más reciente sobre las particiones del flujo de datos mientras se ejecuta la aplicación de consumo de KCL. Para obtener más información, consulte Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL.
-
Procesador de registros: lógica que define la forma en que su aplicación de consumo de KCL procesa los datos que obtiene de los flujos de datos. En tiempo de ejecución, una instancia de una aplicación de consumo de KCL crea una instancia de un proceso de trabajo, y este proceso de trabajo crea una instancia de un procesador de registros por cada partición que tiene en arrendamiento.
Usar una tabla de arrendamiento para realizar el seguimiento de las particiones procesadas por la aplicación de consumo de KCL
Temas
Qué es una tabla de arrendamiento
Para cada aplicación de Amazon Kinesis Data Streams, KCL utiliza una tabla de arrendamiento única (almacenada en una tabla de Amazon DynamoDB) para realizar un seguimiento de las particiones de un flujo de datos de KDS que los procesos de trabajo de la aplicación de consumo de KCL están arrendando y procesando.
importante
KCL utiliza el nombre de la aplicación de consumo para crear el nombre de la tabla de arrendamiento que utiliza esta aplicación de consumo, por lo que el nombre de cada aplicación de consumo debe ser único.
Puede consultar la tabla con la consola de Amazon DynamoDB mientras se ejecuta la aplicación de consumo.
Si la tabla de arrendamiento de la aplicación de consumo de KCL no existe cuando se inicia la aplicación, uno de los procesos de trabajo crea la tabla de arrendamiento para esta aplicación.
importante
Se le realizará el cobro de los costos de su cuenta asociados a la tabla de DynamoDB, además de los costos propios asociados a Kinesis Data Streams.
Cada fila de la tabla de arrendamiento representa una partición que procesan los procesos de trabajo de la aplicación de consumo. Si la aplicación de consumo de KCL procesa solo un flujo de datos, la leaseKey que es la clave hash de la tabla de arrendamiento será el identificador de la partición. Si es Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java, la estructura de leaseKey tendrá el siguiente aspecto: account-id:StreamName:streamCreationTimestamp:ShardId. Por ejemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.
Además de la ID del fragmento, cada fila incluye también los siguientes datos:
-
checkpoint: el número secuencial de punto de comprobación más reciente del fragmento. Este valor es único en todas las particiones del flujo de datos.
-
checkpointSubSequenceNumber: cuando se utiliza la característica de agregación de Kinesis Client Library, esto es una extensión del valor checkpoint que realiza un seguimiento de los registros de los usuarios individuales en el registro de Kinesis.
-
leaseCounter: se utiliza para el control de versiones de las asignaciones, de modo que los procesos de trabajo puedan detectar que su asignación ha sido utilizada por otro proceso de trabajo.
-
leaseKey: un identificador único para una asignación. Cada arrendamiento es específico de una partición del flujo de datos y solo lo retiene un proceso de trabajo cada vez.
-
leaseOwner: el proceso de trabajo que tiene esta asignación.
-
ownerSwitchesSinceCheckpoint: veces que la asignación ha cambiado de proceso de trabajo desde la última vez que se escribió un punto de comprobación.
-
parentShardId: se utiliza para garantizar que el fragmento principal ha sido procesado totalmente antes de comenzar el procesamiento en los fragmentos secundarios. Así, se garantiza que los registros se procesen en el mismo orden en el que se introdujeron en la secuencia.
-
hashrange: lo utiliza
PeriodicShardSyncManagerpara ejecutar sincronizaciones periódicas para encontrar las particiones que faltan en la tabla de arrendamiento y crear arrendamientos para ellas si es necesario.nota
Estos datos están presentes en la tabla de arrendamiento de todas las particiones a partir de KCL 1.14 y KCL 2.3. Para obtener más información sobre
PeriodicShardSyncManagery la sincronización periódica entre los arrendamientos y las particiones, consulte Cómo se sincroniza una tabla de arrendamiento con las particiones de Kinesis Data Streams. -
childshards: lo utiliza
LeaseCleanupManagerpara revisar el estado de procesamiento de la partición secundaria y decidir si la partición principal se puede eliminar de la tabla de arrendamiento.nota
Estos datos están presentes en la tabla de arrendamiento de todas las particiones a partir de KCL 1.14 y KCL 2.3.
-
shardID: ID de la partición.
nota
Estos datos solo están presentes en la tabla de arrendamiento si es Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java. Esto solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores.
-
nombre del flujo El identificador del flujo de datos en el siguiente formato:
account-id:StreamName:streamCreationTimestamp.nota
Estos datos solo están presentes en la tabla de arrendamiento si se dedica al Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java. Esto solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores.
Rendimiento
Si la aplicación de Amazon Kinesis Data Streams recibe excepciones de rendimiento aprovisionado, debe aumentar el rendimiento aprovisionado para la tabla de DynamoDB. KCL crea la tabla con un rendimiento aprovisionado de 10 lecturas por segundo y 10 escrituras por segundo, pero esto podría no ser suficiente para su aplicación. Por ejemplo, si su aplicación de Amazon Kinesis Data Streams crea frecuentemente puntos de comprobación u opera en un flujo que se compone de muchas particiones, es posible que necesite más rendimiento.
Para obtener información sobre el rendimiento aprovisionado en DynamoDB, consulte Modo de capacidad de lectura y escritura y Uso de tablas y datos en la Guía para desarrolladores de Amazon DynamoDB.
Cómo se sincroniza una tabla de arrendamiento con las particiones de Kinesis Data Streams
Los procesos de trabajo de las aplicaciones de consumo de KCL utilizan los arrendamientos para procesar particiones de un flujo de datos determinado. La información sobre qué proceso de trabajo está arrendando cada partición en un momento dado se almacena en una tabla de arrendamiento. La tabla de arrendamiento debe permanecer sincronizada con la información más reciente sobre la partición del flujo de datos mientras se ejecuta la aplicación de consumo de KCL. KCL sincroniza la tabla de arrendamiento con la información de las particiones obtenida del servicio Kinesis Data Streams durante el arranque de la aplicación de consumo (ya sea al inicializar o reiniciar la aplicación de consumo) y también siempre que una partición que se esté procesando llegue a su fin (repartición). En otras palabras, los procesos de trabajo o una aplicación de consumo de KCL se sincronizan con el flujo de datos que están procesando durante el arranque inicial de la aplicación de consumo y siempre que la aplicación de consumo encuentra un evento de repartición del flujo de datos.
Temas
Sincronización en KCL 1.0 a 1.13 y KCL 2.0 a 2.2
En KCL 1.0 a 1.13 y KCL 2.0 a 2.2, durante el arranque de la aplicación de consumo y también durante cada evento de repartición del flujo de datos, KCL sincroniza la tabla de arrendamiento con la información de las particiones adquirida en el servicio Kinesis Data Streams invocando las ListShards o API de descubrimiento de DescribeStream. En todas las versiones de KCL enumeradas anteriormente, cada proceso de trabajo de una aplicación de consumo de KCL completa los siguientes pasos para realizar el proceso de sincronización entre arrendamientos y particiones durante el arranque de la aplicación de consumo y en cada evento de repartición del flujo:
-
Obtiene todos las particiones de datos del flujo que se está procesando.
-
Obtiene todos los arrendamientos de particiones de la tabla de arrendamiento.
-
Filtra cada partición abierta que no tenga arrendamientos en la tabla de arrendamiento.
-
Repite todas las particiones abiertas encontradas y para cada partición abierta sin un elemento principal abierto:
-
Recorre el árbol jerárquico siguiendo la ruta de sus antecesores para determinar si la partición es descendiente. Una partición se considera descendiente si se está procesando una partición anterior (en la tabla de arrendamiento se indica el arrendamiento de la partición anterior) o si se debe procesar una partición anterior (por ejemplo, si la posición inicial es
TRIM_HORIZONoAT_TIMESTAMP). -
Si la partición abierta en el contexto es descendiente, KCL comprueba la partición en función de su posición inicial y crea arrendamientos para sus elementos principales, si es necesario.
-
Sincronización en KCL 2.x, a partir de KCL 2.3 y versiones posteriores
A partir de las últimas versiones compatibles de KCL 2.x (KCL 2.3) y versiones posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios en la sincronización entre arrendamientos y particiones reducen significativamente el número de llamadas a la API que realizan las aplicaciones de consumo de KCL al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo de KCL.
-
Durante el arranque de la aplicación, si la tabla de arrendamiento está vacía, KCL utiliza la opción de filtrado de la API
ListShard(el parámetro de solicitudShardFilteropcional) para recuperar y crear arrendamientos únicamente para una instantánea de las particiones abiertas en el momento especificado por el parámetroShardFilter. El parámetroShardFilterpermite filtrar la respuesta de la APIListShards. La única propiedad obligatoria del parámetroShardFilteresType. KCL utiliza la propiedad de filtroTypey los siguientes valores válidos para identificar y devolver una instantánea de las particiones abiertas que podrían requerir nuevos arrendamientos:-
AT_TRIM_HORIZON: la respuesta incluye todas las particiones que estaban abiertas enTRIM_HORIZON. -
AT_LATEST: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos. -
AT_TIMESTAMP: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.
ShardFilterse utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas enRetrievalConfig#initialPositionInStreamExtended.Para obtener más información acerca de
ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
En lugar de que todos los procesos de trabajo realicen la sincronización entre arrendamientos y particiones para mantener la tabla de arrendamiento actualizada con las particiones más recientes del flujo de datos, un único líder del proceso de trabajo elegido realiza la sincronización entre arrendamientos y particiones.
-
KCL 2.3 utiliza el parámetro de devolución
ChildShardsde las APIGetRecordsySubscribeToShardpara realizar la sincronización entre arrendamientos y particiones que se produce enSHARD_ENDpara las particiones cerradas, lo que permite a un proceso de trabajo de KCL crear arrendamientos solo para las particiones secundarias de la partición que ha terminado de procesar. Para compartirla entre aplicaciones de consumo, esta optimización de la sincronización entre arrendamientos y particiones utiliza el parámetroChildShardsde la APIGetRecords. En el caso de las aplicaciones de consumo dedicadas al rendimiento (distribución mejorada), esta optimización de la sincronización entre arrendamientos y particiones utiliza el parámetroChildShardsde la APISubscribeToShard. Para obtener más información, consulte GetRecords, SubscribeToShards y ChildShard. -
Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los procesos de trabajo aprenden sobre todas las particiones existentes al modelo en el que los procesos de trabajo aprenden solo sobre las particiones secundarias de las particiones que son propiedad de cada proceso de trabajo. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de repartición, KCL ahora también realiza exámenes periódicos adicionales de las particiones o arrendamientos para identificar cualquier posible laguna en la tabla de arrendamiento (en otras palabras, para obtener información sobre todas las particiones nuevas) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario.
PeriodicShardSyncManageres el componente responsable de ejecutar exámenes periódicos de arrendamientos o particiones.Para obtener más información sobre
PeriodicShardSyncManageren KCL 2.3, consulte https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213. En KCL 2.3, hay nuevas opciones de configuración disponibles para configurar
PeriodicShardSyncManagerenLeaseManagementConfig:Nombre Valor predeterminado Descripción leasesRecoveryAuditorExecutionFrequencyMillis 120 000 (2 minutos)
Frecuencia (en milisegundos) del trabajo del auditor para buscar arrendamientos parciales en la tabla de arrendamiento. Si el auditor detecta lagunas en los arrendamientos de un flujo, activará la sincronización de las particiones basándose en
leasesRecoveryAuditorInconsistencyConfidenceThreshold.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
El umbral de confianza para el trabajo de auditor periódico permite determinar si los arrendamientos de un flujo de datos de la tabla de arrendamiento son incoherentes. Si el auditor encuentra varias veces el mismo conjunto de incoherencias consecutivas en un flujo de datos, se activará una sincronización de particiones.
Ahora también se emiten nuevas métricas de CloudWatch para supervisar el estado de
PeriodicShardSyncManager. Para obtener más información, consulte PeriodicShardSyncManager. -
Incluye una optimización de
HierarchicalShardSyncerpara crear solo arrendamientos para una capa de particiones.
Sincronización en KCL 1.x, a partir de KCL 1.14 y versiones posteriores
A partir de las últimas versiones compatibles de KCL 1.x (KCL 1.14) y versiones posteriores, la biblioteca ahora admite los siguientes cambios en el proceso de sincronización. Estos cambios en la sincronización entre arrendamientos y particiones reducen significativamente el número de llamadas a la API que realizan las aplicaciones de consumo de KCL al servicio Kinesis Data Streams y optimizan la administración de arrendamientos en su aplicación de consumo de KCL.
-
Durante el arranque de la aplicación, si la tabla de arrendamiento está vacía, KCL utiliza la opción de filtrado de la API
ListShard(el parámetro de solicitudShardFilteropcional) para recuperar y crear arrendamientos únicamente para una instantánea de las particiones abiertas en el momento especificado por el parámetroShardFilter. El parámetroShardFilterpermite filtrar la respuesta de la APIListShards. La única propiedad obligatoria del parámetroShardFilteresType. KCL utiliza la propiedad de filtroTypey los siguientes valores válidos para identificar y devolver una instantánea de las particiones abiertas que podrían requerir nuevos arrendamientos:-
AT_TRIM_HORIZON: la respuesta incluye todas las particiones que estaban abiertas enTRIM_HORIZON. -
AT_LATEST: la respuesta incluye solo las particiones actualmente abiertas del flujo de datos. -
AT_TIMESTAMP: la respuesta incluye todas las particiones cuya marca de tiempo de inicio es anterior o igual a la marca de tiempo dada y cuya marca de tiempo de finalización es posterior o igual que la marca de tiempo dada, o que aún están abiertas.
ShardFilterse utiliza al crear arrendamientos para una tabla de arrendamiento vacía con el fin de inicializar los arrendamientos de una instantánea de las particiones especificadas enKinesisClientLibConfiguration#initialPositionInStreamExtended.Para obtener más información acerca de
ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
En lugar de que todos los procesos de trabajo realicen la sincronización entre arrendamientos y particiones para mantener la tabla de arrendamiento actualizada con las particiones más recientes del flujo de datos, un único líder del proceso de trabajo elegido realiza la sincronización entre arrendamientos y particiones.
-
KCL 1.14 utiliza el parámetro de devolución
ChildShardsde las APIGetRecordsySubscribeToShardpara realizar la sincronización entre arrendamientos y particiones que se produce enSHARD_ENDpara las particiones cerradas, lo que permite a un proceso de trabajo de KCL crear arrendamientos solo para las particiones secundarias de la partición que ha terminado de procesar. Para obtener más información, consulte GetRecords y ChildShard. -
Con los cambios anteriores, el comportamiento de KCL está pasando del modelo en el que todos los procesos de trabajo aprenden sobre todas las particiones existentes al modelo en el que los procesos de trabajo aprenden solo sobre las particiones secundarias de las particiones que son propiedad de cada proceso de trabajo. Por lo tanto, además de la sincronización que se produce durante el arranque de las aplicaciones de consumo y los eventos de repartición, KCL ahora también realiza exámenes periódicos adicionales de las particiones o arrendamientos para identificar cualquier posible laguna en la tabla de arrendamiento (en otras palabras, para obtener información sobre todas las particiones nuevas) a fin de garantizar que se procese todo el rango de hash del flujo de datos y crear arrendamientos para ellos si es necesario.
PeriodicShardSyncManageres el componente responsable de ejecutar exámenes periódicos de arrendamientos o particiones.Cuando
KinesisClientLibConfiguration#shardSyncStrategyTypeestá establecido enShardSyncStrategyType.SHARD_END,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThresholdse utiliza para determinar el umbral del número de exámenes consecutivos que contienen lagunas en la tabla de arrendamiento, tras lo cual se exige la sincronización de las particiones. CuandoKinesisClientLibConfiguration#shardSyncStrategyTypese establece enShardSyncStrategyType.PERIODIC,leasesRecoveryAuditorInconsistencyConfidenceThresholdse ignora.Para obtener más información sobre
PeriodicShardSyncManageren KCL 1.14, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999. En KCL 1.14, hay una nueva opción de configuración disponible para configurar
PeriodicShardSyncManagerenLeaseManagementConfig:Nombre Valor predeterminado Descripción leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
El umbral de confianza para el trabajo de auditor periódico permite determinar si los arrendamientos de un flujo de datos de la tabla de arrendamiento son incoherentes. Si el auditor encuentra varias veces el mismo conjunto de incoherencias consecutivas en un flujo de datos, se activará una sincronización de particiones.
Ahora también se emiten nuevas métricas de CloudWatch para supervisar el estado de
PeriodicShardSyncManager. Para obtener más información, consulte PeriodicShardSyncManager. -
KCL 1.14 ahora también admite la limpieza de arrendamientos diferidos.
LeaseCleanupManagerelimina los arrendamientos de forma asíncrona al llegar aSHARD_END, cuando una partición ha caducado pasado el periodo de retención del flujo de datos o se ha cerrado como resultado de una operación de repartición.Están disponibles nuevas opciones de configuración para
LeaseCleanupManager.Nombre Valor predeterminado Descripción leaseCleanupIntervalMillis 1 minuto
Intervalo en el que se ejecuta el subproceso de limpieza del arrendamiento.
completedLeaseCleanupIntervalMillis 5 minutos Intervalo para comprobar si un arrendamiento se ha completado o no.
garbageLeaseCleanupIntervalMillis 30 minutos Intervalo en el que se comprueba si un arrendamiento es un elemento no utilizado (es decir, si ha superado el periodo de retención del flujo de datos) o no.
-
Incluye una optimización de
KinesisShardSyncerpara crear solo arrendamientos para una capa de particiones.
Procesar varios flujos de datos con el mismo KCL 2.x para aplicaciones de consumo de Java
En esta sección se describen los siguientes cambios en KCL 2.x para Java, que permiten crear aplicaciones de consumo de KCL que pueden procesar más de un flujo de datos al mismo tiempo.
importante
El procesamiento de varios flujos solo se admite en KCL 2.x para Java, a partir de KCL 2.3 para Java y versiones posteriores.
El procesamiento de varios flujos NO es compatible con ningún otro lenguaje en el que se pueda implementar KCL 2.x.
El procesamiento de varios flujos NO es compatible con ninguna versión de KCL 1.x.
-
Interfaz MultistreamTracker
Para crear una aplicación de consumo que pueda procesar varios flujos al mismo tiempo, debe implementar una nueva interfaz llamada MultistreamTracker
. Esta interfaz incluye el método streamConfigListque devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. KCL llama astreamConfigListperiódicamente para obtener información sobre los cambios en los flujos de datos que se van a procesar.El método
streamConfigListrellena la lista StreamConfig. package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }Tenga en cuenta que los campos
StreamIdentifieryInitialPositionInStreamExtendedson obligatorios, aunqueconsumerArnes opcional. Debe proporcionarconsumerArnúnicamente si utiliza KCL 2.x para implementar una aplicación de consumo con distribución mejorada.Para obtener más información sobre
StreamIdentifier, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129. Para crear un StreamIdentifier, le recomendamos que cree una instancia multiflujo a partir destreamArny lastreamCreationEpochque esté disponible en la versión 2.5.0 y versiones posteriores. En las versiones KCL 2.3 y 2.4, que no son compatibles constreamArm, cree una instancia multiflujo con el formatoaccount-id:StreamName:streamCreationTimestamp. Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal.MultistreamTrackertambién incluye una estrategia para eliminar los arrendamientos de flujos antiguos en la tabla de arrendamiento (formerStreamsLeasesDeletionStrategy). Tenga en cuenta que la estrategia NO SE PUEDE cambiar durante el tiempo de ejecución de la aplicación de consumo. Para obtener más información, consulte https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java. -
ConfigsBuilder
es una clase que abarca toda la aplicación y que puede utilizar para especificar todos los ajustes de configuración de KCL 2.x que se utilizarán al crear su aplicación de consumo de KCL. La clase ConfigsBuilderahora es compatible con la interfaz deMultistreamTracker. Puede inicializar ConfigsBuilder con el nombre del flujo de datos del que desee consumir los registros:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }O bien, puede inicializar ConfigsBuilder con
MultiStreamTrackersi desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; } -
Con la compatibilidad con varios flujos implementada para la aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID de la partición y el nombre del flujo de los varios flujos de datos que procesa esta aplicación.
-
Cuando se implementa la compatibilidad con varios flujos para la aplicación de consumo de KCL, leaseKey adopta la siguiente estructura:
account-id:StreamName:streamCreationTimestamp:ShardId. Por ejemplo,111111111:multiStreamTest-1:12345:shardId-000000000336.importante
Cuando la aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, leaseKey (que es la clave hash de la tabla de arrendamiento) es el ID de la partición. Si vuelve a configurar esta aplicación de consumo de KCL existente para procesar varios flujos de datos, se rompe la tabla de arrendamiento, ya que con la compatibilidad con varios flujos, la estructura de leaseKey debe ser la siguiente:
account-id:StreamName:StreamCreationTimestamp:ShardId.
Utilizar la KCL con el registro de esquemas AWS Glue
Puede integrar Kinesis Data Streams con AWS Glue Schema Registry. AWS Glue Schema Registry le permite descubrir, controlar y evolucionar de forma centralizada esquemas, además de garantizar que un esquema registrado valide de forma continua los datos generados. Un esquema define la estructura y el formato de un registro de datos. Un esquema es una especificación versionada para publicación, consumo o almacenamiento de confianza de datos. AWS Glue Schema Registry permite mejorar la calidad de los datos de principio a fin y la gobernanza de datos en las aplicaciones de transmisión. Para obtener más información, consulte AWS Glue Schema Registry. Una de las formas de configurar esta integración es a través de KCL en Java.
importante
Actualmente, la integración de Kinesis Data Streams y AWS Glue Schema Registry solo se admite para las instancias de Kinesis Data Streams que utilizan aplicaciones de consumo de KCL 2.3 implementadas en Java. No se proporciona soporte multilingüe. No se admiten las aplicaciones de consumo de KCL 1.0. No se admiten las aplicaciones de consumo de KCL 2.x anteriores a KCL 2.3.
Para obtener instrucciones detalladas sobre cómo configurar la integración de Kinesis Data Streams con Schema Registry mediante KCL, consulte la sección “Interacción con los datos mediante las bibliotecas de KPL/KCL” en Caso de uso: integración de Amazon Kinesis Data Streams con AWS Glue Schema Registry.