Amazon Timestream for LiveAnalytics와 유사한 기능을 원하는 경우 Amazon Timestream for InfluxDB를 고려해 보세요. 간소화된 데이터 수집과 실시간 분석을 위한 10밀리초 미만의 쿼리 응답 시간을 제공합니다. 여기에서 자세히 알아보세요.
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
처리 엔진 플러그인으로 Timestream for InfluxDB 확장
처리 엔진은 Amazon Timestream의 InfluxDB 3 데이터베이스 내에서 실행되는 임베디드 Python 가상 머신입니다. Core edition과 Enterprise edition 모두에서 사용할 수 있습니다. 이를 통해 워크플로를 자동화하고, 데이터를 변환하고, 사용자 지정 API 엔드포인트를 생성할 수 있는 사용자 지정 Python 코드로 데이터베이스를 확장할 수 있습니다.
처리 엔진은 특정 데이터베이스 이벤트에 대한 응답으로 Python 플러그인을 실행합니다.
-
데이터 쓰기: 데이터베이스에 입력되는 데이터 처리 및 변환
-
예약된 이벤트: 정의된 간격 또는 특정 시간에 코드 실행
-
HTTP 요청: 코드를 실행하는 사용자 지정 API 엔드포인트 노출
엔진에는 실행 간 상태를 관리하기 위한 인 메모리 캐시가 포함되어 있으므로 데이터베이스 내에서 직접 상태 저장 애플리케이션을 구축할 수 있습니다.
InfluxData 인증 플러그인
시작 시 InfluxDB 3에는 InfluxData에서 인증한 사전 구축되고 완전히 구성 가능한 플러그인 세트가 포함되어 있습니다.
-
데이터 변환: 수신 데이터 처리 및 보강
-
알림: 데이터 임곗값을 기반으로 알림 전송
-
집계: 시계열 데이터에 대한 통계 계산
-
시스템 모니터링: 리소스 사용량 및 상태 지표 추적
-
통합: 외부 서비스 및 API에 연결
이러한 인증된 플러그인은 바로 사용할 수 있으며 특정 요구 사항을 충족하도록 트리거 인수를 통해 구성할 수 있습니다.
플러그인 유형 및 트리거 사양
| 플러그인 유형 | 트리거 사양 | 플러그인 실행 시점 | 사용 사례 |
|---|---|---|---|
| 데이터 쓰기 | table:<TABLE_NAME> 또는 all_tables |
데이터가 테이블에 작성될 때 | 데이터 변환, 알림, 파생 지표 |
| 예약됨 | every:<DURATION> 또는 cron:<EXPRESSION> |
지정된 간격으로 | 주기적 집계, 보고서, 상태 확인 |
| HTTP 요청 |
request:<REQUEST_PATH>
|
HTTP 요청이 수신될 때 | 사용자 지정 API, 웹후크, 사용자 인터페이스 |
트리거 생성
트리거는 플러그인을 데이터베이스 이벤트에 연결하고 실행 시점을 정의합니다. influxdb3 create trigger 명령을 사용합니다.
데이터 쓰기 트리거를 생성하려면 다음을 수행하세요.
# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor
예약된 트리거를 생성하려면 다음을 수행하세요.
# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report
HTTP 요청 트리거를 생성하려면 다음을 수행하세요.
# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor
https://your-cluster-endpoint:8086/api/v3/engine/webhook에서 엔드포인트에 액세스합니다.
트리거 구성
플러그인에 인수 전달
트리거 인수를 사용하여 플러그인 동작을 구성합니다.
influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor
인수는 플러그인에 사전으로 전달됩니다.
def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic
오류 처리 동작
트리거가 오류를 처리하는 방법을 구성합니다.
# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor
비동기식 실행
여러 트리거 인스턴스가 동시에 실행되도록 허용합니다.
influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor
트리거 관리
데이터베이스의 트리거를 보려면 다음을 수행하세요.
# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN
쓰기 트리거에 대한 테이블 제외
all_tables 사용 시 플러그인 코드 내에서 테이블을 필터링하려면 다음을 수행하세요.
influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor
플러그인 구현은 다음과 같습니다.
def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables
분산 배포 고려 사항
다중 노드 배포에서 노드 역할을 기반으로 플러그인을 구성합니다.
| 플러그인 유형 | 노드 유형(Node Type) | 이유 |
|---|---|---|
| 데이터 쓰기 플러그인 | 수집기 노드 | 수집 지점에서 데이터 처리 |
| HTTP 요청 플러그인 | 쿼리 수행자 노드 | API 트래픽 처리 |
| 예약된 플러그인 | 구성된 모든 노드 | 스케줄러가 있는 모든 노드에서 실행 가능 |
엔터프라이즈 배포에는 다음 고려 사항이 중요합니다.
-
모든 관련 노드에서 동일한 플러그인 구성을 유지합니다.
-
외부 클라이언트(Grafana, 대시보드)를 쿼리 수행자 노드로 라우팅합니다.
-
트리거가 실행되는 노드에서 플러그인을 사용할 수 있는지 확인합니다.
모범 사례
-
플러그인 구성
-
하드 코딩 대신 구성 가능한 값에 트리거 인수를 사용합니다.
-
플러그인 내에서 적절한 오류 처리를 구현합니다.
-
데이터베이스 작업에
influxdb3_localAPI를 사용합니다.
-
-
성능 최적화
-
처리량이 많은 태스크에는 비동기식 실행을 사용합니다.
-
필터링된 데이터에 대한 조기 반환을 구현합니다.
-
플러그인 내에서 데이터베이스 쿼리를 최소화합니다.
-
-
오류 관리
-
적절한 오류 동작(로그, 재시도 또는 비활성화)을 선택합니다.
-
시스템 테이블을 통해 플러그인 실행을 모니터링합니다.
-
프로덕션 배포 전에 플러그인을 철저히 테스트합니다.
-
-
보안 고려 사항
-
HTTP 요청 플러그인의 모든 입력 데이터를 검증합니다.
-
민감한 구성 정보를 저장할 때는 안전한 방법을 사용합니다.
-
플러그인 권한을 필수 작업으로만 제한합니다.
-
플러그인 실행 모니터링
시스템 테이블을 쿼리하여 플러그인 성능을 모니터링합니다.
-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'
처리 엔진은 데이터 처리 로직을 데이터에 가깝게 유지하면서 InfluxDB 3 기능을 확장하고 지연 시간을 줄이고 아키텍처를 간소화하는 강력한 방법을 제공합니다.
InfluxData 인증 플러그인
Amazon Timestream for InfluxDB 3에는 사용자 지정 개발 없이 데이터베이스 기능을 확장하는 사전 구축되고 인증된 포괄적인 플러그인 세트가 포함되어 있습니다. 이러한 플러그인은 완전히 구성 가능하며 시작 시 사용할 준비가 되어 있어 데이터 처리, 모니터링 및 알림을 위한 고급 기능을 제공합니다.
전체 설명서와 소스 코드는 InfluxData 플러그인 리포지토리
사용 가능한 플러그인
Timestream for LiveAnalytics 마이그레이션 플러그인
LiveAnalytics용 Timestream에서 InfluxDB용 Timestream으로 데이터베이스 마이그레이션
-
트리거 유형: HTTP
-
사용 사례: Timestream for LiveAnalytics에서 Timestream for InfluxDB로 시계열 데이터베이스 마이그레이션.
작동 방식: Timestream for LiveAnalytics 마이그레이션 플러그인은 Timestream for LiveAnalytics 마이그레이션 클라이언트
모범 사례: Timestream for LiveAnalytics 마이그레이션 플러그인은 단일 InfluxDB 3 Enterprise 노드에서 실행해야 합니다. 플러그인 클라이언트와 함께 사용되는 InfluxDB 3 엔드포인트가 클러스터 엔드포인트가 아닌 프로세스 노드 엔드포인트인지 확인합니다. 마이그레이션을 수행하는 클러스터는 마이그레이션 플러그인이 실행되는 동안 수집 또는 쿼리를 수행하지 않아야 합니다. 메모리 부족 오류가 발생할 수 있습니다.
마이그레이션 성능은 InfluxDB 3 노드에서 사용할 수 있는 리소스와 마이그레이션 중인 데이터의 특성에 따라 달라집니다. 테스트에서 시간당 마이그레이션된 LiveAnalytics 레코드의 처리량은 30,000,000개로 나타났습니다. 실제 성능은 여러 요인에 따라 달라질 수 있습니다.
데이터 매핑: 다음 표는 Timestream for LiveAnalytics 데이터가 라인 프로토콜 데이터에 매핑되는 방법을 보여줍니다.
단일 측정 레코드 변환: 다음은 표의 Timestream for LiveAnalytics의 단일 측정 레코드입니다example_table.
| host | 리전 | request_id | measure_name | 시간 | measure_value::double |
|---|---|---|---|---|---|
| 호스트1 | us-west-2 | saio3242ovnfk | cpu_usage | 2025-04-17 16:42:54.702394001 | 0.66 |
이 레코드는 다음으로 변환됩니다.
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=cpu_usage measure_value::double=0.66 1744908174702394001
다중 측정값 레코드 변환: 다음은 표의 Timestream for LiveAnalytics의 다중 측정값 레코드example_table로, 측정값 오른쪽에 모든 time 것이 있습니다.
| host | 리전 | request_id | measure_name | 시간 | cpu_usage | memory_usage |
|---|---|---|---|---|---|---|
| 호스트1 | us-west-2 | saio3242ovnfk | 지표 | 2025-04-17 16:42:54.702394001 | 0.66 | 0.21 |
이 레코드는 다음으로 변환됩니다.
example_table,host=host1,region=us-west-2,request_id=saio3242ovnfk,measure_name=metrics cpu_usage=0.66,memory_usage=0.21 1744908174702394001
중요
플러그인이 S3에서 LiveAnalytics 데이터를 검색하는 데 사용하는 미리 서명된 URLs은 설정된 만료가 발생하거나 이를 생성하는 데 사용되는 IAM 자격 증명이 만료되면 만료됩니다(최대 7일). EC2 인스턴스는 IAM 자격 증명을 자동으로 교체하여 마이그레이션 중에 미리 서명된 URL 시간 제약 조건을 제거하므로 EC2 인스턴스(t3.medium인스턴스면 충분)에서 마이그레이션 클라이언트를 실행하는 것이 좋습니다. EC2 인스턴스를 사용하지 않는 경우 마이그레이션을 재개할 수 있으며 대규모 데이터 세트에는 여러 재개 호출이 필요할 수 있습니다.
Timestream for LiveAnalytics 마이그레이션 플러그인은 단일 LiveAnalytics 데이터베이스 내에서 레코드가 10억 개 미만이거나 125GB 미만인 마이그레이션에 권장됩니다.
마이그레이션 플러그인은 클러스터의 단일 프로세스 노드에서만 사용해야 합니다. list-db-instances-for-cluster를 사용하고를 인스턴스 모드가 유형인 데이터베이스 인스턴스 중 하나의 INFLUXDB3_HOST_URL 엔드포인트로 설정하여 프로세스 노드를 확인하거나 Timestream 콘솔을 사용하여 클러스터를 선택하여 프로세스 노드를 찾을 PROCESS수 있습니다.
주요 기능:
-
UNLOAD 명령을 사용하여 Timestream for LiveAnalytics의 시계열 데이터를 S3 버킷으로 내보냅니다.
-
마이그레이션 중인 각 S3 객체에 대해 미리 서명된 URLs을 생성합니다.
-
각 S3 객체에 대한 마이그레이션 프로세스를 추적합니다.
-
마이그레이션 성공 후 S3 객체를 정리합니다.
-
미리 서명된 URL 만료 시 실패한 마이그레이션 재개를 지원합니다.
사용 예:
# Migrate a LiveAnalytics database to InfluxDB 3 export INFLUXDB3_HOST_URL="https://<your InfluxDB 3 URL>:<your InfluxDB 3 port>" export INFLUXDB3_AUTH_TOKEN="<your InfluxDB 3 token>" export INFLUXDB3_DATABASE_NAME="<your InfluxDB 3 target database>" aws s3api create-bucket --bucket <your S3 bucket name> \ --object-lock-enabled-for-bucket --region <your region> \ --create-bucket-configuration LocationConstraint=<your region>
참고
README의 예제 버킷 정책으로 S3 버킷 정책을 업데이트합니다. 자세한 내용은 사전 조건
python3 liveanalytics_influxdb3_migration_client.py \ --live-analytics-database-name <your LiveAnalytics database name> \ --s3-bucket-name <your S3 bucket name>
출력: Timestream for LiveAnalytics 데이터베이스는 라인 프로토콜로 변환되고 InfluxDB 3 데이터베이스로 수집됩니다.
이상 탐지 플러그인
MAD 기반 이상 탐지
-
트리거 유형: 데이터 쓰기(실시간)
-
사용 사례: 스트리밍 데이터에 대한 실시간 이상치 탐지, 센서 모니터링, 품질 제어
-
GitHub: MAD Anomaly Detection 설명서
작동 방식: 중앙값 절대 편차(MAD)를 사용하여 정상 동작에 대한 강력한 기준을 설정합니다. 새 데이터가 도착하면 각 포인트의 중앙값에서 몇 개의 MAD가 떨어져 있는지 계산합니다. 임곗값(k * MAD)을 초과하는 포인트는 이상으로 플래그가 지정됩니다.
주요 기능:
-
데이터가 작성될 때 실시간 처리
-
효율성을 위해 인 메모리 슬라이딩 윈도우 유지
-
개수 기반 알림(예: 5개 연속 이상 발생)
-
기간 기반 알림(예: 2분 동안 이상 발생)
-
값이 급격히 변할 때 발생하는 알림 피로를 방지하기 위한 플립 억제
사용 예:
# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
출력: 이상이 탐지되면 필드 이름, 값 및 지속 시간을 포함한 실시간 알림을 보냅니다.
데이터 변환 플러그인
Basic Transformation
-
트리거 유형: 예약됨, 데이터 쓰기
-
사용 사례: 데이터 표준화, 단위 변환, 필드 이름 정규화, 데이터 정리
-
GitHub: Basic Transformation 설명서
작동 방식: 필드 이름과 값에 변환 체인을 적용합니다. 기록 데이터를 배치 처리(예약)하거나 데이터가 도착하는 즉시 변환(데이터 쓰기)할 수 있습니다. 변환은 지정된 순서대로 적용되므로 복잡한 데이터 파이프라인이 허용됩니다.
주요 기능:
-
필드 이름 변환: snake_case, 공백 제거, 영숫자만
-
단위 변환: 온도, 압력, 길이, 시간 단위
-
정규식 지원이 포함된 사용자 지정 문자열 대체
-
데이터를 쓰지 않고 테스트하기 위한 드라이 런 모드
-
기록 데이터에 대한 배치 처리
사용 예:
# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner
출력: 변환된 데이터를 사용하여 새 테이블을 생성하고, 원래 타임스탬프와 태그는 유지합니다.
다운샘플러
-
트리거 유형: 예약됨, HTTP
-
사용 사례: 데이터 절감, 장기 스토리지 최적화, 요약 통계 생성, 성능 개선
-
GitHub: Downsampler 설명서
작동 방식: 고해상도 시계열 데이터를 저해상도 요약으로 집계합니다. 예를 들어, 1초 데이터를 평균 1시간으로 변환합니다. 다운샘플링된 각 포인트에는 압축된 원래 포인트 수와 해당하는 시간 범위에 대한 메타데이터가 포함됩니다.
주요 기능:
-
여러 집계 함수: avg, sum, min, max, median, derivative
-
필드별 집계(필드마다 다른 함수).
-
메타데이터 추적(record_count, time_from, time_to)
-
백필을 포함한 온디맨드 다운샘플링을 위한 HTTP API
-
대규모 데이터세트를 위한 배치 크기 설정 가능
사용 예:
# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'
출력: 집계된 값과 압축된 포인트 수 및 시간 범위를 보여주는 메타데이터 열을 사용하여 다운샘플링된 데이터를 생성합니다.
모니터링 및 알림 플러그인
상태 변경 모니터링
-
트리거 유형: 예약됨, 데이터 쓰기
-
사용 사례: 상태 모니터링, 장비 상태 추적, 프로세스 모니터링, 변경 탐지
-
GitHub:State Change 설명서
작동 방식: 시간 경과에 따른 필드 값 변경을 추적하고 변경 횟수가 구성된 임곗값을 초과하면 알립니다. 값의 변화(다른 값)와 특정 값 조건(목표 값과 같음)을 모두 탐지할 수 있습니다. 노이즈 신호로 인한 알림을 방지하기 위한 안정성 검사를 포함합니다.
주요 기능:
-
개수 기반 변경 탐지(예: 10분 동안 5회 변경)
-
기간 기반 모니터링(예: 5분 동안 상태 = '오류')
-
노이즈 감소를 위한 상태 변경 기간
-
독립 임곗값을 사용한 다중 필드 모니터링
-
구성 가능한 안정성 요구 사항
사용 예:
# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor
출력: 알림에는 필드 이름, 탐지된 변경 사항 수, 기간 및 관련 태그 값이 포함됩니다.
시스템 지표 수집기
-
트리거 유형: 예약됨
-
사용 사례: 인프라 모니터링, 성능 기준, 용량 계획, 리소스 추적.
-
GitHub: System Metrics 설명서
작동 방식: psutil 라이브러리를 사용하여 InfluxDB를 실행하는 호스트에서 포괄적인 시스템 지표를 수집합니다. 구성 가능한 간격으로 CPU, 메모리, 디스크 및 네트워크 통계를 수집합니다. 각 지표 유형은 독립적으로 활성화/비활성화할 수 있습니다.
주요 기능:
-
로드 평균이 포함된 코어당 CPU 통계
-
스왑 및 페이지 결함을 포함한 메모리 사용량
-
계산된 IOPS 및 지연 시간이 포함된 디스크 I/O 지표
-
오류 추적이 포함된 네트워크 인터페이스 통계
-
지표 수집 구성(특정 유형 활성화/비활성화)
-
수집 실패 시 자동 재시도
사용 예:
# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics
출력: 각 하위 시스템에 대한 자세한 지표를 포함하는 여러 테이블(system_cpu, system_memory, system_disk_io 등)을 생성합니다.
일반적인 구성 패턴
TOML 구성 파일 사용
복잡한 구성의 경우 인라인 인수 대신 TOML 파일을 사용합니다.
# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector
플러그인 연결
여러 플러그인을 연결하여 데이터 처리 파이프라인을 생성합니다.
# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly
플러그인 모범 사례
-
보수적으로 시작 - 더 높은 임곗값과 더 긴 기간으로 시작한 다음 관찰된 패턴에 따라 조정합니다.
-
개발 중인 테스트 - 프로덕션 배포 전에 모의 실행 모드 및 테스트 데이터베이스를 사용합니다.
-
플러그인 성능 모니터링 - 시스템 테이블에서 실행 시간 및 리소스 사용량을 확인합니다.
-
적절한 트리거 유형 사용 - 배치 처리 예약, 실시간 데이터 쓰기를 선택합니다.
-
현명하게 알림 구성 - 심각도 수준 및 디바운스 로직을 사용하여 알림 피로를 방지합니다.
-
모델 지속성 활용 - ML 기반 플러그인의 경우 일관성을 위해 훈련된 모델을 저장합니다.
-
문서 구성 - 설명이 포함된 트리거 이름을 사용하고 구성 설명서를 유지 관리합니다.
플러그인 실행 모니터링
플러그인 성능을 모니터링하려면 다음을 수행하세요.
-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';
일반적인 문제 해결
다음 표에는 일반적인 문제와 가능한 해결 방법이 나와 있습니다.
| 문제 | 솔루션 |
|---|---|
| 플러그인이 트리거되지 않음 | 트리거가 활성화되었는지 확인, 일정/사양 구문 확인 |
| 알림 누락 | 설치된 알리미 플러그인 확인, 웹후크 URL 확인 |
| 높은 메모리 사용량 | 윈도우 크기 축소, 배치 처리 간격 조정 |
| 잘못된 변환 | 드라이 런 모드 사용, 필드 이름 및 데이터 유형 확인 |
| 예측 부정확성 | 훈련 데이터 기간 증가, 계절성 설정 조정 |
| 알림이 너무 많음 | 트리거 수 증가, 디바운스 기간 추가, 임곗값 조정 |
이러한 인증된 플러그인은 일반적인 시계열 데이터 처리 요구 사항에 맞는 엔터프라이즈 지원 기능을 제공하므로 포괄적인 구성 옵션을 통해 유연성을 유지하면서 사용자 지정 개발이 필요하지 않습니다. 자세한 설명서, 예제 및 업데이트는 GitHub 리포지토리