SelfManagedKafka
The object describing a SelfManagedKafka event source type. For more
information, see Using AWS Lambda with
self-managed Apache Kafka in the AWS Lambda Developer Guide.
AWS Serverless Application Model (AWS SAM) generates an AWS::Lambda::EventSourceMapping resource when this event type is set.
To use Schema Registry, you need to define specific IAM role permissions for your function. See Complete setup with IAM roles for an example of the required configuration.
Syntax
To declare this entity in your AWS SAM template, use the following syntax.
YAML
BatchSize: Integer
ConsumerGroupId: String
DestinationConfig: DestinationConfig
Enabled: Boolean
FilterCriteria: FilterCriteria
KafkaBootstrapServers: List
KmsKeyArn: String
ProvisionedPollerConfig: ProvisionedPollerConfig
SchemaRegistryConfig: SchemaRegistryConfig
SourceAccessConfigurations: SourceAccessConfigurations
StartingPosition: String
StartingPositionTimestamp: Double
Topics: List
Properties
BatchSize
The maximum number of records in each batch that Lambda pulls from your stream and sends to your function.
Type: Integer
Required: No
Default: 100
AWS CloudFormation compatibility: This property is passed directly to the BatchSize property of an AWS::Lambda::EventSourceMapping resource.
Minimum: 1
Maximum: 10000
ConsumerGroupId
A string that configures how events will be read from Kafka topics.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the SelfManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
DestinationConfig
A configuration object that specifies the destination of an event after Lambda processes it.
Use this property to specify the destination of failed invocations from the self-managed Kafka event source.
Type: DestinationConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the DestinationConfig property of an AWS::Lambda::EventSourceMapping resource.
Enabled
Set to false to disable the event source mapping and pause polling and invocation.
Type: Boolean
Required: No
AWS CloudFormation compatibility: This property is passed directly to the Enabled property of an AWS::Lambda::EventSourceMapping resource.
FilterCriteria
An object that defines the criteria to determine whether Lambda should process an event. For more information, see AWS Lambda event filtering in the AWS Lambda Developer Guide.
Type: FilterCriteria
Required: No
AWS CloudFormation compatibility: This property is passed directly to the FilterCriteria property of an AWS::Lambda::EventSourceMapping resource.
KafkaBootstrapServers
The list of bootstrap servers for your Kafka brokers. Include the port, for example broker.example.com:xxxx.
Type: List
Required: No
AWS CloudFormation compatibility: This property is unique to AWS SAM and doesn't have an AWS CloudFormation equivalent.
KmsKeyArn
The Amazon Resource Name (ARN) of the key to encrypt information related to this event.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the KmsKeyArn property of an AWS::Lambda::EventSourceMapping resource.
ProvisionedPollerConfig
Configuration to increase the number of pollers used by the event source mapping. This configuration allows for a minimum of 1 poller and a maximum of 20 pollers. For an example, refer to ProvisionedPollerConfig example.
Type: ProvisionedPollerConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the ProvisionedPollerConfig property of an AWS::Lambda::EventSourceMapping resource.
SchemaRegistryConfig
Configuration for using a schema registry with the self-managed Kafka event source.
Note
This feature requires ProvisionedPollerConfig to be configured.
Type: SchemaRegistryConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the SelfManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
SourceAccessConfigurations
An array of the authentication protocol, VPC components, or virtual host to secure and define your event source.
Valid values: BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE
Type: List of SourceAccessConfiguration
Required: Yes
AWS CloudFormation compatibility: This property is part of the SelfManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
StartingPosition
The position in a stream from which to start reading.
AT_TIMESTAMP – Specify a time from which to start reading records.
LATEST – Read only new records.
TRIM_HORIZON – Process all available records.
Valid values: AT_TIMESTAMP | LATEST | TRIM_HORIZON
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the StartingPosition property of an AWS::Lambda::EventSourceMapping resource.
StartingPositionTimestamp
The time from which to start reading, in Unix time seconds. Define StartingPositionTimestamp when StartingPosition is specified as AT_TIMESTAMP.
Type: Double
Required: No
AWS CloudFormation compatibility: This property is passed directly to the StartingPositionTimestamp property of an AWS::Lambda::EventSourceMapping resource.
Topics
The name of the Kafka topic.
Type: List
Required: Yes
AWS CloudFormation compatibility: This property is passed directly to the Topics property of an AWS::Lambda::EventSourceMapping resource.
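As a minimal sketch of how DestinationConfig and FilterCriteria might be set together, the following fragment sends failed batches to a queue and processes only records whose JSON value matches a field. The broker address, queue ARN, and the status field are hypothetical placeholders, and the filter pattern assumes that record values are JSON; check the event filtering guide for the pattern syntax that applies to your records.

```yaml
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - broker.example.com:9092   # hypothetical broker address
      Topics:
        - MyKafkaTopic
      SourceAccessConfigurations:
        - Type: BASIC_AUTH
          URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
      DestinationConfig:
        OnFailure:
          # Hypothetical SQS queue that receives details of failed invocations
          Destination: arn:aws:sqs:us-west-2:123456789012:my-kafka-failures
      FilterCriteria:
        Filters:
          # Assumes JSON record values with a hypothetical "status" field
          - Pattern: '{"value": {"status": ["active"]}}'
```

The function's execution role would also need permission to write to whichever destination you choose.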
Examples
Complete setup with IAM roles
The following example shows a complete setup including the required IAM role configuration for using Schema Registry:
Parameters:
  PreCreatedSubnetOne:
    Type: String
  PreCreatedSubnetTwo:
    Type: String
Resources:
  MyLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: [sts:AssumeRole]
            Effect: Allow
            Principal:
              Service: [lambda.amazonaws.com]
      Policies:
        - PolicyName: KafkaAuthPolicy
          PolicyDocument:
            Statement:
              - Action: [secretsmanager:GetSecretValue, kms:Decrypt]
                Effect: "Allow"
                Resource:
                  - 'arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******'
                  - 'arn:aws:kms:us-west-2:123456789012:key/keyId'
        - PolicyName: ENIPolicy
          PolicyDocument:
            Statement:
              - Action:
                  - ec2:CreateNetworkInterface
                  - ec2:DescribeNetworkInterfaces
                  - ec2:DescribeVpcs
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeSubnets
                  - ec2:DescribeSecurityGroups
                Effect: Allow
                Resource: '*'
        - PolicyName: SchemaRegistryPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetRegistry]
                Effect: Allow
                Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}'
        - PolicyName: SchemaVersionsPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetSchemaVersions]
                Effect: Allow
                Resource: '*'
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Tags:
        - {Value: SAM, Key: lambda:createdBy}
  MyKafkaProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs18.x
      Handler: index.handler
      CodeUri: ${codeuri}
      Role:
        Fn::GetAtt: [MyLambdaExecutionRole, Arn]
      Events:
        SelfManagedKafkaEvent:
          Type: SelfManagedKafka
          Properties:
            KafkaBootstrapServers:
              - my-kafka-broker-1:9092
              - my-kafka-broker-2:9092
            Topics:
              - SchemaRegistryTestTopic
            StartingPosition: LATEST
            SourceAccessConfigurations:
              - Type: VPC_SUBNET
                URI: subnet:subnet-12345678
              - Type: VPC_SECURITY_GROUP
                URI: security_group:sg-12345678
              - Type: BASIC_AUTH
                URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
            ProvisionedPollerConfig:
              MinimumPollers: 1
            SchemaRegistryConfig:
              AccessConfigs:
                - Type: BASIC_AUTH
                  URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
              SchemaValidationConfigs:
                - Attribute: KEY
              EventRecordFormat: JSON
              SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig example
ProvisionedPollerConfig:
  MinimumPollers: 1
  MaximumPollers: 20
Self-managed Kafka event source
The following is an example of a SelfManagedKafka event source type.
YAML
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      BatchSize: 1000
      Enabled: true
      KafkaBootstrapServers:
        - abc.xyz.com:xxxx
      SourceAccessConfigurations:
        - Type: BASIC_AUTH
          URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
      Topics:
        - MyKafkaTopic
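Self-managed Kafka event source starting from a timestamp
The following is an illustrative sketch, not an official example, of how ConsumerGroupId, StartingPosition, and StartingPositionTimestamp might be combined so that a named consumer group starts reading from a fixed point in time. The consumer group name, broker address, and timestamp are hypothetical values chosen for illustration.

```yaml
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      BatchSize: 500                        # any value from 1 to 10000
      ConsumerGroupId: my-consumer-group    # hypothetical group name
      KafkaBootstrapServers:
        - broker.example.com:9092           # hypothetical broker address
      Topics:
        - MyKafkaTopic
      StartingPosition: AT_TIMESTAMP
      # Unix time in seconds; 1704067200 is 2024-01-01T00:00:00Z
      StartingPositionTimestamp: 1704067200
      SourceAccessConfigurations:
        - Type: BASIC_AUTH
          URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
```

StartingPositionTimestamp is only meaningful when StartingPosition is AT_TIMESTAMP, so the two are set together here.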
Self-managed Kafka Event Source with AWS Glue Schema Registry
The following is an example of a SelfManagedKafka event source type configured with AWS Glue Schema Registry.
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - abc.xyz.com:9092
      Topics:
        - SchemaRegistryTestTopic
      StartingPosition: LATEST
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
      SourceAccessConfigurations:
        - Type: VPC_SUBNET
          URI: subnet:subnet-12345678
        - Type: VPC_SECURITY_GROUP
          URI: security_group:sg-12345678
Self-managed Kafka Event Source with Confluent Schema Registry
The following is an example of a SelfManagedKafka event source type configured with Confluent Schema Registry.
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - abc.xyz.com:9092
      Topics:
        - SchemaRegistryTestTopic
      StartingPosition: LATEST
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: https://my-schema-registry.confluent.cloud
        AccessConfigs:
          - Type: BASIC_AUTH
            URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
      SourceAccessConfigurations:
        - Type: VPC_SUBNET
          URI: subnet:subnet-12345678
        - Type: VPC_SECURITY_GROUP
          URI: security_group:sg-12345678
        - Type: BASIC_AUTH
          URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret
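Self-managed Kafka event source with SASL/SCRAM authentication
The examples above authenticate with BASIC_AUTH. As a hedged sketch of two other values that SourceAccessConfigurations lists as valid, the following fragment uses SASL_SCRAM_512_AUTH together with SERVER_ROOT_CA_CERTIFICATE. The broker address and both secret names are hypothetical, and the sketch assumes that each URI points to an AWS Secrets Manager secret holding the credentials or the root CA certificate.

```yaml
Events:
  SelfManagedKafkaEvent:
    Type: SelfManagedKafka
    Properties:
      KafkaBootstrapServers:
        - broker.example.com:9092   # hypothetical broker address
      Topics:
        - MyKafkaTopic
      StartingPosition: LATEST
      SourceAccessConfigurations:
        # Hypothetical secret containing the SCRAM username and password
        - Type: SASL_SCRAM_512_AUTH
          URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-scram-secret
        # Hypothetical secret containing the brokers' root CA certificate
        - Type: SERVER_ROOT_CA_CERTIFICATE
          URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-root-ca
```

As in the complete setup example, the function's role would need secretsmanager:GetSecretValue on both secrets.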