Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Migrasi dari KCL 1.x ke KCL 3.x
Gambaran umum
Panduan ini memberikan petunjuk untuk memigrasikan aplikasi konsumen Anda dari KCL 1.x ke KCL 3.x. Karena perbedaan arsitektur antara KCL 1.x dan KCL 3.x, migrasi memerlukan pembaruan beberapa komponen untuk memastikan kompatibilitas.
KCL 1.x menggunakan kelas dan antarmuka yang berbeda dibandingkan dengan KCL 3.x. Anda harus memigrasikan prosesor rekaman, pabrik prosesor rekaman, dan kelas pekerja ke format yang kompatibel dengan KCL 3.x terlebih dahulu, dan ikuti langkah-langkah migrasi untuk migrasi KCL 1.x ke KCL 3.x.
Langkah migrasi
Topik
Langkah 1: Migrasikan prosesor rekaman
Contoh berikut menunjukkan prosesor rekaman diimplementasikan untuk adaptor KCL 1.x DynamoDB Streams Kinesis:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Untuk memigrasikan kelas RecordProcessor
-
Ubah antarmuka dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessordancom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAwaremenjadisoftware.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessorsebagai berikut:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor -
Perbarui pernyataan impor untuk
initializedanprocessRecordsmetode:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; -
Ganti
shutdownRequestedmetode dengan metode baru berikut:leaseLost,shardEnded, danshutdownRequested.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Berikut ini adalah versi terbaru dari kelas prosesor rekaman:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
catatan
DynamoDB Streams Kinesis Adapter sekarang menggunakan model Record. SDKv2 Dalam SDKv2, AttributeValue objek kompleks (BS,NS, ML,SS) tidak pernah mengembalikan null. GunakanhasBs(),hasNs(),hasM(),hasL(), hasSs() metode untuk memverifikasi apakah nilai-nilai ini ada.
Langkah 2: Migrasikan pabrik prosesor rekaman
Pabrik prosesor rekaman bertanggung jawab untuk membuat prosesor rekaman ketika sewa diperoleh. Berikut ini adalah contoh pabrik KCL 1.x:
package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Untuk memigrasikan RecordProcessorFactory
-
Ubah antarmuka yang diimplementasikan dari
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactorykesoftware.amazon.kinesis.processor.ShardRecordProcessorFactory, sebagai berikut:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Berikut ini adalah contoh pabrik prosesor rekaman di 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Langkah 3: Migrasikan pekerja
Dalam versi 3.0 dari KCL, kelas baru, yang disebut Scheduler, menggantikan kelas Worker. Berikut ini adalah contoh pekerja KCL 1.x:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Untuk memigrasikan pekerja
-
Ubah
importpernyataan untukWorkerkelas ke pernyataan impor untukSchedulerdanConfigsBuilderkelas.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder; -
Impor
StreamTrackerdan ubah imporStreamsWorkerFactorykeStreamsSchedulerFactory.import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory; -
Pilih posisi untuk memulai aplikasi. Bisa jadi
TRIM_HORIZONatauLATEST.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -
Buat sebuah
StreamTrackerinstance.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); -
Buat
AmazonDynamoDBStreamsAdapterClientobjek.import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion); -
Buat
ConfigsBuilderobjek.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory()); -
Buat
SchedulermenggunakanConfigsBuilderseperti yang ditunjukkan pada contoh berikut:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
penting
CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2XPengaturan mempertahankan kompatibilitas antara DynamoDB Streams Kinesis Adapter untuk KCL v3 dan KCL v1, bukan antara KCL v2 dan v3.
Langkah 4: Ikhtisar dan rekomendasi konfigurasi KCL 3.x
Untuk penjelasan rinci tentang konfigurasi yang diperkenalkan setelah KCL 1.x yang relevan di KCL 3.x lihat konfigurasi KCL dan konfigurasi klien migrasi KCL.
penting
Alih-alih langsung membuat objekcheckpointConfig,,,coordinatorConfig, processorConfig dan leaseManagementConfig metricsConfigretrievalConfig, kami sarankan menggunakan ConfigsBuilder untuk mengatur konfigurasi di KCL 3.x dan versi yang lebih baru untuk menghindari masalah inisialisasi Scheduler. ConfigsBuildermenyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.
Konfigurasi dengan nilai default pembaruan di KCL 3.x
billingMode-
Dalam KCL versi 1.x, nilai default untuk
billingModediatur ke.PROVISIONEDNamun, dengan KCL versi 3.x, defaultnyabillingModeadalahPAY_PER_REQUEST(mode on-demand). Kami menyarankan Anda menggunakan mode kapasitas sesuai permintaan untuk tabel sewa Anda untuk secara otomatis menyesuaikan kapasitas berdasarkan penggunaan Anda. Untuk panduan tentang penggunaan kapasitas yang disediakan untuk tabel sewa Anda, lihat Praktik terbaik untuk tabel sewa dengan mode kapasitas yang disediakan. idleTimeBetweenReadsInMillis-
Dalam KCL versi 1.x, nilai default untuk diatur ke
idleTimeBetweenReadsInMillisadalah 1.000 (atau 1 detik). KCL versi 3.x menetapkan nilai defaultdleTimeBetweenReadsInMillisuntuk i menjadi 1.500 (atau 1,5 detik), tetapi Amazon DynamoDB Streams Kinesis Adapter mengganti nilai default menjadi 1.000 (atau 1 detik).
Konfigurasi baru di KCL 3.x
leaseAssignmentIntervalMillis-
Konfigurasi ini mendefinisikan interval waktu sebelum pecahan yang baru ditemukan mulai diproses, dan dihitung sebagai 1,5 ×.
leaseAssignmentIntervalMillisJika pengaturan ini tidak dikonfigurasi secara eksplisit, interval waktu default menjadi 1,5 ×.failoverTimeMillisMemproses pecahan baru melibatkan pemindaian tabel sewa dan menanyakan indeks sekunder global (GSI) pada tabel sewa. MenurunkanleaseAssignmentIntervalMillispeningkatan frekuensi operasi pemindaian dan kueri ini, menghasilkan biaya DynamoDB yang lebih tinggi. Kami merekomendasikan pengaturan nilai ini ke 2000 (atau 2 detik) untuk meminimalkan keterlambatan dalam memproses pecahan baru. shardConsumerDispatchPollIntervalMillis-
Konfigurasi ini mendefinisikan interval antara jajak pendapat berturut-turut oleh konsumen shard untuk memicu transisi status. Di KCL versi 1.x, perilaku ini dikendalikan oleh
idleTimeInMillisparameter, yang tidak diekspos sebagai pengaturan yang dapat dikonfigurasi. Dengan KCL versi 3.x, kami sarankan untuk mengatur konfigurasi ini agar sesuai dengan nilai yang digunakanidleTimeInMillisdalam pengaturan KCL versi 1.x Anda.
Langkah 5: Migrasi dari KCL 2.x ke KCL 3.x
Untuk memastikan kelancaran transisi dan kompatibilitas dengan versi Kinesis Client Library (KCL) terbaru, ikuti langkah 5-8 dalam petunjuk panduan migrasi untuk meningkatkan dari KCL 2.x ke KCL 3.x.
Untuk masalah pemecahan masalah umum KCL 3.x, lihat Memecahkan masalah aplikasi konsumen KCL.