ItemReader (Peta) - AWS Step Functions

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

ItemReader (Peta)

ItemReaderBidang adalah objek JSON, yang menentukan dataset dan lokasinya. Status Peta Terdistribusi menggunakan kumpulan data ini sebagai inputnya.

Contoh berikut menunjukkan sintaks ItemReader bidang dalam alur kerja JSONPathberbasis, untuk kumpulan data dalam file teks yang dibatasi yang disimpan dalam bucket Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv", "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

Dalam alur kerja JSONataberbasis berikut, perhatikan bahwa Parameters diganti dengan Argumen.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

Isi ItemReader lapangan

Bergantung pada kumpulan data Anda, isi ItemReader bidang bervariasi. Misalnya, jika kumpulan data Anda adalah array JSON yang diteruskan dari langkah sebelumnya dalam alur kerja, ItemReader bidang tersebut dihilangkan. Jika kumpulan data Anda adalah sumber data Amazon S3, bidang ini berisi sub-bidang berikut.

Resource

Tindakan integrasi Amazon S3 API yang akan digunakan Step Functions, seperti arn:aws:states:::s3:getObject

Arguments (JSONata) or Parameters (JSONPath)

Objek JSON yang menentukan nama bucket Amazon S3 dan kunci objek tempat kumpulan data disimpan.

Jika bucket mengaktifkan versi, Anda juga dapat menyediakan versi objek Amazon S3.

ReaderConfig

Sebuah objek JSON yang menentukan rincian berikut:

  • InputType

    Menerima salah satu nilai berikut:CSV,,JSON, JSONLPARQUET,MANIFEST.

    Menentukan jenis sumber data Amazon S3, seperti file yang dibatasi teks CSV (), objek, file JSON, Garis JSON, file Parket, manifes Athena, atau daftar inventaris Amazon S3. Di Workflow Studio, Anda dapat memilih jenis input dari sumber item S3.

    Sebagian besar jenis input yang menggunakan S3GetObject pengambilan juga mendukung ExpectedBucketOwner dan VersionId bidang dalam parameternya. File parket adalah satu pengecualian yang tidak mendukungVersionId.

    File input mendukung jenis kompresi eksternal berikut: GZIP, ZSTD.

    Contoh nama file: myObject.jsonl.gz danmyObject.csv.zstd.

    Catatan: File parket adalah jenis file biner yang dikompresi secara internal. Kompresi GZIP, ZSTD, dan Snappy didukung.

  • Transformation

    Opsional. Nilai akan menjadi salah satu atau NONE atauLOAD_AND_FLATTEN.

    Jika tidak ditentukan, NONE akan diasumsikan. Ketika diatur keLOAD_AND_FLATTEN, Anda juga harus mengaturInputType.

    Perilaku default, peta akan mengulangi objek metadata yang dikembalikan dari panggilan ke. S3:ListObjectsV2 Ketika diatur keLOAD_AND_FLATTEN, peta akan membaca dan memproses objek data aktual yang direferensikan dalam daftar hasil.

  • ManifestType

    Opsional. Nilai akan menjadi salah satu atau ATHENA_DATA atauS3_INVENTORY.

    Catatan: Jika disetel keS3_INVENTORY, Anda juga tidak harus menentukan InputType karena jenis diasumsikanCSV.

  • CSVDelimiter

    Anda dapat menentukan bidang InputType ini kapan CSV atauMANIFEST.

    Menerima salah satu nilai berikut: COMMA (default),,PIPE, SEMICOLONSPACE,TAB.

    catatan

    Dengan CSVDelimiter bidang, ItemReader dapat memproses file yang dibatasi oleh karakter selain koma. Referensi ke “file CSV” juga mencakup file yang menggunakan pembatas alternatif yang ditentukan oleh bidang. CSVDelimiter

  • CSVHeaderLocation

    Anda dapat menentukan bidang InputType ini kapan CSV atauMANIFEST.

    Menerima salah satu nilai berikut untuk menentukan lokasi header kolom:

    • FIRST_ROW— Gunakan opsi ini jika baris pertama file adalah header.

    • GIVEN— Gunakan opsi ini untuk menentukan header dalam definisi mesin negara.

      Misalnya, jika file Anda berisi data berikut.

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      Anda dapat memberikan array JSON berikut sebagai header CSV:

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    Ukuran header CSV

    Step Functions mendukung header hingga 10 KiB untuk file yang dibatasi teks.

  • MaxItems

    Secara default, Map status mengulangi semua item dalam kumpulan data yang ditentukan. Dengan menyetelMaxItems, Anda dapat membatasi jumlah item data yang diteruskan ke Map status. Misalnya, jika Anda memberikan file dibatasi teks yang berisi 1.000 baris, dan Anda menetapkan batas 100, penerjemah hanya meneruskan 100 baris ke status Peta Terdistribusi. MapStatus memproses item dalam urutan berurutan, dimulai setelah baris header.

    Untuk JSONPathalur kerja, Anda dapat menggunakan MaxItemsPath dan jalur referensi ke pasangan kunci-nilai dalam input status yang menyelesaikan ke bilangan bulat. Perhatikan bahwa Anda dapat menentukan salah satu MaxItems atauMaxItemsPath, tetapi tidak keduanya.

    catatan

    Anda dapat menentukan batas hingga 100.000.000 setelah itu Distributed Map berhenti membaca item.

