View a markdown version of this page

Amplíe Timestream para InfluxDB con complementos para el motor de procesamiento - Amazon Timestream

Para obtener capacidades similares a las de Amazon Timestream, considere Amazon Timestream LiveAnalytics para InfluxDB. Ofrece una ingesta de datos simplificada y tiempos de respuesta a las consultas en milisegundos de un solo dígito para realizar análisis en tiempo real. Obtenga más información aquí.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Amplíe Timestream para InfluxDB con complementos para el motor de procesamiento

El motor de procesamiento es una máquina virtual Python integrada que se ejecuta dentro de la base de datos de InfluxDB 3 en Amazon Timestream. Está disponible en las ediciones Core y Enterprise. Le permite ampliar su base de datos con código Python personalizado que puede automatizar los flujos de trabajo, transformar datos y crear puntos finales de API personalizados.

El motor de procesamiento ejecuta complementos de Python en respuesta a eventos específicos de la base de datos:

  • Escritura de datos: procese y transforme los datos a medida que ingresan a la base de datos

  • Eventos programados: ejecute el código en intervalos definidos o en momentos específicos

  • Solicitudes HTTP: muestre los puntos de conexión de la API personalizados que ejecutan su código

El motor incluye una caché en memoria para administrar el estado entre las ejecuciones, lo que le permite crear aplicaciones con estado directamente en la base de datos.

InfluxData complementos certificados

En el momento del lanzamiento, InfluxDB 3 incluye un conjunto de complementos prediseñados y totalmente configurables certificados por: InfluxData

  • Transformación de datos: procese y enriquezca los datos de entrada

  • Alertas: envíe notificaciones en función de los umbrales de datos

  • Agregación: calcule las estadísticas a partir de datos de serie temporal

  • Supervisión del sistema: realice un seguimiento del uso de los recursos y de las métricas de estado

  • Integración: Conéctese a servicios externos y APIs

Estos complementos certificados están listos para usarse y se pueden configurar mediante argumentos de activación para cumplir con sus requisitos específicos.

Tipos de complementos y especificaciones de activación

Tipo de complemento Especificación de la activación Cuándo se ejecuta el complemento Casos de uso
Escritura de datos table:<TABLE_NAME>o all_tables Cuando los datos se escriben en tablas Transformación de datos, alertas y métricas derivadas
Programados every:<DURATION>o cron:<EXPRESSION> A intervalos específicos Agregación periódica, informes y comprobaciones de estado
Solicitud HTTP request:<REQUEST_PATH> Cuándo se reciben solicitudes HTTP Personalizados APIs, webhooks, interfaces de usuario

Crear desencadenadores de

Los activadores conectan los complementos a los eventos de la base de datos y definen cuándo se ejecutan. Utilice el comando influxdb3 create trigger.

Para crear un activador de escritura de datos:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

Para crear un activador programado:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

Para crear un activador de solicitudes HTTP:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Acceda al punto de conexión en: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Configurar activadores

Pasar argumentos a complementos

Configura el comportamiento del complemento mediante argumentos desencadenantes:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Los argumentos se pasan al complemento como un diccionario:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Comportamiento de gestión de errores

Configure cómo los desencadenadores gestionan los errores:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Ejecución asíncrona

Permita que se ejecuten varias instancias de activación de forma simultánea:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Administrar los activadores

Para ver los activadores de una base de datos:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Exclusión de tablas para los activadores de escritura

Para filtrar las tablas del código de su complemento cuando utilice all_tables:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

La implementación del complemento es la siguiente:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Consideraciones acerca de la implementación distribuida

En las implementaciones de varios nodos, configure los complementos según las funciones de los nodos:

Tipo de complemento Node Type Motivo
Complementos de escritura de datos Nodos de ingesta Procese los datos en el punto de ingestión
Complementos de solicitud HTTP Nodos de consultante Gestionar el tráfico de la API
Complementos programados Cualquier nodo configurado Se puede ejecutar en cualquier nodo con programador

Las siguientes consideraciones son importantes para las implementaciones empresariales:

  • Mantenga configuraciones de complementos idénticas en todos los nodos relevantes.

  • Dirija los clientes externos (Grafana, paneles de control) a los nodos de consulta.

  • Asegúrese de que los complementos estén disponibles en los nodos donde se ejecutan sus activadores.

Prácticas recomendadas

  • Configuración de complementos

    • Utilice argumentos activadores para los valores configurables en lugar de codificarlos de forma rígida.

    • Implemente un manejo de errores adecuado en los complementos.

    • Use la API influxdb3_local para las operaciones de la base de datos.

  • Optimización del rendimiento

    • Utilice la ejecución asíncrona para tareas pesadas de procesamiento.

    • Implemente devoluciones anticipadas para los datos filtrados.

    • Minimice las consultas a las bases de datos en los complementos.

  • Administración de errores

    • Elija el comportamiento de error adecuado (registrar, volver a intentar o deshabilitar).

    • Supervise la ejecución de los complementos a través de las tablas del sistema.

    • Pruebe los complementos en profundidad antes de implementarlos en producción.

  • Consideraciones de seguridad

    • Valide todos los datos de entrada en los complementos de solicitud HTTP.

    • Utilice métodos seguros para almacenar la configuración confidencial.

    • Limite los permisos de los complementos únicamente a las operaciones requeridas.

Supervisión de la ejecución del complemento

Consulte las tablas del sistema para supervisar el rendimiento de los complementos:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

El motor de procesamiento proporciona una forma eficaz de ampliar la funcionalidad de InfluxDB 3 y, al mismo tiempo, mantener la lógica del procesamiento de datos cerca de los datos, lo que reduce la latencia y simplifica la arquitectura.

InfluxData complementos certificados

Amazon Timestream para InfluxDB 3 incluye un conjunto completo de complementos certificados y prediseñados que amplían la funcionalidad de la base de datos sin necesidad de desarrollo personalizado. Estos complementos son completamente configurables y están listos para usarse en el momento del lanzamiento, y ofrecen capacidades avanzadas para el procesamiento de datos, la supervisión y las alertas.

Para obtener la documentación completa y el código fuente, visita el repositorio de InfluxData complementos.

Complementos disponibles

Cronograma del complemento de migración LiveAnalytics

Migre una base de datos de Timestream para a Timestream para InfluxDB LiveAnalytics

Cómo funciona: El complemento Timestream para LiveAnalytics migración funciona al unísono con el cliente Timestream para migración. LiveAnalytics El cliente ejecuta un comando Timestream for LiveAnalytics UNLOAD para exportar una base de datos a un LiveAnalytics bucket de S3 en formato Parquet. Una vez exportados los datos, el cliente genera los archivos prefirmados URLs para Parquet e invoca el complemento de migración con los prefirmados. URLs Durante la ejecución del complemento, los objetos de S3 se recuperan del depósito de S3, se transforman en el protocolo de línea InfluxDB y se escriben en una base de datos de InfluxDB 3.

Prácticas recomendadas: El complemento Timestream for LiveAnalytics Migration debe ejecutarse en un único nodo de InfluxDB 3 Enterprise. Asegúrese de que el punto final de InfluxDB 3 utilizado con el cliente del complemento sea un punto final del nodo de proceso y no el punto final del clúster. El clúster que realiza la migración no debe realizar ingestas ni realizar consultas mientras el complemento de migración esté en ejecución, ya que esto podría provocar errores de falta de memoria.

El rendimiento de la migración depende de los recursos disponibles para el nodo InfluxDB 3 y de las características de los datos que se migran. En nuestras pruebas, observamos un rendimiento de 30 000 000 LiveAnalytics de registros migrados por hora. Su rendimiento real puede variar en función de varios factores.

Mapeo de datos: en la siguiente tabla se muestra cómo se asigna el flujo temporal de los LiveAnalytics datos a los datos del protocolo de línea.

Flujo temporal de Concept LiveAnalytics Concepto de protocolo de línea
Tabla Medición
Dimensiones Etiquetas
Nombre de la medida Tag
Medidas Campos
Tiempo Timestamp

Transformación de registros de una sola medida: el siguiente es un registro de una sola medida en Timestream para LiveAnalytics la tabla: example_table

host region request_id measure_name hora measure_value::double
host1 us-west-2 saio3242ovnfk cpu_usage 2025-04-17 16:42:54 .702 394001 0,66

Este registro se transformará en:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001

Transformación de registros de múltiples medidas: el siguiente es un registro de múltiples medidas en Timestream para que esté LiveAnalytics en la tabla example_table con todo lo que esté a la derecha de las medidas: time

host region request_id measure_name hora cpu_usage uso_de_memoria
host1 us-west-2 saio3242ovnfk métricas 2025-04-17 16:42:54 .702 394001 0,66 0,21

Este registro se transformará en:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
importante

Las URL prefirmadas que el complemento utiliza para recuperar los LiveAnalytics datos en S3 caducan cuando caducan las credenciales de IAM utilizadas para generarlas (máximo 7 días). Recomendamos ejecutar el cliente de migración en una instancia de EC2 (una instancia es suficiente), ya que la t3.medium instancia de EC2 rota las credenciales de IAM automáticamente, lo que elimina las restricciones de tiempo de la URL prefirmada durante la migración. Si no utiliza una instancia EC2, las migraciones se pueden reanudar y los conjuntos de datos grandes pueden requerir varias invocaciones de reanudación.

