MSK - AWS Serverless Application Model

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

MSK

Das Objekt, das einen MSK Ereignisquellentyp beschreibt. Weitere Informationen finden Sie unter Using AWS Lambda with Amazon MSK im AWS Lambda Developer Guide.

AWS Serverless Application Model (AWS SAM) generiert eine AWS::Lambda::EventSourceMappingRessource, wenn dieser Ereignistyp festgelegt ist.

Um Schema Registry verwenden zu können, müssen Sie spezifische IAM-Rollenberechtigungen für Ihre Funktion definieren. Ein Beispiel für die erforderliche Konfiguration finden Sie unter Vollständige Einrichtung mit IAM-Rollen.

Syntax

Verwenden Sie die folgende Syntax, um diese Entität in Ihrer AWS SAM Vorlage zu deklarieren.

Eigenschaften

ConsumerGroupId

Eine Zeichenfolge, die konfiguriert, wie Ereignisse aus Kafka-Themen gelesen werden.

Typ: Zeichenfolge

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die AmazonManagedKafkaConfiguration Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

DestinationConfig

Ein Konfigurationsobjekt, das das Ziel eines Ereignisses angibt, nachdem Lambda es verarbeitet hat.

Verwenden Sie diese Eigenschaft, um das Ziel fehlgeschlagener Aufrufe aus der Amazon MSK-Ereignisquelle anzugeben.

Typ: DestinationConfig

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die DestinationConfig Eigenschaft einer Ressource übergeben. AWS::Lambda::EventSourceMapping

FilterCriteria

Ein Objekt, das die Kriterien definiert, die bestimmen, ob Lambda ein Ereignis verarbeiten soll. Weitere Informationen finden Sie unter AWS Lambda Ereignisfilterung im AWS Lambda Entwicklerhandbuch.

Typ: FilterCriteria

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die FilterCriteria Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

KmsKeyArn

Der Amazon-Ressourcenname (ARN) des Schlüssels zur Verschlüsselung von Informationen im Zusammenhang mit diesem Ereignis.

Typ: Zeichenfolge

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die KmsKeyArn Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

MaximumBatchingWindowInSeconds

Die maximale Zeitspanne zur Erfassung von Datensätzen vor dem Aufruf der Funktion in Sekunden.

Typ: Ganzzahl

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die MaximumBatchingWindowInSeconds Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

ProvisionedPollerConfig

Konfiguration zur Erhöhung der Anzahl von Pollern, die zur Berechnung von Ereignisquellenzuordnungen verwendet werden. Diese Konfiguration ermöglicht mindestens einen Poller und ein Maximum von 20 Pollern. Ein Beispiel finden Sie unter. ProvisionedPollerConfig Beispiel

Typ: ProvisionedPollerConfig

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die ProvisionedPollerConfig Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

SchemaRegistryConfig

Konfiguration für die Verwendung einer Schemaregistrierung mit der Kafka-Ereignisquelle.

Anmerkung

Diese Funktion ProvisionedPollerConfig muss konfiguriert werden.

Typ: SchemaRegistryConfig

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die AmazonManagedKafkaEventSourceConfig Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

SourceAccessConfigurations

Ein Array des Authentifizierungsprotokolls, der VPC-Komponenten oder des virtuellen Hosts zum Sichern und Definieren Ihrer Ereignisquelle.

Gültige Werte: CLIENT_CERTIFICATE_TLS_AUTH

Typ: Liste von SourceAccessConfiguration

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft ist Teil der AmazonManagedKafkaEventSourceConfigEigenschaft einer AWS::Lambda::EventSourceMapping Ressource.

StartingPosition

Die Position im Stream, an der mit dem Lesen begonnen wird.

  • AT_TIMESTAMP— Geben Sie einen Zeitpunkt an, ab dem mit dem Lesen von Datensätzen begonnen werden soll.

  • LATEST— Nur neue Datensätze lesen.

  • TRIM_HORIZON— Verarbeitet alle verfügbaren Datensätze.

Zulässige Werte: AT_TIMESTAMP | LATEST | TRIM_HORIZON

Typ: Zeichenfolge

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die StartingPosition Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

StartingPositionTimestamp

Die Zeit, ab der mit dem Lesen begonnen werden soll, in Unix-Zeitsekunden. DefiniertStartingPositionTimestamp, wann als angegeben StartingPosition istAT_TIMESTAMP.

Type: Double

Required: No

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die StartingPositionTimestamp Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

Stream

Der Amazon-Ressourcenname (ARN) des Datenstroms oder eines Stream-Verbrauchers.

Typ: Zeichenfolge

Erforderlich: Ja

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die EventSourceArn Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

Topics

Der Name des Kafka-Themas.

Typ: Liste

Erforderlich: Ja

AWS CloudFormation Kompatibilität: Diese Eigenschaft wird direkt an die Topics Eigenschaft einer AWS::Lambda::EventSourceMapping Ressource übergeben.

Beispiele

Schließen Sie das Setup mit IAM-Rollen ab

Das folgende Beispiel zeigt ein vollständiges Setup einschließlich der erforderlichen IAM-Rollenkonfiguration für die Verwendung von Schema Registry:

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig Beispiel

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

Amazon MSK-Beispiel für einen vorhandenen Cluster

Im Folgenden finden Sie ein Beispiel für einen MSK Ereignisquellentyp für einen Amazon MSK-Cluster, der bereits in einem AWS-Konto vorhanden ist.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

Amazon MSK-Beispiel für einen Cluster, der in derselben Vorlage deklariert wurde

Im Folgenden finden Sie ein Beispiel für einen MSK Ereignisquellentyp für einen Amazon MSK-Cluster, der in derselben Vorlagendatei deklariert ist.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

MSK-Ereignisquelle mit Schemaregistrierung

Im Folgenden finden Sie ein Beispiel für einen MSK Ereignisquellentyp, der mit einer Schemaregistrierung konfiguriert wurde.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

MSK-Ereignisquelle mit Confluent Schema Registry

Im Folgenden finden Sie ein Beispiel für einen MSK Ereignisquellentyp, der mit einer Confluent Schema Registry konfiguriert wurde.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE