KCL を使用したマルチストリーム処理 - Amazon Kinesis Data Streams

KCL を使用したマルチストリーム処理

このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成するために必要な KCL の変更点について説明します。

重要
  • マルチストリーム処理は KCL 2.3 以降でのみサポートされています。

  • multilangdaemon を使用して実行される、Java 以外の言語で記述された KCL コンシューマーでは、マルチストリーム処理はサポートされていません。

  • マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。

  • MultistreamTracker インターフェイス

    • 複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、MultistreamTracker という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す streamConfigList メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。streamConfigList は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。

    • streamConfigListStreamConfig リストに入力します。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、MultiStreamTracker で ConfigsBuilder を初期化することもできます。

* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。

  • KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります: account-id:StreamName:streamCreationTimestamp:ShardId。例えば、111111111:multiStreamTest-1:12345:shardId-000000000336

重要

既存の KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理するように構成されている場合、leaseKey (リーステーブルのパーティションキー) はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。これは、マルチストリームをサポートするためには、leaseKey の構造が次の形式である必要があるためです: account-id:StreamName:StreamCreationTimestamp:ShardId