

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Node.js 中开发 Kinesis Client Library 消费端
<a name="kinesis-record-processor-implementation-app-nodejs"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

可以使用 Kinesis Client Library（KCL）构建处理 Kinesis 数据流中数据的应用程序。Kinesis Client Library 提供多种语言版本。本主题将讨论 Node.js。

KCL 是一个 Java 库；对 Java 以外其他语言的支持是使用名为的多语言接口提供的。*MultiLangDaemon*此进程守护程序基于 Java，当您使用 Java 以外的 KCL 语言时，该程序会在后台运行。因此，如果你安装适用于 Node.js 的 KCL 并完全用 Node.js 编写消费者应用程序，那么你仍然需要在系统上安装 Java，因为. MultiLangDaemon 此外 MultiLangDaemon ，您可能需要根据自己的用例自定义一些默认设置，例如它所连接的 AWS 区域。有关 MultiLangDaemon on 的更多信息 GitHub，请访问 [KCL MultiLangDaemon 项目](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)页面。

要从中下载 Node.js KCL GitHub，请前往 K [inesis 客户端库 (Node.js)](https://github.com/awslabs/amazon-kinesis-client-nodejs)。

**示例代码下载**

Node.js 中有两个代码示例可用于 KCL：
+ [basic-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/basic_sample)

  在下列节中用于阐释在 Node.js 中构建 KCL 消费端应用程序的基础知识。
+ [click-stream-sample](https://github.com/awslabs/amazon-kinesis-client-nodejs/tree/master/samples/click_stream_sample)

   稍微复杂一些，使用了现实世界的情景，适合在您熟悉基本示例代码之后采用。此示例在这里不做讨论，但它有一个包含更多信息的自述文件。

在 Node.js 中实现 KCL 消费端应用程序时，您必须完成下列任务：

**Topics**
+ [实现记录处理器](#kinesis-record-processor-implementation-interface-nodejs)
+ [修改配置属性](#kinesis-record-processor-initialization-nodejs)

## 实现记录处理器
<a name="kinesis-record-processor-implementation-interface-nodejs"></a>

使用适用于 Node.js 的 KCL 的最简易的潜在消费端必须实现 `recordProcessor` 函数，该函数反之包含函数 `initialize`、`processRecords` 和 `shutdown`。该示例提供了可用作起点的实现（请参阅 `sample_kcl_app.js`）。

```
function recordProcessor() {
  // return an object that implements initialize, processRecords and shutdown functions.}
```

**初始化**  
KCL 在记录处理器启动时调用 `initialize` 函数。此记录处理器只处理作为 `initializeInput.shardId` 传递的分片 ID，并且通常情况下反过来说也成立（此分片只能由此记录处理器处理）。但是，您的消费端应该考虑数据记录可能会经过多次处理的情况。这是因为 Kinesis Data Streams 具有*至少一次*语义，即分片中的每个数据记录在您的消费端中由工作程序至少处理一次。有关特定分片可能由多个工作线程处理的情况的更多信息，请参阅[使用重新分片、扩展和并行处理更改分片数量](kinesis-record-processor-scaling.md)。

```
initialize: function(initializeInput, completeCallback)
```

**processRecords**  
 KCL 使用包含一个数据记录的列表（这些记录来自在 `initialize` 函数中指定的分片）的输入来调用此函数。您实现的记录处理器根据您的消费端的语义处理这些记录中的数据。例如，工作程序可能对数据执行转换，然后将结果存储在 Amazon Simple Storage Service（Amazon S3）存储桶中。

```
processRecords: function(processRecordsInput, completeCallback)
```

除了数据本身之外，记录还包含工作程序在处理数据时可使用的序号和分区键。例如，工作线程可选择 S3 存储桶，并在其中根据分区键的值存储数据。`record` 词典公开了以下键-值对来访问记录的数据、序号和分区键：

```
record.data
record.sequenceNumber
record.partitionKey
```

请注意，数据是 Base64 编码的。

在该基本示例中，函数 `processRecords` 具有显示工作程序如何访问记录的数据、序号和分区键的代码。

Kinesis Data Streams 需要记录处理器来跟踪已在分片中处理的记录。KCL 利用作为 `processRecordsInput.checkpointer` 传递的 `checkpointer` 对象执行此跟踪。您的记录处理器将调用 `checkpointer.checkpoint` 函数以向 KCL 告知记录处理器处理分片中的记录的进度。如果工作程序失败，KCL 将在您重新启动分片的处理时使用此信息，以便在上一个已知的已处理记录处继续处理。

对于拆分或合并操作，在原始分片的处理器调用 `checkpoint` 以指示原始分片上的所有处理操作都已完成之前，KCL 不会开始处理新分片。

如果您未将序列号传递到 `checkpoint` 函数，KCL 将假定对 `checkpoint` 的调用表示所有记录都已处理，一直处理到传递到记录处理器的最后一个记录。因此，记录处理器**只**应在已处理传递到它的列表中的所有记录后才调用 `checkpoint`。记录处理器不需要在每次调用 `checkpoint` 时调用 `processRecords`。例如，处理器可以调`checkpoint`用每三次呼叫或记录处理器外部的某个事件（例如您实现的自定义 verification/validation 服务）。

您可以选择性地将某个记录的确切序号指定为 `checkpoint` 的参数。在本例中，KCL 将假定所有记录都已处理，直至处理到该记录。

基本示例应用程序显示了对 `checkpointer.checkpoint` 函数最简单的调用。您此时可以在该函数中为您的消费端添加您需要的其他检查点逻辑。

**shutdown**  
KCL 在处理结束（`shutdownInput.reason` 为 `TERMINATE`）或工作程序不再响应（`shutdownInput.reason` 为 `ZOMBIE`）时调用 `shutdown` 函数。

```
shutdown: function(shutdownInput, completeCallback)
```

处理操作在记录处理器不再从分片中接收任何记录时结束，因为分片已被拆分或合并，或者流已删除。

KCL 还会将 `shutdownInput.checkpointer` 对象传递到 `shutdown`。如果关闭原因为 `TERMINATE`，则应确保记录处理器已完成处理任何数据记录，然后对此接口调用 `checkpoint` 函数。

## 修改配置属性
<a name="kinesis-record-processor-initialization-nodejs"></a>

该示例提供了配置属性的默认值。您可使用自己的值覆盖任何这些属性（请参阅基本示例中的 `sample.properties`）。

### 应用程序名称
<a name="kinesis-record-processor-application-name-nodejs"></a>

KCL 需要一个应用程序，该应用程序在您的各个应用程序中以及同一区域的各个 Amazon DynamoDB 表中处于唯一状态。KCL 通过以下方法使用应用程序名称配置值：
+ 假定所有与此应用程序名称关联的工作程序在同一数据流上合作。这些工作程序可被分配到多个实例上。如果运行同一应用程序代码的其他实例，但使用不同的应用程序名称，则 KCL 会将第二个实例视为在同一数据流运行的完全独立的应用程序。
+ KCL 利用应用程序名称创建 DynamoDB 表并使用该表保留应用程序的状态信息（如检查点和工作程序-分片映射）。每个应用程序都有自己的 DynamoDB 表。有关更多信息，请参阅 [使用租约表跟踪 KCL 消费端应用程序处理的分片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 设置凭证
<a name="kinesis-record-processor-credentials-nodejs"></a>

您必须将您的 AWS 证书提供给默认凭证提供者链中的一个凭证提供商。可以使用 `AWSCredentialsProvider` 属性设置凭证提供程序。`sample.properties` 文件必须向[默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的凭证提供程序之一提供您的凭证。如果您在 Amazon EC2 实例上运行使用器，我们建议您使用 IAM 角色配置该实例。 AWS 反映与此 IAM 角色关联的权限的证书可通过实例元数据提供给实例上的应用程序。使用这种方式管理在 EC2 实例上运行的消费端应用程序的凭证最安全。

以下示例配置 KCL 以使用 `sample_kcl_app.js` 中提供的记录处理器，处理名为 `kclnodejssample` 的 Kinesis 数据流。

```
# The Node.js executable script
executableName = node sample_kcl_app.js
# The name of an Amazon Kinesis stream to process
streamName = kclnodejssample
# Unique KCL application name
applicationName = kclnodejssample
# Use default AWS credentials provider chain
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Read from the beginning of the stream
initialPositionInStream = TRIM_HORIZON
```