Java 구현
참고
전제 조건: 다음 단계를 완료하기 전에 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터가 실행 중이어야 합니다. 생산자와 소비자는 Java 8 이상에서 실행 중이어야 합니다.
생산자와 소비자에 라이브러리를 설치하려면
생산자와 소비자의 pom.xml 파일 내에서 아래 코드를 통해 이 종속성을 추가합니다.
<dependency> <groupId>software.amazon.glue</groupId> <artifactId>schema-registry-serde</artifactId> <version>1.1.5</version> </dependency>또는 AWS Glue Schema Registry Github 리포지토리
를 복제할 수 있습니다. 다음 필수 속성으로 생산자를 설정합니다.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Can replace StringSerializer.class.getName()) with any other key serializer that you may use props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "JSON"); // OR "AVRO"기존 스키마가 없으면 자동 등록을 설정해야 합니다(다음 단계). 적용하려는 스키마가 있는 경우 "my-schema"를 스키마 이름으로 바꿉니다. 또한 스키마 자동 등록이 꺼져 있는 경우 "registry-name"을 제공해야 합니다. 스키마가 "default-registry" 아래에 생성된 경우 레지스트리 이름을 생략할 수 있습니다.
(선택 사항) 이러한 선택적 생산자 속성을 설정합니다. 자세한 속성 설명은 ReadMe 파일
을 참조하세요. props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true"); // If not passed, uses "false" props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka, or stream name in case of Kinesis Data Streams) props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // If not passed, uses "default-registry" props.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 (24 Hours) props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL); // Pass a compatibility mode. If not passed, uses Compatibility.BACKWARD props.put(AWSSchemaRegistryConstants.DESCRIPTION, "This registry is used for several purposes."); // If not passed, constructs a description props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB); // If not passed, records are sent uncompressed자동 등록은 기본 레지스트리("default-registry") 아래에 스키마 버전을 등록합니다. 이전 단계에서
SCHEMA_NAME을 지정하지 않으면 주제 이름이SCHEMA_NAME으로 유추됩니다.호환성 모드에 대한 자세한 내용은 스키마 버전 관리 및 호환성 섹션을 참조하세요.
다음 필수 속성으로 소비자를 설정합니다.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2"); // Pass an AWS 리전 props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); // Only required for AVRO data format(선택 사항) 이러한 선택적 소비자 속성을 설정합니다. 자세한 속성 설명은 ReadMe 파일
을 참조하세요. properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000"); // If not passed, uses 86400000 props.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10"); // default value is 200 props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, "com.amazonaws.services.schemaregistry.deserializers.external.ThirdPartyDeserializer"); // For migration fall back scenario