翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
KCL によるマルチストリーム処理
このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成できる KCL で必要な変更について説明します。
重要
-
マルチストリーム処理は KCL 2.3 以降でのみサポートされています。
-
マルチストリーム処理は、 で実行される Java 以外の言語で記述された KCL コンシューマーではサポートされていません
multilangdaemon。 -
マルチストリーム処理は、KCL 1.x のどのバージョンでもサポートされていません。
-
MultistreamTracker インターフェイス
-
複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、MultistreamTracker
という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す streamConfigListメソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。streamConfigListは、処理するデータストリームの変更について知るために KCL によって定期的に呼び出されます。 -
は
streamConfigListStreamConfigリストを入力します。
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }-
StreamIdentifierとInitialPositionInStreamExtendedは必須フィールドで、consumerArnはオプションです。拡張ファンアウトコンシューマーアプリケーションの実装に KCL を使用している場合consumerArnのみ、 を指定する必要があります。 -
の詳細については
StreamIdentifier、https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129を参照してください。を作成するには StreamIdentifier、streamArnと KCL 2.5.0 以降でstreamCreationEpoch利用可能な からマルチストリームインスタンスを作成することをお勧めします。streamArmをサポートしていない KCL v2.3 および v2.4 では、account-id:StreamName:streamCreationTimestamp形式を使用してマルチストリームインスタンスを作成します。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。 -
MultistreamTracker には、リーステーブル (formerStreamsLeasesDeletionStrategy) で古いストリームのリースを削除するための戦略も含まれています。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。詳細については、https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java
を参照してください。
-
または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、MultiStreamTracker で ConfigsBuilder を初期化することもできます。
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
KCL コンシューマーアプリケーションにマルチストリームサポートが実装されている場合、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。
-
KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、 leaseKey は次の構造になります:
account-id:StreamName:streamCreationTimestamp:ShardId。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336。
重要
既存の KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理するように設定されている場合、 leaseKey (リーステーブルのパーティションキー) はシャード ID です。既存の KCL コンシューマーアプリケーションを再設定して複数のデータストリームを処理する場合、マルチストリームをサポートするaccount-id:StreamName:StreamCreationTimestamp:ShardIdには leaseKey構造が次のようになっている必要があるため、リーステーブルが壊れます。