Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Implementa il consumatore
L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che esegue numerose delle attività impegnative comuni alle applicazioni consumer. Per ulteriori informazioni, consulta Informazioni su KCL 1.x e 2.x.
Consulta il codice sorgente e rivedi le informazioni riportate di seguito.
- StockTradesProcessor classe
- 
                    La classe principale del consumatore, fornita per te, che svolge le seguenti attività: - 
                            Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti. 
- 
                            Crea un' KinesisAsyncClientistanza con il nome della regione.
- 
                            Crea un'istanza StockTradeRecordProcessorFactoryche serve istanze diShardRecordProcessor, implementate da un'istanzaStockTradeRecordProcessor.
- 
                            Crea un' ConfigsBuilderistanza con l'StockTradeRecordProcessorFactoryistanzaKinesisAsyncClientStreamNameApplicationName,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti.
- 
                            Crea uno scheduler KCL (in precedenza, nelle versioni di KCL 1.x era noto come worker KCL) con l'istanza ConfigsBuilder.
- 
                            Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza StockTradeRecordProcessorper elaborare ogni batch di record ricevuto.
 
- 
                            
- StockTradeRecordProcessor classe
- 
                    Implementazione dell'istanza StockTradeRecordProcessor, che a sua volta implementa cinque metodi richiesti:initialize,processRecords,leaseLost,shardEndedeshutdownRequested.I metodi initializeeshutdownRequestedvengono utilizzati da KCL per consentire all'elaboratore di record di sapere quando dovrebbe essere pronto a iniziare a ricevere record e quando dovrebbe aspettarsi di non ricevere più record, in modo da poter effettuare qualsiasi attività di configurazione e cessazione specifica per l'app.leaseLosteshardEndedsono utilizzati per implementare qualsiasi logica su cosa fare quando un lease viene perso o quando una elaborazione ha raggiunto la fine del frammento. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo processRecords, che a sua volta utilizzaprocessRecordper ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.Da segnalare è anche l'implementazione dei metodi di supporto per processRecord, ovveroreportStatseresetStats, che sono vuoti nel codice sorgente originale.Il metodo processRecordsviene implementato per te ed esegue questa procedura:- 
                            Per ogni record passato, chiama processRecordsu di esso.
- 
                            Se è trascorso almeno 1 minuto dall'ultimo report, chiama reportStats(), che consente di stampare le statistiche più recenti, seguito daresetStats(), che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.
- 
                            Imposta l'orario della creazione di report successiva. 
- 
                            Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama checkpoint().
- 
                            Imposta l'orario della creazione di checkpoint successiva. 
 Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta Utilizzo della Kinesis Client Library. 
- 
                            
- StockStats classe
- 
                    Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi: - 
                            addStockTrade(StockTrade): inserisce un datoStockTradenelle statistiche in esecuzione.
- 
                            toString(): restituisce le statistiche in una stringa formattata.
 Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di negoziazioni per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio. 
- 
                            
Aggiungi codice ai metodi della classe StockTradeRecordProcessor, come mostrato nella procedura seguente. 
Per implementare il consumer
- 
                Implementare il metodo processRecordcreando un'istanza di un oggettoStockTradedelle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
- 
                Implementa un reportStatsmetodo. Modifica il formato di output in base alle tue preferenze.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
- 
                Implementare il metodo resetStats, che crea una nuova istanzastockStats.stockStats = new StockStats();
- 
                Implementa i seguenti metodi richiesti dall' ShardRecordProcessorinterfaccia:@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Per eseguire il consumer
- 
                Eseguire il producer scritto in Implementa il produttore per inserire record di scambi simulati nel flusso. 
- 
                Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file ~/.aws/credentials.
- 
                Eseguire la classe StockTradesProcessorcon i seguenti argomenti:StockTradesProcessor StockTradeStream us-west-2Nota: se è stato creato un flusso in una regione diversa da us-west-2, è necessario specificare quella regione qui.
Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:
  
  ****** Shard shardId-000000000001 stats for last 1 minute ******
  Most popular stock being bought: WMT, 27 buys.
  Most popular stock being sold: PTR, 14 sells.
  ****************************************************************
        Passaggi successivi
(Facoltativo) Estendi il consumatore