使用適用於 Go V2 的 SDK 的 Kinesis 範例 - AWS SDK 程式碼範例

文件 AWS 開發套件範例 GitHub 儲存庫中有更多可用的 AWS SDK 範例

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用適用於 Go V2 的 SDK 的 Kinesis 範例

下列程式碼範例示範如何使用 適用於 Go 的 AWS SDK V2 搭配 Kinesis 來執行動作和實作常見案例。

每個範例均包含完整原始碼的連結,您可在連結中找到如何設定和執行內容中程式碼的相關指示。

無伺服器範例

以下程式碼範例示範如何實作 Lambda 函式,該函式會透過接收 Kinesis 串流的記錄來接收所觸發的事件。此函數會擷取 Kinesis 承載、從 Base64 解碼,並記錄記錄內容。

SDK for Go V2
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 來使用 Kinesis 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }

下列程式碼範例示範如何針對接收來自 Kinesis 串流之事件的 Lambda 函式,實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。

SDK for Go V2
注意

GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 搭配 Lambda 來報告 Kinesis 批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }