Procesamiento de varios flujos con KCL
En esta sección se describen los cambios necesarios en KCL, que permiten crear aplicaciones de consumo de KCL que pueden procesar más de un flujo de datos al mismo tiempo.
importante
-
El procesamiento de varios flujos solo es compatible con KCL 2.3 o versiones posteriores.
-
Los consumidores de KCL que estén escritos en lenguajes distintos de Java y que funcionen con
multilangdaemonno son compatibles con el procesamiento de varios flujos. -
El procesamiento de varios flujos no es compatible con ninguna versión de KCL 1.x.
-
Interfaz MultistreamTracker
-
Para crear una aplicación de consumo que pueda procesar varios flujos al mismo tiempo, debe implementar una nueva interfaz llamada MultistreamTracker
. Esta interfaz incluye el método streamConfigListque devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. KCL llama astreamConfigListperiódicamente para obtener información sobre los cambios en los flujos de datos que se van a procesar. -
La
streamConfigListrellena la lista StreamConfig.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }-
Los campos
StreamIdentifieryInitialPositionInStreamExtendedson obligatorios, aunqueconsumerArnes opcional. Debe proporcionarconsumerArnúnicamente si utiliza KCL para implementar una aplicación de consumo con distribución ramificada mejorada. -
Para obtener más información sobre
StreamIdentifier, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129. Para crear un StreamIdentifier, le recomendamos que cree una instancia multiflujo a partir destreamArny lastreamCreationEpochque esté disponible en KCL 2.5.0 o versiones posteriores. En las versiones KCL 2.3 y 2.4, que no son compatibles constreamArm, cree una instancia multiflujo con el formatoaccount-id:StreamName:streamCreationTimestamp. Este formato quedará obsoleto y dejará de ser compatible a partir de la próxima versión principal. -
MultistreamTracker también incluye una estrategia para eliminar los arrendamientos de flujos antiguos en la tabla de arrendamiento (formerStreamsLeasesDeletionStrategy). Tenga en cuenta que la estrategia NO SE PUEDE cambiar durante el tiempo de ejecución de la aplicación de consumo. Para obtener más información, consulte https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java
.
-
O bien, puede inicializar ConfigsBuilder con MultiStreamTracker si desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Con la compatibilidad con varios flujos implementada para la aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID de la partición y el nombre del flujo de los varios flujos de datos que procesa esta aplicación.
-
Cuando se implementa la compatibilidad con varios flujos para la aplicación de consumo de KCL, leaseKey adopta la siguiente estructura:
account-id:StreamName:streamCreationTimestamp:ShardId. Por ejemplo,111111111:multiStreamTest-1:12345:shardId-000000000336.
importante
Cuando la aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, leaseKey (que es la clave de partición de la tabla de arrendamiento) es el ID de la partición. Si vuelve a configurar una aplicación de consumo de KCL existente para procesar varios flujos de datos, se rompe la tabla de arrendamiento, ya que la estructura de leaseKey debe ser la siguiente: account-id:StreamName:StreamCreationTimestamp:ShardId para admitir varios flujos.