使用 KCL 进行多流处理 - Amazon Kinesis Data Streams

使用 KCL 进行多流处理

本节介绍了 KCL 中所需的更改,这些更改让您能够创建可同时处理多个数据流的 KCL 消费端应用程序。

重要
  • 只有 KCL 2.3 或更高版本才支持多流处理功能。

  • 使用非 Java 语言编写的通过 multilangdaemon 运行的 KCL 消费端支持多流处理功能。

  • 任何版本的 KCL 1.x 都支持多流处理功能。

  • MultistreamTracker 接口

    • 要构建可以同时处理多个流的消费端应用程序,必须实现一个名为 MultistreamTracker 的新接口。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 streamConfigList 方法。请注意,正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用 streamConfigList 来了解要处理的数据流的变化。

    • streamConfigList 会填充 StreamConfig 列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

如果要实现同时处理多个流的 KCL 消费端应用程序,您也可以使用 MultiStreamTracker 来初始化 ConfigsBuilder。

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 通过为您的 KCL 消费端应用程序实现多流支持功能,应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。

  • 在为 KCL 消费端应用程序实现多流支持功能后,leaseKey 采用以下结构:account-id:StreamName:streamCreationTimestamp:ShardId。例如 111111111:multiStreamTest-1:12345:shardId-000000000336

重要

当现有 KCL 消费端应用程序配置为仅处理一个数据流时,leaseKey(租约表的分区键)就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流,则会破坏租约表,因为 leaseKey 必须采用如下结构才能支持多流功能:account-id:StreamName:StreamCreationTimestamp:ShardId