

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 KCL 進行多串流處理
<a name="kcl-multi-stream"></a>

本節說明 KCL 中的必要變更，可讓您建立可同時處理多個資料串流的 KCL 取用者應用程式。
**重要**  
只有 KCL 2.3 或更新版本才支援多串流處理。
使用 執行的非 Java 語言撰寫的 KCL 取用者*不支援*多串流處理`multilangdaemon`。
任何 KCL 1.x 版本*都*不支援多串流處理。
+ **MultistreamTracker interface**
  + 若要建置可以同時處理多個串流的取用者應用程式，您必須實作名為 [MultiStreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java) 的新介面。此介面包含傳回資料串流清單及其組態的 `streamConfigList` 方法，以供 KCL 取用者應用程式處理。請注意，正在處理的資料串流可以在取用者應用程式執行時間期間變更。KCL `streamConfigList`會定期呼叫 ，以了解要處理的資料串流中的變更。
  + 會填入 `streamConfigList` [StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23) 清單。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` 和 `InitialPositionInStreamExtended`是必要欄位，而 `consumerArn`是選用欄位。`consumerArn` 只有在您使用 KCL 實作增強型廣發消費者應用程式時，才必須提供 。
  + 如需 的詳細資訊`StreamIdentifier`，請參閱 https：//[https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java\$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129)。若要建立 `StreamIdentifier`，建議您從 `streamArn`和 KCL 2.5.0 或更新版本`streamCreationEpoch`提供的 建立多串流執行個體。在不支援 的 KCL v2.3 和 v2.4 中`streamArm`，使用 格式建立多串流執行個體`account-id:StreamName:streamCreationTimestamp`。從下一個主要版本開始，此格式將被取代，不再受支援。
  +  MultistreamTracker 也包含刪除租用資料表中舊串流租用的策略 (formerStreamsLeasesDeletionStrategy)。請注意，在取用者應用程式執行期，無法變更策略。如需詳細資訊，請參閱 https：//[https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java) 是全應用程式類別，可用來指定在為 KCL 2.x 版或更新版本建置 KCL 取用者應用程式時要使用的所有 KCL 組態設定。 `ConfigsBuilder`類別現在支援 `MultistreamTracker`界面。您可以使用一個資料串流的名稱初始化 ConfigsBuilder，以取用來自以下內容的記錄： 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

或者，如果您想實作一個同時處理多個串流的 KCL 取用者應用程式，則可以使用 `MultiStreamTracker` 初始化 ConfigsBuilder。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ 透過針對 KCL 取用者應用程式實作的多串流支援，應用程式租用資料表的每一列現在都包含碎片 ID 和此應用程式處理之多個資料串流的串流名稱。
+ 實作 KCL 取用者應用程式的多串流支援時， leaseKey 會採用下列結構：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
當您現有的 KCL 取用者應用程式設定為僅處理一個資料串流時， `leaseKey`（這是租用資料表的分割區索引鍵） 是碎片 ID。如果您重新設定現有的 KCL 取用者應用程式來處理多個資料串流，它會中斷您的租用資料表，因為`leaseKey`結構必須如下所示： `account-id:StreamName:StreamCreationTimestamp:ShardId`以支援多串流。