Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Implemente una aplicación de consumo de KCL para las transmisiones de Amazon Keyspaces CDC
En este tema se proporciona una step-by-step guía para implementar una aplicación de consumo de KCL para procesar las transmisiones de Amazon Keyspaces CDC.
-
Requisitos previos: antes de empezar, asegúrese de tener:
-
Una tabla de Amazon Keyspaces con una transmisión de CDC
-
Permisos de IAM necesarios para que el director de IAM pueda acceder a la transmisión CDC de Amazon Keyspaces, crear tablas de DynamoDB y acceder a ellas para el procesamiento de transmisiones de KCL y permisos para publicar métricas en ellas. CloudWatch Para obtener más información y un ejemplo de política, consulte. Permisos para procesar las transmisiones de los CDC de Amazon Keyspaces con la biblioteca de clientes de Kinesis (KCL)
Asegúrese de que AWS las credenciales válidas estén configuradas en su configuración local. Para obtener más información, consulte Almacenamiento de claves de acceso para el acceso programático.
-
Java Development Kit (JDK) 8 o posterior,
-
Los requisitos figuran en el archivo readme
de Github.
-
-
En este paso, agregas la dependencia de KCL a tu proyecto. Para Maven, agrega lo siguiente a tu archivo pom.xml:
<dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>nota
Compruebe siempre la última versión de KCL en el repositorio de GitHub KCL
. -
Cree una clase de fábrica que produzca instancias de procesadores de registros:
import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } } -
Cree una fábrica de discos como se muestra en el siguiente ejemplo.
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } } -
En este paso, se crea la clase base que se va a configurar KCLv3 y el adaptador Amazon Keyspaces.
import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } } -
En este paso, implementa la clase de procesador de registros para que su aplicación comience a procesar los eventos de cambio.
import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }
Prácticas recomendadas
Siga estas prácticas recomendadas cuando utilice KCL con las transmisiones de los CDC de Amazon Keyspaces:
- Error handling (Control de errores)
-
Implemente una sólida gestión de errores en su procesador de registros para gestionar las excepciones sin problemas. Considere la posibilidad de implementar una lógica de reintento para los errores transitorios.
- Frecuencia de los puntos de control
-
Equilibre la frecuencia de los puntos de control para minimizar el procesamiento duplicado y, al mismo tiempo, garantizar un seguimiento razonable del progreso. Los puntos de control demasiado frecuentes pueden afectar al rendimiento, mientras que los controles demasiado infrecuentes pueden provocar un mayor reprocesamiento si un trabajador no lo logra.
- Escalamiento de los trabajadores
-
Escale la cantidad de trabajadores en función de la cantidad de fragmentos de su transmisión de los CDC. Un buen punto de partida es tener un trabajador por fragmento, pero es posible que tengas que ajustarlo en función de tus requisitos de procesamiento.
- Supervisión
-
Utilice CloudWatch las métricas proporcionadas por KCL para supervisar el estado y el rendimiento de su aplicación de consumo. Las métricas clave incluyen la latencia de procesamiento, la antigüedad de los puntos de control y el recuento de arrendamientos.
- Pruebas
-
Pruebe minuciosamente su aplicación de consumo, incluidos escenarios como los fallos de los trabajadores, la refragmentación de las transmisiones y las condiciones de carga variables.
Uso de KCL con lenguajes distintos de Java
Si bien KCL es principalmente una biblioteca de Java, puede usarla con otros lenguajes de programación a través de. MultiLangDaemon MultiLangDaemon Se trata de un daemon basado en Java que gestiona la interacción entre un procesador de registros que no es de Java y el KCL.
KCL es compatible con los siguientes idiomas:
-
Python
-
Ruby
-
Node.js
-
.NET
Resolución de problemas
En esta sección se proporcionan soluciones a los problemas habituales que pueden surgir al utilizar KCL con las transmisiones de los CDC de Amazon Keyspaces.
- Procesamiento lento
-
Si su solicitud de consumidor procesa los registros lentamente, considere lo siguiente:
-
Aumentar el número de instancias de trabajadores
-
Optimizar la lógica de procesamiento de registros
-
Comprobación de cuellos de botella en los sistemas posteriores
-
- Procesamiento duplicado
-
Si observa un procesamiento duplicado de registros, compruebe la lógica de los puntos de control. Asegúrese de realizar un control después de procesar correctamente los registros.
- Fallos de los trabajadores
-
Si los trabajadores fallan con frecuencia, compruebe:
-
Restricciones de recursos (CPU, memoria)
-
Problemas de conectividad de red
-
Problemas con los permisos
-
- Problemas con la tabla de arrendamiento
-
Si tienes problemas con la tabla de arrendamientos de KCL:
-
Comprueba que tu aplicación tiene los permisos adecuados para acceder a la tabla de Amazon Keyspaces.
-
Compruebe que la tabla tiene un rendimiento aprovisionado suficiente
-