有关与适用于 LiveAnalytics 的 Amazon Timestream 类似的功能,可以考虑使用适用于 InfluxDB 的 Amazon Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。
Amazon Kinesis
使用 适用于 Apache Flink 的 Amazon 托管服务
使用适用于 Apache Flink 的托管服务示例 Timestream 数据连接器,将数据从 Kinesis 数据流发送到适用于 LiveAnalytics 的 Timestream。有关更多信息,请参阅 Apache Flink 的 适用于 Apache Flink 的亚马逊托管服务。
使用 EventBridge 管道,以将 Kinesis 数据发送到 Timestream
可使用 EventBridge 管道以将数据从 Kinesis 流发送到适用于 LiveAnalytics 的 Amazon Timestream 表。
管道用于支持的来源和目标之间的点对点集成,并支持高级转换和扩充。在开发事件驱动型架构时,管道可减少对专业知识和集成代码的需求。要设置管道,请选择源、添加可选筛选、定义可选富集,然后为事件数据选择目标。
此集成使您能够利用 Timestream 时间序列数据分析功能的强大优势,同时简化数据摄取管道。
将 EventBridge 管道与 Timestream 结合使用可提供以下优势:
实时数据摄取:将数据从 Kinesis 直接流式传输到适用于 LiveAnalytics 的 Timestream,从而实现实时分析和监控。
无缝集成:利用 EventBridge 管道管理数据流,无需进行复杂的自定义集成。
增强的筛选和转换:在 Kinesis 记录存储至 Timestream 前对其进行筛选或转换,以满足特定数据处理要求。
可扩展性:利用内置的并行和批处理功能,处理高吞吐量数据流,并确保高效的数据处理。
配置
要设置 EventBridge 管道以将数据从 Kinesis 流式传输到 Timestream,请按照以下步骤操作:
创建 Kinesis 流
确保您有活跃的 Kinesis 数据流,可用于摄取数据。
创建 Timestream 数据库和表
设置将用于存储数据的 Timestream 数据库和表。
配置 EventBridge 管道:
来源:选择 Kinesis 流作为来源。
目标:选择 Timestream 作为目标。
批处理设置:定义批处理窗口及批处理大小,以优化数据处理并减少延迟。
重要
设置管道时,我们建议通过摄取少量记录以测试所有配置是否正确。请注意,成功创建管道并不能保证管道配置正确,也不能保证数据流畅无误。应用映射后,可能出现运行时错误,例如表不正确、动态路径参数错误或 Timestream 记录无效,这些问题将在实际数据流经管道时被发现。
以下配置决定数据摄取的速率:
BatchSize:将发送至适用于 LiveAnalytics 的 Timestream 的批次最大大小。范围:0-100。建议将此值保持为 100,以获得最大吞吐量。
MaximumBatchingWindowInSeconds:将批次发送至适用于 LiveAnalytics 的 Timestream 目标之前,等待填充 batchSize 的最长时间。根据传入事件的速率,此配置将决定摄取的延迟,建议将此值保持为小于 10 秒,以近乎实时地向 Timestream 发送数据。
ParallelizationFactor:每个分片并行处理的批次数量。建议使用最大值 10,以实现最大吞吐量和近乎实时的摄取。
如果您的数据流被多个目标读取,请使用增强型扇出功能为管道提供专属消费者,从而实现高吞吐量。有关更多信息,请参阅《Kinesis Data Streams 用户指南》中的使用 Kinesis Data Streams API 开发增强型扇出消费者。
注意
每个账户可实现的最大吞吐量受限于并发管道执行数。
以下配置可防止数据丢失:
DeadLetterConfig:建议始终配置 DeadLetterConfig,以避免因用户错误导致事件无法摄取到适用于 LiveAnalytics 的 Timestream 时丢失任何数据。
使用以下配置设置优化管道性能,这有助于防止记录导致性能下降或堵塞。
MaximumRecordAgeInSeconds:早于此时间的记录将不予处理,并将直接移至 DLQ。建议将此值设置为不高于目标 Timestream 表配置的内存存储保留期。
MaximumRetryAttempts:记录发送到 DeadLetterQueue 之前,该记录的重试次数。建议将此项配置为 10。这应该能够帮助解决任何暂时性问题,而对于持续性问题,记录将被移至 DeadLetterQueue,从而解除对后续数据流的阻塞。
OnPartialBatchItemFailure:对于支持部分批处理的源,我们建议您启用此功能,并将其配置为 AUTOMATIC_BISECT,以便在丢弃/发送到 DLQ 之前对失败记录进行额外重试。
配置示例
以下是如何配置 EventBridge 管道以将数据从 Kinesis 流传输到 Timestream 表的示例:
例 Timestream 的 IAM 策略更新
例 Kinesis 流配置
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream 目标配置
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }
事件转换
EventBridge 管道允许在数据到达 Timestream 前对其进行转换。您可以定义转换规则以修改传入的 Kinesis 记录,例如更改字段名称。
假设 Kinesis 流包含温度和湿度数据。在将这些字段插入 Timestream 之前,可使用 EventBridge 转换以重命名这些字段。
最佳实践
批处理和缓冲
配置批处理窗口和大小,以在写入延迟和处理效率之间取得平衡。
使用批处理窗口在处理前积累足够的数据,从而减少频繁进行小规模批处理的开销。
并行处理
利用 ParallelizationFactor 设置提高并发度,尤其适用于高吞吐量流。这确保每个分片的多批次数据能够同时进行处理。
数据转换
利用 EventBridge 管道的转换功能,在将记录存储到 Timestream 之前进行筛选和增强。这有助于确保数据与分析要求保持一致。
安全性。
确保用于 EventBridge 管道的 IAM 角色具有读取 Kinesis 和写入 Timestream 的必要权限。
使用加密和访问控制措施,以保护传输中数据和静态数据。
调试失败
-
自动禁用管道
如果目标不存在或存在权限问题,管道将在约 2 小时后自动禁用
-
节流
管道具备自动后退并重试的能力,直至节流有所减弱。
-
启用日志
建议您启用错误级别的日志,并包含执行数据,以便更深入地分析失败原因。出现任何故障时,这些日志将包含从 Timestream 发送/接收的请求/响应。这有助于您了解相关的错误,并在修复错误后重新处理记录(如有需要)。
监控
建议您设置以下方面的警报,以检测数据流的任何问题:
源中记录的最长存留时间
GetRecords.IteratorAgeMilliseconds
管道中的故障指标
ExecutionFailedTargetStageFailed
Timestream 写入 API 错误
UserErrors
有关其他监控指标,请参阅《EventBridge 用户指南》中的监控 EventBridge。