View a markdown version of this page

Erweitern Sie Timestream für InfluxDB mit Processing Engine-Plugins - Amazon Timestream

Für ähnliche Funktionen wie Amazon Timestream für sollten Sie Amazon Timestream for LiveAnalytics InfluxDB in Betracht ziehen. Es bietet eine vereinfachte Datenaufnahme und Antwortzeiten im einstelligen Millisekundenbereich für Analysen in Echtzeit. Erfahren Sie hier mehr.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erweitern Sie Timestream für InfluxDB mit Processing Engine-Plugins

Die Processing Engine ist eine eingebettete virtuelle Python-Maschine, die in Ihrer InfluxDB 3-Datenbank in Amazon Timestream läuft. Sie ist sowohl in der Core- als auch in der Enterprise-Edition verfügbar. Es ermöglicht Ihnen, Ihre Datenbank mit benutzerdefiniertem Python-Code zu erweitern, der Workflows automatisieren, Daten transformieren und benutzerdefinierte API-Endpunkte erstellen kann.

Die Processing Engine führt Python-Plugins als Reaktion auf bestimmte Datenbankereignisse aus:

  • Schreibvorgänge an Daten: Verarbeiten und transformieren Sie Daten, sobald sie in die Datenbank gelangen

  • Geplante Ereignisse: Führen Sie Code in definierten Intervallen oder zu bestimmten Zeiten aus

  • HTTP-Anfragen: Machen Sie benutzerdefinierte API-Endpunkte verfügbar, die Ihren Code ausführen

Die Engine enthält einen In-Memory-Cache zur Verwaltung des Status zwischen den Ausführungen, sodass Sie statusbehaftete Anwendungen direkt in Ihrer Datenbank erstellen können.

InfluxData zertifizierte Plug-ins

Bei der Markteinführung enthält InfluxDB 3 eine Reihe vorgefertigter, vollständig konfigurierbarer Plugins, zertifiziert von: InfluxData

  • Datentransformation: Verarbeiten und bereichern Sie eingehende Daten

  • Warnmeldungen: Senden Sie Benachrichtigungen auf der Grundlage von Datenschwellenwerten

  • Aggregation: Berechnet Statistiken zu Zeitreihendaten

  • Systemüberwachung: Verfolgen Sie die Kennzahlen zur Ressourcennutzung und zum Zustand

  • Integration: Connect zu externen Diensten her und APIs

Diese zertifizierten Plugins sind sofort einsatzbereit und können über Triggerargumente konfiguriert werden, um Ihre spezifischen Anforderungen zu erfüllen.

Plugin-Typen und Trigger-Spezifikationen

Typ des Plug-ins Trigger-Spezifikation Wenn das Plugin ausgeführt wird Anwendungsfälle
Daten schreiben table:<TABLE_NAME>oder all_tables Wenn Daten in Tabellen geschrieben werden Datentransformation, Warnmeldungen, abgeleitete Metriken
Geplant every:<DURATION>oder cron:<EXPRESSION> In bestimmten Intervallen Regelmäßige Aggregation, Berichte, Gesundheitschecks
HTTP-Anfrage request:<REQUEST_PATH> Wenn HTTP-Anfragen empfangen werden Benutzerdefiniert APIs, Webhooks, Benutzeroberflächen

Erstellen von -Auslösern

Trigger verbinden Plugins mit Datenbankereignissen und definieren, wann sie ausgeführt werden. Verwenden Sie den Befehl influxdb3 create trigger.

Um einen Trigger für das Schreiben von Daten zu erstellen:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

Um einen geplanten Trigger zu erstellen:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

So erstellen Sie einen HTTP-Anforderungs-Trigger:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Greifen Sie auf den Endpunkt zu unter: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Trigger konfigurieren

Argumente an Plugins übergeben

Konfigurieren Sie das Plugin-Verhalten mithilfe von Triggerargumenten:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Argumente werden als Wörterbuch an das Plugin übergeben:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Verhalten bei der Fehlerbehandlung

Konfigurieren Sie, wie Trigger mit Fehlern umgehen:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Asynchrone Ausführung

Erlauben Sie die gleichzeitige Ausführung mehrerer Triggerinstanzen:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Trigger verwalten

So zeigen Sie Trigger für eine Datenbank an:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Tabellenausschluss für Schreib-Trigger

Um Tabellen in Ihrem Plugin-Code zu filtern, wenn Sie Folgendes verwendenall_tables:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

Die Plugin-Implementierung sieht wie folgt aus:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Überlegungen zur verteilten Bereitstellung

In Bereitstellungen mit mehreren Knoten konfigurieren Sie Plugins auf der Grundlage von Knotenrollen:

Typ des Plug-ins Node Type Grund
Plugins zum Schreiben von Daten Ingester-Knoten Daten am Aufnahmepunkt verarbeiten
Plug-ins für HTTP-Anfragen Querier-Knoten Behandeln Sie den API-Verkehr
Geplante Plugins Jeder konfigurierte Knoten Kann auf jedem Knoten mit Scheduler ausgeführt werden

Die folgenden Überlegungen sind wichtig für Bereitstellungen in Unternehmen:

  • Sorgen Sie für identische Plugin-Konfigurationen auf allen relevanten Knoten.

  • Leiten Sie externe Clients (Grafana, Dashboards) an Querier-Knoten weiter.

  • Stellen Sie sicher, dass Plugins auf Knoten verfügbar sind, auf denen ihre Trigger ausgeführt werden.

Best Practices

  • Plugin-Konfiguration

    • Verwenden Sie Triggerargumente für konfigurierbare Werte, anstatt sie fest zu codieren.

    • Implementieren Sie die richtige Fehlerbehandlung in Plugins.

    • Verwenden Sie die influxdb3_local API für Datenbankoperationen.

  • Optimierung der Leistung

    • Verwenden Sie die asynchrone Ausführung für umfangreiche Verarbeitungsaufgaben.

    • Implementieren Sie vorzeitige Rückgaben für gefilterte Daten.

    • Minimiere Datenbankabfragen innerhalb von Plugins.

  • Verwaltung von Fehlern

    • Wählen Sie ein geeignetes Fehlerverhalten (protokollieren, erneut versuchen oder deaktivieren).

    • Überwachen Sie die Plugin-Ausführung anhand von Systemtabellen.

    • Testen Sie Plugins gründlich, bevor Sie sie in der Produktion einsetzen.

  • Überlegungen zur Sicherheit

    • Überprüfen Sie alle Eingabedaten in HTTP-Anforderungs-Plugins.

    • Verwenden Sie sichere Methoden zum Speichern vertraulicher Konfigurationen.

    • Beschränken Sie die Plugin-Berechtigungen nur auf die erforderlichen Operationen.

Überwachen Sie die Ausführung des Plugins

Fragen Sie Systemtabellen ab, um die Leistung des Plugins zu überwachen:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

Die Processing Engine bietet eine leistungsstarke Möglichkeit, die InfluxDB 3-Funktionalität zu erweitern und gleichzeitig Ihre Datenverarbeitungslogik nah an Ihren Daten zu halten, die Latenz zu reduzieren und Ihre Architektur zu vereinfachen.

InfluxData zertifizierte Plugins

Amazon Timestream for InfluxDB 3 umfasst einen umfassenden Satz vorgefertigter, zertifizierter Plugins, die die Datenbankfunktionalität erweitern, ohne dass eine kundenspezifische Entwicklung erforderlich ist. Diese Plugins sind vollständig konfigurierbar und sofort einsatzbereit. Sie bieten erweiterte Funktionen für Datenverarbeitung, Überwachung und Warnmeldungen.

Die vollständige Dokumentation und den Quellcode finden Sie im InfluxDataPlugins-Repository.

Verfügbare Plugins

Timestream für das LiveAnalytics Migrations-Plugin

Migrieren Sie eine Datenbank von Timestream for LiveAnalytics zu Timestream for InfluxDB

So funktioniert es: Das Timestream for LiveAnalytics Migration Plugin funktioniert zusammen mit dem Timestream for Migration Client. LiveAnalytics Der Client führt den Befehl Timestream for LiveAnalytics UNLOAD aus, um eine LiveAnalytics Datenbank im Parquet-Format in einen S3-Bucket zu exportieren. Nachdem die Daten exportiert wurden, generiert der Client URLs für die Parquet-Dateien vorsignierte Dateien und ruft das Migrations-Plugin mit den vorsignierten Dateien auf. URLs Während der Ausführung des Plugins werden die S3-Objekte aus dem S3-Bucket abgerufen und in das InfluxDB-Zeilenprotokoll umgewandelt und in eine InfluxDB 3-Datenbank geschrieben.

Bewährte Methoden: Das Timestream for LiveAnalytics Migration Plugin muss auf einem einzelnen InfluxDB 3 Enterprise-Knoten ausgeführt werden. Stellen Sie sicher, dass der mit dem Plugin-Client verwendete InfluxDB 3-Endpunkt ein Prozessknotenendpunkt und nicht der Cluster-Endpunkt ist. Der Cluster, der die Migration durchführt, sollte keine Datenerfassung oder Abfragen durchführen, während das Migrations-Plugin ausgeführt wird, da dies zu Fehlern bei Speichermangel führen könnte.

Die Migrationsleistung hängt von den Ressourcen ab, die dem InfluxDB 3-Knoten zur Verfügung stehen, und von den Eigenschaften der migrierten Daten. In unseren Tests haben wir einen Durchsatz von 30.000.000 LiveAnalytics pro Stunde migrierten Datensätzen beobachtet. Ihre tatsächliche Leistung kann aufgrund verschiedener Faktoren variieren.

Datenzuordnung: Die folgende Tabelle zeigt, wie Timestream für LiveAnalytics Daten den Leitungsprotokolldaten zugeordnet wird.

Timestream für Concept LiveAnalytics Konzept des Leitungsprotokolls
Tabelle Messung
Dimensions (Abmessungen) Tags
Name der Maßnahme Markierung
Maßnahmen Felder
Time (Zeit) Zeitstempel

Transformation von Einzelmessdatensätzen: Das Folgende ist ein Datensatz mit einer einzelnen Kennzahl in Timestream für LiveAnalytics in der Tabelle: example_table

Host Region request_id measure_name time Messwert::doppelt
Host1 us-west-2 saio3242ovnfk cpu_usage 2025-04-17 16:42:54,702 394001 0,66

Dieser Datensatz wird umgewandelt in:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001

Transformation von Datensätzen mit mehreren Messwerten: Im Folgenden finden Sie einen Datensatz mit mehreren Messwerten in Timestream für LiveAnalytics in der Tabelle, example_table bei dem alles rechts von time Messungen steht:

Host Region request_id measure_name time cpu_usage Speicherverbrauch
Host 1 us-west-2 saio3242ovnfk Metriken 2025-04-17 16:42:54,702 394001 0,66 0,21

Dieser Datensatz wird umgewandelt in:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
Wichtig

Die vorsignierten URLs, die das Plugin zum Abrufen der LiveAnalytics Daten in S3 verwendet, laufen ab, wenn entweder ihr festgelegter Ablauf eintritt oder die für ihre Generierung verwendeten IAM-Anmeldeinformationen ablaufen (maximal 7 Tage). Wir empfehlen, den Migrationsclient auf einer EC2-Instance auszuführen (eine t3.medium Instance ist ausreichend), da die EC2-Instance die IAM-Anmeldeinformationen automatisch rotiert, wodurch die vordefinierten URL-Zeitbeschränkungen während der Migration aufgehoben werden. Wenn Sie keine EC2-Instance verwenden, können Migrationen wieder aufgenommen werden, und bei großen Datensätzen sind möglicherweise mehrere Resume-Aufrufe erforderlich.

