View a markdown version of this page

向 Timestream for InfluxDB 3 集群写入数据 - Amazon Timestream

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间,以实现实时分析。点击此处了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

向 Timestream for InfluxDB 3 集群写入数据

Amazon Timestream for InfluxDB 3 提供强大的功能,以高效摄取时间序列数据。了解正确的数据写入方法对于实现性能最大化并确保数据完整性至关重要。

Timestream for InfluxDB 3 提供多个 HTTP API 端点用于写入时间序列数据,既能灵活适配不同的集成方式,又能兼容现有的 InfluxDB 工作负载。

行协议概述

InfluxDB 3 专为高写入吞吐量而设计,并使用名为行协议的高效、人类可读的写入语法。作为 schema-on-write数据库,InfluxDB 会在您开始写入数据时自动创建逻辑数据库、表及其架构,无需任何手动设置。创建架构后,InfluxDB 会在接受新数据之前根据该架构验证未来的写入请求,同时仍允许在需求变化时进行架构演变。

行协议结构

行协议包含以下基本元素:

  • :存储数据的表的字符串标识符。

  • (可选)标签集:以逗号分隔的键值对,表示元数据(已进行索引)。

  • 字段集:以逗号分隔的键值对,表示实际测量值。

  • (可选)时间戳:与数据点关联的 Unix 时间戳,精度可达纳秒级。

字段值可以是以下数据类型之一:

  • 字符串(必须加引号)

  • 浮点数(例如,23.4)

  • 整数(例如,10i)

  • 无符号整数(例如,10u)

  • 布尔值(true/false)

行协议遵循以下通用语法:

myTable,tag1=val1,tag2=val2 field1="v1",field2=1i 0000000000000000000

使用行协议的数据点示例:

home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1735545600

这会在“home”表中创建包含以下内容的点:

  • 标签:room=“客厅”

  • 字段:temp=21.1(浮点数)、hum=35.9(浮点数)、co=0(整数)

  • 时间戳:1735545600(Unix 秒)

API 端点概览

InfluxDB 3 支持三种主要写入端点:

  1. 原生 v3 API/api/v3/write_lp):新实施的推荐端点。

  2. v2 兼容性 API/api/v2/write):用于迁移 InfluxDB v2.x 工作负载。

  3. v1 兼容性 API/write):用于迁移 InfluxDB v1.x 工作负载。

使用原生 v3 写入 API

/api/v3/write_lp 端点是用于写入行协议数据的原生 InfluxDB 3 API。

请求格式:

POST /api/v3/write_lp?db=DATABASE_NAME&precision=PRECISION&accept_partial=BOOLEAN&no_sync=BOOLEAN

查询参数:

参数 描述 默认
db 数据库名称(必填) -
precision 时间戳精度(ns、us、ms、s) 自动检测
accept_partial 在错误发生时允许部分写入 true
no_sync 在 WAL 持久化之前进行确认 false

示例写入请求:

curl -v "https://your-cluster-endpoint:8086/api/v3/write_lp?db=sensors&precision=s" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-raw "home,room=Living\ Room temp=21.1,hum=35.9,co=0i 1735545600 home,room=Kitchen temp=21.0,hum=35.9,co=0i 1735545600"

写入响应模式

标准模式(no_sync=false)

  • 在确认之前等待数据写入 WAL(写入前日志)。

  • 提供持久性保证。

  • 由于 WAL 持久化等待导致的延迟增加。

  • 建议用于对持久性要求极高的关键数据。

快速模式(no_sync=true)

  • 立即确认,无需等待 WAL 持久化。

  • 尽可能低的写入延迟。

  • 如果系统在 WAL 写入完成前崩溃,则存在数据丢失风险。

  • 适用于优先考虑速度而非绝对持久性的高吞吐量场景。

部分写入处理

accept_partial 参数控制写入批处理包含错误时的行为:

accept_partialtrue 时(默认):

  • 有效行已成功写入。

  • 无效行会被拒绝。

  • 返回 400 状态,并附带失败行的详细信息。

  • 适用于可容忍部分失败的大批量操作。

accept_partialfalse 时:

  • 如果任何一行失败,则整个批处理都将被拒绝。

  • 未写入任何数据。

  • 返回 400 状态,并显示错误详情。

  • 确保 all-or-nothing写入语义。

兼容性 APIs

兼容性 APIs 允许将现有的 InfluxDB v1 或 v2 工作负载无缝迁移到 InfluxDB 3。这些端点可与现有的 InfluxDB 客户端库、Telegraf 以及第三方集成配合使用。

重要区别:

  • 表(测量值)中的标签一经创建即不可更改。

  • 同一个表中,标签和字段不能使用相同的名称。

  • 架构验证在写入时强制执行。

InfluxDB v2 兼容性

/api/v2/write 端点为 v2 客户端提供向后兼容性:

curl -i "https://your-cluster-endpoint:8086/api/v2/write?bucket=DATABASE_NAME&precision=s" \ --header "Authorization: Bearer DATABASE_TOKEN" \ --header "Content-type: text/plain; charset=utf-8" \ --data-binary 'home,room=kitchen temp=72 1641024000'

V2 API 参数:

参数 位置 描述
bucket * 查询字符串 映射到数据库名称
precision 查询字符串 时间戳精度(ns、us、ms、s、m、h)
Authorization 标题 持有者或令牌架构
Content-Encoding 标题 gzip 或身份
InfluxDB v1 兼容性

/write 端点为 v1 客户端提供向后兼容性:

curl -i "https://your-cluster-endpoint:8086/write?db=DATABASE_NAME&precision=s" \ --user "any:DATABASE_TOKEN" \ --header "Content-type: text/plain; charset=utf-8" \ --data-binary 'home,room=kitchen temp=72 1641024000'

V1 身份验证选项:

  • 基本身份验证:令牌作为密码(--user "any:TOKEN")。

  • 查询参数:URL 中的 p=TOKEN

  • Bearer/Token 标头:标准授权标头。

V1 API 参数:

参数 位置 描述
db * 查询字符串 数据库名称
precision 查询字符串 时间戳精度
p 查询字符串 用于查询身份验证的令牌
u 查询字符串 用户名(已忽略)
Authorization 标题 支持多个架构
Content-Encoding 标题 gzip 或身份

客户端库与集成

InfluxDB 3 官方客户端库

InfluxDB 3 客户端库提供用于构造和写入时间序列数据的本地语言接口:

  • Pythoninfluxdb3-python

  • Goinfluxdb3-go

  • JavaScript/Node.jsinfluxdb3-js

  • Javainfluxdb3-java

  • C#InfluxDB3.Client

示例:Python 客户端

from influxdb3 import InfluxDBClient3 client = InfluxDBClient3( host="your-cluster-endpoint:8086", token="YOUR_TOKEN", database="DATABASE_NAME" ) # Write using line protocol client.write("home,room=Living\\ Room temp=21.1,hum=35.9,co=0i") # Write using Point objects from influxdb3 import Point point = Point("home") \ .tag("room", "Living Room") \ .field("temp", 21.1) \ .field("hum", 35.9) \ .field("co", 0) client.write(point)

示例:Go 客户端

import "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" client, err := influxdb3.New(influxdb3.ClientConfig{ Host: "your-cluster-endpoint:8086", Token: "YOUR_TOKEN", Database: "DATABASE_NAME", }) point := influxdb3.NewPoint("home", map[string]string{"room": "Living Room"}, map[string]any{ "temp": 24.5, "hum": 40.5, "co": 15, }, time.Now(), ) err = client.WritePoints(context.Background(), []*influxdb3.Point{point})

旧版客户端库

对于现有的 v1 和 v2 工作负载,可继续将旧版客户端库与兼容性端点结合使用:

示例:Node.js v1 客户端:

const Influx = require('influx') const client = new Influx.InfluxDB({ host: 'your-cluster-endpoint', port: 8086, protocol: 'https', database: 'DATABASE_NAME', username: 'ignored', password: 'DATABASE_TOKEN' })

写入数据的最佳实践

写入数据时,我们建议执行以下步骤:

  • 批量优化

    • 最佳批量大小:5000-10000 行或每个请求 10MB。

    • 对大型有效载荷使用压缩(gzip)。

    • 按字典顺序对标签进行排序,以获得更佳性能。

  • 时间戳精度

    • 使用满足您需求的最低精度。

    • 明确指定精度,以避免歧义。

    • 在整个应用程序中保持一致的精度。

  • 错误处理

    • 对暂时性故障实施重试逻辑。

    • 使用 accept_partial=true 实现弹性批处理操作。

    • 通过 CloudWatch 指标监控写入错误。

  • 性能优化

    • 在高吞吐量场景中使用 no_sync=true。

    • 跨多个连接分配写入。

    • 使用 writer/reader 终端节点进行所有写入操作。

  • 架构注意事项

    • 标签一经创建即不可改变。

    • 字段和标签不能共享相同的名称。

    • 设计架构时需考虑查询模式。

    • 控制标签基数。

与先前版本的重要区别:

  • 不可变标签:在表中创建标签后,其类型不可更改

  • 没有 tag/field 名称冲突:标签和字段在表中不能有相同的名称

  • Schema-on-write: InfluxDB 3 在写入时验证数据类型

  • 自动创建表:首次写入时自动创建表

  • 严格的类型检查:字段类型在所有写入操作中必须保持一致

通过利用适当的写入 API 并遵循这些最佳实践,您可以在保持高性能和数据完整性的同时,高效地将时间序列数据摄取到 Timestream for InfluxDB 3 实例中。