As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Implementar o produtor
O aplicativo no Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x usa o cenário real de monitoramento de negociações em bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de apoio.
Consulte o código-fonte e analise as informações a seguir.
- StockTrade classe
-
Uma negociação de ação individual é representada por uma instância da classe
StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é previamente implementada. - Registro de fluxo
-
Um fluxo é uma sequência de registros. Um registro é uma serialização de uma instância
StockTradeno formato JSON. Por exemplo:{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 } - StockTradeGenerator classe
-
StockTradeGeneratortem um método denominadogetRandomTrade(), que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é previamente implementada. - StockTradesWriter classe
-
O método
maindo produtor,StockTradesWriter, recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:-
Lê o nome do fluxo e o nome da região como entrada.
-
Cria um
AmazonKinesisClientBuilder. -
Usa o criador do cliente para definir região, credenciais e configuração do cliente.
-
Cria um cliente
AmazonKinesisusando o criador do cliente. -
Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro).
-
Em um loop contínuo, chama o método
StockTradeGenerator.getRandomTrade()e o métodosendStockTradepara enviar a negociação ao stream a cada 100 milissegundos.
O método
sendStockTradeda classeStockTradesWritertem o seguinte código:private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }Consulte o desmembramento do código a seguir:
-
A API de
PutRecordespera uma matriz de bytes, e é necessário convertertradepara o formato JSON. Essa única linha de código executa a seguinte operação:byte[] bytes = trade.toJsonAsBytes(); -
Antes de enviar a negociação, crie uma nova instância de
PutRecordRequest(denominadaputRecordneste caso):PutRecordRequest putRecord = new PutRecordRequest();Cada chamada a
PutRecordrequer o nome do fluxo, uma chave de partição e um blob de dados. O código a seguir preenche esses campos no objetoputRecordusando seus métodossetXxxx():putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado fragmento. Na prática, deve haver centenas ou milhares de chaves de partição por fragmento, de forma que os registros sejam uniformemente disseminados no fluxo. Para obter mais informações sobre como adicionar dados a um fluxo, consulte Adicionar dados a um stream.
Agora
putRecordestá pronto para enviar para o cliente (operaçãoput):kinesisClient.putRecord(putRecord); -
A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }Adicione o try/catch bloco ao redor da
putoperação:try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }Isso ocorre porque uma operação
putdo Kinesis Data Streams pode falhar devido a um erro de rede ou porque o fluxo de dados atinge o limite de throughput e tem sua utilização controlada. Recomendamos considerar cuidadosamente sua política de tentativa para operaçõesputa fim de evitar perda de dados, usando como uma nova tentativa. -
O registro de status é útil mas opcional:
LOG.info("Putting trade: " + trade.toString());
O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams,
PutRecord. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros dePutRecordse enviar lotes de registros por vez. Para obter mais informações, consulte Adicionar dados a um stream. -
Como executar o produtor
-
Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo
~/.aws/credentials. -
Execute a classe
StockTradeWritercom os seguintes argumentos:StockTradeStream us-west-2Se o fluxo foi criado em uma região diferente de
us-west-2, é necessário especificar essa região aqui.
A saída deve ser semelhante a:
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Seu fluxo de negociações de ações agora está sendo ingerido pelo Kinesis Data Streams.