SelfManagedKafka
The object describing a SelfManagedKafka
event source type. For more
information, see Using AWS Lambda with
self-managed Apache Kafka in the AWS Lambda Developer Guide.
AWS Serverless Application Model (AWS SAM) generates an AWS::Lambda::EventSourceMapping resource when this event type is set.
To use Schema Registry, you need to define specific IAM role permissions for your function. See Complete setup with IAM roles for an example of the required configuration.
Syntax
To declare this entity in your AWS SAM template, use the following syntax.
YAML
BatchSize:
Integer
ConsumerGroupId:String
DestinationConfig:DestinationConfig
Enabled:Boolean
FilterCriteria:FilterCriteria
KafkaBootstrapServers:List
KmsKeyArn:String
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Topics:List
Properties
-
BatchSize
-
The maximum number of records in each batch that Lambda pulls from your stream and sends to your function.
Type: Integer
Required: No
Default: 100
AWS CloudFormation compatibility: This property is passed directly to the
BatchSize
property of anAWS::Lambda::EventSourceMapping
resource.Minimum:
1
Maximum:
10000
-
ConsumerGroupId
-
A string that configures how events will be read from Kafka topics.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
SelfManagedKafkaConfiguration
property of anAWS::Lambda::EventSourceMapping
resource. -
DestinationConfig
-
A configuration object that specifies the destination of an event after Lambda processes it.
Use this property to specify the destination of failed invocations from the self-managed Kafka event source.
Type: DestinationConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
DestinationConfig
property of anAWS::Lambda::EventSourceMapping
resource. -
Enabled
-
Disables the event source mapping to pause polling and invocation.
Type: Boolean
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
Enabled
property of anAWS::Lambda::EventSourceMapping
resource. -
FilterCriteria
-
A object that defines the criteria to determine whether Lambda should process an event. For more information, see AWS Lambda event filtering in the AWS Lambda Developer Guide.
Type: FilterCriteria
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
FilterCriteria
property of anAWS::Lambda::EventSourceMapping
resource. -
KafkaBootstrapServers
-
The list of bootstrap servers for your Kafka brokers. Include the port, for example
broker.example.com:
xxxx
Type: List
Required: No
AWS CloudFormation compatibility: This property is unique to AWS SAM and doesn't have an AWS CloudFormation equivalent.
-
KmsKeyArn
-
The Amazon Resource Name (ARN) of the key to encrypt information related to this event.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
KmsKeyArn
property of anAWS::Lambda::EventSourceMapping
resource. -
ProvisionedPollerConfig
-
Configuration to increase the amount of pollers used to compute event source mappings. This configuration allows for a minumum of 1 poller and a maximum of 20 pollers. For an example, refer to ProvisionedPollerConfig example
Type: ProvisionedPollerConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
ProvisionedPollerConfig
property of anAWS::Lambda::EventSourceMapping
resource. SchemaRegistryConfig
-
Configuration for using a schema registry with the self-managed Kafka event source.
Note
This feature requires
ProvisionedPollerConfig
to be configured.Type: SchemaRegistryConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
SelfManagedKafkaEventSourceConfig
property of anAWS::Lambda::EventSourceMapping
resource. -
SourceAccessConfigurations
-
An array of the authentication protocol, VPC components, or virtual host to secure and define your event source.
Valid values:
BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE
Type: List of SourceAccessConfiguration
Required: Yes
AWS CloudFormation compatibility: This property is part of the SelfManagedKafkaEventSourceConfig property of an
AWS::Lambda::EventSourceMapping
resource. -
StartingPosition
-
The position in a stream from which to start reading.
-
AT_TIMESTAMP
– Specify a time from which to start reading records. -
LATEST
– Read only new records. -
TRIM_HORIZON
– Process all available records.
Valid values:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
StartingPosition
property of anAWS::Lambda::EventSourceMapping
resource. -
-
StartingPositionTimestamp
-
The time from which to start reading, in Unix time seconds. Define
StartingPositionTimestamp
whenStartingPosition
is specified asAT_TIMESTAMP
.Type: Double
Required: No
AWS CloudFormation compatibility: This property is passed directly to the
StartingPositionTimestamp
property of anAWS::Lambda::EventSourceMapping
resource. -
Topics
-
The name of the Kafka topic.
Type: List
Required: Yes
AWS CloudFormation compatibility: This property is passed directly to the
Topics
property of anAWS::Lambda::EventSourceMapping
resource.
Examples
Complete setup with IAM roles
The following example shows a complete setup including the required IAM role configuration for using Schema Registry:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig example
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
Self-managed Kafka event source
The following is an example of a SelfManagedKafka
event source type.
YAML
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic
Self-managed Kafka Event Source with AWS Glue Schema Registry
The following is an example of a SelfManagedKafka
event source type configured with AWS Glue Schema Registry.
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678
Self-managed Kafka Event Source with Confluent Schema Registry
The following is an example of a SelfManagedKafka
event source type configured with Confluent Schema Registry.
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret