Amazon S3 から Timestream for InfluxDB へのデータ取り込みの自動化 - Amazon Timestream

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、こちらを参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon S3 から Timestream for InfluxDB へのデータ取り込みの自動化

Timestream for LiveAnalytics エクスポートツールでアンロードプロセスが完了すると、自動化プロセスの次のステップが開始されます。この自動化では、InfluxDB のインポートツールを使用して、データを特殊な時系列構造に転送します。このプロセスは、Timestream のデータモデルを InfluxDB のメジャーメント、タグ、フィールドの概念と一致するように変換します。最後に、InfluxDB のラインプロトコルを使用してデータを効率的にロードします。

移行を完了するためのワークフローは、次の 4 つの段階に分かれています。

  1. Timestream for LiveAnalytics エクスポートツールを使用してデータをアンロードします。

  2. データ変換: Amazon Athena を使用して Timestream for LiveAnalytics のデータを InfluxDB ラインプロトコル形式 (カーディナリティ評価の後に定義されたスキーマに基づく) に変換します。

  3. データインジェスト: ラインプロトコルデータセットを Timestream for InfluxDB インスタンスに取り込みます。

  4. 検証: オプションで、すべてのラインプロトコルポイントが取り込まれたことを検証できます (データ変換ステップ中に --add-validation-field true が必要)。

データ変換

データ変換のために、Amazon Athena を使用して Timestream for LiveAnalytics でエクスポートされた Parquet 形式データを InfluxDB のラインプロトコル形式に変換するスクリプトを開発しました。Amazon Athena は、サーバーレスのクエリサービスを提供し、専用のコンピューティングリソースを必要とせずにコスト効率の高い方法で大量の時系列データを変換することができます。

スクリプトは、次を実行します。

  • Timestream for LiveAnalytics でエクスポートされたデータを Amazon S3 バケットから Amazon Athena テーブルにロードします。

  • Athena テーブルに保存されているデータからラインプロトコルにデータマッピングと変換を実行し、S3 バケットに保存します。

データマッピング

次の表は、Timestream for LiveAnalytics データがどのようにラインプロトコルデータにマッピングされるかを示しています。

Timestream for LiveAnalytics の概念 ラインプロトコルの概念

テーブル名

測定

ディメンション

タグ

メジャー名

タグ (オプション)

メジャー

フィールド

時間

タイムスタンプ

前提条件とインストール

変換スクリプトの「README」の「Prerequisites」セクションと「Installation」セクションを参照してください。

使用方法

example_s3_bucket バケットに保存されているデータを example_database の Timestream for LiveAnalytics example_table テーブルから変換するには、次のコマンドを実行します。

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

スクリプトが完了すると次のようになります。

  • Athena では、example_database_example_table というテーブルが作成され、このテーブルに Timestream for LiveAnalytics のデータが含まれます。

  • Athena では、lp_example_database_example_table というテーブルが作成され、これにはラインプロトコルポイントに変換された Timestream for LiveAnalytics のデータが含まれます。

  • example_s3_bucket という S3 バケットのパス example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output 内に、ラインプロトコルデータが保存されます。

推奨事項

スクリプトの最新の使用状況についての詳細は、変換スクリプトの「README」で確認してください。出力は後の移行手順 (検証など) で必要となります。カーディナリティを向上させるためにディメンションを除外した場合は、--dimensions-to-fields 引数を使用して特定のディメンションをフィールドに変更することで、スキーマを調整してカーディナリティを減らします。

検証用のフィールドの追加

検証用のフィールドを追加する方法については、変換スクリプトの「README」の「Adding a Field for Validation」セクションを参照してください。

Timestream for InfluxDB へのデータインジェスト

InfluxDB の取り込みスクリプトは、圧縮されたラインプロトコルデータセットを Timestream for InfluxDB に取り込みます。gzip 圧縮ラインプロトコルファイルを含むディレクトリは、取り込み先の InfluxDB バケットとともにコマンドライン引数として渡されます。このスクリプトは、マルチ処理を使用して一度に複数のファイルを取り込み、InfluxDB と、スクリプトを実行するマシンでリソースを活用するように設計されています。

スクリプトは、次を実行します。

  • 圧縮ファイルを抽出し、InfluxDB に取り込む。

  • 再試行メカニズムとエラー処理を実装する。

  • 再開に向けて取り込みの成功と失敗を追跡する。

  • ラインプロトコルデータセットから読み取るときに I/O 操作を最適化する。

前提条件とインストール

GitHub の取り込みスクリプトの「README」の「Prerequisites」と「Installation」セクションを参照してください。

データ準備

取り込みに必要な圧縮ラインプロトコルファイルは、データ変換スクリプトによって生成されます。次の手順に従ってデータを準備します。

  1. 変換したデータセットを保持するのに十分なストレージがある EC2 インスタンスを設定します。

  2. 変換したデータを S3 バケットからローカルディレクトリに同期します。

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. データディレクトリ内のすべてのファイルへの読み取りアクセス権があることを確認します。

  4. 次の取り込みスクリプトを実行して、Timestream for InfluxDB にデータを取り込みます。

使用方法

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本的な使用法

python influxdb_ingestion.py my_bucket ./data_files

取り込みレート

取り込みレートに関するテストをいくつか実行しました。10 個のワーカーで取り込みスクリプトを実行し、約 500 GB のラインプロトコルを 8XL Timestream for InfluxDB インスタンスに取り込む C5N.9XL EC2 インスタンスを使用した取り込みテストです。

  • 3K IOPS 15.86 GB/時間。

  • 12K IOPS 70.34 GB/時間。

  • 16K IOPS 71.28 GB/時間。

推奨事項

  • 並列処理を処理するのに十分な CPU コアを持つ EC2 インスタンスを使用します。

  • インスタンスに、変換済みのデータセット全体を保持するのに十分なストレージがあり、さらに抽出用に空きがあることを確認します。

    • 一度に抽出されるファイルの数は、スクリプトの実行中に設定されたワーカーの数と等しくなります。

  • レイテンシーを最小限に抑えるために、EC2 インスタンスを InfluxDB インスタンスと同じリージョンと AZ (可能な場合) に配置します。

  • C5N など、ネットワークオペレーション向けに最適化されたインスタンスタイプの使用を検討してください。

  • 高い取り込みレートが必要な場合は、Timestream for InfluxDB インスタンスに少なくとも 12K IOPS をお勧めします。Timestream for InfluxDB インスタンスのサイズによってスクリプトのワーカー数を増やすことで、さらなる最適化を実現できます。

詳細については、取り込みスクリプトの「README」を参照してください。