

# Kinesis イベントソースでのイベントフィルタリングの使用
<a name="with-kinesis-filtering"></a>

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「[Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)」を参照してください。

このセクションでは、Kinesis イベントソースのイベントフィルタリングに焦点を当てます。

**注記**  
Kinesis イベントソースマッピングは、 `data` キーでのフィルタリングのみをサポートします。

**Topics**
+ [Kinesis イベントフィルタリングの基本](#filtering-kinesis)
+ [Kinesis 集約レコードのフィルタリング](#filtering-kinesis-efo)

## Kinesis イベントフィルタリングの基本
<a name="filtering-kinesis"></a>

プロデューサーが JSON 形式のデータを Kinesis データストリームに入力するとします。レコードの例は次のようになり、`data` フィールドで JSON データが Base64 でエンコードされた文字列に変換されます。

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

プロデューサーがストリームに入力するデータが有効な JSON である限り、イベントフィルタリングを使用して `data` キーを使用するレコードをフィルタリングできます。プロデューサーが次の JSON 形式でレコードを Kinesis ストリームに入力するとします。

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

注文タイプが「購入」のレコードのみをフィルタリングするには、`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Kinesis ソースからイベントを適切にフィルタリングするには、データフィールドおよびデータフィールドのフィルター条件の両方が有効な JSON 形式である必要があります。フィールドのどちらかが有効な JSON 形式ではない場合、Lambda はメッセージをドロップするか、例外をスローします。以下は、特定の動作を要約した表です。


| 着信データの形式 | データプロパティのフィルターパターンの形式 | 結果として生じるアクション | 
| --- | --- | --- | 
|  有効な JSON  |  有効な JSON  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  有効な JSON  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 
|  JSON 以外  |  有効な JSON  |  Lambda がレコードをドロップします。  | 
|  JSON 以外  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  JSON 以外  |  JSON 以外  |  Lambda がイベントソースマッピングの作成または更新時に例外をスローします。データプロパティのフィルターパターンは、有効な JSON 形式である必要があります。  | 

## Kinesis 集約レコードのフィルタリング
<a name="filtering-kinesis-efo"></a>

Kinesis を使用すると、複数のレコードを 1 つの Kinesis データストリームレコードに集約し、データスループットを増加させることができます。Lambda は、Kinesis 「[拡張ファンアウト](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)」を使用する場合に限り、集約レコードにフィルター条件を適用できます。標準 Kinesis による集約レコードのフィルタリングはサポートされていません。拡張ファンアウトを使用するときは、Kinesis 専用スループットコンシューマーが Lambda 関数のトリガーとして機能するように設定します。次に、Lambda は集約されたレコードをフィルタリングし、フィルター条件を満たすレコードのみを渡します。

Kinesis レコード集約の詳細については、「Kinesis プロデューサーライブラリ (KPL) のキーコンセプト」ページの「[集約](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation)」セクションを参照してください。Kinesis 拡張ファンアウトを用いた Lambda の使用に関する詳細については、「AWS コンピュートブログ」の「[Amazon Kinesis Data Streams 拡張ファンアウトおよび AWS Lambda でのリアルタイムストリーム処理パフォーマンスの向上](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/)」を参照してください。