View a markdown version of this page

Estendi Timestream per InfluxDB con i plugin del motore di elaborazione - Amazon Timestream

Per funzionalità simili a Amazon Timestream for, prendi in considerazione Amazon Timestream LiveAnalytics per InfluxDB. Offre un'acquisizione semplificata dei dati e tempi di risposta alle query di una sola cifra di millisecondi per analisi in tempo reale. Scopri di più qui.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Estendi Timestream per InfluxDB con i plugin del motore di elaborazione

Il motore di elaborazione è una macchina virtuale Python incorporata che viene eseguita all'interno del database InfluxDB 3 in Amazon Timestream. È disponibile nelle edizioni Core ed Enterprise. Consente di estendere il database con codice Python personalizzato in grado di automatizzare i flussi di lavoro, trasformare i dati e creare endpoint API personalizzati.

Il motore di elaborazione esegue i plugin Python in risposta a eventi specifici del database:

  • Scrittura dei dati: elabora e trasforma i dati man mano che entrano nel database

  • Eventi pianificati: esegui il codice a intervalli definiti o orari specifici

  • Richieste HTTP: esponete gli endpoint API personalizzati che eseguono il codice

Il motore include una cache in memoria per la gestione dello stato tra le esecuzioni, che consente di creare applicazioni con stato direttamente all'interno del database.

InfluxData plugin certificati

Al momento del lancio, InfluxDB 3 include un set di plugin predefiniti e completamente configurabili certificati da: InfluxData

  • Trasformazione dei dati: elabora e arricchisci i dati in arrivo

  • Avvisi: invia notifiche in base alle soglie di dati

  • Aggregazione: calcola le statistiche sui dati delle serie temporali

  • Monitoraggio del sistema: monitora l'utilizzo delle risorse e le metriche sanitarie

  • Integrazione: Connect a servizi esterni e APIs

Questi plugin certificati sono pronti all'uso e possono essere configurati tramite argomenti di attivazione per soddisfare requisiti specifici.

Tipi di plug-in e specifiche dei trigger

Tipo di plugin Specificazione del trigger Quando viene eseguito il plugin Casi d'uso
Scrittura dei dati table:<TABLE_NAME>o all_tables Quando i dati vengono scritti su tabelle Trasformazione dei dati, avvisi, metriche derivate
Pianificato every:<DURATION>o cron:<EXPRESSION> A intervalli specificati Aggregazione periodica, report, controlli sanitari
Richiesta HTTP request:<REQUEST_PATH> Quando vengono ricevute richieste HTTP Webhook personalizzati APIs, interfacce utente

Creazione di trigger

I trigger collegano i plugin agli eventi del database e definiscono quando vengono eseguiti. Utilizza il comando influxdb3 create trigger.

Per creare un trigger di scrittura dei dati:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

Per creare un trigger pianificato:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

Per creare un trigger di richiesta HTTP:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Accedi all'endpoint all'indirizzo: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Configura i trigger

Passare argomenti ai plugin

Configura il comportamento del plugin usando gli argomenti di attivazione:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Gli argomenti vengono passati al plugin come dizionario:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Comportamento di gestione degli errori

Configura il modo in cui i trigger gestiscono gli errori:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Esecuzione asincrona

Consenti l'esecuzione simultanea di più istanze trigger:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Gestisci i trigger

Per visualizzare i trigger per un database:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Esclusione dalla tabella per i trigger di scrittura

Per filtrare le tabelle all'interno del codice del plugin quando si utilizza: all_tables

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

L'implementazione del plugin è la seguente:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Considerazioni sulla distribuzione distribuita

Nelle distribuzioni multinodo, configura i plugin in base ai ruoli dei nodi:

Tipo di plugin Tipo di nodo Motivo
Plugin per la scrittura dei dati Nodi di ingestione Elabora i dati al punto di ingestione
Plugin di richiesta HTTP Nodi Querier Gestisci il traffico API
Plugin pianificati Qualsiasi nodo configurato Può essere eseguito su qualsiasi nodo con scheduler

Le seguenti considerazioni sono importanti per le implementazioni aziendali:

  • Mantieni configurazioni di plug-in identiche su tutti i nodi pertinenti.

  • Indirizza i client esterni (Grafana, dashboard) ai nodi di interrogazione.

  • Assicurati che i plugin siano disponibili sui nodi in cui vengono eseguiti i relativi trigger.

