搭配 Kinesis 事件來源使用事件篩選 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Kinesis 事件來源使用事件篩選

您可以使用事件篩選來控制 Lambda 將哪些記錄從串流或佇列中傳送至函數。如需事件篩選運作方式的一般資訊,請參閱控制 Lambda 將哪些事件傳送至您的函數

本節重點介紹 Kinesis 事件來源的事件篩選。

注意

Kinesis 事件來源映射僅支援對data金鑰進行篩選。

Kinesis 事件篩選基本概念

假設生產者將 JSON 格式的資料放入您的 Kinesis 資料串流中。範例記錄如下所示,JSON 資料在 data 欄位中會轉換為 Base64 編碼字串。

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

只要生產者放入串流中的資料是有效的 JSON,您就可以使用事件篩選透過 data 索引鍵來篩選記錄。假設生產者以下列 JSON 格式將記錄放入 Kinesis 串流中。

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

若僅篩選訂單類型為「購買」的記錄,FilterCriteria 物件如下所示。

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

補充說明,此處是篩選條件的 Pattern 在純文字 JSON 中擴展的值。

{ "data": { "order": { "type": [ "buy" ] } } }

您可以使用 主控台 AWS CLI 或 AWS SAM 範本來新增篩選條件。

Console

若要使用主控台新增此篩選條件,請遵循 將篩選條件標準連接至事件來源映射 (主控台) 中的指示,並針對篩選條件標準輸入下列字串。

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

若要使用 AWS Command Line Interface (AWS CLI) 使用這些篩選條件建立新的事件來源映射,請執行下列命令。

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

若要將這些篩選條件標準新增到現有事件來源映射,請執行下列命令。

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

若要使用 新增此篩選條件 AWS SAM,請將下列程式碼片段新增至事件來源的 YAML 範本。

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

若要正確篩選來自 Kinesis 來源的事件,資料欄位和資料欄位的篩選條件標準都必須是有效的 JSON 格式。如果其中一個欄位不是有效的 JSON 格式,則 Lambda 會捨棄訊息或擲回例外狀況。下表摘要說明特定行為:

傳入資料格式 資料屬性的篩選條件模式格式 產生的動作

有效的 JSON

有效的 JSON

根據您的篩選條件標準之 Lambda 篩選條件。

有效的 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

有效的 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。

非 JSON

有效的 JSON

Lambda 捨棄記錄。

非 JSON

資料屬性沒有篩選條件模式

Lambda 篩選條件 (僅限其他中繼資料屬性) 會根據您的篩選條件標準而定。

非 JSON

非 JSON

Lambda 會在事件來源映射建立或更新時擲回例外狀況。資料屬性的篩選條件模式必須是有效的 JSON 格式。

篩選 Kinesis 彙總記錄

使用 Kinesis,可以將多個記錄彙總到單一 Kinesis 資料串流記錄中,以提高資料輸送量。Lambda 只會在您使用 Kinesis 增強型展開傳送時,將篩選條件標準套用至彙總記錄。不支援使用標準 Kinesis 篩選彙總記錄。使用增強型展開傳送時,您可以設定 Kinesis 專用輸送量取用程式,以充當 Lambda 函數的觸發條件。Lambda 接著會篩選彙總記錄,並僅傳遞符合篩選條件標準的記錄。

若要進一步了解 Kinesis 記錄彙總,請參閱「Kinesis Producer Library (KPL) 主要概念」頁面上的彙總部分。若要進一步了解如何搭配 Kinesis 增強型廣發功能使用 Lambda,請參閱 AWS 運算部落格上的搭配 Amazon Kinesis Data Streams 增強型廣發功能和 AWS Lambda 提高即時串流處理效能