使用處理引擎外掛程式擴展 InfluxDB 的 Timestream - Amazon Timestream

如需與 Amazon Timestream for LiveAnalytics 類似的功能,請考慮使用 Amazon Timestream for InfluxDB。它提供簡化的資料擷取和單一位數毫秒查詢回應時間,以進行即時分析。在這裡進一步了解。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用處理引擎外掛程式擴展 InfluxDB 的 Timestream

處理引擎是一種內嵌 Python 虛擬機器,可在 Amazon Timestream 中的 InfluxDB 3 資料庫內執行。它適用於 Core 和 Enterprise 版本。它可讓您使用自訂 Python 程式碼擴展資料庫,以自動化工作流程、轉換資料和建立自訂 API 端點。

處理引擎會執行 Python 外掛程式以回應特定資料庫事件:

  • 資料寫入:在資料進入資料庫時處理和轉換資料

  • 排程事件:以定義的間隔或特定時間執行程式碼

  • HTTP 請求:公開執行程式碼的自訂 API 端點

引擎包含記憶體內快取,用於管理執行之間的狀態,讓您能夠直接在資料庫中建置具狀態的應用程式。

InfluxData 認證外掛程式

在啟動時,InfluxDB 3 包含一組經過 InfluxData 認證的預先建置、可完全設定的外掛程式:

  • 資料轉換:處理和豐富傳入的資料

  • 提醒:根據資料閾值傳送通知

  • 彙總:計算時間序列資料的統計資料

  • 系統監控:追蹤資源用量和運作狀態指標

  • 整合:連線至外部服務和 APIs

這些已認證的外掛程式已準備好使用,可以透過觸發參數進行設定,以符合您的特定需求。

外掛程式類型和觸發條件規格

外掛程式類型 觸發條件規格 外掛程式執行時 使用案例
資料寫入 table:<TABLE_NAME>all_tables 將資料寫入資料表時 資料轉換、提醒、衍生指標
已排程 every:<DURATION>cron:<EXPRESSION> 依指定的間隔 定期彙總、報告、運作狀態檢查
HTTP 請求 request:<REQUEST_PATH> 收到 HTTP 請求時 自訂 APIs、Webhook、使用者介面

建立 觸發

觸發將外掛程式連線至資料庫事件,並定義它們的執行時間。使用 influxdb3 create trigger 命令。

若要建立資料寫入觸發條件:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

若要建立排程觸發:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

若要建立 HTTP 請求觸發條件:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

存取端點,網址為:https://your-cluster-endpoint:8086/api/v3/engine/webhook

設定觸發

將引數傳遞至外掛程式

使用觸發參數設定外掛程式行為:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

引數會以字典的形式傳遞至外掛程式:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

錯誤處理行為

設定觸發程序如何處理錯誤:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

非同步執行

允許多個觸發執行個體同時執行:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

管理觸發條件

若要檢視資料庫的觸發條件:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

寫入觸發條件的資料表排除

若要在使用 時篩選外掛程式程式碼中的資料表all_tables

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

外掛程式實作如下:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

分散式部署考量事項

在多節點部署中,根據節點角色設定外掛程式:

外掛程式類型 節點類型 原因
資料寫入外掛程式 Ingester 節點 在擷取點處理資料
HTTP 請求外掛程式 Querier 節點 處理 API 流量
排程外掛程式 任何已設定的節點 可以使用排程器在任何節點上執行

下列考量事項對於企業部署很重要:

  • 在所有相關節點之間維持相同的外掛程式組態。

  • 將外部用戶端 (Grafana、儀表板) 路由到佇列節點。

  • 確保外掛程式可在其觸發程序執行的節點上使用。

最佳實務

  • 外掛程式組態

    • 針對可設定的值使用觸發引數,而非硬式編碼。

    • 在外掛程式中實作適當的錯誤處理。

    • 使用 influxdb3_local API 進行資料庫操作。

  • 效能最佳化

    • 針對繁重的處理任務使用非同步執行。

    • 實作篩選資料的提早傳回。

    • 最小化外掛程式中的資料庫查詢。

  • 錯誤管理

    • 選擇適當的錯誤行為 (記錄、重試或停用)。

    • 透過系統資料表監控外掛程式執行。

    • 在生產部署之前徹底測試外掛程式。

  • 安全考量

    • 驗證 HTTP 請求外掛程式中的所有輸入資料。

    • 使用安全方法來存放敏感組態。

    • 將外掛程式許可限制為僅限必要操作。

監控外掛程式執行

查詢系統資料表以監控外掛程式效能:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

處理引擎提供強大的方法來擴展 InfluxDB 3 功能,同時保持資料處理邏輯接近您的資料,減少延遲並簡化您的架構。

InfluxData 認證外掛程式

Amazon Timestream for InfluxDB 3 包含一組完整的預先建置、認證外掛程式,可延伸資料庫功能,而無需自訂開發。這些外掛程式可完全設定,並可在啟動時使用,提供資料處理、監控和提醒的進階功能。

如需完整的文件和原始程式碼,請造訪 InfluxData 外掛程式儲存庫

可用的外掛程式

異常偵測外掛程式

以 MAD 為基礎的異常偵測

  • 觸發類型:資料寫入 (即時)

  • 使用案例:串流資料的即時極端值偵測、感應器監控、品質控制。

  • GitHubMAD 異常偵測文件

運作方式:使用中位數絕對偏差 (MAD) 為正常行為建立穩固的基準。當新資料送達時,它會計算每個點與中位數相隔多少 MADs。超過閾值的點 (k * MAD) 會標記為異常。

主要功能:

  • 寫入資料時的即時處理。

  • 維護記憶體內滑動時段以提高效率。

  • 計數型警示 (例如 5 個連續異常)。

  • 持續時間型提醒 (例如 2 分鐘的異常)。

  • 翻轉抑制,以防止警示疲勞快速變更值。

使用範例

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Output:偵測到異常時傳送即時通知,包括欄位名稱、值和持續時間。

資料轉換外掛程式

基本轉換

  • 觸發類型:排程、資料寫入

  • 使用案例:資料標準化、單位轉換、欄位名稱標準化、資料清理。

  • GitHub基本轉換文件

運作方式:將轉換鏈套用至欄位名稱和值。可以批次處理歷史資料 (排程),或在資料送達時轉換資料 (資料寫入)。轉換會依指定的順序套用,允許複雜的資料管道。

主要功能:

  • 欄位名稱轉換:snake_case、移除空格、僅限英數字元。

  • 單位轉換:溫度、壓力、長度、時間單位。

  • 具有 regex 支援的自訂字串取代。

  • 乾執行模式,用於測試而不寫入資料。

  • 歷史資料的批次處理。

使用範例

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Output:使用轉換的資料建立新的資料表,保留原始時間戳記和標籤。

向下取樣器

  • 觸發類型:排程、HTTP

  • 使用案例:資料縮減、長期儲存最佳化、建立摘要統計資料、效能改善。

  • GitHubDownsampler 文件

運作方式:將高解析度時間序列資料彙總為低解析度摘要。例如, 會將 1 秒的資料轉換為 1 小時的平均值。每個向下取樣點都包含有關壓縮的原始點數量和涵蓋時間範圍的中繼資料。

主要功能:

  • 多個彙總函數:avg、 sum、min、max、median、 derivative。

  • 欄位特定彙總 (不同欄位的不同函數)。

  • 中繼資料追蹤 (record_count、time_from、time_to)。

  • 用於隨需回填的 HTTP API。

  • 大型資料集的可設定批次大小。

使用範例

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Output:使用彙總值加上中繼資料資料欄建立向下取樣的資料,顯示壓縮的點數和時間範圍。

監控和提醒外掛程式

狀態變更監控

  • 觸發類型:排程、資料寫入

  • 使用案例:狀態監控、設備狀態追蹤、程序監控、變更偵測。

  • GitHubState 變更文件

運作方式:追蹤欄位值會隨著時間而變更,並在變更數量超過設定的閾值時發出警示。可以偵測值變更 (不同的值) 和特定值條件 (等於目標值)。包括穩定性檢查,以防止來自雜訊訊號的警示。

主要功能:

  • 計數型變更偵測 (例如十分鐘內五次變更)。

  • 以持續時間為基礎的監控 (例如,狀態 = 五分鐘的「錯誤」)。

  • 減少雜訊的狀態變更時段。

  • 具有獨立閾值的多欄位監控。

  • 可設定的穩定性要求。

使用範例

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Output:Alerts 包含欄位名稱、偵測到的變更數量、時間範圍和相關的標籤值。

系統指標收集器

  • 觸發類型:排程

  • 使用案例:基礎設施監控、效能基準、容量規劃、資源追蹤。

  • GitHub系統指標文件

運作方式:使用 psutil 程式庫從執行 InfluxDB 的主機收集完整的系統指標。以可設定的間隔收集 CPU、記憶體、磁碟和網路統計資料。每個指標類型都可以獨立啟用/停用。

主要功能:

  • 具有負載平均值的每個核心 CPU 統計資料。

  • 記憶體用量,包括交換和分頁錯誤。

  • 具有計算 IOPS 和延遲的磁碟 I/O 指標。

  • 具有錯誤追蹤的網路界面統計資料。

  • 可設定的指標集合 (啟用/停用特定類型)。

  • 集合失敗時自動重試。

使用範例

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Output:使用每個子系統的詳細指標建立多個資料表 (system_cpu、system_memory、system_disk_io 等)。

預測分析外掛程式

Prophet 預測

  • 觸發類型:排程、HTTP

  • 使用案例:需求預測、容量規劃、趨勢分析、季節性模式偵測。

  • GitHubProphet 預測文件

運作方式:使用 Facebook 的 Prophet 程式庫來建置時間序列預測模型。可以根據歷史資料訓練模型,並產生未來時段的預測。模型說明趨勢、季節性、假日和變更點。支援模型持久性,以實現一致的預測。

主要功能:

  • 自動季節性偵測 (每日、每週、每年)。

  • 假日行事曆支援 (內建和自訂)。

  • 趨勢轉移的變更點偵測。

  • 模型持續性和版本控制。

  • 預測的可信度間隔。

  • 使用 MSRE 閾值進行驗證。

使用範例

# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check

Output:預測錯誤超過閾值時傳送通知,包括錯誤指標值和受影響的時間範圍。

常見的組態模式

使用 TOML 組態檔案

對於複雜的組態,請使用 TOML 檔案而非內嵌引數:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

鏈結外掛程式

透過鏈結多個外掛程式來建立資料處理管道:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

外掛程式的最佳實務

  • 開始保守 – 從較高的閾值和較長的時段開始,然後根據觀察到的模式進行調整。

  • 開發中測試 – 在生產部署之前使用試轉模式和測試資料庫。

  • 監控外掛程式效能 – 檢查系統資料表中的執行時間和資源用量。

  • 使用適當的觸發類型 – 選擇批次處理排程、即時資料寫入。

  • 明智地設定通知 – 使用嚴重性等級和退信邏輯來防止警示疲勞。

  • 利用模型持久性 – 對於以 ML 為基礎的外掛程式,請儲存訓練模型以確保一致性。

  • 文件組態 – 使用描述性觸發條件名稱並維護組態文件。

監控外掛程式執行

若要監控外掛程式效能:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

對常見問題進行故障診斷

下表顯示常見問題和可能的解決方案。

問題 解決方案
外掛程式未觸發 確認已啟用觸發,檢查排程/規格語法
缺少通知 確認已安裝 Notifier 外掛程式,檢查 Webhook URLs
高記憶體用量 減少視窗大小、調整批次處理間隔
不正確的轉換 使用試轉模式,驗證欄位名稱和資料類型
預測不準確 增加訓練資料時段,調整季節性設定
太多提醒 增加觸發計數、新增退信持續時間、調整閾值

這些經過認證的外掛程式為常見的時間序列資料處理需求提供企業就緒功能,消除了自訂開發的需求,同時透過全面的組態選項保持靈活性。請造訪 GitHub 儲存庫以取得詳細的文件、範例和更新。