El complemento Timestream para LiveAnalytics migración se recomienda para migraciones con menos de mil millones de registros o 125 GB en una sola base de datos. LiveAnalytics

El complemento de migración solo debe usarse en un único nodo de proceso del clúster. Puede determinar el nodo de proceso utilizando list-db-instances-for-cluster y configurándolo en el INFLUXDB3_HOST_URL punto final de una de las instancias de base de datos que tenga un modo de instancia de tipo instanciaPROCESS, o puede usar la consola Timestream y seleccionar su clúster para buscar el nodo de proceso.

Características principales:

  • Exporta los datos de series temporales de Timestream for LiveAnalytics a un bucket de S3 mediante el comando UNLOAD.

  • Genera un objeto prefirmado URLs para cada objeto de S3 que se está migrando.

  • Realiza un seguimiento del proceso de migración de cada objeto de S3.

  • Limpia los objetos de S3 tras una migración satisfactoria.

  • Permite reanudar una migración fallida en caso de que caduquen las URL prefirmadas.

Ejemplo de uso:

# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
nota

Actualice la política de bucket de S3 con la política de bucket de ejemplo en el archivo README. Para más información, consulte Requisitos previos.

python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>

Resultado: El flujo temporal de la LiveAnalytics base de datos se transforma en un protocolo de línea y se ingiere en la base de datos InfluxDB 3.

Complementos de detección de anomalías

Detección de anomalías basada en MAD

  • Tipos de activadores: escritura de datos (en tiempo real)

  • Casos de uso: detección de valores atípicos en tiempo real para la transmisión de datos, supervisión de sensores y control de calidad.

  • GitHub: Documentación de detección de anomalías en MAD

Cómo funciona: utiliza la desviación absoluta media (MAD) para establecer una línea de base sólida para el comportamiento normal. A medida que llegan nuevos datos, calcula a qué MADs distancia de la mediana se encuentra cada punto. Los puntos que superan el umbral (k * MAD) se marcan como anomalías.

Características principales:

  • Procesamiento en tiempo real a medida que se escriben los datos.

  • Mantiene las ventanas deslizantes en la memoria para aumentar la eficiencia.

  • Alertas basadas en recuentos (por ejemplo, 5 anomalías consecutivas).

  • Alertas basadas en la duración (por ejemplo, una anomalía durante 2 minutos).

  • Supresión de giro para evitar que las alertas se cansen debido a los cambios rápidos de los valores.

Ejemplo de uso:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Resultado: envía notificaciones en tiempo real cuando se detectan anomalías, incluidos el nombre, el valor y la duración del campo.

Complementos de transformación de datos

Transformación básica

Cómo funciona: aplica una cadena de transformaciones a los nombres y valores de los campos. Puede procesar datos históricos en lotes (programados) o transformarlos a medida que llegan (escritura de datos). Las transformaciones se aplican en el orden especificado, lo que permite canalizaciones de datos complejas.

Características principales:

  • Transformaciones de nombres de campo: snake_case, eliminar espacios, solo alfanuméricos.

  • Conversiones de unidades: unidades de temperatura, presión, longitud y tiempo.

  • Sustituciones de cadenas personalizadas con soporte para expresiones regulares.

  • Modo de ejecución en seco para realizar pruebas sin escribir datos.

  • Procesamiento por lotes de datos históricos.

Ejemplo de uso:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Resultado: crea una nueva tabla con datos transformados, conservando las marcas de tiempo y las etiquetas originales.

Descartar muestras

  • Tipos de activadores: HTTP programado

  • Casos de uso: reducción de datos, optimización del almacenamiento a largo plazo, creación de estadísticas resumidas, mejora del rendimiento.

  • GitHub: Documentación de Downsampler

Cómo funciona: agrega datos de serie temporal de alta resolución en resúmenes de menor resolución. Por ejemplo, convierte datos de 1 segundo en promedios de 1 hora. Cada punto reducido de muestreo incluye metadatos sobre el número de puntos originales comprimidos y el intervalo de tiempo cubierto.

Características principales:

  • Múltiples funciones de agregación: media, suma, mínima, máxima, mediana y derivada.

  • Agregaciones específicas de campos (diferentes funciones para diferentes campos).

  • Seguimiento de metadatos (record_count, time_from, time_to).

  • API de HTTP para reducción de muestreo bajo demanda con relleno.

  • Tamaños de lote configurables para conjuntos de datos de gran tamaño.

Ejemplo de uso:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Resultado: crea datos reducidos con valores agregados y columnas de metadatos que muestran el número de puntos comprimidos y el intervalo de tiempo.

Complementos de supervisión y alertas

