Per funzionalità simili a Amazon Timestream for, prendi in considerazione Amazon Timestream LiveAnalytics per InfluxDB. Offre un'acquisizione semplificata dei dati e tempi di risposta alle query di una sola cifra di millisecondi per analisi in tempo reale. Scopri di più qui.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Estendi Timestream per InfluxDB con i plugin del motore di elaborazione
Il motore di elaborazione è una macchina virtuale Python incorporata che viene eseguita all'interno del database InfluxDB 3 in Amazon Timestream. È disponibile nelle edizioni Core ed Enterprise. Consente di estendere il database con codice Python personalizzato in grado di automatizzare i flussi di lavoro, trasformare i dati e creare endpoint API personalizzati.
Il motore di elaborazione esegue i plugin Python in risposta a eventi specifici del database:
-
Scrittura dei dati: elabora e trasforma i dati man mano che entrano nel database
-
Eventi pianificati: esegui il codice a intervalli definiti o orari specifici
-
Richieste HTTP: esponete gli endpoint API personalizzati che eseguono il codice
Il motore include una cache in memoria per la gestione dello stato tra le esecuzioni, che consente di creare applicazioni con stato direttamente all'interno del database.
InfluxData plugin certificati
Al momento del lancio, InfluxDB 3 include un set di plugin predefiniti e completamente configurabili certificati da: InfluxData
-
Trasformazione dei dati: elabora e arricchisci i dati in arrivo
-
Avvisi: invia notifiche in base alle soglie di dati
-
Aggregazione: calcola le statistiche sui dati delle serie temporali
-
Monitoraggio del sistema: monitora l'utilizzo delle risorse e le metriche sanitarie
-
Integrazione: Connect a servizi esterni e APIs
Questi plugin certificati sono pronti all'uso e possono essere configurati tramite argomenti di attivazione per soddisfare requisiti specifici.
Tipi di plug-in e specifiche dei trigger
| Tipo di plugin | Specificazione del trigger | Quando viene eseguito il plugin | Casi d'uso |
|---|---|---|---|
| Scrittura dei dati | table:<TABLE_NAME>o all_tables |
Quando i dati vengono scritti su tabelle | Trasformazione dei dati, avvisi, metriche derivate |
| Pianificato | every:<DURATION>o cron:<EXPRESSION> |
A intervalli specificati | Aggregazione periodica, report, controlli sanitari |
| Richiesta HTTP |
request:<REQUEST_PATH>
|
Quando vengono ricevute richieste HTTP | Webhook personalizzati APIs, interfacce utente |
Creazione di trigger
I trigger collegano i plugin agli eventi del database e definiscono quando vengono eseguiti. Utilizza il comando influxdb3 create trigger.
Per creare un trigger di scrittura dei dati:
# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor
Per creare un trigger pianificato:
# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report
Per creare un trigger di richiesta HTTP:
# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor
Accedi all'endpoint all'indirizzo: https://your-cluster-endpoint:8086/api/v3/engine/webhook
Configura i trigger
Passare argomenti ai plugin
Configura il comportamento del plugin usando gli argomenti di attivazione:
influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor
Gli argomenti vengono passati al plugin come dizionario:
def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic
Comportamento di gestione degli errori
Configura il modo in cui i trigger gestiscono gli errori:
# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor
Esecuzione asincrona
Consenti l'esecuzione simultanea di più istanze trigger:
influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor
Gestisci i trigger
Per visualizzare i trigger per un database:
# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN
Esclusione dalla tabella per i trigger di scrittura
Per filtrare le tabelle all'interno del codice del plugin quando si utilizza: all_tables
influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor
L'implementazione del plugin è la seguente:
def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables
Considerazioni sulla distribuzione distribuita
Nelle distribuzioni multinodo, configura i plugin in base ai ruoli dei nodi:
| Tipo di plugin | Tipo di nodo | Motivo |
|---|---|---|
| Plugin per la scrittura dei dati | Nodi di ingestione | Elabora i dati al punto di ingestione |
| Plugin di richiesta HTTP | Nodi Querier | Gestisci il traffico API |
| Plugin pianificati | Qualsiasi nodo configurato | Può essere eseguito su qualsiasi nodo con scheduler |
Le seguenti considerazioni sono importanti per le implementazioni aziendali:
-
Mantieni configurazioni di plug-in identiche su tutti i nodi pertinenti.
-
Indirizza i client esterni (Grafana, dashboard) ai nodi di interrogazione.
-
Assicurati che i plugin siano disponibili sui nodi in cui vengono eseguiti i relativi trigger.
Best practice
-
Configurazione del plugin
-
Utilizza gli argomenti di attivazione per i valori configurabili anziché la codifica rigida.
-
Implementa una corretta gestione degli errori all'interno dei plugin.
-
Usa l'
influxdb3_localAPI per le operazioni del database.
-
-
Ottimizzazione delle prestazioni
-
Utilizza l'esecuzione asincrona per attività di elaborazione pesanti.
-
Implementa i rendimenti anticipati per i dati filtrati.
-
Riduci al minimo le interrogazioni al database all'interno dei plugin.
-
-
Gestione degli errori
-
Scegli il comportamento di errore appropriato (registra, riprova o disabilita).
-
Monitora l'esecuzione dei plugin tramite le tabelle di sistema.
-
Testa a fondo i plugin prima dell'implementazione in produzione.
-
-
Considerazioni sulla sicurezza
-
Convalida tutti i dati di input nei plugin di richiesta HTTP.
-
Utilizza metodi sicuri per archiviare configurazioni sensibili.
-
Limita le autorizzazioni dei plug-in solo alle operazioni richieste.
-
Monitora l'esecuzione dei plugin
Interroga le tabelle di sistema per monitorare le prestazioni dei plugin:
-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'
Il motore di elaborazione offre un modo efficace per estendere la funzionalità di InfluxDB 3 mantenendo la logica di elaborazione dei dati vicina ai dati, riducendo la latenza e semplificando l'architettura.
InfluxData plugin certificati
Amazon Timestream for InfluxDB 3 include un set completo di plugin certificati predefiniti che estendono le funzionalità del database senza richiedere uno sviluppo personalizzato. Questi plugin sono completamente configurabili e pronti all'uso al momento del lancio e offrono funzionalità avanzate per l'elaborazione dei dati, il monitoraggio e l'invio di avvisi.
Per la documentazione completa e il codice sorgente, visita il Plugins Repository. InfluxData
Plugin disponibili
Timestream per il plugin di migrazione LiveAnalytics
Migra un database da Timestream for a Timestream per InfluxDB LiveAnalytics
-
Tipi di trigger: HTTP
-
Casi d'uso: migrazione di un database di serie temporali da Timestream for LiveAnalytics a Timestream per InfluxDB.
-
GitHub: LiveAnalytics Documentazione del plug-in Timestream per la migrazione
Come funziona: il plugin Timestream for LiveAnalytics migration funziona all'unisono con il client Timestream for migration. LiveAnalytics
Migliori pratiche: il plug-in Timestream for LiveAnalytics migration deve essere eseguito su un singolo nodo InfluxDB 3 Enterprise. Assicurati che l'endpoint InfluxDB 3 utilizzato con il client del plug-in sia un endpoint del nodo di processo anziché l'endpoint del cluster. Il cluster che esegue la migrazione non deve eseguire operazioni di acquisizione o interrogazione mentre il plug-in di migrazione è in esecuzione, poiché ciò potrebbe causare errori di esaurimento della memoria.
Le prestazioni di migrazione dipendono dalle risorse disponibili per il nodo InfluxDB 3 e dalle caratteristiche dei dati da migrare. Nei nostri test, abbiamo osservato un throughput di 30.000.000 LiveAnalytics di record migrati all'ora. Le prestazioni effettive possono variare in base a diversi fattori.
Mappatura dei dati: la tabella seguente mostra come Timestream for LiveAnalytics data viene mappato ai dati del protocollo di linea.
| Timestream for Concept LiveAnalytics | Concetto di protocollo di linea |
|---|---|
| Tabella | Misurazione |
| Dimensioni | Tag |
| Nome della misura | Tag |
| Misure | Campi |
| Time (Orario) | Time stamp |
Trasformazione dei record a misura singola: Di seguito è riportato un record a misura singola in Timestream per la LiveAnalytics tabella: example_table
| host | region | request_id | measure_name | time | measure_value::double |
|---|---|---|---|---|---|
| host 1 | us-west-2 | saio3242ovnfk | cpu_usage | 2025-04-17 16:42:54,702 394001 | 0,66 |
Questo record verrà trasformato in:
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001
Trasformazione di record con più misure: quello che segue è un record con più misure in Timestream for LiveAnalytics nella tabella example_table con tutto a destra di essere misurato: time
| host | region | request_id | measure_name | time | cpu_usage | uso_memoria |
|---|---|---|---|---|---|---|
| host 1 | us-west-2 | saio3242ovnfk | metriche | 2025-04-17 16:42:54,702 394001 | 0,66 | 0,21 |
Questo record verrà trasformato in:
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
Importante
Gli URL predefiniti utilizzati dal plug-in per recuperare i LiveAnalytics dati in S3 scadono quando si verifica la scadenza impostata o le credenziali IAM utilizzate per generarli scadono (massimo 7 giorni). Consigliamo di eseguire il client di migrazione su un'istanza EC2 (un'istanza è sufficiente) perché l't3.mediumistanza EC2 ruota automaticamente le credenziali IAM, rimuovendo i vincoli temporali degli URL predefiniti durante la migrazione. Se non si utilizza un'istanza EC2, le migrazioni possono essere riprese e set di dati di grandi dimensioni possono richiedere più chiamate di ripristino.
Il plug-in Timestream for LiveAnalytics migration è consigliato per le migrazioni con meno di 1 miliardo di record o 125 GB all'interno di un singolo database. LiveAnalytics
Il plug-in di migrazione deve essere utilizzato solo su un singolo nodo di processo nel cluster. È possibile determinare il nodo del processo utilizzando list-db-instances-for-cluster e impostandolo sull'INFLUXDB3_HOST_URLendpoint di una delle istanze del database che ha una modalità di istanza di tipoPROCESS, oppure è possibile utilizzare la console Timestream e selezionare il cluster per trovare il nodo del processo.
Caratteristiche principali:
-
Esporta i dati delle serie temporali da Timestream for in un bucket S3 utilizzando il comando LiveAnalytics UNLOAD.
-
Genera predefiniti per ogni oggetto S3 da URLs migrare.
-
Tiene traccia del processo di migrazione per ogni oggetto S3.
-
Pulisce gli oggetti S3 dopo una migrazione riuscita.
-
Supporta la ripresa di una migrazione non riuscita in caso di scadenza dell'URL preimpostato.
Esempio di utilizzo:
# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
Nota
Aggiorna la policy del bucket S3 con la policy del bucket di esempio nel README. Per ulteriori informazioni, consulta Prerequisiti
python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>
Output: il timestream per il LiveAnalytics database viene trasformato in protocollo di linea e inserito nel database InfluxDB 3.
Plugin per il rilevamento delle anomalie
Rilevamento delle anomalie basato su MAD
-
Tipi di trigger: scrittura dei dati (in tempo reale)
-
Casi d'uso: rilevamento dei valori anomali in tempo reale per lo streaming di dati, monitoraggio dei sensori, controllo della qualità.
-
GitHub: Documentazione sul rilevamento delle anomalie MAD
Come funziona: utilizza la deviazione assoluta mediana (MAD) per stabilire una solida linea di base per il comportamento normale. Man mano che arrivano nuovi dati, calcola a quanti punti si trova la MADs distanza dalla mediana. I punti che superano la soglia (k * MAD) vengono contrassegnati come anomalie.
Caratteristiche principali:
-
Elaborazione in tempo reale durante la scrittura dei dati.
-
Mantiene le finestre scorrevoli in memoria per una maggiore efficienza.
-
Avvisi basati sul conteggio (ad esempio, 5 anomalie consecutive).
-
Avvisi basati sulla durata (ad esempio, anomalia per 2 minuti).
-
Soppressione del ribaltamento per evitare che l'affaticamento degli allarmi cambi rapidamente i valori.
Esempio di utilizzo:
# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
Output: invia notifiche in tempo reale quando vengono rilevate anomalie, inclusi il nome del campo, il valore e la durata.
Plugin per la trasformazione dei dati
Trasformazione di base
-
Tipi di trigger: programmato, scrittura dei dati
-
Casi d'uso: standardizzazione dei dati, conversioni di unità, normalizzazione dei nomi di campo, pulizia dei dati.
Come funziona: applica una catena di trasformazioni ai nomi e ai valori dei campi. Può elaborare i dati storici in batch (pianificati) o trasformare i dati non appena arrivano (scrittura dei dati). Le trasformazioni vengono applicate nell'ordine specificato, consentendo pipeline di dati complesse.
Caratteristiche principali:
-
Trasformazioni dei nomi di campo: snake_case, rimuovi spazi, solo alfanumerico.
-
Conversioni di unità: temperatura, pressione, lunghezza, unità di tempo.
-
Sostituzioni di stringhe personalizzate con supporto regex.
-
Modalità dry-run per testare senza scrivere dati.
-
Elaborazione in batch per dati storici.
Esempio di utilizzo:
# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner
Output: crea una nuova tabella con dati trasformati, preservando i timestamp e i tag originali.
Downsampler
-
Tipi di trigger: Scheduled, HTTP
-
Casi d'uso: riduzione dei dati, ottimizzazione dello storage a lungo termine, creazione di statistiche di riepilogo, miglioramento delle prestazioni.
-
GitHub: Documentazione Downsampler
Come funziona: aggrega dati di serie temporali ad alta risoluzione in riepiloghi a bassa risoluzione. Ad esempio, converte i dati di 1 secondo in medie di 1 ora. Ogni punto sottoposto a downsampling include metadati sul numero di punti originali compressi e sull'intervallo di tempo coperto.
Caratteristiche principali:
-
Molteplici funzioni di aggregazione: avg, sum, min, max, median, derivative.
-
Aggregazioni specifiche per campo (funzioni diverse per campi diversi).
-
Tracciamento dei metadati (record_count, time_from, time_to).
-
API HTTP per il downsampling su richiesta con backfill.
-
Dimensioni dei batch configurabili per set di dati di grandi dimensioni.
Esempio di utilizzo:
# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'
Output: crea dati sottocampionati con valori aggregati e colonne di metadati che mostrano il numero di punti compressi e l'intervallo di tempo.
Plugin di monitoraggio e avviso
Monitoraggio del cambio di stato
-
Tipi di trigger: programmato, scrittura dei dati
-
Casi d'uso: monitoraggio dello stato, tracciamento dello stato delle apparecchiature, monitoraggio dei processi, rilevamento delle modifiche.
Come funziona: tiene traccia delle variazioni dei valori dei campi nel tempo e avvisa quando il numero di modifiche supera le soglie configurate. È in grado di rilevare sia le variazioni di valore (valori diversi) sia condizioni di valore specifiche (pari a un valore target). Include controlli di stabilità per prevenire allarmi dovuti a segnali rumorosi.
Caratteristiche principali:
-
Rilevamento delle modifiche basato sul conteggio (ad esempio, cinque modifiche in dieci minuti).
-
Monitoraggio basato sulla durata (ad esempio, status = «errore» per cinque minuti).
-
Finestra di cambio di stato per la riduzione del rumore.
-
Monitoraggio multicampo con soglie indipendenti.
-
Requisiti di stabilità configurabili.
Esempio di utilizzo:
# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor
Output: gli avvisi includono il nome del campo, il numero di modifiche rilevate, la finestra temporale e i valori dei tag pertinenti.
Raccoglitore di metriche di sistema
-
Tipi di trigger: programmati
-
Casi d'uso: monitoraggio dell'infrastruttura, linee di base delle prestazioni, pianificazione della capacità, monitoraggio delle risorse.
-
GitHub: Documentazione sulle metriche di sistema
Come funziona: utilizza la libreria psutil per raccogliere metriche di sistema complete dall'host che esegue InfluxDB. Raccoglie statistiche su CPU, memoria, disco e rete a intervalli configurabili. Ogni tipo di metrica può essere abilitato/disabilitato indipendentemente.
Caratteristiche principali:
-
Statistiche della CPU per core con medie di carico.
-
Utilizzo della memoria, inclusi swap e errori di pagina.
-
I/O Metriche del disco con IOPS e latenza calcolati.
-
Statistiche dell'interfaccia di rete con tracciamento degli errori.
-
Raccolta di metriche configurabile (abilita/disabilita tipi specifici).
-
Riprova automatica in caso di errori di raccolta.
Esempio di utilizzo:
# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics
Output: crea più tabelle (system_cpu, system_memory, system_disk_io, ecc.) con metriche dettagliate per ogni sottosistema.
Modelli di configurazione comuni
Utilizzo dei file di configurazione TOML
Per configurazioni complesse, usa i file TOML anziché gli argomenti in linea:
# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector
Plugin di concatenamento
Crea pipeline di elaborazione dati concatenando più plugin:
# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly
Le migliori pratiche per i plugin
-
Inizia in modo prudente: inizia con soglie più alte e finestre più lunghe, quindi regola in base ai modelli osservati.
-
Test in fase di sviluppo: utilizza le modalità dry-run e testa i database prima dell'implementazione in produzione.
-
Monitora le prestazioni dei plug-in: controlla i tempi di esecuzione e l'utilizzo delle risorse nelle tabelle di sistema.
-
Utilizza i tipi di trigger appropriati: scegli la pianificazione per l'elaborazione in batch, la scrittura dei dati in tempo reale.
-
Configura le notifiche con saggezza: utilizza i livelli di gravità e la logica di debounce per prevenire l'affaticamento degli avvisi.
-
Sfrutta la persistenza dei modelli: per i plug-in basati su ML, salva i modelli addestrati per garantire la coerenza.
-
Configurazioni dei documenti: utilizza nomi di trigger descrittivi e gestisci la documentazione di configurazione.
Monitora l'esecuzione dei plugin
Per monitorare le prestazioni del plugin:
-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';
Risolvi i problemi più comuni
La tabella seguente mostra i problemi comuni e le possibili soluzioni.
| Problema | Soluzione |
|---|---|
| Il plugin non si attiva | Verifica che il trigger sia abilitato, controlla la sintassi schedule/spec |
| Notifiche mancanti | Conferma l'installazione del plugin Notifier, controlla webhook URLs |
| Elevato utilizzo della memoria | Riduci le dimensioni delle finestre, regola gli intervalli di elaborazione in batch |
| Trasformazioni errate | Usa la modalità dry-run, verifica i nomi dei campi e i tipi di dati |
| Imprecisione delle previsioni | Aumenta la finestra dei dati di allenamento, regola le impostazioni della stagionalità |
| Troppi avvisi | Aumenta il numero di trigger, aggiungi la durata del rimbalzo, regola le soglie |
Questi plugin certificati offrono funzionalità pronte per l'uso aziendale per le comuni esigenze di elaborazione dei dati di serie temporali, eliminando la necessità di uno sviluppo personalizzato e mantenendo la flessibilità grazie a opzioni di configurazione complete. Visita il GitHubrepository