翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
SelfManagedKafka
SelfManagedKafka イベントソースタイプを説明するオブジェクトです。詳細については、「 AWS Lambda デベロッパーガイド」の「Using AWS Lambda with self-managed Apache Kafka」を参照してください。
AWS Serverless Application Model (AWS SAM) このイベントタイプが設定されている場合、 は AWS::Lambda::EventSourceMappingリソースを生成します。
Schema Registry を使用するには、関数に特定の IAM ロール許可を定義する必要があります。必要な設定の例については、「Complete setup with IAM roles」を参照してください。
構文
AWS SAM テンプレートでこのエンティティを宣言するには、次の構文を使用します。
YAML
BatchSize:IntegerBisectBatchOnFunctionError:BooleanConsumerGroupId:StringDestinationConfig:DestinationConfigEnabled:BooleanFilterCriteria:FilterCriteriaKafkaBootstrapServers:ListFunctionResponseTypes:ListKmsKeyArn:StringMaximumRecordAgeInSeconds:IntegerMaximumRetryAttempts:IntegerProvisionedPollerConfig:ProvisionedPollerConfigSchemaRegistryConfig:SchemaRegistryConfigSourceAccessConfigurations:SourceAccessConfigurationsStartingPosition:StringStartingPositionTimestamp:DoubleTopics:List
プロパティ
-
BatchSize -
Lambda がストリームから取り出し、関数に送信する各バッチ内の最大レコード数。
タイプ: 整数
必須: いいえ
デフォルト: 100
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのBatchSizeプロパティに直接渡されます。最小:
1最大:
10000 -
BisectBatchOnFunctionError -
関数がエラーを返す場合は、バッチを 2 つに分割して再試行します。
型: ブール
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのBisectBatchOnFunctionErrorプロパティに直接渡されます。 -
ConsumerGroupId -
Kafka トピックからイベントを読み取る方法を設定する文字列。
タイプ: 文字列
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのSelfManagedKafkaConfigurationプロパティに直接渡されます。 -
DestinationConfig -
Lambda がイベントを処理した後のイベントの送信先を指定する構成オブジェクト。
このプロパティを使用して、自己管理型の Kafka イベントソースからの失敗した呼び出しの送信先を指定します。
タイプ: DestinationConfig
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのDestinationConfigプロパティに直接渡されます。 -
Enabled -
ポーリングと呼び出しを一時停止するために、イベントソースマッピングを無効にします。
型: ブール
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのEnabledプロパティに直接渡されます。 -
FilterCriteria -
Lambda がイベントを処理する必要があるかどうかを判断するための基準を定義するオブジェクト。詳細については、AWS Lambda デベロッパーガイドの AWS Lambda イベントのフィルタリングを参照してください。
タイプ: FilterCriteria
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのFilterCriteriaプロパティに直接渡されます。 -
KafkaBootstrapServers -
Kafka ブローカー用のブートストラップサーバーのリスト。ポートを含めます。例:
broker.example.com:xxxxタイプ: リスト
必須: いいえ
CloudFormation 互換性: このプロパティは に固有 AWS SAM であり、 CloudFormation 同等のプロパティはありません。
-
FunctionResponseTypes -
現在イベントソースマッピングに適用されているレスポンスタイプのリストです。詳細については、「AWS Lambda デベロッパーガイド」の「バッチアイテムの失敗をレポートする」を参照してください。
有効な値:
ReportBatchItemFailuresタイプ: リスト
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのFunctionResponseTypesプロパティに直接渡されます。 -
KmsKeyArn -
このイベントに関連する情報を暗号化するためのキーの Amazon リソースネーム (ARN)。
タイプ: 文字列
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのKmsKeyArnプロパティに直接渡されます。 -
MaximumRecordAgeInSeconds -
Lambda が処理のために関数に送信するレコードの最大存続時間です。
タイプ: 整数
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのMaximumRecordAgeInSecondsプロパティに直接渡されます。 -
MaximumRetryAttempts -
関数がエラーを返すときの最大再試行回数です。
タイプ: 整数
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのMaximumRetryAttemptsプロパティに直接渡されます。 -
ProvisionedPollerConfig -
イベントソースマッピングの計算に使用されるポーラーの数を増やすための設定。この設定では、最小 1 つのポーラーと最大 2000 のポーラーを使用できます。例については、「ProvisionedPollerConfig の例」を参照してください。
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのProvisionedPollerConfigプロパティに直接渡されます。 SchemaRegistryConfig-
セルフマネージド Kafka イベントソースでスキーマレジストリを使用するための設定。
注記
この機能は
ProvisionedPollerConfigを設定するために必要です。タイプ: SchemaRegistryConfig
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのSelfManagedKafkaEventSourceConfigプロパティに直接渡されます。 -
SourceAccessConfigurations -
認証プロトコルの配列、VPC コンポーネント、イベントソースを保護して定義する仮想ホスト。
有効な値:
BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE型: SourceAccessConfiguration のリスト
必須: はい
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースの SelfManagedKafkaEventSourceConfig プロパティの一部です。 -
StartingPosition -
読み取りを開始するストリームの場所です。
-
AT_TIMESTAMP- レコードの読み取りを開始する時間を指定します。 -
LATEST- 新しいレコードのみを読み込みます。 -
TRIM_HORIZON- 使用可能なすべてのレコードを処理します。
有効な値:
AT_TIMESTAMP|LATEST|TRIM_HORIZONタイプ: 文字列
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのStartingPositionプロパティに直接渡されます。 -
-
StartingPositionTimestamp -
Unix タイム秒単位で読み取りをスタートする時間。
StartingPositionがAT_TIMESTAMPとして指定されている場合のStartingPositionTimestampを定義します。型: 倍精度
必須: いいえ
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのStartingPositionTimestampプロパティに直接渡されます。 -
Topics -
Kafka トピックの名前です。
タイプ: リスト
必須: はい
CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMappingリソースのTopicsプロパティに直接渡されます。
例
IAM ロールを使用してセットアップを完了する
次の例は、Schema Registry を使用するために必要な IAM ロール設定を含む完全なセットアップを示しています。
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17 ' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig の例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 200
セルフマネージド型の Kafka イベントソース
以下は、SelfManagedKafka イベントソースタイプの例です。
YAML
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic
AWS Glue Schema Registry を使用したセルフマネージド Kafka イベントソース
AWS Glue Schema Registry で設定されたSelfManagedKafkaイベントソースタイプの例を次に示します。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678
Confluent Schema Registry を使用したセルフマネージド Kafka イベントソース
以下は、Confluent Schema Registry を使用して設定された SelfManagedKafka イベントソースタイプの例です。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret