

# 从 Amazon MSK 和自托管式 Apache Kafka 事件源中筛选事件
<a name="kafka-filtering"></a>

您可以使用事件筛选，控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息，请参阅 [控制 Lambda 向您的函数发送的事件](invocation-eventfiltering.md)。

**注意**  
Amazon MSK 和自托管式 Apache Kafka 事件源映射仅支持对 `value` 键进行筛选。

**Topics**
+ [Kafka 事件筛选基础知识](#filtering-kafka)

## Kafka 事件筛选基础知识
<a name="filtering-kafka"></a>

假设创建者以有效的 JSON 格式或纯字符串的形式将消息写入 Kafka 集群中的主题。示例记录将如下所示，`value` 字段中的消息会转换为 Base64 编码字符串。

```
{
    "mytopic-0":[
        {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[]
        }
    ]
}
```

假设 Apache Kafka 创建器以如下 JSON 格式将消息写入主题。

```
{
    "device_ID": "AB1234",
    "session":{
        "start_time": "yyyy-mm-ddThh:mm:ss",
        "duration": 162
    }
}
```

您可以使用 `value` 键筛选记录。假设您只想筛选 `device_ID` 以字母 AB 开头的记录。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "value": {
        "device_ID": [ { "prefix": "AB" } ]
      }
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "value" : { "device_ID" : [ { "prefix":  "AB" } ] } }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\":  \"AB\" } ] } }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\":  \"AB\" } ] } }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "value" : { "device_ID" : [ { "prefix":  "AB" } ] } }'
```

------

通过 Kafka，您还可以筛选消息为纯字符串的记录。假设您想忽略字符串为“错误”的消息。`FilterCriteria` 对象将如下所示。

```
{
    "Filters": [
        {
            "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"
        }
    ]
}
```

为了更清楚起见，以下是在纯 JSON 中展开的筛选条件 `Pattern` 的值。

```
{
    "value": [
        {
        "anything-but": [ "error" ]
        }
    ]
}
```

您可以使用控制台、AWS CLI 或 AWS SAM 模板添加筛选条件。

------
#### [ Console ]

要使用控制台添加此筛选条件，请按照 [将筛选条件附加到事件源映射（控制台）](invocation-eventfiltering.md#filtering-console) 中的说明，为**筛选条件**输入以下字符串。

```
{ "value" : [ { "anything-but": [ "error" ] } ] }
```

------
#### [ AWS CLI ]

要使用 AWS Command Line Interface（AWS CLI）创建包含这些筛选条件的新事件源映射，请运行以下命令。

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
```

要将这些筛选条件添加到现有事件源映射中，请运行以下命令。

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
```

------
#### [ AWS SAM ]

要使用 AWS SAM 添加此筛选条件，请将以下代码段添加到事件源的 YAML 模板中。

```
FilterCriteria:
  Filters:
    - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'
```

------

Kafka 消息必须是 UTF-8 编码的字符串，可以是纯字符串或 JSON 格式。这是因为 Lambda 在应用筛选条件之前将 Kafka 字节数组解码为 UTF-8。如果您的消息使用另一种编码，例如 UTF-16 或 ASCII，或者消息格式与 `FilterCriteria` 格式不匹配，则 Lambda 仅处理元数据筛选条件。下表汇总了具体行为：


| 传入消息格式 | 消息属性的筛选条件模式格式 | 导致的操作 | 
| --- | --- | --- | 
|  纯字符串  |  纯字符串  |  Lambda 根据您的筛选条件进行筛选。  | 
|  纯字符串  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  纯字符串  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  纯字符串  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  数据属性中没有筛选条件模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 
|  有效 JSON  |  有效 JSON  |  Lambda 根据您的筛选条件进行筛选。  | 
|  非 UTF-8 编码字符串  |  JSON、纯字符串或无模式  |  Lambda 根据您的筛选条件进行筛选（仅限其他元数据属性）。  | 