Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Implementasi Java
catatan
Prasyarat: Sebelum menyelesaikan langkah-langkah berikut, Anda harus memiliki sebuah klaster Amazon Managed Streaming for Apache Kafka (Amazon MSK) atau Apache Kafka yang berjalan. Produsen dan konsumen Anda harus berjalan pada Java 8 atau yang lebih tinggi.
Untuk menginstal perpustakaan pada produsen dan konsumen:
Di dalam file pom.xml baik dari produsen dan konsumen, tambahkan dependensi ini melalui kode di bawah ini:
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>Atau, Anda dapat mengkloning Repositori Github Registri Skema AWS Glue
. Siapkan produsen Anda dengan properti yang diperlukan ini:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"Jika tidak ada skema yang ada, maka pendaftaran otomatis harus Anda aktifkan (langkah berikutnya). Jika Anda memiliki skema yang ingin Anda terapkan, maka ganti "my-schema" dengan nama skema Anda. Selain itu, "registry-name" harus disediakan jika skema pendaftaran otomatis dimatikan. Jika skema dibuat menggunakan "default-registry", maka nama registri dapat dihilangkan.
(Opsional) Mengatur salah satu properti produsen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file
. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressedPendaftaran otomatis mendaftarkan versi skema dengan menggunakan registri default ("default-registry"). Jika
SCHEMA_NAMEtidak ditentukan dalam langkah sebelumnya, maka nama topik disimpulkan sebagaiSCHEMA_NAME.Lihat Pembuatan versi dan kompatibilitas skema untuk informasi lebih lanjut tentang mode kompatibilitas.
Siapkan konsumen Anda dengan properti yang diperlukan berikut ini:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Wilayah AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format(Opsional) Mengatur properti konsumen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file
. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario