MSK - AWS Serverless Application Model

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

MSK

El objeto que describe un tipo de fuente de evento de MSK. Para obtener más información, consulte Uso AWS Lambda con Amazon MSK en la Guía para AWS Lambda desarrolladores.

AWS Serverless Application Model (AWS SAM) genera un AWS::Lambda::EventSourceMappingrecurso cuando se establece este tipo de evento.

Para utilizar Schema Registry, debe definir permisos de rol de IAM específicos para su función. Consulte Configuración completa con funciones de IAM para ver un ejemplo de la configuración requerida.

Sintaxis

Para declarar esta entidad en la AWS SAM plantilla, utilice la siguiente sintaxis.

Propiedades

ConsumerGroupId

Una cadena que configura cómo se leerán los eventos de los temas de Kafka.

Tipo: cadena

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la AmazonManagedKafkaConfiguration propiedad de un AWS::Lambda::EventSourceMapping recurso.

DestinationConfig

Un objeto de configuración que especifica el destino de un evento después de que Lambda lo procese.

Utilice esta propiedad para especificar el destino de las invocaciones fallidas desde el origen de eventos de Amazon MSK.

Tipo: DestinationConfig

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la DestinationConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

FilterCriteria

Un objeto que define los criterios que determinan si Lambda debe procesar un evento o no. Para obtener más información, consulta Filtrado de eventos de AWS Lambda en la Guía para desarrolladores de AWS Lambda .

Tipo: FilterCriteria

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la FilterCriteria propiedad de un AWS::Lambda::EventSourceMapping recurso.

KmsKeyArn

El nombre de recurso de Amazon (ARN) de la clave de que se utilizará para cifrar la información de este evento.

Tipo: cadena

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la KmsKeyArn propiedad de un AWS::Lambda::EventSourceMapping recurso.

MaximumBatchingWindowInSeconds

La cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos.

Tipo: Entero

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la MaximumBatchingWindowInSeconds propiedad de un AWS::Lambda::EventSourceMapping recurso.

ProvisionedPollerConfig

Configuración para aumentar la cantidad de sondeos utilizados para calcular las asignaciones de fuentes de eventos. Esta configuración permite un mínimo de 1 sondeador y un máximo de 20 sondeos. Para ver un ejemplo, consulte. ProvisionedPollerConfig ejemplo

Tipo: ProvisionedPollerConfig

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la ProvisionedPollerConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

SchemaRegistryConfig

Configuración para usar un registro de esquemas con la fuente de eventos de Kafka.

nota

Es necesario ProvisionedPollerConfig configurar esta función.

Tipo: SchemaRegistryConfig

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la AmazonManagedKafkaEventSourceConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

SourceAccessConfigurations

Una matriz del protocolo de autenticación, los componentes de VPC o el host virtual para proteger y definir su origen de eventos.

Valores válidos: CLIENT_CERTIFICATE_TLS_AUTH

Tipo: lista de SourceAccessConfiguration

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad forma parte de la AmazonManagedKafkaEventSourceConfigpropiedad de un AWS::Lambda::EventSourceMapping recurso.

StartingPosition

La posición en el flujo donde comienza la lectura.

  • AT_TIMESTAMP: especifique el tiempo a partir del cual comenzar la lectura de registros.

  • LATEST: registros nuevos de solo lectura.

  • TRIM_HORIZON: procese todos los registros disponibles.

Valores válidos: AT_TIMESTAMP | LATEST | TRIM_HORIZON

Tipo: cadena

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la StartingPosition propiedad de un AWS::Lambda::EventSourceMapping recurso.

StartingPositionTimestamp

El tiempo a partir del cual comenzar la lectura, en segundos de tiempo Unix. Defina StartingPositionTimestamp cuando StartingPosition se especifica como AT_TIMESTAMP.

Tipo: Doble

Obligatorio: no

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la StartingPositionTimestamp propiedad de un AWS::Lambda::EventSourceMapping recurso.

Stream

El nombre de recurso de Amazon (ARN) del flujo de datos un consumidor de flujos.

Tipo: cadena

Obligatorio: sí

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la EventSourceArn propiedad de un AWS::Lambda::EventSourceMapping recurso.

Topics

El nombre del tema de Kafka.

Tipo: lista

Obligatorio: sí

AWS CloudFormation compatibilidad: esta propiedad se pasa directamente a la Topics propiedad de un AWS::Lambda::EventSourceMapping recurso.

Ejemplos

Configuración completa con funciones de IAM

El siguiente ejemplo muestra una configuración completa que incluye la configuración del rol de IAM necesaria para usar Schema Registry:

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig ejemplo

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

Ejemplo de Amazon MSK para un clúster existente

El siguiente es un ejemplo de un tipo de origen de eventos MSK para un clúster de Amazon MSK que ya existe en un Cuenta de AWS.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

Ejemplo de Amazon MSK para un clúster declarado en la misma plantilla

El siguiente es un ejemplo de un tipo de origen de eventos MSK para un clúster de Amazon MSK que está declarado en el mismo archivo de plantilla.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

Fuente de eventos de MSK con registro de esquemas

A continuación se muestra un ejemplo de un tipo de fuente de MSK eventos configurado con un registro de esquemas.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

Fuente de eventos de MSK con registro de esquemas confluentes

El siguiente es un ejemplo de un tipo de fuente de MSK eventos configurado con un registro de esquemas confluentes.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE