Using schema registries with Kafka event sources in Lambda
Schema registries help you define and manage data stream schemas. A schema defines the structure and format of a data record. In the context of Kafka event source mappings, you can configure a schema registry to validate the structure and format of Kafka messages against predefined schemas before they reach your Lambda function. This adds a layer of data governance to your application and allows you to efficiently manage data formats, ensure schema compliance, and optimize costs through event filtering.
This feature works with all programming languages, but consider these important points:

- Powertools for AWS Lambda provides specific support for Java, Python, and TypeScript, maintaining consistency with existing Kafka development patterns and allowing direct access to business objects without custom deserialization code.
- This feature is only available for event source mappings using provisioned mode. Schema registry doesn't support event source mappings in on-demand mode. If you're using provisioned mode and you have a schema registry configured, you can't change to on-demand mode unless you remove your schema registry configuration first. For more information, see Provisioned mode.
- You can configure only one schema registry per event source mapping (ESM). Using a schema registry with your Kafka event source may increase your Lambda Event Poller Unit (EPU) usage, which is a pricing dimension for provisioned mode.
Schema registry options
Lambda supports the following schema registry options:

- AWS Glue Schema Registry
- Confluent Cloud Schema Registry
- Self-managed Confluent Schema Registry

Your schema registry supports validating messages in the following data formats:

- Apache Avro
- Protocol Buffers (Protobuf)
- JSON Schema (JSON-SE)
To use a schema registry, first ensure that your event source mapping is in provisioned mode. When you use a schema registry, Lambda adds metadata about the schema to the payload. For more information, see Payload formats and deserialization behavior.
How Lambda performs schema validation for Kafka messages
When you configure a schema registry, Lambda performs the following steps for each Kafka message:

1. Lambda polls the Kafka record from your cluster.
2. Lambda validates selected message attributes in the record against a specific schema in your schema registry.
   - If the schema associated with the message is not found in the registry, Lambda sends the message to a DLQ with reason code `SCHEMA_NOT_FOUND`.
3. Lambda deserializes the message according to the schema registry configuration to validate the message. If event filtering is configured, Lambda then performs filtering based on the configured filter criteria.
   - If deserialization fails, Lambda sends the message to a DLQ with reason code `DESERIALIZATION_ERROR`. If no DLQ is configured, Lambda drops the message.
4. If the message is validated by the schema registry and isn't filtered out by your filter criteria, Lambda invokes your function with the message.
This feature is intended to validate messages that are already produced using Kafka clients integrated with a schema registry. We recommend configuring your Kafka producers to work with your schema registry to create properly formatted messages.
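For example, a producer using the confluent-kafka Python client and its schema registry integration might look like the following sketch (the registry URL, topic, and Avro schema are placeholders for your own setup):

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Placeholder registry URL and Avro schema for illustration
schema_registry = SchemaRegistryClient({"url": "https://schema-registry.example.com:8081"})
value_schema = """
{
  "type": "record",
  "name": "Product",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"}
  ]
}
"""

producer = SerializingProducer({
    "bootstrap.servers": "broker1:9092",
    # Registers the schema and prepends the schema ID to each message
    "value.serializer": AvroSerializer(schema_registry, value_schema),
})
producer.produce(topic="myAvroTopic", value={"name": "widget", "price": 9.99})
producer.flush()
```

Messages produced this way carry the schema information that Lambda needs to validate and deserialize them.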
Configuring a Kafka schema registry
The following console steps add a Kafka schema registry configuration to your event source mapping.
To add a Kafka schema registry configuration to your event source mapping (console)
1. Open the Functions page of the Lambda console.
2. Choose Configuration.
3. Choose Triggers.
4. Select the Kafka event source mapping that you want to configure a schema registry for, and choose Edit.
5. Under Event poller configuration, choose Configure schema registry. Your event source mapping must be in provisioned mode to see this option.
6. For Schema registry URI, enter the ARN of your AWS Glue schema registry, or the HTTPS URL of your Confluent Cloud schema registry or self-managed Confluent schema registry.
7. The following configuration steps tell Lambda how to access your schema registry. For more information, see Authentication methods for your schema registry.
   - For Access configuration type, choose the type of authentication Lambda uses to access your schema registry.
   - For Access configuration URI, enter the ARN of the Secrets Manager secret to authenticate with your schema registry, if applicable. Ensure that your function's execution role contains the correct permissions.
8. The Encryption field applies only if your schema registry uses a certificate signed by a private certificate authority (CA) or a CA that's not in the Lambda trust store. If applicable, provide the secret containing the private CA certificate that your schema registry uses for TLS encryption.
9. For Event record format, choose how you want Lambda to deliver the records to your function after schema validation. For more information, see Payload format examples.
   - If you choose JSON, Lambda delivers the attributes that you select in the Schema validation attribute below in standard JSON format. Lambda delivers the attributes that you don't select as-is.
   - If you choose SOURCE, Lambda delivers the attributes that you select in the Schema validation attribute below in their original source format.
10. For Schema validation attribute, select the message attributes that you want Lambda to validate and deserialize using your schema registry. You must select at least one of KEY or VALUE. If you chose JSON for the event record format, Lambda also deserializes the selected message attributes before sending them to your function. For more information, see Payload formats and deserialization behavior.
11. Choose Save.
You can also use the Lambda API to create or update your event source mapping with a schema registry configuration. The following examples demonstrate how to configure an AWS Glue or Confluent schema registry using the AWS CLI, which corresponds to the UpdateEventSourceMapping and CreateEventSourceMapping API operations in the AWS Lambda API Reference:
Important

If you update any schema registry configuration field using the update-event-source-mapping AWS CLI command or the UpdateEventSourceMapping API operation, you must update all the fields of the schema registry configuration.
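For example, a minimal update using the AWS CLI might look like the following sketch, mirroring the create example later in this topic (the mapping UUID and registry ARN are placeholders):

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ]
    }'
```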
Filtering for Avro and Protobuf
When using Avro or Protobuf formats with a schema registry, you can apply event filtering to your Lambda function. The filter patterns are applied to the deserialized, standard JSON representation of your data after schema validation. For example, with an Avro schema defining product details including price, you can filter messages based on the price value:
Note
When being deserialized, Avro is converted to standard JSON, which means it cannot be directly converted back to an Avro object. If you need to convert to an Avro object, use the SOURCE format instead.
For Protobuf deserialization, field names in the resulting JSON match those defined in your schema, rather than being converted to camel case as Protobuf typically does. Keep this in mind when creating filtering patterns.
```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ]
    }' \
    --filter-criteria '{
        "Filters": [
            { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" }
        ]
    }'
```
In this example, the filter pattern analyzes the `value` object, matching messages in `field_1` with `"value1"` and `field_2` with `"value2"`.
The filter criteria are evaluated against the deserialized data, after Lambda converts the message from Avro format to JSON.
For more detailed information on event filtering, see Lambda event filtering.
Payload formats and deserialization behavior
When using a schema registry, Lambda delivers the final payload to your function in a format similar to the regular event payload, with some additional fields.
The additional fields depend on the `SchemaValidationConfigs` parameter. For each attribute that you select for validation (key or value), Lambda adds corresponding schema metadata to the payload.
Note

If you use Java event objects, you must update your `aws-lambda-java-events` dependency to a version that includes these schema metadata fields.
For example, if you validate the `value` field, Lambda adds a field called `valueSchemaMetadata` to your payload. Similarly, for the `key` field, Lambda adds a field called `keySchemaMetadata`. This metadata contains information about the data format and the schema ID used for validation:
"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }
The `EventRecordFormat` parameter can be set to either `JSON` or `SOURCE`, which determines how Lambda handles schema-validated data before delivering it to your function. Each option provides different processing capabilities:

- `JSON`: Lambda deserializes the validated attributes into standard JSON format, making the data ready for direct use in languages with native JSON support. This format is ideal when you don't need to preserve the original binary format or work with generated classes.
- `SOURCE`: Lambda preserves the original binary format of the data as a Base64-encoded string, allowing direct conversion to Avro or Protobuf objects. This format is essential when working with strongly-typed languages or when you need to maintain the full capabilities of Avro or Protobuf schemas.
Based on these format characteristics and language-specific considerations, we recommend the following formats:
| Language | Avro | Protobuf | JSON |
| --- | --- | --- | --- |
| Java | SOURCE | SOURCE | SOURCE |
| Python | JSON | JSON | JSON |
| Node.js | JSON | JSON | JSON |
| .NET | SOURCE | SOURCE | SOURCE |
| Others | JSON | JSON | JSON |
The following sections describe these formats in detail and provide example payloads for each format.
JSON format
If you choose `JSON` as the `EventRecordFormat`, Lambda validates and deserializes the message attributes that you've selected in the `SchemaValidationConfigs` field (the `key` and/or `value` attributes). Lambda delivers these selected attributes as base64-encoded strings of their standard JSON representation to your function.
Note
When being deserialized, Avro is converted to standard JSON, which means it cannot be directly converted back to an Avro object. If you need to convert to an Avro object, use the SOURCE format instead.
For Protobuf deserialization, field names in the resulting JSON match those defined in your schema, rather than being converted to camel case as Protobuf typically does. Keep this in mind when creating filtering patterns.
The following shows an example payload, assuming you choose `JSON` as the `EventRecordFormat`, and both the `key` and `value` attributes as `SchemaValidationConfigs`:
{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
In this example:

- Both `key` and `value` are base64-encoded strings of their JSON representation after deserialization.
- Lambda includes schema metadata for both attributes in `keySchemaMetadata` and `valueSchemaMetadata`.
- Your function can decode the `key` and `value` strings to access the deserialized JSON data.
The JSON format is recommended for languages that aren't strongly typed, such as Python or Node.js. These languages have native support for converting JSON into objects.
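As an illustration, a Python handler might decode the JSON-format fields like this (a minimal sketch; the field names follow the example payload above):

```python
import base64
import json

def lambda_handler(event, context):
    for records in event["records"].values():
        for record in records:
            # With the JSON format, key and value are base64-encoded
            # strings of their standard JSON representation.
            key = json.loads(base64.b64decode(record["key"]))
            value = json.loads(base64.b64decode(record["value"]))
            print(record["valueSchemaMetadata"]["schemaId"], key, value)
```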
Source format
If you choose `SOURCE` as the `EventRecordFormat`, Lambda still validates the record against the schema registry, but delivers the original binary data to your function without deserialization. This binary data is delivered as a Base64-encoded string of the original byte data, with producer-appended metadata removed. As a result, you can directly convert the raw binary data into Avro and Protobuf objects within your function code. We recommend using Powertools for AWS Lambda, which deserializes the raw binary data and gives you Avro and Protobuf objects directly.
For example, if you configure Lambda to validate both the `key` and `value` attributes but use the `SOURCE` format, your function receives a payload like this:
{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
In this example:

- Both `key` and `value` contain the original binary data as Base64-encoded strings.
- Your function needs to handle deserialization using the appropriate libraries.
Choosing `SOURCE` for `EventRecordFormat` is recommended if you're using Avro-generated or Protobuf-generated objects, especially with Java functions. Because Java is strongly typed, it requires specific deserializers for Avro and Protobuf formats. In your function code, you can use your preferred Avro or Protobuf library to deserialize the data.
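The same approach works in any language. As an illustration, the following Python sketch deserializes a SOURCE-format Avro value with the fastavro package (the writer schema shown is a placeholder; in practice, you would look it up using the `schemaId` in `valueSchemaMetadata`):

```python
import base64
import io

from fastavro import schemaless_reader

# Placeholder writer schema; retrieve the real one from your schema registry
# using the schemaId in valueSchemaMetadata.
WRITER_SCHEMA = {
    "type": "record",
    "name": "Product",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
    ],
}

def lambda_handler(event, context):
    for records in event["records"].values():
        for record in records:
            # SOURCE format: value holds the original Avro bytes, Base64-encoded
            raw = base64.b64decode(record["value"])
            product = schemaless_reader(io.BytesIO(raw), WRITER_SCHEMA)
            print(product["name"], product["price"])
```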
Working with deserialized data in Lambda functions
Powertools for AWS Lambda helps you deserialize Kafka records in your function code based on the format you use. This utility simplifies working with Kafka records by handling data conversion and providing ready-to-use objects.
To use Powertools for AWS Lambda in your function, you need to either add Powertools for AWS Lambda as a layer or include it as a dependency when building your Lambda function. For setup instructions and more information, see the Powertools for AWS Lambda documentation for your preferred language:
Note
When working with schema registry integration, you can choose the `SOURCE` or `JSON` format. Each option supports different serialization formats, as shown below:
| Format | Supports |
| --- | --- |
| SOURCE | Avro and Protobuf (using Lambda Schema Registry integration) |
| JSON | JSON data |
When using the `SOURCE` or `JSON` format, you can use Powertools for AWS Lambda to help deserialize the data in your function code. Here are examples of how to handle different data formats:
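The Powertools documentation for your language shows its ready-made helpers. As a library-agnostic illustration, the following Python sketch deserializes a SOURCE-format Protobuf value using classes generated by protoc (the product_pb2 module and Product message are hypothetical):

```python
import base64

# Hypothetical module generated from your .proto schema with protoc
from product_pb2 import Product

def lambda_handler(event, context):
    for records in event["records"].values():
        for record in records:
            # SOURCE format delivers the original Protobuf bytes, Base64-encoded
            product = Product()
            product.ParseFromString(base64.b64decode(record["value"]))
            print(product.name, product.price)
```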
Authentication methods for your schema registry
To use a schema registry, Lambda needs to be able to securely access it. If you're working with an AWS Glue schema registry, Lambda relies on IAM authentication. This means that your function's execution role must have the following permissions to access the AWS Glue registry:
- GetRegistry in the AWS Glue Web API Reference
- GetSchemaVersion in the AWS Glue Web API Reference
Example of the required IAM policy:
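The following is a minimal sketch; in practice, scope the `Resource` element to your specific registry and schema ARNs:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": "*"
        }
    ]
}
```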
Note

For AWS Glue schema registries, if you provide `AccessConfigs` for an AWS Glue registry, Lambda returns a validation exception.
If you're working with a Confluent schema registry, you can choose one of three supported authentication methods for the `Type` parameter of your `KafkaSchemaRegistryAccessConfig` object:

- BASIC_AUTH: Lambda uses username and password or API key and API secret authentication to access your registry. If you choose this option, provide the Secrets Manager ARN containing your credentials in the URI field (see the example after this list).
- CLIENT_CERTIFICATE_TLS_AUTH: Lambda uses mutual TLS authentication with client certificates. To use this option, Lambda needs access to both the certificate and the private key. Provide the Secrets Manager ARN containing these credentials in the URI field.
- NO_AUTH: The public CA certificate must be signed by a certificate authority (CA) that's in the Lambda trust store. For a private CA or self-signed certificate, you configure the server root CA certificate. To use this option, omit the `AccessConfigs` parameter.
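As an illustration, you might store BASIC_AUTH credentials in Secrets Manager like this (the username and password key names are an assumption, matching the convention Lambda uses for Kafka cluster authentication secrets):

```
aws secretsmanager create-secret \
    --name confluent-schema-registry-credentials \
    --secret-string '{"username": "myApiKey", "password": "myApiSecret"}'
```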
Additionally, if Lambda needs access to a private CA certificate to verify your schema registry's TLS certificate, choose `SERVER_ROOT_CA_CERT` as the `Type` and provide the Secrets Manager ARN of the certificate in the URI field.
Note

To configure the `SERVER_ROOT_CA_CERT` option in the console, provide the secret ARN containing the certificate in the Encryption field.
The authentication configuration for your schema registry is separate from any authentication you've configured for your Kafka cluster. You must configure both separately, even if they use similar authentication methods.
Error handling and troubleshooting for schema registry issues
When using a schema registry with your Amazon MSK event source, you may encounter various errors. This section provides guidance on common issues and how to resolve them.
Configuration errors
These errors occur when setting up your schema registry configuration.
- Provisioned mode required

  Error message: SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

  Resolution: Enable provisioned mode for your event source mapping by configuring the `MinimumPollers` parameter in `ProvisionedPollerConfig`.

- Invalid schema registry URL

  Error message: Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

  Resolution: Provide a valid HTTPS URL for Confluent Schema Registry or a valid ARN for AWS Glue Schema Registry.

- Invalid or missing event record format

  Error message: EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

  Resolution: Specify either SOURCE or JSON as the `EventRecordFormat` in your schema registry configuration.

- Duplicate validation attributes

  Error message: Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

  Resolution: Remove duplicate KEY or VALUE attributes from your `SchemaValidationConfigs`. Each attribute type can only appear once.

- Missing validation configuration

  Error message: SchemaValidationConfigs is a required field for SchemaRegistryConfig.

  Resolution: Add `SchemaValidationConfigs` to your configuration, specifying at least one validation attribute (KEY or VALUE).
Access and permission errors
These errors occur when Lambda cannot access the schema registry due to permission or authentication issues.
- AWS Glue Schema Registry access denied

  Error message: Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

  Resolution: Add the required permissions (`glue:GetRegistry` and `glue:GetSchemaVersion`) to your function's execution role.

- Confluent Schema Registry access denied

  Error message: Cannot access Confluent Schema with the provided access configuration.

  Resolution: Verify that your authentication credentials (stored in Secrets Manager) are correct and have the necessary permissions to access the schema registry.

- Cross-account AWS Glue Schema Registry

  Error message: Cross-account Glue Schema Registry ARN not supported.

  Resolution: Use an AWS Glue Schema Registry that's in the same AWS account as your Lambda function.

- Cross-region AWS Glue Schema Registry

  Error message: Cross-region Glue Schema Registry ARN not supported.

  Resolution: Use an AWS Glue Schema Registry that's in the same AWS Region as your Lambda function.

- Secret access issues

  Error message: Lambda received InvalidRequestException from Secrets Manager.

  Resolution: Verify that your function's execution role has permission to access the secret, and that the secret is not encrypted with a default AWS KMS key if accessing it from a different account.
Connection errors
These errors occur when Lambda cannot establish a connection to the schema registry.
- VPC connectivity issues

  Error message: Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

  Resolution: Configure your VPC networking to allow connections to the schema registry using AWS PrivateLink, a NAT gateway, or VPC peering.

- TLS handshake failure

  Error message: Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

  Resolution: Verify that your CA certificates and client certificates (for mTLS) are correct and properly configured in Secrets Manager.

- Throttling

  Error message: Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

  Resolution: Increase the API rate limits for your schema registry, or reduce the rate of requests from your application.

- Self-managed schema registry errors

  Error message: Lambda received an unexpected internal server error from the provided self-managed schema registry.

  Resolution: Check the health and configuration of your self-managed schema registry server.