翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
プロデューサーを実装する
チュートリアル: KPL と KCL 1.x を使用して株式データをリアルタイム処理する のアプリケーションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。
ソースコードを参照し、次の情報を確認してください。
- StockTrade クラス
-
株式取引は、
StockTradeクラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。 - ストリームレコード
-
ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する
StockTradeインスタンスの 1 つを表しています。例:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 } - StockTradeGenerator クラス
-
StockTradeGeneratorには、呼び出されるたびにランダムに生成された新しい株式取引を返す、getRandomTrade()と呼ばれるメソッドが含まれています。このクラスは、既に実装されています。 - StockTradesWriter クラス
-
プロデューサーの
mainメソッドであるStockTradesWriterは、継続的にランダム取引を取得し、以下のタスクを実行してそれらを Kinesis Data Streams に送信します。-
ストリーム名とリージョン名を入力として読み取ります。
-
AmazonKinesisClientBuilderを作成します。 -
クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。
-
クライアントビルダーを使用して
AmazonKinesisクライアントを構成します。 -
ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。
-
連続ループで、
StockTradeGenerator.getRandomTrade()メソッドに続きsendStockTradeメソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。
sendStockTradeクラスのStockTradesWriterメソッドには次のコードがあります。private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }次のコードの詳細を参照してください。
-
PutRecordAPI はバイト配列を想定するため、tradeを JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。byte[] bytes = trade.toJsonAsBytes(); -
取引を送信する前に、新しい
PutRecordRequestインスタンス (この場合、putRecordと呼ばれる) を作成する必要があります。PutRecordRequest putRecord = new PutRecordRequest();各
PutRecordの呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要です。次のコードによって、putRecordメソッドを使用して、これらのフィールドをsetXxxx()オブジェクトに追加します。putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、ストリームにデータを追加するを参照してください。
次に、
putRecordをクライアントに送信 (putオペレーション) することができます。kinesisClient.putRecord(putRecord); -
エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }putオペレーションの前後に try/catch ブロックを追加します。try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の
putオペレーションが失敗することがあるためです。データが失われることがないように、単純な再試行を使用するなど、putオペレーションの再試行ポリシーを慎重に検討することをお勧めします。 -
ステータスのログ記録は有益ですが、オプションです。
LOG.info("Putting trade: " + trade.toString());
ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能
PutRecordが使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、PutRecordsのマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、ストリームにデータを追加するを参照してください。 -
プロデューサーを実行するには
-
前のステップ (IAM ユーザーを作成したとき) で取得したアクセスキーとシークレットキーのペアがファイル
~/.aws/credentialsに保存されていることを確認します。 -
次の引数を指定して
StockTradeWriterクラスを実行します。StockTradeStream us-west-2us-west-2以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。
次のような出力が表示されます:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Kinesis Data Streams によって株式取引ストリームが取り込まれます。