Best practice

  • Configurazione del plugin

    • Utilizza gli argomenti di attivazione per i valori configurabili anziché la codifica rigida.

    • Implementa una corretta gestione degli errori all'interno dei plugin.

    • Usa l'influxdb3_localAPI per le operazioni del database.

  • Ottimizzazione delle prestazioni

    • Utilizza l'esecuzione asincrona per attività di elaborazione pesanti.

    • Implementa i rendimenti anticipati per i dati filtrati.

    • Riduci al minimo le interrogazioni al database all'interno dei plugin.

  • Gestione degli errori

    • Scegli il comportamento di errore appropriato (registra, riprova o disabilita).

    • Monitora l'esecuzione dei plugin tramite le tabelle di sistema.

    • Testa a fondo i plugin prima dell'implementazione in produzione.

  • Considerazioni sulla sicurezza

    • Convalida tutti i dati di input nei plugin di richiesta HTTP.

    • Utilizza metodi sicuri per archiviare configurazioni sensibili.

    • Limita le autorizzazioni dei plug-in solo alle operazioni richieste.

Monitora l'esecuzione dei plugin

Interroga le tabelle di sistema per monitorare le prestazioni dei plugin:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

Il motore di elaborazione offre un modo efficace per estendere la funzionalità di InfluxDB 3 mantenendo la logica di elaborazione dei dati vicina ai dati, riducendo la latenza e semplificando l'architettura.

InfluxData plugin certificati

Amazon Timestream for InfluxDB 3 include un set completo di plugin certificati predefiniti che estendono le funzionalità del database senza richiedere uno sviluppo personalizzato. Questi plugin sono completamente configurabili e pronti all'uso al momento del lancio e offrono funzionalità avanzate per l'elaborazione dei dati, il monitoraggio e l'invio di avvisi.

Per la documentazione completa e il codice sorgente, visita il Plugins Repository. InfluxData

Plugin disponibili

Timestream per il plugin di migrazione LiveAnalytics

Migra un database da Timestream for a Timestream per InfluxDB LiveAnalytics

Come funziona: il plugin Timestream for LiveAnalytics migration funziona all'unisono con il client Timestream for migration. LiveAnalytics Il client esegue un comando Timestream for LiveAnalytics UNLOAD per esportare un database in un LiveAnalytics bucket S3 in formato Parquet. Dopo l'esportazione dei dati, il client genera i file predefiniti URLs per Parquet e richiama il plug-in di migrazione con i file prefirmati. URLs Durante l'esecuzione del plugin, gli oggetti S3 vengono recuperati dal bucket S3 e trasformati nel protocollo di linea InfluxDB e scritti in un database InfluxDB 3.

Migliori pratiche: il plug-in Timestream for LiveAnalytics migration deve essere eseguito su un singolo nodo InfluxDB 3 Enterprise. Assicurati che l'endpoint InfluxDB 3 utilizzato con il client del plug-in sia un endpoint del nodo di processo anziché l'endpoint del cluster. Il cluster che esegue la migrazione non deve eseguire operazioni di acquisizione o interrogazione mentre il plug-in di migrazione è in esecuzione, poiché ciò potrebbe causare errori di esaurimento della memoria.

Le prestazioni di migrazione dipendono dalle risorse disponibili per il nodo InfluxDB 3 e dalle caratteristiche dei dati da migrare. Nei nostri test, abbiamo osservato un throughput di 30.000.000 LiveAnalytics di record migrati all'ora. Le prestazioni effettive possono variare in base a diversi fattori.

Mappatura dei dati: la tabella seguente mostra come Timestream for LiveAnalytics data viene mappato ai dati del protocollo di linea.

Timestream for Concept LiveAnalytics Concetto di protocollo di linea
Tabella Misurazione
Dimensioni Tag
Nome della misura Tag
Misure Campi
Time (Orario) Time stamp

Trasformazione dei record a misura singola: Di seguito è riportato un record a misura singola in Timestream per la LiveAnalytics tabella: example_table

host region request_id measure_name time measure_value::double
host 1 us-west-2 saio3242ovnfk cpu_usage 2025-04-17 16:42:54,702 394001 0,66

Questo record verrà trasformato in:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001

Trasformazione di record con più misure: quello che segue è un record con più misure in Timestream for LiveAnalytics nella tabella example_table con tutto a destra di essere misurato: time

host region request_id measure_name time cpu_usage uso_memoria
host 1 us-west-2 saio3242ovnfk metriche 2025-04-17 16:42:54,702 394001 0,66 0,21

Questo record verrà trasformato in:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
Importante

Gli URL predefiniti utilizzati dal plug-in per recuperare i LiveAnalytics dati in S3 scadono quando si verifica la scadenza impostata o le credenziali IAM utilizzate per generarli scadono (massimo 7 giorni). Consigliamo di eseguire il client di migrazione su un'istanza EC2 (un'istanza è sufficiente) perché l't3.mediumistanza EC2 ruota automaticamente le credenziali IAM, rimuovendo i vincoli temporali degli URL predefiniti durante la migrazione. Se non si utilizza un'istanza EC2, le migrazioni possono essere riprese e set di dati di grandi dimensioni possono richiedere più chiamate di ripristino.

Il plug-in Timestream for LiveAnalytics migration è consigliato per le migrazioni con meno di 1 miliardo di record o 125 GB all'interno di un singolo database. LiveAnalytics

Il plug-in di migrazione deve essere utilizzato solo su un singolo nodo di processo nel cluster. È possibile determinare il nodo del processo utilizzando list-db-instances-for-cluster e impostandolo sull'INFLUXDB3_HOST_URLendpoint di una delle istanze del database che ha una modalità di istanza di tipoPROCESS, oppure è possibile utilizzare la console Timestream e selezionare il cluster per trovare il nodo del processo.

Caratteristiche principali:

  • Esporta i dati delle serie temporali da Timestream for in un bucket S3 utilizzando il comando LiveAnalytics UNLOAD.

  • Genera predefiniti per ogni oggetto S3 da URLs migrare.

  • Tiene traccia del processo di migrazione per ogni oggetto S3.

  • Pulisce gli oggetti S3 dopo una migrazione riuscita.

  • Supporta la ripresa di una migrazione non riuscita in caso di scadenza dell'URL preimpostato.

Esempio di utilizzo:

# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
Nota

Aggiorna la policy del bucket S3 con la policy del bucket di esempio nel README. Per ulteriori informazioni, consulta Prerequisiti.

python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>

Output: il timestream per il LiveAnalytics database viene trasformato in protocollo di linea e inserito nel database InfluxDB 3.

Plugin per il rilevamento delle anomalie

Rilevamento delle anomalie basato su MAD

  • Tipi di trigger: scrittura dei dati (in tempo reale)

  • Casi d'uso: rilevamento dei valori anomali in tempo reale per lo streaming di dati, monitoraggio dei sensori, controllo della qualità.

  • GitHub: Documentazione sul rilevamento delle anomalie MAD

Come funziona: utilizza la deviazione assoluta mediana (MAD) per stabilire una solida linea di base per il comportamento normale. Man mano che arrivano nuovi dati, calcola a quanti punti si trova la MADs distanza dalla mediana. I punti che superano la soglia (k * MAD) vengono contrassegnati come anomalie.

Caratteristiche principali:

  • Elaborazione in tempo reale durante la scrittura dei dati.

  • Mantiene le finestre scorrevoli in memoria per una maggiore efficienza.

  • Avvisi basati sul conteggio (ad esempio, 5 anomalie consecutive).

  • Avvisi basati sulla durata (ad esempio, anomalia per 2 minuti).

  • Soppressione del ribaltamento per evitare che l'affaticamento degli allarmi cambi rapidamente i valori.

Esempio di utilizzo:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Output: invia notifiche in tempo reale quando vengono rilevate anomalie, inclusi il nome del campo, il valore e la durata.

Plugin per la trasformazione dei dati

Trasformazione di base

Come funziona: applica una catena di trasformazioni ai nomi e ai valori dei campi. Può elaborare i dati storici in batch (pianificati) o trasformare i dati non appena arrivano (scrittura dei dati). Le trasformazioni vengono applicate nell'ordine specificato, consentendo pipeline di dati complesse.

Caratteristiche principali:

  • Trasformazioni dei nomi di campo: snake_case, rimuovi spazi, solo alfanumerico.

  • Conversioni di unità: temperatura, pressione, lunghezza, unità di tempo.

  • Sostituzioni di stringhe personalizzate con supporto regex.

  • Modalità dry-run per testare senza scrivere dati.

  • Elaborazione in batch per dati storici.

Esempio di utilizzo:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Output: crea una nuova tabella con dati trasformati, preservando i timestamp e i tag originali.

Downsampler

  • Tipi di trigger: Scheduled, HTTP

  • Casi d'uso: riduzione dei dati, ottimizzazione dello storage a lungo termine, creazione di statistiche di riepilogo, miglioramento delle prestazioni.

  • GitHub: Documentazione Downsampler

Come funziona: aggrega dati di serie temporali ad alta risoluzione in riepiloghi a bassa risoluzione. Ad esempio, converte i dati di 1 secondo in medie di 1 ora. Ogni punto sottoposto a downsampling include metadati sul numero di punti originali compressi e sull'intervallo di tempo coperto.

Caratteristiche principali:

  • Molteplici funzioni di aggregazione: avg, sum, min, max, median, derivative.

  • Aggregazioni specifiche per campo (funzioni diverse per campi diversi).

  • Tracciamento dei metadati (record_count, time_from, time_to).

  • API HTTP per il downsampling su richiesta con backfill.

  • Dimensioni dei batch configurabili per set di dati di grandi dimensioni.

Esempio di utilizzo:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Output: crea dati sottocampionati con valori aggregati e colonne di metadati che mostrano il numero di punti compressi e l'intervallo di tempo.

Plugin di monitoraggio e avviso

