ItemReader (Map) - AWS Step Functions

ItemReader (Map)

ItemReader フィールドは JSON オブジェクトで、データセットとその場所を指定します。分散マップ状態はこのデータセットを入力として使用します。

次の例では、Amazon S3 バケットに保存されているテキスト区切りファイルのデータセットを対象に、JSONPath ベースのワークフローでの ItemReader フィールドの構文を示しています。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv", "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

次の JSONata ベースのワークフローでは、ParametersArguments に置き換えられていることに注意してください。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

ItemReader フィールドの内容

データセットによって、ItemReader フィールドの内容は異なります。例えば、データセットがワークフローの前のステップから渡された JSON 配列である場合、ItemReader フィールドは省略されます。データセットが Amazon S3 データソースである場合、このフィールドには次のサブフィールドが含まれます。

Resource

Step Functions が使用する Amazon S3 API 統合アクション (例: arn:aws:states:::s3:getObject)

Arguments (JSONata) or Parameters (JSONPath)

データセットが保存されている Amazon S3 バケットの名前とオブジェクトキーを指定する JSON オブジェクト。

バケットでバージョニングが有効な場合は、Amazon S3 オブジェクトのバージョンも指定できます。

ReaderConfig

以下の詳細を指定する JSON オブジェクト。

  • InputType

    CSVJSONJSONLPARQUETMANIFEST のいずれかの値を受け入れます。

    テキスト区切りファイル (CSV)、オブジェクト、JSON ファイル、JSON Lines、Parquet ファイル、Athena マニフェスト、Amazon S3 インベントリリストなど、Amazon S3 データソースのタイプを指定します。Workflow Studio では、[S3 項目ソース] から入力タイプを選択できます。

    S3GetObject 取得を使用するほとんどの入力タイプは、パラメータの ExpectedBucketOwner および VersionId フィールドもサポートしています。Parquet ファイルは例外で、VersionId をサポートしていません。

    入力ファイルは、GZIP、ZSTD の外部圧縮タイプをサポートしています。

    ファイル名の例: myObject.jsonl.gz および myObject.csv.zstd

    注: Parquet ファイルは内部で圧縮されるバイナリ形式のファイルです。GZIP、ZSTD、および Snappy 圧縮がサポートされています。

  • Transformation

    オプション。値は NONE または LOAD_AND_FLATTEN のいずれかになります。

    指定しない場合は、NONE とみなされます。LOAD_AND_FLATTEN に設定した場合は、InputType も設定する必要があります。

    デフォルトでは、マップは S3:ListObjectsV2 の呼び出しから返されるメタデータオブジェクトを反復処理します。LOAD_AND_FLATTEN に設定すると、マップは結果リストで参照される実際のデータオブジェクトを読み取って処理します。

  • ManifestType

    オプション。値は ATHENA_DATA または S3_INVENTORY のいずれかになります。

    注: S3_INVENTORY に設定した場合、タイプは CSV とみなされるため、InputType も指定しないでください

  • CSVDelimiter

    このフィールドは、InputTypeCSV または MANIFEST の場合に指定できます。

    COMMA (デフォルト)、PIPESEMICOLONSPACETAB のいずれかの値を受け入れます。

    注記

    CSVDelimiter フィールドを使用すると、ItemReader はカンマ以外の文字で区切られたファイルを処理できます。「CSV ファイル」という言及には、CSVDelimiter フィールドで指定された代替デリミタを使用するファイルも含まれます。

  • CSVHeaderLocation

    このフィールドは、InputTypeCSV または MANIFEST の場合に指定できます。

    以下の値のいずれかを受け入れ、列ヘッダーの位置を指定します。

    • FIRST_ROW - ファイルの最初の行がヘッダーの場合は、このオプションを使用します。

    • GIVEN - このオプションを使用して、ステートマシン定義内のヘッダーを指定します。

      例えば、ファイルに次のデータが含まれている場合を考えてみましょう。

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      CSV ヘッダーとして次のような JSON 配列を指定することもあります。

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    CSV ヘッダーサイズ

    Step Functions は、テキスト区切りファイルに対して最大 10 KiB のヘッダーをサポートします。

  • MaxItems

    デフォルトでは、Map ステートは指定されたデータセット内のすべての項目を反復処理します。MaxItems を設定すると、Map ステートに渡されるデータ項目の数を制限できます。例えば、1,000 行のテキスト区切りファイルを指定して、上限を 100 に設定した場合、インタープリタは Distributed Map ステートに 100 行のみを渡します。Map 状態は項目をヘッダー行の後から順番に処理します。

    JSONPath ワークフローでは、MaxItemsPath と、ステートの入力内で整数に解決されるキーと値のペアへの参照パスを使用できます。MaxItems または MaxItemsPath を指定できますが、両方を指定することはできません。

    注記

    最大 100,000,000 の上限を設定でき、それを超えると Distributed Map は項目の読み取りを停止します。

アカウントとリージョンの要件

Amazon S3 バケットはステートマシンと同じ AWS アカウント および AWS リージョン にある必要があります。

ステートマシンが同じ AWS リージョン、異なる AWS アカウント に属するバケット内のファイルにアクセスできる場合でも、Step Functions が Amazon S3 バケット内のオブジェクトを一覧表示できるのは、ステートマシンと同じ AWS リージョン、同じ AWS アカウント に属するバケット内のオブジェクトのみです。

ネストされたデータセットの処理 (2025 年 9 月 11 日更新)

新しい Transformation パラメータを使用すると、LOAD_AND_FLATTEN の値を指定でき、マップは S3:ListObjectsV2 の呼び出しからの結果リストで参照されている実際のデータオブジェクトを読み取ります。

このリリース以前は、メタデータを取得し、実際のデータを処理するために、ネストされた分散マップを作成する必要がありました。最初のマップは S3:ListObjectsV2 によって返されたメタデータを反復処理し、子ワークフローを呼び出します。各子ステートマシン内の別のマップが個々のファイルから実際のデータを読み取ります。変換オプションを使用すると、両方のステップを一度に実行できます。

例えば、システムが毎時生成し、Amazon S3 に保存する過去 24 個のログファイルに対して日時監査を行うとします。Distributed Map ステートは S3:ListObjectsV2 でログファイルを一覧表示し、各オブジェクトの メタデータ を反復処理できます。さらに、Amazon S3 バケットに保存されている実際のデータオブジェクトをロードし、分析できるようになりました。

LOAD_AND_FLATTEN オプションを使用すると、スケーラビリティの向上、オープン状態のマップ実行数の削減、複数オブジェクトの同時処理が可能になります。Athena と Amazon EMR のジョブは通常、この新しい設定で処理できる出力を生成します。

次に示しているのは、ItemReader 定義のパラメータの例です。

{ "QueryLanguage": "JSONata", "States": { ... "Map": { ... "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { // InputType is required if Transformation is LOAD_AND_FLATTEN. "InputType": "CSV | JSON | JSONL | PARQUET", // Transformation is OPTIONAL and defaults to NONE if not present "Transformation": "NONE | LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket1", "Prefix": "{% $states.input.PrefixKey %}" } }, ... } }

データセットの例

データセットとして以下のいずれかのオプションを指定できます。

注記

Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

分散マップ状態は、ワークフローの前のステップから渡された JSON 入力を受け入れることができます。

入力は JSON 配列、JSON オブジェクト、または JSON オブジェクトのノード内の配列のいずれかにできます。

Step Functions は、配列の要素、または JSON オブジェクトのキーと値のペアを直接反復処理します。

JSON オブジェクトから配列を含む特定のノードを選択するには、ItemsPath (マップ、JSONPath のみ) を使用するか、JSONata ステートの場合は Items フィールドで JSONata 式を使用できます。

個々の項目を処理するために、Distributed Map ステートは項目ごとに子ワークフロー実行を開始します。以下のタブには、Map 状態に渡される入力と、それに対応する子ワークフロー実行への入力の例が表示されます。

注記

データセットが前のステップからの JSON データである場合、ItemReader フィールドは不要です。

Input passed to the Map state

次の 3 つの項目からなる JSON 配列を考えてみましょう。

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

分散マップ状態は、3 つの子ワークフローの実行を開始します。実行するたびに、配列項目を入力として受け取ります。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分散マップ状態では、Amazon S3 バケットに格納されているオブジェクトを反復できます。ワークフロー実行が Map ステートに到達すると、Step Functions は ListObjectsV2 API アクションを呼び出し、Amazon S3 オブジェクトメタデータの配列を返します。この配列の各項目には、バケットに保存されている実際のデータの [ETag][Key] などのデータが含まれています。

配列内の個々の項目を処理するために、分散マップ状態は子ワークフロー実行を開始します。例えば、Amazon S3 バケットに 100 個のイメージが含まれているとします。次に、ListObjectsV2 API アクションを呼び出した後に返される配列には 100 個のメタデータ項目が含まれています。Distributed Map ステートは 100 個の子ワークフロー実行を開始して各項目を処理します。

ネストされたワークフローを使用せずにデータオブジェクトを直接処理するには、LOAD_AND_FLATTEN 変換オプションを選択して項目を直接処理できます。

注記
  • Step Functions は、Amazon S3 コンソールを使用して Amazon S3 バケットに作成し各フォルダにも 1 つの項目を含めます。これらのフォルダ項目により、追加の子ワークフロー実行が開始されます。

    各フォルダに余分な子ワークフロー実行が作成されないようにするには、AWS CLI を使用してフォルダを作成することをお勧めします。詳細については、「AWS Command Line Interface ユーザーガイド」の「高レベルな S3 コマンド」を参照してください。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax

この例では、イメージ、JSON ファイル、オブジェクトを含むデータを、amzn-s3-demo-bucket という名前の Amazon S3 バケットの processData というプレフィックス内に整理しました。

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

Distributed Map ステートは、Amazon S3 バケットに存在するメタデータ項目の数だけ子ワークフロー実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

分散マップの入力ソースとしての S3 ListObjectsV2 のサポートの強化により、ステートマシンは Amazon S3 バケットから複数のデータオブジェクトを直接読み取って処理できるようになり、メタデータ処理のためのネストされたマップが不要になります。

LOAD_AND_FLATTEN オプションを使用すると、ステートマシンは次の処理を行います。

  • Amazon S3 の ListObjectsV2 呼び出しでリストされた各オブジェクトの実際の内容を読み取ります。

  • InputType (CSV、JSON、JSONL、Parquet) に基づいてその内容を解析します。

  • メタデータではなくファイルの内容 (行/レコード) から項目を作成します。

この変換オプションにより、メタデータの処理にネストされた分散マップは不要になります。LOAD_AND_FLATTEN オプションを使用すると、スケーラビリティの向上、アクティブ状態のマップ実行数の削減、複数のオブジェクトの同時処理が行われます。

次に示しているのは、ItemReader の設定です。

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { "InputType": "JSON", "Transformation": "LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "S3_BUCKET_NAME", "Prefix": "S3_BUCKET_PREFIX" } }
バケットプレフィックスに関する推奨事項

プレフィックスの末尾にスラッシュを含めることをお勧めします。例えば、folder1 をプレフィックスにしてデータを選択すると、ステートマシンは folder1/myData.csvfolder10/myData.csv の両方を処理します。folder1/ を使用すると、厳密に 1 つのフォルダのみが処理されます。

分散マップ状態は、Amazon S3 バケットにデータセットとして保存されている JSON ファイルを受け入れることができます。JSON ファイルには配列または JSON オブジェクトが含まれている必要があります。

ワークフローの実行が Map 状態に達すると、Step Functions は GetObject API アクションを呼び出して、指定された JSON ファイルを取得します。

JSON ファイルにネストされたオブジェクト構造が含まれている場合は、ItemsPointer を使用して、データセットを含む特定のノードを選択できます。例えば、次の設定では、inventory 内の featured products のネストされたリストが抽出されます。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON", "ItemsPointer": "/inventory/products/featured" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "nested-data-file.json" } }

その後、Map 状態は配列内の各項目を反復処理し、項目ごとに子ワークフローの実行を開始します。例えば、JSON ファイルに 1000 個の配列項目が含まれている場合、Map 状態は 1000 個の子ワークフロー実行を開始します。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelector フィールドを適用して項目のサイズを小さくすることで、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

この例では、factcheck.json という JSON ファイルがあるとします。このファイルを Amazon S3 バケットに jsonDataset という名前のプレフィックスで保存しました。以下は、JSON データセットの例です。

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

分散マップ状態は、Json ファイルに存在する配列項目数と同じ数の子ワークフローの実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Distributed Map ステートは、Amazon S3 バケットに保存されている JSON Lines ファイルをデータセットとして受け入れることができます。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelector フィールドを適用して項目のサイズを小さくすることで、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

この例では、factcheck.jsonl という JSON Lines ファイルがあるとします。このファイルを Amazon S3 バケットに jsonlDataset という名前のプレフィックスで保存しました。次に示しているのは、そのファイルの内容の例です。

{"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"} {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"} {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"}
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSONL" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonlDataset/factcheck.jsonl" } }
Input to a child workflow execution

Distributed Map ステートは、JSONL ファイル内の行の数だけ子ワークフロー実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
注記

CSVDelimiter フィールドを使用すると、ItemReader はカンマ以外の文字で区切られたファイルを処理できます。「CSV ファイル」という言及には、CSVDelimiter フィールドで指定された代替デリミタを使用するファイルも含まれます。

Distributed Map ステートは、Amazon S3 バケットにデータセットとして保存されているテキスト区切りファイルを受け入れることができます。テキスト区切りファイルをデータセットとして使用する場合は、列ヘッダーを指定する必要があります。ヘッダーの指定方法については、「ItemReader フィールドの内容」を参照してください。

Step Functions は、次のルールに基づいてテキスト区切りファイルを解析します。

  • フィールドの区切り文字は ReaderConfigCSVDelimiter で指定します。区切り記号のデフォルトは COMMA です。

  • 改行はレコードを分割する区切り文字です。

  • フィールドは文字列として扱われます。データ型変換には、ItemSelector (Map)States.StringToJson 組み込み関数を使用します。

  • 文字列を二重引用符 (" ") で囲む必要はありません。ただし、二重引用符で囲まれた文字列には、レコード区切り文字として機能しないカンマや改行を含めることができます。

  • 二重引用符は繰り返しにより保存できます。

  • バックスラッシュ (\) も特殊文字をエスケープする方法として使用できます。バックスラッシュは、他のバックスラッシュ、二重引用符、設定されたフィールド区切り記号 (カンマやパイプなど) と組み合わせてのみ機能します。バックスラッシュの後に他の文字が続く場合、そのバックスラッシュは警告なしに削除されます。

  • バックスラッシュは繰り返すことで保持できます。例:

    path,size C:\\Program Files\\MyApp.exe,6534512
  • 二重引用符をエスケープするバックスラッシュ (\") は、ペアで含める必要があるため、二重引用符を繰り返してエスケープすること ("") をお勧めします。

  • 行のフィールド数がヘッダーのフィールド数より少ない場合、Step Functions は欠落している値に空の文字列を提供します。

  • 行のフィールド数がヘッダーのフィールド数よりも多い場合、Step Functions は追加のフィールドをスキップします。

Step Functions がテキスト区切りファイルを解析する方法の詳細については、「Example of parsing an input CSV file」を参照してください。

ワークフロー実行が Map ステートに到達すると、Step Functions は GetObject API アクションを呼び出して、指定されたファイルを取得します。その後、Map ステートはファイル内の各行を反復処理し、各行の項目を処理するために子ワークフロー実行を開始します。例えば、100 行を含むテキスト区切りファイルを入力として用意したとします。そうすると、インタープリタは各行を Map 状態に渡します。Map 状態は項目をヘッダー行の後から順次処理します。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelector フィールドを適用して項目のサイズを小さくすることで、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax

例えば、ratings.csv という名前の CSV ファイルがあるとします。次に、このファイルを Amazon S3 バケットに csvDataset という名前のプレフィックスで保存しました。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW", "CSVDelimiter": "PIPE" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
Input to a child workflow execution

分散マップ状態は、ヘッダー行を除いて (ファイル内にある場合)、CSV ファイルに存在する行数と同じ数の子ワークフローの実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Parquet ファイルは入力ソースとして使用できます。Amazon S3 に保存した Apache Parquet ファイルにより、大規模で効率的な列指向データ処理が可能になります。

Parquet ファイルを使用する場合、次の条件が適用されます。

  • 行グループの最大サイズは 256 MB、フッターの最大サイズは 5 MB です。入力ファイルがいずれかの制限を超えた場合、ステートマシンは実行時エラーを返します。

  • VersionId フィールドは InputType=Parquet ではサポートされていません

  • GZIP、ZSTD、および Snappy データ内部圧縮はネイティブでサポートされています。ファイル名の拡張子は不要です。

次に示しているのは、InputType に Parquet を指定した場合の ASL の設定例です。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "PARQUET" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "my-parquet-data-file-1.parquet" } }
大規模なジョブ処理

非常に大規模なジョブでは、Step Functions は多数の入力リーダーを使用します。リーダーは処理をインターリーブするため、一部のリーダーが一時停止している間に他のリーダーが進行する可能性があります。断続的な進行は、大規模な処理で想定される動作です。

UNLOAD クエリ結果から生成された Athena マニフェストファイルを使用して、Map ステートのデータファイルのソースを指定できます。ManifestTypeATHENA_DATA に設定し、InputTypeCSVJSONL、または Parquet に設定します。

UNLOAD クエリを実行すると、Athena は実際のデータオブジェクトに加えて、データマニフェストファイルを生成します。マニフェストファイルには、データファイルの構造化された CSV リストが含まれます。マニフェストとデータファイルはどちらも、Athena のクエリ結果の保存先である Amazon S3 に保存されます。

UNLOAD (<YOUR_SELECT_QUERY>) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'JSON')

プロセスの概念的概要 (要約):

  1. Athena の UNLOAD クエリを使用してテーブルからデータを選択します。

  2. Athena は Amazon S3 でマニフェストファイル (CSV) とデータオブジェクトを生成します。

  3. マニフェストファイルを読み取って入力を処理するように Step Functions を設定します。

この機能は、Athena からの CSV、JSONL、および Parquet 出力形式を処理できます。1 つのマニフェストファイルで参照されるすべてのオブジェクトは、同じ InputType 形式である必要があります。UNLOAD クエリによってエクスポートされた CSV オブジェクトでは、最初の行にヘッダーが含まれていないことに注意してください。列ヘッダーを指定する必要がある場合は、「CSVHeaderLocation」を参照してください。

マップのコンテキストには $states.context.Map.Item.Source も含まれるため、データソースに基づいて処理をカスタマイズできます。

次に示しているのは、Athena マニフェストを使用するようにした場合の ItemReader の設定例です。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "ManifestType": "ATHENA_DATA", "InputType": "CSV | JSONL | PARQUET" }, "Arguments": { "Bucket": "<S3_BUCKET_NAME>", "Key": "<S3_KEY_PREFIX><QUERY_ID>-manifest.csv" } }
Workflow Studio での Athena マニフェストパターンの使用

データ処理の一般的なシナリオでは、Athena UNLOAD クエリから取得されたデータにマップを適用します。マップは、Athena マニフェストに記述された各項目を処理する Lambda 関数を呼び出します。Step Functions Workflow Studio では、これらすべてのコンポーネントを 1 つのブロックにまとめた既製のパターンが用意されており、ステートマシンのキャンバスにドラッグして使用できます。

分散マップ状態は、Amazon S3 バケットにデータセットとして保存されている Amazon S3 インベントリマニフェストファイルを受け入れることができます。

ワークフローの実行が Map 状態に達すると、Step Functions は GetObject API アクションを呼び出して、指定された Amazon S3 インベントリマニフェストファイルを取得します。

デフォルトでは、Map ステートはインベントリ内のオブジェクトを反復処理し、Amazon S3 インベントリオブジェクトのメタデータ配列を返します。

ManifestType を S3_INVENTORY に指定した場合、InputType を指定することはできません。

注記
  • Step Functions では、Amazon S3 インベントリレポートの展開後の個々のファイルの最大サイズとして 10 GB をサポートしています。ただし、Step Functions は、個々のファイルが 10 GB 未満の場合、10 GB 以上を処理できます。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーに関する推奨事項」を参照してください。

CSV 形式のインベントリファイルの例を次に示します。このファイルには imageDataset および csvDataset という名前のオブジェクトが含まれます。これらのオブジェクトは、amzn-s3-demo-source-bucket という名前の Amazon S3 バケットに保存されます。

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
重要

Step Functions では、データセットとしてユーザー定義の Amazon S3 インベントリレポートをサポートしていません。

Amazon S3 インベントリレポートの出力形式は CSV である必要があります。

Amazon S3 インベントリとその設定方法の詳細については、「Amazon S3 インベントリ」を参照してください。

次の Amazon S3 インベントリマニフェストファイルの例は、インベントリオブジェクトメタデータの CSV ヘッダーを示しています。

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax
"ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-id/YYYY-MM-DDTHH-MMZ/manifest.json" } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

Amazon S3 インベントリレポートの設定時に選択したフィールドによって、manifest.json ファイルの内容はこの例と異なる場合があります。

データセットの IAM ポリシーに関する推奨事項

Step Functions コンソールでワークフローを作成すると、Step Functions はワークフロー定義内のリソースに基づいて IAM ポリシーを自動的に生成できます。生成されるポリシーには、Distributed Map ステートに対して StartExecution API アクションを呼び出し、Amazon S3 バケットやオブジェクト、Lambda 関数などの AWS リソースにアクセスするための最小特権が含まれます。

IAM ポリシーには必要最小限のアクセス許可のみを含めることをお勧めします。例えば、ワークフローに分散モードの Map ステートが含まれている場合、ポリシーの範囲を対象データが含まれる特定の Amazon S3 バケットおよびフォルダに絞り込みます。

重要

分散マップ状態の入力で、既存のキーと値のペアへの参照パスとともに、Amazon S3 バケットやオブジェクト、またはプレフィックスを指定する場合は、ワークフローの IAM ポリシーを必ず更新してください。ポリシーの範囲は、ランタイムでパスから解釈されるバケット名とオブジェクト名に限定します。

次の例では、ListObjectsV2 および GetObject API アクションを使用して Amazon S3 データセットにアクセスするための最小特権を付与する方法を示しています。

例 Amazon S3 オブジェクトをデータセットとして使用する条件

次の条件は、Amazon S3 バケットの processImages フォルダ内のオブジェクトにアクセスするための最小特権を付与します。

"Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } }
例 CSV ファイルをデータセットとして使用する

次の例では、ratings.csv という名前の CSV ファイルにアクセスするために必要なアクションを示しています。

"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ]
例 Amazon S3 インベントリをデータセットとして使用する

次に示しているのは、Amazon S3 インベントリマニフェストとデータファイルの例です。

"Resource": [ "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/data/*" ]
例 ListObjectsV2 を使用してフォルダプレフィックスに制限する

ListObjectsV2 を使用する場合、2 つのポリシーが生成されます。1 つはバケット (ListBucket) の内容を一覧表示するための、もう 1 つはバケット (GetObject) 内のオブジェクトを取得するためのポリシーです。

次に示しているのは、アクション、リソース、条件の例です。

"Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "/path/to/your/json/" ] } }
"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/path/to/your/json/*" ]

GetObject ではスコープを設定せず、オブジェクトにワイルドカード (*) を使用することに注意してください。