Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Gunakan topik penyimpanan offset khusus
Untuk memberikan kontinuitas offset antara konektor sumber, Anda dapat menggunakan topik penyimpanan offset pilihan Anda alih-alih topik default. Menentukan topik penyimpanan offset membantu Anda menyelesaikan tugas seperti membuat konektor sumber yang melanjutkan pembacaan dari offset terakhir konektor sebelumnya.
Untuk menentukan topik penyimpanan offset, Anda memberikan nilai untuk offset.storage.topic properti dalam konfigurasi pekerja sebelum membuat konektor. Jika Anda ingin menggunakan kembali topik penyimpanan offset untuk menggunakan offset dari konektor yang dibuat sebelumnya, Anda harus memberi konektor baru nama yang sama dengan konektor lama. Jika Anda membuat topik penyimpanan offset kustom, Anda harus mengatur cleanup.policycompact dalam konfigurasi topik Anda.
catatan
Jika Anda menentukan topik penyimpanan offset saat membuat konektor sink, MSK Connect akan membuat topik jika belum ada. Namun, topik tersebut tidak akan digunakan untuk menyimpan offset konektor.
Offset konektor sink malah dikelola menggunakan protokol grup konsumen Kafka. Setiap konektor wastafel membuat grup bernamaconnect-{CONNECTOR_NAME}. Selama grup konsumen ada, konektor wastafel berturut-turut yang Anda buat dengan CONNECTOR_NAME nilai yang sama akan berlanjut dari offset komitmen terakhir.
penting
Jika Anda ingin memperbarui konfigurasi konektor yang ada sambil mempertahankan kontinuitas offset, gunakan API. UpdateConnector Untuk informasi selengkapnya, lihat Perbarui konektor.
contoh: Menentukan topik penyimpanan offset saat membuat ulang konektor sumber
Jika Anda perlu menghapus dan membuat ulang konektor sambil mempertahankan kontinuitas offset, Anda dapat menentukan topik penyimpanan offset dalam konfigurasi pekerja Anda. Misalnya, Anda memiliki konektor change data capture (CDC) dan Anda ingin membuatnya kembali tanpa kehilangan tempat Anda di aliran CDC. Langkah-langkah berikut menunjukkan bagaimana menyelesaikan tugas ini.
-
Pada mesin klien Anda, jalankan perintah berikut untuk menemukan nama topik penyimpanan offset konektor Anda. Ganti
dengan string broker bootstrap cluster Anda. Untuk petunjuk tentang mendapatkan string broker bootstrap Anda, lihatDapatkan broker bootstrap untuk cluster MSK Amazon.<bootstrapBrokerString><path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server<bootstrapBrokerString>Output berikut menunjukkan daftar semua topik cluster, termasuk topik konektor internal default. Dalam contoh ini, konektor CDC yang ada menggunakan topik penyimpanan offset default yang dibuat oleh MSK Connect. Inilah sebabnya mengapa topik penyimpanan offset disebut
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.__consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2 -
Buka konsol MSK Amazon di https://console.aws.amazon.com/msk/
. -
Pilih konektor Anda dari daftar Konektor. Salin dan simpan konten bidang konfigurasi Konektor sehingga Anda dapat memodifikasinya dan menggunakannya untuk membuat konektor baru.
-
Pilih Hapus untuk menghapus konektor. Kemudian masukkan nama konektor di bidang input teks untuk mengonfirmasi penghapusan.
-
Buat konfigurasi pekerja khusus dengan nilai yang sesuai dengan skenario Anda. Untuk petunjuk, lihat Buat konfigurasi pekerja khusus.
Dalam konfigurasi pekerja Anda, Anda harus menentukan nama topik penyimpanan offset yang sebelumnya Anda ambil sebagai nilai untuk
offset.storage.topiclike dalam konfigurasi berikut.config.providers.secretManager.param.aws.region=eu-west-3 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 -
penting
Anda harus memberikan konektor baru Anda nama yang sama dengan konektor lama.
Buat konektor baru menggunakan konfigurasi pekerja yang Anda atur di langkah sebelumnya. Untuk petunjuk, lihat Buat konektor.