を使用して検証、変換、パーティショニングで ETL パイプラインをオーケストレーションする AWS Step Functions - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

を使用して検証、変換、パーティショニングで ETL パイプラインをオーケストレーションする AWS Step Functions

Sandip Gangapadhyay、Amazon Web Services

概要

このパターンは、パフォーマンスとコストを最適化するために大規模な CSV データセットを検証、変換、圧縮、およびパーティション化するサーバーレスの抽出、変換、ロード (ETL) パイプラインを構築する方法を示しています。パイプラインは によって調整 AWS Step Functions され、エラー処理、自動再試行、ユーザー通知機能が含まれています。

CSV ファイルが Amazon Simple Storage Service (Amazon S3) バケットの出典フォルダにアップロードされると、ETL パイプラインが実行を開始します。パイプラインは、ソース CSV ファイルの内容とスキーマを検証し、CSV ファイルを圧縮された Apache Parquet 形式に変換し、データセットを年、月、日ごとにパーティション化し、分析ツールが処理できるように別のフォルダに保存します。

このパターンを自動化するコードは、GitHub の ETL Pipeline with AWS Step Functions repository にあります。

前提条件と制限

前提条件

  • アクティブ AWS アカウント。

  • AWS Command Line Interface (AWS CLI) を にインストールして設定し AWS アカウント、 AWS CloudFormation スタックをデプロイして AWS リソースを作成できるようにします。 AWS CLI バージョン 2 を使用することをお勧めします。手順については、 AWS CLI ドキュメントの「 の最新バージョンのインストールまたは更新 AWS CLI」を参照してください。設定手順については、 AWS CLI ドキュメントの「設定と認証情報ファイルの設定」を参照してください。

  • Amazon S3 バケット。

  • 正しいスキーマの CSV データセット。(このパターンに含まれる コードリポジトリ には、使用できる正しいスキーマとデータ型が記載されたサンプル CSV ファイルが用意されています)。

  • をサポートするウェブブラウザ AWS Management Console。 でサポートされるブラウザのリストについては、を参照してください。

  • AWS Glue コンソールアクセス。

  • AWS Step Functions コンソールアクセス。

制約事項

  • では AWS Step Functions、履歴ログを保持するための最大制限は 90 日です。詳細については、 AWS Step Functions ドキュメントの「Step Functions サービスクォータ」を参照してください。

製品バージョン

  • 用の Python 3.13 AWS Lambda

  • AWS Glue バージョン 4.0

アーキテクチャ

Step Functions、AWS Glue、Amazon SNS を介した S3 ソースバケットからの ETL プロセスを 10 ステップで実行します。

この図に示されているワークフローは、以下の大まかなステップで構成されています。

  1. ユーザーは CSV ファイルを Amazon S3 のソースフォルダにアップロードします。

  2. Amazon S3 通知イベントは、 AWS Step Functions ステートマシンを起動する AWS Lambda 関数を開始します。

  3. Lambda 関数は、未加工の CSV ファイルのスキーマとデータ型を検証します。

  4. 検証結果に応じて:

    1. ソースファイルの検証が成功すると、ファイルはステージフォルダーに移動してさらに処理されます。

    2. 検証に失敗すると、ファイルはエラーフォルダに移動し、Amazon Simple Notification Service (Amazon SNS) を通じてエラー通知が送信されます。

  5. AWS Glue クローラは、Amazon S3 のステージフォルダから raw ファイルのスキーマを作成します。

  6. AWS Glue ジョブは、raw ファイルを Parquet 形式に変換、圧縮、パーティション化します。

  7. AWS Glue ジョブは、Amazon S3 の変換フォルダにもファイルを移動します。

  8. AWS Glue クローラは、変換されたファイルからスキーマを作成します。生成されたスキーマは、どの分析ジョブでも使用できます。 を使用して、Amazon Athena でクエリを実行することができます。

  9. パイプラインがエラーなしで完了すると、スキーマファイルはアーカイブフォルダーに移動されます。エラーが発生した場合、ファイルは代わりにエラーフォルダーに移動されます。

  10. Amazon SNS は、パイプラインの完了ステータスに基づいて成功または失敗を示す通知を送信します。

このパターンで使用されるすべての AWS リソースはサーバーレスです。管理するサーバーはありません。

ツール

AWS のサービス

  • AWS Glue – AWS Glue は、分析用にデータを簡単に準備してロードできるフルマネージド ETL サービスです。

  • AWS Step Functions – AWS Step Functions は、 AWS Lambda 関数と他の を組み合わせてビジネスクリティカルなアプリケーション AWS のサービス を構築できるサーバーレスオーケストレーションサービスです。 AWS Step Functions グラフィカルコンソールでは、アプリケーションのワークフローを一連のイベント駆動型のステップとして確認できます。

  • Amazon S3 – Amazon Simple Storage Service (Amazon S3) は、業界をリードするスケーラビリティ、データ可用性、セキュリティ、およびパフォーマンスを提供するオブジェクトストレージサービスです。

  • Amazon SNS — Amazon Simple Notification Service (Amazon SNS) は、マイクロサービス、分散システム、サーバーレスアプリケーションを分離できる、可用性が高く、耐久性があり、安全な、完全マネージド型のパブ/サブメッセージングサービスです。

  • AWS Lambda – AWS Lambda は、サーバーのプロビジョニングや管理を行わずにコードを実行できるコンピューティングサービスです。 は、必要な場合にのみコード AWS Lambda を実行し、1 日あたり数リクエストから 1 秒あたり数千リクエストまで自動的にスケーリングします。

コード

このパターンのコードは、GitHub の ETL Pipeline with AWS Step Functions repository で入手できます。コードリポジトリには以下のファイルとフォルダが含まれています。

  • template.yml – を使用して ETL パイプラインを作成するための AWS CloudFormation テンプレート AWS Step Functions。

  • parameter.json — すべてのパラメータとパラメータ値が含まれます。エピック セクションで説明されているように、このファイルを更新してパラメータ値を変更します。

  • myLayer/python フォルダ – このプロジェクトに必要な AWS Lambda レイヤーの作成に必要な Python パッケージが含まれています。

  • lambda フォルダー — 次の Lambda 関数が含まれます。

    • move_file.py — ソースデータセットをアーカイブフォルダー、トランスフォームフォルダー、またはエラーフォルダーに移動します。

    • check_crawler.py – AWS Glue クローラのステータスは、障害メッセージを送信する前に、RETRYLIMIT 環境変数によって設定された回数だけチェックします。

    • start_crawler.py – AWS Glue クローラを開始します。

    • start_step_function.py – 開始します AWS Step Functions。

    • start_codebuild.py – AWS CodeBuild プロジェクトを開始します。

    • validation.py — 入力された未加工データセットを検証します。

    • s3object.py – Amazon S3 バケット内に必要なディレクトリ構造を作成します。

    • notification.py — パイプラインの最後に成功通知またはエラー通知を送信します。

これらのファイルを使用するには、エピックセクションの指示に従ってください。

エピック

タスク説明必要なスキル

AWS SAM コードリポジトリを複製します。

  1. リポジトリで ETL パイプライン AWS Step Functionsを開きます。

  2. メインリポジトリページのファイルリストの上にある コード を選択し、Clone with HTTPS の下にある URL をコピーします。

  3. コマンドラインインターフェースで、作業ディレクトリをサンプルファイルを保存する場所に変更します。

  4. ターミナルまたはコマンドラインプロンプトで、 コマンドを実行します。

    git clone <repoURL>

    where はステップ 2 でコピーした URL <repoURL> を指します。

開発者

パラメータ値を更新します。

リポジトリのローカルコピーで、parameter.json ファイルを編集し、以下のようにデフォルトのパラメータ値を更新します。

  • pS3BucketName ─ データセットを保存するための Amazon S3 バケットの名前。このバケットはテンプレートによって自動的に作成されます。バケット名はグローバルに一意である必要があります。

  • pSourceFolder ─ ソース CSV ファイルのアップロードに使用される Amazon S3 バケット内のフォルダの名前。

  • pStageFolder ─ プロセス中にステージングエリアとして使用される Amazon S3 バケット内のフォルダの名前。

  • pTransformFolder ─ 変換されたデータセットとパーティション化されたデータセットを保存するために使用される Amazon S3 バケット内のフォルダの名前。

  • pErrorFolder ─ ソース CSV ファイルが検証できない場合に移動する Amazon S3 バケット内のフォルダ。

  • pArchiveFolder ─ ソース CSV ファイルのアーカイブに使用される Amazon S3 バケット内のフォルダの名前。

  • pEmailforNotification — 成功/エラー通知を受信するための有効な E メールアドレス。

  • pPrefix ─ AWS Glue クローラ名で使用されるプレフィックス文字列。

  • pDatasetSchema — ソースファイルの検証対象となるデータセットスキーマ。Cerberus Python パッケージは、ソースデータセットの検証に使用されます。詳細については、Cerberus のウェブサイトを参照してください。

開発者

ソースコードを Amazon S3 バケットにアップロードします。

ETL パイプラインを自動化する AWS CloudFormation テンプレートをデプロイする前に、テンプレートのソースファイルをパッケージ化し、Amazon S3 バケットにアップロードする必要があります。これを行うには、事前設定されたプロファイルで次の AWS CLI コマンドを実行します。

aws cloudformation package --template-file template.yml --s3-bucket <bucket_name> --output-template-file packaged.template --profile <profile_name>

各パラメータの意味は次のとおりです。

  • <bucket_name> は、スタックをデプロイ AWS リージョン する 内の既存の Amazon S3 バケットの名前です。このバケットは、CloudFormation テンプレートのソースコードパッケージを保存するために使用されます。

  • <profile_name> は、 のセットアップ時に事前設定した有効な AWS CLI プロファイルです AWS CLI。

開発者
タスク説明必要なスキル

CloudFormation のテンプレートをデプロイします。

AWS CloudFormation テンプレートをデプロイするには、次の AWS CLI コマンドを実行します。

aws cloudformation deploy --stack-name <stack_name> --template-file packaged.template --parameter-overrides file://parameter.json --capabilities CAPABILITY_IAM --profile <profile_name>

各パラメータの意味は次のとおりです。

  • <stack_name> はCloudFormation スタックのユニークな識別子です。

  • <profile-name> は事前設定された AWS CLI プロファイルです。

開発者

進捗確認。

AWS CloudFormation コンソールで、スタック開発の進行状況を確認します。ステータスが CREATE_COMPLETE の場合、スタックは正常にデプロイされています。

開発者

AWS Glue データベース名を書き留めます。

スタックの出力タブには、 AWS Glue データベースの名前が表示されます。キー名は、GlueDBOutput です。

開発者
タスク説明必要なスキル

ETL パイプラインを開始します。

  1. Amazon S3 バケット内のソースフォルダ (source、または parameter.json ファイルで設定したフォルダ名) に移動します。

  2. サンプル CSV ファイルをこのフォルダにアップロードします。(コードリポジトリには、使用できる Sample_Bank_Transaction_Raw_Dataset.csv というサンプルファイルが用意されています)。ファイルをアップロードすると、Step Functions を通じて ETL パイプラインが開始されます。

  3. Step Functions コンソール で ETL パイプラインのステータスを確認します。

開発者

パーティション化されたデータセットを確認します。

ETL パイプラインが完了したら、パーティション化されたデータセットが Amazon S3 トランスフォームフォルダ (transform または parameter.json ファイルに設定したフォルダ名) にあることを確認します。

開発者

パーティション化された AWS Glue データベースを確認します。

  1. AWS Glue コンソールで、スタックによって作成された AWS Glue データベースを選択します (これは前のエピックでメモしたデータベースです)。

  2. パーティション分割されたテーブルが で使用可能であることを確認します AWS Glue Data Catalog。

開発者

クエリを実行する。

(オプション) Amazon Athena を使用して、パーティション分割され変換されたデータベースでアドホッククエリを実行します。手順については、 AWS ドキュメントのAmazon Athena で SQL クエリを実行する」を参照してください。

データベースアナリスト

トラブルシューティング

問題ソリューション

AWS Identity and Access Management AWS Glue ジョブとクローラの (IAM) アクセス許可

AWS Glue ジョブまたはクローラをさらにカスタマイズする場合は、 AWS Glue ジョブで使用される IAM ロールで適切な IAM アクセス許可を付与するか、データアクセス許可を付与してください AWS Lake Formation。詳細については、AWS のドキュメントを参照してください。

関連リソース

AWS のサービス ドキュメント

追加情報

次の図は、 AWS Step Functions Inspector パネルから成功した ETL パイプラインの AWS Step Functions ワークフローを示しています。

入力 .csv の検証、データのクローリング、AWS Glue ジョブの実行のための Step Functions ワークフロー。

次の図は、Step Functions Inspector パネルからの入力検証エラーが原因で失敗する ETL パイプラインの AWS Step Functions ワークフローを示しています。

Step Functions ワークフローが失敗し、ファイルがエラーフォルダに移動する。