C# 구현
참고
전제 조건: 다음 단계를 완료하기 전에 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터가 실행 중이어야 합니다. 생산자와 소비자는 .NET 8.0 이상에서 실행 중이어야 합니다.
설치
C# 애플리케이션의 경우 다음 방법 중 하나를 사용하여 AWS Glue Schema Registry SerDe NuGet 패키지를 설치합니다.
.NET CLI:
다음 명령을 실행하여 패키지 설치:
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
여기서 <rid>는 1.0.0-linux-x64, 1.0.0-linux-musl-x64 또는 1.0.0-linux-arm64
PackageReference(.csproj 파일):
프로젝트 파일에 다음을 추가:
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
여기서 <rid>는 1.0.0-linux-x64, 1.0.0-linux-musl-x64 또는 1.0.0-linux-arm64
구성 파일 설정
필요한 설정을 사용하여 구성 속성 파일(예: gsr-config.properties) 생성:
최소 구성:
다음 예제에서는 최소 구성 샘플을 보여줍니다.
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
Kafka SerDes에 C# Glue Schema 클라이언트 라이브러리 사용
샘플 Serializer 사용:
다음 예제에서는 Serializer 사용 방법을 보여줍니다.
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
샘플 Deserializer 사용:
다음 예제에서는 Deserializer 사용 방법을 보여줍니다.
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
KafkaFlow for SerDes에 C# Glue Schema 클라이언트 라이브러리 사용
샘플 Serializer 사용:
다음 예제에서는 Serializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
샘플 Deserializer 사용:
다음 예제에서는 Deserializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
선택적 생산자 속성
추가 선택적 속성을 사용하여 구성 파일을 확장할 수 있습니다.
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
지원되는 날짜 형식
Java 및 C# 구현 모두 동일한 데이터 형식을 지원합니다.
AVRO: Apache Avro 이진 형식
JSON: JSON 스키마 형식
PROTOBUF: 프로토콜 버퍼 형식
참고
라이브러리를 시작하려면 https://www.nuget.org/packages/AWS.Glue.SchemaRegistry
페이지 방문