Amazon Kinesis - Amazon Timestream

有关与适用于 LiveAnalytics 的 Amazon Timestream 类似的功能,可以考虑使用适用于 InfluxDB 的 Amazon Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

Amazon Kinesis

使用适用于 Apache Flink 的托管服务示例 Timestream 数据连接器,将数据从 Kinesis 数据流发送到适用于 LiveAnalytics 的 Timestream。有关更多信息,请参阅 Apache Flink 的 适用于 Apache Flink 的亚马逊托管服务

使用 EventBridge 管道,以将 Kinesis 数据发送到 Timestream

可使用 EventBridge 管道以将数据从 Kinesis 流发送到适用于 LiveAnalytics 的 Amazon Timestream 表。

管道用于支持的来源和目标之间的点对点集成,并支持高级转换和扩充。在开发事件驱动型架构时,管道可减少对专业知识和集成代码的需求。要设置管道,请选择源、添加可选筛选、定义可选富集,然后为事件数据选择目标。

源向 EventBridge 管道发送事件,管道对匹配的事件进行筛选并将其路由到目标。

此集成使您能够利用 Timestream 时间序列数据分析功能的强大优势,同时简化数据摄取管道。

将 EventBridge 管道与 Timestream 结合使用可提供以下优势:

  • 实时数据摄取:将数据从 Kinesis 直接流式传输到适用于 LiveAnalytics 的 Timestream,从而实现实时分析和监控。

  • 无缝集成:利用 EventBridge 管道管理数据流,无需进行复杂的自定义集成。

  • 增强的筛选和转换:在 Kinesis 记录存储至 Timestream 前对其进行筛选或转换,以满足特定数据处理要求。

  • 可扩展性:利用内置的并行和批处理功能,处理高吞吐量数据流,并确保高效的数据处理。

配置

要设置 EventBridge 管道以将数据从 Kinesis 流式传输到 Timestream,请按照以下步骤操作:

  1. 创建 Kinesis 流

    确保您有活跃的 Kinesis 数据流,可用于摄取数据。

  2. 创建 Timestream 数据库和表

    设置将用于存储数据的 Timestream 数据库和表。

  3. 配置 EventBridge 管道:

    • 来源:选择 Kinesis 流作为来源。

    • 目标:选择 Timestream 作为目标。

    • 批处理设置:定义批处理窗口及批处理大小,以优化数据处理并减少延迟。

重要

设置管道时,我们建议通过摄取少量记录以测试所有配置是否正确。请注意,成功创建管道并不能保证管道配置正确,也不能保证数据流畅无误。应用映射后,可能出现运行时错误,例如表不正确、动态路径参数错误或 Timestream 记录无效,这些问题将在实际数据流经管道时被发现。

以下配置决定数据摄取的速率:

  • BatchSize:将发送至适用于 LiveAnalytics 的 Timestream 的批次最大大小。范围:0-100。建议将此值保持为 100,以获得最大吞吐量。

  • MaximumBatchingWindowInSeconds:将批次发送至适用于 LiveAnalytics 的 Timestream 目标之前,等待填充 batchSize 的最长时间。根据传入事件的速率,此配置将决定摄取的延迟,建议将此值保持为小于 10 秒,以近乎实时地向 Timestream 发送数据。

  • ParallelizationFactor:每个分片并行处理的批次数量。建议使用最大值 10,以实现最大吞吐量和近乎实时的摄取。

    如果您的数据流被多个目标读取,请使用增强型扇出功能为管道提供专属消费者,从而实现高吞吐量。有关更多信息,请参阅《Kinesis Data Streams 用户指南》中的使用 Kinesis Data Streams API 开发增强型扇出消费者

注意

每个账户可实现的最大吞吐量受限于并发管道执行数

以下配置可防止数据丢失:

  • DeadLetterConfig:建议始终配置 DeadLetterConfig,以避免因用户错误导致事件无法摄取到适用于 LiveAnalytics 的 Timestream 时丢失任何数据。

使用以下配置设置优化管道性能,这有助于防止记录导致性能下降或堵塞。

  • MaximumRecordAgeInSeconds:早于此时间的记录将不予处理,并将直接移至 DLQ。建议将此值设置为不高于目标 Timestream 表配置的内存存储保留期。

  • MaximumRetryAttempts:记录发送到 DeadLetterQueue 之前,该记录的重试次数。建议将此项配置为 10。这应该能够帮助解决任何暂时性问题,而对于持续性问题,记录将被移至 DeadLetterQueue,从而解除对后续数据流的阻塞。

  • OnPartialBatchItemFailure:对于支持部分批处理的源,我们建议您启用此功能,并将其配置为 AUTOMATIC_BISECT,以便在丢弃/发送到 DLQ 之前对失败记录进行额外重试。

配置示例

以下是如何配置 EventBridge 管道以将数据从 Kinesis 流传输到 Timestream 表的示例:

例 Timestream 的 IAM 策略更新
JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:WriteRecords" ], "Resource": [ "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table" ] }, { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints" ], "Resource": "*" } ] }
例 Kinesis 流配置
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream 目标配置
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }

事件转换

EventBridge 管道允许在数据到达 Timestream 前对其进行转换。您可以定义转换规则以修改传入的 Kinesis 记录,例如更改字段名称。

假设 Kinesis 流包含温度和湿度数据。在将这些字段插入 Timestream 之前,可使用 EventBridge 转换以重命名这些字段。

最佳实践

批处理和缓冲

  • 配置批处理窗口和大小,以在写入延迟和处理效率之间取得平衡。

  • 使用批处理窗口在处理前积累足够的数据,从而减少频繁进行小规模批处理的开销。

并行处理

利用 ParallelizationFactor 设置提高并发度,尤其适用于高吞吐量流。这确保每个分片的多批次数据能够同时进行处理。

数据转换

利用 EventBridge 管道的转换功能,在将记录存储到 Timestream 之前进行筛选和增强。这有助于确保数据与分析要求保持一致。

安全性

  • 确保用于 EventBridge 管道的 IAM 角色具有读取 Kinesis 和写入 Timestream 的必要权限。

  • 使用加密和访问控制措施,以保护传输中数据和静态数据。

调试失败

  • 自动禁用管道

    如果目标不存在或存在权限问题,管道将在约 2 小时后自动禁用

  • 节流

    管道具备自动后退并重试的能力,直至节流有所减弱。

  • 启用日志

    建议您启用错误级别的日志,并包含执行数据,以便更深入地分析失败原因。出现任何故障时,这些日志将包含从 Timestream 发送/接收的请求/响应。这有助于您了解相关的错误,并在修复错误后重新处理记录(如有需要)。

监控

建议您设置以下方面的警报,以检测数据流的任何问题:

  • 源中记录的最长存留时间

    • GetRecords.IteratorAgeMilliseconds

  • 管道中的故障指标

    • ExecutionFailed

    • TargetStageFailed

  • Timestream 写入 API 错误

    • UserErrors

有关其他监控指标,请参阅《EventBridge 用户指南》中的监控 EventBridge