

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Implementa il consumatore
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che esegue numerose delle attività impegnative comuni alle applicazioni consumer. Per ulteriori informazioni, consulta [Informazioni su KCL 1.x e 2.x](shared-throughput-kcl-consumers.md). 

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

**StockTradesProcessor classe**  
La classe principale del consumatore, fornita per te, che svolge le seguenti attività:  
+ Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti.
+ Crea un'`KinesisAsyncClient`istanza con il nome della regione.
+ Crea un'istanza `StockTradeRecordProcessorFactory` che serve istanze di `ShardRecordProcessor`, implementate da un'istanza `StockTradeRecordProcessor`. 
+ Crea un'`ConfigsBuilder`istanza con l'`StockTradeRecordProcessorFactory`istanza `KinesisAsyncClient` `StreamName``ApplicationName`,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti.
+ Crea uno scheduler KCL (in precedenza, nelle versioni di KCL 1.x era noto come worker KCL) con l'istanza `ConfigsBuilder`. 
+ Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza `StockTradeRecordProcessor` per elaborare ogni batch di record ricevuto. 

**StockTradeRecordProcessor classe**  
Implementazione dell'istanza `StockTradeRecordProcessor`, che a sua volta implementa cinque metodi richiesti: `initialize`, `processRecords`, `leaseLost`, `shardEnded` e `shutdownRequested`.   
I metodi `initialize` e `shutdownRequested` vengono utilizzati da KCL per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'app. `leaseLost` e `shardEnded` sono utilizzati per implementare qualsiasi logica su cosa fare quando un lease viene perso o quando una elaborazione ha raggiunto la fine del frammento. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.   
Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo `processRecords`, che a sua volta utilizza `processRecord` per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.   
Da segnalare è anche l'implementazione dei metodi di supporto per `processRecord`, ovvero `reportStats` e `resetStats`, che sono vuoti nel codice sorgente originale.   
Il metodo `processRecords` viene implementato per te ed esegue questa procedura:  
+ Per ogni record passato, chiama `processRecord` su di esso. 
+ Se è trascorso almeno 1 minuto dall'ultimo report, chiama `reportStats()`, che consente di stampare le statistiche più recenti, seguito da `resetStats()`, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.
+ Imposta l'orario della creazione di report successiva.
+ Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama `checkpoint()`. 
+ Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta [Utilizzo della Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html). 

**StockStats classe**  
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:  
+ `addStockTrade(StockTrade)`: inserisce un dato `StockTrade` nelle statistiche in esecuzione.
+ `toString()`: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di negoziazioni per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.

Aggiungi codice ai metodi della classe `StockTradeRecordProcessor`, come mostrato nella procedura seguente. 

**Per implementare il consumer**

1. Implementare il metodo `processRecord` creando un'istanza di un oggetto `StockTrade` delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implementa un `reportStats` metodo. Modifica il formato di output in base alle tue preferenze. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implementare il metodo `resetStats`, che crea una nuova istanza `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implementa i seguenti metodi richiesti dall'`ShardRecordProcessor`interfaccia:

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Per eseguire il consumer**

1. Eseguire il producer scritto in [[Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md)Implementa il produttore](tutorial-stock-data-kplkcl2-producer.md) per inserire record di scambi simulati nel flusso.

1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file `~/.aws/credentials`. 

1. Eseguire la classe `StockTradesProcessor` con i seguenti argomenti:

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Nota: se è stato creato un flusso in una regione diversa da `us-west-2`, è necessario specificare quella regione qui.

Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Fasi successive
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Facoltativo) Estendi il consumatore](tutorial-stock-data-kplkcl2-consumer-extension.md)