ItemReader (Map) - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ItemReader (Map)

ItemReader フィールドは JSON オブジェクトで、データセットとその場所を指定します。分散マップ状態はこのデータセットを入力として使用します。

次の例は、Amazon S3 バケットに保存されているテキスト区切りファイルのデータセットの JSONPath ベースのワークフローの ItemReaderフィールドの構文を示しています。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv", "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

次の JSONata ベースのワークフローでは、 Parameters引数に置き換えられます。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

ItemReader フィールドの内容

データセットによって、ItemReader フィールドの内容は異なります。例えば、データセットがワークフローの前のステップから渡された JSON 配列である場合、ItemReader フィールドは省略されます。データセットが Amazon S3 データソースである場合、このフィールドには次のサブフィールドが含まれます。

Resource

Step Functions が使用する Amazon S3 API 統合アクション。 arn:aws:states:::s3:getObject

Arguments (JSONata) or Parameters (JSONPath)

データセットが保存されている Amazon S3 バケットの名前とオブジェクトキーを指定する JSON オブジェクト。

バケットでバージョニングが有効になっている場合は、Amazon S3 オブジェクトバージョンを指定することもできます。

ReaderConfig

以下の詳細を指定する JSON オブジェクト。

  • InputType

    CSVJSON、、 JSONL PARQUETのいずれかの値を受け入れますMANIFEST

    テキスト区切りファイル (CSV)、オブジェクト、JSON ファイル、JSON Lines、Parquet ファイル、Athena マニフェスト、Amazon S3 インベントリリストなど、Amazon S3 Amazon S3データソースのタイプを指定します。Workflow Studio では、S3 項目ソースから入力タイプを選択できます。

    S3GetObject 取得を使用するほとんどの入力タイプは、パラメータの フィールドExpectedBucketOwnerVersionIdフィールドもサポートしています。Parquet ファイルは、 をサポートしていない 1 つの例外ですVersionId

    入力ファイルは、GZIP、ZSTD の外部圧縮タイプをサポートしています。

    ファイル名の例: myObject.jsonl.gzおよび myObject.csv.zstd

    注: Parquet ファイルは、内部的に圧縮されるバイナリファイルタイプです。GZIP、ZSTD、および Snappy 圧縮がサポートされています。

  • Transformation

    オプション。値は、 または NONEのいずれかになりますLOAD_AND_FLATTEN

    指定しない場合、 が想定NONEされます。に設定する場合はLOAD_AND_FLATTEN、 も設定する必要がありますInputType

    デフォルトの動作、マップは への呼び出しから返されたメタデータオブジェクトを反復処理しますS3:ListObjectsV2。に設定するとLOAD_AND_FLATTEN、マップは結果のリストで参照される実際のデータオブジェクトを読み取り、処理します。

  • ManifestType

    オプション。値は、 または ATHENA_DATAのいずれかになりますS3_INVENTORY

    注: に設定するとS3_INVENTORY、タイプが であると仮定InputTypeされるため、 も指定しないでくださいCSV

  • CSVDelimiter

    CSVまたは の場合InputType、このフィールドを指定できますMANIFEST

    (COMMAデフォルト)、PIPE、、SEMICOLONSPACEのいずれかの値を受け入れますTAB

    注記

    CSVDelimiter フィールドを使用すると、 はカンマ以外の文字で区切られたファイルを処理ItemReaderできます。「CSV ファイル」への参照には、 CSVDelimiterフィールドで指定された代替区切り文字を使用するファイルも含まれています。

  • CSVHeaderLocation

    CSVまたは の場合InputType、このフィールドを指定できますMANIFEST

    以下の値のいずれかを受け入れ、列ヘッダーの位置を指定します。

    • FIRST_ROW - ファイルの最初の行がヘッダーの場合は、このオプションを使用します。

    • GIVEN - このオプションを使用して、ステートマシン定義内のヘッダーを指定します。

      たとえば、ファイルに次のデータが含まれている場合です。

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      次の JSON 配列を CSV ヘッダーとして指定できます。

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    CSV ヘッダーサイズ

    Step Functions は、テキスト区切りファイルに対して最大 10 KiB のヘッダーをサポートします。

  • MaxItems

    デフォルトでは、 Map状態は指定されたデータセット内のすべての項目を反復処理します。を設定することでMaxItemsMap状態に渡されるデータ項目の数を制限できます。たとえば、1,000 行を含むテキスト区切りファイルを指定し、制限を 100 に設定すると、インタープリタは 100 行のみを分散マップ状態に渡します。Map 状態は項目をヘッダー行の後から順番に処理します。

    JSONPath ワークフローでは、 MaxItemsPathと、整数に解決される状態入力のキーと値のペアへの参照パスを使用できます。MaxItems または のいずれかを指定できますがMaxItemsPath両方を指定することはできません。

    注記

    が項目の読み取りDistributed Mapを停止するまで、最大 100,000,000 の制限を指定できます。

アカウントとリージョンの要件

Amazon S3 バケットは、 AWS リージョン ステートマシンと同じ AWS アカウント および にある必要があります。

ステートマシンが同じ にある異なる バケット内のファイルにアクセスできる場合でも AWS アカウント AWS リージョン、Step Functions は AWS リージョン ステートマシンと同じ AWS アカウント と同じ の両方にある Amazon S3 バケット内のオブジェクトの一覧表示のみをサポートします。

ネストされたデータセットの処理 (2025 年 9 月 11 日更新)

新しいTransformationパラメータを使用すると、 の値を指定できLOAD_AND_FLATTEN、マップは への呼び出しの結果のリストで参照される実際のデータオブジェクトを読み取りますS3:ListObjectsV2

このリリースより前に、ネストされた分散マップを作成してメタデータを取得し、実際のデータを処理する必要があります。最初のマップは、 から返されたメタデータを反復処理S3:ListObjectsV2し、子ワークフローを呼び出します。各子ステートマシン内の別のマップは、個々のファイルから実際のデータを読み取ります。変換オプションを使用すると、両方のステップを一度に実行できます。

システムが 1 時間ごとに生成し、Amazon S3 に保存している過去 24 個のログファイルに対して毎日の監査を実行するとします。分散マップの状態では、 を使用してログファイルを一覧表示しS3:ListObjectsV2、各オブジェクトのメタデータを反復処理したり、Amazon S3 バケットに保存されている実際のデータオブジェクトをロードして分析したりできます。

LOAD_AND_FLATTEN オプションを使用すると、スケーラビリティが向上し、開いているマップ実行数が減少し、複数のオブジェクトが同時に処理されます。Athena ジョブと Amazon EMR ジョブは通常、新しい設定で処理できる出力を生成します。

定義のパラメータの例を次に示しますItemReader

{ "QueryLanguage": "JSONata", "States": { ... "Map": { ... "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { // InputType is required if Transformation is LOAD_AND_FLATTEN. "InputType": "CSV | JSON | JSONL | PARQUET", // Transformation is OPTIONAL and defaults to NONE if not present "Transformation": "NONE | LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket1", "Prefix": "{% $states.input.PrefixKey %}" } }, ... } }

データセットの例

データセットとして以下のいずれかのオプションを指定できます。

注記

Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

分散マップ状態は、ワークフローの前のステップから渡された JSON 入力を受け入れることができます。

入力は、JSON 配列、JSON オブジェクト、または JSON オブジェクトのノード内の配列です。

Step Functions は、配列の要素、または JSON オブジェクトのキーと値のペアを直接反復処理します。

JSON オブジェクトから配列を含む特定のノードを選択するには、 を使用するItemsPath (Map、JSONPath のみ)か、JSONata 状態の Itemsフィールドで JSONata 式を使用できます。

個々の項目を処理するために、分散マップ状態は項目ごとに子ワークフロー実行を開始します。以下のタブには、Map 状態に渡される入力と、それに対応する子ワークフロー実行への入力の例が表示されます。

注記

データセットが前のステップの JSON データである場合、 ItemReaderフィールドは必要ありません。

Input passed to the Map state

次の 3 つの項目からなる JSON 配列を考えてみましょう。

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

分散マップ状態は、3 つの子ワークフローの実行を開始します。実行するたびに、配列項目を入力として受け取ります。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分散マップ状態では、Amazon S3 バケットに格納されているオブジェクトを反復できます。ワークフロー実行が Map状態に達すると、Step Functions は ListObjectsV2 API アクションを呼び出し、Amazon S3 オブジェクトメタデータの配列を返します。この配列では、各項目には、バケットに保存されている実際のデータの ETagKey などのデータが含まれます。

配列内の個々の項目を処理するために、分散マップ状態は子ワークフロー実行を開始します。例えば、Amazon S3 バケットに 100 個のイメージが含まれているとします。次に、ListObjectsV2API アクションを呼び出した後に返される配列には、100 個のメタデータ項目が含まれます。その後、分散マップ状態は 100 個の子ワークフロー実行を開始し、各項目を処理します。

ネストされたワークフローなしでデータオブジェクトを直接処理するには、LOAD_AND_FLATTEN 変換オプションを選択して項目を直接処理できます。

注記
  • Step Functions には、Amazon S3 Amazon S3コンソールを使用して Amazon S3 バケットに作成された各フォルダの項目も含まれます。フォルダ項目により、追加の子ワークフロー実行が開始されます。

    フォルダごとに追加の子ワークフロー実行が作成されないようにするには、 AWS CLI を使用してフォルダを作成することをお勧めします。詳細については、「AWS Command Line Interface ユーザーガイド」の「高レベルな S3 コマンド」を参照してください。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax

この例では、イメージ、JSON ファイル、オブジェクトを含むデータを、amzn-s3-demo-bucket という名前の Amazon S3 バケットの processData というプレフィックス内に整理しました。

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

分散マップ状態は、Amazon S3 バケットに存在するメタデータ項目の数と同じ数の子ワークフロー実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

分散マップの入力ソースとしての S3 ListObjectsV2 のサポートが強化されているため、ステートマシンは Amazon S3 バケットから複数のデータオブジェクトを直接読み取って処理できるため、ネストされたマップでメタデータを処理する必要がなくなります。

LOAD_AND_FLATTEN オプションを使用すると、ステートマシンは以下を実行します。

  • Amazon S3 ListObjectsV2呼び出しでリストされている各オブジェクトの実際の内容を読み取ります。

  • InputType (CSV、JSON、JSONL、Parquet) に基づいてコンテンツを解析します。

  • メタデータではなくファイルの内容 (行/レコード) から項目を作成します。

変換オプションを使用すると、ネストされた分散マップでメタデータを処理する必要がなくなります。LOAD_AND_FLATTEN オプションを使用すると、スケーラビリティが向上し、アクティブなマップ実行数が減り、複数のオブジェクトが同時に処理されます。

次の設定は、 の設定を示していますItemReader

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { "InputType": "JSON", "Transformation": "LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "S3_BUCKET_NAME", "Prefix": "S3_BUCKET_PREFIX" } }
バケットプレフィックスのレコメンデーション

プレフィックスに末尾のスラッシュを含めることをお勧めします。たとえば、プレフィックスが のデータを選択するとfolder1、ステートマシンは folder1/myData.csvと の両方を処理しますfolder10/myData.csv。を使用すると、厳密に 1 つのフォルダのみが処理folder1/されます。

分散マップ状態は、Amazon S3 バケットにデータセットとして保存されている JSON ファイルを受け入れることができます。JSON ファイルには、配列または JSON オブジェクトが含まれている必要があります。

ワークフローの実行が Map 状態に達すると、Step Functions は GetObject API アクションを呼び出して、指定された JSON ファイルを取得します。

JSON ファイルにネストされたオブジェクト構造が含まれている場合は、 でデータセットを持つ特定のノードを選択できますItemsPointer。たとえば、次の設定では、インベントリ内の注目製品のネストされたリストが抽出されます。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON", "ItemsPointer": "/inventory/products/featured" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "nested-data-file.json" } }

その後、Map 状態は配列内の各項目を反復処理し、項目ごとに子ワークフローの実行を開始します。例えば、JSON ファイルに 1000 個の配列項目が含まれている場合、Map 状態は 1000 個の子ワークフロー実行を開始します。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelectorフィールドを適用して項目のサイズを小さくすると、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

この例では、factcheck.json という JSON ファイルがあるとします。このファイルを Amazon S3 バケットに jsonDataset という名前のプレフィックスで保存しました。以下は、JSON データセットの例です。

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

分散マップ状態は、Json ファイルに存在する配列項目数と同じ数の子ワークフローの実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

分散マップ状態は、Amazon S3 バケットに保存されている JSON Lines ファイルをデータセットとして受け入れることができます。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelectorフィールドを適用して項目のサイズを小さくすると、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

この例では、 という名前の JSON Lines ファイルがあるとしますfactcheck.jsonl。このファイルを Amazon S3 バケットに jsonlDataset という名前のプレフィックスで保存しました。以下は、ファイルの内容の例です。

{"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"} {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"} {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"}
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSONL" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonlDataset/factcheck.jsonl" } }
Input to a child workflow execution

分散マップ状態は、JSONL ファイルに存在する行数と同じ数の子ワークフロー実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
注記

CSVDelimiter フィールドを使用すると、 はカンマ以外の文字で区切られたファイルを処理ItemReaderできます。「CSV ファイル」への参照には、 CSVDelimiterフィールドで指定された代替区切り文字を使用するファイルも含まれています。

分散マップ状態は、Amazon S3 バケットに保存されているテキスト区切りファイルをデータセットとして受け入れることができます。データセットとしてテキスト区切りファイルを使用する場合は、列ヘッダーを指定する必要があります。ヘッダーを指定する方法については、「」を参照してくださいItemReader フィールドの内容

Step Functions は、次のルールに基づいてテキスト区切りファイルを解析します。

  • フィールドを区切る区切り文字は、ReaderConfig CSVDelimiterの によって指定されます。区切り文字のデフォルトは ですCOMMA

  • 改行はレコードを分割する区切り文字です。

  • フィールドは文字列として扱われます。データ型変換には、ItemSelector (Map)States.StringToJson 組み込み関数を使用します。

  • 文字列を二重引用符 (" ") で囲む必要はありません。ただし、二重引用符で囲まれた文字列には、レコード区切り文字として機能しないカンマや改行を含めることができます。

  • 二重引用符は繰り返しにより保存できます。

  • 特殊文字をエスケープするもう 1 つの方法はバックスラッシュ (\) です。バックスラッシュは、他のバックスラッシュ、二重引用符、カンマやパイプなどの設定されたフィールド区切り文字でのみ機能します。バックスラッシュの後に他の文字が続くと、サイレントに削除されます。

  • バックスラッシュは繰り返して保持できます。例:

    path,size C:\\Program Files\\MyApp.exe,6534512
  • 二重引用符 (\") をエスケープするバックスラッシュは、ペアに含まれる場合にのみ機能するため、二重引用符を繰り返してエスケープすることをお勧めします。 ""

  • 行のフィールド数がヘッダーのフィールド数より少ない場合、Step Functions は欠落している値に空の文字列を提供します。

  • 行のフィールド数がヘッダーのフィールド数よりも多い場合、Step Functions は追加のフィールドをスキップします。

Step Functions がテキスト区切りファイルを解析する方法の詳細については、「」を参照してくださいExample of parsing an input CSV file

ワークフロー実行が Map状態に達すると、Step Functions は GetObject API アクションを呼び出して、指定されたファイルを取得します。次に、 Map状態はファイル内の各行を反復処理し、子ワークフロー実行を開始して各行の項目を処理します。たとえば、入力として 100 行を含むテキスト区切りファイルを指定するとします。そうすると、インタープリタは各行を Map 状態に渡します。Map 状態は項目をヘッダー行の後から順次処理します。

注記
  • 子ワークフロー実行の開始に使用される実行入力は、256 KiB を超えることはできません。ただし、オプションの ItemSelectorフィールドを適用して項目のサイズを小さくすると、Step Functions はテキスト区切りファイル、JSON、または JSON Lines ファイルから最大 8 MB の項目を読み取ることができます。

  • Step Functions は、Amazon S3 内の個々のファイルの最大サイズとして 10 GB をサポートしています。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax

例えば、ratings.csv という名前の CSV ファイルがあるとします。次に、このファイルを Amazon S3 バケットに csvDataset という名前のプレフィックスで保存しました。

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW", "CSVDelimiter": "PIPE" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
Input to a child workflow execution

分散マップ状態は、ヘッダー行を除いて (ファイル内にある場合)、CSV ファイルに存在する行数と同じ数の子ワークフローの実行を開始します。次の例は、子ワークフロー実行が受け取る入力を示しています。

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Parquet ファイルは入力ソースとして使用できます。Amazon S3 に保存されている Apache Parquet ファイルは、大規模な効率的な列指向データ処理を提供します。

Parquet ファイルを使用する場合、次の条件が適用されます。

  • 256MB は最大行グループサイズ、5MB は最大フッターサイズです。いずれかの制限を超える入力ファイルを指定すると、ステートマシンはランタイムエラーを返します。

  • VersionId フィールドは ではサポートされていませんInputType=Parquet

  • 内部 GZIP、ZSTD、および Snappy データ圧縮はネイティブにサポートされています。ファイル名拡張子は必要ありません。

以下は、 を Parquet InputTypeに設定する ASL 設定の例です。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "PARQUET" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "my-parquet-data-file-1.parquet" } }
大規模なジョブ処理

非常に大規模なジョブの場合、Step Functions は多くの入力リーダーを使用します。読者は処理をインターリーブするため、一部の読者は一時停止し、他の読者は進行する可能性があります。断続的な進行状況は、大規模な予想される動作です。

UNLOAD クエリ結果から生成された Athena マニフェストファイルを使用して、マップ状態のデータファイルのソースを指定できます。ManifestType を に設定しATHENA_DATAInputTypeCSVJSONL、または に設定しますParquet

UNLOAD クエリを実行すると、Athena は実際のデータオブジェクトに加えてデータマニフェストファイルを生成します。マニフェストファイルは、データファイルの構造化された CSV リストを提供します。マニフェストとデータファイルの両方が Amazon S3 の Athena クエリ結果の場所に保存されます。

UNLOAD (<YOUR_SELECT_QUERY>) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'JSON')

プロセスの概念的な概要を簡単に説明します。

  1. Athena のUNLOADクエリを使用してテーブルからデータを選択します。

  2. Athena は、Amazon S3 でマニフェストファイル (CSV) とデータオブジェクトを生成します。

  3. マニフェストファイルを読み取って入力を処理するように Step Functions を設定します。

この機能では、Athena から CSV、JSONL、および Parquet の出力形式を処理できます。単一のマニフェストファイルで参照されるすべてのオブジェクトは、同じ InputType 形式である必要があります。UNLOAD クエリによってエクスポートされた CSV オブジェクトには、最初の行に ヘッダーが含まれていないことに注意してください。列ヘッダーを指定する必要があるCSVHeaderLocationかどうかを確認します。

マップコンテキストには も含まれ$states.context.Map.Item.Sourceるため、データのソースに基づいて処理をカスタマイズできます。

以下は、Athena マニフェストを使用するようにItemReader設定された の設定例です。

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "ManifestType": "ATHENA_DATA", "InputType": "CSV | JSONL | PARQUET" }, "Arguments": { "Bucket": "<S3_BUCKET_NAME>", "Key": "<S3_KEY_PREFIX><QUERY_ID>-manifest.csv" } }
Workflow Studio での Athena マニフェストパターンの使用

データ処理の一般的なシナリオでは、Athena UNLOAD クエリから取得されたデータにマップが適用されます。Map は Lambda 関数を呼び出して、Athena マニフェストで説明されている各項目を処理します。Step Functions Workflow Studio は、これらのすべてのコンポーネントを組み合わせてステートマシンキャンバスへのドラッグをブロックする既製のパターンを提供します。

分散マップ状態は、Amazon S3 バケットにデータセットとして保存されている Amazon S3 インベントリマニフェストファイルを受け入れることができます。

ワークフローの実行が Map 状態に達すると、Step Functions は GetObject API アクションを呼び出して、指定された Amazon S3 インベントリマニフェストファイルを取得します。

デフォルトでは、 Map状態はインベントリ内のオブジェクトを繰り返して、Amazon S3 インベントリオブジェクトメタデータの配列を返します。

ManifestType を S3_INVENTORY に指定した場合、InputType を指定することはできません。

注記
  • Step Functions は、解凍後の Amazon S3 インベントリレポートの個々のファイルの最大サイズとして 10 GB をサポートします。ただし、Step Functions は、個々のファイルが 10 GB 未満の場合、10 GB 以上を処理できます。

  • Step Functions には、使用する Amazon S3 データセットにアクセスするための適切な許可が必要です。データセットの IAM ポリシーの詳細については、「データセットの IAM ポリシーレコメンデーション」を参照してください。

CSV 形式のインベントリファイルの例を次に示します。このファイルには imageDataset および csvDataset という名前のオブジェクトが含まれます。これらのオブジェクトは、amzn-s3-demo-source-bucket という名前の Amazon S3 バケットに保存されます。

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
重要

Step Functions は、ユーザー定義の Amazon S3 インベントリレポートをデータセットとしてサポートしていません。

Amazon S3 インベントリレポートの出力形式は CSV である必要があります。

Amazon S3 インベントリとそのセットアップ方法の詳細については、Amazon S3インベントリ」を参照してください。

次の Amazon S3 インベントリマニフェストファイルの例は、インベントリオブジェクトメタデータの CSV ヘッダーを示しています。

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

以下のタブは、このデータセットの子ワークフロー実行に渡される ItemReader フィールド構文と入力の例を示しています。

ItemReader syntax
"ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-id/YYYY-MM-DDTHH-MMZ/manifest.json" } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

Amazon S3 インベントリレポートの設定時に選択したフィールドによっては、manifest.jsonファイルの内容が例と異なる場合があります。

データセットの IAM ポリシーレコメンデーション

Step Functions コンソールでワークフローを作成すると、Step Functions はワークフロー定義内のリソースに基づいて IAM ポリシーを自動的に生成できます。生成されたポリシーには、ステートマシンロールが分散マップ状態の StartExecution API アクションを呼び出し、Amazon S3 バケットやオブジェクト、Lambda 関数などの AWS リソースにアクセスするために必要な最小限の権限が含まれます。

IAM ポリシーに必要な許可のみを含めることをお勧めします。たとえば、ワークフローに分散モードMapの状態が含まれている場合は、データを含む特定の Amazon S3 バケットとフォルダにポリシーの範囲を絞り込みます。

重要

分散マップ状態の入力で、既存のキーと値のペアへの参照パスとともに、Amazon S3 バケットやオブジェクト、またはプレフィックスを指定する場合は、ワークフローの IAM ポリシーを必ず更新してください。ポリシーの範囲は、ランタイムでパスから解釈されるバケット名とオブジェクト名に限定します。

次の例は、ListObjectsV2 および GetObject API アクションを使用して Amazon S3 データセットにアクセスするために必要な最小限の権限を付与する手法を示しています。

例 Amazon S3 オブジェクトをデータセットとして使用する条件

次の条件は、Amazon S3 バケットのprocessImagesフォルダ内のオブジェクトにアクセスする最小権限を付与します。

"Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } }
例 CSV ファイルをデータセットとして使用する

次の例は、 という名前の CSV ファイルにアクセスするために必要なアクションを示していますratings.csv

"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ]
例 Amazon S3 インベントリをデータセットとして使用する

Amazon S3 インベントリマニフェストとデータファイルのリソース例を次に示します。

"Resource": [ "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/data/*" ]
例 ListObjectsV2 を使用してフォルダプレフィックスに制限する

ListObjectsV2 を使用する場合、2 つのポリシーが生成されます。1 つはバケットの内容の一覧表示を許可するために必要であり (ListBucket)、もう 1 つのポリシーはバケット内のオブジェクトの取得を許可します ()GetObject

アクション、リソース、および条件の例を次に示します。

"Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "/path/to/your/json/" ] } }
"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/path/to/your/json/*" ]

はスコープに含まれGetObjectず、オブジェクトにはワイルドカード (*) を使用することに注意してください。