

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建和管理 Kinesis 数据流
<a name="working-with-streams"></a>

Amazon Kinesis Data Streams 实时吸收大量数据、持久存储数据并使这些数据可供使用。Kinesis Data Streams 存储的数据单位是*数据记录*。*数据流* 表示一组数据记录。数据流中的数据记录将分发到分片中。

*分片*具有流中的一系列数据记录。它是 Kinesis 数据流的基本吞吐量单位。在按需 MB/s 和预配置容量模式下，分片支持每秒 1 和 1000 条*写入*记录以及每秒 2 MB/s 条*读取*记录。分片限制确保性能可预测，使设计和操作高度可靠的数据流式传输工作流程变得更加容易。

在本节中，您将学习如何为直播设置容量模式，以及如何使用 AWS 管理控制台 或创建直播 APIs。然后，您可以对流执行其他操作。

**Topics**
+ [选择正确的流式传入的模式](how-do-i-size-a-stream.md)
+ [使用创建直播 AWS 管理控制台](how-do-i-create-a-stream.md)
+ [使用创建直播 APIs](kinesis-using-sdk-java-create-stream.md)
+ [更新流](updating-a-stream.md)
+ [列出流](kinesis-using-sdk-java-list-streams.md)
+ [列出分片](kinesis-using-sdk-java-list-shards.md)
+ [删除流](kinesis-using-sdk-java-delete-stream.md)
+ [对流进行重新分片](kinesis-using-sdk-java-resharding.md)
+ [更改数据留存期](kinesis-extended-retention.md)
+ [标记 Amazon Kinesis Data Streams 资源](tagging.md)
+ [处理大型记录](large-records.md)
+ [使用执行弹性测试 AWS Fault Injection Service](kinesis-fis.md)

# 选择正确的流式传入的模式
<a name="how-do-i-size-a-stream"></a>

以下主题说明了如何为应用程序选择最佳模式，以及如何根据需要切换模式。

**Topics**
+ [Kinesis Data Streams 中有哪些不同的模式？](#diff-modes-kds)
+ [按需标准模式的特征和用例](#ondemandmode)
+ [按需优势模式的特征和用例](#ondemand-advantage-mode)
+ [预置模式的特征和用例](#provisionedmode)
+ [切换模式](#switchingmodes)

## Kinesis Data Streams 中有哪些不同的模式？
<a name="diff-modes-kds"></a>

模式决定如何管理数据流的容量以及如何对数据流的使用收费。在 Amazon Kinesis Data Streams 中，您可以为数据流选择**按需标准**、**按需优势**和**预置**模式。
+ **按需标准**：按需模式的数据流无需容量规划，并且可以自动扩展以处理每分钟数 GB 的写入和读取吞吐量。若采用按需模式，Kinesis Data Streams 会自动管理分片来提供必要的吞吐量。
+  **按需优势**：这是一种账户级模式，可提供更多功能，并简化按需流的定价体系。在此模式下，您可以在任何时间主动预热某个流的写入吞吐能力。对于定价，不再采用固定的流级计费模式，而且所有按需流的数据摄取、数据检索和延长留存使用量比**按需标准**流至少低 60%。
+ **预置**：对于处于预置模式的数据流，您必须指定数据流的分片数量。数据流的总容量是其分片容量的总和。您可以根据需要增加或减少数据流中的分片数。

你可以使用 Kinesis Data `PutRecord` Streams `PutRecords` APIs 和在任何模式下将数据写入数据流。为了检索数据，所有三种模式都支持使用 `GetRecords` API 的默认消费端和使用 `SubscribeToShard` API 的增强型扇出（EFO）消费端。

按需模式和预置模式均支持 Kinesis Data Streams 的所有功能，包括保留模式、加密、监控指标等。Kinesis Data Streams 在按需和预置容量模式下都具有较高持久性和可用性。

## 按需标准模式的特征和用例
<a name="ondemandmode"></a>

按需模式的数据流无需容量规划，并且可以自动扩展以处理每分钟数 GB 的写入和读取吞吐量。按需模式以低延迟的方式简化了摄取和存储大量数据的过程，因为它无需预置和管理服务器、存储或吞吐量。您每天可以摄取数十亿条记录，而不会产生任何运营开销。

按需模式非常适合用于应对高度可变且不可预测的应用程序流量的需求。您不再需要为峰值容量预置这些工作负载，因为峰值容量可能会因为利用率低而导致成本更高。按需模式适用于流量模式不可预测且高度可变的工作负载。

在按需容量模式下，您将按数据流中写入和读取的数据，为每 GB 数据付费。您无需指定预期应用程序执行的读写吞吐量。Kinesis Data Streams 会随着工作负载的增加或减少立即进行调整。有关更多信息，请参阅 [Amazon Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。

按需模式下的数据流最多可容纳过去 30 天内观察到的峰值写入吞吐量的两倍。当数据流的写入吞吐量达到新的峰值时，Kinesis Data Streams 会自动扩展数据流的容量。例如，如果您的数据流的写入吞吐量在写入吞吐量的 10 MB/s % 到 40MB/s, then Kinesis Data Streams ensures that you can easily burst to double your previous peak throughput, or 80 MB/s. If the same data stream sustains a new peak throughput of 50 MB/s, Kinesis Data Streams ensures that there is enough capacity to ingest 100 MB/s% 之间。但是，如果您的流量在 15 分钟内增加到前一个峰值的两倍以上，则可能会产生写入节流。您需要重试这些受限的请求。

按需模式下的数据流聚合读取容量与写入吞吐量成比例增加。这有助于确保消费端应用程序始终有足够的读取吞吐量来实时处理传入的数据。与使用 `GetRecords` API 读取数据相比，您获得的写入吞吐量至少是其两倍。我们建议您使用带有 `GetRecord` API 的消费端应用程序，这样当应用程序需要从停机时间中恢复时，有足够的空间来赶上。对于需要添加多个消费端应用程序的场景，建议您使用 Kinesis Data Streams 的增强型扇出功能。增强型扇出功能支持使用 `SubscribeToShard` API 将最多 20 个消费端应用程序添加到数据流中，每个消费端应用程序都有专用的吞吐量。

### 处理读写吞吐量异常
<a name="hotshards"></a>

在按需模式（与预置容量模式相同）下，必须为每条记录指定一个分区键，才能将数据写入数据流。Kinesis Data Streams 使用分区键在分片之间分配数据。Kinesis Data Streams 监控每个分片的流量。当 KB/s 每个分片的传入流量超过 500 时，它会在 15 分钟内拆分分该分片。父分片的哈希键值在子分片之间均匀地重新分配。

 如果您的传入流量超过之前峰值的两倍，即使您的数据在分片上均匀分配，您也可能会在约 15 分钟内遇到读取或写入异常的问题。我们建议您重试所有此类请求，以便将所有记录正确存储在 Kinesis Data Streams 中。

如果您使用的分区键会导致数据分配不均匀，并且分配给特定分片的记录超出了其限制，则可能会遇到读写异常的问题。在按需模式下，数据流会自动适应以处理不均匀的数据分配模式，除非单个分区键超过分片的 1 MB/s 吞吐量和每秒 1000 条记录的限制。

在按需模式下，Kinesis Data Streams 在检测到流量增加时会均匀地拆分分片。但是，它不会检测和隔离将更高比例的传入流量传送到特定分片的哈希键。如果您使用的分区键非常不均匀，则可能会继续收到写入异常的提示。对于此类使用案例，我们建议您使用支持精细分片拆分的预置容量模式。

## 按需优势模式的特征和用例
<a name="ondemand-advantage-mode"></a>

按需优势模式是一种账户级设置，可解锁更多功能，并为区域内的所有按需流提供不同的定价体系。在此模式下，按需流仍保留其功能，并根据数据实际使用量继续自动扩缩容量。如果想主动预热某个流的写入吞吐能力，可配置热吞吐量。例如，如果您的数据流的写入吞吐量介于 10 MB/s, you can expect it to handle up to 80MB/s of instant throughput increases without throttling. However, if you forecast an upcoming event to peak around 200MB/s of traffic, you can configure the stream with a warm throughput of 200MB/s 到 40 MB/s 之间，则可以确保在数据吞吐量到达时容量可用。使用热吞吐量不会产生额外费用。

按需优势模式的另一个好处是按需流可以过渡至更简单的定价体系。启用该模式后，账户将不再产生固定的按流计费，您只需处理数据摄取、数据检索和可选的延长留存费用。与按需标准模式相比，每个定价维度都有很大的折扣。有关更多信息，请参阅 [Amazon Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。

在这种模式下，增强扇出型数据检索也不会产生相较于标准数据检索的价格溢价。此外，在按需优势模式下，您可以为每个直播注册多达 50 个消费者，以使用增强的扇出功能。启用 Ondemand Advantage 会使账户在所有按需流中至少有 25MiB/s of data ingest and 25MiB/s% 的数据检索。对于满足最低使用要求的账户，Kinesis Data Streams 控制台会检查账户的使用模式是否适合使用**按需优势**模式。

如果账户的数据使用量低于要求，则需要对差额付费，但折扣率仍将保持不变。启用按需优势模式之后，至少还需要 24 小时才能禁用此模式。总体而言，如果持续使用的吞吐量接近或超过最低承诺值、需要大量扇出型消费端或使用数百个数据流，则按需优势模式即为使用 Kinesis Data Streams 进行流式传输的最佳方案。

## 预置模式的特征和用例
<a name="provisionedmode"></a>

在预配置模式下，在创建数据流之后，您可以使用或 API 动态地向上或向下扩展分片容量。 AWS 管理控制台 [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)您可以在 Kinesis Data Streams 创建器或消费端应用程序在向流写入数据或从中读取数据时进行更新。

预置模式适用于容量需求易于预测的可预测流量。如果您想精细控制分片间数据的分配方式，则可以使用预置模式。

若采用预置模式，您必须为数据流指定分片数。要确定预置模式下数据流的大小，您需要以下输入值：
+ 写入流的数据记录的平均大小，以 KB 为单位，四舍五入为 1 KB (`average_data_size_in_KB`)。
+ 每秒写入流和从流读取的数据记录数 (`records_per_second`)。
+ 消费端数量，即并发且独立使用流中数据的 Kinesis Data Streams 应用程序的数量 (`number_of_consumers`)。
+ 以 KB 为单位的传入写入带宽 (`incoming_write_bandwidth_in_KB`)，等于 `average_data_size_in_KB` 乘以 `records_per_second`。
+ 以 KB 为单位的传出读取带宽 (`outgoing_read_bandwidth_in_KB`)，等于 `incoming_write_bandwidth_in_KB` 乘以 `number_of_consumers`。

可使用以下公式中的输入值来计算流所需的分片数量 (`number_of_shards`)。

```
number_of_shards = ceiling(max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048))
```

如果您未将数据流配置为处理峰值吞吐量，则在预置模式下仍可能遇到读写吞吐量异常的问题。在这种情况下，您必须手动扩展数据流以适应数据流量。

如果您使用的分区键会导致数据分配不均匀，并且分配给特定分片的记录超出了其限制，则可能也会遇到读写异常的问题。要在预置模式下解决此问题，请确定此类分片并手动拆分，以更好地适应流量。有关更多信息，请参阅[对流进行重新分片](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html)。

## 切换模式
<a name="switchingmodes"></a>

对于中的每个数据流 AWS 账户，您可以在 24 小时内在按需模式和预配置模式之间切换两次。切换模式不会对使用该数据流的应用程序造成任何中断。您可以继续读写该数据流。当您在各模式之间切换（从按需切换到预置，或反之）时，流的状态将设置为*正在更新*。必须等待数据流状态变为*活动*，然后才能再次修改其属性。

当您从预置容量模式切换到按需容量模式时，数据流最初会保留转换之前的所有分片数量，从此时起，Kinesis Data Streams 将监控您的数据流量，并根据写入吞吐量扩展此按需数据流的分片数量。当您从按需模式切换到预置模式时，数据流最初也会保留转换之前的所有分片数量，但是从此时起，您将负责监控和调整此数据流的分片数量，以正确适应写入吞吐量。

可以通过启用账户级设置，从**按需标准**模式切换到**按需优势**模式。启用后，该账户承诺在该地区所有按需流中至少 25MiB/s of data ingest and 25MiB/s% 的数据检索使用量。启用后，至少须等待 24 小时才能禁用**按需优势**模式，但您可以随时申请更改。如果要从**按需优势**模式切换到**按需标准**模式，则必须先移除任何配置了按需流的热吞吐量。

# 使用创建直播 AWS 管理控制台
<a name="how-do-i-create-a-stream"></a>

可以使用 Kinesis Data Streams 控制台、Kinesis Data Streams API 或 AWS Command Line Interface （AWS CLI）创建流。

**使用控制台创建数据流**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航栏中，展开区域选择器并选择一个区域。

1. 选择**创建数据流**。

1. 在**创建 Kinesis 流**页面上，输入数据流的名称，然后选择**按需**或**预置**容量模式。默认情况下，系统将选择**按需**模式。有关更多信息，请参阅 [选择正确的流式传入的模式](how-do-i-size-a-stream.md)。

   在**按需**模式下，您可以选择**创建 Kinesis 流**来创建数据流。在**预置**模式下，您必须指定所需的分片数量，然后选择**创建 Kinesis 流**。

   在 **Kinesis stream (Kinesis 流)**页面上，当流处于创建中时，流的 **Status (状态)** 为 **Creating (正在创建)**。当流可以使用时，**Status (状态)** 会更改为 **Active (有效)**。

1. 选择流的名称。**Stream Details (流详细信息)** 页面显示了流配置摘要以及监控信息。

**使用 Kinesis Data Streams API 创建流**
+ 有关使用 Kinesis Data Streams API 创建流的信息，请参阅 [使用创建直播 APIs](kinesis-using-sdk-java-create-stream.md)。

**要使用创建直播 AWS CLI**
+ 有关使用创建直播的信息，请参阅 cre [ate-stre](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html) am 命令。 AWS CLI

# 使用创建直播 APIs
<a name="kinesis-using-sdk-java-create-stream"></a>

请使用以下步骤创建 Kinesis 数据流。

## 构建 Kinesis Data Streams 客户端
<a name="kinesis-using-sdk-java-create-client"></a>

必须先构建客户端对象，然后才能处理 Kinesis 数据流。以下 Java 代码实例化一个客户端生成器，并使用它来设置区域、凭据和客户端配置。然后，它会构建一个客户端对象。

```
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
clientBuilder.setRegion(regionName);
clientBuilder.setCredentials(credentialsProvider);
clientBuilder.setClientConfiguration(config);
        
AmazonKinesis client = clientBuilder.build();
```

有关更多信息，请参阅《AWS 一般参考》**中的 [Kinesis Data Streams Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)。

## 创建流
<a name="kinesis-using-sdk-java-create-the-stream"></a>

现在您已创建 Kinesis Data Streams 客户端，接下来可使用控制台或以编程方式来创建流。要以编程方式创建流，请实例化 `CreateStreamRequest` 对象并指定流名称。若要使用预置模式，请为数据流指定分片数量。
+ **按需**：

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  ```
+ **预置**：

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  createStreamRequest.setShardCount( myStreamSize );
  ```

流名称用于标识流。该名称的作用域仅限于应用程序使用的 AWS 帐户。它还受区域限制。也就是说，两个不同 AWS 账户中的两个直播可以有相同的名称，同一个 AWS 账户中但位于两个不同区域的两个直播可以有相同的名称，但不能有两个直播在同一个账户和同一个区域中。

流的吞吐量是分片数量的函数。要获得更高的预置吞吐量，您需要更多的分片。分片越多还会增加对直播 AWS 收取的费用。有关计算适合您的应用程序的分片数量的更多信息，请参阅[选择正确的流式传入的模式](how-do-i-size-a-stream.md)。

 配置 `createStreamRequest` 对象后，应通过对客户端调用 `createStream` 方法来创建流。在调用 `createStream` 之后，应等待流达到 `ACTIVE` 状态，然后再对流执行任何操作。要查看流的状态，请调用 `describeStream` 方法。但是，如果流不存在，`describeStream` 将引发异常。因此，请将 `describeStream` 调用包括在 `try/catch` 块中。

```
client.createStream( createStreamRequest );
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) {
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
    //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) {
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

# 更新流
<a name="updating-a-stream"></a>

可以使用 Kinesis Data Streams 控制台、Kinesis Data Streams API 或 AWS CLI更新流详细信息。

**注意**  
您可以为现有流或最近刚创建的流启用服务器端加密。

## 使用控制台
<a name="update-stream-console"></a>

**使用控制台更新数据流**

1. 打开亚马逊 Kinesis 控制台，网址为。[https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)

1. 在导航栏中，展开区域选择器并选择一个区域。

1. 在列表中选择流的名称。**Stream Details (流详细信息)** 页面显示流配置摘要和监控信息。

1. 要在数据流的按需容量模式和预置容量模式之间切换，请在**配置**选项卡中选择**编辑容量模式**。有关更多信息，请参阅 [选择正确的流式传入的模式](how-do-i-size-a-stream.md)。
**重要**  
对于您 AWS 账户中的每个数据流，您可以在 24 小时内在按需模式和预配置模式之间切换两次。

1. 对于处于预置模式下的数据流，要编辑分片数量，请在**配置**选项卡中选择**编辑预置分片**，然后输入新的分片数。

1. 要对数据记录启用服务器端加密，请选择**服务器端加密**部分中的**编辑**。选择要用作加密主密钥的 KMS 密钥，或者使用由 Kinesis 管理的默认主密钥 **aws/kinesis**。如果您为直播启用加密并使用自己的 AWS KMS 主密钥，请确保您的制作者和使用者应用程序可以访问您使用的 AWS KMS 主密钥。要将权限分配给应用程序以访问用户生成的 AWS KMS 密钥，请参阅[使用用户生成的 KMS 密钥的权限](permissions-user-key-KMS.md)。

1. 要编辑数据保留期，请选择 **Data retention period** 部分中的 **Edit**，然后输入新的数据保留期。

1. 如果您已在自己账户上启用自定义指标，请在**分片级别指标**部分中选择**编辑**，然后指定流的指标。有关更多信息，请参阅 [使用亚马逊监控亚马逊 Kinesis Data Streams 服务 CloudWatch](monitoring-with-cloudwatch.md)。

## 使用 API
<a name="update-stream-api"></a>

要使用 API 更新流详细信息，请参阅以下方法：
+ [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)
+ [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html)
+ [DisableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DisableEnhancedMonitoring.html)
+ [EnableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_EnableEnhancedMonitoring.html)
+ [IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html)
+ [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)
+ [StartStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartStreamEncryption.html)
+ [StopStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StopStreamEncryption.html)
+ [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)

## 使用 AWS CLI
<a name="update-stream-cli"></a>

有关使用更新直播的信息 AWS CLI，请参阅 [Kinesis CLI](https://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html) 参考。

# 列出流
<a name="kinesis-using-sdk-java-list-streams"></a>

流的范围限定为与用于实例化 Kinesis Data Streams 客户端的 AWS 凭据关联的 AWS 账户以及为该客户端指定的区域。一个 AWS 账户可以同时具有多个活动流。您可在 Kinesis Data Streams 控制台中列出流，也可以编程方式列出流。本节中的代码显示了如何列出您 AWS 账户的所有直播。

```
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
listStreamsRequest.setLimit(20); 
ListStreamsResult listStreamsResult = client.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
```

此代码示例首先创建 `ListStreamsRequest` 的一个新实例，然后调用其 `setLimit` 方法来指定对 `listStreams` 的每次调用应返回最多 20 个流。如果您没有为 `setLimit` 指定值，则 Kinesis Data Streams 将返回的流数量小于或等于账户中的流数量。此代码之后将 `listStreamsRequest` 传递到客户端的 `listStreams` 方法。返回值 `listStreams` 存储在 `ListStreamsResult` 对象中。此代码对此对象调用 `getStreamNames` 方法，并将返回的流名称存储在 `streamNames` 列表中。请注意，Kinesis Data Streams 返回的流数量可能少于指定限制值所指定的数量，即使账户和区域中的流数量多于此数量也是如此。要确保检索所有流，请使用下一代码示例中描述的 `getHasMoreStreams` 方法。

```
while (listStreamsResult.getHasMoreStreams()) 
{
    if (streamNames.size() > 0) {
      listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
    }
    listStreamsResult = client.listStreams(listStreamsRequest);
    streamNames.addAll(listStreamsResult.getStreamNames());
}
```

此代码对 `getHasMoreStreams` 调用 `listStreamsRequest` 方法，以检查是否有超出在对 `listStreams` 的初始调用中返回的流数量的可用流。如果有，则此代码将使用在对 `setExclusiveStartStreamName` 的上一调用中返回的最后一个流的名称调用 `listStreams` 方法。`setExclusiveStartStreamName` 方法将导致对 `listStreams` 的下一调用在该流之后开始。该调用返回的一组流名称之后将添加到 `streamNames` 列表。此过程将继续，直到所有流名称已收集到列表中。

 `listStreams` 返回的流可能处于下列状态之一：
+ `CREATING`
+ `ACTIVE`
+ `UPDATING`
+ `DELETING`

您可使用 `describeStream` 方法查看流的状态（如上一节 [使用创建直播 APIs](kinesis-using-sdk-java-create-stream.md) 中所示）。

# 列出分片
<a name="kinesis-using-sdk-java-list-shards"></a>

一个数据流可以有一个或多个分片。从数据流中列出或检索分片的推荐方法是使用 API。[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)以下示例说明如何获取数据流中的分片列表。有关本示例中使用的主操作的完整说明以及您可以为该操作设置的所有参数，请参阅[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)。

```
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;

import java.util.concurrent.TimeUnit;

public class ShardSample {

    public static void main(String[] args) {

        KinesisAsyncClient client = KinesisAsyncClient.builder().build();

        ListShardsRequest request = ListShardsRequest
                .builder().streamName("myFirstStream")
                .build();

        try {
            ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS);
            System.out.println(response.toString());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

要运行上一个代码示例，您可以使用类似于下文的 POM 文件。

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kinesis.data.streams.samples</groupId>
    <artifactId>shards</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kinesis</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
```

通过 `ListShards` API，您可以使用[ShardFilter](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html)参数筛选出 API 的响应。一次只能指定一个筛选条件。

如果您在调用 ListShards API 时使用`ShardFilter`参数，则`Type`为必填属性，必须指定。如果指定 `AT_TRIM_HORIZON`、`FROM_TRIM_HORIZON` 或 `AT_LATEST` 类型，则无需指定 `ShardId` 或 `Timestamp` 可选属性。

如果指定 `AFTER_SHARD_ID` 类型，则还必须提供可选 `ShardId` 属性的值。该`ShardId`属性的功能与 ListShards API 的`ExclusiveStartShardId`参数相同。指定 `ShardId` 属性后，响应将包括多个分片，其中开头的分片 ID 紧随您提供的 `ShardId`。

如果指定 `AT_TIMESTAMP` 或 `FROM_TIMESTAMP_ID` 类型，则还必须提供可选 `Timestamp` 属性的值。如果指定 `AT_TIMESTAMP` 类型，则返回在提供的时间戳上打开的所有分片。如果指定 `FROM_TIMESTAMP` 类型，则返回从提供的时间戳开始到 TIP 的所有分片。

**重要**  
`DescribeStreamSummary`并`ListShard` APIs 提供一种更具扩展性的方式来检索有关您的数据流的信息。更具体地说， DescribeStream API 的配额可能会导致限制。有关更多信息，请参阅 [限额和限制](service-sizes-and-limits.md)。另请注意，与您 AWS 账户中所有数据流交互的所有应用程序均共享`DescribeStream`配额。另一方面， ListShards API 的配额特定于单个数据流。因此，使用 ListShards API 不仅可以获得更高的 TPS，而且随着您创建更多数据流，操作可以更好地扩展。  
我们建议您迁移所有调用 DescribeStream API 的生产者和使用者，改为调用 DescribeStreamSummary 和 ListShard APIs。为了识别这些生产者和使用者，我们建议使用 Athena 来 CloudTrail 解析日志，因为在 API 调用中捕获 KPL 和 KCL 的用户代理。  

```
SELECT useridentity.sessioncontext.sessionissuer.username, 
useridentity.arn,eventname,useragent, count(*) FROM 
cloudtrail_logs WHERE Eventname IN ('DescribeStream')  AND 
eventtime
    BETWEEN ''
        AND ''
GROUP BY  useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent
ORDER BY  count(*) DESC LIMIT 100
```
我们还建议重新配置调用 API 的 AWS Lambda 和 Amazon Firehose 与 Kinesis Data Streams 的集成，以便集成改为调用`DescribeStream`和。`DescribeStreamSummary` `ListShards`具体而言，对于 AWS Lambda，您必须更新您的事件源映射。对于 Amazon Firehose，必须更新相应的 IAM 权限，使其包含 `ListShards` IAM 权限。

# 删除流
<a name="kinesis-using-sdk-java-delete-stream"></a>

您可使用 Kinesis Data Streams 控制台或以编程方式删除流。要以编程方式删除流，请使用 `DeleteStreamRequest`，如以下代码所示。

```
DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest();
deleteStreamRequest.setStreamName(myStreamName);
client.deleteStream(deleteStreamRequest);
```

删除流之前，请关闭流上运行的任何应用程序。如果应用程序尝试对已删除的流进行操作，则将收到 `ResourceNotFound` 异常。此外，如果您随后创建名称与之前的流相同的新流，并且在之前的流上运行的应用程序仍在运行，则这些应用程序可能会尝试与新流交互（就像新流是之前的流一样），这将产生无法预测的结果。

# 对流进行重新分片
<a name="kinesis-using-sdk-java-resharding"></a>

**重要**  
您可以使用 API 对直播进行重新分片。[UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)否则，您可以按照此处的说明继续执行拆分和合并。

Amazon Kinesis Data Streams 支持*重新分片*，这使您能够调整流中的分片数量以适应流中数据流的速率变化。重新分片被视为高级操作。如果您不熟悉 Kinesis Data Streams，请在熟悉 Kinesis Data Streams 的所有其他方面之后再回来阅读本主题。

这里有两种类型的重新分片操作：分片拆分和分片合并。在分片拆分中，可将一个分片拆分为两个分片。在分片合并中，可将两个分片组合成一个分片。重新分片始终是*成对* 进行的，也就是说，无法在一次操作中拆分为两个以上的分片，并且无法在一次操作中合并两个以上的分片。重新分片操作涉及的分片或分片对称为*父* 分片。从重新分片操作中生成的分片或分片对称为*子* 分片。

拆分将增加流中分片的数量，从而增加流的数据容量。由于按分片收费，因此拆分将增加流的费用。同样，合并会减少流中的分片数量，从而降低流的数据容量和成本。

重新分片一般由不同于创建者（放置）应用程序和消费端（获取）应用程序的管理应用程序执行。这样的管理应用程序根据Amazon提供的指标 CloudWatch 或从制作者和消费者那里收集的指标来监控直播的整体性能。与使用者或创建者相比，管理应用程序还需要更广泛的 IAM 权限，因为使用者和创建者通常不需要访问 APIs 用于重新分片的用户。有关 Kinesis Data Streams 的 IAM 权限的更多信息，请参阅 [使用 IAM 控制对 Amazon Kinesis Data Streams 资源的访问](controlling-access.md)。

有关重新分片的更多信息，请参阅[如何更改 Kinesis Data Streams 中打开的分片数？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/)

**Topics**
+ [确定重新分片策略](kinesis-using-sdk-java-resharding-strategies.md)
+ [拆分分片](kinesis-using-sdk-java-resharding-split.md)
+ [合并两个分片](kinesis-using-sdk-java-resharding-merge.md)
+ [完成重新分片操作](kinesis-using-sdk-java-after-resharding.md)

# 确定重新分片策略
<a name="kinesis-using-sdk-java-resharding-strategies"></a>

在 Amazon Kinesis Data Streams 中重新分片的目的是，使流能够适应数据流的速率的变化。拆分分片将增加流的容量（和费用）。合并分片将减少流的费用（和容量）。

 一种重新分片的方式可能是拆分流中的每个分片，这将使流的容量增加一倍。但是，这提供的容量可能比您实际需要的要多，从而产生不必要的费用。

您还可使用指标确定您的*热*或*冷*分片（即，接收的数据多于预期或少于预期的分片）。之后您可选择性地拆分热分片以增加面向这些分片的哈希键的容量。同样，您可合并冷分片以更好地利用其未使用的容量。

您可以从 Kinesis Data Streams 发布的亚马逊 CloudWatch 指标中获取直播的一些性能数据。但是，您也可收集您自己的一些关于流的指标。一种方式是记录由您的数据记录的分区键生成的哈希键值。记住，您在向流添加记录时指定了分区键。

```
putRecordRequest.setPartitionKey( String.format( "myPartitionKey" ) );
```

Kinesis Data Streams [MD5](http://en.wikipedia.org/wiki/MD5)使用分区键计算哈希键。由于您为记录指定了分区键，因此您可以使用 MD5 计算该记录的哈希键值并将其记录下来。

您也可以记录分配给您的数据记录的分片。 IDs 分片 ID 是通过使用 `getShardId` 对象（由 `putRecordResults` 方法返回）和对 `putRecords` 对象（由 `putRecordResult` 方法返回）的 `putRecord` 方法提供的。

```
String shardId = putRecordResult.getShardId();
```

使用分片 IDs 和哈希键值，您可以确定哪些分片和哈希键接收的流量最多或最少。您之后可使用重新分片操作来增加或减少容量（视这些键的情况而定）。

# 拆分分片
<a name="kinesis-using-sdk-java-resharding-split"></a>

要在 Amazon Kinesis Data Streams 中拆分分片，需要指定父分片中的哈希键值应如何重新分配给子分片。当您向流添加数据记录时，将基于哈希键值将数据记录分配给分片。哈希键值是您在向流中添加数据记录时为数据记录指定的分区键的[MD5](http://en.wikipedia.org/wiki/MD5)哈希值。具有相同分区键的数据记录也具有相同的哈希键值。

指定分片的可能的哈希键值构成一组有序的连续非负整数。此可能的哈希键值范围是通过以下命令指定的：

```
shard.getHashKeyRange().getStartingHashKey();
shard.getHashKeyRange().getEndingHashKey();
```

在拆分分片时，您指定此范围内的一个值。此哈希键值和所有较高的哈希键值将分配到其中一个子分片。所有较小的哈希键值将分配到另一子分片。

以下代码演示在所有子分片之间均匀地重新分配哈希键的分片拆分操作，基本上是将父分片一分为二。这只是一种可能的父分片划分方式。例如，可以拆分分片，以便父分片中下层三分之一的键分配给一个子分片，上层三分之二的键分配给另一子分片。但是，在许多应用程序中，将分片一分为二是一种很有效的方法。

此代码假定 `myStreamName` 包含您的流名称，对象变量 `shard` 包含要拆分的分片。首先实例化一个新的 `splitShardRequest` 对象并设置流名称和分片 ID。

```
SplitShardRequest splitShardRequest = new SplitShardRequest();
splitShardRequest.setStreamName(myStreamName);
splitShardRequest.setShardToSplit(shard.getShardId());
```

确定位于分片中最低值和最高值的中间的哈希键值。这是将包含父分片中上半层哈希键的子分片的起始哈希键值。在 `setNewStartingHashKey` 方法中指定此值。您只需要指定此值。Kinesis Data Streams 会自动将低于此值的哈希键分配给拆分操作所创建的另一子分片。最后一步是对 Kinesis Data Streams 客户端调用 `splitShard` 方法。

```
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey   = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
String newStartingHashKey  = startingHashKey.add(endingHashKey).divide(new BigInteger("2")).toString();

splitShardRequest.setNewStartingHashKey(newStartingHashKey);
client.splitShard(splitShardRequest);
```

[等待流再次变为活动状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)中演示了此过程之后的第一步。

# 合并两个分片
<a name="kinesis-using-sdk-java-resharding-merge"></a>

 分片合并操作使用两个指定分片并将它们合并为一个分片。在合并后，单个子分片将收到两个父分片所包含的所有哈希键值的数据。

**分片相邻**  
要合并两个分片，分片必须*相邻*。如果两个分片的哈希键范围联合构成一个无间断的连续集，则认为两个分片相邻。例如，假设您有两个分片，一个分片的哈希键范围为 276...381，另一个分片的哈希键范围为 382...454。您可以将这两个分片合并为一个分片，其哈希键范围为 276...454。

另举一例，假设您有两个分片，一个分片的哈希键范围为 276...381，另一个分片的哈希键范围为 455...560。您无法合并这两个分片，因为这两个分片之间有一个或多个哈希键位于 382...454 范围的分片。

流中所有`OPEN`分片的集合（作为一个组）始终跨越整个哈希键值范围。 MD5 有关分片状态（例如 `CLOSED`）的更多信息，请参阅 [重新分片之后考虑数据路由、数据保留和分片状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-data-routing)。

要标识作为合并候选的分片，您应筛选出处于 `CLOSED` 状态的所有分片。状态为 `OPEN`（也就是非 `CLOSED`）的分片的结尾序列号为 `null`。您可使用以下命令测试此结束序列号：

```
if( null == shard.getSequenceNumberRange().getEndingSequenceNumber() ) 
{
  // Shard is OPEN, so it is a possible candidate to be merged.
}
```

在筛选出已关闭分片后，按每个分片支持的最高哈希键值对剩余分片进行排序。您可使用以下命令检索此值：

```
shard.getHashKeyRange().getEndingHashKey();
```

 如果两个分片在这个经过筛选和排序的列表中相邻，则可合并它们。

**合并操作的代码**  
 以下代码可合并两个分片。此代码假定 `myStreamName` 包含您的流名称并且对象变量 `shard1` 和 `shard2` 包含要合并的两个相邻分片。

为进行合并操作，首先实例化一个新的 `mergeShardsRequest` 对象。使用 `setStreamName` 方法指定流名称。然后使用 `setShardToMerge` 和 `setAdjacentShardToMerge` 方法指定要合并的两个分片。最后，对 Kinesis Data Streams 客户端调用 `mergeShards` 方法以执行此操作。

```
MergeShardsRequest mergeShardsRequest = new MergeShardsRequest();
mergeShardsRequest.setStreamName(myStreamName);
mergeShardsRequest.setShardToMerge(shard1.getShardId());
mergeShardsRequest.setAdjacentShardToMerge(shard2.getShardId());
client.mergeShards(mergeShardsRequest);
```

[等待流再次变为活动状态](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)中演示了此过程之后的第一步。

# 完成重新分片操作
<a name="kinesis-using-sdk-java-after-resharding"></a>

在 Amazon Kinesis Data Streams 中执行任何类型的重新分片过程之后，在继续常规记录处理之前，需要执行其他流程并注意一些事项。以下各节介绍了这些过程。

**Topics**
+ [等待流再次变为活动状态](#kinesis-using-sdk-java-resharding-wait-until-active)
+ [重新分片之后考虑数据路由、数据保留和分片状态](#kinesis-using-sdk-java-resharding-data-routing)

## 等待流再次变为活动状态
<a name="kinesis-using-sdk-java-resharding-wait-until-active"></a>

在调用重新分片操作 `splitShard` 或 `mergeShards` 之后，您必须等待流再次变为活动状态。要使用的代码与您在[创建流](kinesis-using-sdk-java-create-stream.md)之后等待流变为活动状态时使用的代码相同。代码如下所示：

```
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) 
{
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
   //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) 
{
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

## 重新分片之后考虑数据路由、数据保留和分片状态
<a name="kinesis-using-sdk-java-resharding-data-routing"></a>

Kinesis Data Streams 是一项实时数据流服务。您的应用程序应假定数据不断地在流中的分片内流动。当您重新分片时，流至父分片的数据记录将基于数据记录分区键映射到的哈希键值重新路由至子分片。但是，重新分片前位于父分片中的任何数据记录将仍位于这些父分片中。父分片在重新分片发生后不会消失。它们与重新分片之前包含的数据一起保留。可以使用 Kinesis Data Streams API 中的 [`getShardIterator` 和 `getRecords`](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data) 操作或通过 Kinesis Client Library 访问父分片中的数据记录。

**注意**  
数据记录在当前保留期内添加到流中后即可访问。不论在该期间内对流中的分片进行了任何更改，都是如此。有关流的保留期的更多信息，请参阅[更改数据留存期](kinesis-extended-retention.md)。

在重新分片过程中，父分片将从 `OPEN` 状态过渡到 `CLOSED` 状态再过渡到 `EXPIRED` 状态。
+  **OPEN**：在重新分片操作之前，父分片处于 `OPEN` 状态，这意味着数据记录可添加到分片中并且可从分片进行检索。
+  **CLOSED**：在重新分片操作之后，父分片将过渡到 `CLOSED` 状态。这意味着无法再向此分片添加数据记录。原本应该已添加到此分片的数据记录现在将改为添加到子分片。但是，数据记录在有限时间内仍可从此分片进行检索。
+  **EXPIRED**：在流的保留期过期之后，父分片中的所有数据记录将会过期，不再可供访问。此时，父分片自身将过渡到 `EXPIRED` 状态。用于枚举流中的分片的 `getStreamDescription().getShards` 调用不包括返回的分片列表中的 `EXPIRED` 分片。有关流的保留期的更多信息，请参阅[更改数据留存期](kinesis-extended-retention.md)。

在进行重新分片并且流再次处于 `ACTIVE` 状态之后，您可立即开始读取子分片中的数据。但是，在重新分片后保留的父分片可能仍包含您尚未读取并且已在重新分片前添加到流中的数据。如果您在读取完父分片中的所有数据之前读取子分片中的数据，则可不按数据记录的序列号指定的顺序读取特定哈希键的数据。因此，假定数据顺序很重要，您在重新分片后应始终继续读取父分片中的数据直到读取完。并且只有在这之后才应开始读取子分片中的数据。如果 `getRecordsResult.getNextShardIterator` 返回 `null`，则这指示您已读取完父分片中的所有数据。

# 更改数据留存期
<a name="kinesis-extended-retention"></a>

Amazon Kinesis Data Streams 支持更改数据流的数据记录保留期。Kinesis 数据流是数据记录的有序序列，可用于执行实时写入和读取。因此，数据记录临时存储在您的流的分片中。从添加记录开始，到记录不再可供访问为止的时间段称为*保留期*。默认情况下，Kinesis 数据流的记录存储时间从 24 小时到 8760 小时（365 天）不等。

您可以通过 Kinesis Data Streams 控制台或使用[IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html)和[DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html)操作来更新保留期。利用 Kinesis Data Streams 控制台，您可以同时批量编辑多个数据流的保留期。您可以使用[IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html)操作或 Kinesis Data Streams 控制台将保留期延长至最长 8760 小时（365 天）。您可以使用该[DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html)操作或 Kinesis Data Streams 控制台将保留期缩短至至少 24 小时。两个操作的请求语法均包括流名称和保留期（以小时为单位）。最后，您可以通过调用 [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html) 操作来检查流的当前保留期。

以下是使用 AWS CLI更改保留期的示例：

```
aws kinesis increase-stream-retention-period --stream-name retentionPeriodDemo --retention-period-hours 72
```

在增加保留期后的数分钟内，Kinesis Data Streams 会开放对处于先前保留期内的记录的访问权限。例如，将保留期从 24 小时更改为 48 小时意味着，在 23 小时 55 分钟之前添加到流中的记录在 24 小时之后仍可用。

在缩短保留期后，Kinesis Data Streams 几乎会立即使比新保留期更早的记录不可供访问。因此，在调用 [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) 操作时务必小心谨慎。

设置数据保留期以确保在出现问题时，您的消费端可以在数据过期之前读取数据。您应该仔细考虑所有可能的情况，例如记录处理逻辑出现问题，或者下游依赖关系长时间断开。将保留期视为安全网，可留出更多时间供您的数据消费端恢复。使用保留期 API 操作，您可以主动设置此项，或者积极响应操作事件。

 对于保留期设置为 24 小时以上的流，将收取额外费用。有关更多信息，请参阅 [Amazon Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。

# 标记 Amazon Kinesis Data Streams 资源
<a name="tagging"></a>

您可以将自己的元数据以*标签*的形式分配给您在 Amazon Kinesis Data Streams 中创建的流和增强扇出型消费端。标签是您为流定义的键值对。使用标签是一种管理 AWS 资源和整理数据（包括账单数据）的简单而强大的方法。

**Topics**
+ [查看标签基本知识](#tagging-basics)
+ [使用标签跟踪成本](#tagging-billing)
+ [了解标签限制](#tagging-restrictions)
+ [使用 Kinesis Data Streams 控制台标记流](#tagging-console)
+ [使用标记直播 AWS CLI](#tagging-cli)
+ [使用 Kinesis Data Streams 标记直播 APIs](#tagging-api)
+ [使用标记消费者 AWS CLI](#tagging-consumers-cli)
+ [使用 Kinesis Data Streams 标记消费者 APIs](#tagging-consumers-api)

## 查看标签基本知识
<a name="tagging-basics"></a>

可标记的 Kinesis Data Streams 资源包括数据流和增强型扇出消费端。你可以使用 Kinesis Data Streams 控制台 AWS CLI或 Kinesis Data Streams API 来完成以下任务：
+ 使用标签创建资源
+ 向资源添加标签
+ 列出资源的标签
+ 从资源中删除标签

**注意**  
使用 Kinesis Data Streams 控制台无法将标签应用于增强型扇出消费端。要将标签应用于使用者，请使用 AWS CLI 或 Kinesis Data Streams API。

您可以使用标签对 资源进行分类。例如，您可以按用途、所有者或环境对资源进行分类。由于您定义每个标签的键和值，因此您可以创建一组自定义类别来满足您的特定需求。例如，您可以定义一组标签来帮助您按拥有者和关联应用程序跟踪资源。以下几个标签示例：
+ 项目：项目名称
+ 所有者：名称
+ 用途：负载测试 
+ 应用程序：应用程序名称
+ 环境：生产 

**重要**  
要在创建流时添加标签，必须为流添加 `kinesis:CreateStream` 和 `kinesis:AddTagsToStream` 权限。在创建流时，**不能使用** `kinesis:TagResource` 权限来标记流。
要在消费端注册期间添加标签，就必须添加 `kinesis:TagResource` 和 `kinesis:RegisterStreamConsumer` 权限。

## 使用标签跟踪成本
<a name="tagging-billing"></a>

您可以使用标签对 AWS 费用进行分类和跟踪。当您对 Kinesis Data Streams 资源应用标签时， AWS 您的成本分配报告包括按标签汇总的使用量和成本。您可以设置代表业务类别（例如成本中心、应用程序名称或所有者）的标签，以便整理多种服务的成本。有关更多信息，请参阅《AWS Billing 用户指南》**中的[对自定义账单报告使用成本分配标签](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)。

## 了解标签限制
<a name="tagging-restrictions"></a>

以下限制适用于标签：

**基本限制**
+ 每个资源的最大标签数是 50。
+ 标签键和值区分大小写。
+ 无法更改或编辑已删除的资源的标签。

**标签键限制**
+ 每个标签键必须是唯一的。如果您添加的标签具有已使用的键，则您的新标签将覆盖现有键值对。
+ 标签密钥不能以开头，`aws:`因为此前缀已保留供使用 AWS。 AWS 代表您创建以此前缀开头的标签，但您无法对其进行编辑或删除。
+ 标签键的长度必须介于 1 和 128 个 Unicode 字符之间。
+ 标签键必须包含以下字符：Unicode 字母、数字、空格和以下特殊字符：`_ . / = + - @`。

**标签值限制**
+ 标签值的长度必须介于 0 和 255 个 Unicode 字符之间。
+ 标签值可以为空。另外，它们必须包含以下字符：Unicode 字母、数字、空格和以下任意特殊字符：`_ . / = + - @`。

## 使用 Kinesis Data Streams 控制台标记流
<a name="tagging-console"></a>

您可以使用 Kinesis Data Streams 控制台在流上添加、更新、列出和移除标签。

**查看流的标签**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在左侧导航窗格中，选择**数据流**。

1. 在**数据流**页面上，选择您要标记的流。

1. 在流详细信息页面上，选择**配置**。

1. 在**标签**部分，查看应用于流的标签。

**创建带有标签的数据流**

1. 打开 Kinesis Data Streams 控制台。

1. 在左侧导航窗格中，选择**数据流**。

1. 选择**创建数据流**。

1. 在**创建数据流**页面上，输入数据流的名称。

1. 对于**数据流容量**，选择**按需**或**预置**容量模式。

   有关容量模式的更多信息，请参阅[选择正确的流式传入的模式](how-do-i-size-a-stream.md)。

1. 请在**标签**部分执行以下操作：

   1. 选择**添加新标签**。

   1. 在**键**中输入标签，然后也可以选择在**值**字段中指定值。

      如果出现错误，则您指定的标签键或值不满足标签限制。有关更多信息，请参阅 [了解标签限制](#tagging-restrictions)。

1. 选择**创建数据流**。

**在流中添加或更新标签**

1. 打开 Kinesis Data Streams 控制台。

1. 在左侧导航窗格中，选择**数据流**。

1. 在**数据流**页面上，选择要向其添加或更新标签的流。

1. 在流详细信息页面上，选择**配置**。

1. 在**标签**部分中，选择**管理标签**。

1. 在**标签**下，执行以下操作之一：
   + 要添加标签，请选择**添加新标签**，然后输入标签的**键**和**值**数据。将该步骤重复执行所需的次数。

     每个流可以添加的最大标签数量为 50 个。
   + 要更新现有标签，请在该标签的**键**的**值**字段中输入新的标签值。

   如果出现错误，则您指定的标签键或值不满足标签限制。有关更多信息，请参阅 [了解标签限制](#tagging-restrictions)。

1. 选择**保存更改**。

**从流中删除标签**

1. 打开 Kinesis Data Streams 控制台。

1. 在左侧导航窗格中，选择**数据流**。

1. 在**数据流**页面上，选择要从中移除标签的流。

1. 在流详细信息页面上，选择**配置**。

1. 在**标签**部分中，选择**管理标签**。

1. 查找要移除的标签**键**和**值**对。然后，选择**移除**。

1. 选择**保存更改**。

## 使用标记直播 AWS CLI
<a name="tagging-cli"></a>

您可以使用 AWS CLI在流上添加、列出和移除标签。有关示例，请参阅以下文档。

 [create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)   
创建带有标签的流。

 [add-tags-to-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/add-tags-to-stream.html)   
为指定的流添加或更新标签。

 [list-tags-for-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/list-tags-for-stream.html)  
列出指定流的标签。

 [remove-tags-from-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/remove-tags-from-stream.html)  
从指定的流中删除标签。

## 使用 Kinesis Data Streams 标记直播 APIs
<a name="tagging-api"></a>

你可以使用 Kinesis Dat APIs a Streams 在直播中添加、列出和删除标签。有关示例，请参阅以下文档：

 [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)   
创建带有标签的流。

 [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)   
为指定的流添加或更新标签。

 [ListTagsForStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForStream.html)  
列出指定流的标签。

 [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)  
从指定的流中删除标签。

## 使用标记消费者 AWS CLI
<a name="tagging-consumers-cli"></a>

您可以使用 AWS CLI在消费端上添加、列出和移除标签。有关示例，请参阅以下文档：

[register-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/register-stream-consumer.html)  
在带有标签的 Kinesis 数据流中注册消费端。

[tag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/tag-resource.html)  
为指定的 Kinesis 资源添加或更新标签。

[list-tags-for-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/list-tags-for-resource.html)  
列出指定 Kinesis 资源的标签。

[untag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/untag-resource.html)  
从指定的 Kinesis 资源中移除标签。

## 使用 Kinesis Data Streams 标记消费者 APIs
<a name="tagging-consumers-api"></a>

您可以使用 Kinesis Dat APIs a Streams 在使用者身上添加、列出和删除标签。有关示例，请参阅以下文档：

[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)  
在带有标签的 Kinesis 数据流中注册消费端。

[TagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_TagResource.html)  
为指定的 Kinesis 资源添加或更新标签。

[ListTagsForResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForResource.html)  
列出指定 Kinesis 资源的标签。

[UntagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UntagResource.html)  
从指定的 Kinesis 资源中移除标签。

# 处理大型记录
<a name="large-records"></a>

Amazon Kinesis Data Streams 支持最大 10 兆MiBs字节 () 的记录。建议使用此功能来处理超过 1 MiB 默认记录大小限制的间歇性数据有效载荷。现有和新建流的默认最大记录大小设置为 1 MiB。

该功能有利于物联网（IoT）应用程序、更改数据捕获（CDC）管道以及偶尔需要处理较大数据有效载荷的机器学习工作流程。要开始在流中使用大型记录，请更新流的最大记录大小限制。

**重要**  
写入的单个分片吞吐量限制 MB/s 为 1，读取 2 MB/s 的吞吐量限制保持不变，同时支持更大的记录大小。Kinesis Data Streams 旨在容纳间歇性的大型记录，以及小于或等于 1 MiB 的记录基准流量。但并非为适应持续的大容量大型记录摄取而设计。

## 更新您的流以使用大型记录
<a name="update-stream"></a>

**使用 Kinesis Data Streams 处理较大记录**

1. 导航到 Kinesis Data Streams 控制台。

1. 选择您的流，然后前往**配置**选项卡。

1. 单击**最大记录大小**旁边的**编辑**。

1. 设置最大记录大小（最大值为 10 MiB）。

1. 保存更改。

此设置仅调整该 Kinesis 数据流的最大记录大小。在添加该限制之前，请验证所有下游应用程序是否能处理较大的记录。

您也可以使用 AWS CLI 更新此设置：

```
aws kinesis update-max-record-size \ --stream-arn  \
        --max-record-size-in-ki-b 5000
```

## 使用大型记录优化流性能
<a name="optimizing-performance"></a>

建议将大型记录保持在总流量的 2% 以下。流中每个分片的吞吐能力为每秒 1 MiB。为了容纳大型记录，Kinesis 数据流最多可爆发 10 个 MiBs，而平均每秒 1 MiB。这种支持大型记录的能力会持续注入流中。注入速度取决于大型记录的大小和基准记录的大小。为获得最佳效果，请使用均匀分布的分区键。有关 Kinesis 如何按需扩展的更多信息，请参阅[按需模式功能和用例](how-do-i-size-a-stream.html#ondemandmode)。

## 使用大型记录缓解限制
<a name="mitigate-throttling"></a>

**缓解限制**

1. 在创建者应用程序中实现带有指数级退避的重试逻辑。

1. 使用随机的分区键在可用分片之间分配大型记录。

1. 对于连续的大型记录流，将有效载荷存储在 Amazon S3 中，仅向流发送元数据引用。有关更多信息，请参阅[使用 Amazon Kinesis Data Streams 处理大型记录](https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/)。

## 使用 Kinesis Data Streams 处理大型记录 APIs
<a name="records-apis"></a>

大型记录支持引入了一个新的 API，并更新了两个现有的控制平面 APIs 以处理最多 10 个记录 MiBs。

用于修改记录大小的 API：
+ `UpdateMaxRecordSize`：为现有直播配置最大记录大小限制，最多 10 MiBs。

对现有版本的更新 APIs：
+ `CreateStream`：添加可选 `MaxRecordSizeInKiB` 参数，用于在创建流期间设置记录大小限制。
+ `DescribeStreamSummary`：返回 `MaxRecordSizeInKiB` 字段显示当前的流配置。

所有 APIs 列出的内容都保持了对现有直播的向后兼容性。有关完整的 API 文档，请参阅 [Amazon Kinesis Data Streams 服务 API 参考](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html)。

## AWS 与大型唱片兼容的组件
<a name="record-compatability"></a>

以下 AWS 组件与大型唱片兼容：


| 组件 | 说明 | 
| --- | --- | 
|  AWS SDK | AWS SDK 支持处理大型记录。您可以使用中的可用方法更新直播的最大记录大小（最多 10 MiB）。 AWS SDKs有关更多信息，请参阅将[此服务与 AWS SDK 配合使用](https://docs.aws.amazon.com/streams/latest/dev/sdk-general-information-section.html)。 | 
|  Kinesis Consuer Library（KCL） | KCL 从 2.x 版开始支持处理大型记录。要使用大型录制支持，请更新流的 `maxRecordSize` 并使用 KCL。有关更多信息，请参阅[使用 Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html) 。 | 
|  Kinesis Producer Library（KPL） | KPL 从 1.0.5 版开始支持处理大型记录。要使用大型录制支持，请更新流的 maxRecordSize 并使用 KPL。有关更多信息，请参阅[使用 Amazon Kinesis Producer Library（KPL）开发产生器](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)。 | 
|  Amazon EMR | 带有 Apache Spark 的 Amazon EMR 支持处理不超过 Kinesis Data Streams 限制 (10) 的大型记录。 MiBs要使用大型记录支持，请使用 `readStream` 函数。有关更多信息，请参阅[ Amazon EMR 与 Amazon Kinesis 集成](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-kinesis.html)。 | 
|  Amazon Data Firehose | 与 Kinesis Data Streams 一起使用时，Amazon Data Firehose 处理大型记录的行为取决于传输目的地： [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/large-records.html) 对于需要向 Snowflake 或 Redshift 传输大型记录的应用程序，请先将数据传输至 Amazon S3。然后使用提取、转换、加载（ETL）过程来加载数据。对于所有其他目的地，在扩展到生产使用量之前，请在 proof-of-concept环境中使用大型记录测试行为。大型记录的处理因目的地而不同。  | 
|  AWS Lambda | AWS Lambda 支持最多 6 MiBs 个有效载荷。此限制适用于转换为 base-64 编码的 Kinesis 有效载荷，以及与事件源映射（ESM）有关的元数据。对于小于 6 的记录 MiBs，Lambda 使用 ESM 处理这些记录，无需额外配置。对于大于 6 的记录 MiBs，Lambda 使用失败时目标对其进行处理。必须使用 ESM 配置失败时的目标，才能处理超出 Lambda 处理限制的记录。发送到失败时的目标的各个事件都是一个 JSON 文档，其中包含有关调用失败的元数据。 无论记录大小如何，都建议在 ESM 中创建一个失败时的目标。这可以确保不会丢弃任何记录。有关更多信息，请参阅[配置调用失败时的目标](https://docs.aws.amazon.com/lambda/latest/dg/kinesis-on-failure-destination.html#kinesis-on-failure-destination-console)。 | 
|  Amazon Redshift | 在从 Kinesis Data Streams 流式传输数据时，Amazon Redshift 仅支持小于 1 MiB 的记录大小。超过此限制的记录将不予处理。未处理的记录记录为 `sys_stream_scan_errors`。有关更多信息，请参阅 [SYS\$1STREAM\$1SCAN\$1ERRORS](https://docs.aws.amazon.com/redshift/latest/dg/r_SYS_STREAM_SCAN_ERRORS.html)。 | 
|  Kinesis Data Streams 的 Flink 连接器 | 使用来自 Kinesis Data Streams 的数据有两种方法：Kinesis 来源连接器以及 Kinesis 接收器连接器。源连接器支持处理小于 1 MiB 和最多 10 MiB 的记录。 MiBs对于大于 1 MiB 的记录，请勿使用接收器连接器。有关更多信息，请参阅[使用 API 使用连接器在适用于 Apache Flink 的亚马逊托管服务中 DataStream移动数据](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)。 | 

## 支持大型记录的区域
<a name="supported-regions"></a>

此 Amazon Kinesis Data Streams 功能仅在 AWS 以下区域可用：


| AWS 区域 | 区域名称 | 
| --- | --- | 
|  eu-north-1 | 欧洲地区（斯德哥尔摩） | 
|  me-south-1 | 中东（巴林） | 
|  ap-south-1 | 亚太地区（孟买） | 
|  eu-west-3 | 欧洲地区（巴黎） | 
|  ap-southeast-3 | 亚太地区（雅加达） | 
|  us-east-2 | 美国东部（俄亥俄州） | 
|  af-south-1 | 非洲（开普敦） | 
|  eu-west-1 | 欧洲地区（爱尔兰） | 
|  me-central-1 | 中东（阿联酋）： | 
|  eu-central-1 | 欧洲地区（法兰克福） | 
|  sa-east-1 | 南美洲（圣保罗） | 
|  ap-east-1 | 亚太地区（香港） | 
|  ap-south-2 | 亚太地区（海得拉巴） | 
|  us-east-1 | 美国东部（弗吉尼亚州北部） | 
|  ap-northeast-2 | 亚太地区（首尔） | 
|  ap-northeast-3 | 亚太地区（大阪） | 
|  eu-west-2 | 欧洲地区（伦敦） | 
|  ap-southeast-4 | 亚太地区（墨尔本） | 
|  ap-northeast-1 | 亚太地区（东京） | 
|  us-west-2 | 美国西部（俄勒冈州） | 
|  us-west-1 | 美国西部（北加利福尼亚） | 
|  ap-southeast-1 | 亚太地区（新加坡） | 
|  ap-southeast-2 | 亚太地区（悉尼） | 
|  il-central-1 | 以色列（特拉维夫） | 
|  ca-central-1 | 加拿大（中部） | 
|  ca-west-1 | 加拿大西部（卡尔加里） | 
|  eu-south-2 | 欧洲（西班牙） | 
|  cn-northwest-1 | 中国（宁夏） | 
|  eu-central-2 | 欧洲（苏黎世） | 
| us-gov-east-1 | AWS GovCloud （美国东部） | 
| us-gov-west-1 | AWS GovCloud （美国西部） | 

# 使用执行弹性测试 AWS Fault Injection Service
<a name="kinesis-fis"></a>

AWS Fault Injection Service 是一项完全托管的服务，可帮助您对 AWS 工作负载执行故障注入实验。 AWS FIS 与 Amazon Kinesis Data Streams 的集成使您能够在受控环境中针对常见的 Amazon Kinesis Data Streams API 错误测试应用程序弹性。此功能支持在遇到故障之前验证错误处理、重试逻辑和监控系统。有关更多信息，请参阅[什么是 AWS Fault Injection Service？](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html) 。

**操作**
+ API 内部错误：在目标 IAM 角色发出的请求中注入内部错误。具体响应取决于每项服务和 API。`aws:fis:inject-api-internal-error` 操作产生 `InternalFailure` 错误（HTTP 500）。
+ API 限制错误：在目标 IAM 角色发出的请求中注入内部错误。具体响应取决于每项服务和 API。`aws:fis:inject-api-throttle-error` 操作产生 `ThrottlingException` 错误（HTTP 400）。
+ API 不可用错误：在目标 IAM 角色发出的请求中注入内部错误。具体响应取决于每项服务和 API。`aws:fis:inject-api-unavailable-error` 操作产生 `ServiceUnavailable` 错误（HTTP 503）。
+ API 预置吞吐量异常：在目标 IAM 角色发出的请求中注入内部错误。具体响应取决于每项服务和 API。`aws:kinesis:inject-api-provisioned-throughput-exception` 操作产生 `ProvisionedThroughputExceededException` 错误（HTTP 400）。
+ API 迭代器到期异常：在目标 IAM 角色发出的请求中注入内部错误。具体响应取决于每项服务和 API。`aws:kinesis:inject-api-expired-iterator-exception` 操作产生 `ExpiredIteratorException` 错误（HTTP 400）。

有关更多信息，请参阅 [Amazon Kinesis Data Streams 操作](https://docs.aws.amazon.com/fis/latest/userguide/fis-actions-reference.html#aws-kinesis-actions)。

**注意事项**
+ 可以在 Amazon Kinesis Data Streams 的预置和按需产品中使用上述操作。
+ 根据所选时间完成实验后，流式传输随之恢复。您也可以在实验完成之前停止运行中的实验。或者，您可以根据在 Amazon A CloudWatch pplication Insights 中定义应用程序运行状况的警报来定义停止实验的停止条件。
+ 最多可以测试 280 个流。

有关区域支持的更多信息，请参阅 [AWS Fault Injection Service 端点和配额](https://docs.aws.amazon.com/general/latest/gr/fis.html)。

# 预置吞吐量错误
<a name="kinesis-fis-provisioned-throughput"></a>

当某个 Kinesis 流的请求速率超出一个或多个分片的吞吐量限制时，就会出现超出预置吞吐量异常错误（HTTP 400）。每个分片都有特定的读取和写入容量限制，超出这些限制就会触发这种异常。导致此种异常的场景包括：数据摄取或消耗量突然飙升，分片容量不足以应对正在处理的数据量，或者分区键分布不均。

**处理异常的建议**
+ 实现指数退避与重试机制。
+ 增加分片数量以适应更高的吞吐量。
+ 确保分区键正常分布。
+ 监控流指标。

此外，使用 Kinesis 按需容量模式有助于自动调整工作负载，并最大限度地减少这种异常的发生。有关更多信息，请参阅[什么是 AWS Fault Injection Service？](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)

**注意**  
分布不当问题不在按需模式的自动扩缩功能范围之内。

**进行基础实验**

1. 使用基准指标：测试之前记录正常的吞吐量模式。

1. 创建实验：使用 `aws:kinesis:inject-api-provisioned-throughput-exception` 操作。

1. 配置强度：从 25% 的请求限制开始。

1. 监控响应：通过指数退避验证重试逻辑。

1. 验证扩缩：确认自动扩缩放触发了激活。

1. 检查警报：确保 `CloudWatch` 警报按预期运行。

应用程序应实施适当的退避策略、监控 `WriteProvisionedThroughputExceeded` 和 `ReadProvisionedThroughputExceeded` 指标，并在适当时触发分片扩缩。

**操作详细信息**
+ **资源类型**：IAM 角色 ARN
+ **目标操作**：`PutRecord`、`PutRecords`、`GetRecords`
+ ****错误代码****：`ProvisionedThroughputExceededException`（HTTP 400）
+ ****描述****：模拟请求速率超出分片容量限制的场景，测试应用程序限制和扩缩响应。

**参数**
+ **IAM 角色 ARN**：应用程序用于 Kinesis Data Streams 操作的角色。
+ **操作**：目标操作：`PutRecord`、`PutRecords`、`GetRecords`。
+ **资源列表**：特定的流名称或分片标识符。
+ **持续时间**：实验的持续时间，从一分钟到 12 小时不等。在 AWS FIS API 中，该值是 ISO 8601 格式的字符串。例如， PT1M 代表一分钟。在 AWS FIS 控制台中，您可以输入秒数、分钟数或小时数。
+ **强度**：要施加限制的请求的百分比。

**所需的权限**
+ `kinesis:InjectApiError`

实验模板示例

 以下示例显示了带有指定标签的所有请求（最多 5 个 Kinesis 数据流）的预配置吞吐量异常。 AWS FIS 随机选择要影响的直播。5 分钟后故障解决。

```
{
    "description": "Kinesis stream experiment",
    "targets": {
        "KinesisStreams-Target-1": {
            "resourceType": "aws:kinesis:stream",
            "resourceTags": {
                   "tag-key": "tag-value"
            },
            "selectionMode": "COUNT(5)"
        }
    },
    "actions": {
         "kinesis": {
              "actionId": "aws:kinesis:stream-provisioned-throughput-exception",
              "description": "my-stream",
              "parameters": {
                   "duration": "PT5M",
                   "percentage": "100",
                   "service": "kinesis"
              },
              "targets": {
                    "KinesisStreams": "KinesisStreams-Target-1"
              }
         }
   },
   "stopConditions": [
         {
              "source": "none"
         }
   ],
   "roleArn": "arn:aws:iam::111122223333:role/role-name",
   "tags": {},
   "experimentOptions": {
       "accountTargeting": "single-account",
       "emptyTargetResolutionMode": "fail"
   }    
}
```

实验角色权限示例

以下权限支持在特定流上执行 `aws:kinesis:stream-provisioned-throughput-exception` 和 `aws:kinesis:stream-expired-iterator-exception` 操作来影响 50% 的请求。

# 迭代器过期异常错误
<a name="kinesis-fis-expired-iterator"></a>

 迭代器过期异常错误（HTTP 400）是在分片迭代器过期时发生的，在调用 `GetRecords` 时不再用于检索流记录。读取操作之间因为长时间运行数据处理任务、网络问题或应用程序停机而产生延迟时，就会发生这种情况。

**注意**  
分片迭代器在发出后 5 分钟有效。

**处理异常的建议**
+ 在分片迭代器到期之前进行刷新。
+ 整合错误处理以获取新的迭代器。
+ 利用 Kinesis Client Library（KCL），该服务可自动管理分片迭代器的到期时间。

有关更多信息，请参阅[什么是 AWS Fault Injection Service？](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)

**进行基础实验**

1. 创建实验模板：使用 AWS FIS 控制台。

1. 选择操作：使用 `aws:kinesis:inject-api-expired-iterator-exception` 操作。

1. 配置目标：指定 IAM 角色及 Kinesis Data Streams 操作。

1. 设置持续时间：最初测试从 5-10 分钟开始。

1. 添加停止条件：的[停止条件 AWS FIS](https://docs.aws.amazon.com/fis/latest/userguide/stop-conditions.html)。

1. 运行实验：监控应用程序的行为。

**操作详细信息**
+ **资源类型**：IAM 角色 ARN
+ **目标操作**：`GetRecords`
+ ****错误代码****：`ExpiredIteratorException`（HTTP 400）
+ ****描述****：提供的迭代器超出了允许的最大期限，模拟了记录处理速度太慢或检查点操作逻辑失败的场景。

**参数**
+ **IAM 角色 ARN**：应用程序用于 Kinesis Data Streams 操作的角色。
+ **操作**：目标操作：`GetRecords`
+ **资源列表**：特定的直播名称或 ARNs。
+ **持续时间**：实验的持续时间。此项可配置。
+ **强度**：要施加限制的请求的百分比。

**所需的权限**
+ `kinesis:InjectApiError`