

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# コンシューマーを実装する
<a name="tutorial-stock-data-kplkcl-consumer"></a>

[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する[チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) のコンシューマーアプリケーションでは、[[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)で作成した株式取引ストリームを継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。このアプリケーションは、Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)を参照してください。

ソースコードを参照し、次の情報を確認してください。

**StockTradesProcessor クラス**  
事前に用意されているコンシューマーのメインクラスで、次のタスクを実行します。  
+ 引数として渡されたアプリケーション、ストリーム、およびリージョン名を読み取ります。
+ `~/.aws/credentials` から認証情報を読み取ります。
+ `RecordProcessor` のインスタンスとして機能し、`StockTradeRecordProcessor` インスタンスによって実装される、`RecordProcessorFactory` インスタンスを作成します。
+ `RecordProcessorFactory` インスタンスおよび標準設定 (例: ストリーム名、認証情報、アプリケーション名) が指定された KCL ワーカーを作成します。
+ このワーカーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的に Kinesis Data Streams からレコードが読み取られます。次に、`RecordProcessor` インスタンスを呼び出して、受信したレコードのバッチを処理します。

**StockTradeRecordProcessor クラス**  
`RecordProcessor` インスタンスを実装したら、次に `initialize`、`processRecords`、`shutdown` の 3 つの必須メソッドを実装します。  
Kinesis Client Library によって使用される `initialize` および `shutdown` は、名前が示すとおり、レコードの受信がいつ開始し、いつ終了するかをレコードプロセッサに知らせます。これにより、レコードプロセッサは、アプリケーションに固有の設定および終了タスクを行うことができます。これらのコードは事前に用意されています。主な処理は `processRecords` メソッドで行われ、そこでは各レコードの `processRecord` が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細は、次のステップを参照してください。  
また、`processRecord` のサポートメソッドである `reportStats` および `resetStats` の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。  
`processRecords` メソッドは既に実装されており、次のステップを実行します。  
+  渡された各レコードについて、レコード上で `processRecord` を呼び出します。
+ 最後のレポートから 1 分間以上経過した場合は、`reportStats()` を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるように `resetStats()` を呼び出して統計を消去します。
+ 次のレポート時間を設定します。
+ 最後のチェックポイントから 1 分間以上経過した場合は、`checkpoint()` を呼び出します。
+ 次のチェックポイント時間を設定します。
このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、[コンシューマーに関する追加情報](#tutorial-stock-data-kplkcl-consumer-supplement)を参照してください。

**StockStats クラス**  
このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。  
+ `addStockTrade(StockTrade)`: 指定された `StockTrade` を実行中の統計に取り込みます。
+ `toString()`: 特定の形式の文字列として統計を返します。
このクラスは、各株式の合計取引数と最大取引数を継続的にカウントすることで、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。

次のステップに示されているコードを `StockTradeRecordProcessor` クラスのメソッドに追加します。

**コンシューマーを実装するには**

1. `processRecord` メソッドを実装するには、サイズの正しい `StockTrade` オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. 簡単な `reportStats` メソッドを実装します。出力形式は好みに応じて自由に変更することができます。

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. 最後に、新しい `stockStats` インスタンスを作成する `resetStats` メソッドを実装します。

   ```
   stockStats = new StockStats();
   ```

**コンシューマーを実行するには**

1. [[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md) で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。

1. 前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル `~/.aws/credentials` に保存されていることを確認します。

1. 次の引数を指定して `StockTradesProcessor` クラスを実行します。

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   `us-west-2` 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## コンシューマーに関する追加情報
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

[KCL 1.x コンシューマーを開発する](developing-consumers-with-kcl.md)などで説明されている Kinesis Client Library のメリットに詳しい方であれば、ここで使用することに疑問を感じるかもしれません。1 つのシャードストリームとそれを処理する 1 つのコンシューマーインスタンスしか使用しない場合でも、KCL を使用して簡単にコンシューマーを実装することができます。プロデューサーセクションとコンシューマーセクションのコードの実装手順を比較すると、コンシューマーの実装の方が比較的に簡単であることがわかります。これは、KCL で提供されているサービスが大きく関係しています。

このアプリケーションでは、個別のレコードを処理できるレコードプロセッサクラスの実装に焦点を合わせてきました。新しいレコードが使用可能になると、KCL がレコードを取得してレコードプロセッサを呼び出すため、Kinesis Data Streams からレコードを取得する方法を心配しなくて済みます。また、シャードカウントやコンシューマーインスタンス数についても心配しなくて済みます。ストリームがスケールアップされても、複数のシャードやコンシューマーインスタンスを処理するためにアプリケーションを書き直す必要はありません。

チェックポイントという用語は、ストリーム内のポイントを、これまで消費および処理されたデータレコードまで記録することを意味します。**アプリケーションがクラッシュすると、ストリームはストリームの先頭からではなく、その時点から読み取られます。チェックポイントやそのさまざまな設計パターン、およびベストプラクティスは、この章の範囲外です。ただし、本番環境ではこのような問題に直面することがあります。

[[プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)プロデューサーを実装する](tutorial-stock-data-kplkcl-producer.md)で学習したように、Kinesis Data Streams API の `put` オペレーションは、*パーティションキー*を入力として受け取ります。Kinesis Data Streams は、レコードを複数のシャードに分割するメカニズムとしてパーティションキーを使用します (複数のシャードがストリームに含まれる場合)。同じパーティションキーは、常に同じシャードにルーティングされます。このため、同じパーティションキーを持つレコードはそのコンシューマーにのみ送信され、他のコンシューマーに送信されることはないと仮定して、特定のシャードを処理するコンシューマーを設計できます。したがって、コンシューマーのワーカーは、必要なデータが欠落しているかもしれないと心配することなく、同じパーティションキーを持つすべてのレコードを集計できます。

このアプリケーションでは、コンシューマーによるレコードの処理の負荷は高くないため、1 つのシャードを使用して、KCL スレッドと同じスレッドで処理することができます。ただし、実際には、まずシャードの数のスケールアップを検討します。レコードの処理が大変になることが予想される場合は、異なるスレッドに処理を切り替えたり、スレッドプールを使用したりする必要があるかもしれません。このように、その他のスレッドがレコードを並列処理していても、KCL は新しいレコードを迅速に取得できます。一般的に、マルチスレッド設計は簡単ではなく高度な技術が必要になるため、シャードの数を増やすことが最も効果的な拡張方法です。

## 次の手順
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[（オプション) コンシューマーを拡張する](tutorial-stock-data-kplkcl-consumer-extension.md)