Implementasi Java - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Implementasi Java

catatan

Prasyarat: Sebelum menyelesaikan langkah-langkah berikut, Anda harus memiliki sebuah klaster Amazon Managed Streaming for Apache Kafka (Amazon MSK) atau Apache Kafka yang berjalan. Produsen dan konsumen Anda harus berjalan pada Java 8 atau yang lebih tinggi.

Untuk menginstal perpustakaan pada produsen dan konsumen:

  1. Di dalam file pom.xml baik dari produsen dan konsumen, tambahkan dependensi ini melalui kode di bawah ini:

    <dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>

    Atau, Anda dapat mengkloning Repositori Github Registri Skema AWS Glue.

  2. Siapkan produsen Anda dengan properti yang diperlukan ini:

    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"

    Jika tidak ada skema yang ada, maka pendaftaran otomatis harus Anda aktifkan (langkah berikutnya). Jika Anda memiliki skema yang ingin Anda terapkan, maka ganti "my-schema" dengan nama skema Anda. Selain itu, "registry-name" harus disediakan jika skema pendaftaran otomatis dimatikan. Jika skema dibuat menggunakan "default-registry", maka nama registri dapat dihilangkan.

  3. (Opsional) Mengatur salah satu properti produsen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file.

    props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed

    Pendaftaran otomatis mendaftarkan versi skema dengan menggunakan registri default ("default-registry"). Jika SCHEMA_NAME tidak ditentukan dalam langkah sebelumnya, maka nama topik disimpulkan sebagai SCHEMA_NAME.

    Lihat Pembuatan versi dan kompatibilitas skema untuk informasi lebih lanjut tentang mode kompatibilitas.

  4. Siapkan konsumen Anda dengan properti yang diperlukan berikut ini:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an Wilayah AWS props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format
  5. (Opsional) Mengatur properti konsumen opsional ini. Untuk deskripsi properti terperinci, lihat ReadMe file.

    properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario