

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 实现产生器
<a name="tutorial-stock-data-kplkcl-producer"></a>



[教程：使用 KPL 和 KCL 1.x 处理实时股票数据[教程：使用 KPL 和 KCL 1.x 处理实时股票数据](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) 中的应用程序使用实际股票市场交易监控场景。以下准则简要说明了此场景如何映射到创建器和支持的代码结构。

请参阅源代码并查看以下信息。

**StockTrade 班级**  
单次股票交易由一个 `StockTrade` 类实例表示。此实例包含一些属性，如股票代号、价格、股份数、交易类型（买入或卖出）以及唯一标识交易的 ID。将为您实现此类。

**流记录**  
流是一个记录序列。记录是 JSON 格式的 `StockTrade` 实例序列化。例如：  

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 班级**  
`StockTradeGenerator` 包含一个名为 `getRandomTrade()` 的方法，当调用此方法时，它将返回一个随机生成的新股票交易。将为您实现此类。

**StockTradesWriter 班级**  
创建器的 `main` 方法 `StockTradesWriter` 将持续检索随机交易，然后通过执行以下任务将该交易发送到 Kinesis Data Streams：  

1. 将流名称和区域名称作为输入读取。

1. 创建一个 `AmazonKinesisClientBuilder`。

1. 使用客户端生成器来设置区域、凭证和客户端配置。

1. 使用客户端生成器构建一个 `AmazonKinesis` 客户端。

1. 检查流是否存在且处于活动状态 (如果不是这样，它将退出并显示错误)。

1. 在连续循环中，会依次调用 `StockTradeGenerator.getRandomTrade()` 方法和 `sendStockTrade` 方法以便每 100 毫秒将交易发送到流一次。
`sendStockTrade` 类的 `StockTradesWriter` 方法具有以下代码：  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

请参阅以下代码细分：
+ `PutRecord` API 需要一个字节数组，并且您必须将 `trade` 转换为 JSON 格式。此行代码将执行该操作：

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 您需要先创建新的 `PutRecordRequest` 实例（此示例中称为 `putRecord`），然后才能发送交易：

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  每个 `PutRecord` 调用均需要流名称、分区键和数据 Blob。以下代码使用 `putRecord` 对象的 `setXxxx()` 方法来填充该对象中的这些字段：

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  该示例使用股票行情自动收录器作为将记录映射到特定分片的分区键。实际上，每个分片应具有数百或数千个分区键，以便记录均匀地分布在流中。有关如何将数据添加到流的更多信息，请参阅 [向流添加数据](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

  现在 `putRecord` 已准备好发送到客户端（`put` 操作）：

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ 错误检查和日志记录始终是有用的附加功能。此代码将记录错误条件：

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  在`put`操作周围添加 try/catch 方块：

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  这是因为 Kinesis Data Streams `put` 操作可能因网络错误或数据流达到其吞吐量限额并受到限制而导致失败。建议您仔细考虑针对 `put` 操作的重试策略，以避免数据丢失，例如使用重试。
+ 状态日志记录很有用，但它是可选的：

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
此处显示的创建器使用 Kinesis Data Streams API 单记录功能 `PutRecord`。实际上，如果单个创建者生成许多记录，则使用 `PutRecords` 的多记录功能并一次性发送批量记录通常会更有效。有关更多信息，请参阅 [向流添加数据](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

**运行创建器**

1. 验证之前（在创建 IAM 用户时）检索到的访问密钥和私有密钥对是否保存到文件 `~/.aws/credentials` 中。

1. 使用以下参数运行 `StockTradeWriter` 类：

   ```
   StockTradeStream us-west-2
   ```

   如果您在 `us-west-2` 之外的区域中创建流，则必须改为在此处指定该区域。

您应该可以看到类似于如下所示的输出内容：

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

您的股票交易流现在正由 Kinesis Data Streams 摄取。

## 后续步骤
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[实现消费端](tutorial-stock-data-kplkcl-consumer.md)