AWS源版本 2 的 Security Lake 查询 (OCSF 1.1.0) - Amazon Security Lake

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS源版本 2 的 Security Lake 查询 (OCSF 1.1.0)

以下部分提供了有关从 Security Lake 中查询数据的指导,并包括源版本 2 原生支持的AWSAWS源代码的一些查询示例。这些查询旨在检索特定数据AWS 区域。示例使用的是 us-east-1,即美国东部(弗吉尼亚州北部)。此外,示例查询使用 LIMIT 25 参数,最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例,请参阅 Amazon Security Lake OCSF 查询 GitHub 目录

您可以查询 Security Lake 存储在AWS Lake Formation数据库和表中的数据。您还可以在 Security Lake 控制台、API 或AWS CLI中创建第三方订阅用户。第三方订阅用户还可以从您指定的来源查询 Lake Formation 数据。

Lake Formation 数据湖管理员必须向查询数据的 IAM 身份授予相关数据库和表的 SELECT 权限。订阅用户也必须是在 Security Lake 中创建的,然后才能查询数据。有关如何创建具有查询权限的订阅用户的更多信息,请参阅管理 Security Lake 订阅用户的查询访问权限

以下查询包括基于时间的过滤器,eventDay用于确保您的查询在配置的保留设置范围内。有关更多信息,请参阅 Querying data with retention settings

例如,如果超过 60 天的数据已过期,则您的查询应包含时间限制,以防止访问过期的数据。对于 60 天的保留期,请在查询中加入以下子句:

... WHERE time_dt > DATE_ADD('day', -59, CURRENT_TIMESTAMP) ...

该条款使用 59 天(而不是 60 天)来避免 Amazon S3 和 Apache Iceberg 之间出现任何数据或时间重叠。

日志源表

查询 Security Lake 数据时,您必须将数据所在的 Lake Formation 表的名称包含在内。

SELECT * FROM "amazon_security_lake_glue_db_DB_Region"."amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE" WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP LIMIT 25

日志源表的常见值包括以下内容:

  • cloud_trail_mgmt_2_0—AWS CloudTrail管理活动

  • lambda_execution_2_0— Lambda CloudTrail 的数据事件

  • s3_data_2_0— S3 CloudTrail 的数据事件

  • route53_2_0 – Amazon Route 53 Resolver 查询日志

  • sh_findings_2_0—AWS Security Hub CSPM调查结果

  • vpc_flow_2_0 – Amazon Virtual Private Cloud (Amazon VPC) 流日志

  • eks_audit_2_0— 亚马逊 Elastic Kubernetes Service(亚马逊 EKS)审计日志

  • waf_2_0—AWS WAF v2 日志

示例:表中所有来自 us-east sh_findings_2_0 -1 区域的 Security Hub CSPM 调查结果

SELECT * FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0" WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP LIMIT 25

数据库区域

查询 Security Lake 数据时,您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表,请参阅 Amazon Security Lake 端点

示例:列出来自来源 IP 的亚马逊 Virtual Private Cloud 活动

以下示例列出了在(2023 年 3 月 1 日)之后20230301(2023 年 3 月 1 日)记录的vpc_flow_2_0来自源 IP 192.0.2.1 的所有 Amazon VPC 活动us-west-2DB_Region

SELECT * FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0" WHERE time_dt > TIMESTAMP '2023-03-01' AND src_endpoint.ip = '192.0.2.1' ORDER BY time_dt desc LIMIT 25

分区日期

通过对数据进行分区,您可以限制每次查询所扫描的数据量,从而提高性能并降低成本。与 Security Lake 1.0 相比,Security Lake 2.0 中的分区工作 Security Lake 现在通过time_dtregion、和accountid实现分区。而 Security Lake 1.0 通过eventDayregion、和accountid参数实现了分区。

查询time_dt将自动生成来自 S3 的日期分区,并且可以像 Athena 中任何基于时间的字段一样进行查询。

以下是使用 time_dt分区查询 2023 年 3 月 1 日之后的日志的查询示例:

SELECT * FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0" WHERE time_dt > TIMESTAMP '2023-03-01' AND src_endpoint.ip = '192.0.2.1' ORDER BY time desc LIMIT 25

time_dt 的常见值包括以下内容:

过去 1 年内发生的事件

WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' YEAR

过去 1 个月内发生的事件

WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' MONTH

过去 30 天内发生的事件

WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '30' DAY

过去 12 个小时内发生的事件

WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '12' HOUR

过去 5 分钟内发生的事件

WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '5' MINUTE

7-14 天前发生的事件

WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '14' DAY AND CURRENT_TIMESTAMP - INTERVAL '7' DAY

在特定日期当天或之后发生的事件

WHERE time_dt >= TIMESTAMP '2023-03-01'

示例:表中列出了 2023 年 3 月 1 日当天或之后来自源 IP 192.0.2.1 的所有 CloudTrail 活动 cloud_trail_mgmt_1_0

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay >= '20230301' AND src_endpoint.ip = '192.0.2.1' ORDER BY time desc LIMIT 25

示例:表中列出了过去 30 天内来自源 IP 192.0.2.1 的所有 CloudTrail 活动 cloud_trail_mgmt_1_0

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar) AND src_endpoint.ip = '192.0.2.1' ORDER BY time desc LIMIT 25

查询安全湖观测数据

Observables 是 Security Lake 2.0 现已推出的一项新功能。可观察对象是一个枢轴元素,其中包含在事件中许多地方发现的相关信息。通过查询可观察数据,用户可以从其数据集中获得高级安全见解。

通过查询可观察对象中的特定元素,您可以将数据集限制为诸如特定用户名、资源 UIDs IPs、哈希值和其他 IOC 类型信息之类的内容

这是一个使用 observables 数组查询包含 IP 值 “172.01.02.03” 的 VPC Flow 和 Route53 表中的日志的示例查询

WITH a AS (SELECT time_dt, observable.name, observable.value FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0", UNNEST(observables) AS t(observable) WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP AND observable.value='172.01.02.03' AND observable.name='src_endpoint.ip'), b as (SELECT time_dt, observable.name, observable.value FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0", UNNEST(observables) AS t(observable) WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP AND observable.value='172.01.02.03' AND observable.name='src_endpoint.ip') SELECT * FROM a LEFT JOIN b ON a.value=b.value and a.name=b.name LIMIT 25

Amazon EKS 审核日志的安全湖查询示例

Amazon EKS 日志跟踪控制平面活动直接从 Amazon EKS 控制平面向您的账户提供审计和诊断 CloudWatch 日志。这些日志可让您轻松地保护和运行您的集群。订阅者可以查询 EKS 日志以了解以下类型的信息。

以下是AWS源版本 2 的 Amazon EKS 审核日志的一些查询示例:

过去 7 天内对特定 URL 的请求

SELECT time_dt, actor.user.name, http_request.url.path, activity_name FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP AND activity_name = 'get' and http_request.url.path = '/apis/coordination.k8s.io/v1/' LIMIT 25

更新过去 7 天来自 “10.0.97.167” 的请求

SELECT activity_name, time_dt, api.request, http_request.url.path, src_endpoint.ip, resources FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP AND src_endpoint.ip = '10.0.97.167' AND activity_name = 'Update' LIMIT 25

过去 7 天内与资源 “kube-controller-manager” 关联的请求和响应

SELECT activity_name, time_dt, api.request, api.response, resource.name FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0", UNNEST(resources) AS t(resource) WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP AND resource.name = 'kube-controller-manager' LIMIT 25