Estenda o Timestream para InfluxDB com plug-ins de mecanismo de processamento - Amazon Timestream

Para recursos semelhantes aos do Amazon Timestream para, considere o Amazon Timestream LiveAnalytics para InfluxDB. Ele oferece ingestão de dados simplificada e tempos de resposta de consulta de um dígito em milissegundos para análises em tempo real. Saiba mais aqui.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Estenda o Timestream para InfluxDB com plug-ins de mecanismo de processamento

O mecanismo de processamento é uma máquina virtual Python incorporada que é executada dentro do banco de dados InfluxDB 3 no Amazon Timestream. Ele está disponível nas edições Core e Enterprise. Ele permite que você estenda seu banco de dados com código Python personalizado que pode automatizar fluxos de trabalho, transformar dados e criar endpoints de API personalizados.

O mecanismo de processamento executa plug-ins Python em resposta a eventos específicos do banco de dados:

  • Gravações de dados: processe e transforme dados à medida que eles entram no banco de dados

  • Eventos programados: execute o código em intervalos definidos ou horários específicos

  • Solicitações HTTP: exponha endpoints de API personalizados que executam seu código

O mecanismo inclui um cache em memória para gerenciar o estado entre as execuções, permitindo que você crie aplicativos com estado diretamente no seu banco de dados.

InfluxData plug-ins certificados

No lançamento, o InfluxDB 3 inclui um conjunto de plug-ins pré-construídos e totalmente configuráveis certificados por: InfluxData

  • Transformação de dados: processe e enriqueça os dados recebidos

  • Alertas: envie notificações com base nos limites de dados

  • Agregação: calcule estatísticas sobre dados de séries temporais

  • Monitoramento do sistema: acompanhe o uso de recursos e as métricas de integridade

  • Integração: conecte-se a serviços externos e APIs

Esses plug-ins certificados estão prontos para uso e podem ser configurados por meio de argumentos de acionadores para atender aos seus requisitos específicos.

Tipos de plug-ins e especificações de acionador

Tipo de plug-in Especificação do acionador Quando o plug-in é executado Casos de uso
Gravar dados table:<TABLE_NAME> ou all_tables Quando os dados são gravados em tabelas Transformação de dados, alertas, métricas derivadas
Programado every:<DURATION> ou cron:<EXPRESSION> Em intervalos especificados Agregação periódica, relatórios, verificações de saúde
Solicitação HTTP request:<REQUEST_PATH> Quando as solicitações HTTP são recebidas Personalizado APIs, webhooks, interfaces de usuário

Criar acionadores

Os acionadores conectam plug-ins aos eventos do banco de dados e definem quando eles são executados. Use o comando  influxdb3 create trigger .

Para criar um acionador de gravação de dados:

# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor

Para criar um acionador programado:

# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report

Para criar um acionador de solicitação HTTP:

# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor

Acesse o endpoint em: https://your-cluster-endpoint:8086/api/v3/engine/webhook

Configurar acionadores

Passando argumentos para plug-ins

Configure o comportamento do plug-in usando argumentos de acionador:

influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor

Os argumentos são passados para o plugin como um dicionário:

def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic

Comportamento de tratamento de erros

Configure como os acionadores lidam com os erros:

# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor

Execução assíncrona

Permita que várias instâncias acionadoras sejam executadas simultaneamente:

influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor

Gerenciar acionadores

Para visualizar os acionadores para um banco de dados:

# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN

Exclusão de tabela para acionadores de gravação

Para filtrar tabelas no código do seu plug-in ao usar all_tables:

influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor

A implementação do plug-in é a seguinte:

def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables

Considerações sobre a implantação distribuída

Em implantações de vários nós, configure plug-ins com base nas funções dos nós:

Tipo de plug-in Node Type Motivo
Plug-ins de gravação de dados Nódulos de ingestão Processar dados no ponto de ingestão
Plug-ins de solicitação HTTP Nós de consulta Gerenciar o tráfego da API
Plug-ins programados Qualquer nó configurado Pode ser executado em qualquer nó com programador

As considerações a seguir são importantes para implantações corporativas:

  • Mantenha configurações de plug-in idênticas em todos os nós relevantes.

  • Encaminhe clientes externos (Grafana, painéis) para os nós do consultor.

  • Garanta que os plug-ins estejam acessíveis nos nós nos quais os seus acionadores são ativados.

Práticas recomendadas

  • Configuração do plug-in

    • Use argumentos de acionador para valores configuráveis em vez de codificação permanente.

    • Implante o tratamento adequado de erros nos plug-ins.

    • Use a API influxdb3_local para operações de banco de dados.

  • Otimização de desempenho

    • Use a execução assíncrona para tarefas pesadas de processamento.

    • Implemente devoluções antecipadas para dados filtrados.

    • Minimize as consultas ao banco de dados nos plug-ins.

  • Gerenciamento de erros

    • Escolha o comportamento de erro apropriado (log, tentar novamente ou desativar).

    • Monitore a execução do plug-in por meio de tabelas do sistema.

    • Teste minuciosamente os plug-ins antes da implantação na produção.

  • Considerações de segurança

    • Valide todos os dados de entrada nos plug-ins de solicitação HTTP.

    • Use métodos seguros para armazenar configurações confidenciais.

    • Limite as permissões do plug-in somente às operações necessárias.

Monitore a execução do plugin

Consulte tabelas do sistema para monitorar o desempenho do plug-in:

-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'

O mecanismo de processamento fornece uma maneira poderosa de estender a funcionalidade do InfluxDB 3, mantendo sua lógica de processamento de dados próxima aos dados, reduzindo a latência e simplificando sua arquitetura.

InfluxData plug-ins certificados

O Amazon Timestream para InfluxDB 3 inclui um conjunto abrangente de plug-ins pré-criados e certificados que ampliam a funcionalidade do banco de dados sem exigir desenvolvimento personalizado. Esses plug-ins são totalmente configuráveis e prontos para uso no lançamento, fornecendo recursos avançados para processamento, monitoramento e alerta de dados.

Para obter a documentação completa e o código-fonte, visite o Repositório de InfluxData Plugins.

Plug-ins disponíveis

Plugins de detecção de anomalias

Detecção de anomalias baseada em MAD

  • Tipos de acionador: gravação de dados (em tempo real)

  • Casos de uso: detecção de valores discrepantes em tempo real para streaming de dados, monitoramento de sensores e controle de qualidade.

  • GitHub: Documentação de detecção de anomalias do MAD

Como funciona: usa o desvio absoluto mediano (MAD) para estabelecer uma linha de base robusta para o comportamento normal. À medida que novos dados chegam, ele calcula a que MADs distância da mediana cada ponto está. Os pontos que excedem o limite (k * MAD) são marcados como anomalias.

Principais atributos:

  • Processamento em tempo real à medida que os dados são gravados.

  • Mantém janelas deslizantes na memória para maior eficiência.

  • Alertas baseados em contagem (por exemplo, 5 anomalias consecutivas).

  • Alertas baseados em duração (por exemplo, anomalia por 2 minutos).

  • Supressão de giro para evitar que a fadiga do alerta mude rapidamente os valores.

Exemplo de uso:

# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly

Saída: envia notificações em tempo real quando anomalias são detectadas, incluindo nome, valor e duração do campo.

Plug-ins de transformação de dados

Transformação básica

  • Tipos de acionador: programado, gravação de dados

  • Casos de uso: padronização de dados, conversões de unidades, normalização de nomes de campos, limpeza de dados.

  • GitHub: Documentação básica de transformação

Como funciona: aplica uma cadeia de transformações aos nomes e valores dos campos. Pode processar dados históricos em lotes (programados) ou transformar dados à medida que chegam (gravação de dados). As transformações são aplicadas na ordem especificada, permitindo pipelines de dados complexos.

Principais atributos:

  • Transformações do nome do campo: snake_case, remova espaços, somente alfanumérico.

  • Conversões de unidades: temperatura, pressão, comprimento, unidades de tempo.

  • Substituições personalizadas de strings com suporte a regex.

  • Modo de execução a seco para testes sem gravar dados.

  • Processamento em lote para dados históricos.

Exemplo de uso:

# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner

Saída: cria uma nova tabela com dados transformados, preservando os carimbos de data/hora e tags originais.

Downsampler

  • Tipos de acionador: programado, HTTP

  • Casos de uso: redução de dados, otimização do armazenamento a longo prazo, criação de estatísticas resumidas, melhoria do desempenho.

  • GitHub: Documentação do downsampler

Como funciona: agrega dados de séries temporais de alta resolução em resumos de baixa resolução. Por exemplo, converte dados de 1 segundo em médias de 1 hora. Cada ponto com resolução reduzida inclui metadados sobre o número de pontos originais compactados e o intervalo de tempo coberto.

Principais atributos:

  • Várias funções de agregação: avg, sum, min, max, median, derivative.

  • Agregações específicas de campo (funções diferentes para campos diferentes).

  • Rastreamento de metadados (record_count, time_from, time_to).

  • API HTTP para redução de amostragem sob demanda (downsampling) com preenchimento retroativo (backfill).

  • Tamanhos de lote configuráveis para grandes conjuntos de dados.

Exemplo de uso:

# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'

Saída: cria dados com resolução reduzida com valores agregados e colunas de metadados mostrando o número de pontos compactados e o intervalo de tempo.

Plug-ins de monitoramento e alerta

Monitor de alteração do estado

  • Tipos de acionador: programado, gravação de dados

  • Casos de uso: monitoramento de status, rastreamento do estado do equipamento, monitoramento de processos, detecção de alterações.

  • GitHub: Documentação de mudança de estado

Como funciona: monitora as alterações no valor do campo ao longo do tempo e alerta quando o número de alterações excede os limites configurados. Pode detectar mudanças de valor (valores diferentes) e condições de valor específicas (igual a um valor alvo). Inclui verificações de estabilidade para evitar alertas de sinais ruidosos.

Principais atributos:

  • Detecção de alterações com base na contagem (por exemplo, cinco alterações em dez minutos).

  • Monitoramento baseado em duração (por exemplo, status = “error” por cinco minutos).

  • Janela de mudança de estado para redução de ruído.

  • Monitoramento de vários campos com limites independentes.

  • Requisitos de estabilidade configuráveis.

Exemplo de uso:

# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor

Saída: os alertas incluem o nome do campo, o número de alterações detectadas, a janela de tempo e os valores de tag relevantes.

Coletor de métricas de sistema

  • Tipos de acionador: programado

  • Casos de uso: monitoramento de infraestrutura, linhas de base de desempenho, planejamento de capacidade, rastreamento de recursos.

  • GitHub: Documentação de métricas do sistema

Como funciona: usa a biblioteca psutil para coletar métricas abrangentes do sistema do host que executa o InfluxDB. Coleta estatísticas de CPU, memória, disco e rede em intervalos configuráveis. Cada tipo de métrica pode ser ativado/desativado de forma independente.

Principais atributos:

  • Estatísticas de CPU por núcleo com médias de carga.

  • Uso de memória, incluindo falhas de troca e página.

  • I/O Métricas de disco com IOPS e latência calculados.

  • Estatísticas da interface de rede com rastreamento de erros.

  • Coleção de métricas configurável (ativar/desativar tipos específicos).

  • Repetição automática em caso de falhas de coleta.

Exemplo de uso:

# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics

Saída: cria várias tabelas (system_cpu, system_memory, system_disk_io etc.) com métricas detalhadas para cada subsistema.

Plug-ins de análise preditiva

Previsões do Prophet

  • Tipos de acionador: programado, HTTP

  • Casos de uso: previsão de demanda, planejamento de capacidade, análise de tendências, detecção de padrões sazonais.

  • GitHub: Documentação de previsão do Prophet

Como funciona: usa a biblioteca Prophet do Facebook para criar modelos de previsão de séries temporais. Pode treinar modelos com base em dados históricos e gerar previsões para períodos futuros. Os modelos levam em conta tendências, sazonalidade, feriados e pontos de mudança. Oferece suporte à persistência do modelo para previsões consistentes.

Principais atributos:

  • Detecção automática de sazonalidade (diária, semanal, anual).

  • Suporte para calendário de feriados (integrado e personalizado).

  • Detecção de pontos de mudança para mudanças de tendência.

  • Persistência e controle de versionamento do modelo.

  • Intervalos de confiança para previsões.

  • Validação com limites do MSRE.

Exemplo de uso:

# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check

Saída: envia notificações quando o erro de previsão excede os limites, incluindo o valor da métrica de erro e o intervalo de tempo afetado.

Padrões de configuração comuns

Utilização de arquivos de configuração TOML

Para configurações complexas, use arquivos TOML em vez de argumentos embutidos:

# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector

Encadeamento de plug-ins

Crie pipelines de processamento de dados encadeando vários plug-ins:

# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly

Práticas recomendadas para plug-ins

  • Comece com cautela — comece com limites mais altos e janelas mais longas, depois ajuste com base nos padrões observados.

  • Teste em desenvolvimento — use modos de execução a seco e teste bancos de dados antes da implantação na produção.

  • Monitore o desempenho do plug-in — verifique os tempos de execução e o uso de recursos nas tabelas do sistema.

  • Use os tipos de acionador apropriados — escolha agendado para processamento em lote, gravação de dados para tempo real.

  • Configure as notificações com sabedoria — use os níveis de gravidade e a lógica de rejeição para evitar a fadiga dos alertas.

  • Aproveite a persistência do modelo — para plug-ins baseados em ML, salve os modelos treinados para garantir a consistência.

  • Configurações do documento — use nomes de acionador descritivos e mantenha a documentação de configuração.

Monitore a execução do plugin

Para monitorar o desempenho do plug-in:

-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';

Solução de problemas comuns

A tabela a seguir mostra os problemas comuns e as possíveis soluções.

Problema Solução
Plug-in não está sendo acionado Verifique se o gatilho está ativado, verifique a schedule/spec sintaxe
Notificações ausentes Confirme a instalação do plug-in Notifier, verifique o webhook URLs
Alto uso de memória Reduza o tamanho das janelas, ajuste os intervalos de processamento em lote
Transformações incorretas Use o modo de execução a seco, verifique os nomes dos campos e os tipos de dados
Previsões imprecisas Aumente a janela de dados de treinamento, ajuste as configurações de sazonalidade
Muitos alertas Aumente a contagem de acionadores, adicione a duração do debounce, ajuste os limites

Esses plug-ins certificados fornecem funcionalidade pronta para uso corporativo para necessidades comuns de processamento de dados de séries temporais, eliminando a necessidade de desenvolvimento personalizado e mantendo a flexibilidade por meio de opções de configuração abrangentes. Visite o GitHubrepositório para obter documentação detalhada, exemplos e atualizações.