Persyaratan untuk akun dan wilayah

Bucket Amazon S3 Anda harus sama Akun AWS dan Wilayah AWS sebagai mesin negara bagian Anda.

Perhatikan bahwa meskipun mesin status Anda mungkin dapat mengakses file dalam bucket di berbagai Akun AWS yang sama Wilayah AWS, Step Functions hanya mendukung daftar objek di bucket Amazon S3 yang Akun AWS sama dan sama dengan mesin status. Wilayah AWS

Memproses kumpulan data bersarang (diperbarui 11 Sep 2025)

Dengan Transformation parameter baru, Anda dapat menentukan nilai LOAD_AND_FLATTEN dan peta akan membaca objek data aktual yang direferensikan dalam daftar hasil dari panggilan keS3:ListObjectsV2.

Sebelum rilis ini, Anda perlu membuat Peta Terdistribusi bersarang untuk mengambil metadata dan kemudian memproses data aktual. Peta pertama akan mengulangi metadata yang dikembalikan oleh S3:ListObjectsV2 dan memanggil alur kerja anak. Peta lain dalam setiap mesin status anak akan membaca data aktual dari masing-masing file. Dengan opsi transformasi, Anda dapat menyelesaikan kedua langkah sekaligus.

Bayangkan Anda ingin menjalankan audit harian pada 24 file log terakhir yang diproduksi sistem Anda setiap jam dan disimpan di Amazon S3. Status Peta Terdistribusi Anda dapat mencantumkan file logS3:ListObjectsV2, lalu mengulang metadata setiap objek, atau sekarang dapat memuat dan menganalisis objek data aktual yang disimpan di bucket Amazon S3 Anda.

Menggunakan LOAD_AND_FLATTEN opsi ini dapat meningkatkan skalabilitas, mengurangi jumlah Map Run yang terbuka, dan memproses beberapa objek secara bersamaan. Pekerjaan EMR Athena dan Amazon biasanya menghasilkan output yang dapat diproses dengan konfigurasi baru.

Berikut ini adalah contoh parameter dalam ItemReader definisi:

{ "QueryLanguage": "JSONata", "States": { ... "Map": { ... "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { // InputType is required if Transformation is LOAD_AND_FLATTEN. "InputType": "CSV | JSON | JSONL | PARQUET", // Transformation is OPTIONAL and defaults to NONE if not present "Transformation": "NONE | LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket1", "Prefix": "{% $states.input.PrefixKey %}" } }, ... } }

Contoh dataset

Anda dapat menentukan salah satu opsi berikut sebagai kumpulan data Anda:

catatan

Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Status Peta Terdistribusi dapat menerima input JSON yang diteruskan dari langkah sebelumnya dalam alur kerja.

Input dapat berupa array JSON, objek JSON, atau array dalam node dari objek JSON.

Step Functions akan iterasi langsung di atas elemen array, atau pasangan kunci-nilai dari objek JSON.

Untuk memilih node tertentu yang berisi array dari objek JSON, Anda dapat menggunakan ItemsPath (Peta, JSONPath hanya) atau menggunakan JSONata ekspresi di Items bidang untuk JSONata status.

Untuk memproses item individual, status Peta Terdistribusi memulai eksekusi alur kerja anak untuk setiap item. Tab berikut menunjukkan contoh input yang diteruskan ke Map status dan input yang sesuai ke eksekusi alur kerja anak.

catatan

ItemReaderBidang tidak diperlukan saat dataset Anda adalah data JSON dari langkah sebelumnya.

Input passed to the Map state

Pertimbangkan array JSON berikut dari tiga item.

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

Status Peta Terdistribusi memulai tiga eksekusi alur kerja anak. Setiap eksekusi menerima item array sebagai input. Contoh berikut menunjukkan input yang diterima oleh eksekusi alur kerja anak.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Status Peta Terdistribusi dapat mengulang objek yang disimpan dalam bucket Amazon S3. Saat eksekusi alur kerja mencapai Map status, Step Functions memanggil aksi ListObjectsV2 API, yang mengembalikan array metadata objek Amazon S3. Dalam array ini, setiap item berisi data, seperti ETagdan Key, untuk data aktual yang disimpan dalam bucket.

Untuk memproses item individual dalam array, status Peta Terdistribusi memulai eksekusi alur kerja anak. Misalnya, ember Amazon S3 Anda berisi 100 gambar. Kemudian, array yang dikembalikan setelah menjalankan aksi ListObjectsV2 API berisi 100 item metadata. Status Peta Terdistribusi kemudian memulai 100 eksekusi alur kerja anak untuk memproses setiap item.

Untuk memproses objek data secara langsung, tanpa alur kerja bersarang, Anda dapat memilih opsi Transformasi LOAD_AND_FLATTEN untuk memproses item secara langsung.

catatan
  • Step Functions juga akan menyertakan item untuk setiap folder yang dibuat di bucket Amazon S3 menggunakan konsol Amazon S3. Item folder menghasilkan eksekusi alur kerja anak tambahan.

    Untuk menghindari membuat eksekusi alur kerja anak tambahan untuk setiap folder, kami sarankan Anda menggunakan AWS CLI untuk membuat folder. Untuk informasi selengkapnya, lihat Perintah Amazon S3 tingkat tinggi di Panduan Pengguna AWS Command Line Interface .

  • Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Tab berikut menunjukkan contoh sintaks ItemReader bidang dan masukan yang diteruskan ke eksekusi alur kerja anak untuk kumpulan data ini.

ItemReader syntax

Dalam contoh ini, Anda telah mengatur data Anda, yang mencakup gambar, file JSON, dan objek, dalam awalan yang dinamai processData dalam bucket Amazon S3 bernama. amzn-s3-demo-bucket

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

Status Peta Terdistribusi memulai eksekusi alur kerja anak sebanyak jumlah item metadata yang ada di bucket Amazon S3. Contoh berikut menunjukkan input yang diterima oleh eksekusi alur kerja anak.

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

Dengan dukungan yang disempurnakan untuk S3 ListObjects V2 sebagai sumber input di Peta Terdistribusi, mesin status Anda dapat membaca dan memproses beberapa objek data dari bucket Amazon S3 secara langsung, sehingga tidak perlu peta bersarang untuk memproses metadata!

Dengan LOAD_AND_FLATTEN opsi tersebut, mesin negara Anda akan melakukan hal berikut:

  • Baca konten aktual dari setiap objek yang terdaftar oleh panggilan Amazon S3ListObjectsV2.

  • Parse konten berdasarkan InputType (CSV, JSON, JSONL, Parquet).

  • Buat item dari isi file (baris/catatan) daripada metadata.

Dengan opsi transformasi, Anda tidak lagi memerlukan Peta Terdistribusi bersarang untuk memproses metadata. Menggunakan opsi LOAD_AND_FLATTEN meningkatkan skalabilitas, mengurangi jumlah run map aktif, dan memproses beberapa objek secara bersamaan.

Konfigurasi berikut menunjukkan pengaturan untukItemReader:

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { "InputType": "JSON", "Transformation": "LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "S3_BUCKET_NAME", "Prefix": "S3_BUCKET_PREFIX" } }
Rekomendasi awalan ember

Sebaiknya sertakan garis miring pada awalan Anda. Misalnya, jika Anda memilih data dengan awalanfolder1, mesin status Anda akan memproses keduanya folder1/myData.csv danfolder10/myData.csv. Menggunakan hanya folder1/ akan memproses satu folder.

Status Peta Terdistribusi dapat menerima file JSON yang disimpan di bucket Amazon S3 sebagai kumpulan data. File JSON harus berisi array atau objek JSON.

Ketika eksekusi alur kerja mencapai Map status, Step Functions memanggil aksi GetObjectAPI untuk mengambil file JSON yang ditentukan.

Jika file JSON berisi struktur objek bersarang, Anda dapat memilih node tertentu dengan kumpulan data Anda dengan file. ItemsPointer Misalnya, konfigurasi berikut akan mengekstrak daftar bersarang produk unggulan dalam inventaris.

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON", "ItemsPointer": "/inventory/products/featured" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "nested-data-file.json" } }

MapStatus kemudian mengulangi setiap item dalam array dan memulai eksekusi alur kerja anak untuk setiap item. Misalnya, jika file JSON Anda berisi 1000 item array, Map status memulai 1000 eksekusi alur kerja anak.

catatan
  • Input eksekusi yang digunakan untuk memulai eksekusi alur kerja anak tidak dapat melebihi 256 KiB. Namun, Step Functions mendukung membaca item hingga 8 MB dari file teks yang dibatasi, JSON, atau file JSON Lines jika Anda kemudian menerapkan ItemSelector bidang opsional untuk mengurangi ukuran item.

  • Step Functions mendukung 10 GB sebagai ukuran maksimum file individual di Amazon S3.

  • Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Tab berikut menunjukkan contoh sintaks ItemReader bidang dan masukan yang diteruskan ke eksekusi alur kerja anak untuk kumpulan data ini.

Untuk contoh ini, bayangkan Anda memiliki file JSON bernamafactcheck.json. Anda telah menyimpan file ini dalam awalan bernama jsonDataset dalam bucket Amazon S3. Berikut ini adalah contoh dari dataset JSON.

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

Status Peta Terdistribusi memulai eksekusi alur kerja anak sebanyak jumlah item array yang ada dalam file JSON. Contoh berikut menunjukkan input yang diterima oleh eksekusi alur kerja anak.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Status Peta Terdistribusi dapat menerima file JSON Lines yang disimpan di bucket Amazon S3 sebagai kumpulan data.

catatan
  • Input eksekusi yang digunakan untuk memulai eksekusi alur kerja anak tidak dapat melebihi 256 KiB. Namun, Step Functions mendukung membaca item hingga 8 MB dari file teks yang dibatasi, JSON, atau file JSON Lines jika Anda kemudian menerapkan ItemSelector bidang opsional untuk mengurangi ukuran item.

  • Step Functions mendukung 10 GB sebagai ukuran maksimum file individual di Amazon S3.

  • Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Tab berikut menunjukkan contoh sintaks ItemReader bidang dan masukan yang diteruskan ke eksekusi alur kerja anak untuk kumpulan data ini.

Untuk contoh ini, bayangkan Anda memiliki file JSON Lines bernamafactcheck.jsonl. Anda telah menyimpan file ini dalam awalan bernama jsonlDataset dalam bucket Amazon S3. Berikut ini adalah contoh isi file.

{"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"} {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"} {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"}
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSONL" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonlDataset/factcheck.jsonl" } }
Input to a child workflow execution

Status Peta Terdistribusi memulai eksekusi alur kerja anak sebanyak jumlah baris yang ada dalam file JSONL. Contoh berikut menunjukkan input yang diterima oleh eksekusi alur kerja anak.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
catatan

Dengan CSVDelimiter bidang, ItemReader dapat memproses file yang dibatasi oleh karakter selain koma. Referensi ke “file CSV” juga mencakup file yang menggunakan pembatas alternatif yang ditentukan oleh bidang. CSVDelimiter

Status Peta Terdistribusi dapat menerima file teks yang dibatasi yang disimpan dalam bucket Amazon S3 sebagai kumpulan data. Jika Anda menggunakan file yang dibatasi teks sebagai kumpulan data Anda, Anda perlu menentukan header kolom. Untuk informasi tentang cara menentukan header, lihatIsi ItemReader lapangan.

Step Functions mem-parsing file teks yang dibatasi berdasarkan aturan berikut:

  • Pembatas yang memisahkan bidang ditentukan oleh in. CSVDelimiter ReaderConfig Delimiter default ke. COMMA

  • Baris baru adalah pembatas yang memisahkan catatan.

  • Bidang diperlakukan sebagai string. Untuk konversi tipe data, gunakan fungsi States.StringToJson intrinsik di. ItemSelector (Peta)

  • Tanda kutip ganda (” “) tidak diperlukan untuk melampirkan string. Namun, string yang diapit oleh tanda kutip ganda dapat berisi koma dan baris baru tanpa bertindak sebagai pembatas rekaman.

  • Anda dapat mempertahankan tanda kutip ganda dengan mengulanginya.

  • Backslashes (\) adalah cara lain untuk melarikan diri dari karakter khusus. Garis miring terbalik hanya berfungsi dengan garis miring terbalik lainnya, tanda kutip ganda, dan pemisah bidang yang dikonfigurasi seperti koma atau pipa. Garis miring terbalik yang diikuti oleh karakter lain dihapus secara diam-diam.

  • Anda dapat mempertahankan garis miring terbalik dengan mengulanginya. Misalnya:

    path,size C:\\Program Files\\MyApp.exe,6534512
  • Garis miring terbalik yang lolos dari tanda kutip ganda (\"), hanya berfungsi jika disertakan dalam pasangan, jadi kami sarankan untuk menghindari tanda kutip ganda dengan mengulanginya:. ""

  • Jika jumlah bidang dalam satu baris kurang dari jumlah bidang di header, Step Functions menyediakan string kosong untuk nilai yang hilang.

  • Jika jumlah bidang dalam satu baris lebih dari jumlah bidang di header, Step Functions melewatkan bidang tambahan.

Untuk informasi selengkapnya tentang cara Step Functions mem-parsing file teks yang dibatasi, lihat. Example of parsing an input CSV file

Saat eksekusi alur kerja mencapai Map status, Step Functions memanggil aksi GetObjectAPI untuk mengambil file yang ditentukan. MapStatus kemudian mengulangi setiap baris dalam file dan memulai eksekusi alur kerja anak untuk memproses item di setiap baris. Misalnya, misalkan Anda menyediakan file teks yang dibatasi yang berisi 100 baris sebagai input. Kemudian, penerjemah meneruskan setiap baris ke Map negara bagian. MapStatus memproses item dalam urutan serial, dimulai setelah baris header.

catatan
  • Input eksekusi yang digunakan untuk memulai eksekusi alur kerja anak tidak dapat melebihi 256 KiB. Namun, Step Functions mendukung membaca item hingga 8 MB dari file teks yang dibatasi, JSON, atau file JSON Lines jika Anda kemudian menerapkan ItemSelector bidang opsional untuk mengurangi ukuran item.

  • Step Functions mendukung 10 GB sebagai ukuran maksimum file individual di Amazon S3.

  • Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Tab berikut menunjukkan contoh sintaks ItemReader bidang dan masukan yang diteruskan ke eksekusi alur kerja anak untuk kumpulan data ini.

ItemReader syntax

Misalnya, katakanlah Anda memiliki file CSV bernamaratings.csv. Kemudian, Anda telah menyimpan file ini dalam awalan yang dinamai csvDataset dalam bucket Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW", "CSVDelimiter": "PIPE" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
Input to a child workflow execution

Status Peta Terdistribusi memulai eksekusi alur kerja anak sebanyak jumlah baris yang ada dalam file CSV, tidak termasuk baris header, jika dalam file. Contoh berikut menunjukkan input yang diterima oleh eksekusi alur kerja anak.

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

File parket dapat digunakan sebagai sumber input. File Apache Parquet yang disimpan di Amazon S3 menyediakan pemrosesan data kolumnar yang efisien dalam skala besar.

Saat menggunakan file Parket, ketentuan berikut berlaku:

  • 256MB adalah ukuran row-group maksimum, dan 5MB adalah ukuran footer maksimum. Jika Anda memberikan file input yang melebihi batas mana pun, mesin status Anda akan mengembalikan kesalahan runtime.

  • VersionIdBidang tidak didukung untukInputType=Parquet.

  • Kompresi data internal GZIP, ZSTD, dan Snappy didukung secara native. Tidak ada ekstensi nama file yang diperlukan.

Berikut ini menunjukkan contoh konfigurasi ASL untuk InputType diatur ke Parket:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "PARQUET" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "my-parquet-data-file-1.parquet" } }
Pemrosesan pekerjaan skala besar

Untuk pekerjaan skala yang sangat besar, Step Functions akan menggunakan banyak pembaca input. Pembaca meninggalkan pemrosesan mereka, yang mungkin mengakibatkan beberapa pembaca berhenti sementara yang lain maju. Kemajuan intermiten adalah perilaku yang diharapkan dalam skala.

Anda dapat menggunakan file manifes Athena, yang dihasilkan dari hasil UNLOAD kueri, untuk menentukan sumber file data untuk status Peta Anda. Anda mengatur ManifestType keATHENA_DATA, dan InputType ke salah satuCSV,JSONL, atauParquet.

Saat menjalankan UNLOAD kueri, Athena menghasilkan file manifes data selain objek data aktual. File manifes menyediakan daftar CSV terstruktur dari file data. File manifes dan data disimpan ke lokasi hasil kueri Athena Anda di Amazon S3.

UNLOAD (<YOUR_SELECT_QUERY>) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'JSON')

Tinjauan konseptual proses, secara singkat:

  1. Pilih data Anda dari Tabel menggunakan UNLOAD kueri di Athena.

  2. Athena akan menghasilkan file manifes (CSV) dan objek data di Amazon S3.

  3. Konfigurasikan Step Functions untuk membaca file manifes dan memproses input.

Fitur ini dapat memproses format keluaran CSV, JSONL, dan Parket dari Athena. Semua objek yang direferensikan dalam satu file manifes harus InputType format yang sama. Perhatikan bahwa objek CSV yang diekspor oleh UNLOAD kueri tidak menyertakan header di baris pertama. Lihat CSVHeaderLocation apakah Anda perlu memberikan header kolom.

Konteks peta juga akan menyertakan $states.context.Map.Item.Source sehingga Anda dapat menyesuaikan pemrosesan berdasarkan sumber data.

Berikut ini adalah contoh konfigurasi yang ItemReader dikonfigurasi untuk menggunakan manifes Athena:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "ManifestType": "ATHENA_DATA", "InputType": "CSV | JSONL | PARQUET" }, "Arguments": { "Bucket": "<S3_BUCKET_NAME>", "Key": "<S3_KEY_PREFIX><QUERY_ID>-manifest.csv" } }
Menggunakan pola manifes Athena di Workflow Studio

Skenario umum untuk pemrosesan data menerapkan Peta ke data yang bersumber dari kueri Athena UNLOAD. Peta memanggil fungsi Lambda untuk memproses setiap item yang dijelaskan dalam manifes Athena. Step Functions Workflow Studio menyediakan pola siap pakai yang menggabungkan semua komponen ini menjadi blok seret ke kanvas mesin status Anda.

Status Peta Terdistribusi dapat menerima file manifes inventaris Amazon S3 yang disimpan di bucket Amazon S3 sebagai kumpulan data.

Saat eksekusi alur kerja mencapai Map status, Step Functions akan memanggil tindakan GetObjectAPI untuk mengambil file manifes inventaris Amazon S3 yang ditentukan.

Secara default, Map status kemudian mengulangi objek dalam inventaris untuk mengembalikan array metadata objek inventaris Amazon S3.

Jika Anda menentukan ManifestType adalah S3_INVENTORY maka InputType tidak dapat ditentukan.

catatan
  • Step Functions mendukung 10 GB sebagai ukuran maksimum file individual dalam laporan inventaris Amazon S3 setelah dekompresi. Namun, Step Functions dapat memproses lebih dari 10 GB jika setiap file individu di bawah 10 GB.

  • Step Functions memerlukan izin yang sesuai untuk mengakses kumpulan data Amazon S3 yang Anda gunakan. Untuk informasi tentang kebijakan IAM untuk kumpulan data, lihat. Rekomendasi kebijakan IAM untuk kumpulan data

