

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用处理引擎插件扩展适用于 InfluxDB 的 Timestream
<a name="processing-engine"></a>

 处理引擎是一种嵌入式 Python 虚拟机，在 Amazon Timestream 的 InfluxDB 3 数据库内部运行。提供核心版和企业版两种版本。使您能够使用自定义 Python 代码扩展数据库，这些代码可以自动执行工作流程、转换数据以及创建自定义 API 端点。

 处理引擎执行 Python 插件，以响应特定的数据库事件：
+  **数据写入**：对进入数据库的数据进行处理和转换 
+  **计划事件**：按定义的时间间隔或特定的时间运行代码 
+  **HTTP 请求**：暴露执行代码的自定义 API 端点 

 该引擎包含内存缓存，用于管理执行之间的状态，使您能够直接在数据库中构建具有状态的应用程序。

## InfluxData 经过认证的插件
<a name="influxdata-certified-plugins"></a>

 在发布时，InfluxDB 3 包括一组经过以下认证的预构建、完全可配置的插件： InfluxData
+  **数据转换**：处理并丰富传入数据 
+  **警报**：根据数据阈值发送通知 
+  **聚合**：计算时间序列数据的统计数据 
+  **系统监控**：跟踪资源使用情况和运行状况指标 
+  **集成**：Connect 连接到外部服务和 APIs 

 这些经过认证的插件已准备就绪，可通过触发器参数进行配置以满足特定需求。

## 插件类型和触发器规格
<a name="plugin-types-and-trigger-specifications"></a>


|  **插件类型**  |  **触发器规格**  |  **插件何时运行**  |  **使用案例**  | 
| --- | --- | --- | --- | 
|  数据写入  |  table:<TABLE\_NAME>或 all\_tables  |  何时将数据写入表  |  数据转换、警报、派生指标  | 
|  已安排  |  every:<DURATION>或 cron:<EXPRESSION>  |  在指定间隔  |  定期聚合、报告、运行状况检查  | 
|  HTTP 请求  |  request:<REQUEST\_PATH>  |  何时接收 HTTP 请求  |  自定义 APIs、网络挂钩、用户界面  | 

## 创建触发器
<a name="creating-triggers"></a>

 触发器将插件连接到数据库事件，并定义其执行时间。使用 `influxdb3 create trigger` 命令。

要创建数据写入触发器，请执行以下操作：

```
# Trigger on writes to a specific table
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --plugin-filename "process_sensors.py" \
  --database DATABASE_NAME \
  sensor_processor

# Trigger on all table writes
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "process_all_data.py" \
  --database DATABASE_NAME \
  all_data_processor
```

 要创建计划触发器，请执行以下操作：

```
# Run every 5 minutes
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "periodic_check.py" \
  --database DATABASE_NAME \
  regular_check

# Run daily at 8am (cron format with seconds)
influxdb3 create trigger \
  --trigger-spec "cron:0 0 8 * * *" \
  --plugin-filename "daily_report.py" \
  --database DATABASE_NAME \
  daily_report
```

要创建 HTTP 请求触发器，请执行以下操作

```
# Create endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --database DATABASE_NAME \
  webhook_processor
```

 访问端点，网址为：`https://your-cluster-endpoint:8086/api/v3/engine/webhook`

## 配置触发器
<a name="configuring-triggers"></a>

### 向插件传递参数
<a name="passing-arguments-to-plugins"></a>

 使用触发器参数配置插件行为：

```
influxdb3 create trigger \
  --trigger-spec "every:1h" \
  --plugin-filename "threshold_check.py" \
  --trigger-arguments "threshold=90,notify_email=admin@example.com" \
  --database DATABASE_NAME \
  threshold_monitor
```

 参数以字典形式传递到插件：

```
def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")
        # Use arguments in your logic
```

### 错误处理行为
<a name="error-handling-behavior"></a>

 配置触发器处理错误的方式：

```
# Log errors (default)
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "process.py" \
  --error-behavior log \
  --database DATABASE_NAME \
  log_processor

# Retry on error
influxdb3 create trigger \
  --trigger-spec "table:critical_data" \
  --plugin-filename "critical.py" \
  --error-behavior retry \
  --database DATABASE_NAME \
  retry_processor

# Disable trigger on error
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook.py" \
  --error-behavior disable \
  --database DATABASE_NAME \
  auto_disable_processor
```

