As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Implementar o consumidor
O aplicativo consumidor neste tutorial processa continuamente as transações de ações em seu fluxo de dados. Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. A aplicação é compilada com base na Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum às aplicações de consumo. Para obter mais informações, consulte Informações sobre KCL 1.x e 2.x.
Consulte o código-fonte e analise as informações a seguir.
- StockTradesProcessor classe
-
A principal classe do consumidor fornecida e que executa as seguintes tarefas:
-
Lê o aplicativo, o fluxo de dados e os nomes de região passados como argumentos.
-
Cria uma instância de
KinesisAsyncClientcom o nome da região. -
Cria uma instância de
StockTradeRecordProcessorFactoryque veicula instâncias deShardRecordProcessor, implementadas por uma instância deStockTradeRecordProcessor. -
Cria uma instância de
ConfigsBuildercom a instância deKinesisAsyncClient,StreamName,ApplicationNameeStockTradeRecordProcessorFactory. Isso é útil para criar todas as configurações com valores padrão. -
Cria um programador da KCL (anteriormente, nas versões 1.x da KCL, era conhecido como o operador da KCL) com a instância de
ConfigsBuilder. -
O programador cria uma nova thread para cada fragmento (atribuído a essa instância de consumidor), que faz loop continuamente para ler registros do fluxo de dados. Em seguida, ele invoca a instância de
StockTradeRecordProcessorpara processar cada lote de registros recebidos.
-
- StockTradeRecordProcessor classe
-
Implementação da instância de
StockTradeRecordProcessor, que, por sua vez, implementa cinco métodos necessários:initialize,processRecords,leaseLost,shardEndedeshutdownRequested.Os métodos
initializeeshutdownRequestedsão usados pela KCL para permitir que o processador de registros saiba quando ele deve estar pronto para começar a receber registros e quando ele deve esperar parar de receber registros, respectivamente, para que ele possa executar qualquer configuração específica do aplicativo e tarefas de encerramento.leaseLosteshardEndedsão usados para implementar qualquer lógica para o que fazer quando um contrato de aluguel é perdido ou um processamento chegou ao fim de um fragmento. Neste exemplo, simplesmente registramos em log mensagens indicando esses eventos.O código para esses métodos é fornecido para você. O processamento principal ocorre no método
processRecords, que, por sua vez, usaprocessRecordpara cada registro. Esse último método é fornecido como o código esqueleto quase todo vazio, para que seja implementado na próxima etapa, onde é explicado em mais detalhes.Observe também a implementação dos métodos de suporte de
processRecord:reportStatseresetStats, que estão vazios no código-fonte original.O método
processRecords, implementado previamente, executa as seguintes etapas:-
Para cada registro passado, ele chama
processRecord. -
Se pelo menos 1 minuto houver decorrido após o último relatório, chamará
reportStats(), que imprime as estatísticas mais recentes e, em seguida,resetStats(), que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos. -
Define o próximo horário para geração de relatórios.
-
Se houver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará
checkpoint(). -
Define o próximo horário do ponto de verificação.
Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre pontos de verificação, consulte Using the Kinesis Client Library.
-
- StockStats classe
-
Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código é fornecido e contém os seguintes métodos:
-
addStockTrade(StockTrade): injeta oStockTradeconhecido nas estatísticas correntes. -
toString(): retorna as estatísticas em uma string formatada.
Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.
-
Adicione código aos métodos da classe StockTradeRecordProcessor, como mostrado nas etapas a seguir.
Como implementar o consumidor
-
Implemente o método
processRecordinstanciando um objetoStockTradede tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade); -
Implemente um método
reportStats. Modifique o formato de saída para se adequar às suas preferências.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n"); -
Implemente o método
resetStats, que cria uma nova instância destockStats.stockStats = new StockStats(); -
Implemente os seguintes métodos exigidos pela interface
ShardRecordProcessor:@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Como executar o consumidor
-
Execute a aplicação de produção escrita em Implementar o produtor para injetar registros de negociações de ações no fluxo.
-
Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo
~/.aws/credentials. -
Execute a classe
StockTradesProcessorcom os seguintes argumentos:StockTradesProcessor StockTradeStream us-west-2Observe que, ao criar o fluxo em uma região diferente de
us-west-2, é necessário especificar essa região aqui.
Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Próximas etapas
(Opcional) Estender o consumidor