View a markdown version of this page

Perpanjang Timestream untuk InfluxDB dengan plugin mesin pemrosesan - Amazon Timestream

Untuk kemampuan serupa dengan Amazon Timestream LiveAnalytics, pertimbangkan Amazon Timestream untuk InfluxDB. Ini menawarkan konsumsi data yang disederhanakan dan waktu respons kueri milidetik satu digit untuk analitik waktu nyata. Pelajari lebih lanjut di sini.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Perpanjang Timestream untuk InfluxDB dengan plugin mesin pemrosesan

Mesin Processing adalah mesin virtual Python tertanam yang berjalan di dalam database InfluxDB 3 Anda di Amazon Timestream. Ini tersedia dalam edisi Core dan Enterprise. Hal ini memungkinkan Anda untuk memperluas database Anda dengan kode Python kustom yang dapat mengotomatiskan alur kerja, mengubah data, dan membuat endpoint API kustom.

Mesin Processing mengeksekusi plugin Python sebagai respons terhadap peristiwa database tertentu:

  • Data menulis: Memproses dan mengubah data saat memasuki database

  • Acara terjadwal: Jalankan kode pada interval yang ditentukan atau waktu tertentu

  • Permintaan HTTP: Paparkan titik akhir API khusus yang mengeksekusi kode Anda

Mesin menyertakan cache dalam memori untuk mengelola status di antara eksekusi, memungkinkan Anda untuk membangun aplikasi stateful langsung dalam database Anda.

InfluxData plugin bersertifikat

Saat diluncurkan, InfluxDB 3 menyertakan satu set plugin pra-bangun dan dapat dikonfigurasi sepenuhnya yang disertifikasi oleh: InfluxData

  • Transformasi data: Memproses dan memperkaya data yang masuk

  • Peringatan: Kirim pemberitahuan berdasarkan ambang data

  • Agregasi: Hitung statistik pada data deret waktu

  • Pemantauan sistem: Lacak penggunaan sumber daya dan metrik kesehatan

  • Integrasi: Connect ke layanan eksternal dan APIs

Plugin bersertifikat ini siap digunakan dan dapat dikonfigurasi melalui argumen pemicu untuk memenuhi persyaratan spesifik Anda.

Jenis plugin dan spesifikasi pemicu

Jenis Plugin Spesifikasi Pemicu Saat Plugin Berjalan Kasus Penggunaan
Menulis data table:<TABLE_NAME>atau all_tables Ketika data ditulis ke tabel Transformasi data, peringatan, metrik turunan
Dijadwalkan every:<DURATION>atau cron:<EXPRESSION> Pada interval tertentu Agregasi berkala, laporan, pemeriksaan kesehatan
Permintaan HTTP request:<REQUEST_PATH> Ketika permintaan HTTP diterima Kustom APIs, webhook, antarmuka pengguna

Buat pemicu

Pemicu menghubungkan plugin ke peristiwa database dan menentukan kapan mereka mengeksekusi. Gunakan perintah influxdb3 create trigger.

Untuk membuat pemicu penulisan data:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

Untuk membuat pemicu terjadwal:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

Untuk membuat pemicu permintaan HTTP:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Akses titik akhir di: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Konfigurasikan pemicu

Melewati argumen ke plugin

Konfigurasikan perilaku plugin menggunakan argumen pemicu:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Argumen diteruskan ke plugin sebagai kamus:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Perilaku penanganan kesalahan

Konfigurasikan cara pemicu menangani kesalahan:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Eksekusi asinkron

Izinkan beberapa instance pemicu berjalan secara bersamaan:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Kelola pemicu

Untuk melihat pemicu database:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Pengecualian tabel untuk pemicu penulisan

Untuk memfilter tabel dalam kode plugin Anda saat menggunakanall_tables:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

Implementasi plugin adalah sebagai berikut:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Pertimbangan penyebaran terdistribusi

Dalam penerapan multi-node, konfigurasikan plugin berdasarkan peran node:

Jenis Plugin Jenis Simpul Alasan
Plugin tulis data Node ingester Memproses data pada titik konsumsi
Plugin permintaan HTTP Node kueri Menangani lalu lintas API
Plugin terjadwal Setiap node yang dikonfigurasi Dapat berjalan di node apa pun dengan penjadwal

Pertimbangan berikut penting untuk penerapan perusahaan:

  • Pertahankan konfigurasi plugin yang identik di semua node yang relevan.

  • Rutekan klien eksternal (Grafana, dasbor) ke node kueri.

  • Pastikan plugin tersedia di node tempat pemicunya dijalankan.

Praktik terbaik

  • Konfigurasi plugin

    • Gunakan argumen pemicu untuk nilai yang dapat dikonfigurasi alih-alih hardcoding.

    • Menerapkan penanganan kesalahan yang tepat dalam plugin.

    • Gunakan influxdb3_local API untuk operasi database.

  • Optimalisasi kinerja

    • Gunakan eksekusi asinkron untuk tugas pemrosesan berat.

    • Menerapkan pengembalian awal untuk data yang difilter.

    • Minimalkan kueri database dalam plugin.

  • Manajemen kesalahan

    • Pilih perilaku kesalahan yang sesuai (log, coba lagi, atau nonaktifkan).

    • Pantau eksekusi plugin melalui tabel sistem.

    • Uji plugin secara menyeluruh sebelum penerapan produksi.

  • Pertimbangan keamanan

    • Validasi semua data input di plugin permintaan HTTP.

    • Gunakan metode aman untuk menyimpan konfigurasi sensitif.

    • Batasi izin plugin hanya untuk operasi yang diperlukan.

Pantau eksekusi plugin

Tabel sistem kueri untuk memantau kinerja plugin:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

Mesin Processing menyediakan cara yang ampuh untuk memperluas fungsionalitas InfluxDB 3 sambil menjaga logika pemrosesan data Anda dekat dengan data Anda, mengurangi latensi dan menyederhanakan arsitektur Anda.

InfluxData plugin bersertifikat

Amazon TimeStream untuk InfluxDB 3 mencakup satu set lengkap plugin bersertifikat pra-bangun yang memperluas fungsionalitas basis data tanpa memerlukan pengembangan khusus. Plugin ini sepenuhnya dapat dikonfigurasi dan siap digunakan saat peluncuran, menyediakan kemampuan canggih untuk pemrosesan data, pemantauan, dan peringatan.

Untuk dokumentasi lengkap dan kode sumber, kunjungi InfluxDataPlugins Repository.

Plugin yang tersedia

Timestream untuk plugin LiveAnalytics migrasi

Migrasi database dari Timestream ke Timestream LiveAnalytics untuk InfluxDB

Cara kerjanya: Plugin Timestream untuk LiveAnalytics migrasi bekerja bersamaan dengan klien Timestream untuk LiveAnalytics migrasi. Klien mengeksekusi perintah Timestream for LiveAnalytics UNLOAD untuk mengekspor LiveAnalytics database ke bucket S3 dalam format Parquet. Setelah data diekspor, klien menghasilkan presigned URLs untuk file Parquet, dan memanggil plugin migrasi dengan presigned. URLs Selama eksekusi plugin, objek S3 diambil dari bucket S3 dan diubah menjadi protokol baris InfluxDB, dan ditulis ke database InfluxDB 3.

Praktik terbaik: Plugin Timestream untuk LiveAnalytics migrasi harus dijalankan pada satu node InfluxDB 3 Enterprise. Pastikan titik akhir InfluxDB 3 yang digunakan dengan klien plugin adalah titik akhir node proses daripada titik akhir cluster. Cluster yang melakukan migrasi tidak boleh melakukan konsumsi atau kueri saat plugin migrasi berjalan, karena ini dapat menyebabkan kesalahan memori.

Kinerja migrasi tergantung pada sumber daya yang tersedia untuk node InfluxDB 3 dan karakteristik data yang dimigrasikan. Dalam pengujian kami, kami mengamati throughput 30.000.000 LiveAnalytics catatan yang dimigrasi per jam. Kinerja aktual Anda dapat bervariasi berdasarkan beberapa faktor.

Pemetaan data: Tabel berikut menunjukkan bagaimana Timestream untuk LiveAnalytics data dipetakan ke data protokol baris.

Timestream untuk Konsep LiveAnalytics Konsep Protokol Garis
Tabel Pengukuran
Dimensi Tanda
Ukur nama Tag
Tindakan Bidang
Waktu Stempel waktu

Transformasi rekaman ukuran tunggal: Berikut ini adalah catatan ukuran tunggal dalam Timestream untuk LiveAnalytics dalam tabel: example_table

host region request_id ukuran_nama Waktu ukuran_nilai: :ganda
host1 us-west-2 saio3242ovnfk cpu_usage 2025-04-17 16:42:54.702 394001 0,66

Catatan ini akan diubah menjadi:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001

Transformasi catatan multi-ukuran: Berikut ini adalah catatan multi-ukuran di Timestream untuk LiveAnalytics dalam tabel example_table dengan segala sesuatu di sebelah kanan menjadi ukurantime:

host region request_id ukuran_nama Waktu cpu_usage memory_usage
host1 us-west-2 saio3242ovnfk metrik 2025-04-17 16:42:54.702 394001 0,66 0,21

Catatan ini akan diubah menjadi:

example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
penting

URL yang telah ditetapkan sebelumnya yang digunakan plugin untuk mengambil LiveAnalytics data di S3 kedaluwarsa saat kedaluwarsa yang disetel terjadi atau kredensyal IAM yang digunakan untuk membuatnya kedaluwarsa (maksimum 7 hari). Sebaiknya jalankan klien migrasi pada instans EC2 (instance sudah cukup) karena t3.medium instans EC2 memutar kredensyal IAM secara otomatis, menghapus batasan waktu URL yang telah ditetapkan sebelumnya selama migrasi. Jika Anda tidak menggunakan instans EC2, migrasi dapat dilanjutkan dan kumpulan data besar mungkin memerlukan beberapa pemanggilan resume.

Plugin Timestream untuk LiveAnalytics migrasi direkomendasikan untuk migrasi dengan di bawah 1 miliar catatan atau 125GB dalam satu database. LiveAnalytics

Plugin migrasi seharusnya hanya digunakan pada satu node proses di cluster. Anda dapat menentukan node proses dengan menggunakan list-db-instances-for-cluster dan menyetel INFLUXDB3_HOST_URL ke titik akhir dari salah satu instance database yang memiliki jenis mode instancePROCESS, atau Anda dapat menggunakan konsol Timestream dan memilih cluster Anda untuk menemukan node proses.

Fitur utama:

  • Mengekspor data deret waktu dari Timestream LiveAnalytics ke bucket S3 menggunakan perintah UNLOAD.

  • Menghasilkan presigned URLs untuk setiap objek S3 yang dimigrasikan.

  • Melacak proses migrasi untuk setiap objek S3.

  • Membersihkan objek S3 setelah migrasi berhasil.

  • Mendukung melanjutkan migrasi yang gagal jika terjadi kedaluwarsa URL yang telah ditetapkan sebelumnya.

Contoh penggunaan:

# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
catatan

Perbarui kebijakan bucket S3 dengan contoh kebijakan bucket di README. Untuk informasi lebih lanjut, lihat Prasyarat.

python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>

Output: Timestream untuk LiveAnalytics database diubah menjadi protokol baris dan dicerna ke database InfluxDB 3.

Plugin deteksi anomali

Deteksi anomali berbasis MAD

  • Jenis pemicu: Penulisan data (waktu nyata)

  • Kasus penggunaan: Deteksi outlier waktu nyata untuk streaming data, pemantauan sensor, kontrol kualitas.

  • GitHub: Dokumentasi Deteksi Anomali MAD

Cara kerjanya: Menggunakan Median Absolute Deviation (MAD) untuk menetapkan dasar yang kuat untuk perilaku normal. Saat data baru tiba, ia menghitung berapa banyak MADs dari median setiap titik. Poin yang melebihi ambang batas (k* MAD) ditandai sebagai anomali.

Fitur utama:

  • Pemrosesan waktu nyata saat data ditulis.

  • Mempertahankan jendela geser dalam memori untuk efisiensi.

  • Peringatan berbasis hitungan (misalnya, 5 anomali berturut-turut).

  • Peringatan berbasis durasi (misalnya, anomali selama 2 menit).

  • Penindasan balik untuk mencegah kelelahan waspada dari nilai yang berubah dengan cepat.

Contoh penggunaan:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Keluaran: Mengirim notifikasi real-time saat anomali terdeteksi, termasuk nama bidang, nilai, dan durasi.

Plugin transformasi data

Transformasi dasar

  • Jenis pemicu: Dijadwalkan, Penulisan data

  • Kasus penggunaan: Standardisasi data, konversi unit, normalisasi nama bidang, pembersihan data.

  • GitHub: Dokumentasi Transformasi Dasar

Cara kerjanya: Menerapkan rantai transformasi ke nama dan nilai bidang. Dapat memproses data historis dalam batch (terjadwal) atau mengubah data saat tiba (data write). Transformasi diterapkan dalam urutan yang ditentukan, memungkinkan jaringan data yang kompleks.

Fitur utama:

  • Transformasi nama bidang: snake_case, hapus spasi, hanya alfanumerik.

  • Konversi satuan: Suhu, tekanan, panjang, satuan waktu.

  • Penggantian string kustom dengan dukungan regex.

  • Mode dry-run untuk pengujian tanpa menulis data.

  • Pemrosesan batch untuk data historis.

Contoh penggunaan:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Output: Membuat tabel baru dengan data yang diubah, melestarikan stempel waktu dan tag asli.

Downsampler

  • Jenis pemicu: Terjadwal, HTTP

  • Kasus penggunaan: Pengurangan data, pengoptimalan penyimpanan jangka panjang, membuat statistik ringkasan, peningkatan kinerja.

  • GitHub: Dokumentasi Downsampler

Cara kerjanya: Menggabungkan data deret waktu resolusi tinggi ke dalam ringkasan resolusi lebih rendah. Misalnya, mengubah data 1 detik menjadi rata-rata 1 jam. Setiap titik downsampled mencakup metadata tentang jumlah titik asli yang dikompresi dan rentang waktu yang dicakup.

Fitur utama:

  • Beberapa fungsi agregasi: rata-rata, jumlah, min, maks, median, turunan.

  • Agregasi khusus bidang (fungsi berbeda untuk bidang yang berbeda).

  • Pelacakan metadata (record_count, time_from, time_to).

  • HTTP API untuk downsampling sesuai permintaan dengan backfill.

  • Ukuran batch yang dapat dikonfigurasi untuk kumpulan data besar.

Contoh penggunaan:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Output: Membuat data downsampled dengan nilai agregat ditambah kolom metadata yang menunjukkan jumlah titik yang dikompresi dan rentang waktu.

Memantau dan mengingatkan plugin

Monitor perubahan negara

  • Jenis pemicu: Dijadwalkan, Penulisan data

  • Kasus penggunaan: Pemantauan status, pelacakan status peralatan, pemantauan proses, deteksi perubahan.

  • GitHub: Dokumentasi Perubahan Negara

Cara kerjanya: Melacak perubahan nilai bidang dari waktu ke waktu dan memberi peringatan ketika jumlah perubahan melebihi ambang batas yang dikonfigurasi. Dapat mendeteksi perubahan nilai (nilai yang berbeda) dan kondisi nilai tertentu (sama dengan nilai target). Termasuk pemeriksaan stabilitas untuk mencegah peringatan dari sinyal bising.

Fitur utama:

  • Deteksi perubahan berbasis hitungan (misalnya, lima perubahan dalam sepuluh menit).

  • Pemantauan berbasis durasi (misalnya, status = “kesalahan” selama lima menit).

  • Jendela perubahan status untuk pengurangan kebisingan.

  • Pemantauan multi-bidang dengan ambang batas independen.

  • Persyaratan stabilitas yang dapat dikonfigurasi.

Contoh penggunaan:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Keluaran: Peringatan mencakup nama bidang, jumlah perubahan yang terdeteksi, jendela waktu, dan nilai tag yang relevan.

Kolektor metrik sistem

  • Jenis pemicu: Dijadwalkan

  • Kasus penggunaan: Pemantauan infrastruktur, garis dasar kinerja, perencanaan kapasitas, pelacakan sumber daya.

  • GitHub: Dokumentasi Metrik Sistem

Cara kerjanya: Menggunakan pustaka psutil untuk mengumpulkan metrik sistem komprehensif dari host yang menjalankan InfluxDB. Mengumpulkan CPU, memori, disk, dan statistik jaringan pada interval yang dapat dikonfigurasi. Setiap jenis metrik dapat diaktifkan/dinonaktifkan secara independen.

Fitur utama:

  • Statistik CPU per inti dengan rata-rata beban.

  • Penggunaan memori termasuk swap dan kesalahan halaman.

  • I/O Metrik disk dengan IOPS dan latensi yang dihitung.

  • Statistik antarmuka jaringan dengan pelacakan kesalahan.

  • Koleksi metrik yang dapat dikonfigurasi (mengaktifkan/menonaktifkan jenis tertentu).

  • Coba lagi otomatis pada kegagalan pengumpulan.

Contoh penggunaan:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Output: Membuat beberapa tabel (system_cpu, system_memory, system_disk_io, dll.) Dengan metrik rinci untuk setiap subsistem.

Pola konfigurasi umum

Menggunakan file konfigurasi TOLL

Untuk konfigurasi yang kompleks, gunakan file TOMM alih-alih argumen sebaris:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Plugin rantai

Buat pipeline pemrosesan data dengan merantai beberapa plugin:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Praktik terbaik untuk plugin

  • Mulai konservatif — Mulailah dengan ambang batas yang lebih tinggi dan jendela yang lebih panjang, lalu sesuaikan berdasarkan pola yang diamati.

  • Uji dalam pengembangan - Gunakan mode dry-run dan uji database sebelum penerapan produksi.

  • Memantau kinerja plugin - Periksa waktu eksekusi dan penggunaan sumber daya dalam tabel sistem.

  • Gunakan jenis pemicu yang sesuai — Pilih terjadwal untuk pemrosesan batch, tulis data secara real-time.

  • Konfigurasikan notifikasi dengan bijak — Gunakan tingkat keparahan dan logika debounce untuk mencegah kelelahan waspada.

  • Leverage model persistence — Untuk plugin berbasis ML, simpan model terlatih untuk konsistensi.

  • Konfigurasi dokumen — Gunakan nama pemicu deskriptif dan pertahankan dokumentasi konfigurasi.

Pantau eksekusi plugin

Untuk memantau kinerja plugin:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Memecahkan masalah umum

Tabel berikut menunjukkan masalah umum dan solusi yang mungkin.

Masalah Solusi
Plugin tidak memicu Verifikasi pemicu diaktifkan, periksa schedule/spec sintaks
Notifikasi hilang Konfirmasikan plugin Notifier diinstal, periksa webhook URLs
Penggunaan memori tinggi Kurangi ukuran jendela, sesuaikan interval pemrosesan batch
Transformasi yang salah Gunakan mode dry-run, verifikasi nama bidang dan tipe data
Ketidakakuratan Forecast Tingkatkan jendela data pelatihan, sesuaikan pengaturan musiman
Terlalu banyak peringatan Tingkatkan jumlah pemicu, tambahkan durasi debounce, sesuaikan ambang batas

Plugin bersertifikat ini menyediakan fungsionalitas siap perusahaan untuk kebutuhan pemrosesan data deret waktu umum, menghilangkan kebutuhan untuk pengembangan khusus sambil mempertahankan fleksibilitas melalui opsi konfigurasi yang komprehensif. Kunjungi GitHubrepositori untuk dokumentasi terperinci, contoh, dan pembaruan.