Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Escribir en Kinesis Data Stream mediante KPL
En las secciones siguientes, se muestra un código de ejemplo con una progresión que va desde el productor más básico hasta un código completamente asíncrono.
Código de productor básico
El siguiente código es todo lo que necesita para escribir un productor mínimamente funcional. Los registros de usuario de Amazon Kinesis Producer Library (KPL) se procesan en segundo plano.
// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...
Responder a los resultados de manera síncrona
En el ejemplo anterior, el código no comprobó si el procesamiento de registros de usuario de KPL finalizó correctamente. KPL efectúa los reintentos necesarios en caso de error. Sin embargo, si desea comprobar los resultados, puede examinarlos con los objetos Future que devuelve addUserRecord, como en el siguiente ejemplo (el ejemplo anterior se incluye para aportar contexto):
KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccessful()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }
Responder a los resultados de manera asíncrona
El ejemplo anterior llama a get() en un objeto Future que bloquea el tiempo de ejecución. Si no desea bloquear el tiempo de ejecución, puede utilizar una devolución de llamada asíncrona, tal y como se muestra en el ejemplo siguiente:
KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }