

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# KCL を使用したマルチストリーム処理
<a name="kcl-multi-stream"></a>

このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成するために必要な KCL の変更点について説明します。
**重要**  
マルチストリーム処理は KCL 2.3 以降でのみサポートされています。
`multilangdaemon` を使用して実行される、Java 以外の言語で記述された KCL コンシューマーでは、マルチストリーム処理はサポートされていません。**
マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。**
+ **MultistreamTracker インターフェイス**
  + 複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) という新しいインターフェイスを実装する必要があります。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す `streamConfigList` メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。`streamConfigList` は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。
  + `streamConfigList` が [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) リストに入力します。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` および `InitialPositionInStreamExtended` は必須フィールドですが、`consumerArn` は省略可能です。KCL を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合にのみ、`consumerArn` を提供する必要があります。
  + の詳細については`StreamIdentifier`、[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129) を参照してください。`StreamIdentifier` を作成するには、`streamArn` および KCL 2.5.0 以降で利用可能な `streamCreationEpoch` からマルチストリームインスタンスを作成することをお勧めします。`streamArm` をサポートしていない KCL v2.3 および v2.4 では、`account-id:StreamName:streamCreationTimestamp` 形式を使用してマルチストリームインスタンスを作成します。この形式は廃止され、次のメジャーリリース以降はサポートされなくなります。
  +  MultistreamTracker には、リーステーブル内の古いストリームのリースを削除するための戦略 (formerStreamsLeasesDeletionStrategy) も含まれています。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。詳細については、[https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java) を参照してください。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) は、KCL 2.x 以降向けの KCL コンシューマーアプリケーションを構築する際に使用する KCL のすべての設定項目をアプリケーション全体で指定できるクラスです。`ConfigsBuilder` クラスは `MultistreamTracker` インターフェイスをサポートするようになりました。ConfigsBuilder は、レコードを消費する 1 つのデータストリームの名前を使用して初期化できます。 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

または、複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合、`MultiStreamTracker` で ConfigsBuilder を初期化することもできます。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。
+ KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります: `account-id:StreamName:streamCreationTimestamp:ShardId`。例えば、`111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
既存の KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理するように構成されている場合、`leaseKey` (リーステーブルのパーティションキー) はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。これは、マルチストリームをサポートするためには、`leaseKey` の構造が次の形式である必要があるためです: `account-id:StreamName:StreamCreationTimestamp:ShardId`。