View a markdown version of this page

C# 구현 - AWS Glue

C# 구현

참고

전제 조건: 다음 단계를 완료하기 전에 Amazon Managed Streaming for Apache Kafka(Amazon MSK) 또는 Apache Kafka 클러스터가 실행 중이어야 합니다. 생산자와 소비자는 .NET 8.0 이상에서 실행 중이어야 합니다.

설치

C# 애플리케이션의 경우 다음 방법 중 하나를 사용하여 AWS Glue Schema Registry SerDe NuGet 패키지를 설치합니다.

.NET CLI:

다음 명령을 실행하여 패키지 설치:

dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>

여기서 <rid>1.0.0-linux-x64, 1.0.0-linux-musl-x64 또는 1.0.0-linux-arm64

PackageReference(.csproj 파일):

프로젝트 파일에 다음을 추가:

<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />

여기서 <rid>1.0.0-linux-x64, 1.0.0-linux-musl-x64 또는 1.0.0-linux-arm64

구성 파일 설정

필요한 설정을 사용하여 구성 속성 파일(예: gsr-config.properties) 생성:

최소 구성:

다음 예제에서는 최소 구성 샘플을 보여줍니다.

region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true

Kafka SerDes에 C# Glue Schema 클라이언트 라이브러리 사용

샘플 Serializer 사용:

다음 예제에서는 Serializer 사용 방법을 보여줍니다.

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
샘플 Deserializer 사용:

다음 예제에서는 Deserializer 사용 방법을 보여줍니다.

private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);

KafkaFlow for SerDes에 C# Glue Schema 클라이언트 라이브러리 사용

샘플 Serializer 사용:

다음 예제에서는 Serializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.

services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
샘플 Deserializer 사용:

다음 예제에서는 Deserializer를 사용하여 KafkaFlow를 구성하는 방법을 보여줍니다.

.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )

선택적 생산자 속성

추가 선택적 속성을 사용하여 구성 파일을 확장할 수 있습니다.

# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB

지원되는 날짜 형식

Java 및 C# 구현 모두 동일한 데이터 형식을 지원합니다.

  • AVRO: Apache Avro 이진 형식

  • JSON: JSON 스키마 형식

  • PROTOBUF: 프로토콜 버퍼 형식

참고