Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Mettre en œuvre le consommateur
L'application consommateur de ce didacticiel traite en continu les transactions boursières dans votre flux de données. Elle génère ensuite les actions les plus populaires achetées et vendues toutes les minutes. L'application est créée au niveau supérieur de Kinesis Client Library (KCL), qui effectue une grande partie de la lourde charge qui pèse couramment sur les applications consommateur. Pour de plus amples informations, veuillez consulter Informations sur KCL 1.x et 2.x.
Reportez-vous au code source et vérifiez les informations suivantes.
- StockTradesProcessor classe
-
La classe principale du consommateur, fournie pour vous, qui exécute les tâches suivantes :
-
Lit les noms de l'application, du flux de données et des régions, transmis sous forme d'arguments.
-
Crée une
KinesisAsyncClientinstance avec le nom de la région. -
Crée une instance
StockTradeRecordProcessorFactoryqui sert les instances deShardRecordProcessor, implémentée par une instanceStockTradeRecordProcessor. -
Crée une
ConfigsBuilderinstance avec l'StockTradeRecordProcessorFactoryinstanceKinesisAsyncClientStreamNameApplicationName,, et. Ceci est utile pour créer toutes les configurations avec des valeurs par défaut. -
Crée un planificateur KCL (précédemment, dans les versions 1.x de KCL, il portait le nom d’application de travail KCL) avec l'instance
ConfigsBuilder. -
Le planificateur crée un nouveau thread pour chaque partition (affectée à cette instance de consommateur), qui fonctionne en boucle pour lire des enregistrements dans le flux de données. Elle appelle alors l'instance
StockTradeRecordProcessorpour traiter chaque lot d'enregistrements reçu.
-
- StockTradeRecordProcessor classe
-
Implémentation de l'instance
StockTradeRecordProcessor, qui implémente à son tour cinq méthodes requises :initialize,processRecords,leaseLost,shardEndedetshutdownRequested.Les méthodes
initializeetshutdownRequestedsont utilisées par le KCL pour indiquer au processeur d'enregistrements quand il doit être prêt à commencer à recevoir des enregistrements et quand il devrait s'attendre à cesser de recevoir des enregistrements, respectivement, afin qu'il puisse effectuer des tâches de configuration et de terminaison spécifiques à l'application.leaseLostetshardEndedsont utilisés pour implémenter n'importe quelle logique pour ce qu'il faut faire lorsqu'un bail est perdu ou qu'un traitement a atteint la fin d'une partition. Dans cet exemple, nous enregistrons simplement les messages indiquant ces événements.Le code de ces méthodes est fourni. Le traitement principal se déroule dans la méthode
processRecords, qui utilise à son tourprocessRecordpour chaque enregistrement. Cette dernière méthode est fournie la plupart du temps sous forme de squelette de code vide à implémenter à l'étape suivante qui fournit des explications détaillées.Notez également l'implémentation des méthodes d'assistance pour
processRecord:reportStatsetresetStats, qui sont vides dans le code source d'origine.La méthode
processRecordsest implémentée pour vous et effectue les opérations suivantes :-
Pour chaque enregistrement passé, il appelle
processRecorddessus. -
Si au moins 1 minute s'est écoulée depuis le dernier rapport, il appelle
reportStats(), qui imprime les dernières statistiques, puis deresetStats(), qui efface les statistiques afin que l'intervalle suivant n'inclut que les nouveaux enregistrements. -
Règle l'heure du rapport suivant.
-
Si au moins 1 minute s'est écoulée depuis le dernier point de contrôle, il appelle
checkpoint(). -
Règle l'heure du point de contrôle suivante.
Cette méthode utilise des intervalles de 60 secondes pour la fréquence de création de rapports et de point de contrôle. Pour plus d'informations sur les points de contrôle, consultez la section Utilisation de Kinesis Client Library (français non garanti).
-
- StockStats classe
-
Cette classe prend en charge la conservation des données et le suivi des statistiques dans le temps pour les actions les plus populaires. Ce code est fourni pour vous et contient les méthodes suivantes :
-
addStockTrade(StockTrade): Injecte leStockTradedonné dans les statistiques en cours d'exécution. -
toString(): renvoie les statistiques dans une chaîne formatée.
Cette classe suit les actions les plus populaires en comptabilisant le nombre total de transactions pour chaque action et le nombre maximum. Elle met à jour ces chiffres chaque fois qu'une opération boursière arrive.
-
Ajoutez du code aux méthodes de la classe StockTradeRecordProcessor, comme l'illustrent les étapes suivantes.
Pour implémenter l'application consommateur
-
Implémentez la méthode
processRecorden instanciant un objetStockTradecorrectement dimensionné et en lui ajoutant les données d'enregistrement, puis en consignant un avertissement en cas de problème.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade); -
Implémentez une
reportStatsméthode. Modifiez le format de sortie en fonction de vos préférences.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n"); -
Implémentez la méthode
resetStats, qui crée une nouvelle instancestockStats.stockStats = new StockStats(); -
Implémentez les méthodes suivantes requises par
ShardRecordProcessorl'interface :@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Pour exécuter l'application consommateur
-
Exécutez l'application producteur que vous avez écrite dans Implémenter le producteur pour injecter des enregistrements d'opérations boursières simulées dans votre flux.
-
Vérifiez que la paire de clé d'accès et de clé secrète extraites précédemment (lors de la création de l'utilisateur IAM) sont enregistrées dans le fichier
~/.aws/credentials. -
Exécutez la classe
StockTradesProcessoravec les arguments suivants :StockTradesProcessor StockTradeStream us-west-2Notez que, si vous avez créé votre flux dans une région autre que
us-west-2, vous devez spécifier cette région ici.
Au bout d'une minute, vous devez voir une sortie similaire à la suivante, actualisée toutes les minutes :
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Étapes suivantes
(Facultatif) Étendre le consommateur