Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Implementar el productor
La aplicación del Tutorial: Procesar operaciones bursátiles en tiempo real con KPL y KCL 1.x utiliza un escenario real de monitorización del mercado bursátil. Los siguientes principios explicar brevemente la forma en que este escenario se asocia con la estructura del productor y el código de apoyo.
Consulte el código fuente y revise la siguiente información.
- StockTrade clase
-
Una operación bursátil está representada por una instancia de la clase
StockTrade. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted. - Registro de la secuencia
-
Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de
StockTradeen formato JSON. Por ejemplo:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 } - StockTradeGenerator clase
-
StockTradeGeneratortiene un método denominadogetRandomTrade()que proporciona una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted. - StockTradesWriter clase
-
El método
maindel productor,StockTradesWriterrecupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:-
Lee los nombres de la secuencia y la región como entrada.
-
Crea un
AmazonKinesisClientBuilder. -
Utiliza el compilador de clientes para establecer la región, las credenciales y la configuración de cliente.
-
Crea un cliente de
AmazonKinesismediante el compilador de clientes. -
Comprueba que la secuencia existe y está activa (si no, sale con un error).
-
En un bucle continuo, llama al método
StockTradeGenerator.getRandomTrade()y después al métodosendStockTradepara enviar la transacción a la secuencia cada 100 milisegundos.
El método
sendStockTradede la claseStockTradesWritertiene el siguiente código:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }Consulte el siguiente desglose del código:
-
La API de
PutRecordespera una matriz de bytes, y debe convertir eltradea formato JSON. Esta única línea de código realiza esa operación:byte[] bytes = trade.toJsonAsBytes(); -
Antes de poder enviar la transacción, debe crear una nueva instancia de
PutRecordRequest(denominadaputRecorden este caso):PutRecordRequest putRecord = new PutRecordRequest();Cada llamada a
PutRecordrequiere el nombre de la secuencia, la clave de partición y el blob de datos. El siguiente código rellenará estos campos en el objetoputRecordutilizando sus métodossetXxxx():putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte Agregar datos a un flujo.
Ahora
putRecordestará listo para el envío al cliente (la operaciónput):kinesisClient.putRecord(putRecord); -
Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }Añada el try/catch bloque alrededor de la
putoperación:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }Esto se debe a que una operación
putde Kinesis Data Streams puede fallar debido a un error de red o debido a que el flujo alcanza sus límites de rendimiento y se ve limitado. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones deputde modo que evite pérdida de datos, por ejemplo, al utilizar un reintento. -
El registro de estado resulta útil, pero es opcional:
LOG.info("Putting trade: " + trade.toString());
El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams,
PutRecord. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros dePutRecordsy enviar lotes de registros de una vez. Para obtener más información, consulte Agregar datos a un flujo. -
Para ejecutar el productor
-
Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo
~/.aws/credentials. -
Ejecute la clase
StockTradeWritercon los siguientes argumentos:StockTradeStream us-west-2Si ha creado su secuencia en una región diferente a
us-west-2tendrá que especificar esa región aquí.
Debería ver una salida similar a esta:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
El flujo de operaciones bursátiles está siendo adquirido por Kinesis Data Streams.