SelfManagedKafka - AWS Serverless Application Model

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SelfManagedKafka

SelfManagedKafka イベントソースタイプを説明するオブジェクトです。詳細については、「 AWS Lambda デベロッパーガイド」の「Using AWS Lambda with self-managed Apache Kafka」を参照してください。

AWS Serverless Application Model (AWS SAM) このイベントタイプが設定されている場合、 は AWS::Lambda::EventSourceMappingリソースを生成します。

Schema Registry を使用するには、関数に特定の IAM ロールのアクセス許可を定義する必要があります。必要な設定の例については、「IAM ロールでのセットアップを完了する」を参照してください。

構文

AWS SAM テンプレートでこのエンティティを宣言するには、次の構文を使用します。

プロパティ

BatchSize

Lambda がストリームから取り出し、関数に送信する各バッチ内の最大レコード数。

タイプ: 整数

必須: いいえ

デフォルト: 100

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの BatchSizeプロパティに直接渡されます。

最小: 1

最大: 10000

ConsumerGroupId

Kafka トピックからイベントを読み取る方法を設定する文字列。

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの SelfManagedKafkaConfigurationプロパティに直接渡されます。

DestinationConfig

Lambda がイベントを処理した後のイベントの送信先を指定する構成オブジェクト。

このプロパティを使用して、自己管理型の Kafka イベントソースからの失敗した呼び出しの送信先を指定します。

タイプ: DestinationConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの DestinationConfigプロパティに直接渡されます。

Enabled

ポーリングと呼び出しを一時停止するために、イベントソースマッピングを無効にします。

型: ブール

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの Enabledプロパティに直接渡されます。

FilterCriteria

Lambda がイベントを処理する必要があるかどうかを判断するための基準を定義するオブジェクト。詳細については、AWS Lambda デベロッパーガイドAWS Lambda イベントのフィルタリングを参照してください。

タイプ: FilterCriteria

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの FilterCriteriaプロパティに直接渡されます。

KafkaBootstrapServers

Kafka ブローカー用のブートストラップサーバーのリスト。ポートを含めます。例: broker.example.com:xxxx

タイプ: リスト

必須: いいえ

AWS CloudFormation 互換性: このプロパティは に固有 AWS SAM であり、 AWS CloudFormation 同等のプロパティはありません。

KmsKeyArn

このイベントに関連する情報を暗号化するためのキーの Amazon リソースネーム (ARN)。

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの KmsKeyArnプロパティに直接渡されます。

ProvisionedPollerConfig

イベントソースマッピングの計算に使用されるポーラーの量を増やすための設定。この設定では、最小 1 つのポーラーと最大 20 のポーラーを使用できます。例については、「」を参照してください。 ProvisionedPollerConfig の例

タイプ: ProvisionedPollerConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの ProvisionedPollerConfigプロパティに直接渡されます。

SchemaRegistryConfig

セルフマネージド Kafka イベントソースでスキーマレジストリを使用するための設定。

注記

この機能は を設定ProvisionedPollerConfigする必要があります。

タイプ: SchemaRegistryConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの SelfManagedKafkaEventSourceConfigプロパティに直接渡されます。

SourceAccessConfigurations

認証プロトコルの配列、VPC コンポーネント、イベントソースを保護して定義する仮想ホスト。

有効な値: BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE

型: SourceAccessConfiguration のリスト

必須: はい

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの SelfManagedKafkaEventSourceConfig プロパティの一部です。

StartingPosition

読み取りを開始するストリームの場所です。

  • AT_TIMESTAMP - レコードの読み取りを開始する時間を指定します。

  • LATEST - 新しいレコードのみを読み込みます。

  • TRIM_HORIZON - 使用可能なすべてのレコードを処理します。

有効な値: AT_TIMESTAMP | LATEST | TRIM_HORIZON

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの StartingPositionプロパティに直接渡されます。

StartingPositionTimestamp

Unix タイム秒単位で読み取りをスタートする時間。StartingPositionAT_TIMESTAMP として指定されている場合の StartingPositionTimestamp を定義します。

型: 倍精度

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの StartingPositionTimestampプロパティに直接渡されます。

Topics

Kafka トピックの名前です。

タイプ: リスト

必須: はい

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの Topicsプロパティに直接渡されます。

IAM ロールを使用してセットアップを完了する

次の例は、スキーマレジストリを使用するために必要な IAM ロール設定を含む完全なセットアップを示しています。

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig の例

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

セルフマネージド型の Kafka イベントソース

以下は、SelfManagedKafka イベントソースタイプの例です。

YAML

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic

AWS Glue Schema Registry を使用したセルフマネージド Kafka イベントソース

AWS Glue Schema Registry で設定されたSelfManagedKafkaイベントソースタイプの例を次に示します。

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678

Confluent Schema Registry を使用したセルフマネージド Kafka イベントソース

Confluent Schema Registry で設定されたSelfManagedKafkaイベントソースタイプの例を次に示します。

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret