Configurar a resposta em lote parcial com o Kinesis Data Streams e o Lambda
Ao consumir e processar dados de transmissão de uma fonte de eventos, o Lambda definirá checkpoints por padrão no número mais elevado na sequência de um lote somente quando o lote for um sucesso total. O Lambda trata todos os outros resultados como uma falha completa e tenta processar novamente o lote até o limite de novas tentativas. Para permitir sucessos parciais durante o processamento de lotes de um stream, ative ReportBatchItemFailures. Permitir sucessos parciais pode ajudar a reduzir o número de novas tentativas em um registro, embora não impeça totalmente a possibilidade de novas tentativas em um registro bem-sucedido.
Para ativar ReportBatchItemFailures, inclua o valor de enum ReportBatchItemFailures na lista FunctionResponseTypes. Essa lista indica quais tipos de resposta estão habilitados para sua função. Você pode configurar essa lista ao criar ou atualizar um mapeamento de origem de eventos.
Mesmo quando seu código de função retorna respostas parciais de falha em lote, essas respostas não serão processadas pelo Lambda, a menos que o atributo ReportBatchItemFailures esteja explicitamente ativado para o mapeamento da origem do evento.
Sintaxe do relatório
Ao configurar relatórios sobre falhas de itens de lote, a classe StreamsEventResponse é retornada com uma lista de falhas de itens de lote. É possível usar um objeto StreamsEventResponse para retornar o número sequencial do primeiro registro com falha no lote. Você também pode criar sua própria classe personalizada usando a sintaxe de resposta correta. A seguinte estrutura JSON mostra a sintaxe de resposta necessária:
{
"batchItemFailures": [
{
"itemIdentifier": "<SequenceNumber>"
}
]
}
Se a matriz batchItemFailures contém vários itens, o Lambda usa o registro com o menor número de sequência como ponto de verificação. Em seguida, o Lambda repete todos os registros a partir desse ponto de verificação.
Condições de sucesso e falha
O Lambda trata um lote como um sucesso completo se você retornar qualquer um destes:
O Lambda trata um lote como uma falha absoluta se você retornar qualquer um dos seguintes:
O Lambda faz novas tentativas após falhas com base na sua estratégia de repetição.
Dividir um lote
Se a invocação falhar e BisectBatchOnFunctionError estiver ativado, o lote será dividido independentemente da configuração de ReportBatchItemFailures.
Quando uma resposta de sucesso parcial do lote é recebida e tanto BisectBatchOnFunctionError quanto ReportBatchItemFailures estão ativados, o lote é dividido no número de sequência retornado e o Lambda tenta novamente apenas os registros restantes.
Para simplificar a implementação da lógica de respostas parciais em lote, considere usar o utilitário de processador em lote da Powertools para AWS Lambda, que lida de maneira automática com essas complexidades para você.
Veja alguns exemplos de código de função que retornam a lista de IDs de mensagens com falha no lote:
- .NET
-
- SDK para .NET
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando o .NET.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegration;
public class Function
{
// Powertools Logger requires an environment variables against your function
// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
{
if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return new StreamsEventResponse();
}
foreach (var record in evnt.Records)
{
try
{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return new StreamsEventResponse
{
BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
{
new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
}
};
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
return new StreamsEventResponse();
}
private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
{
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async work
return data;
}
}
public class StreamsEventResponse
{
[JsonPropertyName("batchItemFailures")]
public IList<BatchItemFailure> BatchItemFailures { get; set; }
public class BatchItemFailure
{
[JsonPropertyName("itemIdentifier")]
public string ItemIdentifier { get; set; }
}
}
- Go
-
- SDK para Go V2
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando Go.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, record := range kinesisEvent.Records {
curRecordSequenceNumber := ""
// Process your record
if /* Your record processing condition here */ {
curRecordSequenceNumber = record.Kinesis.SequenceNumber
}
// Add a condition to check if the record processing failed
if curRecordSequenceNumber != "" {
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
}
}
kinesisBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return kinesisBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK para Java 2.x
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {
@Override
public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
String curRecordSequenceNumber = "";
for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
try {
//Process your record
KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
curRecordSequenceNumber = kinesisRecord.getSequenceNumber();
} catch (Exception e) {
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
return new StreamsEventResponse(batchItemFailures);
}
}
return new StreamsEventResponse(batchItemFailures);
}
}
- JavaScript
-
- SDK para JavaScript (v3)
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando Javascript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
for (const record of event.Records) {
try {
console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
console.error(`An error occurred ${err}`);
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return {
batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
};
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
return { batchItemFailures: [] };
};
async function getRecordDataAsync(payload) {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
Relatar falhas de itens em lote do Kinesis com o Lambda usando TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";
const logger = new Logger({
logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
export const functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<KinesisStreamBatchResponse> => {
for (const record of event.Records) {
try {
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return {
batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
};
}
}
logger.info(`Successfully processed ${event.Records.length} records.`);
return { batchItemFailures: [] };
};
async function getRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {
var data = Buffer.from(payload.data, "base64").toString("utf-8");
await Promise.resolve(1); //Placeholder for actual async work
return data;
}
- PHP
-
- SDK para PHP
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando PHP.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler implements StdHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handle(mixed $event, Context $context): array
{
$kinesisEvent = new KinesisEvent($event);
$this->logger->info("Processing records");
$records = $kinesisEvent->getRecords();
$failedRecords = [];
foreach ($records as $record) {
try {
$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$failedRecords[] = $record->getSequenceNumber();
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
// change format for the response
$failures = array_map(
fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
$failedRecords
);
return [
'batchItemFailures' => $failures
];
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK para Python (Boto3).
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de itens em lote do Kinesis com o Lambda usando Python.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
records = event.get("Records")
curRecordSequenceNumber = ""
for record in records:
try:
# Process your record
curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
except Exception as e:
# Return failed record's sequence number
return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}
return {"batchItemFailures":[]}
- Ruby
-
- SDK para Ruby
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relatar falhas de item em lote do Kinesis com o Lambda usando Ruby.
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
def lambda_handler(event:, context:)
batch_item_failures = []
event['Records'].each do |record|
begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"
# TODO: Do interesting work based on the new data
rescue StandardError => err
puts "An error occurred #{err}"
# Since we are working with streams, we can return the failed item immediately.
# Lambda will immediately begin to retry processing from this failed item onwards.
return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
end
end
puts "Successfully processed #{event['Records'].length} records."
{ batchItemFailures: batch_item_failures }
end
def get_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('utf-8')
# Placeholder for actual async work
sleep(1)
data
end
- Rust
-
- SDK para Rust
-
Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.
Relate falhas de itens em lote do Kinesis com o Lambda usando Rust.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::kinesis::KinesisEvent,
kinesis::KinesisEventRecord,
streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
let mut response = KinesisEventResponse {
batch_item_failures: vec![],
};
if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(response);
}
for record in &event.payload.records {
tracing::info!(
"EventId: {}",
record.event_id.as_deref().unwrap_or_default()
);
let record_processing_result = process_record(record);
if record_processing_result.is_err() {
response.batch_item_failures.push(KinesisBatchItemFailure {
item_identifier: record.kinesis.sequence_number.clone(),
});
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
return Ok(response);
}
}
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(response)
}
fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
let record_data = std::str::from_utf8(record.kinesis.data.as_slice());
if let Some(err) = record_data.err() {
tracing::error!("Error: {}", err);
return Err(Error::from(err));
}
let record_data = record_data.unwrap_or_default();
// do something interesting with the data
tracing::info!("Data: {}", record_data);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
O utilitário de processador em lote do Powertools para AWS Lambda lida de maneira automática com a lógica de respostas parciais em lote, reduzindo a complexidade da implementação de relatórios de falhas em lote. Veja a seguir alguns exemplos usando o processador em lote:
- Python
-
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()
def record_handler(record):
logger.info(record)
# Your business logic here
# Raise an exception to mark this record as failed
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context
)
- TypeScript
-
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';
const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();
const recordHandler = async (record: any): Promise<void> => {
logger.info('Processing record', { record });
// Your business logic here
// Throw an error to mark this record as failed
};
export const handler = async (event: KinesisEvent, context: Context) => {
return processPartialResponse(event, recordHandler, processor, {
context,
});
};
- Java
-
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {
private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;
public KinesisStreamBatchHandler() {
handler = new BatchMessageHandlerBuilder()
.withKinesisBatchHandler()
.buildWithRawMessageHandler(this::processMessage);
}
@Override
public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
return handler.processBatch(kinesisEvent, context);
}
private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
// Process the stream record
}
}
- .NET
-
Processamento de registros de stream do Kinesis Data Streams com o processador em lote AWS Lambda.
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;
[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]
namespace HelloWorld;
public class OrderEvent
{
public string? OrderId { get; set; }
public string? CustomerId { get; set; }
public decimal Amount { get; set; }
public DateTime OrderDate { get; set; }
}
internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent>
{
public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(orderEvent.OrderId))
{
throw new ArgumentException("Order ID is required");
}
return await Task.FromResult(RecordHandlerResult.None);
}
}
public class Function
{
[BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
{
return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
}