從 Timestream 查詢 InfluxDB 3 的資料 - Amazon Timestream

如需與 Amazon Timestream for LiveAnalytics 類似的功能,請考慮使用 Amazon Timestream for InfluxDB。它提供簡化的資料擷取和單一位數毫秒查詢回應時間,以進行即時分析。在這裡進一步了解。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 Timestream 查詢 InfluxDB 3 的資料

Amazon Timestream for InfluxDB 3 提供多個查詢 APIs和通訊協定來擷取您的時間序列資料。平台透過 HTTP 和飛行 (gRPC) 通訊協定支援 SQL 和 InfluxQL 查詢語言,為不同的使用案例和用戶端偏好設定提供靈活性。

查詢方法概觀

InfluxDB 3 支援下列查詢方法:

  • 原生 v3 HTTP API – 透過 REST 端點的 SQL 和 InfluxQL 查詢。

  • influxdb3 CLI – 互動式查詢的命令列界面。

  • Flight+gRPC 通訊協定 – 用戶端程式庫的高效能二進位通訊協定。

  • v1 相容性 API – 回溯相容性的舊版 InfluxQL 查詢。

使用 v3 HTTP 查詢 API

v3 API 為 SQL 和 InfluxQL 查詢提供專用端點,同時支援 GET 和 POST 方法。

SQL 查詢 (/api/v3/query_sql)

GET 請求範例:

curl --get "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"

POST 請求範例 (適用於複雜的查詢):

curl "https://your-cluster-endpoint:8086/api/v3/query_sql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --json '{ "db": "DATABASE_NAME", "q": "SELECT * FROM home WHERE room = '\''Kitchen'\'' AND temp > 20", "format": "jsonl" }'

InfluxQL 查詢 (/api/v3/query_influxql)

對於 InfluxQL 查詢,請執行下列動作:

curl --get "https://your-cluster-endpoint:8086/api/v3/query_influxql" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT mean(temp) FROM home WHERE time >= now() - 1d GROUP BY room"

查詢參數

您可以使用下列查詢參數:

Parameter (參數) Description 必要
db 資料庫名稱
q 查詢字串 (SQL 或 InfluxQL)
format 回應格式 (json、jsonl、csv、 pretty、parquet) 否 (預設:json)
params 參數化查詢的參數

使用 Influxdb3 CLI

使用 influxdb3命令叫用 InfluxDB 3 命令列界面 (CLI)。它提供一種互動式方法來查詢您的資料,並支援多種輸出格式。

以下顯示基本查詢:

influxdb3 query \ --host "your-cluster-endpoint:8086" \ --token "YOUR_TOKEN" \ --database "DATABASE_NAME" \ "SELECT * FROM home WHERE time >= now() - INTERVAL '1 day'"

以下顯示具有不同輸出格式的查詢:

# JSON output influxdb3 query \ --database "DATABASE_NAME" \ --format json \ "SELECT * FROM home LIMIT 10" # CSV output influxdb3 query \ --database "DATABASE_NAME" \ --format csv \ "SELECT * FROM home LIMIT 10" # Parquet file output influxdb3 query \ --database "DATABASE_NAME" \ --format parquet \ --output results.parquet \ "SELECT * FROM home"

支援下列輸出格式:

  • pretty (預設) – 人類可讀取的表格格式。

  • json – 標準 JSON 陣列。

  • jsonl – JSON Lines (易於串流)。

  • csv – 逗號分隔值。

  • parquet – 二進位單欄格式 (需要檔案輸出)。

使用 v1 相容性 API

對於舊版 InfluxQL 查詢,請使用 /query端點:

curl --get "https://your-cluster-endpoint:8086/query" \ --header "Authorization: Bearer YOUR_TOKEN" \ --data-urlencode "db=DATABASE_NAME" \ --data-urlencode "q=SELECT * FROM home" \ --data-urlencode "epoch=ms"

v3 HTTP 查詢 API 的身分驗證選項

  • 承載字符:Authorization: Bearer TOKEN

  • 基本身分驗證:--user "any:TOKEN"

  • 查詢參數:?p=TOKEN

v3 HTTP 查詢 API 的回應格式

  • JSON (預設)

  • CSV (Accept: application/csvwithheader)

SQL 查詢範例

以下顯示基本查詢:

-- Select specific fields with time filter SELECT temp, humidity, room FROM home WHERE time >= now() - INTERVAL '7 days' AND room = 'Kitchen' ORDER BY time DESC -- Show all tables in database SHOW TABLES -- Show columns in a table SHOW COLUMNS FROM home

以下顯示彙總查詢:

-- Aggregate by tags SELECT room, AVG(temp) as avg_temp, MAX(humidity) as max_humidity FROM home WHERE time >= now() - INTERVAL '24 hours' GROUP BY room -- Time-based aggregation using DATE_BIN SELECT DATE_BIN(INTERVAL '1 hour', time) as time, AVG(temp) as avg_temp, COUNT(*) as reading_count FROM home GROUP BY 1 ORDER BY time DESC

以下顯示進階 SQL 功能:

-- Parameterized queries (via API) SELECT * FROM home WHERE room = $room AND temp > $min_temp AND time >= $start_time -- Gap filling with interpolation SELECT date_bin_gapfill(INTERVAL '5 minutes', time) as time, room, interpolate(avg(temp)) as temp FROM home WHERE time >= '2025-01-01T00:00:00Z' AND time <= '2025-01-01T12:00:00Z' GROUP BY 1, room -- Type casting SELECT temp::INTEGER as temp_int, CAST(humidity AS VARCHAR) as humidity_str FROM home

以下顯示 InfluxQL 查詢範例:

-- Basic query with time filter SELECT * FROM home WHERE time >= now() - 1d -- Aggregation with GROUP BY time SELECT MEAN(temp), MAX(humidity) FROM home WHERE time >= now() - 7d GROUP BY time(1h), room -- Using selector functions SELECT FIRST(temp), LAST(temp) FROM home WHERE time >= now() - 1h GROUP BY room

以下說明如何查詢系統資料表,以了解您的資料庫結構並監控效能:

-- View all tables with schema information SELECT * FROM information_schema.tables -- View column details for a specific table SELECT * FROM information_schema.columns WHERE table_name = 'home' -- Monitor recent queries SELECT * FROM system.queries ORDER BY issue_time DESC LIMIT 10 -- Check cache configurations SELECT * FROM system.last_caches SELECT * FROM system.distinct_caches

查詢效能最佳實務

  • 使用時間篩選條件:一律包含時間範圍篩選條件,以限制掃描的資料。

  • 利用索引:設計查詢以有效地使用標籤篩選條件。

  • 選擇適當的輸出格式

    • 使用 jsonl 串流大型結果。

    • 使用 parquet 進行資料匯出和分析。

    • 使用 csv 實現試算表相容性。

  • 最佳化彙總:使用 DATE_BIN 進行以時間為基礎的分組。

  • 使用參數化查詢:防止注入攻擊並改善可重複使用性。

  • 監控查詢效能:檢查 system.queries 資料表是否有緩慢的查詢。

用戶端程式庫查詢支援

InfluxDB 3 用戶端程式庫使用 Flight+gRPC 通訊協定來獲得最佳效能。下列 Python 程式碼顯示此範例。

from influxdb3 import InfluxDBClient3 client = InfluxDBClient3( host="your-cluster-endpoint:8086", token="YOUR_TOKEN", database="DATABASE_NAME" ) # SQL query sql_result = client.query("SELECT * FROM home WHERE room = 'Kitchen'") # InfluxQL query influxql_result = client.query( "SELECT MEAN(temp) FROM home WHERE time >= now() - 1h GROUP BY room", language="influxql" )

比較 SQL 和 InfluxQL 查詢功能

下表比較 SQL 和 InfluxQL 查詢功能:

功能 SQL InfluxQL
聯結 支援 不支援
窗函數 完整支援 有限
子查詢 支援 不支援
時間函數 DATE_BIN, INTERVAL time() 分組
參數化查詢 原生支援 不支援
間隙填充 date_bin_gapfill() fill() 函數

透過了解這些查詢功能並選擇適合您使用案例的方法,您可以有效率地從 Timestream for InfluxDB 3 擷取和分析時間序列資料。

SQL 優點:

  • 支援複雜查詢的全功能 SQL 實作。

  • 支援聯結、聯集和視窗函數。

  • 適用於 SQL 背景使用者的熟悉語法。

  • 更廣泛的分析功能。

InfluxQL 優點:

  • 專為時間序列資料而設計。

  • 簡化常見時間序列操作的語法。

  • 從 InfluxDB v1 遷移的使用者回溯相容性。

  • 專用時間序列函數。

查詢最佳化功能

InfluxDB 3 提供數種最佳化功能,可改善特定使用案例的查詢效能。這些功能利用記憶體內快取和自訂索引策略,為經常存取的資料模式提供低於毫秒的回應時間。

最後一個值快取 (LVC)

Last Value Cache (LVC) 會將指定欄位的最新 N 值存放在記憶體中,為需要最新資料點的查詢啟用低於 10 毫秒的回應時間。此功能可在 Core 和 Enterprise 版本中使用。

LVC 的運作方式

LVC 會維護每個唯一索引鍵資料欄組合 (通常是標籤) 的最新值的記憶體內資料表。例如,使用感應器資料:

Table: home ├── Tags: room, wall └── Fields: temp, humidity, co LVC Configuration: - Key columns: room, wall - Value columns: temp, humidity, co - Count: 4 (last 4 values)

快取存放區:

房間 溫度 濕度 Co time
廚房 東部 22.7 36.5 26 2025-01-26T20:00:00Z
廚房 東部 22.7 36.0 9 2025-01-26T17:00:00Z
廚房 東部 22.7 36.2 3 2025-01-26T15:00:00Z
廚房 東部 22.7 36.1 0 2025-01-26T10:00:00Z
客廳 北部 22.2 36.4 17 2025-01-26T20:00:00Z
... ... ... ... ... ...

建立 LVC

使用下列命令來建立 LVC:

influxdb3 create last_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table home \ --key-columns room,wall \ --value-columns temp,hum,co \ --count 5 \ --ttl 30mins \ homeLastCache

查詢 LVC

使用下列命令來查詢 LVC:

-- Query the last cached values SELECT * FROM last_cache('home', 'homeLastCache') -- Filter specific series SELECT * FROM last_cache('home', 'homeLastCache') WHERE room = 'Kitchen'

LVC 最佳實務

  • 管理基數:僅包含基本標籤作為索引鍵資料欄,以將記憶體用量降至最低。

  • 最佳化值計數:在查詢需求和記憶體耗用量之間取得平衡。

  • 考慮 TTL:為快取項目設定適當的time-to-live。

  • 監控記憶體:快取大小 = (key_column_cardinality × count × value_columns)。

差異值快取 (DVC)

Distinct Value Cache (DVC) 會維護記憶體中指定資料欄的唯一值,將中繼資料查詢加速至 30 毫秒以下的回應時間。適用於 Core 和 Enterprise 版本。

DVC 的運作方式

DVC 會存放指定資料欄的所有唯一值組合,非常適合需要列出可用標籤值或中繼資料的查詢。

位置資料的範例快取:

國家/地區 城市
奧地利 薩爾斯堡 薩爾斯堡
奧地利 維也納 維也納
比利時 安特衛普 安特衛普
比利時 西部佛蘭德斯 布魯日
捷克 布拉格 布拉格

建立 DVC

使用下列命令來建立 DVC:

influxdb3 create distinct_cache \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ --columns country,county,city \ --max-cardinality 10000 \ --max-age 24h \ windDistinctCache

查詢 DVC

使用下列命令來查詢 DVC:

-- Get all distinct values SELECT * FROM distinct_cache('wind_data', 'windDistinctCache') -- Get distinct countries SELECT DISTINCT country FROM distinct_cache('wind_data', 'windDistinctCache')

DVC 最佳實務

  • 設定基數限制:定義控制記憶體的最大唯一值組合。

  • 設定最長存留期:自動移除過時的值。

  • 快取策略資料欄:專注於中繼資料查詢中常用的資料欄。

  • 監控快取大小:較高的基數表示需要更多記憶體。

僅適用於 Enterprise 的檔案索引

僅適用於 Enterprise Edition。檔案索引可讓您自訂資料在儲存中編製索引的方式,大幅改善單一序列查詢效能。

預設與自訂索引

預設索引:所有標籤上的 InfluxDB 索引加上時間。 

自訂索引:僅在與您的查詢相關的資料欄上編製索引。

最佳化範例:

Schema columns: country, state_province, county, city, postal_code Query patterns: Always filter by country, state_province, city Custom index: time, country, state_province, city (skip county, postal_code)

以下說明如何建立自訂檔案索引

influxdb3 create file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data \ country,state_province,city

以下說明如何刪除檔案索引:

influxdb3 delete file_index \ --database DATABASE_NAME \ --token YOUR_TOKEN \ --table wind_data

檔案索引考量事項

  • 壓縮必要:索引是在資料壓縮 (gen2+) 期間建置。

  • 僅限字串欄:可以同時為標籤和字串欄位編製索引。

  • 查詢模式分析:在建立自訂索引之前分析工作負載。

  • 單一系列最佳化:最適合以特定系列為目標的查詢。

管理快取資訊

使用系統資料表檢視快取組態和統計資料:

-- View Last Value Caches SELECT * FROM system.last_caches -- View Distinct Value Caches SELECT * FROM system.distinct_caches -- Check cache memory usage SELECT cache_name, table_name, key_columns, value_count, memory_size_bytes FROM system.last_caches

效能最佳化策略

根據您的查詢模式選擇正確的最佳化功能:

查詢模式 建議的功能 預期效能
每個序列的最新值 上次值快取 <10ms
可用的標籤值 不同的值快取 <30ms
單一系列查詢 檔案索引 (企業) 大幅改善
時間範圍彙總 標準索引 基準效能

記憶體和資源考量

快取記憶體公式:

  • LVC:記憶體 = key_cardinality × value_count × value_columns × data_size

  • DVC:記憶體 = distinct_combinations × column_count × data_size

最佳實務:

  • 從小型快取開始,並監控記憶體用量

  • 使用 TTL 設定限制快取成長

  • 僅快取經常查詢的資料

  • 透過系統資料表監控快取命中率

  • 對於企業:將快取與自訂檔案索引結合以獲得最佳效能

透過適當地利用這些最佳化功能,您可以針對特定查詢模式實現顯著的效能改善,同時有效地管理資源耗用量。