

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、[こちら](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 処理エンジンプラグインを使用して Timestream for InfluxDB を拡張する
<a name="processing-engine"></a>

 処理エンジンは、Amazon Timestream の InfluxDB 3 データベース内で実行される埋め込み Python 仮想マシンです。Core エディションと Enterprise エディションの両方で使用できます。ワークフローの自動化、データの変換、カスタム API エンドポイントの作成を可能にするカスタム Python コードを使用してデータベースを拡張できます。

 処理エンジンは、特定のデータベースイベントに対応して Python プラグインを実行します。
+  **データ書き込み**: データベースに入るデータを処理および変換する 
+  **スケジュールされたイベント**: 定義された間隔または特定の時間でコードを実行する 
+  **HTTP リクエスト**: コードを実行するカスタム API エンドポイントを公開する 

 エンジンには、実行間の状態を管理するためのインメモリキャッシュが含まれているため、データベース内でステートフルアプリケーションを直接構築できます。

## InfluxData 認定プラグイン
<a name="influxdata-certified-plugins"></a>

 起動時、InfluxDB 3 には、InfluxData によって認定された構築済みの完全に設定可能なプラグインのセットが含まれています。
+  **データ変換**: 受信データを処理してエンリッチする 
+  **アラート**: データしきい値に基づいて通知を送信する 
+  **集計**: 時系列データの統計を計算する 
+  **システムモニタリング**: リソースの使用状況とヘルスメトリクスを追跡する 
+  **統合**: 外部サービスと API に接続する 

 これらの認定プラグインはすぐに使用でき、特定の要件を満たすためにトリガー引数を使用して設定できます。

## プラグインタイプとトリガーの仕様
<a name="plugin-types-and-trigger-specifications"></a>


|  **プラグインタイプ**  |  **トリガー仕様**  |  **プラグインの実行タイミング**  |  **ユースケース**  | 
| --- | --- | --- | --- | 
|  データの書き込み  |  table:<TABLE\_NAME> または all\_tables  |  データがテーブルに書き込まれるとき  |  データ変換、アラート、派生メトリクス  | 
|  スケジュール済み  |  every:<DURATION> または cron:<EXPRESSION>  |  指定された間隔で  |  定期的な集計、レポート、ヘルスチェック  | 
|  HTTP リクエスト  |  request:<REQUEST\_PATH>  |  HTTP リクエストが受信されたとき  |  カスタム API、ウェブフック、ユーザーインターフェイス  | 

## トリガーの作成
<a name="creating-triggers"></a>

 プラグインをデータベースイベントに接続し、実行するタイミングを定義します。`influxdb3 create trigger` コマンドを使用します。

データ書き込みトリガーを作成するには:

```
# Trigger on writes to a specific table
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --plugin-filename "process_sensors.py" \
  --database DATABASE_NAME \
  sensor_processor

# Trigger on all table writes
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "process_all_data.py" \
  --database DATABASE_NAME \
  all_data_processor
```

 スケジュール済みトリガーを作成するには:

```
# Run every 5 minutes
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "periodic_check.py" \
  --database DATABASE_NAME \
  regular_check

# Run daily at 8am (cron format with seconds)
influxdb3 create trigger \
  --trigger-spec "cron:0 0 8 * * *" \
  --plugin-filename "daily_report.py" \
  --database DATABASE_NAME \
  daily_report
```

HTTP リクエストトリガーを作成するには:

```
# Create endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --database DATABASE_NAME \
  webhook_processor
```

 エンドポイントへのアクセス先: `https://your-cluster-endpoint:8086/api/v3/engine/webhook` 

## トリガーの設定
<a name="configuring-triggers"></a>

### プラグインに引数を渡す
<a name="passing-arguments-to-plugins"></a>

 トリガー引数を使用してプラグインの動作を設定します。

```
influxdb3 create trigger \
  --trigger-spec "every:1h" \
  --plugin-filename "threshold_check.py" \
  --trigger-arguments "threshold=90,notify_email=admin@example.com" \
  --database DATABASE_NAME \
  threshold_monitor
```

 引数はディクショナリとしてプラグインに渡されます。

```
def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")
        # Use arguments in your logic
```

### エラー処理の動作
<a name="error-handling-behavior"></a>

 トリガーがエラーを処理する方法を設定します。

```
# Log errors (default)
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "process.py" \
  --error-behavior log \
  --database DATABASE_NAME \
  log_processor

# Retry on error
influxdb3 create trigger \
  --trigger-spec "table:critical_data" \
  --plugin-filename "critical.py" \
  --error-behavior retry \
  --database DATABASE_NAME \
  retry_processor

# Disable trigger on error
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook.py" \
  --error-behavior disable \
  --database DATABASE_NAME \
  auto_disable_processor
```

### 非同期実行
<a name="asynchronous-execution"></a>

 複数のトリガーインスタンスを同時に実行できるようにします。

```
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "heavy_process.py" \
  --run-asynchronous \
  --database DATABASE_NAME \
  async_processor
```

## トリガーを管理
<a name="managing-triggers"></a>

データベースのトリガーを表示するには:

```
# Show all triggers for a database
influxdb3 show summary \
  --database DATABASE_NAME \
  --token YOUR_TOKEN
```

### 書き込みトリガーのテーブル除外
<a name="table-exclusion-for-write-triggers"></a>

`all_tables` を使用する際にプラグインコード内のテーブルをフィルタリングするには: 

```
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "processor.py" \
  --trigger-arguments "exclude_tables=temp_data,debug_info" \
  --database DATABASE_NAME \
  data_processor
```

プラグインの実装は次のとおりです。

```
def process_writes(influxdb3_local, table_batches, args=None):
    excluded_tables = set(args.get('exclude_tables', '').split(','))
    
    for table_batch in table_batches:
        if table_batch["table_name"] in excluded_tables:
            continue
        # Process allowed tables
```

## 分散デプロイに関する考慮事項
<a name="distributed-deployment-considerations"></a>

 マルチノードデプロイでは、ノードロールに基づいてプラグインを設定します。


|  **プラグインタイプ**  |  **ノードタイプ**  |  **理由**  | 
| --- | --- | --- | 
|  データ書き込みプラグイン  |  インジェスターノード  |  取り込みポイントでデータを処理する  | 
|  HTTP リクエストプラグイン  |  クエリノード  |  API トラフィックを処理する  | 
|  スケジュールされたプラグイン  |  任意の設定済みノード  |  スケジューラを使用して任意のノードで実行できる  | 

 Enterprise デプロイでは、以下の考慮事項が重要です。
+  関連するすべてのノードで同じプラグイン設定を維持します。
+  外部クライアント (Grafana、ダッシュボード) をクエリノードにルーティングします。
+  トリガーが実行されるノードでプラグインが使用可能であることを確認します。

## ベストプラクティス
<a name="best-practices-influxdb3-1"></a>
+  **プラグイン設定** 
  +  ハードコーディングの代わりに、設定可能な値にトリガー引数を使用します。
  +  プラグイン内に適切なエラー処理を実装します。
  +  データベースオペレーションには `influxdb3_local` API を使用します。
+  **パフォーマンスの最適化** 
  +  重い処理タスクには非同期実行を使用します。
  +  フィルタリングされたデータの早期リターンを実装します。
  +  プラグイン内のデータベースクエリを最小限に抑えます。
+  **エラー管理** 
  +  適切なエラー動作 (ログ、再試行、または無効化) を選択します。
  +  システムテーブルを使用してプラグインの実行をモニタリングします。
  +  本番環境のデプロイ前にプラグインを徹底的にテストします。
+  **セキュリティに関する考慮事項** 
  +  HTTP リクエストプラグインのすべての入力データを検証します。
  +  機密設定を保存するための安全な方法を使用します。
  +  プラグインのアクセス許可を必要なオペレーションのみに制限します。

## プラグインの実行をモニタリングする
<a name="monitoring-plugin-execution"></a>

 システムテーブルをクエリしてプラグインのパフォーマンスをモニタリングします。

```
-- View processing engine logs
SELECT * FROM system.processing_engine_logs 
WHERE time > now() - INTERVAL '1 hour'
ORDER BY time DESC

-- Check trigger status
SELECT * FROM system.processing_engine_triggers
WHERE database = 'DATABASE_NAME'
```

 処理エンジンは、データ処理ロジックをデータの近くに保ち、レイテンシーを減らし、アーキテクチャを簡素化しながら、InfluxDB 3 機能を拡張する強力な方法を提供します。

### InfluxData 認定プラグイン
<a name="influxdata-certified-plugins-1"></a>

 Amazon Timestream for InfluxDB 3 には、カスタム開発を必要とせずにデータベース機能を拡張する、構築および認定済みプラグインの包括的なセットが含まれています。これらのプラグインは完全に設定可能で、起動時にすぐに使用でき、データ処理、モニタリング、アラートのための高度な機能を提供します。

 詳細なドキュメントとソースコードについては、[InfluxData プラグインリポジトリ](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata)を参照してください。

## 使用可能なプラグイン
<a name="available-plugins"></a>

### LiveAnalytics 移行プラグインの Timestream
<a name="liveanalytics-migration-plugins"></a>

#### LiveAnalytics の Timestream から InfluxDB の Timestream にデータベースを移行する
<a name="laMigrationPlugin"></a>
+  **トリガータイプ**: HTTP 
+  **ユースケース**: LiveAnalytics の Timestream から InfluxDB の Timestream に時系列データベースを移行する。
+  **GitHub**: [ LiveAnalytics 移行プラグインの Timestream ドキュメント](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin) 

 **仕組み**: Timestream for LiveAnalytics  移行プラグインは、[Timestream for LiveAnalytics 移行クライアント](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin/liveanalytics_migration_client)と連携します。クライアントは、[LiveAnalytics UNLOAD コマンドの Timestream](https://docs.aws.amazon.com/timestream/latest/developerguide/supported-sql-constructs.UNLOAD.html) を実行して、LiveAnalytics データベースを Parquet 形式の S3 バケットにエクスポートします。データがエクスポートされると、クライアントは Parquet ファイルの[署名付き URLs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html) を生成し、署名付き URLs。プラグインの実行中に、S3 オブジェクトは S3 バケットから取得され、InfluxDB ラインプロトコルに変換され、InfluxDB 3 データベースに書き込まれます。

 **ベストプラクティス**: Timestream for LiveAnalytics  移行プラグインは、単一の InfluxDB 3 Enterprise ノードで実行する必要があります。プラグインクライアントで使用される InfluxDB 3 エンドポイントが、クラスターエンドポイントではなくプロセスノードエンドポイントであることを確認します。移行を実行するクラスターは、移行プラグインの実行中に取り込みやクエリを実行しないでください。メモリ不足エラーが発生する可能性があります。

 移行パフォーマンスは、InfluxDB 3 ノードで使用できるリソースと、移行されるデータの特性によって異なります。テストでは、1 時間あたり 30,000,000 件の LiveAnalytics レコードが移行されるスループットが確認されました。実際のパフォーマンスは、いくつかの要因によって異なる場合があります。

 **データマッピング**: 次の表は、LiveAnalytics データの Timestream をラインプロトコルデータにマッピングする方法を示しています。


| Timestream for LiveAnalytics の概念 | ラインプロトコルの概念 | 
| --- | --- | 
| [[テーブル]](https://docs.aws.amazon.com/timestream/latest/developerguide/API_Table.html) | [測定](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#measurement) | 
| [ディメンション](https://docs.aws.amazon.com/timestream/latest/developerguide/API_Dimension.html) | [タグ](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set) | 
| [メジャー名](https://docs.aws.amazon.com/timestream/latest/developerguide/data-modeling.html#data-modeling-measurenamemulti) | タグ | 
| [メジャー](https://docs.aws.amazon.com/timestream/latest/developerguide/API_MeasureValue.html) | [フィールド](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#field-set) | 
| [時間](https://docs.aws.amazon.com/timestream/latest/developerguide/writes.html#writes.data-types) | [タイムスタンプ](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#timestamp) | 

 **単一メジャーレコード変換**: テーブル の Timestream for LiveAnalytics の単一メジャーレコードを次に示します`example_table`。


| ホスト | リージョン | request\_id | measure\_name | 時間 | measure\_value::double | 
| --- | --- | --- | --- | --- | --- | 
| host1 | us-west-2 | saio3242ovnfk | cpu\_usage | 2025-04-17 16:42:54.702394001 | 0.66 | 

このレコードは次のように変換されます。

```
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001
```

 **複数メジャーレコード変換**: 以下は、テーブルの Timestream for LiveAnalytics の複数メジャーレコード`example_table`で、すべてメジャーの右側`time`にあります。


| ホスト | リージョン | request\_id | measure\_name | 時間 | cpu\_usage | memory\_usage | 
| --- | --- | --- | --- | --- | --- | --- | 
| host1 | us-west-2 | saio3242ovnfk | metrics | 2025-04-17 16:42:54.702394001 | 0.66 | 0.21 | 

このレコードは次のように変換されます。

```
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
```

**重要**  
プラグインが S3 の LiveAnalytics データを取得するために使用する署名付き URLs は、設定の有効期限が切れるか、生成に使用される IAM 認証情報の有効期限が切れます (最大 7 日間）。EC2 インスタンス (`t3.medium`インスタンスで十分) で移行クライアントを実行することをお勧めします。EC2 インスタンスは IAM 認証情報を自動的にローテーションし、移行中に署名付き URL の時間制約がなくなるためです。EC2 インスタンスを使用しない場合、移行を再開でき、大規模なデータセットでは複数の再開呼び出しが必要になる場合があります。  
Timestream for LiveAnalytics 移行プラグインは、1 つの LiveAnalytics データベース内の 10 億レコードまたは 125GB 未満の移行に推奨されます。 LiveAnalytics   
移行プラグインは、クラスター内の単一のプロセスノードでのみ使用してください。list[list-db-instances-for-cluster](https://docs.aws.amazon.com/cli/latest/reference/timestream-influxdb/list-db-instances-for-cluster.html) を使用してプロセスノードを決定し、 `INFLUXDB3_HOST_URL`を タイプのインスタンスモードを持つデータベースインスタンスの 1 つのエンドポイントに設定するか`PROCESS`、Timestream コンソールを使用してクラスターを選択してプロセスノードを見つけることができます。

 **主な特徴:** 
+  UNLOAD コマンドを使用して、時系列データを Timestream for LiveAnalytics から S3 バケットにエクスポートします。
+  移行する S3 オブジェクトごとに署名付き URLs を生成します。
+  各 S3 オブジェクトの移行プロセスを追跡します。
+  移行が成功した後に S3 オブジェクトをクリーンアップします。
+  署名付き URL の有効期限が切れた場合の移行の失敗の再開をサポートします。

 **使用例**: 

```
# Migrate a LiveAnalytics database to InfluxDB 3
export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>"
export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>"
export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>"

aws s3api create-bucket --bucket <your S3 bucket name> \
    --object-lock-enabled-for-bucket --region <your region> \
    --create-bucket-configuration LocationConstraint=<your region>
```

**注記**  
README のサンプルバケットポリシーを使用して S3 バケットポリシーを更新します。詳細については、「[前提条件](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin#prerequisites)」を参照してください。

```
python3 liveanalytics_influxdb3_migration_client.py \
    --live-analytics-database-name <your LiveAnalytics database name> \
    --s3-bucket-name <your S3 bucket name>
```

 **出力:**  LiveAnalytics データベースの Timestream はラインプロトコルに変換され、InfluxDB 3 データベースに取り込まれます。

### 異常検出プラグイン
<a name="anomaly-detection-plugins"></a>

#### MAD ベースの異常検出
<a name="madPlugin"></a>
+  **トリガータイプ**: データ書き込み (リアルタイム) 
+  **ユースケース**: ストリーミングデータの外れ値リアルタイム検出、センサーモニタリング、品質管理 
+  **GitHub**: [MAD 異常検出ドキュメント](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/mad_check) 

 **仕組み:** 平均絶対偏差 (MAD) を使用して、通常の動作の堅牢なベースラインを確立します。新しいデータが到着すると、各ポイントの中央値から離れた MAD の数を計算します。しきい値 (k \* MAD) を超えるポイントには異常としてフラグが付けられます。

 **主な特徴:** 
+  データ書き込み時のリアルタイム処理。
+  効率を高めるためにインメモリスライディングウィンドウを維持。
+  カウントベースのアラート (例: 5 回連続する異常)。
+  期間ベースのアラート (2 分間の異常など)。
+  値の急速な変化によるアラート疲れを防ぐためのフリップ抑制。

 **使用例**: 

```
# Detect temperature anomalies in real-time
influxdb3 create trigger \
  --database sensors \
  --plugin-filename "mad_check/mad_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \
  temp_anomaly_detector

# Threshold format: field:k_multiplier:window_size:trigger_condition
# temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies
# humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
```

 **出力:** フィールド名、値、期間など、異常が検出されたときにリアルタイムの通知を送信します。

### データ変換プラグイン
<a name="data-transformation-plugins"></a>

#### 基本的な変換
<a name="basicdataPlugin"></a>
+  **トリガータイプ**: スケジュール、データ書き込み 
+  **ユースケース**: データの標準化、単位変換、フィールド名の正規化、データクリーニング 
+  **GitHub**: [基本的な変換ドキュメント](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/basic_transformation) 

 **仕組み:** フィールド名と値に一連の変換を適用します。履歴データをバッチ処理 (スケジュール) したり、到着時にデータを変換 (データ書き込み) したりできます。変換は指定された順序で適用され、複雑なデータパイプラインが可能になります。

 **主な特徴:** 
+  フィールド名の変換: snake\_case、スペースの削除、英数字のみ。
+  単位変換: 温度、圧力、長さ、時間単位。
+  正規表現サポートによるカスタム文字列置換。
+  データを書き込まずにテストするためのドライランモード。
+  履歴データのバッチ処理。

 **使用例**: 

```
# Transform temperature data from Celsius to Fahrenheit with field name standardization
influxdb3 create trigger \
  --database weather \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "every:30m" \
  --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \
  temp_converter

# Real-time field name cleaning for incoming sensor data
influxdb3 create trigger \
  --database iot \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \
  sensor_cleaner
```

 **出力:** 変換されたデータを含む新しいテーブルを作成し、元のタイムスタンプとタグを保持します。

#### ダウンサンプラー
<a name="downsamplerPlugin"></a>
+  **トリガータイプ**: スケジュール済み、HTTP 
+  **ユースケース**: データ削減、長期ストレージの最適化、サマリー統計の作成、パフォーマンスの向上。
+  **GitHub**:[ダウンサンプラーのドキュメント](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/downsampler) 

 **仕組み:** 高解像度の時系列データを低解像度の概要に集約します。例えば、1 秒のデータを 1 時間の平均に変換します。各ダウンサンプリングポイントには、圧縮された元のポイントの数と対象時間範囲に関するメタデータが含まれます。

 **主な特徴:** 
+  複数の集計関数: 平均、合計、最小、最大、中央値、微分。
+  フィールド固有の集計 (フィールドごとに異なる関数)。
+  メタデータ追跡 (record\_count、time\_from、time\_to)。
+  バックフィルによるオンデマンドダウンサンプリング用の HTTP API。
+  大規模なデータセットの設定可能なバッチサイズ。

 **使用例**: 

```
# Downsample CPU metrics from 10-second to hourly resolution
influxdb3 create trigger \
  --database metrics \
  --plugin-filename "downsampler/downsampler.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \
  cpu_downsampler

# HTTP endpoint for on-demand downsampling
curl -X POST http://localhost:8086/api/v3/engine/downsample \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "source_measurement": "sensor_data",
    "target_measurement": "sensor_daily",
    "interval": "1d",
    "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]],
    "backfill_start": "2024-01-01T00:00:00Z",
    "backfill_end": "2024-12-31T23:59:59Z"
  }'
```

 **出力:** 集計値と、圧縮されたポイントの数と時間範囲を示すメタデータ列を使用して、ダウンサンプリングされたデータを作成します。

### プラグインのモニタリングとアラート
<a name="monitoring-and-alerting-plugins"></a>

#### 状態変化モニタ
<a name="statechangemonitorPlugin"></a>
+  **トリガータイプ**: スケジュール、データ書き込み 
+  **ユースケース**: ステータスモニタリング、機器状態追跡、プロセスモニタリング、変更検出。
+  **GitHub**: [状態変化ドキュメント](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/state_change) 

 **仕組み:** 経時的なフィールド値の変化を追跡し、変更回数が設定されたしきい値を超えたときに警告します。値の変化 (異なる値) と特定の値条件 (ターゲット値に等しい) の両方を検出できます。ノイズの多いシグナルからのアラートを防ぐための安定性チェックが含まれています。

 **主な特徴:** 
+  カウントベースの変更検出 (10 分で 5 回の変更など)。
+  期間ベースのモニタリング (5 分間のステータス = 「エラー」など)。
+  ノイズ低減のための状態変化ウィンドウ。
+  独立したしきい値によるマルチフィールドモニタリング。
+  設定可能な安定性要件。

 **使用例**: 

```
# Monitor equipment status changes
influxdb3 create trigger \
  --database factory \
  --plugin-filename "state_change/state_change_check_plugin.py" \
  --trigger-spec "every:5m" \
  --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \
  equipment_monitor

# Real-time monitoring for specific state conditions
influxdb3 create trigger \
  --database systems \
  --plugin-filename "state_change/state_change_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \
  service_monitor
```

 **出力:** アラートには、フィールド名、検出された変化の数、時間枠、関連するタグ値が含まれます。

#### システムメトリクスコレクター
<a name="systemMetricsCollectorPlugin"></a>
+  **トリガータイプ**: スケジュール済み 
+  **ユースケース**: インフラストラクチャのモニタリング、パフォーマンスベースライン、キャパシティプランニング、リソース追跡。
+  **GitHub**:[システムメトリクスのドキュメント](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/system_metrics) 

 **仕組み**: psutil ライブラリを使用して、InfluxDB を実行しているホストから包括的なシステムメトリクスを収集します。CPU、メモリ、ディスク、ネットワーク統計を設定可能な間隔で収集します。各メトリクスタイプは個別に有効/無効にできます。

 **主な特徴:** 
+  負荷平均を使用したコアごとの CPU 統計。
+  スワップ障害やページ障害などのメモリ使用量。
+  計算された IOPS とレイテンシーを含むディスク I/O メトリクス。
+  エラー追跡を使用したネットワークインターフェイス統計。
+  設定可能なメトリクス収集 (特定のタイプを有効/無効にする)。
+  収集の失敗時に自動的に再試行。

 **使用例**: 

```
# Collect all system metrics every 30 seconds
influxdb3 create trigger \
  --database monitoring \
  --plugin-filename "system_metrics/system_metrics.py" \
  --trigger-spec "every:30s" \
  --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \
  system_monitor

# Focus on CPU and memory for application servers
influxdb3 create trigger \
  --database app_monitoring \
  --plugin-filename "system_metrics/system_metrics.py" \
  --trigger-spec "every:1m" \
  --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \
  app_metrics
```

 **出力:** サブシステムごとに詳細なメトリクスを含む複数のテーブル (system\_cpu、system\_memory、system\_disk\_io など) を作成します。

## 一般的な設定パターン
<a name="common-configuration-patterns"></a>

### TOML 設定ファイルの使用
<a name="using-toml-configuration-files"></a>

 複雑な設定の場合は、インライン引数の代わりに TOML ファイルを使用します。

```
# anomaly_config.toml
measurement = "server_metrics"
field = "cpu_usage"
window = "1h"
detector_type = "IsolationForestAD"
contamination = 0.1
window_size = 20
output_table = "cpu_anomalies"
senders = "slack"
slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK"
notification_text = "Anomaly detected in $field: value=$value at $timestamp"
```

```
# Use TOML configuration
PLUGIN_DIR=~/.plugins influxdb3 create trigger \
  --database monitoring \
  --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \
  --trigger-spec "every:10m" \
  --trigger-arguments "config_file_path=anomaly_config.toml" \
  cpu_anomaly_detector
```

### プラグインのチェーン化
<a name="chaining-plugins"></a>

 複数のプラグインを連鎖させてデータ処理パイプラインを作成します。

```
# Step 1: Transform raw data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \
  step1_transform

# Step 2: Downsample transformed data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "downsampler/downsampler.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \
  step2_downsample

# Step 3: Detect anomalies in downsampled data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "mad_check/mad_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \
  step3_anomaly
```

## プラグインのベストプラクティス
<a name="best-practices-influxdb3-2"></a>
+  **保守的に開始** – しきい値を高くしてウィンドウを長くしてから、観測されたパターンに基づいて調整します。
+  **開発中のテスト** – 本番デプロイの前に、ドライランモードとテストデータベースを使用します。
+  **プラグインのパフォーマンスのモニタリング** – システムテーブルの実行時間とリソースの使用状況を確認します。
+  **適切なトリガータイプの使用** – バッチ処理にスケジュール済みトリガーを選択し、リアルタイムにデータ書き込みを行います。
+  **通知を賢く設定** – 重要度レベルとデバウンスロジックを使用して、アラート疲れを防ぎます。
+  **モデルの永続性を活用** – ML ベースのプラグインの場合、トレーニング済みモデルを整合性のために保存します。
+  **ドキュメント設定** – わかりやすいトリガー名を使用し、設定ドキュメントを維持します。

## プラグインの実行をモニタリングする
<a name="monitoring-plugin-execution-1"></a>

プラグインのパフォーマンスをモニタリングするには:

```
-- View plugin execution logs
SELECT 
  event_time,
  trigger_name,
  log_level,
  log_text
FROM system.processing_engine_logs 
WHERE trigger_name = 'your_trigger_name'
  AND time > now() - INTERVAL '1 hour'
ORDER BY event_time DESC;

-- Monitor plugin performance
SELECT 
  trigger_name,
  COUNT(*) as executions,
  AVG(execution_time_ms) as avg_time_ms,
  MAX(execution_time_ms) as max_time_ms,
  SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count
FROM system.processing_engine_logs
WHERE time > now() - INTERVAL '24 hours'
GROUP BY trigger_name;

-- Check trigger status
SELECT * FROM system.processing_engine_triggers
WHERE database = 'your_database';
```

## 一般的な問題のトラブルシューティング
<a name="troubleshooting-common-issues"></a>

次の表は、一般的な問題と考えられる解決策を示しています。


|  **問題**  |  **解決策**  | 
| --- | --- | 
|  プラグインがトリガーされない  |  トリガーが有効になっていることを確認し、スケジュール/仕様構文を確認する  | 
|  通知が見つからない  |  Notifier プラグインがインストールされていることを確認し、ウェブフック URL を確認する  | 
|  メモリ使用量が多い  |  ウィンドウサイズを減らし、バッチ処理間隔を調整する  | 
|  変換が正しくない  |  ドライランモードを使用して、フィールド名とデータ型を検証する  | 
|  予測が正確でない  |  トレーニングデータウィンドウを増やし、季節性設定を調整する  | 
|  アラートが多すぎる  |  トリガーカウントを増やし、デバウンス期間を追加し、しきい値を調整する  | 

 これらの認定プラグインは、一般的な時系列データ処理ニーズに対応するエンタープライズ対応機能を提供することで、包括的な設定オプションを通じて柔軟性を維持しつつ、カスタム開発の必要性をなくします。詳細なドキュメント、例、更新については、[GitHub リポジトリ](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata)を参照してください。