Para recursos semelhantes aos do Amazon Timestream para, considere o Amazon Timestream LiveAnalytics para InfluxDB. Ele oferece ingestão de dados simplificada e tempos de resposta de consulta de um dígito em milissegundos para análises em tempo real. Saiba mais aqui.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Estenda o Timestream para InfluxDB com plug-ins de mecanismo de processamento
O mecanismo de processamento é uma máquina virtual Python incorporada que é executada dentro do banco de dados InfluxDB 3 no Amazon Timestream. Ele está disponível nas edições Core e Enterprise. Ele permite que você estenda seu banco de dados com código Python personalizado que pode automatizar fluxos de trabalho, transformar dados e criar endpoints de API personalizados.
O mecanismo de processamento executa plug-ins Python em resposta a eventos específicos do banco de dados:
-
Gravações de dados: processe e transforme dados à medida que eles entram no banco de dados
-
Eventos programados: execute o código em intervalos definidos ou horários específicos
-
Solicitações HTTP: exponha endpoints de API personalizados que executam seu código
O mecanismo inclui um cache em memória para gerenciar o estado entre as execuções, permitindo que você crie aplicativos com estado diretamente no seu banco de dados.
InfluxData plug-ins certificados
No lançamento, o InfluxDB 3 inclui um conjunto de plug-ins pré-construídos e totalmente configuráveis certificados por: InfluxData
-
Transformação de dados: processe e enriqueça os dados recebidos
-
Alertas: envie notificações com base nos limites de dados
-
Agregação: calcule estatísticas sobre dados de séries temporais
-
Monitoramento do sistema: acompanhe o uso de recursos e as métricas de integridade
-
Integração: conecte-se a serviços externos e APIs
Esses plug-ins certificados estão prontos para uso e podem ser configurados por meio de argumentos de acionadores para atender aos seus requisitos específicos.
Tipos de plug-ins e especificações de acionador
| Tipo de plug-in | Especificação do acionador | Quando o plug-in é executado | Casos de uso |
|---|---|---|---|
| Gravar dados | table:<TABLE_NAME> ou all_tables |
Quando os dados são gravados em tabelas | Transformação de dados, alertas, métricas derivadas |
| Programado | every:<DURATION> ou cron:<EXPRESSION> |
Em intervalos especificados | Agregação periódica, relatórios, verificações de saúde |
| Solicitação HTTP |
request:<REQUEST_PATH>
|
Quando as solicitações HTTP são recebidas | Personalizado APIs, webhooks, interfaces de usuário |
Criar acionadores
Os acionadores conectam plug-ins aos eventos do banco de dados e definem quando eles são executados. Use o comando influxdb3 create trigger .
Para criar um acionador de gravação de dados:
# Trigger on writes to a specific table influxdb3 create trigger \ --trigger-spec "table:sensor_data" \ --plugin-filename "process_sensors.py" \ --database DATABASE_NAME \ sensor_processor # Trigger on all table writes influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "process_all_data.py" \ --database DATABASE_NAME \ all_data_processor
Para criar um acionador programado:
# Run every 5 minutes influxdb3 create trigger \ --trigger-spec "every:5m" \ --plugin-filename "periodic_check.py" \ --database DATABASE_NAME \ regular_check # Run daily at 8am (cron format with seconds) influxdb3 create trigger \ --trigger-spec "cron:0 0 8 * * *" \ --plugin-filename "daily_report.py" \ --database DATABASE_NAME \ daily_report
Para criar um acionador de solicitação HTTP:
# Create endpoint at /api/v3/engine/webhook influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook_handler.py" \ --database DATABASE_NAME \ webhook_processor
Acesse o endpoint em: https://your-cluster-endpoint:8086/api/v3/engine/webhook
Configurar acionadores
Passando argumentos para plug-ins
Configure o comportamento do plug-in usando argumentos de acionador:
influxdb3 create trigger \ --trigger-spec "every:1h" \ --plugin-filename "threshold_check.py" \ --trigger-arguments "threshold=90,notify_email=admin@example.com" \ --database DATABASE_NAME \ threshold_monitor
Os argumentos são passados para o plugin como um dicionário:
def process_scheduled_call(influxdb3_local, call_time, args=None): if args and "threshold" in args: threshold = float(args["threshold"]) email = args.get("notify_email", "default@example.com") # Use arguments in your logic
Comportamento de tratamento de erros
Configure como os acionadores lidam com os erros:
# Log errors (default) influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "process.py" \ --error-behavior log \ --database DATABASE_NAME \ log_processor # Retry on error influxdb3 create trigger \ --trigger-spec "table:critical_data" \ --plugin-filename "critical.py" \ --error-behavior retry \ --database DATABASE_NAME \ retry_processor # Disable trigger on error influxdb3 create trigger \ --trigger-spec "request:webhook" \ --plugin-filename "webhook.py" \ --error-behavior disable \ --database DATABASE_NAME \ auto_disable_processor
Execução assíncrona
Permita que várias instâncias acionadoras sejam executadas simultaneamente:
influxdb3 create trigger \ --trigger-spec "table:metrics" \ --plugin-filename "heavy_process.py" \ --run-asynchronous \ --database DATABASE_NAME \ async_processor
Gerenciar acionadores
Para visualizar os acionadores para um banco de dados:
# Show all triggers for a database influxdb3 show summary \ --database DATABASE_NAME \ --token YOUR_TOKEN
Exclusão de tabela para acionadores de gravação
Para filtrar tabelas no código do seu plug-in ao usar all_tables:
influxdb3 create trigger \ --trigger-spec "all_tables" \ --plugin-filename "processor.py" \ --trigger-arguments "exclude_tables=temp_data,debug_info" \ --database DATABASE_NAME \ data_processor
A implementação do plug-in é a seguinte:
def process_writes(influxdb3_local, table_batches, args=None): excluded_tables = set(args.get('exclude_tables', '').split(',')) for table_batch in table_batches: if table_batch["table_name"] in excluded_tables: continue # Process allowed tables
Considerações sobre a implantação distribuída
Em implantações de vários nós, configure plug-ins com base nas funções dos nós:
| Tipo de plug-in | Node Type | Motivo |
|---|---|---|
| Plug-ins de gravação de dados | Nódulos de ingestão | Processar dados no ponto de ingestão |
| Plug-ins de solicitação HTTP | Nós de consulta | Gerenciar o tráfego da API |
| Plug-ins programados | Qualquer nó configurado | Pode ser executado em qualquer nó com programador |
As considerações a seguir são importantes para implantações corporativas:
-
Mantenha configurações de plug-in idênticas em todos os nós relevantes.
-
Encaminhe clientes externos (Grafana, painéis) para os nós do consultor.
-
Garanta que os plug-ins estejam acessíveis nos nós nos quais os seus acionadores são ativados.
Práticas recomendadas
-
Configuração do plug-in
-
Use argumentos de acionador para valores configuráveis em vez de codificação permanente.
-
Implante o tratamento adequado de erros nos plug-ins.
-
Use a API
influxdb3_localpara operações de banco de dados.
-
-
Otimização de desempenho
-
Use a execução assíncrona para tarefas pesadas de processamento.
-
Implemente devoluções antecipadas para dados filtrados.
-
Minimize as consultas ao banco de dados nos plug-ins.
-
-
Gerenciamento de erros
-
Escolha o comportamento de erro apropriado (log, tentar novamente ou desativar).
-
Monitore a execução do plug-in por meio de tabelas do sistema.
-
Teste minuciosamente os plug-ins antes da implantação na produção.
-
-
Considerações de segurança
-
Valide todos os dados de entrada nos plug-ins de solicitação HTTP.
-
Use métodos seguros para armazenar configurações confidenciais.
-
Limite as permissões do plug-in somente às operações necessárias.
-
Monitore a execução do plugin
Consulte tabelas do sistema para monitorar o desempenho do plug-in:
-- View processing engine logs SELECT * FROM system.processing_engine_logs WHERE time > now() - INTERVAL '1 hour' ORDER BY time DESC -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'DATABASE_NAME'
O mecanismo de processamento fornece uma maneira poderosa de estender a funcionalidade do InfluxDB 3, mantendo sua lógica de processamento de dados próxima aos dados, reduzindo a latência e simplificando sua arquitetura.
InfluxData plug-ins certificados
O Amazon Timestream para InfluxDB 3 inclui um conjunto abrangente de plug-ins pré-criados e certificados que ampliam a funcionalidade do banco de dados sem exigir desenvolvimento personalizado. Esses plug-ins são totalmente configuráveis e prontos para uso no lançamento, fornecendo recursos avançados para processamento, monitoramento e alerta de dados.
Para obter a documentação completa e o código-fonte, visite o Repositório de InfluxData Plugins
Plug-ins disponíveis
Plugins de detecção de anomalias
Detecção de anomalias baseada em MAD
-
Tipos de acionador: gravação de dados (em tempo real)
-
Casos de uso: detecção de valores discrepantes em tempo real para streaming de dados, monitoramento de sensores e controle de qualidade.
-
GitHub: Documentação de detecção de anomalias do MAD
Como funciona: usa o desvio absoluto mediano (MAD) para estabelecer uma linha de base robusta para o comportamento normal. À medida que novos dados chegam, ele calcula a que MADs distância da mediana cada ponto está. Os pontos que excedem o limite (k * MAD) são marcados como anomalias.
Principais atributos:
-
Processamento em tempo real à medida que os dados são gravados.
-
Mantém janelas deslizantes na memória para maior eficiência.
-
Alertas baseados em contagem (por exemplo, 5 anomalias consecutivas).
-
Alertas baseados em duração (por exemplo, anomalia por 2 minutos).
-
Supressão de giro para evitar que a fadiga do alerta mude rapidamente os valores.
Exemplo de uso:
# Detect temperature anomalies in real-time influxdb3 create trigger \ --database sensors \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=temperature_sensors,mad_thresholds="temp:2.5:20:5@humidity:3:30:2m",senders=slack,slack_webhook_url="YOUR_WEBHOOK"' \ temp_anomaly_detector # Threshold format: field:k_multiplier:window_size:trigger_condition # temp:2.5:20:5 = temperature field, 2.5 MADs, 20-point window, alert after 5 consecutive anomalies # humidity:3:30:2m = humidity field, 3 MADs, 30-point window, alert after 2 minutes of anomaly
Saída: envia notificações em tempo real quando anomalias são detectadas, incluindo nome, valor e duração do campo.
Plug-ins de transformação de dados
Transformação básica
-
Tipos de acionador: programado, gravação de dados
-
Casos de uso: padronização de dados, conversões de unidades, normalização de nomes de campos, limpeza de dados.
Como funciona: aplica uma cadeia de transformações aos nomes e valores dos campos. Pode processar dados históricos em lotes (programados) ou transformar dados à medida que chegam (gravação de dados). As transformações são aplicadas na ordem especificada, permitindo pipelines de dados complexos.
Principais atributos:
-
Transformações do nome do campo: snake_case, remova espaços, somente alfanumérico.
-
Conversões de unidades: temperatura, pressão, comprimento, unidades de tempo.
-
Substituições personalizadas de strings com suporte a regex.
-
Modo de execução a seco para testes sem gravar dados.
-
Processamento em lote para dados históricos.
Exemplo de uso:
# Transform temperature data from Celsius to Fahrenheit with field name standardization influxdb3 create trigger \ --database weather \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "every:30m" \ --trigger-arguments 'measurement=raw_weather,window=1h,target_measurement=weather_fahrenheit,names_transformations="Temperature Reading":"snake",values_transformations=temperature_reading:"convert_degC_to_degF"' \ temp_converter # Real-time field name cleaning for incoming sensor data influxdb3 create trigger \ --database iot \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore"' \ sensor_cleaner
Saída: cria uma nova tabela com dados transformados, preservando os carimbos de data/hora e tags originais.
Downsampler
-
Tipos de acionador: programado, HTTP
-
Casos de uso: redução de dados, otimização do armazenamento a longo prazo, criação de estatísticas resumidas, melhoria do desempenho.
-
GitHub: Documentação do downsampler
Como funciona: agrega dados de séries temporais de alta resolução em resumos de baixa resolução. Por exemplo, converte dados de 1 segundo em médias de 1 hora. Cada ponto com resolução reduzida inclui metadados sobre o número de pontos originais compactados e o intervalo de tempo coberto.
Principais atributos:
-
Várias funções de agregação: avg, sum, min, max, median, derivative.
-
Agregações específicas de campo (funções diferentes para campos diferentes).
-
Rastreamento de metadados (record_count, time_from, time_to).
-
API HTTP para redução de amostragem sob demanda (downsampling) com preenchimento retroativo (backfill).
-
Tamanhos de lote configuráveis para grandes conjuntos de dados.
Exemplo de uso:
# Downsample CPU metrics from 10-second to hourly resolution influxdb3 create trigger \ --database metrics \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=cpu_detailed,target_measurement=cpu_hourly,interval=1h,window=6h,calculations="usage:avg.max_usage:max.total_processes:sum",specific_fields=usage.max_usage.total_processes' \ cpu_downsampler # HTTP endpoint for on-demand downsampling curl -X POST http://localhost:8086/api/v3/engine/downsample \ -H "Authorization: Bearer YOUR_TOKEN" \ -d '{ "source_measurement": "sensor_data", "target_measurement": "sensor_daily", "interval": "1d", "calculations": [["temperature", "avg"], ["humidity", "avg"], ["pressure", "max"]], "backfill_start": "2024-01-01T00:00:00Z", "backfill_end": "2024-12-31T23:59:59Z" }'
Saída: cria dados com resolução reduzida com valores agregados e colunas de metadados mostrando o número de pontos compactados e o intervalo de tempo.
Plug-ins de monitoramento e alerta
Monitor de alteração do estado
-
Tipos de acionador: programado, gravação de dados
-
Casos de uso: monitoramento de status, rastreamento do estado do equipamento, monitoramento de processos, detecção de alterações.
Como funciona: monitora as alterações no valor do campo ao longo do tempo e alerta quando o número de alterações excede os limites configurados. Pode detectar mudanças de valor (valores diferentes) e condições de valor específicas (igual a um valor alvo). Inclui verificações de estabilidade para evitar alertas de sinais ruidosos.
Principais atributos:
-
Detecção de alterações com base na contagem (por exemplo, cinco alterações em dez minutos).
-
Monitoramento baseado em duração (por exemplo, status = “error” por cinco minutos).
-
Janela de mudança de estado para redução de ruído.
-
Monitoramento de vários campos com limites independentes.
-
Requisitos de estabilidade configuráveis.
Exemplo de uso:
# Monitor equipment status changes influxdb3 create trigger \ --database factory \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "every:5m" \ --trigger-arguments 'measurement=equipment,field_change_count="status:3.temperature:10",window=15m,state_change_window=5,senders=slack,notification_text="Equipment $field changed $changes times in $window"' \ equipment_monitor # Real-time monitoring for specific state conditions influxdb3 create trigger \ --database systems \ --plugin-filename "state_change/state_change_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=service_health,field_thresholds="status:down:5@health_score:0:10",senders=pagerduty' \ service_monitor
Saída: os alertas incluem o nome do campo, o número de alterações detectadas, a janela de tempo e os valores de tag relevantes.
Coletor de métricas de sistema
-
Tipos de acionador: programado
-
Casos de uso: monitoramento de infraestrutura, linhas de base de desempenho, planejamento de capacidade, rastreamento de recursos.
Como funciona: usa a biblioteca psutil para coletar métricas abrangentes do sistema do host que executa o InfluxDB. Coleta estatísticas de CPU, memória, disco e rede em intervalos configuráveis. Cada tipo de métrica pode ser ativado/desativado de forma independente.
Principais atributos:
-
Estatísticas de CPU por núcleo com médias de carga.
-
Uso de memória, incluindo falhas de troca e página.
-
I/O Métricas de disco com IOPS e latência calculados.
-
Estatísticas da interface de rede com rastreamento de erros.
-
Coleção de métricas configurável (ativar/desativar tipos específicos).
-
Repetição automática em caso de falhas de coleta.
Exemplo de uso:
# Collect all system metrics every 30 seconds influxdb3 create trigger \ --database monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:30s" \ --trigger-arguments 'hostname=db-server-01,include_cpu=true,include_memory=true,include_disk=true,include_network=true' \ system_monitor # Focus on CPU and memory for application servers influxdb3 create trigger \ --database app_monitoring \ --plugin-filename "system_metrics/system_metrics.py" \ --trigger-spec "every:1m" \ --trigger-arguments 'hostname=app-server-01,include_cpu=true,include_memory=true,include_disk=false,include_network=false' \ app_metrics
Saída: cria várias tabelas (system_cpu, system_memory, system_disk_io etc.) com métricas detalhadas para cada subsistema.
Plug-ins de análise preditiva
Previsões do Prophet
-
Tipos de acionador: programado, HTTP
-
Casos de uso: previsão de demanda, planejamento de capacidade, análise de tendências, detecção de padrões sazonais.
-
GitHub: Documentação de previsão do Prophet
Como funciona: usa a biblioteca Prophet do Facebook para criar modelos de previsão de séries temporais. Pode treinar modelos com base em dados históricos e gerar previsões para períodos futuros. Os modelos levam em conta tendências, sazonalidade, feriados e pontos de mudança. Oferece suporte à persistência do modelo para previsões consistentes.
Principais atributos:
-
Detecção automática de sazonalidade (diária, semanal, anual).
-
Suporte para calendário de feriados (integrado e personalizado).
-
Detecção de pontos de mudança para mudanças de tendência.
-
Persistência e controle de versionamento do modelo.
-
Intervalos de confiança para previsões.
-
Validação com limites do MSRE.
Exemplo de uso:
# Train and forecast temperature data influxdb3 create trigger \ --database weather \ --plugin-filename "prophet_forecasting/prophet_forecasting.py" \ --trigger-spec "every:1d" \ --trigger-arguments 'measurement=temperature,field=value,window=90d,forecast_horizont=7d,tag_values="location:seattle",target_measurement=temperature_forecast,model_mode=train,unique_suffix=seattle_v1,seasonality_mode=additive' \ temp_forecast # Validate temperature predictions with MAE influxdb3 create trigger \ --database weather \ --plugin-filename "forecast_error_evaluator/forecast_error_evaluator.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'forecast_measurement=temp_forecast,actual_measurement=temp_actual,forecast_field=predicted,actual_field=temperature,error_metric=mae,error_thresholds=WARN-"2.0":ERROR-"5.0",window=6h,rounding_freq=5min,senders=discord' \ temp_forecast_check
Saída: envia notificações quando o erro de previsão excede os limites, incluindo o valor da métrica de erro e o intervalo de tempo afetado.
Padrões de configuração comuns
Utilização de arquivos de configuração TOML
Para configurações complexas, use arquivos TOML em vez de argumentos embutidos:
# anomaly_config.toml measurement = "server_metrics" field = "cpu_usage" window = "1h" detector_type = "IsolationForestAD" contamination = 0.1 window_size = 20 output_table = "cpu_anomalies" senders = "slack" slack_webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK" notification_text = "Anomaly detected in $field: value=$value at $timestamp"
# Use TOML configuration PLUGIN_DIR=~/.plugins influxdb3 create trigger \ --database monitoring \ --plugin-filename "adtk_anomaly/adtk_anomaly_detection_plugin.py" \ --trigger-spec "every:10m" \ --trigger-arguments "config_file_path=anomaly_config.toml" \ cpu_anomaly_detector
Encadeamento de plug-ins
Crie pipelines de processamento de dados encadeando vários plug-ins:
# Step 1: Transform raw data influxdb3 create trigger \ --database pipeline \ --plugin-filename "basic_transformation/basic_transformation.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake"' \ step1_transform # Step 2: Downsample transformed data influxdb3 create trigger \ --database pipeline \ --plugin-filename "downsampler/downsampler.py" \ --trigger-spec "every:1h" \ --trigger-arguments 'source_measurement=clean_sensors,target_measurement=sensors_hourly,interval=1h,window=6h,calculations=avg' \ step2_downsample # Step 3: Detect anomalies in downsampled data influxdb3 create trigger \ --database pipeline \ --plugin-filename "mad_check/mad_check_plugin.py" \ --trigger-spec "all_tables" \ --trigger-arguments 'measurement=sensors_hourly,mad_thresholds="value:3:20:5",senders=slack' \ step3_anomaly
Práticas recomendadas para plug-ins
-
Comece com cautela — comece com limites mais altos e janelas mais longas, depois ajuste com base nos padrões observados.
-
Teste em desenvolvimento — use modos de execução a seco e teste bancos de dados antes da implantação na produção.
-
Monitore o desempenho do plug-in — verifique os tempos de execução e o uso de recursos nas tabelas do sistema.
-
Use os tipos de acionador apropriados — escolha agendado para processamento em lote, gravação de dados para tempo real.
-
Configure as notificações com sabedoria — use os níveis de gravidade e a lógica de rejeição para evitar a fadiga dos alertas.
-
Aproveite a persistência do modelo — para plug-ins baseados em ML, salve os modelos treinados para garantir a consistência.
-
Configurações do documento — use nomes de acionador descritivos e mantenha a documentação de configuração.
Monitore a execução do plugin
Para monitorar o desempenho do plug-in:
-- View plugin execution logs SELECT event_time, trigger_name, log_level, log_text FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name' AND time > now() - INTERVAL '1 hour' ORDER BY event_time DESC; -- Monitor plugin performance SELECT trigger_name, COUNT(*) as executions, AVG(execution_time_ms) as avg_time_ms, MAX(execution_time_ms) as max_time_ms, SUM(CASE WHEN log_level = 'ERROR' THEN 1 ELSE 0 END) as error_count FROM system.processing_engine_logs WHERE time > now() - INTERVAL '24 hours' GROUP BY trigger_name; -- Check trigger status SELECT * FROM system.processing_engine_triggers WHERE database = 'your_database';
Solução de problemas comuns
A tabela a seguir mostra os problemas comuns e as possíveis soluções.
| Problema | Solução |
|---|---|
| Plug-in não está sendo acionado | Verifique se o gatilho está ativado, verifique a schedule/spec sintaxe |
| Notificações ausentes | Confirme a instalação do plug-in Notifier, verifique o webhook URLs |
| Alto uso de memória | Reduza o tamanho das janelas, ajuste os intervalos de processamento em lote |
| Transformações incorretas | Use o modo de execução a seco, verifique os nomes dos campos e os tipos de dados |
| Previsões imprecisas | Aumente a janela de dados de treinamento, ajuste as configurações de sazonalidade |
| Muitos alertas | Aumente a contagem de acionadores, adicione a duração do debounce, ajuste os limites |
Esses plug-ins certificados fornecem funcionalidade pronta para uso corporativo para necessidades comuns de processamento de dados de séries temporais, eliminando a necessidade de desenvolvimento personalizado e mantendo a flexibilidade por meio de opções de configuração abrangentes. Visite o GitHubrepositório