Procesamiento de varios flujos con KCL - Amazon Kinesis Data Streams

Procesamiento de varios flujos con KCL

En esta sección se describen los cambios necesarios en KCL, que permiten crear aplicaciones de consumo de KCL que pueden procesar más de un flujo de datos al mismo tiempo.

importante
  • El procesamiento de varios flujos solo es compatible con KCL 2.3 o versiones posteriores.

  • Los consumidores de KCL que estén escritos en lenguajes distintos de Java y que funcionen con multilangdaemon no son compatibles con el procesamiento de varios flujos.

  • El procesamiento de varios flujos no es compatible con ninguna versión de KCL 1.x.

  • Interfaz MultistreamTracker

    • Para crear una aplicación de consumo que pueda procesar varios flujos al mismo tiempo, debe implementar una nueva interfaz llamada MultistreamTracker. Esta interfaz incluye el método streamConfigList que devuelve la lista de flujos de datos y sus configuraciones para que los procese la aplicación de consumo de KCL. Tenga en cuenta que los flujos de datos que se procesan pueden cambiar durante el tiempo de ejecución de la aplicación de consumo. KCL llama a streamConfigList periódicamente para obtener información sobre los cambios en los flujos de datos que se van a procesar.

    • La streamConfigList rellena la lista StreamConfig.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

O bien, puede inicializar ConfigsBuilder con MultiStreamTracker si desea implementar una aplicación de consumo de KCL que procese varios flujos al mismo tiempo.

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Con la compatibilidad con varios flujos implementada para la aplicación de consumo de KCL, cada fila de la tabla de arrendamiento de la aplicación ahora contiene el ID de la partición y el nombre del flujo de los varios flujos de datos que procesa esta aplicación.

  • Cuando se implementa la compatibilidad con varios flujos para la aplicación de consumo de KCL, leaseKey adopta la siguiente estructura: account-id:StreamName:streamCreationTimestamp:ShardId. Por ejemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.

importante

Cuando la aplicación de consumo de KCL existente está configurada para procesar solo un flujo de datos, leaseKey (que es la clave de partición de la tabla de arrendamiento) es el ID de la partición. Si vuelve a configurar una aplicación de consumo de KCL existente para procesar varios flujos de datos, se rompe la tabla de arrendamiento, ya que la estructura de leaseKey debe ser la siguiente: account-id:StreamName:StreamCreationTimestamp:ShardId para admitir varios flujos.