### 异步执行
<a name="asynchronous-execution"></a>

 允许同时运行多个触发器实例：

```
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "heavy_process.py" \
  --run-asynchronous \
  --database DATABASE_NAME \
  async_processor
```

## 管理触发器
<a name="managing-triggers"></a>

要查看数据库的触发器，请执行以下操作：

```
# Show all triggers for a database
influxdb3 show summary \
  --database DATABASE_NAME \
  --token YOUR_TOKEN
```

### 写入触发器的表排除项
<a name="table-exclusion-for-write-triggers"></a>

使用 `all_tables` 时，要在插件代码中筛选表，请执行以下操作：

```
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "processor.py" \
  --trigger-arguments "exclude_tables=temp_data,debug_info" \
  --database DATABASE_NAME \
  data_processor
```

插件实施如下：

```
def process_writes(influxdb3_local, table_batches, args=None):
    excluded_tables = set(args.get('exclude_tables', '').split(','))
    
    for table_batch in table_batches:
        if table_batch["table_name"] in excluded_tables:
            continue
        # Process allowed tables
```

## 分布式部署注意事项
<a name="distributed-deployment-considerations"></a>

 在多节点部署中，根据节点角色配置插件：


|  **插件类型**  |  **节点类型**  |  **原因**  | 
| --- | --- | --- | 
|  数据写入插件  |  摄取程序节点  |  在摄取点处理数据  | 
|  HTTP 请求插件  |  查询器节点  |  处理 API 流量  | 
|  计划插件  |  任何已配置的节点  |  可使用调度器在任何节点上运行  | 

 以下注意事项对于企业部署至关重要：
+  确保所有相关节点保持相同的插件配置。
+  将外部客户端（Grafana、控制面板）路由到查询器节点。
+  确保插件在执行其触发器的节点上可用。

## 最佳实践
<a name="best-practices-influxdb3-1"></a>
+  **插件配置** 
  +  使用触发器参数设置可配置值，而非硬编码。
  +  在插件中实施正确的错误处理。
  +  使用 `influxdb3_local` API 进行数据库操作。
+  **性能优化** 
  +  对于繁重的处理任务，使用异步执行。
  +  对筛选后的数据实施提前返回。
  +  尽量减少插件内的数据库查询。
+  **错误管理** 
  +  选择适当的错误行为（日志记录、重试或禁用）。
  +  通过系统表监控插件执行情况。
  +  在生产部署之前，请对插件进行全面测试。
+  **安全注意事项** 
  +  验证 HTTP 请求插件中的所有输入数据。
  +  使用安全方法存储敏感配置。
  +  将插件权限限制为仅执行必需操作。

## 监控插件执行情况
<a name="monitoring-plugin-execution"></a>

 查询系统表以监控插件性能：

```
-- View processing engine logs
SELECT * FROM system.processing_engine_logs 
WHERE time > now() - INTERVAL '1 hour'
ORDER BY time DESC

-- Check trigger status
SELECT * FROM system.processing_engine_triggers
WHERE database = 'DATABASE_NAME'
```

 处理引擎提供一种扩展 InfluxDB 3 功能的强大方法，同时使数据处理逻辑接近数据，从而减少延迟并简化架构。

### InfluxData 经过认证的插件
<a name="influxdata-certified-plugins-1"></a>

 Amazon Timestream for InfluxDB 3 包含一整套经过认证的预构建插件，无需自定义开发即可扩展数据库功能。这些插件在启动时即可完全配置且随时可用，为数据处理、监控和警报提供高级功能。

 要获取完整的文档和源代码，请访问[InfluxData插件存储库](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata)。

## 可用插件
<a name="available-plugins"></a>

### LiveAnalytics 迁移插件的时间流
<a name="liveanalytics-migration-plugins"></a>

#### 将数据库从 Timestream 迁移到 InfluxDB 的 LiveAnalytics Timestream
<a name="laMigrationPlugin"></a>
+  **触发器类型**：HTTP 
+  **用例**：将时间序列数据库从 Timestream 迁移到 InfluxDB 的 LiveAnalytics Timestream。
+  **GitHub**: [ LiveAnalytics 迁移插件的时间流](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin)文档 

 **工作原理**：用于迁移的 Timestream 插件与用于 LiveAnalytics 迁移的 [Timestream 客户端协同工作。 LiveAnalytics ](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin/liveanalytics_migration_client)客户端执行 [Timestream for LiveAnalytics UNLOAD 命令](https://docs.aws.amazon.com/timestream/latest/developerguide/supported-sql-constructs.UNLOAD.html)，以 Parquet LiveAnalytics 格式将数据库导出到 S3 存储桶。导出数据后，客户端会为 Parquet 文件生成[预签名 URLs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html)，并使用预签名调用迁移插件。 URLs在插件执行期间，将从 S3 存储桶中检索 S3 对象并将其转换为 InfluxDB 线路协议，然后写入 InfluxDB 3 数据库。

 **最佳实践**： LiveAnalytics 迁移时间流插件必须在单个 InfluxDB 3 Enterprise 节点上运行。确保与插件客户端一起使用的 InfluxDB 3 端点是进程节点端点，而不是集群端点。执行迁移的集群不应在迁移插件运行时进行摄取或查询，因为这可能会导致内存不足错误。

 迁移性能取决于InfluxDB 3节点的可用资源以及要迁移的数据的特征。在我们的测试中，我们观察到每小时迁移 LiveAnalytics 记录的吞吐量为3000万条。您的实际表现可能会因多种因素而有所不同。

 **数据映射**：下表显示了如何将 LiveAnalytics 数据的 Timestream 映射到线路协议数据。


| 概念的时间 LiveAnalytics 流 | 行协议概念 | 
| --- | --- | 
| [表](https://docs.aws.amazon.com/timestream/latest/developerguide/API_Table.html) | [测量](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#measurement) | 
| [Dimensions](https://docs.aws.amazon.com/timestream/latest/developerguide/API_Dimension.html) | [标签](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#tag-set) | 
| [度量名称](https://docs.aws.amazon.com/timestream/latest/developerguide/data-modeling.html#data-modeling-measurenamemulti) | Tag | 
| [措施](https://docs.aws.amazon.com/timestream/latest/developerguide/API_MeasureValue.html) | [字段](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#field-set) | 
| [时间](https://docs.aws.amazon.com/timestream/latest/developerguide/writes.html#writes.data-types) | [Timestamp](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#timestamp) | 

 **单小节记录转换**：以下是表中 Timestream 中的单度量记录： LiveAnalytics `example_table`


| host | region | request\_id | measure\_name | 时间 | measure\_value::double | 
| --- | --- | --- | --- | --- | --- | 
| 主机 1 | us-west-2 | saio3242ovnfk | cpu\_usage | 2025-04-17 16:42:54.702 394001 | 0.66 | 

此记录将转换为：

```
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001
```

 **多度量记录转换**：以下是 Timestream 中用于 LiveAnalytics 表`example_table`中的多度量记录，其右边的所有内容均为度`time`量：


| host | region | request\_id | measure\_name | 时间 | cpu\_usage | 内存使用情况 | 
| --- | --- | --- | --- | --- | --- | --- | 
| 主机 1 | us-west-2 | saio3242ovnfk | metrics | 2025-04-17 16:42:54.702 394001 | 0.66 | 0.21 | 

此记录将转换为：

```
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
```

**重要**  
插件用于检索 S3 中 LiveAnalytics 数据的预签名 URL 在设置的到期日或用于生成它们的 IAM 证书到期（最长 7 天）时过期。我们建议在 EC2 实例上运行迁移客户端（实例就足够了），因为 EC2 实例会自动轮换 IAM 证书，从而消除了迁移期间的预签名 URL 时间限制。`t3.medium`如果您不使用 EC2 实例，则可以恢复迁移，大型数据集可能需要多次恢复调用。  
对于 LiveAnalytics 单个数据库中记录少于 10 亿条或 125GB 以下的迁移，建议使用 Timestream for Migration 插件。 LiveAnalytics   
迁移插件只能在群集中的单个进程节点上使用。您可以使用 [list-db-instances-for-cluster](https://docs.aws.amazon.com/cli/latest/reference/timestream-influxdb/list-db-instances-for-cluster.html) 并将设置为实例模式类型为的数据库实例的终端节点`INFLUXDB3_HOST_URL`来确定进程节点`PROCESS`，也可以使用 Timestream 控制台选择您的集群来查找流程节点。

 **主要特征**：
+  使用 UNLOAD 命令 LiveAnalytics 将时间序列数据从 Timestream 导出到 S3 存储桶。
+  URLs 为每个要迁移的 S3 对象生成预签名。
+  跟踪每个 S3 对象的迁移过程。
+  成功迁移后清理 S3 对象。
+  支持在预签名 URL 到期时恢复失败的迁移。

 **示例用法：**

```
# Migrate a LiveAnalytics database to InfluxDB 3
export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>"
export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>"
export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>"

aws s3api create-bucket --bucket <your S3 bucket name> \
    --object-lock-enabled-for-bucket --region <your region> \
    --create-bucket-configuration LocationConstraint=<your region>
```

**注意**  
使用自述文件中的示例存储桶策略更新 S3 存储桶策略。有关更多信息，请参阅[先决条件](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/python/liveanalytics_influxdb3_migration_plugin#prerequisites)。

```
python3 liveanalytics_influxdb3_migration_client.py \
    --live-analytics-database-name <your LiveAnalytics database name> \
    --s3-bucket-name <your S3 bucket name>
```

 **输出：** LiveAnalytics 数据库的时间流转换为线路协议并导入到 InfluxDB 3 数据库。

### 异常检测插件
<a name="anomaly-detection-plugins"></a>

#### 基于 MAD 的异常检测
<a name="madPlugin"></a>
+  **触发器类型**：数据写入（实时） 
+  **使用案例**：流数据的实时异常值检测、传感器监控、质量控制。
+  **GitHub**: [MAD 异常检测文档](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/mad_check) 

 **工作原理：**使用中位数绝对偏差 （MAD），为正常行为建立稳健的基线。随着新数据的到来，它会计算出每个点与中位数相差多少 MADs 点。超过阈值（k \* MAD）的点会标记为异常。

 **主要特征**：
+  数据写入时的实时处理。
+  保持内存中滑动窗口，以提高效率。
+  基于计数的警报（例如，连续 5 次异常）。
+  基于持续时间的警报（例如，持续 2 分钟的异常）。
+  禁用翻转功能，以防止快速变化的值引发警报疲劳。

 **示例用法：**

```
# Detect temperature anomalies in real-time
influxdb3 create trigger \
  --database sensors \
  --plugin-filename "mad_check/mad_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \
  temp_anomaly_detector

# Threshold format: field:k_multiplier:window_size:trigger_condition
# temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies
# humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
```

 **输出：**在检测到异常时发送实时通知，包括字段名称、值和持续时间。

### 数据转换插件
<a name="data-transformation-plugins"></a>

#### 基本转换
<a name="basicdataPlugin"></a>
+  **触发器类型**：计划、数据写入 
+  **使用案例**：数据标准化、单位转换、字段名称标准化、数据清理。
+  **GitHub**: [基本转换文档](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/basic_transformation) 

 **工作原理：**对字段名称和值应用一系列转换操作。可批量（计划）处理历史数据，也可在数据到达时对其进行转换（数据写入）。转换按指定顺序应用，从而支持复杂的数据管道。

 **主要特征**：
+  字段名称转换：蛇形命名法、删除空格、仅限字母数字。
+  单位转换：温度、压力、长度、时间单位。
+  支持正则表达式的自定义字符串替换。
+  试运行模式，无需写入数据即可进行测试。
+  历史数据的批量处理。

 **示例用法：**

```
# Transform temperature data from Celsius to Fahrenheit with field name standardization
influxdb3 create trigger \
  --database weather \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "every:30m" \
  --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \
  temp_converter

# Real-time field name cleaning for incoming sensor data
influxdb3 create trigger \
  --database iot \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \
  sensor_cleaner
```

 **输出：**使用转换后的数据创建新表，同时保留原始时间戳和标签。

#### 降低取样频率取样器
<a name="downsamplerPlugin"></a>
+  **触发器类型**：计划、HTTP 
+  **使用案例**：数据压缩、长期存储优化、创建摘要统计数据、性能提升。
+  **GitHub**: [向下采样器文档](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/downsampler) 

 **工作原理：**将高分辨率的时间序列数据聚合为低分辨率的摘要。例如，将 1 秒数据转换为 1 小时平均值。每个降低取样频率取样器点都包含有关原始压缩点数量和所覆盖时间范围的元数据。

 **主要特征**：
+  多种聚合函数：平均值、总和、最小值、最大值、中位数、导数。
+  特定于字段的聚合（不同字段使用不同函数）。
+  元数据跟踪（record\_count、time\_from、time\_to）。
+  HTTP API，用于通过回填进行按需缩减采样。
+  可配置的批量大小，适用于大型数据集。

 **示例用法：**

```
# Downsample CPU metrics from 10-second to hourly resolution
influxdb3 create trigger \
  --database metrics \
  --plugin-filename "downsampler/downsampler.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \
  cpu_downsampler

# HTTP endpoint for on-demand downsampling
curl -X POST http://localhost:8086/api/v3/engine/downsample \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "source_measurement": "sensor_data",
    "target_measurement": "sensor_daily",
    "interval": "1d",
    "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]],
    "backfill_start": "2024-01-01T00:00:00Z",
    "backfill_end": "2024-12-31T23:59:59Z"
  }'
```

 **输出：**使用聚合值以及显示压缩点数和时间范围的元数据列，创建缩减采样数据。

### 监控和警报插件
<a name="monitoring-and-alerting-plugins"></a>

#### 状态更改监控
<a name="statechangemonitorPlugin"></a>
+  **触发器类型**：计划、数据写入 
+  **使用案例**：状态监控、设备状态跟踪、过程监控、更改检测。
+  **GitHub**: [状态变更文档](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/state_change) 

 **工作原理：**跟踪字段值随时间推移而出现的更改，并在更改数量超过配置的阈值时发出警报。可检测值的更改情况（不同的值）和特定的值条件（等于目标值）。包括稳定性检查，以防止噪声信号触发警报。

 **主要特征**：
+  基于计数的更改检测（例如，十分钟内出现五次更改）。
+  基于持续时间的监控（例如，状态 = “错误”，持续五分钟）。
+  用于降噪的状态更改窗口。
+  具备独立阈值的多字段监控。
+  可配置的稳定性要求。

 **示例用法：**

```
# Monitor equipment status changes
influxdb3 create trigger \
  --database factory \
  --plugin-filename "state_change/state_change_check_plugin.py" \
  --trigger-spec "every:5m" \
  --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \
  equipment_monitor

# Real-time monitoring for specific state conditions
influxdb3 create trigger \
  --database systems \
  --plugin-filename "state_change/state_change_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \
  service_monitor
```

 **输出：**警报包括字段名称、检测到的更改数量、时间窗口以及相关标签值。

#### 系统指标收集器
<a name="systemMetricsCollectorPlugin"></a>
+  **触发器类型**：计划 
+  **使用案例**：基础设施监控、性能基准、容量规划、资源跟踪。
+  **GitHub**: [系统指标文档](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata/system_metrics) 

 **工作原理：**使用 psutil 库，从运行 InfluxDB 的主机收集全面的系统指标。以可配置的间隔收集 CPU、内存、磁盘和网络统计数据。每种指标类型均可独立启用/禁用。

 **主要特征**：
+  每核 CPU 统计数据，包含负载平均值。
+  内存使用情况，包含交换和页面错误。
+  包含计算的 IOPS 和延迟的磁盘 I/O 指标。
+  带错误跟踪的网络接口统计信息。
+  可配置的指标收集（启用/禁用特定类型）。
+  收集失败时自动重试。

 **示例用法：**

```
# Collect all system metrics every 30 seconds
influxdb3 create trigger \
  --database monitoring \
  --plugin-filename "system_metrics/system_metrics.py" \
  --trigger-spec "every:30s" \
  --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \
  system_monitor

# Focus on CPU and memory for application servers
influxdb3 create trigger \
  --database app_monitoring \
  --plugin-filename "system_metrics/system_metrics.py" \
  --trigger-spec "every:1m" \
  --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \
  app_metrics
```

 **输出：**创建多个表（system\_cpu、system\_memory、system\_disk\_io 等），包含每个子系统的详细指标。

## 常见配置模式
<a name="common-configuration-patterns"></a>

### 使用 TOML 配置文件
<a name="using-toml-configuration-files"></a>

 对于复杂配置，使用 TOML 文件而非内联参数：

```
# anomaly_config.toml
measurement = "server_metrics"
field = "cpu_usage"
window = "1h"
detector_type = "IsolationForestAD"
contamination = 0.1
window_size = 20
output_table = "cpu_anomalies"
senders = "slack"
slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK"
notification_text = "Anomaly detected in $field: value=$value at $timestamp"
```

```
# Use TOML configuration
PLUGIN_DIR=~/.plugins influxdb3 create trigger \
  --database monitoring \
  --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \
  --trigger-spec "every:10m" \
  --trigger-arguments "config_file_path=anomaly_config.toml" \
  cpu_anomaly_detector
```

### 串联插件
<a name="chaining-plugins"></a>

 通过串联多个插件创建数据处理管道：

```
# Step 1: Transform raw data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "basic_transformation/basic_transformation.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \
  step1_transform

# Step 2: Downsample transformed data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "downsampler/downsampler.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \
  step2_downsample

# Step 3: Detect anomalies in downsampled data
influxdb3 create trigger \
  --database pipeline \
  --plugin-filename "mad_check/mad_check_plugin.py" \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \
  step3_anomaly
```

## 插件最佳实践
<a name="best-practices-influxdb3-2"></a>
+  **保守起步**：先设定较高的阈值和较长的窗口期，再根据观察到的模式进行调整。
+  **开发阶段测试**：在生产部署之前，使用试运行模式和测试数据库。
+  **监控插件性能**：检查系统表中的执行时间和资源使用情况。
+  **使用适当的触发器类型**：选择计划触发器用于批量处理，选择数据写入触发器用于实时处理。
+  **明智地配置通知**：使用严重性级别和防抖逻辑，避免警报疲劳。
+  **利用模型持久性**：对于基于机器学习的插件，保存经过训练的模型以确保一致性。
+  **文档配置**：使用描述性触发器名称，并维护配置文档。

## 监控插件执行情况
<a name="monitoring-plugin-execution-1"></a>

要监控插件性能，请执行以下操作：

```
-- View plugin execution logs
SELECT 
  event_time,
  trigger_name,
  log_level,
  log_text
FROM system.processing_engine_logs 
WHERE trigger_name = 'your_trigger_name'
  AND time > now() - INTERVAL '1 hour'
ORDER BY event_time DESC;

-- Monitor plugin performance
SELECT 
  trigger_name,
  COUNT(*) as executions,
  AVG(execution_time_ms) as avg_time_ms,
  MAX(execution_time_ms) as max_time_ms,
  SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count
FROM system.processing_engine_logs
WHERE time > now() - INTERVAL '24 hours'
GROUP BY trigger_name;

-- Check trigger status
SELECT * FROM system.processing_engine_triggers
WHERE database = 'your_database';
```

## 常见问题故障排除
<a name="troubleshooting-common-issues"></a>

下表显示常见问题及可能的解决方案。


|  **问题**  |  **解决方案**  | 
| --- | --- | 
|  插件未触发  |  验证触发器是否已启用，检查 schedule/spec 语法  | 
|  缺少通知  |  确认已安装通知器插件，请检查 webhook URLs  | 
|  高内存使用量  |  缩减窗口大小，调整批处理间隔  | 
|  不正确的转换  |  使用试运行模式，验证字段名称及数据类型  | 
|  预测不准确  |  扩大训练数据窗口，调整季节性设置  | 
|  警报过多  |  增加触发次数、添加防抖持续时间、调整阈值  | 

 这些经过认证的插件为常见的时间序列数据处理需求提供企业就绪功能，既无需进行自定义开发，又通过全面的配置选项保持灵活性。访问[GitHub存储库](https://github.com/influxdata/influxdb3_plugins/tree/main/influxdata)以获取详细的文档、示例和更新。