

# Amazon MSK およびセルフマネージド Apache Kafka イベントソースからのイベントのフィルタリング
<a name="kafka-filtering"></a>

イベントフィルタリングを使用して、Lambda が関数に送信するストリームまたはキューからのレコードを制御することができます。イベントフィルタリングの仕組みに関する一般情報については、「[Lambda が関数に送信するイベントを制御する](invocation-eventfiltering.md)」を参照してください。

**注記**  
Amazon MSK およびセルフマネージド Apache Kafka イベントソースマッピングは、`value` キーのフィルタリングのみをサポートします。

**Topics**
+ [Kafka イベントフィルタリングの基本](#filtering-kafka)

## Kafka イベントフィルタリングの基本
<a name="filtering-kafka"></a>

プロデューサーが Kafka クラスターのトピックに、有効な JSON 形式またはプレーン文字列として、メッセージを書き込むとします。レコードの例は次のようになり、`value` フィールドでメッセージが Base64 でエンコードされた文字列に変換されます。

```
{
    "mytopic-0":[
        {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[]
        }
    ]
}
```

Apache Kafka プロデューサーが次の JSON 形式でトピックにメッセージを書き込むとします。

```
{
    "device_ID": "AB1234",
    "session":{
        "start_time": "yyyy-mm-ddThh:mm:ss",
        "duration": 162
    }
}
```

`value` キーを使用してレコードをフィルタリングできます。`device_ID` が AB の文字で始まるレコードのみをフィルタリングするとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "value": {
        "device_ID": [ { "prefix": "AB" } ]
      }
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "value" : { "device_ID" : [ { "prefix":  "AB" } ] } }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\":  \"AB\" } ] } }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\":  \"AB\" } ] } }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "value" : { "device_ID" : [ { "prefix":  "AB" } ] } }'
```

------

Kafka では、メッセージがプレーン文字列のレコードをフィルタリングすることもできます。文字列が「error」を含むメッセージを無視するとします。`FilterCriteria` オブジェクトは次のようになります。

```
{
    "Filters": [
        {
            "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"
        }
    ]
}
```

以下は、わかりやすくするためにプレーン JSON で展開したフィルターの `Pattern` の値を記載しています。

```
{
    "value": [
        {
        "anything-but": [ "error" ]
        }
    ]
}
```

コンソール、AWS CLI、または AWS SAM テンプレートを使用してフィルターを追加できます。

------
#### [ Console ]

コンソールを使用してこのフィルターを追加するには、[イベントソースマッピングへのフィルター条件のアタッチ (コンソール)](invocation-eventfiltering.md#filtering-console) の指示に従って **[フィルター条件]** に次の文字列を入力します。

```
{ "value" : [ { "anything-but": [ "error" ] } ] }
```

------
#### [ AWS CLI ]

AWS Command Line Interface (AWS CLI) を使用してこれらのフィルター条件を持つ新しいイベントソースマッピングを作成するには、以下のコマンドを実行します。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
```

これらのフィルター条件を既存のイベントソースマッピングに追加するには、次のコマンドを実行します。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
```

------
#### [ AWS SAM ]

AWS SAM を使用してこのフィルターを追加するには、イベントソースの YAML テンプレートに次のスニペットを追加します。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'
```

------

Kafka メッセージは UTF-8 でエンコードされた文字列 (プレーン文字列または JSON 形式) である必要があります。これは、Lambda がフィルター条件を適用する前に Kafka のバイト配列を UTF-8 にデコードするためです。メッセージが UTF-16 や ASCII などの別のエンコーディングを使用している場合、またはメッセージ形式が `FilterCriteria` 形式と一致しない場合、Lambda はメタデータフィルターのみを処理します。以下は、特定の動作を要約した表です。


| 着信メッセージの形式 | メッセージプロパティのフィルターパターン形式 | 結果として生じるアクション | 
| --- | --- | --- | 
|  プレーン文字列  |  プレーン文字列  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  プレーン文字列  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  プレーン文字列  |  有効な JSON  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  プレーン文字列  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  データプロパティのフィルターパターンがない  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 
|  有効な JSON  |  有効な JSON  |  Lambda がフィルター条件に基づいてフィルタリングを実行します。  | 
|  UTF-8 以外でエンコードされた文字  |  JSON、プレーン文字列、またはパターンなし  |  Lambda がフィルター条件に基づいて (他のメタデータプロパティのみを) フィルタリングします。  | 