

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 教程：使用 KPL 和 KCL 1.x 处理实时股票数据
<a name="tutorial-stock-data-kplkcl"></a>

本教程的场景涉及将股票交易引入数据流中并编写对流执行计算的简单 Amazon Kinesis Data Streams 应用程序。您将了解如何将记录流发送到 Kinesis Data Streams 并实现近乎实时地使用和处理记录的应用程序。

**重要**  
创建直播后，您的账户会因使用 Kinesis Data Streams 而产生象征性的费用，因为 Kinesis Data Streams 不符合 AWS 免费套餐的资格。在消费端应用程序启动后，也会象征性收取 Amazon DynamoDB 使用费用。消费端应用程序使用 DynamoDB 跟踪处理状态。在使用完此应用程序后，请删除 AWS 资源以停止产生费用。有关更多信息，请参阅 [清理 资源](tutorial-stock-data-kplkcl-finish.md)。

代码不访问实际股票市场数据，而是模拟股票交易流。它通过使用随机股票交易生成器（将截至 2015 年 2 月市值排名前 25 位的股票的实际市场数据作为起始点）来执行此操作。如果您有权访问实时的股票交易流，则可能有兴趣从该流派生有用且及时的统计数据。例如，您可能希望执行滑动窗口分析，从而确定前 5 分钟内购买的最热门股票。或者，您可能希望在销售订单过大（即具有过多股份）时收到通知。可以扩展此系列代码以提供此类功能。

您可以在台式计算机或笔记本电脑上演练本教程中的步骤，然后在同一台计算机或支持已定义要求的任何平台 [如 Amazon Elastic Compute Cloud（Amazon EC2）] 上同时运行创建器和消费端代码。

显示的示例使用的是美国西部（俄勒冈州）区域，但它们适用于[支持 Kinesis Data Streams 的任何AWS 区域](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)。

**Topics**
+ [满足先决条件](tutorial-stock-data-kplkcl-begin.md)
+ [创建数据流](tutorial-stock-data-kplkcl-create-stream.md)
+ [创建 IAM 策略和用户](tutorial-stock-data-kplkcl-iam.md)
+ [下载和构建实现代码](tutorial-stock-data-kplkcl-download.md)
+ [实现产生器](tutorial-stock-data-kplkcl-producer.md)
+ [实现消费端](tutorial-stock-data-kplkcl-consumer.md)
+ [（可选）扩展消费端](tutorial-stock-data-kplkcl-consumer-extension.md)
+ [清理 资源](tutorial-stock-data-kplkcl-finish.md)

# 满足先决条件
<a name="tutorial-stock-data-kplkcl-begin"></a>

以下是完成 [教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 的要求。

## 创建和使用亚马逊云科技账户
<a name="tutorial-stock-data-kplkcl-begin-aws"></a>

在开始之前，请确保熟悉[Amazon Kinesis Data Streams 术语和概念](key-concepts.md)中讨论的概念，特别是流、分片、创建者和消费端。参阅[教程：为 Kinesis Data AWS CLI Streams 安装和配置](kinesis-tutorial-cli-installation.md)也很有帮助。

您需要一个 AWS 帐户和一个网络浏览器才能访问 AWS 管理控制台.

要访问控制台，请使用您的 IAM 用户名和密码从 IAM 登录页面登录 [AWS 管理控制台](https://console.aws.amazon.com/console/home)。有关 AWS 安全证书的信息，包括编程访问权限和长期证书的替代方案，请参阅 *IAM 用户指南*中的[AWS 安全证书](https://docs.aws.amazon.com/IAM/latest/UserGuide/security-creds.html)。有关登录您的的详细信息 AWS 账户，请参阅*《AWS 登录 用户指南》 AWS*[中的如何登录](https://docs.aws.amazon.com/signin/latest/userguide/how-to-sign-in.html)。

有关 IAM 和安全密钥设置说明的更多信息，请参阅[创建 IAM 用户](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/get-set-up-for-amazon-ec2.html#create-an-iam-user)。

## 满足系统软件要求
<a name="tutorial-stock-data-kplkcl-begin-sys"></a>

用于运行应用程序的系统必须已安装 Java 7 或更高版本。要下载和安装最新 Java 开发工具包 (JDK)，请转到 [Oracle 的 Java SE 安装站点](http://www.oracle.com/technetwork/java/javase/downloads/index.html)。

如果您具有 Java IDE（如 [Eclipse](https://www.eclipse.org/downloads/)），则可打开源代码，然后编辑、构建并运行它。

您需要最新的 [适用于 Java 的 AWS SDK](https://aws.amazon.com/sdk-for-java/) 版本。如果您将 Eclipse 用作 IDE，则可改为安装 [AWS Toolkit for Eclipse](https://aws.amazon.com/eclipse/)。

[消费者应用程序需要 Kinesis 客户端库 (KCL) 1.2.1 或更高版本，您可以从 K GitHub inesis 客户端库 (Java) 中获取该版本。](https://github.com/awslabs/amazon-kinesis-client)

## 后续步骤
<a name="tutorial-stock-data-kplkcl-begin-next"></a>

[创建数据流](tutorial-stock-data-kplkcl-create-stream.md)

# 创建数据流
<a name="tutorial-stock-data-kplkcl-create-stream"></a>

在 [教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 的第一步中，创建后续步骤中将用到的流。

**创建流**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 在导航栏中，展开区域选择器并选择一个区域。

1. 选择 **Create Kinesis stream (创建 Kinesis 流)**。

1. 输入流的名称（例如，**StockTradeStream**）。

1. 在分片数量中输入 **1**，但保留**估计您需要的分片数量**为折叠状态。

1. 选择 **Create Kinesis stream (创建 Kinesis 流)**。

在 **Kinesis 流**列表页面上，流状态在创建流的过程中为 `CREATING`。当流可以使用时，状态会更改为 `ACTIVE`。选择流的名称。在显示的页面中，**Details (详细信息)** 选项卡会显示您的流配置摘要。**Monitoring (监控)** 部分显示流的监控信息。

## 有关分片的其他信息
<a name="tutorial-stock-data-kplkcl-create-stream-info"></a>

在本教程之外开始使用 Kinesis Data Streams 时，可能需要更仔细地计划流创建过程。在配置分片时，您应规划预计最大需求。以此方案为例，美国股票市场某一天（东部时间）的交易流量峰值以及需求估计值应该从这一天的时间中采样。随后，您可以选择配置最大预计需求，或扩大或缩小流以响应需求波动。

*分片* 是吞吐容量的单位。在**创建 Kinesis 流**页面中，展开**估计您需要的分片数量**。根据以下准则输入平均记录大小、每秒写入的最大记录数以及使用应用程序数量：

**平均记录大小**  
您的记录的计算平均大小的估计值。如果您不知道此值，请使用估计的最大记录大小作为此值。

**最大写入记录数**  
考虑提供数据的实体的数量以及每个实体每秒生成的记录的大约数量。例如，如果要从 20 台交易服务器获取股票交易数据，并且每台服务器每秒生成 250 次交易，则每秒的交易（记录）总数为 5000。

**使用的应用程序数**  
应用程序的数量，这些应用程序单独从流进行读取以采用不同的方式处理流并生成不同的输出。每个应用程序可具有在不同计算机上运行（即在群集中运行）的多个实例，以便能跟进大容量流。

如果显示的估计分片数量超过当前分片数量限制，则可能需要先提交提高限制的请求，然后才能创建具有此分片数量的流。要请求增大分片限制，请使用 [Kinesis Data Streams 限制表单](https://console.aws.amazon.com/support/home#/case/create?issueType=service-limit-increase&limitType=service-code-kinesis)。有关流和分片的更多信息，请参阅 [创建和管理 Kinesis 数据流](working-with-streams.md)。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-create-stream-next"></a>

[创建 IAM 策略和用户](tutorial-stock-data-kplkcl-iam.md)

# 创建 IAM 策略和用户
<a name="tutorial-stock-data-kplkcl-iam"></a>

的安全最佳实践 AWS 要求使用细粒度的权限来控制对不同资源的访问权限。 AWS Identity and Access Management (IAM) 允许您在中管理用户和用户权限 AWS。[IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/PoliciesOverview.html) 明确列出了允许的操作以及这些操作适用于的资源。

下面是 Kinesis Data Streams 创建器和消费端通常需要的最低权限。


**Producer**  

| 操作 | 资源 | 用途 | 
| --- | --- | --- | 
| DescribeStream, DescribeStreamSummary, DescribeStreamConsumer | Kinesis 数据流 | 在尝试写入记录之前，创建者会检查流是否存在且处于活动状态、分片是否包含在流中，以及流是否具有消费端。 | 
| SubscribeToShard, RegisterStreamConsumer | Kinesis 数据流 | 订阅 Kinesis 数据流分片并注册一个消费端。 | 
| PutRecord, PutRecords | Kinesis 数据流 | 将记录写入 Kinesis Data Streams。 | 


**消费端**  

| **操作** | **资源** | **目的** | 
| --- | --- | --- | 
| DescribeStream | Kinesis 数据流 | 在尝试读取记录前，消费端需检查流是否存在并处于活动状态，以及分片是否包含在流中。 | 
| GetRecords, GetShardIterator  | Kinesis 数据流 | 从 Kinesis Data Streams 分片读取记录。 | 
| CreateTable, DescribeTable, GetItem, PutItem, Scan, UpdateItem | Amazon DynamoDB 表 | 如果消费端是使用 Kinesis Client Library（KCL）开发的，则需要 DynamoDB 表的权限才能跟踪应用程序的处理状态。第一个消费端已开始创建表。 | 
| DeleteItem | Amazon DynamoDB 表 | 当使用者对 Kinesis Data Streams 分片执行 split/merge 操作时。 | 
| PutMetricData | 亚马逊 CloudWatch 日志 | KCL 还会将指标上传到 CloudWatch，这对于监控应用程序非常有用。 | 

对于此应用程序，请创建授予上述所有权限的单个 IAM policy。实际上，您可能需要考虑创建两个策略，一个策略适用于创建器，另一个策略适用于消费端。

**创建 IAM policy**

1. 找到新流的 Amazon 资源名称 (ARN)。您可以在**详细信息**选项卡顶部找到作为**流 ARN** 列出的此 ARN。ARN 格式如下所示：

   ```
   arn:aws:kinesis:region:account:stream/name
   ```  
*region*  
区域代码，例如 `us-west-2`。有关更多信息，请参阅[区域和可用区域概念](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-regions-availability-zones)。  
*账户*  
 AWS 账户 ID，如[账户设置](https://console.aws.amazon.com/billing/home?#/account)中所示。  
*name*  
[创建数据流](tutorial-stock-data-kplkcl-create-stream.md) 中的流名称，即 `StockTradeStream`。

1. 确定要由消费端使用（并由第一个消费端实例创建）的 DynamoDB 表的 ARN。它必须采用以下格式：

   ```
   arn:aws:dynamodb:region:account:table/name
   ```

   区域和账户来自上一步骤中的相同位置，但这一次，*名称* 为消费端应用程序创建和使用的表的名称。消费端所使用的 KCL 将应用程序名称用作表名称。使用 `StockTradesProcessor`，它是稍后使用的应用程序名称。

1. 在 IAM 控制台的**策略** (h [https://console.aws.amazon.com/iam/ome \$1policies](https://console.aws.amazon.com/iam/home#policies)) 中，选择**创建策略**。如果这是您首次使用 IAM policy，请依次选择**开始使用**、**创建策略**。

1. 在 **Policy Generator** 旁，选择 **Select**。

1. 选择 **Amazon Kinesis** 作为服务。 AWS 

1. 选择 `DescribeStream`、`GetShardIterator`、`GetRecords`、`PutRecord` 和 `PutRecords` 作为允许的操作。

1. 输入您在步骤 1 中创建的 ARN。

1. 对以下各项使用 **Add Statement (添加语句)**：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/streams/latest/dev/tutorial-stock-data-kplkcl-iam.html)

   在不需要指定 ARN 时使用的星号 (`*`)。在这种情况下，这是因为没有用于调用`PutMetricData`操作 CloudWatch 的特定资源。

1. 选择**下一步**。

1. 将 **Policy Name (策略名称)** 更改为 `StockTradeStreamPolicy`，审阅代码，然后选择 **Create Policy (创建策略)**。

生成的策略文档应类似于以下内容：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-west-2:111122223333:stream/StockTradeStream/*"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-west-2:111122223333:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**若要创建 IAM 用户**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 在 **Users (用户)** 页面上，选择 **Add user (添加用户)**。

1. 对于 **User name**，键入 `StockTradeStreamUser`。

1. 对于 **Access type (访问类型)**，选择 **Programmatic access (编程访问)**，然后选择 **Next: Permissions (下一步: 权限)**。

1. 选择**直接附上现有策略**。

1. 按名称搜索您创建的策略。选中策略名称左侧的框，然后选择 **Next: Review (下一步: 审核)**。

1. 查看详细信息和摘要，然后选择 **Create user (创建用户)**。

1. 复制 **Access key ID (访问密钥 ID)**，并将其私下保存。在 **Secret access key (私有访问密钥)** 下面选择 **Show (显示)**，然后也将该密钥私下保存。

1. 将访问密钥和私有密钥粘贴到一个只有您可以访问的位于安全位置的本地文件中。对于此应用程序，请创建名为 ` ~/.aws/credentials`（具有严格权限）的文件。该文件应采用以下格式：

   ```
   [default]
   aws_access_key_id=access key
   aws_secret_access_key=secret access key
   ```

**将 IAM policy 附加到用户**

1. 在 IAM 控制台中打开[策略](https://console.aws.amazon.com/iam/home?#policies)，然后选择**策略操作**。

1. 选择 `StockTradeStreamPolicy` 和 **Attach (附加)**。

1. 选择 `StockTradeStreamUser` 和 **Attach Policy (附加策略)**。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-iam-next"></a>

[下载和构建实现代码](tutorial-stock-data-kplkcl-download.md)

# 下载和构建实现代码
<a name="tutorial-stock-data-kplkcl-download"></a>

已提供 [教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md) 的框架代码。它包含用于股票交易流引入（*创建器*）和数据处理（*消费端*）的存根实施。以下过程演示如何完成实现。

**下载和构建实施代码**

1. 将[源代码](https://github.com/awslabs/amazon-kinesis-learning/tree/learning-module-1)下载到计算机上。

1. 按照提供的目录结构，使用源代码在您喜爱的 IDE 中创建一个项目。

1. 将以下库添加到该项目中：
   + Amazon Kinesis Client Library（KCL）
   + AWS SDK
   + Apache HttpCore
   + Apache HttpClient
   + Apache Commons Lang
   + Apache Commons Logging
   + Guava (适用于 Java 的 Google 核心库)
   + Jackson Annotations
   + Jackson Core
   + Jackson Databind
   + Jackson Dataformat：CBOR
   + Joda Time

1. 根据您的 IDE，项目可能会自动构建。如果未自动构建项目，请使用适合您的 IDE 的步骤构建项目。

如果已成功完成这些步骤，则可进入下一节 [实现产生器](tutorial-stock-data-kplkcl-producer.md)。如果您的构建在任何阶段出错，请调查并纠正错误，然后再继续。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-download-next"></a>

[[实现产生器](tutorial-stock-data-kplkcl-producer.md)实现产生器](tutorial-stock-data-kplkcl-producer.md)

# 实现产生器
<a name="tutorial-stock-data-kplkcl-producer"></a>



[教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 中的应用程序使用实际股票市场交易监控场景。以下准则简要说明了此场景如何映射到创建器和支持的代码结构。

请参阅源代码并查看以下信息。

**StockTrade 班级**  
单次股票交易由一个 `StockTrade` 类实例表示。此实例包含一些属性，如股票代号、价格、股份数、交易类型（买入或卖出）以及唯一标识交易的 ID。将为您实现此类。

**流记录**  
流是一个记录序列。记录是 JSON 格式的 `StockTrade` 实例序列化。例如：  

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 班级**  
`StockTradeGenerator` 包含一个名为 `getRandomTrade()` 的方法，当调用此方法时，它将返回一个随机生成的新股票交易。将为您实现此类。

**StockTradesWriter 班级**  
创建器的 `main` 方法 `StockTradesWriter` 将持续检索随机交易，然后通过执行以下任务将该交易发送到 Kinesis Data Streams：  

1. 将流名称和区域名称作为输入读取。

1. 创建一个 `AmazonKinesisClientBuilder`。

1. 使用客户端生成器来设置区域、凭证和客户端配置。

1. 使用客户端生成器构建一个 `AmazonKinesis` 客户端。

1. 检查流是否存在且处于活动状态 (如果不是这样，它将退出并显示错误)。

1. 在连续循环中，会依次调用 `StockTradeGenerator.getRandomTrade()` 方法和 `sendStockTrade` 方法以便每 100 毫秒将交易发送到流一次。
`sendStockTrade` 类的 `StockTradesWriter` 方法具有以下代码：  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

请参阅以下代码细分：
+ `PutRecord` API 需要一个字节数组，并且您必须将 `trade` 转换为 JSON 格式。此行代码将执行该操作：

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 您需要先创建新的 `PutRecordRequest` 实例（此示例中称为 `putRecord`），然后才能发送交易：

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  每个 `PutRecord` 调用均需要流名称、分区键和数据 Blob。以下代码使用 `putRecord` 对象的 `setXxxx()` 方法来填充该对象中的这些字段：

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  该示例使用股票行情自动收录器作为将记录映射到特定分片的分区键。实际上，每个分片应具有数百或数千个分区键，以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息，请参阅 [向流添加数据](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

  现在 `putRecord` 已准备好发送到客户端（`put` 操作）：

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ 错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件：

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  在`put`操作周围添加 try/catch 方块：

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  这是因为 Kinesis Data Streams `put` 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。建议您仔细考虑针对 `put` 操作的重试策略，以避免数据丢失，例如使用重试。
+ 状态日志记录很有用，但它是可选的：

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
此处显示的创建器使用 Kinesis Data Streams API 单记录功能 `PutRecord`。实际上，如果单个创建者生成许多记录，则使用 `PutRecords` 的多记录功能并一次性发送批量记录通常会更有效。有关更多信息，请参阅 [向流添加数据](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

**运行创建器**

1. 验证之前（在创建 IAM 用户时）检索到的访问密钥和私有密钥对是否保存到文件 `~/.aws/credentials` 中。

1. 使用以下参数运行 `StockTradeWriter` 类：

   ```
   StockTradeStream us-west-2
   ```

   如果您在 `us-west-2` 之外的区域中创建流，则必须改为在此处指定该区域。

您应该可以看到类似于如下所示的输出内容：

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

您的股票交易流现在正由 Kinesis Data Streams 摄取。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[实现消费端](tutorial-stock-data-kplkcl-consumer.md)

# 实现消费端
<a name="tutorial-stock-data-kplkcl-consumer"></a>

[教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 中的消费端应用程序持续处理您在 [[实现产生器](tutorial-stock-data-kplkcl-producer.md)实现产生器](tutorial-stock-data-kplkcl-producer.md) 中创建的股票交易流。然后，它输出每分钟买入和卖出最多的股票。该应用程序基于 Kinesis Client Library（KCL）构建，后者需要完成对消费端应用程序常见的大量繁重工作。有关更多信息，请参阅 [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md)。

请参阅源代码并查看以下信息。

**StockTradesProcessor 班级**  
为您提供的消费端的主类，它将执行以下任务：  
+ 读取作为参数传递的应用程序名称、流名称和区域名称。
+ 从 `~/.aws/credentials` 读取凭证。
+ 创建一个 `RecordProcessorFactory` 实例，该实例提供由 `RecordProcessor` 实例实施的 `StockTradeRecordProcessor` 的实例。
+ 利用 `RecordProcessorFactory` 实例和标准配置（包括流名称、凭证和应用程序名称）创建 KCL 工作程序。
+ 此工作程序为每个分片（已分配给此消费端实例）创建一个线程，以持续循环读取 Kinesis Data Streams 中的记录。之后，它调用 `RecordProcessor` 实例以处理收到的每批记录。

**StockTradeRecordProcessor 班级**  
`RecordProcessor` 实例的实施，该实例反过来将实施三个必需方法：`initialize`、`processRecords` 和 `shutdown`。  
正如其名称所示，`initialize` 和 `shutdown` 由 Kinesis Client Library 使用，旨在让记录处理器了解何时应准备好开始接收记录以及何时应停止接收记录，因此该方法可以执行任何特定于应用程序的设置和终止任务。将为您提供这些方法的代码。`processRecords` 方法中进行的主要处理，该处理反过来对每条记录使用 `processRecord`。后一个方法主要作为空框架代码提供给您，以便您在下一步骤中实施，届时将进一步对其进行说明。  
另外要注意的是 `processRecord` 的支持方法 `reportStats` 和 `resetStats` 的实施，二者在初始源代码中为空。  
已为您实施 `processRecords` 方法，并执行了以下步骤：  
+  对于传入的每条记录，对其调用 `processRecord`。
+ 如果自上一次报告以来已过去至少 1 分钟，请调用 `reportStats()`（它将打印出最新统计数据），然后调用 `resetStats()`（它将清除统计数据以便下一个间隔仅包含新记录）。
+ 设置下一次报告时间。
+ 如果自上一检查点以来已过去至少 1 分钟，请调用 `checkpoint()`。
+ 设置下一次检查点操作时间。
此方法使用 60 秒间隔作为报告和检查点操作比率。有关检查点操作的更多信息，请参阅 [有关消费端的其他信息](#tutorial-stock-data-kplkcl-consumer-supplement)。

**StockStats 班级**  
此类提供一段时间内针对最热门股票的数据保留和统计数据跟踪。此代码已提供给您并包含以下方法：  
+ `addStockTrade(StockTrade)`：将给定的 `StockTrade` 注入正在使用的统计数据。
+ `toString()`：以格式化字符串形式返回统计数据。
此类跟踪最热门股票的方式是，保留每只股票的总交易数的连续计数和最大计数。每当股票交易达成时，它都会更新这些计数。

将代码添加到 `StockTradeRecordProcessor` 类的方法，如以下步骤中所示。

**实施消费端**

1. 通过实例化大小正确的 `processRecord` 对象并将记录数据添加到该对象来实施 `StockTrade` 方法，并在出现问题时记录警告。

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. 实施简单的 `reportStats` 方法。可随时将输出格式修改为您的首选格式。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. 最后，实施 `resetStats` 方法，这将创建新的 `stockStats` 实例。

   ```
   stockStats = new StockStats();
   ```

**运行消费端**

1. 运行您在 [[实现产生器](tutorial-stock-data-kplkcl-producer.md)实现产生器](tutorial-stock-data-kplkcl-producer.md) 中编写的创建者以将模拟股票交易记录引入流中。

1. 验证之前（在创建 IAM 用户时）检索到的访问密钥和私有密钥对是否保存到文件 `~/.aws/credentials` 中。

1. 使用以下参数运行 `StockTradesProcessor` 类：

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   请注意，如果您在 `us-west-2` 之外的区域中创建流，则必须改为在此处指定该区域。

1 分钟后，您应看到类似以下内容的输出，并且输出在此后每分钟刷新一次：

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## 有关消费端的其他信息
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

如果熟悉 Kinesis Client Library 的好处（在 [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md) 中和其他位置已讨论），您可能想知道为何应在此处使用它。虽然您只使用单个分片流和单个消费端实例来处理它，但使用 KCL 实施消费端仍会更轻松。将创建器部分中的代码实施步骤与消费端部分中的进行对比，您会发现实施消费端相对来说容易一些。这主要是因为 KCL 提供的服务。

在此应用程序中，您专注于实施可处理单条记录的记录处理器类。您无需担心如何从 Kinesis Data Streams 提取记录；当有可用的新记录时，KCL 就会提取这些记录并调用记录处理器。此外，不必担心分片和消费端实例的数量。如果已扩展流，则不必重写应用程序以处理多个分片或多个消费端实例。

*检查点*一词是指记录流中的点，其中直到迄今为止已经使用和处理的数据记录。如果应用程序崩溃，则从该点读取流，而不是从流的开头进行读取。检查点操作主题、各种设计模式及其最佳实践不在本章讨论范围之内。但是，生产环境中可能会涉及上述内容。

正如您在 [[实现产生器](tutorial-stock-data-kplkcl-producer.md)实现产生器](tutorial-stock-data-kplkcl-producer.md) 中了解到的，Kinesis Data Streams API 中的 `put` 操作将采用*分区键*作为输入。Kinesis Data Streams 使用分区键作为跨多个分片拆分记录的机制（当流中有多个分片时）。相同的分区键将始终路由到同一个分片。这使得能够基于以下假设来设计用于处理特定分片的消费端：具有相同分区键的记录只会发送给该消费端，具有相同分区键的任何记录都不会在任何其他消费端处结束。因此，消费端的工作程序可聚合具有相同分区键的所有记录而不用担心丢失所需的数据。

在此应用程序中，消费端对记录的处理并不集中，因此您可以使用一个分片并在与 KCL 线程相同的线程中执行处理。但在实际应用中，请先考虑增加分片数量。在某些情况下，您可能需要将处理切换到其他线程或需要使用线程池（如果您的记录处理应是集中的）。这样一来，KCL 可以更快地提取新记录，而其他线程可并行处理记录。多线程设计并不是无关紧要的，应使用先进技术来实现，因此增加分片计数通常是最有效的纵向扩展方法。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[（可选）扩展消费端](tutorial-stock-data-kplkcl-consumer-extension.md)

# （可选）扩展消费端
<a name="tutorial-stock-data-kplkcl-consumer-extension"></a>

[教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 中的应用程序可能已足以达到您的目的。此可选部分演示如何针对更为复杂的场景扩展消费端代码。

如果要了解每分钟的最大销售订单数，可以修改三个位置的 `StockStats` 类以适应此新的优先级。

**扩展消费端**

1. 添加新实例变量：

   ```
    // Ticker symbol of the stock that had the largest quantity of shares sold 
    private String largestSellOrderStock;
    // Quantity of shares for the largest sell order trade
    private long largestSellOrderQuantity;
   ```

1. 将以下代码添加到 `addStockTrade`：

   ```
    if (type == TradeType.SELL) {
        if (largestSellOrderStock == null || trade.getQuantity() > largestSellOrderQuantity) {
            largestSellOrderStock = trade.getTickerSymbol();
            largestSellOrderQuantity = trade.getQuantity();
        }
    }
   ```

1. 修改 `toString` 方法以打印其他信息：

   ```
    public String toString() {
        return String.format(
                "Most popular stock being bought: %s, %d buys.%n" +
                "Most popular stock being sold: %s, %d sells.%n" +
                "Largest sell order: %d shares of %s.",
                getMostPopularStock(TradeType.BUY), getMostPopularStockCount(TradeType.BUY),
                getMostPopularStock(TradeType.SELL), getMostPopularStockCount(TradeType.SELL),
                largestSellOrderQuantity, largestSellOrderStock);
    }
   ```

如果您现在运行消费端（请记住同时运行创建器），则应看到类似于以下内容的输出：

```
 ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  Largest sell order: 996 shares of BUD.
  ****************************************************************
```

## 后续步骤
<a name="tutorial-stock-data-kplkcl-consumer-extension-next"></a>

[清理 资源](tutorial-stock-data-kplkcl-finish.md)

# 清理 资源
<a name="tutorial-stock-data-kplkcl-finish"></a>

由于您需要付费使用 Kinesis 数据流，请确保在使用完后删除流和相应的 Amazon DynamoDB 表。即使您不发送和获取记录，活动流也会产生象征性的费用。这是因为活动流将持续“侦听”传入记录和获取记录的请求，这将耗用资源。

**删除流和表**

1. 关闭您可能仍在运行的任何创建者和消费端。

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 选择为此应用程序创建的流 (`StockTradeStream`)。

1. 选择 **Delete Stream (删除流)**。

1. 打开 DynamoDB 控制台，网址为。[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)

1. 删除 `StockTradesProcessor` 表。

## Summary
<a name="tutorial-stock-data-kplkcl-summary"></a>

近乎实时处理大量数据不需要编写任何复杂代码或开发大型基础设施。这就像编写逻辑来处理少量数据（如编写 `processRecord(Record)`）一样简单，但要使用 Kinesis Data Streams 进行扩展，才能让该逻辑处理大量流数据。您无需担心处理的扩展方式，因为 Kinesis Data Streams 将为您完成这一工作。您只需将流记录发送到 Kinesis Data Streams 并编写用于处理收到的每条新记录的逻辑。

以下是针对此应用程序的一些可能的改进。

**跨所有分片进行聚合**  
当前，通过聚合单个分片中单个工作线程收到的数据记录来获得统计数据。（一个分片不能同时由单个应用程序中的多个工作线程处理。） 当然，当您扩展并具有多个分片时，可能希望跨所有分片聚合。可通过部署管道架构完成此操作。在该架构中，每个工作线程的输出都注入具有单个分片的另一个流，分片由聚合第一个阶段输出的工作线程处理。由于来自第一个阶段的数据是有限的（每分片每分钟一个示例），因此一个分片即可轻松处理它。

**缩放处理**  
当流进行扩展以包含多个分片（因为多个创建器正在发送数据）时，扩展处理的方式是添加更多工作程序。可以在 Amazon EC2 实例中运行工作程序并使用自动扩缩组。

**使用连接器连接亚马逊 S3/ DynamoDB/Amazon Redshift/Storm**  
在连续处理流时，其输出可以发送到其他目的地。 AWS 提供了[用于将 Kinesis Data Streams AWS 与其他服务和第三方工具集成的连接器](https://github.com/awslabs/amazon-kinesis-connectors)。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-next-steps"></a>
+ 有关使用 Kinesis Data Streams API 操作的更多信息，请参阅 [使用 Amazon Kinesis Data Streams API 开发制作人 适用于 Java 的 AWS SDK](developing-producers-with-sdk.md)、[使用开发吞吐量共享的消费者 适用于 Java 的 AWS SDK](developing-consumers-with-sdk.md) 和 [创建和管理 Kinesis 数据流](working-with-streams.md)。
+ 有关 Kinesis Client Library 的更多信息，请参阅 [开发 KCL 1.x 消费端](developing-consumers-with-kcl.md)。
+ 有关如何优化应用程序的更多信息，请参阅 [优化 Amazon Kinesis Data Streams 消费端优化 Kinesis Data Streams 消费端](advanced-consumers.md)。