遅延到着データの処理 - Amazon Timestream

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、こちらを参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

遅延到着データの処理

データの到着が大幅に遅れることがあります。例えば、Timestream for LiveAnalytics にデータが取り込まれた時刻が、取り込まれる行に関連付けられたタイムスタンプよりも大幅に遅くなる場合です。前の例では、@scheduled_runtime パラメータで定義された時間範囲を使用して、遅延到着データを織り込む方法を確認しました。ただし、データが数時間や数日遅延しうるユースケースがある場合は、派生テーブルの事前計算が適切に更新され、遅延到着データが反映されるよう、別のパターンが必要になる可能性があります。遅延到着データに関する一般的な情報については、「データの書き込み (挿入とアップサート)」を参照してください。

以下で、この遅延到着データに対処する 2 つの方法を説明します。

  • データ到着の遅延が予測可能な場合は、「キャッチアップ」という別のスケジュールされた計算を使用して、遅延到着データの集計を更新できます。

  • データ到着の遅延が予測不可能な場合や不定期に発生する場合は、手動実行によって派生テーブルを更新できます。

ここでは、データ遅延到着のシナリオを取り上げます。ただし、ソーステーブルのデータを変更し、派生テーブルの集計を更新する場合、データ修正にも同じ原則が適用されます。

スケジュールされたキャッチアップクエリ

遅延なく到着したデータを集計するクエリ

以下は、データ到着の遅延が予測可能な場合に、集計を自動的に更新する方法を示すパターンです。下のリアルタイムデータのスケジュールされた計算に関する前述の例の 1 つを考えてみましょう。このスケジュールされた計算では、派生テーブルが 30 分おきに更新され、最大 1 時間の遅延データが既に織り込まれています。

{ "Name": "MultiPT30mPerHrPerTimeseriesDPCount", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0/30 * * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }

遅延到着データの集計を更新するキャッチアップクエリ

次に、データが約 12 時間遅延する可能性があるケースを考えてみましょう。以下は、同じクエリのバリアントです。ただし、スケジュールされた計算がトリガーされる場合と比較すると、最大 12 時間の遅延データの集計を計算する点が異なります。例えば、下の例のクエリがターゲットとする時間範囲は、クエリがトリガーされる 2 時間前~14 時間前です。さらに、スケジュール式 cron(0 0,12 * * ? *) により、計算は毎日 00:00 UTC と 12:00 UTC にトリガーされます。したがって、クエリが 2021–12-01 00:00:00 にトリガーされると、時間範囲 2021-11-30 10:00:00~2021-11-30 22:00:00 の集計が更新されます。スケジュールされたクエリでは、Timestream for LiveAnalytics の書き込みと類似するアップサートセマンティクスが使用されます。ウィンドウ内に遅延到着データがある場合、またはより新しい集計が見つかった場合 (元のスケジュールされた計算がトリガーされた時点で存在しなかった新しいグループがこの集計に表示される場合など) に、このキャッチアップクエリによって集計値が新しい値に更新され、新しい集計が派生テーブルに挿入されます。同様に、次のインスタンスが 2021-12-01 12:00:00 にトリガーされると、そのインスタンスによって 2021-11-30 22:00:00~2021-12-01 10:00:00 の範囲内の集計が更新されます。

{ "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp", "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)", "ScheduleConfiguration": { "ScheduleExpression": "cron(0 0,12 * * ? *)" }, "NotificationConfiguration": { "SnsConfiguration": { "TopicArn": "******" } }, "TargetConfiguration": { "TimestreamConfiguration": { "DatabaseName": "derived", "TableName": "dp_per_timeseries_per_hr", "TimeColumn": "hour", "DimensionMappings": [ { "Name": "region", "DimensionValueType": "VARCHAR" }, { "Name": "cell", "DimensionValueType": "VARCHAR" }, { "Name": "silo", "DimensionValueType": "VARCHAR" }, { "Name": "availability_zone", "DimensionValueType": "VARCHAR" }, { "Name": "microservice_name", "DimensionValueType": "VARCHAR" }, { "Name": "instance_type", "DimensionValueType": "VARCHAR" }, { "Name": "os_version", "DimensionValueType": "VARCHAR" }, { "Name": "instance_name", "DimensionValueType": "VARCHAR" }, { "Name": "process_name", "DimensionValueType": "VARCHAR" }, { "Name": "jdk_version", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": { "TargetMultiMeasureName": "numDataPoints", "MultiMeasureAttributeMappings": [ { "SourceColumn": "numDataPoints", "MeasureValueType": "BIGINT" } ] } } }, "ErrorReportConfiguration": { "S3Configuration" : { "BucketName" : "******", "ObjectKeyPrefix": "errors", "EncryptionOption": "SSE_S3" } }, "ScheduledQueryExecutionRoleArn": "******" }

上の例では、遅延到着が 12 時間までに制限されており、リアルタイムウィンドウより後に到着するデータについては、派生テーブルを 12 時間おきに更新すると想定しています。派生テーブルを 1 時間おきに更新するようこのパターンを調整して、派生テーブルでの遅延到着データの反映を早めることができます。同様に、予測可能な遅延到着データを処理するため、時間範囲を 12 時間よりも長くなるよう (1 日や 1 週間以上などに) 調整することもできます。

予測不可能な遅延到着データの手動実行

予測不可能な遅延到着データがあるケースや、ソースデータを変更し、事後にいくつかの値を更新したケースが考えられます。このようなケースではすべて、スケジュールされたクエリを手動でトリガーして、派生テーブルを更新できます。以下はこの処理を実行する方法の例です。

派生テーブル dp_per_timeseries_per_hr に計算が書き込まれるユースケースがあるとします。テーブル devops のベースデータが 2021-11-30 23:00:00~2021-12-01 00:00:00 の時間範囲で更新されました。MultiPT30mPerHrPerTimeseriesDPCount と MultiPT12HPerHrPerTimeseriesDPCountCatchUp の 2 つのスケジュールされたクエリを使用して、この派生テーブルを更新できます。Timestream for LiveAnalytics で作成するスケジュールされた計算にはそれぞれ、計算の作成時またはリストオペレーションの実行時に取得する一意の ARN があります。このオペレーションを実行するために、計算の ARN と、クエリによって取得されるパラメータ @scheduled_runtime の値を使用できます。

MultiPT30mPerHrPerTimeseriesDPCount の計算に ARN arn_1 があり、この計算を使用して派生テーブルを更新するとします。前述のスケジュールされた計算では、@scheduled_runtime 値の 1 時間前と 1 時間後に集計が更新されるため、@scheduled_runtime パラメータの 2021-12-01 00:00:00 の値を使用して、更新の時間範囲 (2021-11-30 23:00:00~2021-12-01 00:00:00) をカバーできます。ExecuteScheduledQuery API を使用してこの計算の ARN とエポック秒の時間パラメータ値 (UTC) を渡すことで、これを実現できます。以下は AWS CLI を使用した例で、Timestream for LiveAnalytics でサポートされている SDK を使用して同じパターンに従うことができます。

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1

前の例では、プロファイルはこの API コールを行うための適切な権限を持つ AWS プロファイルであり、1638316800 は 2021-12-01 00:00:00 のエポック秒に対応します。この手動トリガーは、希望する期間にシステムがこの呼び出しをトリガーしたと仮定して、自動トリガーとほぼ同じように動作します。

より長い期間に更新があった場合 (ベースデータが 2021-11-30 23:00:00~2021-12-01 11:00:00 に更新された場合など)、前述のクエリを複数回トリガーして、この時間範囲全体をカバーすることができます。例えば、次のような 6 つの異なる実行が可能です。

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1 aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1

上の 6 つのコマンドは、2021-12-01 00:00:00、2021-12-01 02:00:00、2021-12-01 04:0:00、2021-12-01 06:00:00、2021-12-01 08:00:00、2021-12-01 10:00 に呼び出されたスケジュールされた計算に対応します。

また、2021-12-01 13:00:00 にトリガーされた計算 MultiPT12HPerHrPerTimeseriesDPCountCatchUp を 1 回の実行で使用して、12 時間の時間範囲全体の集計を更新することもできます。例えば、arn_2 がその計算の ARN である場合、CLI から次のコマンドを実行できます。

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1

手動トリガーでは、自動トリガータイムスタンプに合わせて調整する必要のない呼び出し時間パラメータのタイムスタンプを使用できます。例えば、前の例では、自動スケジュールでは計算がタイムスタンプ 2021-12-01 10:00:00、2021-12-01 12:00:00、2021-12-02 00:00:00 でのみトリガーされますが、2021-12-01 13:00:00 に計算をトリガーしました。Timestream for LiveAnalytics により、手動オペレーションに必要とされる適切な値を使用して、計算を柔軟にトリガーできます。

ExecuteScheduledQuery API を使用する際の重要な考慮事項を以下に示します。

  • これらの呼び出しを複数トリガーする場合は、これらの呼び出しによって生成される結果の時間範囲が重複しないようにする必要があります。例えば、前述の例では、6 つの呼び出しがありました。各呼び出しは 2 時間の時間範囲をカバーするため、更新の重複を避けるために、呼び出しタイムスタンプはそれぞれ 2 時間の間隔が空けられました。これにより、派生テーブル内のデータの状態が、ソーステーブルからの集計と一致します。時間範囲の重複を避けられない場合は、これらの実行が順番にトリガーされるようにしてください。時間範囲が重複する複数の実行を同時にトリガーすると、トリガーエラーが表示され、これらの実行のエラーレポートにバージョン競合が表示される可能性があります。スケジュールされたクエリ呼び出しによって生成された結果には、呼び出しがトリガーされたタイミングに基づいてバージョンが割り当てられます。したがって、新しい呼び出しによって生成された行ほど、バージョンは上位になります。上位バージョンのレコードは、下位バージョンのレコードを上書きできます。自動的にトリガーされるスケジュールされたクエリの場合、Timestream for LiveAnalytics がスケジュールを自動的に管理するため、後続の呼び出しの時間範囲が重複してもこのような問題は発生しません。

  • 前述のように、@scheduled_runtime の任意のタイムスタンプ値を使用して呼び出しをトリガーできます。したがって、ソーステーブルでデータが更新された範囲に合わせて、派生テーブルで適切な時間範囲が更新されるよう値を適切に設定するのはユーザーの責任です。

  • これらの手動トリガーは、DISABLED 状態のスケジュールされたクエリにも使用できます。これにより、DISABLED 状態であるため、自動スケジュールで実行されない特別なクエリを定義できます。これらのクエリには手動トリガーを使用して、データ修正や遅延到着のユースケースを管理できます。