Elaborare i record del flusso di dati Amazon Kinesis con Lambda - AWS Lambda

Elaborare i record del flusso di dati Amazon Kinesis con Lambda

Per elaborare i record del flusso di dati Amazon Kinesis con Lambda, crea un consumer per il tuo flusso e quindi crea uno strumento di mappatura dell'origine degli eventi Lambda. È possibile mappare una funzione Lambda a un flusso di dati (iteratore standard) o a un consumatore di un flusso (fan-out avanzato). Per ulteriori informazioni, consulta Flussi di polling e batching.

Per creare uno strumento di mappatura dell'origine degli eventi Kinesis

Per richiamare la funzione Lambda con i record dal flusso di dati, crea uno strumento di mappatura dell'origine degli eventi. È possibile creare più mappature delle origini eventi per elaborare gli stessi dati con più funzioni Lambda o per elaborare elementi da più flussi di dati con una singola funzione. Quando si elaborano elementi da più flussi, ogni batch conterrà i record di un solo shard o di un solo flusso.

È possibile configurare gli strumenti di mappatura dell'origine degli eventi per elaborare i record da un flusso in un altro Account AWS. Per ulteriori informazioni, consulta Creazione di uno strumento di mappatura dell'origine degli eventi multi-account.

Prima di creare uno strumento di mappatura dell'origine degli eventi, devi autorizzare la funzione Lambda a leggere da un flusso di dati Kinesis. Lambda richiede le seguenti autorizzazioni per gestire le risorse correlate al flusso di dati Kinesis:

Queste autorizzazioni sono incluse nella policy gestita da AWS AWSLambdaKinesisExecutionRole. Aggiungi questa policy gestita alla funzione come descritto nella seguente procedura.

Nota
  • Non è necessaria l'kinesis:ListStreamsautorizzazione per creare e gestire le mappature delle sorgenti degli eventi per Kinesis. Tuttavia, se crei una mappatura dell'origine degli eventi nella console e non disponi di questa autorizzazione, non sarai in grado di selezionare uno stream Kinesis da un elenco a discesa e la console visualizzerà un errore. Per creare la mappatura della fonte dell'evento, devi inserire manualmente l'Amazon Resource Name (ARN) del nome della risorsa Amazon (ARN) del flusso.

  • Lambda effettua kinesis:GetRecords chiamate kinesis:GetShardIterator API ogni volta che tenta di ripetere le chiamate non riuscite.

Console di gestione AWS
Per aggiungere le autorizzazioni Kinesis alla tua funzione
  1. Apri la pagina Funzioni della console Lambda e scegli la tua funzione.

  2. Nella scheda Configurazione scegli Autorizzazioni.

  3. Nel riquadro Ruolo di esecuzione, ub Nome del ruolo, scegli il link al ruolo di esecuzione della tua funzione. Questo link apre la pagina del ruolo nella console IAM.

  4. Nel riquadro Policy di autorizzazioni, seleziona Aggiungi autorizzazioni, quindi Collega policy.

  5. Inserisci nel campo di ricerca AWSLambdaKinesisExecutionRole.

  6. Seleziona la casella di controllo accanto al nome della policy, quindi scegli Aggiungi autorizzazione.

AWS CLI
Per aggiungere le autorizzazioni Kinesis alla tua funzione
  • Per aggiungere la policy AWSLambdaKinesisExecutionRole al ruolo di esecuzione della funzione, eseguire il comando della CLI sotto riportato:

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
Per aggiungere le autorizzazioni Kinesis alla tua funzione
  • Nella definizione della funzione, aggiungi la proprietà Policies come mostrato nell'esempio seguente:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

Dopo aver configurato le autorizzazioni richieste, crea lo strumento di mappatura dell'origine degli eventi.

Console di gestione AWS
Per creare uno strumento di mappatura dell'origine degli eventi Kinesis
  1. Apri la pagina Funzioni della console Lambda e scegli la tua funzione.

  2. Nel riquadro Panoramica della funzione, scegli Aggiungi trigger.

  3. In Configurazione del trigger, per l'origine seleziona Kinesis.

  4. Seleziona il flusso Kinesis per il quale desideri creare lo strumento di mappatura dell'origine degli eventi e, facoltativamente, un consumer del tuo flusso.

  5. (Facoltativo) Modifica la dimensione del batch, la posizione iniziale e la finestra batch per lo strumento di mappatura dell'origine degli eventi.

  6. Scegli Aggiungi.

Inoltre, quando si crea lo strumento di mappatura dell'origine degli eventi dalla console, il ruolo IAM deve disporre delle autorizzazioni kinesis:ListStreams e kinesis:ListStreamConsumers.

AWS CLI
Per creare uno strumento di mappatura dell'origine degli eventi Kinesis
  • Per creare uno strumento di mappatura dell'origine degli eventi Kinesis, eseguire il comando della CLI sotto riportato. Scegli la dimensione del batch e la posizione iniziale in base al tuo caso d'uso.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Per specificare una finestra di batch, aggiungi l'opzione --maximum-batching-window-in-seconds. Per ulteriori informazioni sull'utilizzo di questo e di altri parametri, consulta create-event-source-mapping in Riferimento ai comandi della AWS CLI.

AWS SAM
Per creare uno strumento di mappatura dell'origine degli eventi Kinesis
  • Nella definizione della funzione, aggiungi la proprietà KinesisEvent come mostrato nell'esempio seguente:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Per ulteriori informazioni sulla creazione di uno strumento di mappatura dell'origine degli eventi per il flusso di dati Kinesis in AWS SAM, consulta Kinesis nella Guida per gli sviluppatori di AWS Serverless Application Model.

Posizioni di partenza di polling e flussi

Tieni presente che il polling dei flussi durante la creazione e gli aggiornamenti dello strumento di mappatura dell'origine degli eventi alla fine è coerente.

  • Durante la creazione dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

  • Durante gli aggiornamenti dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

Questo comportamento implica che se specifichi LATEST come posizione iniziale del flusso, lo strumento di mappatura dell'origine degli eventi potrebbe perdere eventi durante la creazione o gli aggiornamenti. Per non perdere alcun evento, specifica la posizione iniziale del flusso come TRIM_HORIZON o AT_TIMESTAMP.

Creazione di uno strumento di mappatura dell'origine degli eventi multi-account

Flusso di dati Amazon Kinesis supporta le policy basate sulle risorse. Per questo motivo, puoi elaborare i dati importati in un flusso in un Account AWS con una funzione Lambda in un altro account.

Per creare uno strumento di mappatura dell'origine degli eventi per la tua funzione Lambda utilizzando un flusso Kinesis in un altro Account AWS, devi configurare il flusso utilizzando una policy basata sulle risorse per assegnare alla funzione Lambda a leggere gli elementi. Per informazioni su come configurare il flusso per consentire l'accesso multi-account, consulta Condivisione dell'accesso con funzioni AWS Lambda multi-account nella Guida per gli sviluppatori di Amazon Kinesis Streams.

Dopo aver configurato il flusso con una policy basata sulle risorse che fornisce alla funzione Lambda le autorizzazioni richieste, crea lo strumento di mappatura dell'origine degli eventi utilizzando uno dei metodi descritti nella sezione precedente.

Se scegli di creare lo strumento di mappatura dell'origine degli eventi utilizzando la console Lambda, incolla l'ARN del tuo flusso direttamente nel campo di input. Se desideri specificare un consumer per il tuo flusso, incollando l'ARN del consumer viene compilato automaticamente il campo del flusso.