

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 實作生產者
<a name="tutorial-stock-data-kplkcl-producer"></a>



[教學課程：使用 KPL 和 KCL 1.x 處理即時庫存資料[教學課程：使用 KPL 和 KCL 1.x 處理即時庫存資料](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md)所述的應用程式使用真實情境的股票市場交易監控。以下原則簡要說明此情境如何對應到生產者和支援的程式碼結構。

請查看原始碼並對照檢閱以下資訊。

**StockTrade 類別**  
單次股票交易是由 `StockTrade` 類別的執行個體表示。此執行個體包含若干屬性，如股票代號、價格、股份數、交易類型 (買進或賣出) 以及唯一識別該交易的 ID。程式碼已為您實作此類別。

**串流記錄**  
串流是一連串的記錄。記錄則是 JSON 格式的序列化 `StockTrade` 執行個體。例如：  

```
{
  "tickerSymbol": "AMZN", 
  "tradeType": "BUY", 
  "price": 395.87,
  "quantity": 16, 
  "id": 3567129045
}
```

**StockTradeGenerator 類別**  
`StockTradeGenerator` 具有 `getRandomTrade()` 方法，每次叫用後將傳回隨機產生的新股票交易。程式碼已為您實作此類別。

**StockTradesWriter 類別**  
生產者的 `main` 方法 `StockTradesWriter` 會持續擷取隨機交易，然後透過執行以下任務將該交易傳送至 Kinesis Data Streams：  

1. 讀取串流名稱和區域名稱做為輸入。

1. 建立 `AmazonKinesisClientBuilder`。

1. 使用用戶端建置器設定區域、登入資料和用戶端組態。

1. 使用用戶端建置器建置 `AmazonKinesis` 用戶端。

1. 檢查串流是否存在且處於作用中狀態 (否則將結束此方法並顯示錯誤)。

1. 在連續迴圈中依序呼叫 `StockTradeGenerator.getRandomTrade()` 方法和 `sendStockTrade` 方法，每隔 100 毫秒將交易傳送至串流。
`sendStockTrade` 類別的 `StockTradesWriter` 方法含有以下程式碼：  

```
private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) {
    byte[] bytes = trade.toJsonAsBytes();
    // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
    if (bytes == null) {
        LOG.warn("Could not get JSON bytes for stock trade");
        return;
    }
    
    LOG.info("Putting trade: " + trade.toString());
    PutRecordRequest putRecord = new PutRecordRequest();
    putRecord.setStreamName(streamName);
    // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
    putRecord.setPartitionKey(trade.getTickerSymbol());
    putRecord.setData(ByteBuffer.wrap(bytes));

    try {
        kinesisClient.putRecord(putRecord);
    } catch (AmazonClientException ex) {
        LOG.warn("Error sending record to Amazon Kinesis.", ex);
    }
}
```

請參閱以下的程式碼詳解：
+ `PutRecord` API 需要位元組陣列，您必須`trade`轉換為 JSON 格式。這一行程式碼將執行該項操作：

  ```
  byte[] bytes = trade.toJsonAsBytes();
  ```
+ 傳送交易之前，您必須先建立新的 `PutRecordRequest` 執行個體 (本例中其名稱為 `putRecord`)：

  ```
  PutRecordRequest putRecord = new PutRecordRequest();
  ```

  每次呼叫 `PutRecord` 都需要串流名稱、分割區索引鍵和資料 Blob。以下程式碼使用 `putRecord` 物件的 `setXxxx()` 方法填入這些欄位：

  ```
  putRecord.setStreamName(streamName);
  putRecord.setPartitionKey(trade.getTickerSymbol());
  putRecord.setData(ByteBuffer.wrap(bytes));
  ```

  本範例使用股票代號做為分割區索引鍵，將記錄對應到特定碎片。實際上，每個碎片應該會有成千上百的分割區索引鍵，使記錄均勻地分佈於串流中。如需如何加入資料至串流的詳細資訊，請參閱[將資料新增至串流](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

  現在 `putRecord` 已準備好傳送用戶端 (`put` 操作)：

  ```
  kinesisClient.putRecord(putRecord);
  ```
+ 錯誤檢查和日誌記錄肯定是頗為實用的附加功能。此程式碼將記錄錯誤情況：

  ```
  if (bytes == null) {
      LOG.warn("Could not get JSON bytes for stock trade");
      return;
  }
  ```

  在 `put` 操作的周圍加入 try/catch 區塊：

  ```
  try {
         kinesisClient.putRecord(putRecord);
  } catch (AmazonClientException ex) {
         LOG.warn("Error sending record to Amazon Kinesis.", ex);
  }
  ```

  這麼做是因為 Kinesis Data Streams `put` 操作可能由於網路錯誤或是串流達到其輸送量限制並受到調節而導致失敗。我們建議您仔細考慮`put`操作的重試政策，以避免資料遺失，例如使用重試。
+ 狀態記錄也很實用，但可有可無：

  ```
  LOG.info("Putting trade: " + trade.toString());
  ```
此處所示的生產者是使用 Kinesis Data Streams API 的單一記錄功能 `PutRecord`。實際上，如果個別的生產者將產生許多記錄，則使用 `PutRecords` 的多筆記錄功能並一次性傳送各個批次的記錄通常會更有效率。如需詳細資訊，請參閱[將資料新增至串流](developing-producers-with-sdk.md#kinesis-using-sdk-java-add-data-to-stream)。

**執行生產者**

1. 確認稍早 (建立 IAM 使用者 使用者時) 擷取的存取金鑰和私密金鑰對是否已儲存至 `~/.aws/credentials` 檔案。

1. 使用以下引數執行 `StockTradeWriter` 類別：

   ```
   StockTradeStream us-west-2
   ```

   如果您是在 `us-west-2` 以外的區域建立串流，則此處必須改為指定該區域。

您應該會看到類似下列的輸出：

```
Feb 16, 2015 3:53:00 PM  
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM 
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
```

您的股票交易串流現在正由 Kinesis Data Streams 擷取中。

## 後續步驟
<a name="tutorial-stock-data-kplkcl-producer-next"></a>

[實作消費者](tutorial-stock-data-kplkcl-consumer.md)