Berikut ini adalah contoh file inventaris dalam format CSV. File ini menyertakan objek bernama csvDataset danimageDataset, yang disimpan dalam bucket Amazon S3 yang diberi nama. amzn-s3-demo-source-bucket

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
penting

Step Functions tidak mendukung laporan inventaris Amazon S3 yang ditentukan pengguna sebagai kumpulan data.

Format keluaran laporan inventaris Amazon S3 Anda harus CSV.

Untuk informasi selengkapnya tentang inventaris Amazon S3 dan cara mengaturnya, lihat Inventaris Amazon S3.

Contoh berikut dari file manifes inventaris Amazon S3 menunjukkan header CSV untuk metadata objek inventaris.

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

Tab berikut menunjukkan contoh sintaks ItemReader bidang dan masukan yang diteruskan ke eksekusi alur kerja anak untuk kumpulan data ini.

ItemReader syntax
"ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-id/YYYY-MM-DDTHH-MMZ/manifest.json" } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

Bergantung pada bidang yang Anda pilih saat mengonfigurasi laporan inventaris Amazon S3, konten file manifest.json Anda mungkin berbeda dari contoh.

Rekomendasi kebijakan IAM untuk kumpulan data

Saat Anda membuat alur kerja dengan konsol Step Functions, Step Functions dapat secara otomatis menghasilkan kebijakan IAM berdasarkan sumber daya dalam definisi alur kerja Anda. Kebijakan yang dihasilkan mencakup hak istimewa paling sedikit yang diperlukan untuk memungkinkan peran mesin status menjalankan tindakan StartExecution API untuk status Peta Terdistribusi dan AWS sumber daya akses, seperti bucket dan objek Amazon S3, serta fungsi Lambda.