Supervisión de los cambios de estado

  • Tipos de activadores: escritura de datos programada

  • Casos de uso: supervisión del estado, seguimiento del estado del equipo, supervisión de procesos, detección de cambios.

  • GitHub: Documentación de cambio de estado

Cómo funciona: realiza un seguimiento de los cambios en el valor de los campos a lo largo del tiempo y alerta cuando el número de cambios supera los umbrales configurados. Puede detectar tanto los cambios de valor (valores diferentes) como las condiciones de valores específicas (equivale a un valor objetivo). Incluye controles de estabilidad para evitar alertas por señales ruidosas.

Características principales:

  • Detección de cambios basada en el recuento (por ejemplo, cinco cambios en diez minutos).

  • Supervisión basada en la duración (por ejemplo, estado = “error” durante cinco minutos).

  • Ventana de cambio de estado para reducir el ruido.

  • Supervisión de varios campos con umbrales independientes.

  • Requisitos de estabilidad configurables.

Ejemplo de uso:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Resultado: las alertas incluyen el nombre del campo, el número de cambios detectados, el intervalo de tiempo y los valores de etiqueta relevantes.

Recopilador de métricas del sistema

  • Tipos de activadores: programados

  • Casos de uso: supervisión de la infraestructura, bases de referencia de rendimiento, planificación de la capacidad, seguimiento de los recursos.

  • GitHub: Documentación sobre las métricas del sistema

Cómo funciona: utiliza la biblioteca psutil para recopilar métricas completas del sistema del host que ejecuta InfluxDB. Recopilación de estadísticas de CPU, memoria, disco y red en intervalos configurables. Cada tipo de métrica se puede activar o desactivar de forma independiente.

Características principales:

  • Estadísticas de CPU por núcleo con promedios de carga.

  • Uso de memoria, incluidos los errores de intercambio y de página.

  • I/O Métricas de disco con IOPS y latencia calculadas.

  • Estadísticas de la interfaz de red con seguimiento de errores.

  • Recopilación de métricas configurable (activar/desactivar tipos específicos).

  • Reintento automático en caso de errores de recopilación.

Ejemplo de uso:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Resultado: crea varias tablas (system_cpu, system_memory, system_disk_io, etc.) con métricas detalladas para cada subsistema.

Patrones de configuración frecuentes

Uso de archivos de configuración TOML

Para configuraciones complejas, utilice archivos TOML en lugar de argumentos en línea:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Encadenamiento de complementos

Cree canalizaciones de procesamiento de datos encadenando varios complementos:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Prácticas recomendadas para los complementos

  • Comience de forma conservadora: comience con umbrales más altos y ventanas más largas, luego ajústelos en función de los patrones observados.

  • Pruebe durante el desarrollo: utilice los modos de ejecución en seco y pruebe las bases de datos antes de la implementación en producción.

  • Supervise el rendimiento de los complementos: compruebe los tiempos de ejecución y el uso de recursos en las tablas del sistema.

  • Utilice los tipos de activadores adecuados: elija una programación para el procesamiento por lotes y la escritura de datos para tiempo real.

  • Configure las notificaciones con prudencia: utilice los niveles de gravedad y la lógica de rechazo para evitar que se agoten las alertas.

  • Aproveche la persistencia de los modelos: en el caso de los complementos basados en ML, guarde los modelos entrenados para mantener la coherencia.

  • Documente las configuraciones: utilice nombres descriptivos de los activadores y conserve la documentación de configuración.

Supervisión de la ejecución del complemento

Para supervisar el rendimiento del complemento:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Solución de problemas comunes

En la siguiente tabla, se muestran los problemas más comunes y las posibles soluciones.

Problema Solución
Complemento no activado Compruebe que el disparador esté activado, compruebe schedule/spec la sintaxis
Notificaciones que faltan Confirma que el plugin Notifier está instalado y comprueba el webhook URLs
Uso alto de memoria Reduzca el tamaño de las ventanas y ajuste los intervalos de procesamiento por lotes
Transformaciones incorrectas Utilice el modo de ejecución en seco, compruebe los nombres de los campos y los tipos de datos
Imprecisión de previsión Aumente la ventana de datos de entrenamiento, ajuste la configuración de estacionalidad
Demasiadas alertas Aumente el número de activadores, añada la duración de los rebotes y ajuste los umbrales

Estos complementos certificados proporcionan una funcionalidad apta para la empresa para las necesidades habituales de procesamiento de datos de serie temporal, lo que elimina la necesidad de un desarrollo personalizado y, al mismo tiempo, mantiene la flexibilidad mediante amplias opciones de configuración. Visita el GitHubrepositorio para ver documentación detallada, ejemplos y actualizaciones.