処理エンジンプラグインを使用して Timestream for InfluxDB を拡張する - Amazon Timestream

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、こちらを参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

処理エンジンプラグインを使用して Timestream for InfluxDB を拡張する

処理エンジンは、Amazon Timestream の InfluxDB 3 データベース内で実行される埋め込み Python 仮想マシンです。Core エディションと Enterprise エディションの両方で使用できます。ワークフローの自動化、データの変換、カスタム API エンドポイントの作成を可能にするカスタム Python コードを使用してデータベースを拡張できます。

処理エンジンは、特定のデータベースイベントに対応して Python プラグインを実行します。

  • データ書き込み: データベースに入るデータを処理および変換する

  • スケジュールされたイベント: 定義された間隔または特定の時間でコードを実行する

  • HTTP リクエスト: コードを実行するカスタム API エンドポイントを公開する

エンジンには、実行間の状態を管理するためのインメモリキャッシュが含まれているため、データベース内でステートフルアプリケーションを直接構築できます。

InfluxData 認定プラグイン

起動時、InfluxDB 3 には、InfluxData によって認定された構築済みの完全に設定可能なプラグインのセットが含まれています。

  • データ変換: 受信データを処理してエンリッチする

  • アラート: データしきい値に基づいて通知を送信する

  • 集計: 時系列データの統計を計算する

  • システムモニタリング: リソースの使用状況とヘルスメトリクスを追跡する

  • 統合: 外部サービスと API に接続する

これらの認定プラグインはすぐに使用でき、特定の要件を満たすためにトリガー引数を使用して設定できます。

プラグインタイプとトリガーの仕様

プラグインタイプ トリガー仕様 プラグインの実行タイミング ユースケース
データの書き込み table:<TABLE_NAME>またはall_tables データがテーブルに書き込まれるとき データ変換、アラート、派生メトリクス
スケジュール済み every:<DURATION>またはcron:<EXPRESSION> 指定された間隔で 定期的な集計、レポート、ヘルスチェック
HTTP リクエスト request:<REQUEST_PATH> HTTP リクエストが受信されたとき カスタム API、ウェブフック、ユーザーインターフェイス

トリガーの作成

プラグインをデータベースイベントに接続し、実行するタイミングを定義します。influxdb3 create trigger コマンドを使用します。

データ書き込みトリガーを作成するには:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

スケジュール済みトリガーを作成するには:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

HTTP リクエストトリガーを作成するには:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

エンドポイントへのアクセス先: https://your-cluster-endpoint:8086/api/v3/engine/webhook

トリガーの設定

プラグインに引数を渡す

トリガー引数を使用してプラグインの動作を設定します。

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

引数はディクショナリとしてプラグインに渡されます。

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

エラー処理の動作

トリガーがエラーを処理する方法を設定します。

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

非同期実行

複数のトリガーインスタンスを同時に実行できるようにします。

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

トリガーを管理

データベースのトリガーを表示するには:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

書き込みトリガーのテーブル除外

all_tables を使用する際にプラグインコード内のテーブルをフィルタリングするには:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

プラグインの実装は次のとおりです。

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

分散デプロイに関する考慮事項

マルチノードデプロイでは、ノードロールに基づいてプラグインを設定します。

プラグインタイプ ノードタイプ 理由
データ書き込みプラグイン インジェスターノード 取り込みポイントでデータを処理する
HTTP リクエストプラグイン クエリノード API トラフィックを処理する
スケジュールされたプラグイン 任意の設定済みノード スケジューラを使用して任意のノードで実行できる

Enterprise デプロイでは、以下の考慮事項が重要です。

  • 関連するすべてのノードで同じプラグイン設定を維持します。

  • 外部クライアント (Grafana、ダッシュボード) をクエリノードにルーティングします。

  • トリガーが実行されるノードでプラグインが使用可能であることを確認します。

ベストプラクティス

  • プラグイン設定

    • ハードコーディングの代わりに、設定可能な値にトリガー引数を使用します。

    • プラグイン内に適切なエラー処理を実装します。

    • データベースオペレーションには influxdb3_local API を使用します。

  • パフォーマンスの最適化

    • 重い処理タスクには非同期実行を使用します。

    • フィルタリングされたデータの早期リターンを実装します。

    • プラグイン内のデータベースクエリを最小限に抑えます。

  • エラー管理

    • 適切なエラー動作 (ログ、再試行、または無効化) を選択します。

    • システムテーブルを使用してプラグインの実行をモニタリングします。

    • 本番環境のデプロイ前にプラグインを徹底的にテストします。

  • セキュリティに関する考慮事項

    • HTTP リクエストプラグインのすべての入力データを検証します。

    • 機密設定を保存するための安全な方法を使用します。

    • プラグインのアクセス許可を必要なオペレーションのみに制限します。

プラグインの実行をモニタリングする

システムテーブルをクエリしてプラグインのパフォーマンスをモニタリングします。

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

処理エンジンは、データ処理ロジックをデータの近くに保ち、レイテンシーを減らし、アーキテクチャを簡素化しながら、InfluxDB 3 機能を拡張する強力な方法を提供します。

InfluxData 認定プラグイン

Amazon Timestream for InfluxDB 3 には、カスタム開発を必要とせずにデータベース機能を拡張する、構築および認定済みプラグインの包括的なセットが含まれています。これらのプラグインは完全に設定可能で、起動時にすぐに使用でき、データ処理、モニタリング、アラートのための高度な機能を提供します。

詳細なドキュメントとソースコードについては、InfluxData プラグインリポジトリを参照してください。

使用可能なプラグイン

異常検出プラグイン

MAD ベースの異常検出

  • トリガータイプ: データ書き込み (リアルタイム)

  • ユースケース: ストリーミングデータの外れ値リアルタイム検出、センサーモニタリング、品質管理

  • GitHub: MAD 異常検出ドキュメント

仕組み: 平均絶対偏差 (MAD) を使用して、通常の動作の堅牢なベースラインを確立します。新しいデータが到着すると、各ポイントの中央値から離れた MAD の数を計算します。しきい値 (k * MAD) を超えるポイントには異常としてフラグが付けられます。

主な機能:

  • データ書き込み時のリアルタイム処理。

  • 効率を高めるためにインメモリスライディングウィンドウを維持。

  • カウントベースのアラート (例: 5 回連続する異常)。

  • 期間ベースのアラート (2 分間の異常など)。

  • 値の急速な変化によるアラート疲れを防ぐためのフリップ抑制。

使用例:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

出力: フィールド名、値、期間など、異常が検出されたときにリアルタイムの通知を送信します。

データ変換プラグイン

基本的な変換

  • トリガータイプ: スケジュール、データ書き込み

  • ユースケース: データの標準化、単位変換、フィールド名の正規化、データクリーニング

  • GitHub: 基本的な変換ドキュメント

仕組み: フィールド名と値に一連の変換を適用します。履歴データをバッチ処理 (スケジュール) したり、到着時にデータを変換 (データ書き込み) したりできます。変換は指定された順序で適用され、複雑なデータパイプラインが可能になります。

主な機能:

  • フィールド名の変換: snake_case、スペースの削除、英数字のみ。

  • 単位変換: 温度、圧力、長さ、時間単位。

  • 正規表現サポートによるカスタム文字列置換。

  • データを書き込まずにテストするためのドライランモード。

  • 履歴データのバッチ処理。

使用例:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

出力: 変換されたデータを含む新しいテーブルを作成し、元のタイムスタンプとタグを保持します。

ダウンサンプラー

  • トリガータイプ: スケジュール済み、HTTP

  • ユースケース: データ削減、長期ストレージの最適化、サマリー統計の作成、パフォーマンスの向上。

  • GitHub:ダウンサンプラーのドキュメント

仕組み: 高解像度の時系列データを低解像度の概要に集約します。例えば、1 秒のデータを 1 時間の平均に変換します。各ダウンサンプリングポイントには、圧縮された元のポイントの数と対象時間範囲に関するメタデータが含まれます。

主な機能:

  • 複数の集計関数: 平均、合計、最小、最大、中央値、微分。

  • フィールド固有の集計 (フィールドごとに異なる関数)。

  • メタデータ追跡 (record_count、time_from、time_to)。

  • バックフィルによるオンデマンドダウンサンプリング用の HTTP API。

  • 大規模なデータセットの設定可能なバッチサイズ。

使用例:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

出力: 集計値と、圧縮されたポイントの数と時間範囲を示すメタデータ列を使用して、ダウンサンプリングされたデータを作成します。

プラグインのモニタリングとアラート

状態変化モニタ

  • トリガータイプ: スケジュール、データ書き込み

  • ユースケース: ステータスモニタリング、機器状態追跡、プロセスモニタリング、変更検出。

  • GitHub: 状態変化ドキュメント

仕組み: 経時的なフィールド値の変化を追跡し、変更回数が設定されたしきい値を超えたときに警告します。値の変化 (異なる値) と特定の値条件 (ターゲット値に等しい) の両方を検出できます。ノイズの多いシグナルからのアラートを防ぐための安定性チェックが含まれています。

