

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Security Lake 查询AWS源版本 1 (OCSF 1.0.0-rc.2)
<a name="subscriber-query-examples1"></a>

以下部分提供了有关从 Security Lake 中查询数据的指导，并包括源版本 1 中原生支持的AWSAWS源代码的一些查询示例。这些查询旨在检索特定数据AWS 区域。示例使用的是 us-east-1，即美国东部（弗吉尼亚州北部）。此外，示例查询使用 `LIMIT 25` 参数，最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例，请参阅 [Amazon Security Lake OCSF 查询 GitHub 目录](https://github.com/awslabs/aws-security-analytics-bootstrap/tree/main/AWSSecurityAnalyticsBootstrap/amazon_security_lake_queries)。

以下查询包括基于时间的过滤器，`eventDay`用于确保您的查询在配置的保留设置范围内。有关更多信息，请参阅 [Querying data with retention settings](subscriber-query-examples.md#security-lake-retention-setting-query-data)。

例如，如果超过 60 天的数据已过期，则您的查询应包含时间限制，以防止访问过期的数据。对于 60 天的保留期，请在查询中加入以下子句：

```
...
WHERE eventDay BETWEEN cast(date_format(current_date - INTERVAL '59' day, '%Y%m%d') AS varchar) 
                   AND cast(date_format(current_date, '%Y%m%d') AS varchar)
...
```

该条款使用 59 天（而不是 60 天）来避免 Amazon S3 和 Apache Iceberg 之间出现任何数据或时间重叠。

## 日志源表
<a name="log-source-table"></a>

查询 Security Lake 数据时，您必须将数据所在的 Lake Formation 表的名称包含在内。

```
SELECT *
   FROM amazon_security_lake_glue_db_DB_Region.amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE
   WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
   LIMIT 25
```

日志源表的常见值包括以下内容：
+ `cloud_trail_mgmt_1_0`—AWS CloudTrail管理活动
+ `lambda_execution_1_0`— Lambda CloudTrail 的数据事件
+ `s3_data_1_0`— S3 CloudTrail 的数据事件
+ `route53_1_0` – Amazon Route 53 Resolver 查询日志
+ `sh_findings_1_0`—AWS Security Hub CSPM调查结果
+ `vpc_flow_1_0` – Amazon Virtual Private Cloud (Amazon VPC) 流日志

**示例：表中所有来自 us-east `sh_findings_1_0` -1 区域的 Security Hub CSPM 调查结果**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
   LIMIT 25
```

## 数据库区域
<a name="database-region"></a>

查询 Security Lake 数据时，您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表，请参阅 [Amazon Security Lake 端点](https://docs.aws.amazon.com/general/latest/gr/securitylake.html)。

**示例：列出来自源 IP AWS CloudTrail的活动**

以下示例列出了在（2023 年 3 月 1 日）之后*20230301*（2023 年 3 月 1 日）记录的*cloud\$1trail\$1mgmt\$11\$10*来自源 IP *192.0.2.1* 的所有 CloudTrail 活动*us-east-1*`DB_Region`。

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > '20230301' AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

## 分区日期
<a name="partition-date"></a>

通过对数据进行分区，您可以限制每次查询所扫描的数据量，从而提高性能并降低成本。Security Lake 通过 `eventDay`、`region` 和 `accountid` 参数实施分区。`eventDay` 分区采用格式 `YYYYMMDD`。

以下是使用 `eventDay` 分区的查询示例：

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > '20230301'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
```

`eventDay` 的常见值包括以下内容：

**过去 1 年内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '1' year, '%Y%m%d%H') as varchar)`

**过去 1 个月内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '1' month, '%Y%m%d%H') as varchar)`

**过去 30 天内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar)`

**过去 12 个小时内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '12' hour, '%Y%m%d%H') as varchar)`