Das Timestream for LiveAnalytics Migration Plug-in wird für Migrationen mit weniger als 1 Milliarde Datensätzen oder 125 GB innerhalb einer einzigen Datenbank empfohlen. LiveAnalytics

Das Migrations-Plugin sollte nur auf einem einzelnen Prozessknoten im Cluster verwendet werden. Sie können den Prozessknoten ermitteln, indem Sie list-db-instances-for-cluster verwenden und das INFLUXDB3_HOST_URL auf den Endpunkt einer der Datenbankinstanzen setzen, die einen Instanzmodus vom Typ T hatPROCESS, oder Sie können die Timestream-Konsole verwenden und Ihren Cluster auswählen, um den Prozessknoten zu finden.

Die wichtigsten Funktionen:

  • Exportiert die Zeitreihendaten mit dem Befehl UNLOAD aus Timestream for LiveAnalytics in einen S3-Bucket.

  • Generiert eine Vorsignierung URLs für jedes S3-Objekt, das migriert wird.

  • Verfolgt den Migrationsprozess für jedes S3-Objekt.

  • Bereinigt S3-Objekte nach einer erfolgreichen Migration.

  • Unterstützt die Wiederaufnahme einer fehlgeschlagenen Migration im Falle eines Ablaufs der vorab signierten URL.

Beispielverwendung:

# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
Anmerkung

Aktualisieren Sie die S3-Bucket-Richtlinie mit der Beispiel-Bucket-Richtlinie in der README-Datei. Weitere Informationen finden Sie unter Voraussetzungen.

python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>

Ausgabe: Der Timestream für die LiveAnalytics Datenbank wird in das Linienprotokoll umgewandelt und in die InfluxDB 3-Datenbank aufgenommen.

Plugins zur Erkennung von Anomalien

MAD-basierte Anomalieerkennung

  • Triggertypen: Schreiben von Daten (in Echtzeit)

  • Anwendungsfälle: Erkennung von Ausreißern in Echtzeit für Streaming-Daten, Sensorüberwachung, Qualitätskontrolle.

  • GitHub: Dokumentation zur Erkennung von MAD-Anomalien

So funktioniert es: Verwendet die mittlere absolute Abweichung (MAD), um eine solide Ausgangsbasis für normales Verhalten zu ermitteln. Sobald neue Daten eintreffen, wird berechnet, wie viele MADs Punkte vom Median entfernt sind. Punkte, die den Schwellenwert (k * MAD) überschreiten, werden als Anomalien gekennzeichnet.

Die wichtigsten Merkmale:

  • Verarbeitung in Echtzeit, während Daten geschrieben werden.

  • Behält aus Effizienzgründen verschiebbare Fenster im Speicher bei.

  • Auf der Anzahl basierende Warnmeldungen (z. B. 5 aufeinanderfolgende Anomalien).

  • Warnmeldungen auf der Grundlage der Dauer (z. B. Anomalie für 2 Minuten).

  • Unterdrückung des Kippverhaltens, um zu verhindern, dass Warnmeldungen aufgrund sich schnell ändernder Werte übermäßig werden

Beispielverwendung:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Ausgabe: Sendet Benachrichtigungen in Echtzeit, wenn Anomalien erkannt werden, einschließlich Feldname, Wert und Dauer.

Plugins zur Datentransformation

Grundlegende Transformation

So funktioniert es: Wendet eine Kette von Transformationen auf Feldnamen und Werte an. Kann historische Daten stapelweise verarbeiten (geplant) oder Daten bei Eingang transformieren (Daten schreiben). Transformationen werden in der angegebenen Reihenfolge angewendet, was komplexe Daten-Pipelines ermöglicht.

Die wichtigsten Funktionen:

  • Transformationen von Feldnamen: snake_case, Leerzeichen entfernen, nur alphanumerisch.

  • Umrechnungen von Einheiten: Temperatur, Druck, Länge, Zeiteinheiten.

  • Benutzerdefinierte Zeichenkettenersetzungen mit Regex-Unterstützung.

  • Trockenlaufmodus zum Testen ohne Schreiben von Daten.

  • Stapelverarbeitung für historische Daten.

Beispielverwendung:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Ausgabe: Erstellt eine neue Tabelle mit transformierten Daten, wobei die ursprünglichen Zeitstempel und Tags beibehalten werden.

Downsampler

  • Triggertypen: Geplant, HTTP

  • Anwendungsfälle: Datenreduzierung, langfristige Speicheroptimierung, Erstellung von zusammenfassenden Statistiken, Leistungsverbesserung.

  • GitHub: Downsampler-Dokumentation

So funktioniert es: Aggregiert hochauflösende Zeitreihendaten zu Zusammenfassungen mit niedrigerer Auflösung. Konvertiert beispielsweise 1-Sekunden-Daten in Durchschnittswerte von einer Stunde. Jeder Punkt, für den ein Downsampling durchgeführt wurde, enthält Metadaten über die Anzahl der komprimierten Originalpunkte und den abgedeckten Zeitraum.

Die wichtigsten Funktionen:

  • Mehrere Aggregationsfunktionen: Durchschnitt, Summe, Min, Max, Median, Ableitung.

  • Feldspezifische Aggregationen (verschiedene Funktionen für verschiedene Felder).

  • Nachverfolgung von Metadaten (record_count, time_from, time_to).

  • HTTP-API für On-Demand-Downsampling mit Backfill.

  • Konfigurierbare Batchgrößen für große Datensätze.

Beispielverwendung:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Ausgabe: Erstellt Daten mit einem Downsampling mit aggregierten Werten und Metadatenspalten, die die Anzahl der komprimierten Punkte und den Zeitraum angeben.

Plug-ins für Überwachung und Warnmeldungen

Monitor für Statusänderungen

  • Triggertypen: Geplant, Datenschreibvorgang

  • Anwendungsfälle: Statusüberwachung, Überwachung des Gerätestatus, Prozessüberwachung, Erkennung von Änderungen.

  • GitHub: Dokumentation zu Statusänderungen

So funktioniert es: Verfolgt Änderungen von Feldwerten im Laufe der Zeit und gibt Warnmeldungen aus, wenn die Anzahl der Änderungen die konfigurierten Schwellenwerte überschreitet. Kann sowohl Wertänderungen (unterschiedliche Werte) als auch bestimmte Wertbedingungen (entspricht einem Zielwert) erkennen. Beinhaltet Stabilitätsprüfungen, um Warnmeldungen aufgrund von Störsignalen zu verhindern.

Die wichtigsten Funktionen:

  • Zählbasierte Erkennung von Änderungen (z. B. fünf Änderungen in zehn Minuten).

  • Überwachung auf Dauer (z. B. Status = „Fehler“ für fünf Minuten).

  • Fenster zur Statusänderung zur Geräuschreduzierung.

  • Mehrfeldüberwachung mit unabhängigen Schwellenwerten.

  • Konfigurierbare Stabilitätsanforderungen.

Beispielverwendung:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Ausgabe: Zu den Warnmeldungen gehören der Feldname, die Anzahl der erkannten Änderungen, das Zeitfenster und die relevanten Tag-Werte.

Sammler von Systemmetriken

  • Triggertypen: Geplant

  • Anwendungsfälle: Infrastrukturüberwachung, Leistungsbasislinien, Kapazitätsplanung, Ressourcenverfolgung.

  • GitHub: Dokumentation zu Systemmetriken

So funktioniert es: Verwendet die psutil-Bibliothek, um umfassende Systemmetriken von dem Host zu sammeln, auf dem InfluxDB ausgeführt wird. Sammelt CPU-, Speicher-, Festplatten- und Netzwerkstatistiken in konfigurierbaren Intervallen. Jeder Metriktyp kann unabhängig aktiviert/deaktiviert werden.

Die wichtigsten Funktionen:

  • CPU-Statistiken pro Kern mit Durchschnittsauslastung.

  • Speichernutzung einschließlich Swap- und Seitenfehlern.

  • I/O Festplattenmetriken mit berechneten IOPS und Latenz.

  • Netzwerkschnittstellen-Statistiken mit Fehlerverfolgung.

  • Konfigurierbare Erfassung von Metriken (Aktivieren/Deaktivieren bestimmter Typen).

  • Automatischer Wiederholungsversuch bei fehlgeschlagener Erfassung.

Beispielverwendung:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Ausgabe: Erstellt mehrere Tabellen (system_cpu, system_memory, system_disk_io usw.) mit detaillierten Metriken für jedes Subsystem.

Gängige Konfigurationsmuster

Verwenden von TOML-Konfigurationsdateien

Verwenden Sie für komplexe Konfigurationen TOML-Dateien anstelle von Inline-Argumenten:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Verkettung von Plugins

Erstellen Sie Datenverarbeitungspipelines, indem Sie mehrere Plugins verketten:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Bewährte Methoden für Plugins

  • Fangen Sie konservativ an — Beginnen Sie mit höheren Schwellenwerten und längeren Zeitfenstern und passen Sie sie dann anhand der beobachteten Muster an.

  • Test in der Entwicklung — Verwenden Sie Testmodi und testen Sie Datenbanken vor der Produktionsbereitstellung.

  • Überwachen Sie die Plugin-Leistung — Überprüfen Sie die Ausführungszeiten und die Ressourcennutzung in Systemtabellen.

  • Verwenden Sie geeignete Triggertypen — Wählen Sie „Geplant“ für die Batch-Verarbeitung und das Schreiben von Daten für Echtzeit.

  • Benachrichtigungen mit Bedacht konfigurieren — Verwenden Sie Schweregrade und Debounce-Logik, um Warnungsmüdigkeit zu vermeiden.

  • Nutzen Sie die Modellpersistenz — Speichern Sie trainierte Modelle für ML-basierte Plug-ins aus Konsistenzgründen.

  • Dokumentkonfigurationen — Verwenden Sie aussagekräftige Triggernamen und pflegen Sie die Konfigurationsdokumentation.

Überwachen Sie die Ausführung des Plugins

Um die Leistung des Plugins zu überwachen:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Beheben Sie häufig auftretende Probleme

Die folgende Tabelle zeigt häufig auftretende Probleme und mögliche Lösungen.

Problem Lösung
Das Plugin wird nicht ausgelöst Stellen Sie sicher, dass der Auslöser aktiviert ist, überprüfen Sie die Syntax schedule/spec
Fehlende Benachrichtigungen Bestätigen Sie, dass das Notifier-Plugin installiert ist, überprüfen Sie den Webhook URLs
Hoher Speicherverbrauch Reduzieren Sie die Fenstergrößen und passen Sie die Intervalle für die Stapelverarbeitung an
Falsche Transformationen Verwenden Sie den Testlaufmodus und überprüfen Sie Feldnamen und Datentypen
Ungenauigkeit der Forecast Vergrößern Sie das Trainingsdatenfenster und passen Sie die Saisoneinstellungen an
Zu viele Benachrichtigungen Erhöhen Sie die Anzahl der Trigger, fügen Sie die Dauer der Debounce hinzu, passen Sie die Schwellenwerte an

Diese zertifizierten Plug-ins bieten unternehmenstaugliche Funktionen für gängige Zeitreihendatenverarbeitungsanforderungen, sodass keine kundenspezifische Entwicklung erforderlich ist und gleichzeitig die Flexibilität durch umfassende Konfigurationsoptionen gewahrt bleibt. Im GitHubRepository finden Sie ausführliche Dokumentation, Beispiele und Updates.