Processamento de vários fluxos com a KCL - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Processamento de vários fluxos com a KCL

Esta seção descreve as alterações necessárias na KCL que permitem criar aplicações de consumo da KCL que podem processar mais de um fluxo de dados ao mesmo tempo.

Importante
  • O processamento de vários fluxos é compatível somente na KCL 2.3 ou versão posterior.

  • O processamento de vários fluxos não será compatível se houver consumidores da KCL criados em linguagens não Java e executados com multilangdaemon.

  • O processamento de vários fluxos não tem suporte em nenhuma versão da KCL 1.x.

  • MultistreamTracker interface

    • Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada MultistreamTracker. Essa interface inclui o método streamConfigList, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados processados podem ser alterados durante o runtime da aplicação de consumo. streamConfigList é chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados.

    • O streamConfigList preenche a StreamConfiglista.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

Ou você pode inicializar ConfigsBuilder com MultiStreamTracker se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Com o suporte para vários fluxos implementado para sua aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém agora a ID do fragmento e o nome do fluxo dos vários fluxos de dados que esta aplicação processa.

  • Quando o suporte para vários fluxos para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo, .111111111:multiStreamTest-1:12345:shardId-000000000336

Importante

Quando sua aplicação de consumo da KCL atual está configurada para processar somente um fluxo de dados, a leaseKey (que é a chave de partição da tabela de concessões) é a ID do fragmento. Se você reconfigurar uma aplicação de consumo da KCL atual para processar vários fluxos de dados, a tabela de concessões será quebrada, pois a estrutura da leaseKey deve ser account-id:StreamName:StreamCreationTimestamp:ShardId para oferecer suporte a vários fluxos.