Amazon Timestream for LiveAnalytics と同様の機能については、Amazon Timestream for InfluxDB を検討してください。リアルタイム分析のために、シンプルなデータ取り込みと 1 桁ミリ秒のクエリ応答時間を提供します。詳細については、こちらを参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
遅延受信データの処理
例えば、データが Timestream for LiveAnalytics に取り込まれた時間が、取り込まれた行に関連付けられたタイムスタンプと比較して大幅に遅れるなど、データが大幅に遅れるシナリオがあるかもしれません。前の例では、@scheduled_runtime パラメータで定義された時間範囲を使用して、到着遅延データを考慮する方法を確認しました。ただし、データを数時間または数日遅延できるユースケースがある場合、派生テーブルの事前計算が、このような遅延受信データを反映するように適切に更新されるように、別のパターンが必要になる場合があります。遅延受信データに関する一般的な情報については、「」を参照してくださいデータの書き込み (挿入とアップサート)。
以下では、この到着遅延データに対処する 2 つの異なる方法を示します。
-
データの到着に予測可能な遅延がある場合は、別の「キャッチアップ」スケジュール計算を使用して、到着遅延データの集計を更新できます。
-
予測不可能な遅延や時折の遅延到着データがある場合は、手動実行を使用して派生テーブルを更新できます。
このディスカッションでは、遅延データ到着のシナリオについて説明します。ただし、ソーステーブルのデータを変更し、派生テーブルの集計を更新する場合、データ修正にも同じ原則が適用されます。
スケジュールされたキャッチアップクエリ
時間内に到着したデータの集計をクエリする
以下は、データ到着に予測可能な遅延が発生した場合に、自動方法を使用して集計を更新する方法を示すパターンです。以下のリアルタイムデータでスケジュールされた計算の例の 1 つを考えてみましょう。このスケジュールされた計算では、派生テーブルが 30 分に 1 回更新され、最大 1 時間の遅延が既に考慮されています。
{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
遅延到着データの集計を更新するキャッチアップクエリ
ここで、データが約 12 時間遅延する可能性がある場合を考えてみましょう。以下は、同じクエリのバリアントです。ただし、スケジュールされた計算がトリガーされるときと比較して、最大 12 時間遅延したデータの集計を計算する点が異なります。例えば、以下の例では、クエリがターゲットとしている時間範囲は、クエリがトリガーされる 2 時間前から 14 時間前です。さらに、スケジュール式 cron(0 0,12 * * ? *) に気付いた場合、計算は毎日 00:00 UTC と 12:00 UTC にトリガーされます。したがって、クエリが 2021-12-01 00:00:00 にトリガーされると、クエリは 2021-11-30 10:00:00 から 2021-11-30 22:00:00 の時間範囲で集計されます。スケジュールされたクエリは、Timestream for LiveAnalytics の書き込みと同様のアップサートセマンティクスを使用します。このキャッチアップクエリは、ウィンドウに遅延到着データがある場合や、新しい集計が見つかった場合 (たとえば、この集計に新しいグループが表示され、元のスケジュールされた計算がトリガーされたときに存在しなかった)、新しい集計が派生テーブルに挿入されます。同様に、次のインスタンスが 2021-12-012:00:00 にトリガーされると、そのインスタンスは 2021-11-302:00:00 から 10:00:00 の範囲2021-12-01の集計を更新します。
{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }
この例は、遅延到着が 12 時間に制限されており、リアルタイムウィンドウより遅れて到着するデータについては、派生テーブルを 12 時間ごとに更新しても問題ありません。このパターンを適応させて、派生テーブルを 1 時間に 1 回更新し、派生テーブルが到着遅延データをより早く反映するようにできます。同様に、時間範囲を 1 日や 1 週間以上など、12 時間以上経過するように調整して、予測可能な遅延受信データを処理できます。
予測不可能な遅延到着データの手動実行
予測不可能な遅延到着データがある場合や、ソースデータを変更し、事後にいくつかの値を更新した場合があります。このような場合はすべて、スケジュールされたクエリを手動でトリガーして、派生テーブルを更新できます。以下は、これを実現する方法の例です。
派生テーブル dp_per_timeseries_per_hr に計算が書き込まれるユースケースがあるとします。テーブル devops のベースデータは、2021-11-303:00:00 - 2021-12-01」の時間範囲で更新されました。この派生テーブルの更新に使用できるスケジュールされたクエリは、MultiPT30mPerHrPerTimeseriesDPCount と MultiPT12HPerHrPerTimeseriesDPCountCatchUp の 2 つあります。LiveAnalytics の Timestream で作成するスケジュールされた各計算には、計算の作成時またはリストオペレーションの実行時に取得する一意の ARN があります。計算には ARN を使用し、このオペレーションを実行するにはクエリによって実行されるパラメータ @scheduled_runtime の値を使用できます。
MultiPT30mPerHrPerTimeseriesDPCount の計算に ARN arn_1 があり、この計算を使用して派生テーブルを更新するとします。前述のスケジュールされた計算では、@scheduled_runtime 値の 1 時間前と 1 時間後に集計が更新されるため、@scheduled_runtime パラメータの 2021-12-010:00:00 という値を使用して、更新 (2021-11-303:00:00 - 2021-12-01 00:00:00) の時間範囲をカバーできます。ExecuteScheduledQuery API を使用して、この計算の ARN とエポック秒単位 (UTC) の時間パラメータ値を渡すことで、これを実現できます。以下は CLI AWS を使用した例で、Timestream for LiveAnalytics でサポートされている SDKs のいずれかを使用して同じパターンをたどることができます。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1
前の例では、 profile は、この API コールを行うための適切な権限を持つ AWS プロファイルであり、 1638316800「」は、2021-12-01「 00:00:00」のエポック秒に対応します。この手動トリガーは、システムが希望する期間にこの呼び出しをトリガーしたと仮定して、自動トリガーとほぼ同じように動作します。
より長い期間に更新があった場合、ベースデータが 2021-11-303:00:00 - 2021-12-0111:00:00 に更新されたとすると、前述のクエリを複数回トリガーして、この時間範囲全体をカバーすることができます。たとえば、次のように 6 つの異なる実行を実行できます。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1
前の 6 つのコマンドは、2021-12-01「」00:00:00、2021-12-01「02:00:00」、2021-12-01「04:0:00」、「06:00:002021-12-01」、2021-12-01「08:00:00」、「10:002021-12-01」に呼び出されたスケジュールされた計算に対応しています。
または、1 回の実行で 2021-12-01 13:00:00 にトリガーされた MultiPT12HPerHrPerTimeseriesDPCountCatchUp 計算を使用して、12 時間範囲全体の集計を更新することもできます。たとえば、arn_2 がその計算の ARN である場合、CLI から次のコマンドを実行できます。
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1
手動トリガーでは、その自動トリガータイムスタンプと整列する必要のない呼び出し時パラメータにタイムスタンプを使用できます。例えば、前の例では、自動スケジュールがタイムスタンプ 2021-12-010:00:00、12021-12-012:00:00、および 00:00:00 でのみトリガーされているにもかかわらず、計算を 2021-12-0113:00:00 2021-12-02トリガーしました。LiveAnalytics の Timestream は、手動操作に必要な適切な値でトリガーする柔軟性を提供します。
ExecuteScheduledQuery API を使用する際の重要な考慮事項を以下に示します。
-
これらの呼び出しの複数をトリガーする場合は、これらの呼び出しが重複する時間範囲で結果を生成しないようにする必要があります。たとえば、前の例では、6 つの呼び出しがありました。各呼び出しは 2 時間の時間範囲をカバーするため、更新の重複を避けるために、呼び出しタイムスタンプはそれぞれ 2 時間分散されました。これにより、派生テーブルのデータが、ソーステーブルから集計された一致する状態になります。重複しない時間範囲を確保できない場合は、これらの実行が順番にトリガーされていることを確認します。時間範囲が重複する複数の実行を同時にトリガーすると、トリガーの失敗が表示され、これらの実行のエラーレポートにバージョン競合が表示される可能性があります。スケジュールされたクエリ呼び出しによって生成された結果には、呼び出しがトリガーされた日時に基づいてバージョンが割り当てられます。したがって、新しい呼び出しによって生成された行のバージョンは高くなります。上位バージョンのレコードは、下位バージョンのレコードを上書きできます。自動的にトリガーされるスケジュールされたクエリの場合、Timestream for LiveAnalytics はスケジュールを自動的に管理するため、後続の呼び出しの時間範囲が重複してもこれらの問題は表示されません。
-
前述のように、@scheduled_runtime の任意のタイムスタンプ値を使用して呼び出しをトリガーできます。したがって、ソーステーブルでデータが更新された範囲に対応する派生テーブルで適切な時間範囲が更新されるように、値を適切に設定するのはユーザーの責任です。
-
これらの手動トリガーは、DISABLED 状態のスケジュールされたクエリにも使用できます。これにより、DISABLED 状態であるため、自動スケジュールで実行されない特別なクエリを定義できます。むしろ、手動トリガーを使用して、データ修正や遅延到着のユースケースを管理できます。