使用 KCL 进行多流处理
本节介绍了 KCL 中所需的更改,这些更改让您能够创建可同时处理多个数据流的 KCL 消费端应用程序。
重要
-
只有 KCL 2.3 或更高版本才支持多流处理功能。
-
使用非 Java 语言编写的通过
multilangdaemon运行的 KCL 消费端不支持多流处理功能。 -
任何版本的 KCL 1.x 都不支持多流处理功能。
-
MultistreamTracker 接口
-
要构建可以同时处理多个流的消费端应用程序,必须实现一个名为 MultistreamTracker
的新接口。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 streamConfigList方法。请注意,正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用streamConfigList来了解要处理的数据流的变化。 -
streamConfigList会填充 StreamConfig列表。
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }-
StreamIdentifier和InitialPositionInStreamExtended是必填字段,consumerArn是选填字段。只有在使用 KCL 实现增强型扇出消费端应用程序时,才必须提供consumerArn。 -
有关
StreamIdentifier的更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129。要创建 StreamIdentifier,我们建议您从 KCL 2.5.0 或更高版本提供的streamArn和streamCreationEpoch创建一个多流实例。在不支持streamArm的 KCL v2.3 和 v2.4 中,请使用格式account-id:StreamName:streamCreationTimestamp创建一个多流实例。从下一个主要版本开始,此格式将弃用且不再受支持。 -
MultistreamTracker 还包括可删除租约表中旧流租约的策略 (formerStreamsLeasesDeletionStrategy)。请注意,在消费端应用程序运行时无法更改策略。有关更多信息,请参阅 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java
。
-
如果要实现同时处理多个流的 KCL 消费端应用程序,您也可以使用 MultiStreamTracker 来初始化 ConfigsBuilder。
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
通过为您的 KCL 消费端应用程序实现多流支持功能,应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
-
在为 KCL 消费端应用程序实现多流支持功能后,leaseKey 采用以下结构:
account-id:StreamName:streamCreationTimestamp:ShardId。例如111111111:multiStreamTest-1:12345:shardId-000000000336。
重要
当现有 KCL 消费端应用程序配置为仅处理一个数据流时,leaseKey(租约表的分区键)就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流,则会破坏租约表,因为 leaseKey 必须采用如下结构才能支持多流功能:account-id:StreamName:StreamCreationTimestamp:ShardId。