As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Processamento de vários fluxos com a KCL
Esta seção descreve as alterações necessárias na KCL que permitem criar aplicações de consumo da KCL que podem processar mais de um fluxo de dados ao mesmo tempo.
Importante
-
O processamento de vários fluxos é compatível somente na KCL 2.3 ou versão posterior.
-
O processamento de vários fluxos não será compatível se houver consumidores da KCL criados em linguagens não Java e executados com
multilangdaemon. -
O processamento de vários fluxos não tem suporte em nenhuma versão da KCL 1.x.
-
MultistreamTracker interface
-
Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada MultistreamTracker
. Essa interface inclui o método streamConfigList, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados processados podem ser alterados durante o runtime da aplicação de consumo.streamConfigListé chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados. -
O
streamConfigListpreenche a StreamConfiglista.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }-
Os campos
StreamIdentifiereInitialPositionInStreamExtendedsão obrigatórios, enquantoconsumerArné opcional. Só é necessário fornecerconsumerArnse você estiver usando a KCL para implementar uma aplicação de consumo de distribuição avançada. -
Para obter mais informações sobre
StreamIdentifier, consulte https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129. Para criar um StreamIdentifier, recomenda-se a criação de uma instância de vários fluxos a partir dostreamArne dostreamCreationEpochque esteja disponível na KCL 2.5.0 ou versões posteriores. Na KCL v2.3 e v2.4, que não oferecem suporte aostreamArm, crie uma instância multifluxo usando o formatoaccount-id:StreamName:streamCreationTimestamp. Esse formato será descontinuado e não terá mais suporte a partir da próxima versão principal. -
MultistreamTracker também inclui uma estratégia para excluir locações de fluxos antigos na tabela de locação (). formerStreamsLeases DeletionStrategy Observe que a estratégia NÃO PODE ser alterada durante o runtime da aplicação de consumo. Para obter mais informações, consulte https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy .java
.
-
Ou você pode inicializar ConfigsBuilder com MultiStreamTracker se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Com o suporte para vários fluxos implementado para sua aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém agora a ID do fragmento e o nome do fluxo dos vários fluxos de dados que esta aplicação processa.
-
Quando o suporte para vários fluxos para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura
account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo, .111111111:multiStreamTest-1:12345:shardId-000000000336
Importante
Quando sua aplicação de consumo da KCL atual está configurada para processar somente um fluxo de dados, a leaseKey (que é a chave de partição da tabela de concessões) é a ID do fragmento. Se você reconfigurar uma aplicação de consumo da KCL atual para processar vários fluxos de dados, a tabela de concessões será quebrada, pois a estrutura da leaseKey deve ser account-id:StreamName:StreamCreationTimestamp:ShardId para oferecer suporte a vários fluxos.