Monitoraggio del cambio di stato

  • Tipi di trigger: programmato, scrittura dei dati

  • Casi d'uso: monitoraggio dello stato, tracciamento dello stato delle apparecchiature, monitoraggio dei processi, rilevamento delle modifiche.

  • GitHub: Documentazione sulla modifica dello stato

Come funziona: tiene traccia delle variazioni dei valori dei campi nel tempo e avvisa quando il numero di modifiche supera le soglie configurate. È in grado di rilevare sia le variazioni di valore (valori diversi) sia condizioni di valore specifiche (pari a un valore target). Include controlli di stabilità per prevenire allarmi dovuti a segnali rumorosi.

Caratteristiche principali:

  • Rilevamento delle modifiche basato sul conteggio (ad esempio, cinque modifiche in dieci minuti).

  • Monitoraggio basato sulla durata (ad esempio, status = «errore» per cinque minuti).

  • Finestra di cambio di stato per la riduzione del rumore.

  • Monitoraggio multicampo con soglie indipendenti.

  • Requisiti di stabilità configurabili.

Esempio di utilizzo:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Output: gli avvisi includono il nome del campo, il numero di modifiche rilevate, la finestra temporale e i valori dei tag pertinenti.

Raccoglitore di metriche di sistema

  • Tipi di trigger: programmati

  • Casi d'uso: monitoraggio dell'infrastruttura, linee di base delle prestazioni, pianificazione della capacità, monitoraggio delle risorse.

  • GitHub: Documentazione sulle metriche di sistema

Come funziona: utilizza la libreria psutil per raccogliere metriche di sistema complete dall'host che esegue InfluxDB. Raccoglie statistiche su CPU, memoria, disco e rete a intervalli configurabili. Ogni tipo di metrica può essere abilitato/disabilitato indipendentemente.

Caratteristiche principali:

  • Statistiche della CPU per core con medie di carico.

  • Utilizzo della memoria, inclusi swap e errori di pagina.

  • I/O Metriche del disco con IOPS e latenza calcolati.

  • Statistiche dell'interfaccia di rete con tracciamento degli errori.

  • Raccolta di metriche configurabile (abilita/disabilita tipi specifici).

  • Riprova automatica in caso di errori di raccolta.

Esempio di utilizzo:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Output: crea più tabelle (system_cpu, system_memory, system_disk_io, ecc.) con metriche dettagliate per ogni sottosistema.

Modelli di configurazione comuni

Utilizzo dei file di configurazione TOML

Per configurazioni complesse, usa i file TOML anziché gli argomenti in linea:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Plugin di concatenamento

Crea pipeline di elaborazione dati concatenando più plugin:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Le migliori pratiche per i plugin

  • Inizia in modo prudente: inizia con soglie più alte e finestre più lunghe, quindi regola in base ai modelli osservati.

  • Test in fase di sviluppo: utilizza le modalità dry-run e testa i database prima dell'implementazione in produzione.

  • Monitora le prestazioni dei plug-in: controlla i tempi di esecuzione e l'utilizzo delle risorse nelle tabelle di sistema.

  • Utilizza i tipi di trigger appropriati: scegli la pianificazione per l'elaborazione in batch, la scrittura dei dati in tempo reale.

  • Configura le notifiche con saggezza: utilizza i livelli di gravità e la logica di debounce per prevenire l'affaticamento degli avvisi.

  • Sfrutta la persistenza dei modelli: per i plug-in basati su ML, salva i modelli addestrati per garantire la coerenza.

  • Configurazioni dei documenti: utilizza nomi di trigger descrittivi e gestisci la documentazione di configurazione.

Monitora l'esecuzione dei plugin

Per monitorare le prestazioni del plugin:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Risolvi i problemi più comuni

La tabella seguente mostra i problemi comuni e le possibili soluzioni.

Problema Soluzione
Il plugin non si attiva Verifica che il trigger sia abilitato, controlla la sintassi schedule/spec
Notifiche mancanti Conferma l'installazione del plugin Notifier, controlla webhook URLs
Elevato utilizzo della memoria Riduci le dimensioni delle finestre, regola gli intervalli di elaborazione in batch
Trasformazioni errate Usa la modalità dry-run, verifica i nomi dei campi e i tipi di dati
Imprecisione delle previsioni Aumenta la finestra dei dati di allenamento, regola le impostazioni della stagionalità
Troppi avvisi Aumenta il numero di trigger, aggiungi la durata del rimbalzo, regola le soglie

Questi plugin certificati offrono funzionalità pronte per l'uso aziendale per le comuni esigenze di elaborazione dei dati di serie temporali, eliminando la necessità di uno sviluppo personalizzato e mantenendo la flessibilità grazie a opzioni di configurazione complete. Visita il GitHubrepository per documentazione dettagliata, esempi e aggiornamenti.