Kami merekomendasikan untuk memasukkan hanya izin yang diperlukan dalam kebijakan IAM Anda. Misalnya, jika alur kerja Anda menyertakan Map status dalam mode Terdistribusi, cakup kebijakan Anda ke bucket dan folder Amazon S3 tertentu yang berisi data Anda.

penting

Jika Anda menentukan bucket dan objek Amazon S3, atau awalan, dengan jalur referensi ke pasangan nilai kunci yang ada di input status Peta Terdistribusi, pastikan Anda memperbarui kebijakan IAM untuk alur kerja Anda. Cakupan kebijakan hingga ke bucket dan nama objek yang diselesaikan jalur saat runtime.

Contoh berikut menunjukkan teknik untuk memberikan hak istimewa paling sedikit yang diperlukan untuk mengakses kumpulan data Amazon S3 Anda menggunakan ListObjects tindakan V2 dan API. GetObject

contoh kondisi menggunakan objek Amazon S3 sebagai kumpulan data

Kondisi berikut memberikan hak istimewa paling sedikit untuk mengakses objek di processImages folder bucket Amazon S3.

"Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } }
contoh menggunakan file CSV sebagai kumpulan data

Contoh berikut menunjukkan tindakan yang diperlukan untuk mengakses file CSV bernamaratings.csv.

"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ]
contoh menggunakan inventaris Amazon S3 sebagai kumpulan data

Berikut ini menunjukkan contoh sumber daya untuk manifes inventaris Amazon S3 dan file data.

"Resource": [ "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/data/*" ]
contoh menggunakan ListObjects V2 untuk membatasi ke awalan folder

Saat menggunakan ListObjectsV2, dua kebijakan akan dibuat. Satu diperlukan untuk mengizinkan daftar konten bucket (ListBucket) dan kebijakan lain akan memungkinkan pengambilan objek di bucket (GetObject).

Berikut ini menunjukkan contoh tindakan, sumber daya, dan kondisi:

"Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "/path/to/your/json/" ] } }
"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/path/to/your/json/*" ]

Perhatikan bahwa tidak GetObject akan dicakup dan Anda akan menggunakan wildcard (*) untuk objek.