主な機能:

  • カウントベースの変更検出 (10 分で 5 回の変更など)。

  • 期間ベースのモニタリング (5 分間のステータス = 「エラー」など)。

  • ノイズ低減のための状態変化ウィンドウ。

  • 独立したしきい値によるマルチフィールドモニタリング。

  • 設定可能な安定性要件。

使用例:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

出力: アラートには、フィールド名、検出された変化の数、時間枠、関連するタグ値が含まれます。

システムメトリクスコレクター

  • トリガータイプ: スケジュール済み

  • ユースケース: インフラストラクチャのモニタリング、パフォーマンスベースライン、キャパシティプランニング、リソース追跡。

  • GitHub:システムメトリクスのドキュメント

仕組み: psutil ライブラリを使用して、InfluxDB を実行しているホストから包括的なシステムメトリクスを収集します。CPU、メモリ、ディスク、ネットワーク統計を設定可能な間隔で収集します。各メトリクスタイプは個別に有効/無効にできます。

主な機能:

  • 負荷平均を使用したコアごとの CPU 統計。

  • スワップ障害やページ障害などのメモリ使用量。

  • 計算された IOPS とレイテンシーを含むディスク I/O メトリクス。

  • エラー追跡を使用したネットワークインターフェイス統計。

  • 設定可能なメトリクス収集 (特定のタイプを有効/無効にする)。

  • 収集の失敗時に自動的に再試行。

使用例:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

出力: サブシステムごとに詳細なメトリクスを含む複数のテーブル (system_cpu、system_memory、system_disk_io など) を作成します。

予測分析プラグイン

Prophet 予測

  • トリガータイプ: スケジュール済み、HTTP

  • ユースケース: 需要予測、キャパシティプランニング、傾向分析、季節的なパターン検出。

  • GitHub:Prophet 予測のドキュメント

仕組み: Facebook の Prophet ライブラリを使用して時系列予測モデルを構築します。履歴データに基づいてモデルをトレーニングし、将来の期間の予測を生成できます。モデルは、傾向、季節性、祝日、変化点を予測に織り込みます。一貫した予測のためのモデル永続性をサポートします。

主な機能:

  • 自動季節性検出 (毎日、毎週、毎年)。

  • 祝祭日カレンダーのサポート (組み込みおよびカスタム)。

  • トレンド転換の変化点検出。

  • モデルの永続性とバージョニング。

  • 予測の信頼区間。

  • MSRE しきい値による検証。

使用例:

# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check

出力: エラーメトリクス値や影響を受ける時間範囲など、予測エラーがしきい値を超えたときに通知を送信します。

一般的な設定パターン

TOML 設定ファイルの使用

複雑な設定の場合は、インライン引数の代わりに TOML ファイルを使用します。

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

プラグインのチェーン化

複数のプラグインを連鎖させてデータ処理パイプラインを作成します。

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

プラグインのベストプラクティス

  • 保守的に開始 – しきい値を高くしてウィンドウを長くしてから、観測されたパターンに基づいて調整します。

  • 開発中のテスト – 本番デプロイの前に、ドライランモードとテストデータベースを使用します。

  • プラグインのパフォーマンスのモニタリング – システムテーブルの実行時間とリソースの使用状況を確認します。

  • 適切なトリガータイプの使用 – バッチ処理にスケジュール済みトリガーを選択し、リアルタイムにデータ書き込みを行います。

  • 通知を賢く設定 – 重要度レベルとデバウンスロジックを使用して、アラート疲れを防ぎます。

  • モデルの永続性を活用 – ML ベースのプラグインの場合、トレーニング済みモデルを整合性のために保存します。

  • ドキュメント設定 – わかりやすいトリガー名を使用し、設定ドキュメントを維持します。

プラグインの実行をモニタリングする

プラグインのパフォーマンスをモニタリングするには:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

一般的な問題のトラブルシューティング

次の表は、一般的な問題と考えられる解決策を示しています。

問題 解決策
プラグインがトリガーされない トリガーが有効になっていることを確認し、スケジュール/仕様構文を確認する
通知が見つからない Notifier プラグインがインストールされていることを確認し、ウェブフック URL を確認する
メモリ使用量が多い ウィンドウサイズを減らし、バッチ処理間隔を調整する
変換が正しくない ドライランモードを使用して、フィールド名とデータ型を検証する
予測が正確でない トレーニングデータウィンドウを増やし、季節性設定を調整する
アラートが多すぎる トリガーカウントを増やし、デバウンス期間を追加し、しきい値を調整する

これらの認定プラグインは、一般的な時系列データ処理ニーズに対応するエンタープライズ対応機能を提供することで、包括的な設定オプションを通じて柔軟性を維持しつつ、カスタム開発の必要性をなくします。詳細なドキュメント、例、更新については、GitHub リポジトリを参照してください。