Para recursos semelhantes aos do Amazon Timestream para, considere o Amazon Timestream LiveAnalytics para InfluxDB. Ele oferece ingestão de dados simplificada e tempos de resposta de consulta de um dígito em milissegundos para análises em tempo real. Saiba mais aqui.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Lidar com dados que chegam tardiamente
Você pode ter cenários em que os dados chegam com um atraso significativo, por exemplo, o horário em que os dados foram ingeridos no Timestream para LiveAnalytics está significativamente atrasado em comparação com o timestamp associado às linhas que são ingeridas. Nos exemplos anteriores, você viu como usar os intervalos de tempo definidos pelo parâmetro @scheduled_runtime para contabilizar alguns dados que chegam tarde. Entretanto, se houver casos de uso em que os dados possam ser atrasados por horas ou dias, pode ser necessário adotar um padrão diferente para assegurar que os pré-cálculos na tabela derivada sejam atualizados corretamente para refletir esses dados que chegam com atraso. Para obter informações gerais sobre dados de chegada tardia, consulte Gravação de dados (inserções e acréscimos).
A seguir, você verá duas maneiras diferentes de lidar com esses dados que chegam tardiamente.
-
Se você tiver atrasos previsíveis na chegada dos dados, poderá usar outra computação programada “atualizada” para atualizar seus agregados para dados que chegam tarde.
-
Se você tiver atrasos imprevisíveis ou dados ocasionais de chegada tardia, poderá usar execuções manuais para atualizar as tabelas derivadas.
Esta discussão aborda cenários de chegada tardia de dados. No entanto, os mesmos princípios se aplicam às correções de dados, nas quais você modificou os dados na tabela de origem e deseja atualizar os agregados nas tabelas derivadas.
Tópicos
Consultas de atualização programadas
Consulta agregando dados que chegaram a tempo
Abaixo está um padrão: você verá como usar uma forma automatizada de atualizar seus agregados se houver atrasos previsíveis na chegada dos dados. Considere um dos exemplos anteriores de um cálculo programado em dados em tempo real abaixo. Esse cálculo programado atualiza a tabela derivada uma vez a cada 30 minutos e já contabiliza dados com até uma hora de atraso.
{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
Consulta de atualização para atualizar os agregados para dados que chegaram com atraso
Agora, se você considerar o caso, seus dados podem ser atrasados em cerca de 12 horas. Veja abaixo uma variante da mesma consulta. No entanto, a diferença é que ele calcula os agregados em dados que estão atrasados em até 12 horas em comparação com quando a computação programada está sendo acionada. Por exemplo, você vê a consulta no exemplo abaixo. O intervalo de tempo que essa consulta tem como alvo é entre 2h e 14h antes de ser acionada. Além disso, se você observar a expressão de cronograma cron (0 0,12 * *? *), ele acionará o cálculo às 00:00 UTC e 12:00 UTC todos os dias. Portanto, quando a consulta é acionada em 01/12/2021 00:00:00, a consulta atualiza os agregados no intervalo de tempo 2021-11-30 10:00:00 a 2021-11-30 22:00:00. As consultas agendadas usam uma semântica ascendente semelhante às gravações do Timestream para LiveAnalytics, em que essa consulta de recuperação atualizará os valores agregados com valores mais novos se houver dados atrasados na janela ou se novos agregados forem encontrados (por exemplo, um novo agrupamento aparecerá nesse agregado que não estava presente quando a computação programada original foi acionada) e, em seguida, o novo agregado será inserido na tabela derivada. Da mesma forma, quando a próxima instância for acionada em 01/12/2021 12:00:00, essa instância atualizará os agregados no intervalo 2021-11-30 22:00:00 a 2021-12-01 10:00:00.
{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
O exemplo anterior é uma ilustração, supondo que sua chegada tardia esteja limitada a 12 horas e que não há problema em atualizar a tabela derivada uma vez a cada 12 horas para que os dados cheguem depois da janela em tempo real. Para que sua tabela derivada reflita os dados que chegam tardiamente mais cedo, você pode ajustar esse padrão para atualizá-la uma vez por hora. Da mesma forma, você pode ajustar o intervalo de tempo para mais de 12 horas, como um dia, uma semana ou mais, para gerenciar dados previsíveis que chegam atrasados.
Execuções manuais para dados imprevisíveis que chegam tardiamente
Pode haver casos em que você tenha dados imprevisíveis chegando atrasados ou tenha feito alterações nos dados de origem e atualizado alguns valores após o fato. Em todos esses casos, é possível acionar manualmente consultas agendadas para atualizar a tabela derivada. Abaixo está um exemplo de como você é possível conseguir isso.
Suponha que você tenha o caso de uso em que você tenha a computação gravada na tabela derivada dp_per_timeseries_per_hr. Seus dados básicos na tabela devops foram atualizados no intervalo de tempo 2021-11-30 23:00:00 - 2021-12-01 00:00:00. Há duas consultas agendadas diferentes que podem ser usadas para atualizar essa tabela derivada: Multipt30mPerHRPerTimeSeriesDpCount e Multipt12HPerHRPerTimeSeriesDPCountCatchup. Cada cálculo agendado que você cria no Timestream para LiveAnalytics tem um ARN exclusivo que você obtém ao criar o cálculo ou ao realizar uma operação de lista. Para realizar essa operação, você pode usar o ARN para o cálculo e um valor para o parâmetro @scheduled_runtime, obtido pela consulta.
Suponha que o cálculo para Multipt30mPerHRPerTimeSeriesDpCount tenha um ARN arn_1 e você queira usar esse cálculo para atualizar a tabela derivada. Como o cálculo agendado anterior atualiza os agregados 1 hora antes e 1 hora depois do valor @scheduled_runtime, é possível cobrir o intervalo de tempo da atualização (2021-11-30 23:00:00 - 2021-12-01 00:00:00) usando um valor de 2021-12-01 00:00:00 para o parâmetro @scheduled_runtime. É possível usar a API ExecuteScheduledQuery para passar o ARN desse cálculo e o valor do parâmetro de tempo em segundos de época (em UTC) para fazer isso. Abaixo está um exemplo usando a AWS CLI e você pode seguir o mesmo padrão usando qualquer um dos SDKs compatíveis com o Timestream para LiveAnalytics.
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1
No exemplo anterior, o perfil é o perfil AWS que tem os privilégios apropriados para fazer essa chamada de API e 1638316800 corresponde à segunda época de 2021-12-01 00:00:00. Esse gatilho manual se comporta quase como o gatilho automatizado, supondo que o sistema tenha acionado essa invocação no período de tempo desejado.
Se você teve uma atualização em um período mais longo, digamos que os dados básicos foram atualizados para 2021-11-30 23:00:00 - 2021-12-01 11:00:00, então é possível acionar as consultas anteriores várias vezes para cobrir todo esse intervalo de tempo. Por exemplo, é possível fazer seis execuções diferentes da seguinte maneira.
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1
Os seis comandos anteriores correspondem à computação programada invocada em 2021-12-01 00:00:00, 2021-12-01 02:00:00, 2021-12-01 04:0:00, 2021-12-01 06:00:00, 2021-12-01 08:00:00e 2021-12-01 10:00:
Como opção, você pode usar o cálculo Multipt12hPerHRPerTimeSeriesDPCountCatchup acionado em 2021-12-01 13:00:00 para uma execução para atualizar os agregados para todo o intervalo de tempo de 12 horas. Por exemplo, se arn_2 for o ARN desse cálculo, será possível executar o seguinte comando na CLI.
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1
É importante notar que, para um acionamento manual, você pode usar um carimbo de data/hora para o parâmetro de tempo de invocação que não precisa estar alinhado com os carimbos de data/hora do gatilho automatizado. Por exemplo, no exemplo anterior, você acionou o cálculo no horário 2021-12-01 13:00:00 embora a programação automatizada só seja acionada nos timestamps 2021-12-01 10:00:00, 2021-12-01 12:00:00e 2021-12-02 00:00:00. O Timestream para LiveAnalytics oferece a flexibilidade de acioná-lo com valores apropriados, conforme necessário, para suas operações manuais.
A seguir estão algumas considerações importantes ao usar a API ExecuteScheduledQuery.
-
Se você estiver acionando várias dessas invocações, precisará garantir que essas invocações não gerem resultados em intervalos de tempo sobrepostos. Por exemplo, nos exemplos anteriores, havia seis invocações. Cada invocação abrange um intervalo de tempo de 2 horas e, portanto, os timestamps de invocação foram distribuídos em duas horas cada para evitar qualquer sobreposição nas atualizações. Isso garante que os dados na tabela derivada terminem em um estado que corresponda aos agregados da tabela de origem. Se você não puder garantir intervalos de tempo não sobrepostos, certifique-se de que essas execuções sejam acionadas sequencialmente uma após a outra. Se você acionar várias execuções simultaneamente que se sobrepõem em seus intervalos de tempo, poderá ver falhas de gatilho nas quais poderá ver conflitos de versão nos relatórios de erros dessas execuções. Os resultados gerados por uma invocação de consulta agendada recebem uma versão com base em quando a invocação foi acionada. Portanto, as linhas geradas por invocações mais recentes têm versões superiores. Um registro de versão superior pode sobrescrever um registro de versão inferior. Para consultas agendadas acionadas automaticamente, o Timestream para LiveAnalytics gerencia automaticamente os agendamentos para que você não veja esses problemas, mesmo que as invocações subsequentes tenham intervalos de tempo sobrepostos.
-
observado anteriormente, é possível acionar as invocações com qualquer valor de timestamp para @scheduled_runtime. Portanto, é sua responsabilidade definir adequadamente os valores para que os intervalos de tempo apropriados sejam atualizados na tabela derivada correspondente aos intervalos em que os dados foram atualizados na tabela de origem.
-
Também é possível usar esses gatilhos manuais para consultas agendadas que estão no estado DESATIVADO. Isso permite que você defina consultas especiais que não são executadas em um agendamento automatizado, pois estão no estado DESATIVADO. Em vez disso, você pode usar acionadores manuais neles para gerenciar correções de dados ou casos de uso de chegada tardia.