MSK - AWS Serverless Application Model

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

MSK

El objeto que describe un tipo de fuente de evento de MSK. Para obtener más información, consulte Uso AWS Lambda con Amazon MSK en la Guía para AWS Lambda desarrolladores.

AWS Serverless Application Model (AWS SAM) genera un AWS::Lambda::EventSourceMappingrecurso cuando se establece este tipo de evento.

Para utilizar Schema Registry, debe definir los permisos de rol de IAM específicos para su función. Consulte la Configuración completa con roles de IAM para ver un ejemplo de la configuración necesaria.

Sintaxis

Para declarar esta entidad en la AWS SAM plantilla, utilice la siguiente sintaxis.

Propiedades

BatchSize

El número máximo de registros en cada lote que Lambda extrae del flujo o la cola y envía a su función. Lambda pasa todos los registros del lote a la función en una sola llamada hasta el límite de carga para la invocación síncrona (6 MB).

Valor predeterminado: 100

Rango válido: valor mínimo de 1. Valor máximo de 10 000.

Tipo: entero

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se transfiere directamente a la BatchSize propiedad de un AWS::Lambda::EventSourceMapping recurso.

BisectBatchOnFunctionError

Si la función devuelve un error, divida el lote en dos y vuelva a intentarlo.

Tipo: Booleano

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la BisectBatchOnFunctionError propiedad de un AWS::Lambda::EventSourceMapping recurso.

ConsumerGroupId

Cadena que configura cómo se leerán los eventos de los temas de Kafka.

Tipo: cadena

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la AmazonManagedKafkaConfiguration propiedad de un AWS::Lambda::EventSourceMapping recurso.

DestinationConfig

Un objeto de configuración que especifica el destino de un evento después de que Lambda lo procese.

Utilice esta propiedad para especificar el destino de las invocaciones fallidas desde el origen de eventos de Amazon MSK.

Tipo: DestinationConfig

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la DestinationConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

Enabled

Deshabilita el mapeo de origen de eventos para pausar el sondeo y la invocación.

Tipo: Booleano

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la Enabled propiedad de un AWS::Lambda::EventSourceMapping recurso.

FilterCriteria

Un objeto que define los criterios que determinan si Lambda debe procesar un evento o no. Para obtener más información, consulta Filtrado de eventos de AWS Lambda en la Guía para desarrolladores de AWS Lambda .

Tipo: FilterCriteria

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la FilterCriteria propiedad de un AWS::Lambda::EventSourceMapping recurso.

FunctionResponseTypes

Una lista de enumeraciones de tipos de respuesta actuales aplicadas a la asignación de origen de eventos. Para obtener más información, consulta Informes de errores de artículos en lotes en la Guía para desarrolladores de AWS Lambda .

Valores válidos: ReportBatchItemFailures

Tipo: lista

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la FunctionResponseTypes propiedad de un AWS::Lambda::EventSourceMapping recurso.

KmsKeyArn

El nombre de recurso de Amazon (ARN) de la clave de que se utilizará para cifrar la información de este evento.

Tipo: cadena

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la KmsKeyArn propiedad de un AWS::Lambda::EventSourceMapping recurso.

MaximumBatchingWindowInSeconds

La cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos.

Tipo: Entero

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la MaximumBatchingWindowInSeconds propiedad de un AWS::Lambda::EventSourceMapping recurso.

MaximumRecordAgeInSeconds

La antigüedad máxima de un registro que Lambda envía a una función para su procesamiento.

Tipo: entero

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la MaximumRecordAgeInSeconds propiedad de un AWS::Lambda::EventSourceMapping recurso.

MaximumRetryAttempts

El número máximo de veces que se debe volver a intentar cuando la función devuelve un error.

Tipo: entero

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la MaximumRetryAttempts propiedad de un AWS::Lambda::EventSourceMapping recurso.

ProvisionedPollerConfig

Configuración para aumentar la cantidad de sondeos utilizados para calcular las asignaciones de orígenes de eventos. Esta configuración permite un mínimo de 1 sondeador y un máximo de 2000 sondeadores. Para ver un ejemplo, consulte ProvisionedPollerConfig ejemplo.

Tipo: ProvisionedPollerConfig

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se transfiere directamente a la ProvisionedPollerConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

SchemaRegistryConfig

Configuración para usar un registro de esquemas con el origen de eventos de Kafka.

nota

Esta característica solicita que se configue ProvisionedPollerConfig.

Tipo: SchemaRegistryConfig

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la AmazonManagedKafkaEventSourceConfig propiedad de un AWS::Lambda::EventSourceMapping recurso.

SourceAccessConfigurations

Una matriz del protocolo de autenticación, los componentes de VPC o el host virtual para proteger y definir su origen de eventos.

Valores válidos: CLIENT_CERTIFICATE_TLS_AUTH

Tipo: lista de SourceAccessConfiguration

Obligatorio: no

CloudFormation compatibilidad: esta propiedad forma parte de la AmazonManagedKafkaEventSourceConfigpropiedad de un AWS::Lambda::EventSourceMapping recurso.

StartingPosition

La posición en el flujo donde comienza la lectura.

  • AT_TIMESTAMP: especifique el tiempo a partir del cual comenzar la lectura de registros.

  • LATEST: registros nuevos de solo lectura.

  • TRIM_HORIZON: procese todos los registros disponibles.

Valores válidos: AT_TIMESTAMP | LATEST | TRIM_HORIZON

Tipo: cadena

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la StartingPosition propiedad de un AWS::Lambda::EventSourceMapping recurso.

StartingPositionTimestamp

El tiempo a partir del cual comenzar la lectura, en segundos de tiempo Unix. Defina StartingPositionTimestamp cuando StartingPosition se especifica como AT_TIMESTAMP.

Tipo: Doble

Obligatorio: no

CloudFormation compatibilidad: esta propiedad se pasa directamente a la StartingPositionTimestamp propiedad de un AWS::Lambda::EventSourceMapping recurso.

Stream

El nombre de recurso de Amazon (ARN) del flujo de datos un consumidor de flujos.

Tipo: cadena

Obligatorio: sí

CloudFormation compatibilidad: esta propiedad se pasa directamente a la EventSourceArn propiedad de un AWS::Lambda::EventSourceMapping recurso.

Topics

El nombre del tema de Kafka.

Tipo: lista

Obligatorio: sí

CloudFormation compatibilidad: esta propiedad se pasa directamente a la Topics propiedad de un AWS::Lambda::EventSourceMapping recurso.

Ejemplos

Configuración completa con los roles de IAM

El siguiente ejemplo muestra una configuración completa que incluye la configuración del rol de IAM necesaria para usar Schema Registry:

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17 ' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig ejemplo

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 200

Ejemplo de Amazon MSK para un clúster existente

El siguiente es un ejemplo de un tipo de origen de eventos MSK para un clúster de Amazon MSK que ya existe en un Cuenta de AWS.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

Ejemplo de Amazon MSK para un clúster declarado en la misma plantilla

El siguiente es un ejemplo de un tipo de origen de eventos MSK para un clúster de Amazon MSK que está declarado en el mismo archivo de plantilla.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

Origen de eventos de MSK con Schema Registry

A continuación, se muestra un ejemplo de un tipo de origen de evento de MSKconfigurado con Schema Registry.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

Origen de eventos de MSK con Schema Registry confluentes

A continuación, se muestra un ejemplo de un tipo de origen de evento de MSKconfigurado con Schema Registry confluente.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE