Menggunakan pendaftar skema dengan sumber acara Kafka di Lambda - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan pendaftar skema dengan sumber acara Kafka di Lambda

Pendaftaran skema membantu Anda menentukan dan mengelola skema aliran data. Sebuah skema mendefinisikan struktur dan format catatan data. Dalam konteks pemetaan sumber acara Kafka, Anda dapat mengonfigurasi registri skema untuk memvalidasi struktur dan format pesan Kafka terhadap skema yang telah ditentukan sebelum mencapai fungsi Lambda Anda. Ini menambahkan lapisan tata kelola data ke aplikasi Anda dan memungkinkan Anda mengelola format data secara efisien, memastikan kepatuhan skema, dan mengoptimalkan biaya melalui penyaringan acara.

Fitur ini berfungsi dengan semua bahasa pemrograman, tetapi pertimbangkan poin-poin penting ini:

  • Powertools untuk Lambda menyediakan dukungan khusus untuk Java, Python, TypeScript dan, menjaga konsistensi dengan pola pengembangan Kafka yang ada dan memungkinkan akses langsung ke objek bisnis tanpa kode deserialisasi khusus

  • Fitur ini hanya tersedia untuk pemetaan sumber peristiwa menggunakan mode yang disediakan. Registri skema tidak mendukung pemetaan sumber peristiwa dalam mode sesuai permintaan. Jika Anda menggunakan mode yang disediakan dan memiliki registri skema yang dikonfigurasi, Anda tidak dapat mengubah ke mode sesuai permintaan kecuali Anda menghapus konfigurasi registri skema terlebih dahulu. Untuk informasi selengkapnya, lihat Mode yang disediakan

  • Anda hanya dapat mengonfigurasi satu registri skema per pemetaan sumber peristiwa (ESM). Menggunakan registri skema dengan sumber acara Kafka Anda dapat meningkatkan penggunaan Lambda Event Poller Unit (EPU) Anda, yang merupakan dimensi harga untuk mode Provisioned.

Opsi registri skema

Lambda mendukung opsi registri skema berikut:

Registri skema Anda mendukung memvalidasi pesan dalam format data berikut:

  • Apache Avro

  • Protokol Buffer (Protobuf)

  • Skema JSON (JSON-SE)

Untuk menggunakan registri skema, pertama-tama pastikan bahwa pemetaan sumber peristiwa Anda dalam mode yang disediakan. Saat Anda menggunakan registri skema, Lambda menambahkan metadata tentang skema ke payload. Untuk informasi selengkapnya, lihat Format muatan dan perilaku deserialisasi.

Bagaimana Lambda melakukan validasi skema untuk pesan Kafka

Saat Anda mengonfigurasi registri skema, Lambda melakukan langkah-langkah berikut untuk setiap pesan Kafka:

  1. Lambda melakukan polling catatan Kafka dari cluster Anda.

  2. Lambda memvalidasi atribut pesan yang dipilih dalam catatan terhadap skema tertentu di registri skema Anda.

    • Jika skema yang terkait dengan pesan tidak ditemukan di registri, Lambda mengirim pesan ke DLQ dengan kode alasan. SCHEMA_NOT_FOUND

  3. Lambda deserialisasi pesan sesuai dengan konfigurasi registri skema untuk memvalidasi pesan. Jika pemfilteran peristiwa dikonfigurasi, Lambda kemudian melakukan pemfilteran berdasarkan kriteria filter yang dikonfigurasi.

    • Jika deserialisasi gagal, Lambda mengirim pesan ke DLQ dengan kode alasan. DESERIALIZATION_ERROR Jika tidak ada DLQ yang dikonfigurasi, Lambda akan menghapus pesan.

  4. Jika pesan divalidasi oleh registri skema, dan tidak disaring berdasarkan kriteria filter, Lambda akan memanggil fungsi Anda dengan pesan tersebut.

Fitur ini dimaksudkan untuk memvalidasi pesan yang sudah diproduksi menggunakan klien Kafka yang terintegrasi dengan registri skema. Kami merekomendasikan untuk mengonfigurasi produsen Kafka Anda untuk bekerja dengan registri skema Anda untuk membuat pesan yang diformat dengan benar.

Mengkonfigurasi registri skema Kafka

Langkah-langkah konsol berikut menambahkan konfigurasi registri skema Kafka ke pemetaan sumber acara Anda.

Untuk menambahkan konfigurasi registri skema Kafka ke pemetaan sumber acara Anda (konsol)
  1. Buka halaman Fungsi konsol Lambda.

  2. Pilih Konfigurasi.

  3. Pilih Pemicu.

  4. Pilih pemetaan sumber peristiwa Kafka yang ingin Anda konfigurasikan registri skema, dan pilih Edit.

  5. Di bawah konfigurasi poller acara, pilih Konfigurasi registri skema. Pemetaan sumber acara Anda harus dalam mode yang disediakan untuk melihat opsi ini.

  6. Untuk URI registri Skema, masukkan ARN registri skema AWS Glue Anda, atau URL HTTPS dari registri skema Confluent Cloud atau Self-Managed Confluent Schema Registry.

  7. Langkah-langkah konfigurasi berikut memberi tahu Lambda cara mengakses registri skema Anda. Untuk informasi selengkapnya, lihat Metode otentikasi untuk registri skema Anda.

    • Untuk tipe konfigurasi Access, pilih jenis otentikasi yang digunakan Lambda untuk mengakses registri skema Anda.

    • Untuk URI konfigurasi Access, masukkan ARN rahasia Secrets Manager untuk mengautentikasi dengan registri skema Anda, jika berlaku. Pastikan peran eksekusi fungsi Anda berisi izin yang benar.

  8. Bidang Enkripsi hanya berlaku jika registri skema Anda ditandatangani oleh Otoritas Sertifikat (CA) pribadi atau otoritas sertifikat (CA) yang tidak ada di toko kepercayaan Lambda.. Jika berlaku, berikan kunci rahasia yang berisi sertifikat CA pribadi yang digunakan oleh registri skema Anda untuk enkripsi TLS.

  9. Untuk format rekaman Peristiwa, pilih bagaimana Anda ingin Lambda mengirimkan catatan fungsi Anda setelah validasi skema. Untuk informasi selengkapnya, lihat Contoh format payload.

    • Jika Anda memilih JSON, Lambda memberikan atribut yang Anda pilih dalam atribut validasi Skema di bawah ini dalam format JSON standar. Untuk atribut yang tidak Anda pilih, Lambda mengirimkannya apa adanya.

    • Jika Anda memilih SUMBER, Lambda memberikan atribut yang Anda pilih dalam atribut validasi Skema di bawah ini dalam format sumber aslinya.

  10. Untuk atribut validasi Skema, pilih atribut pesan yang Anda ingin Lambda untuk memvalidasi dan deserialisasi menggunakan registri skema Anda. Anda harus memilih setidaknya satu dari KEY atau VALUE. Jika Anda memilih JSON untuk format rekaman peristiwa, Lambda juga melakukan deserialisasi atribut pesan yang dipilih sebelum mengirimnya ke fungsi Anda. Untuk informasi selengkapnya, lihat Format muatan dan perilaku deserialisasi.

  11. Pilih Simpan.

Anda juga dapat menggunakan API Lambda untuk membuat atau memperbarui pemetaan sumber peristiwa Anda dengan konfigurasi registri skema. Contoh berikut menunjukkan cara mengonfigurasi registri skema AWS Glue atau Confluent menggunakan AWS CLI, yang sesuai dengan operasi CreateEventSourceMappingAPI UpdateEventSourceMappingdan API di Referensi API:AWS Lambda

penting

Jika Anda memperbarui kolom konfigurasi registri skema apa pun menggunakan AWS CLI atau update-event-source-mapping API, Anda harus memperbarui semua bidang konfigurasi registri skema.

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

Untuk menghapus konfigurasi registri skema dari pemetaan sumber peristiwa, Anda dapat menggunakan perintah CLI UpdateEventSourceMappingdi Referensi API.AWS Lambda

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Pemfilteran untuk Avro dan Protobuf

Saat menggunakan format Avro atau Protobuf dengan registri skema, Anda dapat menerapkan pemfilteran acara ke fungsi Lambda Anda. Pola filter diterapkan pada representasi JSON klasik deserialisasi dari data Anda setelah validasi skema. Misalnya, dengan skema Avro yang menentukan detail produk termasuk harga, Anda dapat memfilter pesan berdasarkan nilai harga:

catatan

Saat dideserialisasi, Avro dikonversi ke JSON standar, yang berarti tidak dapat langsung dikonversi kembali ke objek Avro. Jika Anda perlu mengonversi ke objek Avro, gunakan format SOURCE sebagai gantinya.

Untuk deserialisasi Protobuf, nama bidang di JSON yang dihasilkan cocok dengan yang ditentukan dalam skema Anda, daripada dikonversi ke kasus unta seperti yang biasanya dilakukan Protobuf. Ingatlah hal ini saat membuat pola penyaringan.

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

Dalam contoh ini, pola filter menganalisis value objek, mencocokkan pesan dengan "value1" dan field_1 field_2 dengan"value2". Kriteria filter dievaluasi terhadap data deserialisasi, setelah Lambda mengonversi pesan dari format Avro ke JSON.

Untuk informasi lebih rinci tentang pemfilteran acara, lihat Pemfilteran acara Lambda.

Format muatan dan perilaku deserialisasi

Saat menggunakan registri skema, Lambda mengirimkan muatan akhir ke fungsi Anda dalam format yang mirip dengan muatan acara biasa, dengan beberapa bidang tambahan. Bidang tambahan tergantung pada SchemaValidationConfigs parameter. Untuk setiap atribut yang Anda pilih untuk validasi (kunci atau nilai), Lambda menambahkan metadata skema yang sesuai ke payload.

catatan

Anda harus memperbarui aws-lambda-java-eventske versi 3.16.0 atau lebih tinggi untuk menggunakan bidang metadata skema.

Misalnya, jika Anda memvalidasi value bidang, Lambda menambahkan bidang yang valueSchemaMetadata dipanggil ke payload Anda. Demikian pula, untuk key bidang, Lambda menambahkan bidang yang disebut. keySchemaMetadata Metadata ini berisi informasi tentang format data dan ID skema yang digunakan untuk validasi:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

EventRecordFormatParameter dapat diatur ke salah satu JSON atauSOURCE, yang menentukan cara Lambda menangani data yang divalidasi skema sebelum mengirimkannya ke fungsi Anda. Setiap opsi menyediakan kemampuan pemrosesan yang berbeda:

  • JSON- Lambda deserialisasi atribut yang divalidasi ke dalam format JSON standar, membuat data siap untuk digunakan langsung dalam bahasa dengan dukungan JSON asli. Format ini sangat ideal ketika Anda tidak perlu mempertahankan format biner asli atau bekerja dengan kelas yang dihasilkan.

  • SOURCE- Lambda mempertahankan format biner asli data sebagai string yang dikodekan Base64, memungkinkan konversi langsung ke objek Avro atau Protobuf. Format ini sangat penting saat bekerja dengan bahasa yang diketik dengan kuat atau ketika Anda perlu mempertahankan kemampuan penuh skema Avro atau Protobuf.

Berdasarkan karakteristik format dan pertimbangan khusus bahasa ini, kami merekomendasikan format berikut:

Format yang disarankan berdasarkan bahasa pemrograman
Bahasa Avro Protobuf JSON
Java SUMBER SUMBER SUMBER
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SUMBER SUMBER SUMBER
Lainnya JSON JSON JSON

Bagian berikut menjelaskan format ini secara rinci dan memberikan contoh muatan untuk setiap format.

Format JSON

Jika Anda memilih JSON sebagaiEventRecordFormat, Lambda memvalidasi dan deserialisasi atribut pesan yang telah Anda pilih di SchemaValidationConfigs bidang (atribut). key and/or value Lambda memberikan atribut yang dipilih ini sebagai string yang dikodekan base64 dari representasi JSON standar mereka dalam fungsi Anda.

catatan

Saat dideserialisasi, Avro dikonversi ke JSON standar, yang berarti tidak dapat langsung dikonversi kembali ke objek Avro. Jika Anda perlu mengonversi ke objek Avro, gunakan format SOURCE sebagai gantinya.

Untuk deserialisasi Protobuf, nama bidang di JSON yang dihasilkan cocok dengan yang ditentukan dalam skema Anda, daripada dikonversi ke kasus unta seperti yang biasanya dilakukan Protobuf. Ingatlah hal ini saat membuat pola penyaringan.

Berikut ini menunjukkan contoh payload, dengan asumsi Anda memilih JSON sebagaiEventRecordFormat, dan kedua value atribut key dan sebagai: SchemaValidationConfigs

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Dalam contoh ini:

  • Keduanya key dan value merupakan string yang dikodekan base64 dari representasi JSON mereka setelah deserialisasi.

  • Lambda menyertakan metadata skema untuk kedua atribut di dan. keySchemaMetadata valueSchemaMetadata

  • Fungsi Anda dapat memecahkan kode key dan value string untuk mengakses data JSON deserialized.

Format JSON direkomendasikan untuk bahasa yang tidak diketik dengan kuat, seperti Python atau Node.js. Bahasa-bahasa ini memiliki dukungan asli untuk mengubah JSON menjadi objek.

Format sumber

Jika Anda memilih SOURCE sebagaiEventRecordFormat, Lambda masih memvalidasi catatan terhadap registri skema, tetapi mengirimkan data biner asli ke fungsi Anda tanpa deserialisasi. Data biner ini dikirimkan sebagai string yang dikodekan Base64 dari data byte asli, dengan metadata yang ditambahkan produsen dihapus. Akibatnya, Anda dapat langsung mengonversi data biner mentah menjadi objek Avro dan Protobuf dalam kode fungsi Anda. Kami merekomendasikan menggunakan Powertools untuk AWS Lambda, yang akan deserialisasi data biner mentah dan memberi Anda objek Avro dan Protobuf secara langsung.

Misalnya, jika Anda mengonfigurasi Lambda untuk memvalidasi value atribut key dan atribut namun menggunakan SOURCE formatnya, fungsi Anda akan menerima payload seperti ini:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Dalam contoh ini:

  • Keduanya key dan value berisi data biner asli sebagai string yang dikodekan Base64.

  • Fungsi Anda perlu menangani deserialisasi menggunakan pustaka yang sesuai.

Memilih SOURCE untuk EventRecordFormat disarankan jika Anda menggunakan objek yang dihasilkan Avro atau yang dihasilkan Protobuf, terutama dengan fungsi Java. Ini karena Java diketik dengan kuat, dan membutuhkan deserializer khusus untuk format Avro dan Protobuf. Dalam kode fungsi Anda, Anda dapat menggunakan pustaka Avro atau Protobuf pilihan Anda untuk deserialisasi data.

Bekerja dengan data deserialisasi dalam fungsi Lambda

Powertools untuk AWS Lambda membantu Anda deserialisasi catatan Kafka dalam kode fungsi Anda berdasarkan format yang Anda gunakan. Utilitas ini menyederhanakan bekerja dengan catatan Kafka dengan menangani konversi data dan menyediakan ready-to-use objek.

Untuk menggunakan Powertools untuk AWS Lambda fungsi Anda, Anda perlu menambahkan Powertools AWS Lambda baik sebagai lapisan atau memasukkannya sebagai dependensi saat membangun fungsi Lambda Anda. Untuk petunjuk penyiapan dan informasi selengkapnya, lihat Powertools untuk AWS Lambda dokumentasi untuk bahasa pilihan Anda:

catatan

Saat bekerja dengan integrasi registri skema, Anda dapat memilih SOURCE atau JSON memformat. Setiap opsi mendukung format serialisasi yang berbeda seperti yang ditunjukkan di bawah ini:

Format Mendukung

SUMBER

Avro dan Protobuf (menggunakan integrasi Lambda Schema Registry)

JSON

Data JSON

Saat menggunakan JSON format SOURCE or, Anda dapat menggunakan Powertools AWS untuk membantu deserialisasi data dalam kode fungsi Anda. Berikut adalah contoh cara menangani format data yang berbeda:

AVRO

Contoh Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Contoh Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript contoh:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

Contoh .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Contoh Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Contoh Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript contoh:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

Contoh .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Contoh Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Contoh Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript contoh:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

Contoh .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

Metode otentikasi untuk registri skema Anda

Untuk menggunakan registri skema, Lambda harus dapat mengaksesnya dengan aman. Jika Anda bekerja dengan registri AWS Glue skema, Lambda bergantung pada otentikasi IAM. Ini berarti bahwa peran eksekusi fungsi Anda harus memiliki izin berikut untuk mengakses AWS Glue registri:

Contoh kebijakan IAM yang diperlukan:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
catatan

Untuk pendaftar AWS Glue skema, jika Anda menyediakan AccessConfigs registri AWS Glue , Lambda akan mengembalikan pengecualian validasi.

Jika Anda bekerja dengan registri skema Confluent, Anda dapat memilih salah satu dari tiga metode otentikasi yang didukung untuk Type parameter objek Anda: KafkaSchemaRegistryAccessConfig

  • BASIC_AUTH — Lambda menggunakan nama pengguna dan kata sandi atau otentikasi Kunci API dan Rahasia API untuk mengakses registri Anda. Jika Anda memilih opsi ini, berikan Secrets Manager ARN yang berisi kredensyal Anda di bidang URI.

  • CLIENT_CERTIFICATE_TLS_AUTH — Lambda menggunakan otentikasi TLS timbal balik dengan sertifikat klien. Untuk menggunakan opsi ini, Lambda memerlukan akses ke sertifikat dan kunci pribadi. Berikan Secrets Manager ARN yang berisi kredensyal ini di bidang URI.

  • NO_AUTH - Sertifikat CA publik harus ditandatangani oleh otoritas sertifikat (CA) yang ada di toko kepercayaan Lambda. Untuk sertifikat CA/Self-signed privat, Anda mengonfigurasi sertifikat CA root server. Untuk menggunakan opsi ini, hilangkan AccessConfigs parameter.

Selain itu, jika Lambda memerlukan akses ke sertifikat CA pribadi untuk memverifikasi sertifikat TLS registri skema Anda, pilih SERVER_ROOT_CA_CERT sebagai Type dan berikan Secrets Manager ARN ke sertifikat di bidang URI.

catatan

Untuk mengkonfigurasi SERVER_ROOT_CA_CERT opsi di konsol, berikan ARN rahasia yang berisi sertifikat di bidang Enkripsi.

Konfigurasi otentikasi untuk registri skema Anda terpisah dari otentikasi apa pun yang telah Anda konfigurasi untuk cluster Kafka Anda. Anda harus mengkonfigurasi keduanya secara terpisah, bahkan jika mereka menggunakan metode otentikasi serupa.

Penanganan kesalahan dan pemecahan masalah untuk masalah registri skema

Saat menggunakan registri skema dengan sumber acara MSK Amazon Anda, Anda mungkin mengalami berbagai kesalahan. Bagian ini memberikan panduan tentang masalah umum dan cara mengatasinya.

Kesalahan konfigurasi

Kesalahan ini terjadi saat mengatur konfigurasi registri skema Anda.

Diperlukan mode yang disediakan

Pesan kesalahan: SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

Resolusi: Aktifkan mode yang disediakan untuk pemetaan sumber acara Anda dengan mengonfigurasi parameter di. MinimumPollers ProvisionedPollerConfig

URL registri skema tidak valid

Pesan kesalahan: Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

Resolusi: Berikan URL HTTPS yang valid untuk Confluent Schema Registry atau ARN yang valid untuk Schema Registry. AWS Glue

Format rekaman acara tidak valid atau hilang

Pesan kesalahan: EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

Resolusi: Tentukan SOURCE atau JSON sebagai EventRecordFormat konfigurasi registri skema Anda.

Atribut validasi duplikat

Pesan kesalahan: Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

Resolusi: Hapus duplikat KEY atau atribut VALUE dari atribut Anda SchemaValidationConfigs. Setiap jenis atribut hanya dapat muncul sekali.

Konfigurasi validasi tidak ada

Pesan kesalahan: SchemaValidationConfigs is a required field for SchemaRegistryConfig.

Resolusi: Tambahkan SchemaValidationConfigs ke konfigurasi Anda, tentukan setidaknya satu atribut validasi (KEY atau VALUE).

Kesalahan akses dan izin

Kesalahan ini terjadi ketika Lambda tidak dapat mengakses registri skema karena masalah izin atau otentikasi.

AWS Glue Akses registri skema ditolak

Pesan kesalahan: Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

Resolusi: Tambahkan izin yang diperlukan (glue:GetRegistrydanglue:GetSchemaVersion) ke peran eksekusi fungsi Anda.

Akses Registri Skema Konfluen ditolak

Pesan kesalahan: Cannot access Confluent Schema with the provided access configuration.

Resolusi: Verifikasi bahwa kredensyal otentikasi Anda (disimpan di Secrets Manager) sudah benar dan memiliki izin yang diperlukan untuk mengakses registri skema.

Registri AWS Glue Skema Lintas Akun

Pesan kesalahan: Cross-account Glue Schema Registry ARN not supported.

Resolusi: Gunakan Registri AWS Glue Skema yang ada di AWS akun yang sama dengan fungsi Lambda Anda.

Registri AWS Glue Skema Lintas Wilayah

Pesan kesalahan: Cross-region Glue Schema Registry ARN not supported.

Resolusi: Gunakan Registri AWS Glue Skema yang berada di wilayah yang sama dengan fungsi Lambda Anda.

Masalah akses rahasia

Pesan kesalahan: Lambda received InvalidRequestException from Secrets Manager.

Resolusi: Verifikasi bahwa peran eksekusi fungsi Anda memiliki izin untuk mengakses rahasia dan bahwa rahasia tidak dienkripsi dengan AWS KMS kunci default jika mengakses dari akun yang berbeda.

Kesalahan koneksi

Kesalahan ini terjadi ketika Lambda tidak dapat membuat koneksi ke registri skema.

Masalah konektivitas VPC

Pesan kesalahan: Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

Resolusi: Konfigurasikan jaringan VPC Anda untuk memungkinkan koneksi ke registri skema menggunakan AWS PrivateLink, NAT Gateway, atau VPC peering.

Kegagalan jabat tangan TLS

Pesan kesalahan: Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

Resolusi: Verifikasi bahwa sertifikat CA dan sertifikat klien Anda (untuk mTL) sudah benar dan dikonfigurasi dengan benar di Secrets Manager.

Throttling

Pesan kesalahan: Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

Resolusi: Tingkatkan batas tarif API untuk registri skema Anda atau kurangi laju permintaan dari aplikasi Anda.

Kesalahan registri skema yang dikelola sendiri

Pesan kesalahan: Lambda received an internal server an unexpected error from the provided self-managed schema registry.

Resolusi: Periksa kesehatan dan konfigurasi server registri skema yang dikelola sendiri.