**过去 5 分钟内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '5' minute, '%Y%m%d%H') as varchar)`

**7-14 天前发生的事件**  
`BETWEEN cast(date_format(current_timestamp - INTERVAL '14' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar)`

**在特定日期当天或之后发生的事件**  
`>= '20230301'`

**示例：表中列出了 2023 年 3 月 1 日当天或之后来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay >= '20230301'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

**示例：表中列出了过去 30 天内来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar) 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

# Security Lake 查询 CloudTrail 数据的示例
<a name="cloudtrail-query-examples"></a>

AWS CloudTrail跟踪中的用户活动和 API 使用情况AWS 服务。订阅者可以查询 CloudTrail 数据以了解以下类型的信息：

以下是AWS源版本 1 的一些 CloudTrail 数据查询示例：

**过去 7 天AWS 服务内未经授权的企图**

```
SELECT 
      time, 
      api.service.name, 
      api.operation, 
      api.response.error, 
      api.response.message, 
      unmapped['responseElements'], 
      cloud.region, 
      actor.user.uuid, 
      src_endpoint.ip, 
      http_request.user_agent
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND api.response.error in (
        'Client.UnauthorizedOperation',
        'Client.InvalidPermission.NotFound',
        'Client.OperationNotPermitted',
        'AccessDenied')
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天`192.0.2.1`内来自源 IP 的所有 CloudTrail 活动清单**

```
SELECT 
      api.request.uid, 
      time, 
      api.service.name, 
      api.operation, 
      cloud.region, 
      actor.user.uuid, 
      src_endpoint.ip, 
      http_request.user_agent
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    AND src_endpoint.ip = '127.0.0.1.'
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天内所有 IAM 活动的列表**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
      AND api.service.name = 'iam.amazonaws.com'
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天内使用过凭证 `AIDACKCEVSQ6C2EXAMPLE` 的实例**

```
SELECT 
      actor.user.uid, 
      actor.user.uuid, 
      actor.user.account_uid, 
      cloud.region
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
      AND actor.user.credential_uid = 'AIDACKCEVSQ6C2EXAMPLE'
      LIMIT 25
```

**过去 7 天内失败的 CloudTrail 记录列表**

```
SELECT 
      actor.user.uid, 
      actor.user.uuid, 
      actor.user.account_uid, 
      cloud.region
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE status='failed' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    ORDER BY time DESC
    LIMIT 25
```

# Route 53 解析器查询日志的安全湖查询示例
<a name="route53_1_0-query-examples"></a>

Amazon Route 53 Resolver 查询日志可以跟踪由 Amazon VPC 中的资源进行的 DNS 查询。订阅用户可以查询 Route 53 Resolver 查询日志，以了解以下类型的信息：

以下是AWS源版本 1 的 Route 53 解析器查询日志的一些示例查询：

**过去 7 天 CloudTrail 内的 DNS 查询列表**

```
SELECT 
      time,
      src_endpoint.instance_uid,
      src_endpoint.ip,
      src_endpoint.port,
      query.hostname,
      rcode
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内与 `s3.amazonaws.com` 匹配的 DNS 查询的列表**

```
SELECT 
      time,
      src_endpoint.instance_uid,
      src_endpoint.ip,
      src_endpoint.port,
      query.hostname,
      rcode,
      answers
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE query.hostname LIKE 's3.amazonaws.com.' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内未解析的 DNS 查询的列表**

```
SELECT 
      time, 
      src_endpoint.instance_uid, 
      src_endpoint.ip, 
      src_endpoint.port, 
      query.hostname, 
      rcode, 
      answers
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE cardinality(answers) = 0 and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

**过去 7 天内解析到 `192.0.2.1`** 的 DNS 查询的列表

```
SELECT 
      time, 
      src_endpoint.instance_uid, 
      src_endpoint.ip, 
      src_endpoint.port, 
      query.hostname, 
      rcode, 
      answer.rdata
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    CROSS JOIN UNNEST(answers) as st(answer)
    WHERE answer.rdata='192.0.2.1' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

# Security Lake 对 Security Hub CSPM 调查结果的查询示例
<a name="security-hub-query-examples"></a>

Security Hub CSPM 为您提供安全状态的全面视图，AWS并帮助您根据安全行业标准和最佳实践检查您的环境。Security Hub CSPM 会生成用于安全检查的结果，并接收来自第三方服务的调查结果。

以下是 Security Hub CSPM 调查结果的一些示例查询：

**过去 7 天内严重性等级大于或等于 `MEDIUM` 的新调查发现**

```
SELECT 
      time,
      finding,
      severity
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0_findings
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND severity_id >= 3
      AND state_id = 1
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内的重复调查发现**

```
SELECT 
    finding.uid,
    MAX(time) AS time,
    ARBITRARY(region) AS region,
    ARBITRARY(accountid) AS accountid,
    ARBITRARY(finding) AS finding,
    ARBITRARY(vulnerabilities) AS vulnerabilities
FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
GROUP BY finding.uid
LIMIT 25
```

**过去 7 天内的所有非信息性调查发现**

```
SELECT 
      time,
      finding.title,
      finding,
      severity
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE severity != 'Informational' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

**资源为 Amazon S3 存储桶的调查发现（无时间限制）**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE any_match(resources, element -> element.type = 'amzn-s3-demo-bucket')
   LIMIT 25
```

**通用漏洞评分系统 (CVSS) 得分大于 `1` 的调查发现（无时间限制）**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE any_match(vulnerabilities, element -> element.cve.cvss.base_score > 1.0)
   LIMIT 25
```

**符合通用漏洞披露 (CVE) `CVE-0000-0000` 的调查发现（无时间限制）**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE any_match(vulnerabilities, element -> element.cve.uid = 'CVE-0000-0000')
    LIMIT 25
```

**过去 7 天内从 Security Hub CSPM 发送调查结果的产品数量**

```
SELECT 
      metadata.product.feature.name,
      count(*)
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    GROUP BY metadata.product.feature.name
    ORDER BY metadata.product.feature.name DESC
    LIMIT 25
```

**过去 7 天内调查发现中的资源类型数量**

```
SELECT 
      count(*),
      resource.type
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
        CROSS JOIN UNNEST(resources) as st(resource)
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    GROUP BY resource.type
    LIMIT 25
```

**过去 7 天内调查发现中的易受攻击软件包**

```
SELECT 
      vulnerability
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0,
    UNNEST(vulnerabilities) as t(vulnerability)  
    WHERE vulnerabilities is not null
    LIMIT 25
```

**过去 7 天内发生更改的调查发现**

```
SELECT 
    finding.uid,
    finding.created_time,
    finding.first_seen_time,
    finding.last_seen_time,
    finding.modified_time,
    finding.title,
    state
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

# Amazon VPC 流日志的安全湖查询示例
<a name="vpc-query-examples"></a>

Amazon Virtual Private Cloud (Amazon VPC) 提供有关进出 VPC 网络接口的 IP 流量的详细信息。

以下是AWS源版本 1 的 Amazon VPC 流日志的一些查询示例：

**最近 7 天的具体AWS 区域流量**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND region in ('us-east-1','us-east-2','us-west-2')
    LIMIT 25
```

**过去 7 天内来自源 IP `192.0.2.1` 和源端口 `22` 的活动的列表**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND src_endpoint.ip = '192.0.2.1'
      AND src_endpoint.port = 22
    LIMIT 25
```

**过去 7 天内不同目标 IP 地址的数量**

```
SELECT
    COUNT(DISTINCT dst_endpoint.ip) 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    LIMIT 25
```

**过去 7 天内源自 198.51.100.0/24 的流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    AND split_part(src_endpoint.ip,'.', 1)='198'AND split_part(src_endpoint.ip,'.', 2)='51'
    LIMIT 25
```

**过去 7 天内的所有 HTTPS 流量**

```
SELECT
      dst_endpoint.ip as dst, 
      src_endpoint.ip as src, 
      traffic.packets 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND dst_endpoint.port = 443
    GROUP BY 
      dst_endpoint.ip, 
      traffic.packets, 
      src_endpoint.ip 
    ORDER BY traffic.packets DESC 
    LIMIT 25
```

**按过去 7 天内发送到端口 `443` 的连接的数据包数量排序**

```
SELECT
      traffic.packets,
      dst_endpoint.ip
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND dst_endpoint.port = 443 
    GROUP BY 
      traffic.packets,
      dst_endpoint.ip
    ORDER BY traffic.packets DESC
    LIMIT 25
```

**过去 7 天内 IP `192.0.2.1` 和 `192.0.2.2` 之间的所有流量**

```
SELECT
      start_time, 
      end_time, 
      src_endpoint.interface_uid, 
      connection_info.direction,
      src_endpoint.ip,
      dst_endpoint.ip,
      src_endpoint.port,
      dst_endpoint.port,
      traffic.packets,
      traffic.bytes
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND(
        src_endpoint.ip = '192.0.2.1'
        AND dst_endpoint.ip = '192.0.2.2')
      OR (
        src_endpoint.ip = '192.0.2.2'
        AND dst_endpoint.ip = '192.0.2.1')
    ORDER BY start_time ASC
    LIMIT 25
```

**过去 7 天内的所有入站流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND connection_info.direction = 'ingress'
    LIMIT 25
```

**过去 7 天的所有出站流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND connection_info.direction = 'egress'
    LIMIT 25
```

**过去 7 天内所有被拒绝的流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND type_uid = 400105
    LIMIT 25
```