

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Mettre en œuvre le consommateur
<a name="tutorial-stock-data-kplkcl2-consumer"></a>

L'application consommateur de ce didacticiel traite en continu les transactions boursières dans votre flux de données. Elle génère ensuite les actions les plus populaires achetées et vendues toutes les minutes. L'application est créée au niveau supérieur de Kinesis Client Library (KCL), qui effectue une grande partie de la lourde charge qui pèse couramment sur les applications consommateur. Pour de plus amples informations, veuillez consulter [Informations sur KCL 1.x et 2.x](shared-throughput-kcl-consumers.md). 

Reportez-vous au code source et vérifiez les informations suivantes.

**StockTradesProcessor classe**  
La classe principale du consommateur, fournie pour vous, qui exécute les tâches suivantes :  
+ Lit les noms de l'application, du flux de données et des régions, transmis sous forme d'arguments.
+ Crée une `KinesisAsyncClient` instance avec le nom de la région.
+ Crée une instance `StockTradeRecordProcessorFactory` qui sert les instances de `ShardRecordProcessor`, implémentée par une instance `StockTradeRecordProcessor`. 
+ Crée une `ConfigsBuilder` instance avec l'`StockTradeRecordProcessorFactory`instance `KinesisAsyncClient` `StreamName``ApplicationName`,, et. Ceci est utile pour créer toutes les configurations avec des valeurs par défaut.
+ Crée un planificateur KCL (précédemment, dans les versions 1.x de KCL, il portait le nom d’application de travail KCL) avec l'instance `ConfigsBuilder`. 
+ Le planificateur crée un nouveau thread pour chaque partition (affectée à cette instance de consommateur), qui fonctionne en boucle pour lire des enregistrements dans le flux de données. Elle appelle alors l'instance `StockTradeRecordProcessor` pour traiter chaque lot d'enregistrements reçu. 

**StockTradeRecordProcessor classe**  
Implémentation de l'instance `StockTradeRecordProcessor`, qui implémente à son tour cinq méthodes requises : `initialize`, `processRecords`, `leaseLost`, `shardEnded` et `shutdownRequested`.   
Les méthodes `initialize` et `shutdownRequested` sont utilisées par le KCL pour indiquer au processeur d'enregistrements quand il doit être prêt à commencer à recevoir des enregistrements et quand il devrait s'attendre à cesser de recevoir des enregistrements, respectivement, afin qu'il puisse effectuer des tâches de configuration et de terminaison spécifiques à l'application. `leaseLost` et `shardEnded` sont utilisés pour implémenter n'importe quelle logique pour ce qu'il faut faire lorsqu'un bail est perdu ou qu'un traitement a atteint la fin d'une partition. Dans cet exemple, nous enregistrons simplement les messages indiquant ces événements.   
Le code de ces méthodes est fourni. Le traitement principal se déroule dans la méthode `processRecords`, qui utilise à son tour `processRecord` pour chaque enregistrement. Cette dernière méthode est fournie la plupart du temps sous forme de squelette de code vide à implémenter à l'étape suivante qui fournit des explications détaillées.   
Notez également l'implémentation des méthodes d'assistance pour `processRecord` : `reportStats` et `resetStats`, qui sont vides dans le code source d'origine.   
La méthode `processRecords` est implémentée pour vous et effectue les opérations suivantes :  
+ Pour chaque enregistrement passé, il appelle `processRecord` dessus. 
+ Si au moins 1 minute s'est écoulée depuis le dernier rapport, il appelle `reportStats()`, qui imprime les dernières statistiques, puis de `resetStats()`, qui efface les statistiques afin que l'intervalle suivant n'inclut que les nouveaux enregistrements.
+ Règle l'heure du rapport suivant.
+ Si au moins 1 minute s'est écoulée depuis le dernier point de contrôle, il appelle `checkpoint()`. 
+ Règle l'heure du point de contrôle suivante.
Cette méthode utilise des intervalles de 60 secondes pour la fréquence de création de rapports et de point de contrôle. Pour plus d'informations sur les points de contrôle, consultez la section [Utilisation de Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html) (français non garanti). 

**StockStats classe**  
Cette classe prend en charge la conservation des données et le suivi des statistiques dans le temps pour les actions les plus populaires. Ce code est fourni pour vous et contient les méthodes suivantes :  
+ `addStockTrade(StockTrade)` : Injecte le `StockTrade` donné dans les statistiques en cours d'exécution.
+ `toString()` : renvoie les statistiques dans une chaîne formatée.
Cette classe suit les actions les plus populaires en comptabilisant le nombre total de transactions pour chaque action et le nombre maximum. Elle met à jour ces chiffres chaque fois qu'une opération boursière arrive.

Ajoutez du code aux méthodes de la classe `StockTradeRecordProcessor`, comme l'illustrent les étapes suivantes. 

**Pour implémenter l'application consommateur**

1. Implémentez la méthode `processRecord` en instanciant un objet `StockTrade` correctement dimensionné et en lui ajoutant les données d'enregistrement, puis en consignant un avertissement en cas de problème. 

   ```
   byte[] arr = new byte[record.data().remaining()];
   record.data().get(arr);
   StockTrade trade = StockTrade.fromJsonAsBytes(arr);
       if (trade == null) {
           log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey());
           return;
           }
   stockStats.addStockTrade(trade);
   ```

1. Implémentez une `reportStats` méthode. Modifiez le format de sortie en fonction de vos préférences. 

   ```
   System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" +
   stockStats + "\n" +
   "****************************************************************\n");
   ```

1. Implémentez la méthode `resetStats`, qui crée une nouvelle instance `stockStats`. 

   ```
   stockStats = new StockStats();
   ```

1. Implémentez les méthodes suivantes requises par `ShardRecordProcessor` l'interface :

   ```
   @Override
   public void leaseLost(LeaseLostInput leaseLostInput) {
       log.info("Lost lease, so terminating.");
   }
   
   @Override
   public void shardEnded(ShardEndedInput shardEndedInput) {
       try {
           log.info("Reached shard end checkpointing.");
           shardEndedInput.checkpointer().checkpoint();
       } catch (ShutdownException | InvalidStateException e) {
           log.error("Exception while checkpointing at shard end. Giving up.", e);
       }
   }
   
   @Override
   public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
       log.info("Scheduler is shutting down, checkpointing.");
       checkpoint(shutdownRequestedInput.checkpointer());
   }
   
   private void checkpoint(RecordProcessorCheckpointer checkpointer) {
       log.info("Checkpointing shard " + kinesisShardId);
       try {
           checkpointer.checkpoint();
       } catch (ShutdownException se) {
           // Ignore checkpoint if the processor instance has been shutdown (fail over).
           log.info("Caught shutdown exception, skipping checkpoint.", se);
       } catch (ThrottlingException e) {
           // Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
           log.error("Caught throttling exception, skipping checkpoint.", e);
       } catch (InvalidStateException e) {
           // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
           log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
       }
   }
   ```

**Pour exécuter l'application consommateur**

1. Exécutez l'application producteur que vous avez écrite dans [[Implémenter le producteur](tutorial-stock-data-kplkcl2-producer.md)Implémenter le producteur](tutorial-stock-data-kplkcl2-producer.md) pour injecter des enregistrements d'opérations boursières simulées dans votre flux.

1. Vérifiez que la paire de clé d'accès et de clé secrète extraites précédemment (lors de la création de l'utilisateur IAM) sont enregistrées dans le fichier `~/.aws/credentials`. 

1. Exécutez la classe `StockTradesProcessor` avec les arguments suivants :

   ```
   StockTradesProcessor StockTradeStream us-west-2
   ```

   Notez que, si vous avez créé votre flux dans une région autre que `us-west-2`, vous devez spécifier cette région ici.

Au bout d'une minute, vous devez voir une sortie similaire à la suivante, actualisée toutes les minutes :

```
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
```

## Étapes suivantes
<a name="tutorial-stock-data-kplkcl2-consumer-next"></a>

[(Facultatif) Étendre le consommateur](tutorial-stock-data-kplkcl2-consumer-extension.md)