Extend Timestream for InfluxDB with processing engine plugins - Amazon Timestream

Extend Timestream for InfluxDB with processing engine plugins

The Processing engine is an embedded Python virtual machine that runs inside your InfluxDB 3 database in Amazon Timestream. It's available in both Core and Enterprise editions. It enables you to extend your database with custom Python code that can automate workflows, transform data, and create custom API endpoints.

The Processing engine executes Python plugins in response to specific database events:

  • Data writes: Process and transform data as it enters the database

  • Scheduled events: Run code at defined intervals or specific times

  • HTTP requests: Expose custom API endpoints that execute your code

The engine includes an in-memory cache for managing state between executions, enabling you to build stateful applications directly within your database.
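As a rough illustration of state carried between executions, the sketch below counts how many times a scheduled plugin has run. It assumes the engine exposes the cache as `influxdb3_local.cache` with `get(key, default)` and `put(key, value)` methods. That shape is an assumption here, so check the plugin API reference for the exact signature. `FakeCache` and `FakeLocal` are hypothetical stand-ins that only let the handler run outside the database.

```python
class FakeCache:
    """Hypothetical stand-in for the engine's in-memory cache (local runs only)."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def put(self, key, value):
        self._data[key] = value


class FakeLocal:
    """Hypothetical stand-in for the influxdb3_local object passed to plugins."""

    def __init__(self):
        self.cache = FakeCache()
        self.messages = []

    def info(self, message):
        self.messages.append(message)


def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Read the previous count (0 on the first run), bump it, and store it back.
    runs = influxdb3_local.cache.get("run_count", 0) + 1
    influxdb3_local.cache.put("run_count", runs)
    influxdb3_local.info(f"run {runs} at {call_time}")
    return runs
```

Because the counter lives in the cache rather than in a module-level variable, the handler itself stays stateless and the state is visible to every later execution of the same trigger.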

InfluxData certified plugins

At launch, InfluxDB 3 includes a set of pre-built, fully configurable plugins certified by InfluxData:

  • Data transformation: Process and enrich incoming data

  • Alerting: Send notifications based on data thresholds

  • Aggregation: Calculate statistics on time-series data

  • System monitoring: Track resource usage and health metrics

  • Integration: Connect to external services and APIs

These certified plugins are ready to use and can be configured through trigger arguments to meet your specific requirements.

Plugin types and trigger specifications

| Plugin type | Trigger specification | When plugin runs | Use cases |
|---|---|---|---|
| Data write | table:<TABLE_NAME> or all_tables | When data is written to tables | Data transformation, alerting, derived metrics |
| Scheduled | every:<DURATION> or cron:<EXPRESSION> | At specified intervals | Periodic aggregation, reports, health checks |
| HTTP request | request:<REQUEST_PATH> | When HTTP requests are received | Custom APIs, webhooks, user interfaces |
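The specification strings in the table follow a simple `prefix:value` pattern, with `all_tables` as the only bare keyword. The helper below is a minimal sketch of how a deployment script might validate a spec before passing it to the CLI. The function name and the returned labels are hypothetical, and the engine's own parser may accept or reject inputs differently.

```python
def classify_trigger_spec(spec: str) -> tuple[str, str]:
    """Return (plugin_type, value) for a trigger specification string."""
    if spec == "all_tables":
        return ("data_write", "*")

    # Split on the first colon only, so cron expressions keep their spaces
    # and path-like values are left untouched.
    prefix, sep, value = spec.partition(":")
    kinds = {
        "table": "data_write",
        "every": "scheduled",
        "cron": "scheduled",
        "request": "http_request",
    }
    if not sep or prefix not in kinds or not value:
        raise ValueError(f"unrecognized trigger spec: {spec!r}")
    return (kinds[prefix], value)
```

For example, `classify_trigger_spec("every:5m")` yields `("scheduled", "5m")`, while a typo such as `"evry:5m"` raises `ValueError` before any trigger is created.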

Create triggers

Triggers connect plugins to database events and define when they execute. Use the influxdb3 create trigger command.

To create a data write trigger:

    # Trigger on writes to a specific table
    influxdb3 create trigger \
      --trigger-spec "table:sensor_data" \
      --plugin-filename "process_sensors.py" \
      --database DATABASE_NAME \
      sensor_processor

    # Trigger on all table writes
    influxdb3 create trigger \
      --trigger-spec "all_tables" \
      --plugin-filename "process_all_data.py" \
      --database DATABASE_NAME \
      all_data_processor

To create a scheduled trigger:

    # Run every 5 minutes
    influxdb3 create trigger \
      --trigger-spec "every:5m" \
      --plugin-filename "periodic_check.py" \
      --database DATABASE_NAME \
      regular_check

    # Run daily at 8am (cron format with seconds)
    influxdb3 create trigger \
      --trigger-spec "cron:0 0 8 * * *" \
      --plugin-filename "daily_report.py" \
      --database DATABASE_NAME \
      daily_report

To create an HTTP request trigger:

    # Create endpoint at /api/v3/engine/webhook
    influxdb3 create trigger \
      --trigger-spec "request:webhook" \
      --plugin-filename "webhook_handler.py" \
      --database DATABASE_NAME \
      webhook_processor

Access the endpoint at: https://your-cluster-endpoint:8086/api/v3/engine/webhook
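The request path in the trigger specification is appended to `/api/v3/engine/`. A minimal client-side sketch follows. The helper names are hypothetical, and the `Authorization: Bearer` header is assumed from the curl example elsewhere on this page. The request object is only constructed here, not sent.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def engine_endpoint(base_url: str, request_path: str) -> str:
    """Build the URL served by a request:<REQUEST_PATH> trigger."""
    path = quote(request_path.lstrip("/"))
    return f"{base_url.rstrip('/')}/api/v3/engine/{path}"


def build_webhook_request(base_url: str, request_path: str, token: str, payload: dict) -> Request:
    """Prepare (but do not send) a JSON POST to the plugin endpoint."""
    return Request(
        engine_endpoint(base_url, request_path),
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would perform the call against a reachable cluster endpoint.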

Configure triggers

Passing arguments to plugins

Configure plugin behavior using trigger arguments:

    influxdb3 create trigger \
      --trigger-spec "every:1h" \
      --plugin-filename "threshold_check.py" \
      --trigger-arguments "threshold=90,notify_email=admin@example.com" \
      --database DATABASE_NAME \
      threshold_monitor

Arguments are passed to the plugin as a dictionary:

    def process_scheduled_call(influxdb3_local, call_time, args=None):
        if args and "threshold" in args:
            threshold = float(args["threshold"])
            email = args.get("notify_email", "default@example.com")
            # Use arguments in your logic
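The handler converts `threshold` with `float()` because argument values arrive as strings. To exercise a plugin locally with the same kind of dictionary, a `key=value,key=value` string can be split as sketched below. This is an illustrative helper with a hypothetical name, not the engine's parser, and it does not handle quoted values that contain commas.

```python
def parse_trigger_arguments(raw: str) -> dict[str, str]:
    """Split 'k1=v1,k2=v2' into a dict of strings (naive: no quoting support)."""
    args = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue  # tolerate trailing commas and empty input
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        args[key.strip()] = value.strip()
    return args


def read_threshold(args=None, default=80.0):
    """Convert the string argument to a number, falling back to a default."""
    if args and "threshold" in args:
        return float(args["threshold"])
    return default
```

Keeping the conversion in one small function makes it easy to reject malformed values early instead of failing halfway through a scheduled run.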

Error handling behavior

Configure how triggers handle errors:

    # Log errors (default)
    influxdb3 create trigger \
      --trigger-spec "table:metrics" \
      --plugin-filename "process.py" \
      --error-behavior log \
      --database DATABASE_NAME \
      log_processor

    # Retry on error
    influxdb3 create trigger \
      --trigger-spec "table:critical_data" \
      --plugin-filename "critical.py" \
      --error-behavior retry \
      --database DATABASE_NAME \
      retry_processor

    # Disable trigger on error
    influxdb3 create trigger \
      --trigger-spec "request:webhook" \
      --plugin-filename "webhook.py" \
      --error-behavior disable \
      --database DATABASE_NAME \
      auto_disable_processor

Asynchronous execution

Allow multiple trigger instances to run simultaneously:

    influxdb3 create trigger \
      --trigger-spec "table:metrics" \
      --plugin-filename "heavy_process.py" \
      --run-asynchronous \
      --database DATABASE_NAME \
      async_processor

Manage triggers

To view triggers for a database:

    # Show all triggers for a database
    influxdb3 show summary \
      --database DATABASE_NAME \
      --token YOUR_TOKEN

Table exclusion for write triggers

To filter tables within your plugin code when using all_tables:

    influxdb3 create trigger \
      --trigger-spec "all_tables" \
      --plugin-filename "processor.py" \
      --trigger-arguments "exclude_tables=temp_data,debug_info" \
      --database DATABASE_NAME \
      data_processor

The plugin implementation is as follows:

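    def process_writes(influxdb3_local, table_batches, args=None):
        # args is None when the trigger has no arguments, so guard before .get()
        raw = (args or {}).get("exclude_tables", "")
        # Drop empty names so an unset argument does not exclude a table named ""
        excluded_tables = {name.strip() for name in raw.split(",") if name.strip()}
        for table_batch in table_batches:
            if table_batch["table_name"] in excluded_tables:
                continue
            # Process allowed tables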
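To try the exclusion logic without a running database, the filter can be factored into plain functions and fed hand-built batches. The sketch below assumes each batch is a dictionary with a `table_name` key (as in the handler above) and a `rows` list. The `rows` key and the helper names are assumptions made for illustration.

```python
def parse_excluded(args=None) -> set[str]:
    """Turn the exclude_tables argument into a set of table names."""
    raw = (args or {}).get("exclude_tables", "")
    return {name.strip() for name in raw.split(",") if name.strip()}


def count_rows_by_table(table_batches, args=None) -> dict[str, int]:
    """Count rows per table, skipping any table listed in exclude_tables."""
    excluded = parse_excluded(args)
    counts = {}
    for batch in table_batches:
        name = batch["table_name"]
        if name in excluded:
            continue  # early exit for filtered tables
        counts[name] = counts.get(name, 0) + len(batch.get("rows", []))
    return counts


# Hand-built sample input for a local run.
sample_batches = [
    {"table_name": "sensor_data", "rows": [{"temp": 21.5}, {"temp": 22.0}]},
    {"table_name": "temp_data", "rows": [{"x": 1}]},
    {"table_name": "debug_info", "rows": []},
]
```

With `{"exclude_tables": "temp_data,debug_info"}` only `sensor_data` is counted, and with no arguments every table is processed.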

Distributed deployment considerations

In multi-node deployments, configure plugins based on node roles:

| Plugin type | Node type | Reason |
|---|---|---|
| Data write plugins | Ingester nodes | Process data at ingestion point |
| HTTP request plugins | Querier nodes | Handle API traffic |
| Scheduled plugins | Any configured node | Can run on any node with scheduler |

The following considerations are important for enterprise deployments:

  • Maintain identical plugin configurations across all relevant nodes.

  • Route external clients (Grafana, dashboards) to querier nodes.

  • Ensure plugins are available on nodes where their triggers execute.

Best practices

  • Plugin configuration

    • Use trigger arguments for configurable values instead of hardcoding.

    • Implement proper error handling within plugins.

    • Use the influxdb3_local API for database operations.

  • Performance optimization

    • Use asynchronous execution for heavy processing tasks.

    • Implement early returns for filtered data.

    • Minimize database queries within plugins.

  • Error management

    • Choose appropriate error behavior (log, retry, or disable).

    • Monitor plugin execution through system tables.

    • Test plugins thoroughly before production deployment.

  • Security considerations

    • Validate all input data in HTTP request plugins.

    • Use secure methods for storing sensitive configuration.

    • Limit plugin permissions to required operations only.

Monitor plugin execution

Query system tables to monitor plugin performance:

    -- View processing engine logs
    SELECT *
    FROM system.processing_engine_logs
    WHERE time > now() - INTERVAL '1 hour'
    ORDER BY time DESC;

    -- Check trigger status
    SELECT *
    FROM system.processing_engine_triggers
    WHERE database = 'DATABASE_NAME';

The Processing engine provides a powerful way to extend InfluxDB 3 functionality while keeping your data processing logic close to your data, reducing latency and simplifying your architecture.

InfluxData certified plugins

Amazon Timestream for InfluxDB 3 includes a comprehensive set of pre-built, certified plugins that extend database functionality without requiring custom development. These plugins are fully configurable and ready to use at launch, providing advanced capabilities for data processing, monitoring, and alerting.

For complete documentation and source code, visit the InfluxData Plugins Repository.

Available plugins

Timestream for LiveAnalytics migration plugin

Migrate a database from Timestream for LiveAnalytics to Timestream for InfluxDB

How it works: The Timestream for LiveAnalytics migration plugin works together with the Timestream for LiveAnalytics migration client. The client runs a Timestream for LiveAnalytics UNLOAD command to export a LiveAnalytics database to an S3 bucket in Parquet format. After the export completes, the client generates presigned URLs for the Parquet files and invokes the migration plugin with those URLs. During execution, the plugin retrieves the S3 objects from the bucket, transforms them to InfluxDB line protocol, and writes them to an InfluxDB 3 database.

Best practices: The Timestream for LiveAnalytics migration plugin must be run on a single InfluxDB 3 Enterprise node. Ensure that the InfluxDB 3 endpoint used with the plugin client is a process node endpoint rather than the cluster endpoint. The cluster performing the migration should not handle ingestion or queries while the migration plugin is running, because this could lead to out-of-memory errors.

Migration performance depends on the resources available to the InfluxDB 3 node and the characteristics of the data being migrated. In our testing, we observed a throughput of 30,000,000 LiveAnalytics records migrated per hour. Your actual performance may vary based on several factors.

Data mapping: The following table shows how Timestream for LiveAnalytics data is mapped to line protocol data.

| Timestream for LiveAnalytics concept | Line protocol concept |
|---|---|
| Table | Measurement |
| Dimensions | Tags |
| Measure name | Tag |
| Measures | Fields |
| Time | Timestamp |

Single-measure record transformation: The following is a single-measure record in Timestream for LiveAnalytics in the table example_table:

| host | region | request_id | measure_name | time | measure_value::double |
|---|---|---|---|---|---|
| host1 | us-west-2 | saio3242ovnfk | cpu_usage | 2025-04-17 16:42:54.702394001 | 0.66 |

This record will be transformed to:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001

Multi-measure record transformation: The following is a multi-measure record in Timestream for LiveAnalytics in the table example_table with everything to the right of time being measures:

| host | region | request_id | measure_name | time | cpu_usage | memory_usage |
|---|---|---|---|---|---|---|
| host1 | us-west-2 | saio3242ovnfk | metrics | 2025-04-17 16:42:54.702394001 | 0.66 | 0.21 |

This record will be transformed to:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
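The mapping can be reproduced in a few lines of Python. This is a minimal sketch of the record-to-line-protocol step only, not the plugin's implementation. It assumes timestamps are UTC, the function names are hypothetical, and it omits line protocol escaping of spaces, commas, and equals signs, as well as type suffixes for non-float fields.

```python
import calendar
import time


def to_epoch_ns(timestamp: str) -> int:
    """Convert 'YYYY-MM-DD HH:MM:SS.fffffffff' (assumed UTC) to epoch nanoseconds."""
    whole, _, frac = timestamp.partition(".")
    seconds = calendar.timegm(time.strptime(whole, "%Y-%m-%d %H:%M:%S"))
    # Pad or trim the fraction to exactly nine digits (nanoseconds).
    nanos = int(frac.ljust(9, "0")[:9]) if frac else 0
    return seconds * 1_000_000_000 + nanos


def to_line_protocol(table, dimensions, measure_name, measures, timestamp):
    """Table -> measurement, dimensions + measure name -> tags, measures -> fields."""
    tags = dict(dimensions)
    tags["measure_name"] = measure_name
    tag_part = ",".join(f"{key}={value}" for key, value in tags.items())
    field_part = ",".join(f"{key}={value}" for key, value in measures.items())
    return f"{table},{tag_part} {field_part} {to_epoch_ns(timestamp)}"


line = to_line_protocol(
    "example_table",
    {"host": "host1", "region": "us-west-2", "request_id": "saio3242ovnfk"},
    "metrics",
    {"cpu_usage": 0.66, "memory_usage": 0.21},
    "2025-04-17 16:42:54.702394001",
)
print(line)
```

The fractional seconds are handled as a string because Python's `datetime` keeps only microseconds, which would drop the last three digits of the nanosecond timestamp.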
Important

The presigned URLs that the plugin uses to retrieve the LiveAnalytics data in S3 expire when either their set expiration occurs or the IAM credentials used to generate them expire (maximum 7 days). We recommend running the migration client on an EC2 instance (a t3.medium instance is sufficient) because the EC2 instance rotates IAM credentials automatically, removing the presigned URL time constraints during the migration. If you do not use an EC2 instance, migrations can be resumed and large datasets may require multiple resume invocations.

The Timestream for LiveAnalytics migration plugin is recommended for migrations with under 1 billion records or 125GB within a single LiveAnalytics database.

The migration plugin should only be used on a single process node in the cluster. You can determine the process node by using list-db-instances-for-cluster and setting the INFLUXDB3_HOST_URL to the endpoint of one of the database instances that has an instance mode of type PROCESS, or you can use the Timestream console and select your cluster to find the process node.

Key features:

  • Exports the time series data from Timestream for LiveAnalytics to an S3 bucket using the UNLOAD command.

  • Generates presigned URLs for each S3 object being migrated.

  • Tracks the migration process for each S3 object.

  • Cleans up S3 objects after a successful migration.

  • Supports resuming a failed migration in the event of presigned URL expiration.

Example usage:

    # Migrate a LiveAnalytics database to InfluxDB 3
    export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>"
    export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>"
    export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>"

    aws s3api create-bucket --bucket <your S3 bucket name> \
      --object-lock-enabled-for-bucket --region <your region> \
      --create-bucket-configuration LocationConstraint=<your region>

Note

Update the S3 bucket policy with the example bucket policy in the README. For more information, see Prerequisites.

    python3 liveanalytics_influxdb3_migration_client.py \
      --live-analytics-database-name <your LiveAnalytics database name> \
      --s3-bucket-name <your S3 bucket name>

Output: The Timestream for LiveAnalytics database is transformed to line protocol and ingested into the InfluxDB 3 database.

Anomaly detection plugins

MAD-based anomaly detection

  • Trigger types: Data write (real-time)

  • Use cases: Real-time outlier detection for streaming data, sensor monitoring, quality control.

  • GitHub: MAD Anomaly Detection Documentation

How it works: Uses Median Absolute Deviation (MAD) to establish a robust baseline for normal behavior. As new data arrives, it calculates how many MADs away from the median each point is. Points exceeding the threshold (k * MAD) are flagged as anomalies.
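The core test can be sketched with the standard library. This is a simplified illustration of the MAD rule described above, not the certified plugin's code. The function name is hypothetical, and treating a zero MAD as "any deviation is an outlier" is one possible choice among several.

```python
from statistics import median


def is_mad_outlier(window, value, k):
    """Flag value if it lies more than k MADs from the window's median."""
    center = median(window)
    # MAD: the median of absolute deviations from the median.
    mad = median(abs(x - center) for x in window)
    if mad == 0:
        # All points (or at least half) are identical; this sketch treats
        # any deviation from the median as an outlier in that case.
        return value != center
    return abs(value - center) > k * mad


window = [20.0, 20.5, 19.5, 21.0, 20.0, 19.0, 20.5]  # median 20.0, MAD 0.5
print(is_mad_outlier(window, 25.0, k=2.5))  # deviation 5.0 vs threshold 1.25
print(is_mad_outlier(window, 20.8, k=2.5))  # deviation 0.8 vs threshold 1.25
```

Because both the center and the spread are medians, a single extreme reading in the window shifts the baseline far less than it would with a mean and standard deviation.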

Key features:

  • Real-time processing as data is written.

  • Maintains in-memory sliding windows for efficiency.

  • Count-based alerts (e.g., 5 consecutive anomalies).

  • Duration-based alerts (e.g., anomaly for 2 minutes).

  • Flip suppression to prevent alert fatigue from rapidly changing values.

Example usage:

    # Detect temperature anomalies in real-time
    influxdb3 create trigger \
      --database sensors \
      --plugin-filename "mad_check/mad_check_plugin.py" \
      --trigger-spec "all_tables" \
      --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \
      temp_anomaly_detector

    # Threshold format: field:k_multiplier:window_size:trigger_condition
    # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies
    # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
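The `field:k_multiplier:window_size:trigger_condition` format, with entries joined by `@`, is compact enough to be worth checking before deployment. The parser below is a hypothetical validation helper, not the plugin's own parser. It assumes a purely numeric trigger condition is a consecutive-anomaly count and anything else is a duration string.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MadThreshold:
    field_name: str
    k: float
    window_size: int
    count: Optional[int] = None      # alert after N consecutive anomalies
    duration: Optional[str] = None   # or after an anomaly lasts this long, e.g. "2m"


def parse_mad_thresholds(raw: str) -> list[MadThreshold]:
    """Parse 'temp:2.5:20:5@humidity:3:30:2m' into structured thresholds."""
    thresholds = []
    for entry in raw.split("@"):
        parts = entry.split(":")
        if len(parts) != 4:
            raise ValueError(f"expected 4 colon-separated parts, got {entry!r}")
        field_name, k, window_size, condition = parts
        threshold = MadThreshold(field_name, float(k), int(window_size))
        if condition.isdigit():
            threshold.count = int(condition)
        else:
            threshold.duration = condition
        thresholds.append(threshold)
    return thresholds
```

Running the parser over a candidate string catches a missing part or a non-numeric multiplier locally rather than at trigger execution time.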

Output: Sends real-time notifications when anomalies are detected, including the field name, value, and duration.

Data transformation plugins

Basic transformation

  • Trigger types: Scheduled, Data write

  • Use cases: Data standardization, unit conversions, field name normalization, data cleaning.

  • GitHub: Basic Transformation Documentation

How it works: Applies a chain of transformations to field names and values. Can process historical data in batches (scheduled) or transform data as it arrives (data write). Transformations are applied in the order specified, allowing complex data pipelines.
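An ordered chain of transformations can be modeled as a list of functions applied left to right. The sketch below is illustrative only and the helper names are hypothetical. It assumes name transformations run first and value transformations are then looked up by the transformed field name.

```python
import re


def to_snake(name: str) -> str:
    """Lowercase the name and replace runs of non-alphanumerics with '_'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name.strip()).strip("_").lower()


def deg_c_to_deg_f(value: float) -> float:
    return value * 9 / 5 + 32


def apply_chain(value, steps):
    """Apply each step in order, feeding the result into the next step."""
    for step in steps:
        value = step(value)
    return value


def transform_row(row, name_steps, value_steps):
    """Rename every field, then transform values keyed by the new name."""
    out = {}
    for name, value in row.items():
        new_name = apply_chain(name, name_steps)
        out[new_name] = apply_chain(value, value_steps.get(new_name, []))
    return out


row = {"Temperature Reading": 100.0, "Station ID": "A7"}
print(transform_row(row, [to_snake], {"temperature_reading": [deg_c_to_deg_f]}))
```

Order matters: swapping two steps in a chain (for example, converting units before versus after rounding) can change the result, which is why the transformations are applied in the order specified.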

Key features:

  • Field name transformations: snake_case, remove spaces, alphanumeric only.

  • Unit conversions: Temperature, pressure, length, time units.

  • Custom string replacements with regex support.

  • Dry-run mode for testing without writing data.

  • Batch processing for historical data.

Example usage:

    # Transform temperature data from Celsius to Fahrenheit with field name standardization
    influxdb3 create trigger \
      --database weather \
      --plugin-filename "basic_transformation/basic_transformation.py" \
      --trigger-spec "every:30m" \
      --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \
      temp_converter

    # Real-time field name cleaning for incoming sensor data
    influxdb3 create trigger \
      --database iot \
      --plugin-filename "basic_transformation/basic_transformation.py" \
      --trigger-spec "all_tables" \
      --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \
      sensor_cleaner

Output: Creates a new table with transformed data, preserving original timestamps and tags.

Downsampler

  • Trigger types: Scheduled, HTTP

  • Use cases: Data reduction, long-term storage optimization, creating summary statistics, performance improvement.

  • GitHub: Downsampler Documentation

How it works: Aggregates high-resolution time-series data into lower-resolution summaries. For example, converts 1-second data into 1-hour averages. Each downsampled point includes metadata about the number of original points compressed and the time range covered.
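The bucketing idea can be shown with plain Python. This is a simplified sketch, not the plugin's implementation. It assumes integer epoch-second timestamps and an average aggregation, and it borrows the metadata names `record_count`, `time_from`, and `time_to` from the feature list for this plugin.

```python
from collections import defaultdict


def downsample_avg(points, interval_s):
    """Average (timestamp, value) pairs into fixed-width time buckets."""
    buckets = defaultdict(list)
    for ts, value in points:
        # Align each timestamp to the start of its interval.
        buckets[ts - ts % interval_s].append((ts, value))

    summary = []
    for start in sorted(buckets):
        items = buckets[start]
        values = [value for _, value in items]
        summary.append({
            "time": start,
            "avg": sum(values) / len(values),
            "record_count": len(items),            # points compressed
            "time_from": min(ts for ts, _ in items),
            "time_to": max(ts for ts, _ in items),
        })
    return summary


points = [(0, 1.0), (1800, 3.0), (3600, 10.0)]
print(downsample_avg(points, interval_s=3600))
```

Keeping `record_count` alongside each aggregate lets later queries weight the summaries correctly, for example when averaging hourly averages into a daily figure.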

Key features:

  • Multiple aggregation functions: avg, sum, min, max, median, derivative.

  • Field-specific aggregations (different functions for different fields).

  • Metadata tracking (record_count, time_from, time_to).

  • HTTP API for on-demand downsampling with backfill.

  • Configurable batch sizes for large datasets.

Example usage:

    # Downsample CPU metrics from 10-second to hourly resolution
    influxdb3 create trigger \
      --database metrics \
      --plugin-filename "downsampler/downsampler.py" \
      --trigger-spec "every:1h" \
      --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \
      cpu_downsampler

    # HTTP endpoint for on-demand downsampling
    curl -X POST http://localhost:8086/api/v3/engine/downsample \
      -H "Authorization: Bearer YOUR_TOKEN" \
      -d '{
        "source_measurement": "sensor_data",
        "target_measurement": "sensor_daily",
        "interval": "1d",
        "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]],
        "backfill_start": "2024-01-01T00:00:00Z",
        "backfill_end": "2024-12-31T23:59:59Z"
      }'

Output: Creates downsampled data with aggregated values plus metadata columns showing the number of points compressed and time range.

Monitoring and alerting plugins

State change monitor

  • Trigger types: Scheduled, Data write

  • Use cases: Status monitoring, equipment state tracking, process monitoring, change detection.

  • GitHub: State Change Documentation

How it works: Tracks field value changes over time and alerts when the number of changes exceeds configured thresholds. Can detect both value changes (different values) and specific value conditions (equals a target value). Includes stability checks to prevent alerts from noisy signals.
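Both detection modes reduce to simple scans over the values in a window. The functions below are a hypothetical sketch of that logic, not the plugin's code. The strict "greater than" comparison follows the wording "exceeds" above, and the plugin may treat the boundary differently.

```python
def count_changes(values):
    """Count transitions between consecutive, differing values."""
    return sum(1 for prev, cur in zip(values, values[1:]) if prev != cur)


def too_many_changes(values, max_changes):
    """Value-change mode: alert when the change count exceeds the threshold."""
    return count_changes(values) > max_changes


def trailing_run_length(values, target):
    """Value-condition mode: how many of the most recent points equal target."""
    run = 0
    for value in reversed(values):
        if value != target:
            break
        run += 1
    return run


status = ["ok", "error", "ok", "ok", "error", "error"]
print(count_changes(status))                 # transitions in the window
print(trailing_run_length(status, "error"))  # consecutive recent "error" points
```

A stability requirement can be layered on top by only counting a change once the new value has persisted for several points, which filters out single-sample flicker.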

Key features:

  • Count-based change detection (e.g., five changes in ten minutes).

  • Duration-based monitoring (e.g., status = "error" for five minutes).

  • State change window for noise reduction.

  • Multi-field monitoring with independent thresholds.

  • Configurable stability requirements.

Example usage:

    # Monitor equipment status changes
    influxdb3 create trigger \
      --database factory \
      --plugin-filename "state_change/state_change_check_plugin.py" \
      --trigger-spec "every:5m" \
      --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \
      equipment_monitor

    # Real-time monitoring for specific state conditions
    influxdb3 create trigger \
      --database systems \
      --plugin-filename "state_change/state_change_check_plugin.py" \
      --trigger-spec "all_tables" \
      --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \
      service_monitor

Output: Alerts include the field name, number of changes detected, time window, and relevant tag values.

System metrics collector

  • Trigger types: Scheduled

  • Use cases: Infrastructure monitoring, performance baselines, capacity planning, resource tracking.

  • GitHub: System Metrics Documentation

How it works: Uses the psutil library to collect comprehensive system metrics from the host running InfluxDB. Collects CPU, memory, disk, and network statistics at configurable intervals. Each metric type can be enabled/disabled independently.

Key features:

  • Per-core CPU statistics with load averages.

  • Memory usage including swap and page faults.

  • Disk I/O metrics with calculated IOPS and latency.

  • Network interface statistics with error tracking.

  • Configurable metric collection (enable/disable specific types).

  • Automatic retry on collection failures.

Example usage:

    # Collect all system metrics every 30 seconds
    influxdb3 create trigger \
      --database monitoring \
      --plugin-filename "system_metrics/system_metrics.py" \
      --trigger-spec "every:30s" \
      --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \
      system_monitor

    # Focus on CPU and memory for application servers
    influxdb3 create trigger \
      --database app_monitoring \
      --plugin-filename "system_metrics/system_metrics.py" \
      --trigger-spec "every:1m" \
      --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \
      app_metrics

Output: Creates multiple tables (system_cpu, system_memory, system_disk_io, etc.) with detailed metrics for each subsystem.

Common configuration patterns

Using TOML configuration files

For complex configurations, use TOML files instead of inline arguments:

    # anomaly_config.toml
    measurement = "server_metrics"
    field = "cpu_usage"
    window = "1h"
    detector_type = "IsolationForestAD"
    contamination = 0.1
    window_size = 20
    output_table = "cpu_anomalies"
    senders = "slack"
    slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK"
    notification_text = "Anomaly detected in $field: value=$value at $timestamp"

    # Use TOML configuration
    PLUGIN_DIR=~/.plugins influxdb3 create trigger \
      --database monitoring \
      --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \
      --trigger-spec "every:10m" \
      --trigger-arguments "config_file_path=anomaly_config.toml" \
      cpu_anomaly_detector

Chaining plugins

Create data processing pipelines by chaining multiple plugins:

    # Step 1: Transform raw data
    influxdb3 create trigger \
      --database pipeline \
      --plugin-filename "basic_transformation/basic_transformation.py" \
      --trigger-spec "all_tables" \
      --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \
      step1_transform

    # Step 2: Downsample transformed data
    influxdb3 create trigger \
      --database pipeline \
      --plugin-filename "downsampler/downsampler.py" \
      --trigger-spec "every:1h" \
      --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \
      step2_downsample

    # Step 3: Detect anomalies in downsampled data
    influxdb3 create trigger \
      --database pipeline \
      --plugin-filename "mad_check/mad_check_plugin.py" \
      --trigger-spec "all_tables" \
      --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \
      step3_anomaly

Best practices for plugins

  • Start conservative – Begin with higher thresholds and longer windows, then adjust based on observed patterns.

  • Test in development – Use dry-run modes and test databases before production deployment.

  • Monitor plugin performance – Check execution times and resource usage in system tables.

  • Use appropriate trigger types – Choose scheduled for batch processing, data write for real-time.

  • Configure notifications wisely – Use severity levels and debounce logic to prevent alert fatigue.

  • Leverage model persistence – For ML-based plugins, save trained models for consistency.

  • Document configurations – Use descriptive trigger names and maintain configuration documentation.

Monitor plugin execution

To monitor plugin performance:

    -- View plugin execution logs
    SELECT event_time, trigger_name, log_level, log_text
    FROM system.processing_engine_logs
    WHERE trigger_name = 'your_trigger_name'
      AND time > now() - INTERVAL '1 hour'
    ORDER BY event_time DESC;

    -- Monitor plugin performance
    SELECT trigger_name,
           COUNT(*) as executions,
           AVG(execution_time_ms) as avg_time_ms,
           MAX(execution_time_ms) as max_time_ms,
           SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count
    FROM system.processing_engine_logs
    WHERE time > now() - INTERVAL '24 hours'
    GROUP BY trigger_name;

    -- Check trigger status
    SELECT *
    FROM system.processing_engine_triggers
    WHERE database = 'your_database';

Troubleshoot common issues

The following table shows common issues and possible solutions.

| Issue | Solution |
|---|---|
| Plugin not triggering | Verify trigger is enabled, check schedule/spec syntax |
| Missing notifications | Confirm Notifier plugin installed, check webhook URLs |
| High memory usage | Reduce window sizes, adjust batch processing intervals |
| Incorrect transformations | Use dry-run mode, verify field names and data types |
| Forecast inaccuracy | Increase training data window, adjust seasonality settings |
| Too many alerts | Increase trigger counts, add debounce duration, adjust thresholds |

These certified plugins provide enterprise-ready functionality for common time-series data processing needs, eliminating the need for custom development while maintaining flexibility through comprehensive configuration options. Visit the GitHub repository for detailed documentation, examples, and updates.