

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Implementar o consumidor
<a name="tutorial-stock-data-kplkcl-consumer"></a>

A aplicação de consumo no [Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x[Tutorial: Processar dados de ações em tempo real usando a KPL e a KCL 1.x](tutorial-stock-data-kplkcl.md)](tutorial-stock-data-kplkcl.md) processa continuamente o fluxo de negociações de ações criado em [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md). Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. A aplicação é compilada com base na Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum às aplicações de consumo. Para obter mais informações, consulte [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md). 

Consulte o código-fonte e analise as informações a seguir.

**StockTradesProcessor classe**  
Principal classe do consumidor fornecida e que executa as seguintes tarefas:  
+ Lê o aplicativo, o fluxo e os nomes de região passados como argumentos.
+ Lê credenciais de `~/.aws/credentials`.
+ Cria uma instância de `RecordProcessorFactory` que veicula instâncias de `RecordProcessor`, implementadas por uma instância de `StockTradeRecordProcessor`.
+ Cria um operador da KCL com a instância `RecordProcessorFactory` e uma configuração padrão que inclui o nome do fluxo, as credenciais e o nome da aplicação. 
+ O operador cria um novo thread para cada fragmento (atribuído a essa instância de consumidor), que opera em loops contínuos para ler registros do Kinesis Data Streams. Em seguida, ele invoca a instância de `RecordProcessor` para processar cada lote de registros recebidos.

**StockTradeRecordProcessor classe**  
Implementação da instância de `RecordProcessor`, que, por sua vez, implementa três métodos necessários: `initialize`, `processRecords` e `shutdown`.  
Como os nomes sugerem, `initialize` e `shutdown` são usados pela Kinesis Client Library para permitir que o processador de registros saiba quando deve estar pronto para começar a receber registros e quando deve esperar parar de receber registros, respectivamente, para poder realizar tarefas de configuração e encerramento específicas da aplicação. Este código é fornecido para você. O processamento principal ocorre no método `processRecords`, que, por sua vez, usa `processRecord` para cada registro. Esse último método é fornecido como um código esqueleto quase todo vazio, para implementação na próxima etapa, onde é melhor explicado.  
Observe também a implementação dos métodos de suporte de `processRecord`: `reportStats` e `resetStats`, que estão vazios no código-fonte original.  
O método `processRecords`, implementado previamente, executa as seguintes etapas:  
+  Para cada registro passado, chama `processRecord`.
+ Se pelo menos 1 minuto houver decorrido após o último relatório, chamará `reportStats()`, que imprime as estatísticas mais recentes e, em seguida, `resetStats()`, que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos.
+ Define o próximo horário para geração de relatórios.
+ Se houver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará `checkpoint()`. 
+ Define o próximo horário do ponto de verificação.
Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre definição de pontos de verificação, consulte [Informações adicionais sobre o consumidor](#tutorial-stock-data-kplkcl-consumer-supplement).

**StockStats classe**  
Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código é fornecido e contém os seguintes métodos:  
+ `addStockTrade(StockTrade)`: injeta o `StockTrade` conhecido nas estatísticas correntes.
+ `toString()`: retorna as estatísticas em uma string formatada.
Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.

Adicione código aos métodos da classe `StockTradeRecordProcessor`, como mostrado nas etapas a seguir.

**Como implementar o consumidor**

1. Implemente o método `processRecord` instanciando um objeto `StockTrade` de tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema.

   ```
   StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array());
   if (trade == null) {
       LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey());
       return;
   }
   stockStats.addStockTrade(trade);
   ```

1. Implemente um método `reportStats` simples. Sinta-se à vontade para modificar o formato de saída conforme suas preferências.

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
                      stockStats + "\n" +
                      "****************************************************************\n");
   ```

1. Finalmente, implemente o método `resetStats`, que cria uma nova instância de `stockStats`.

   ```
   stockStats = new StockStats();
   ```

**Como executar o consumidor**

1. Execute a aplicação de produção escrita em [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md) para injetar registros de negociações de ações no fluxo.

1. Verifique se o par de chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo `~/.aws/credentials`. 

1. Execute a classe `StockTradesProcessor` com os seguintes argumentos:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Observe que, ao criar o fluxo em uma região diferente de `us-west-2`, é necessário especificar essa região aqui.

Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:

```
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Informações adicionais sobre o consumidor
<a name="tutorial-stock-data-kplkcl-consumer-supplement"></a>

Se já houver familiaridade com as vantagens da Kinesis Client Library, abordada em [Desenvolver aplicações de consumo da KCL 1.x](developing-consumers-with-kcl.md) e em outros documentos, poderá haver algum quesitonamento sobre usá-la aqui. Mesmo usando apenas um fluxo de fragmento e uma instância de consumidor para processá-lo, é mais fácil implementar o consumidor usando a KCL. Compare as etapas de implementação do código na seção do produtor para o consumidor para ver a facilidade comparativa para implementar um consumidor. Isso se deve, em grande parte, aos serviços que a KCL fornece.

Nessa aplicação, cencentre-se na implementação de uma classe de processador de registros, capaz de processar registros individuais. Não é necessário se preocupar com a forma como os registros são obtidos do Kinesis Data Streams. A KCL obtém os registros e invoca o processador de registros sempre que há novos registros disponíveis. Além disso, não é necessário se preocupar com a quantidade de fragmentos e de instâncias de consumidor. Se o fluxo for escalonado, não é necessário reescrever o aplicativo para lidar com mais de um fragmento ou com uma instância de uma aplicação de consumo.

O termo *ponto de verificação* significa registrar o ponto no fluxo até os registros de dados que foram consumidos e processados até o momento. Se o aplicativo falhar, o fluxo será lido a partir desse ponto e não do início do fluxo. O assunto da definição de pontos de verificação e os vários padrões de design e melhores práticas relativos estão fora do escopo deste capítulo. No entanto, é algo que pode ser encontrado em ambientes de produção.

Conforme visto no [[Implementar o produtor](tutorial-stock-data-kplkcl-producer.md)Implementar o produtor](tutorial-stock-data-kplkcl-producer.md), as operações `put` na API do Kinesis Data Streams usam uma *chave de partição* como entrada. O Kinesis Data Streams usa uma chave de partição como um mecanismo para dividir registros em vários fragmentos (quando há mais de um fragmento no fluxo). A mesma chave de partição sempre roteia para o mesmo fragmento. Isso permite que o consumidor que processa um determinado fragmento seja projetado com a premissa de que os registros com a mesma chave de partição só sejam enviados a esse consumidor, e nenhum registro com a mesma chave de partição termine em qualquer outro consumidor. Portanto, o operador de um consumidor pode agregar todos os registros com a mesma chave de partição sem se preocupar com a ausência de dados necessários.

Nesta aplicação, como o processamento de registros do consumidor não é intensivo, é possível usar um fragmento e fazer o processamento no mesmo thread da KCL. No entanto, na prática, considere primeiro escalar o número de fragmentos. Em alguns casos, talvez convenha mudar o processamento para outro thread ou usar um grupo de threads se for esperado que o processamento de registros seja intensivo. Dessa forma, a KCL pode obter novos registros mais rapidamente, enquanto outros threads podem processar os registros em paralelo. O design multithread não é trivial e deve ser planejado com técnicas avançadas, portanto, aumentar a contagem de fragmentos costuma ser a maneira mais eficiente de escalar.

## Próximas etapas
<a name="tutorial-stock-data-kplkcl-consumer-next"></a>

[(Opcional) Estender o consumidor](tutorial-stock-data-kplkcl-consumer-extension.md)