

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 KCL 进行多流处理
<a name="kcl-multi-stream"></a>

本节介绍了 KCL 中所需的更改，这些更改让您能够创建可同时处理多个数据流的 KCL 消费端应用程序。
**重要**  
只有 KCL 2.3 或更高版本才支持多流处理功能。
使用非 Java 语言编写的通过 `multilangdaemon` 运行的 KCL 消费端*不*支持多流处理功能。
任何版本的 KCL 1.x 都*不*支持多流处理功能。
+ **MultistreamTracker 接口**
  + 要构建可以同时处理多个流的使用者应用程序，必须实现一个名为的新接口[MultistreamTracker](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/MultiStreamTracker.java)。此接口包括返回要由 KCL 消费端应用程序处理的数据流及其配置列表的 `streamConfigList` 方法。请注意，正在处理的数据流可以在消费端应用程序运行时进行更改。KCL 会定期调用 `streamConfigList` 来了解要处理的数据流的变化。
  + `streamConfigList`填充[StreamConfig](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java#L23)列表。

  ```
  package software.amazon.kinesis.common;
  
  import lombok.Data;
  import lombok.experimental.Accessors;
  
  @Data
  @Accessors(fluent = true)
  public class StreamConfig {
      private final StreamIdentifier streamIdentifier;
      private final InitialPositionInStreamExtended initialPositionInStreamExtended;
      private String consumerArn;
  }
  ```
  + `StreamIdentifier` 和 `InitialPositionInStreamExtended` 是必填字段，`consumerArn` 是选填字段。只有在使用 KCL 实现增强型扇出消费端应用程序时，才必须提供 `consumerArn`。
  + 有关的更多信息`StreamIdentifier`，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java \$1L129](https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129). 要创建 `StreamIdentifier`，我们建议您从 KCL 2.5.0 或更高版本提供的 `streamArn` 和 `streamCreationEpoch` 创建一个多流实例。在不支持 `streamArm` 的 KCL v2.3 和 v2.4 中，请使用格式 `account-id:StreamName:streamCreationTimestamp` 创建一个多流实例。从下一个主要版本开始，此格式将弃用且不再受支持。
  +  MultistreamTracker 还包括删除租赁表中旧直播租约的策略 (formerStreamsLeasesDeletionStrategy)。请注意，在消费端应用程序运行时无法更改策略。欲了解更多信息，请参阅 [https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0 .java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java)。
+   [ConfigsBuilder](https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ConfigsBuilder.java)是一个应用程序范围的类，可用于指定在构建 KCL 2.x 或更高版本的 KCL 使用者应用程序时要使用的所有 KCL 配置设置。 `ConfigsBuilder`类现在支持该`MultistreamTracker`接口。你可以用一个数据流的名称进行初始化 ConfigsBuilder 以使用来自以下内容的记录： 

  ```
  /**
       * Constructor to initialize ConfigsBuilder with StreamName
       * @param streamName
       * @param applicationName
       * @param kinesisClient
       * @param dynamoDBClient
       * @param cloudWatchClient
       * @param workerIdentifier
       * @param shardRecordProcessorFactory
       */
      public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName,
              @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
              @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
              @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
          this.appStreamTracker = Either.right(streamName);
          this.applicationName = applicationName;
          this.kinesisClient = kinesisClient;
          this.dynamoDBClient = dynamoDBClient;
          this.cloudWatchClient = cloudWatchClient;
          this.workerIdentifier = workerIdentifier;
          this.shardRecordProcessorFactory = shardRecordProcessorFactory;
      }
  ```  

或者，`MultiStreamTracker`如果要实现同时处理多个流的 KCL 使用者应用程序，也可以使用进行初始化 ConfigsBuilder 。

```
* Constructor to initialize ConfigsBuilder with MultiStreamTracker
     * @param multiStreamTracker
     * @param applicationName
     * @param kinesisClient
     * @param dynamoDBClient
     * @param cloudWatchClient
     * @param workerIdentifier
     * @param shardRecordProcessorFactory
     */
    public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName,
            @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient,
            @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier,
            @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) {
        this.appStreamTracker = Either.left(multiStreamTracker);
        this.applicationName = applicationName;
        this.kinesisClient = kinesisClient;
        this.dynamoDBClient = dynamoDBClient;
        this.cloudWatchClient = cloudWatchClient;
        this.workerIdentifier = workerIdentifier;
        this.shardRecordProcessorFactory = shardRecordProcessorFactory;
    }
```
+ 通过为您的 KCL 消费端应用程序实现多流支持功能，应用程序租约表的每一行现在都包含该应用程序处理的多个数据流的分片 ID 和流名称。
+ 在为 KCL 消费端应用程序实现多流支持功能后，leaseKey 采用以下结构：`account-id:StreamName:streamCreationTimestamp:ShardId`。例如 `111111111:multiStreamTest-1:12345:shardId-000000000336`。

**重要**  
当现有 KCL 消费端应用程序配置为仅处理一个数据流时，`leaseKey`（租约表的分区键）就是分片 ID。如果您将此现有 KCL 消费端应用程序重新配置为处理多个数据流，则会破坏租约表，因为 `leaseKey` 必须采用如下结构才能支持多流功能：`account-id:StreamName:StreamCreationTimestamp:ShardId`。