本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
C# 實作
注意
必要條件:在完成下列步驟前,您必須擁有執行中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或 Apache Kafka 叢集。您的生產者和消費者需要在 .NET 8.0 或更高版本上執行。
安裝
對於 C# 應用程式,請使用下列其中一種方法安裝 AWS Glue 結構描述登錄檔 SerDe NuGet 套件:
.NET CLI:
使用下列命令來安裝套件:
dotnet add package Aws.Glue.SchemaRegistry --version 1.0.0-<rid>
其中 <rid> 可以是 1.0.0-linux-x64,1.0.0-linux-musl-x64或 1.0.0-linux-arm64
PackageReference (在您的 .csproj 檔案中):
將下列項目新增至您的專案檔案:
<PackageReference Include="Aws.Glue.SchemaRegistry" Version="1.0.0-<rid>" />
其中 <rid> 可以是 1.0.0-linux-x64,1.0.0-linux-musl-x64或 1.0.0-linux-arm64
組態檔案設定
使用必要的設定建立組態屬性檔案 (例如 gsr-config.properties):
最小組態:
以下顯示最低組態範例:
region=us-east-1 registry.name=default-registry dataFormat=AVRO schemaAutoRegistrationEnabled=true
針對 Kafka SerDes 使用 C# Glue 結構描述用戶端程式庫
序列化程式用量範例:
下列範例示範如何使用 序列化程式:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var protobufSerializer = new GlueSchemaRegistryKafkaSerializer(PROTOBUF_CONFIG_PATH); var serialized = protobufSerializer.Serialize(message, message.Descriptor.FullName); // send serialized bytes to Kafka using producer.Produce(serialized)
還原序列化程式用量範例:
下列範例示範如何使用還原序列化程式:
private static readonly string PROTOBUF_CONFIG_PATH = "<PATH_TO_CONFIG_FILE>"; var dataConfig = new GlueSchemaRegistryDataFormatConfiguration( new Dictionary<string, dynamic> { { GlueSchemaRegistryConstants.ProtobufMessageDescriptor, message.Descriptor } } ); var protobufDeserializer = new GlueSchemaRegistryKafkaDeserializer(PROTOBUF_CONFIG_PATH, dataConfig); // read message from Kafka using serialized = consumer.Consume() var deserializedObject = protobufDeserializer.Deserialize(message.Descriptor.FullName, serialized);
搭配 KafkaFlow for SerDes 使用 C# Glue 結構描述用戶端程式庫
序列化程式用量範例:
下列範例示範如何使用序列化程式設定 KafkaFlow:
services.AddKafka(kafka => kafka .UseConsoleLog() .AddCluster(cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .AddProducer<CustomerProducer>(producer => producer .DefaultTopic("customer-events") .AddMiddlewares(m => m .AddSerializer<GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufSerializer<Customer>("config/gsr-config.properties") ) ) ) ) );
還原序列化程式用量範例:
下列範例示範如何使用還原序列化程式設定 KafkaFlow:
.AddConsumer(consumer => consumer .Topic("customer-events") .WithGroupId("customer-group") .WithBufferSize(100) .WithWorkersCount(10) .AddMiddlewares(middlewares => middlewares .AddDeserializer<GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>>( () => new GlueSchemaRegistryKafkaFlowProtobufDeserializer<Customer>("config/gsr-config.properties") ) .AddTypedHandlers(h => h.AddHandler<CustomerHandler>()) ) )
選用生產者屬性
您可以使用其他選用屬性來擴展組態檔案:
# Auto-registration (if not passed, uses "false") schemaAutoRegistrationEnabled=true # Schema name (if not passed, uses topic name) schema.name=my-schema # Registry name (if not passed, uses "default-registry") registry.name=my-registry # Cache settings cacheTimeToLiveMillis=86400000 cacheSize=200 # Compatibility mode (if not passed, uses BACKWARD) compatibility=FULL # Registry description description=This registry is used for several purposes. # Compression (if not passed, records are sent uncompressed) compressionType=ZLIB
支援的資料格式
Java 和 C# 實作都支援相同的資料格式:
AVRO:Apache Avro 二進位格式
JSON:JSON 結構描述格式
PROTOBUF:通訊協定緩衝區格式
備註
若要開始使用程式庫,請造訪 https://https://www.nuget.org/packages/AWS.Glue.SchemaRegistry
來源碼可在 https://https://github.com/awslabs/aws-glue-schema-registry
取得