场景 2:为安全团队提供近实时数据
ABC2Badge 公司为企业或大型活动(例如 AWS re:Invent
在即将举行的活动中,由于与会者人数众多,活动安全团队已要求 ABC2Badge 每 15 分钟收集一次园区中人员最密集区域的数据。这将使安全团队有足够的时间做出反应,并按比例将安保人员分散到各个人员密集区域。鉴于安全团队提出的这一新要求以及构建流解决方案的经验不足,为了近实时地处理数据,ABC2Badge 正在寻找一种简单但可扩展且可靠的解决方案。
他们目前的数据仓库解决方案是 Amazon Redshift
使用 Amazon Kinesis Data Firehose 的新解决方案
Amazon Kinesis Data Firehose
Amazon Kinesis Data Firehose
Kinesis Data Firehose 旨在与您目前已在使用的现有商业智能工具和控制面板配合,从而实现近实时的分析。这是一项完全托管式无服务器服务,可以自动扩展以匹配数据吞吐量,并且无需持续管理。Kinesis Data Firehose 可以在加载数据前对数据进行批处理、压缩和加密,从而最大程度地减少在目标位置占用的存储量,同时提高安全性。它还可以使用 AWS Lambda 转换源数据,并将转换后的数据传送到目标位置。您可以配置数据生成者向 Kinesis Data Firehose 发送数据,然后 Kinesis Data Firehose 将数据自动传输到您指定的目标位置。
将数据发送到 Firehose 传输流
要将数据发送到您的传输流,有几种选项。AWS 为许多常用的编程语言提供了软件开发工具包,每个软件开发工具包都为 Amazon Kinesis Data Firehose
使用 Amazon Kinesis 代理
Amazon Kinesis 代理是一个独立的软件应用程序,它持续监控一组日志文件,以查找要发送到传输流的新数据。此代理会自动处理文件轮换、检查点操作、出现故障时的重试,并发出 Amazon CloudWatch 指标以监控传输流并排除其故障。可以将其他配置(例如数据预处理、监控多个文件目录以及写入多个传输流)应用于此代理。
此代理可以安装在基于 Linux 或 Windows 的服务器上,例如 Web 服务器、日志服务器和数据库服务器。安装此代理后,只需指定它将监控的日志文件及其将发送到的传输流即可。此代理将持久、可靠地将新数据发送到传输流。
将 API 与 AWS 软件开发工具包和 AWS 服务一起用作源
Kinesis Data Firehose API 提供两种向传输流发送数据的操作:PutRecord 在一次调用中发送一条数据记录。PutRecordBatch 在一次调用中发送多条数据记录,并且对于每个生成者可以实现更高的吞吐量。对于每种方法,使用此方法时都必须指定传输流的名称和数据记录或数据记录数组。有关 Kinesis Data Firehose API 操作的更多信息和示例代码,请参阅使用 AWS 软件开发工具包写入 Firehose 传输流。
Kinesis Data Firehose 还可以与 Kinesis Data Firehose、CloudWatch Logs、CloudWatch Events、Amazon Simple Notification Service
在传输到目标位置之前处理数据
在某些情况下,您可能希望在将流数据传输到其目标位置之前对其进行转换或增强。例如,数据生成者可能会在每条数据记录中发送非结构化文本,而您需要先将其转换为 JSON,然后再将其传输到 OpenSearch Service
Kinesis Data Firehose 具有内置的数据格式转换功能。使用此功能,可以轻松地将 JSON 数据流转换为 Apache Parquet 或 Apache ORC 文件格式。
数据转换流
为了启用流数据转换,Kinesis Data Firehose 使用您创建的 Lambda 函数来转换数据。Kinesis Data Firehose 将传入的数据缓冲到函数的指定缓冲区大小,然后以异步方式调用指定的 Lambda 函数。转换后的数据将从 Lambda 发送到 Kinesis Data Firehose,然后 Kinesis Data Firehose 将数据传输到目标位置。
数据格式转换
还可以启用 Kinesis Data Firehose 数据格式转换,这会将 JSON 数据流转换为 Apache Parquet 或 Apache ORC。此功能只能将 JSON 转换为 Apache Parquet 或 Apache ORC。如果您有 CSV 格式的数据,则可以通过 Lambda 函数将该数据转换为 JSON,然后应用数据格式转换。
数据传输
作为近实时的传输流,Kinesis Data Firehose 会缓冲传入的数据。达到传输流的缓冲阈值后,数据将传输到您配置的目标位置。Kinesis Data Firehose 将数据传输到每个目标位置的方式存在一些差异,本文将在以下各节中对此进行考察。
Amazon S3
Amazon S3
将数据传输到 Amazon S3
为了将数据传输到 Amazon S3,Kinesis Data Firehose 根据传输流的缓冲配置串联多个传入记录,然后将它们作为一个 S3 对象传输到 Amazon S3。向 S3 传输数据的频率由 S3 缓冲区大小(1 MB 到 128 MB)或缓冲区间隔(60 秒到 900 秒)决定,以先到者为准。
向 S3 存储桶传输数据可能会由于各种原因而失败。例如,存储桶不再存在、Kinesis Data Firehose 代入的 AWS Identity and Access Management
Amazon Redshift
Amazon Redshift
将数据传输到 Amazon Redshift
为了将数据传输到 Amazon Redshift,Kinesis Data Firehose 首先以前面描述的格式将传入的数据传输到您的 S3 存储桶。然后,Kinesis Data Firehose 发出 Amazon Redshift COPY 命令,以将数据从 S3 存储桶加载到您的 Amazon Redshift 集群。
从 S3 到 Amazon Redshift 的数据 COPY 操作的频率取决于 Amazon Redshift 集群完成 COPY 命令的速度。对于 Amazon Redshift 目标位置,您可以在创建传输流时指定重试持续时间(0 - 7200 秒),以处理数据传输失败。Kinesis Data Firehose 会在指定的持续时间内重试,如果失败,则跳过该特定批次的 S3 对象。所跳过对象的信息会以清单文件的形式传输到您的 S3 存储桶中的 errors/ 文件夹内,您可以利用该清单文件进行手动回填。
以下是 Kinesis Data Firehose 到 Amazon Redshift 数据流的架构图。尽管此数据流是 Amazon Redshift 独有的,但 Kinesis data Firehose 对于其他目标位置也遵循类似的模式。
从 Kinesis Data Firehose 到 Amazon Redshift 的数据流
Amazon OpenSearch Service (OpenSearch Service)
OpenSearch Service
将数据传输到 OpenSearch Service
为了将数据传输 OpenSearch Service,Kinesis Data Firehose 根据传输流的缓冲配置缓冲传入的记录,然后生成 OpenSearch 批处理请求,以将多条记录编入到 OpenSearch 集群的索引中。向 OpenSearch Service 传输数据的频率由 OpenSearch 缓冲区大小(1 MB 到 100 MB)或缓冲区间隔(60 秒到 900 秒)值决定,以先到者为准。
对于 OpenSearch Service 目标位置,您可以在创建传输流时指定重试时长(0 - 7200 秒)。Kinesis Data Firehose 会重试指定的时长,然后跳过该特定的索引请求。跳过的文档会传输到您的 S3 存储桶中的 elasticsearch_failed/ 文件夹内,您可以利用它进行手动回填。
Amazon Kinesis Data Firehose 可以基于时间段轮换 OpenSearch Service 索引。根据您选择的轮换选项(NoRotation、OneHour、OneDay、OneWeek 或 OneMonth),Kinesis Data Firehose 向您指定的索引名称追加协调世界时 (UTC) 到达时间戳的一部分。
自定义 HTTP 终端节点或受支持的第三方服务提供商
Kinesis Data Firehose 可以将数据发送到自定义 HTTP 终端节点或受支持的第三方提供商,例如 Datadog、Dynatrace、LogicMonitor、MongoDB、New Relic、Splunk 和 Sumo Logic。
自定义 HTTP 终端节点或受支持的第三方服务提供商
为使 Kinesis Data Firehose 能够成功地将数据传输到自定义 HTTP 终端节点,这些终端节点必须使用特定的 Kinesis Data Firehose 请求和响应格式来接受请求和发送响应。
将数据传输到受支持的第三方服务提供商拥有的 HTTP 终端节点时,您可以使用集成的 AWS Lambda 服务创建一个函数,以将传入的记录转换为与服务提供商集成所期望的格式相匹配的格式。
对于数据传输频率,每个服务提供商都有建议的缓冲区大小。请与您的服务提供商联系,了解有关他们建议的缓冲区大小的更多信息。对于数据传输失败的处理,Kinesis data Firehose 首先通过等待来自目标位置的响应来建立与 HTTP 终端节点的连接。Kinesis Data Firehose 会继续建立连接,直到重试持续时间过期。超过该时间之后,Kinesis Data Firehose 会将其视为数据传输失败,并将数据备份到您的 S3 存储桶。
总结
Kinesis Data Firehose 可以持续将您的流数据传输到受支持的目标位置。这是一个完全托管式解决方案,几乎不需要或根本不需要开发。对于 ABC2Badge 公司,使用 Kinesis Data Firehose 是很自然的选择。他们已经在使用 Amazon Redshift 作为其数据仓库解决方案。由于他们的数据源持续写入事务日志,因此他们能够利用 Amazon Kinesis 代理来流式处理该数据,而无需编写任何其他代码。现在,ABC2Badge 公司已经创建了传感器记录流,并正在通过 Kinesis Data Firehose 接收这些记录,他们可以将其用作安全团队使用案例的基础。