ItemReader (Map) - AWS Step Functions

ItemReader (Map)

El campo ItemReader es un objeto JSON que especifica un conjunto de datos y su ubicación. Un estado Map Distributed usa este conjunto de datos como entrada.

En el siguiente ejemplo se muestra la sintaxis del campo ItemReader en un flujo de trabajo basado en JSONPath para un conjunto de datos de un archivo delimitado por texto almacenado en un bucket de Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv", "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

En el siguiente flujo de trabajo basado en JSONata, observe que Parameters se sustituye por argumentos.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" "VersionId": "BcK42coT2jE1234VHLUvBV1yLNod2OEt" } }

Contenido del campo ItemReader

El contenido del campo ItemReader varía según el conjunto de datos. Por ejemplo, si el conjunto de datos es una matriz JSON transferida desde un paso anterior del flujo de trabajo, el campo ItemReader se omite. Si el conjunto de datos es un origen de datos de Amazon S3, este campo contiene los siguientes subcampos.

Resource

La acción de integración de la API de Amazon S3 que utilizará Step Functions, como arn:aws:states:::s3:getObject

Arguments (JSONata) or Parameters (JSONPath)

Un objeto JSON que especifica el nombre del bucket de Amazon S3 y la clave de objeto en los que se almacena el conjunto de datos.

Si el bucket tiene habilitado el control de versiones, también puede proporcionar la versión del objeto de Amazon S3.

ReaderConfig

Un objeto JSON que especifica los detalles siguientes:

  • InputType

    Acepta uno de los siguientes valores: CSV, JSON, JSONL, PARQUET, MANIFEST.

    Especifica el tipo de origen de datos de Amazon S3, como un archivo delimitado por texto (CSV), un objeto, un archivo JSON, líneas de JSON, un archivo de Parquet, un manifiesto de Athena o una lista de inventario de Amazon S3. En Workflow Studio, puede seleccionar un tipo de entrada del origen de elementos de S3.

    La mayoría de los tipos de entrada que utilizan la recuperación S3GetObject también admiten los campos ExpectedBucketOwner y VersionId en sus parámetros. Los archivos de Parquet son la única excepción que no admite VersionId.

    Los archivos de entrada admiten los siguientes tipos de compresión externa: GZIP, ZSTD.

    Ejemplos de nombres de archivo: myObject.jsonl.gz y myObject.csv.zstd.

    Nota: Los archivos de Parquet son un tipo de archivo binario que se comprime internamente. Se admite la compresión GZIP, ZSTD y Snappy.

  • Transformation

    Opcional. El valor será NONE o LOAD_AND_FLATTEN.

    Si no se especifica, se asumirá NONE. Si se establece en LOAD_AND_FLATTEN, también debe establecer InputType.

    Comportamiento predeterminado: el mapa recorrerá en iteración los objetos de metadatos devueltos por las llamadas a S3:ListObjectsV2. Si se establece en LOAD_AND_FLATTEN, el mapa leerá y procesará los objetos de datos reales a los que se hace referencia en la lista de resultados.

  • ManifestType

    Opcional. El valor será ATHENA_DATA o S3_INVENTORY.

    Nota: Si se establece en S3_INVENTORY, no se debe especificar también InputType porque se asume que el tipo es CSV.

  • CSVDelimiter

    Puede especificar este campo cuando InputType es CSV o MANIFEST.

    Acepta uno de los siguientes valores: COMMA (predeterminado), PIPE, SEMICOLON, SPACE, TAB.

    nota

    Con el campo CSVDelimiter, ItemReader puede procesar archivos que estén delimitados por caracteres distintos de una coma. Las referencias a los “archivos CSV” también incluyen los archivos que utilizan delimitadores alternativos especificados por el campo CSVDelimiter.

  • CSVHeaderLocation

    Puede especificar este campo cuando InputType es CSV o MANIFEST.

    Acepta uno de los siguientes valores para especificar la ubicación del encabezado de la columna:

    • FIRST_ROW – Utilice esta opción si la primera línea del archivo es el encabezado.

    • GIVEN – Utilice esta opción para especificar el encabezado dentro de la definición de la máquina de estado.

      Por ejemplo, si el archivo contiene los datos siguientes.

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      Puede proporcionar la siguiente matriz JSON como encabezado CSV.

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    Tamaño del encabezado CSV

    Step Functions admite encabezados de hasta 10 KiB para archivos delimitados por texto.

  • MaxItems

    De forma predeterminada, el estado Map recorre en iteración todos los elementos del conjunto de datos especificado. Al establecer MaxItems, puede limitar el número de elementos de datos que se pasan al estado Map. Por ejemplo, si proporciona un archivo delimitado por texto que contiene 1000 filas y establece un límite de 100, el intérprete pasará solo 100 filas al estado Distributed Map. El estado Map procesa los elementos en orden secuencial, a partir de la fila siguiente al encabezado.

    Para los flujos de trabajo de JSONPath, puede usar MaxItemsPath y una ruta de referencia a un par clave-valor en la entrada de estado que da como resultado un número entero. Tenga en cuenta que puede especificar MaxItems o MaxItemsPath, pero no ambos.

    nota

    Puede especificar un límite de hasta 100 000 000 después del cual Distributed Map dejará de leer elementos.

Requisitos para la cuenta y la región

Los buckets de Amazon S3 deben estar en la misma Cuenta de AWS y Región de AWS que la máquina de estado.

Tenga en cuenta que, aunque su máquina de estado pueda acceder a los archivos de buckets en distintas Cuentas de AWS que se encuentran en la misma Región de AWS, Step Functions solo permite mostrar objetos en buckets de Amazon S3 que estén en la misma Cuenta de AWS y en la misma Región de AWS que la máquina de estado.

Procesamiento de conjuntos de datos anidados (actualizado el 11 de septiembre de 2025)

Con el nuevo parámetro Transformation, puede especificar un valor de LOAD_AND_FLATTEN y el mapa leerá los objetos de datos reales a los que se hace referencia en la lista de resultados de una llamada a S3:ListObjectsV2.

Antes de esta versión, era necesario crear Distributed Maps anidados para recuperar los metadatos y, posteriormente, procesar los datos reales. El primer mapa recorrería en iteración los metadatos devueltos por S3:ListObjectsV2 e invocaría los flujos de trabajo secundarios. Otro mapa dentro de cada máquina de estado secundaria leería los datos reales de los archivos individuales. Con la opción de transformación, puede realizar ambos pasos a la vez.

Imagine que quiere realizar una auditoría diaria de los últimos 24 archivos de registro que su sistema produce cada hora y almacena en Amazon S3. Su estado Distributed Map puede mostrar los archivos de registro con S3:ListObjectsV2 y luego recorrer en iteración los metadatos de cada objeto, o bien ahora puede cargar y analizar los objetos de datos reales almacenados en su bucket de Amazon S3.

El uso de la opción LOAD_AND_FLATTEN puede aumentar la escalabilidad, reducir los recuentos de ejecuciones de Map abierto y procesar varios objetos de forma simultánea. Los trabajos de Athena y Amazon EMR suelen generar resultados que se pueden procesar con la nueva configuración.

A continuación se muestra un ejemplo de los parámetros en una definición ItemReader:

{ "QueryLanguage": "JSONata", "States": { ... "Map": { ... "ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { // InputType is required if Transformation is LOAD_AND_FLATTEN. "InputType": "CSV | JSON | JSONL | PARQUET", // Transformation is OPTIONAL and defaults to NONE if not present "Transformation": "NONE | LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket1", "Prefix": "{% $states.input.PrefixKey %}" } }, ... } }

Ejemplos de conjuntos de datos

Puede especificar una de las opciones siguientes como conjunto de datos:

nota

Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

Un estado Map Distributed puede aceptar una entrada JSON transferida desde un paso anterior del flujo de trabajo.

La entrada puede ser una matriz JSON, un objeto JSON o una matriz dentro de un nodo de un objeto JSON.

Step Functions recorrerá en iteración directamente sobre los elementos de una matriz o los pares clave-valor de un objeto JSON.

Para seleccionar un nodo específico que contenga una matriz de un objeto JSON, puede usar ItemsPath (solo Map, JSONPath) o una expresión de JSONata en el campo Items para los estados de JSONata.

Para procesar los elementos individuales, el estado Distributed Map inicia la ejecución de un flujo de trabajo secundario para cada elemento. Las siguientes pestañas muestran ejemplos de la entrada que se transfiere al estado Map y la entrada correspondiente a la ejecución de un flujo de trabajo secundario.

nota

El campo ItemReader no es necesario cuando el conjunto de datos son datos JSON de un paso anterior.

Input passed to the Map state

Considera la siguiente matriz JSON de tres elementos.

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

El estado Map Distributed inicia tres ejecuciones de flujos de trabajo secundarios. Cada ejecución recibe un elemento de matriz como entrada. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Un estado Map Distributed puede iterar sobre los objetos que se almacenan en un bucket de Amazon S3. Cuando la ejecución del flujo de trabajo alcanza el estado Map, Step Functions invoca la acción de la API ListObjectsV2, que devuelve una matriz de metadatos de objetos de Amazon S3. En esta matriz, cada elemento contiene datos, como ETag y Key, de los datos reales almacenados en el bucket.

Para procesar los elementos individuales de la matriz, el estado Map Distributed inicia la ejecución de un flujo de trabajo secundario. Por ejemplo, suponga que su bucket de Amazon S3 contiene 100 imágenes. A continuación, la matriz devuelta tras invocar la acción de la API ListObjectsV2 contiene 100 elementos de metadatos. A continuación, el estado Distributed Map inicia 100 ejecuciones de flujos de trabajo secundarios para procesar cada elemento.

Para procesar los objetos de datos directamente, sin flujos de trabajo anidados, puede elegir la opción de transformación LOAD_AND_FLATTEN para procesar los elementos directamente.

nota
  • Step Functions también incluirá un elemento para cada carpeta que cree en un bucket de Amazon S3 utilizando la consola de Amazon S3. Los elementos de la carpeta provocan el inicio de más ejecuciones de flujos de trabajo secundarios.

    Para evitar crear una ejecución de flujo de trabajo secundaria adicional para cada carpeta, le recomendamos que la utilice AWS CLI para crear carpetas. Para obtener más información, consulte Uso de comandos de S3 de alto nivel en la Guía del usuario de AWS Command Line Interface.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax

En este ejemplo, ha organizado los datos, que incluyen imágenes, archivos JSON y objetos, dentro de un prefijo llamado processData en un bucket de Amazon S3 llamado amzn-s3-demo-bucket.

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Prefix": "processData" } }
Input passed to a child workflow execution

El estado Distributed Map inicia tantas ejecuciones de flujos de trabajo secundarios como el número de elementos de metadatos presentes en el bucket de Amazon S3. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

Gracias a la compatibilidad mejorada con S3 ListObjectsV2 como origen de entrada en Distributed Map, sus máquinas de estado pueden leer y procesar varios objetos de datos directamente desde los buckets de Amazon S3, lo que elimina la necesidad de mapas anidados para procesar los metadatos.

Con la opción LOAD_AND_FLATTEN, su máquina de estado hará lo siguiente:

  • Leer el contenido real de cada objeto mostrado por la llamada ListObjectsV2 de Amazon S3.

  • Analizar el contenido en función del InputType (CSV, JSON, JSONL, Parquet).

  • Creer elementos a partir del contenido del archivo (filas/registros) en lugar de los metadatos.

Con la opción de transformación, ya no necesitará Distributed Maps anidados para procesar los metadatos. El uso de la opción LOAD_AND_FLATTEN aumenta la escalabilidad, reduce los recuentos de ejecuciones de Map activo y procesa varios objetos de forma simultánea.

En la siguiente configuración se muestran los valores para un ItemReader:

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "ReaderConfig": { "InputType": "JSON", "Transformation": "LOAD_AND_FLATTEN" }, "Arguments": { "Bucket": "S3_BUCKET_NAME", "Prefix": "S3_BUCKET_PREFIX" } }
Recomendación de prefijo de bucket

Le recomendamos incluir una barra diagonal final en el prefijo. Por ejemplo, si selecciona datos con un prefijo de folder1, su máquina de estado procesará folder1/myData.csv y folder10/myData.csv. El uso de folder1/ procesará estrictamente una sola carpeta.

Un estado Map Distributed puede aceptar un archivo JSON almacenado en un bucket de Amazon S3 como conjunto de datos. El archivo JSON debe contener una matriz u objeto JSON.

Cuando la ejecución del flujo de trabajo alcanza el estado Map, Step Functions invoca la acción de la API GetObject para recuperar el archivo JSON especificado.

Si el archivo JSON contiene una estructura de objetos anidados, puede seleccionar el nodo específico con su conjunto de datos con un ItemsPointer. Por ejemplo, la siguiente configuración extraería una lista anidada de productos destacados del inventario.

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON", "ItemsPointer": "/inventory/products/featured" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "nested-data-file.json" } }

A continuación, el estado Map se repite sobre cada elemento de la matriz e inicia la ejecución de un flujo de trabajo secundario para cada elemento. Por ejemplo, si el archivo JSON contiene 1000 elementos de matriz, el estado Map inicia 1000 ejecuciones de flujos de trabajo secundarios.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o líneas JSON si, a continuación, se aplica el campo ItemSelector opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

Para este ejemplo, imagine que tiene un archivo JSON llamado factcheck.json. Ha almacenado este archivo en un prefijo llamado jsonDataset en un bucket de Amazon S3. A continuación se muestra un ejemplo de conjunto de datos JSON.

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

El estado Map Distributed inicia tantas ejecuciones de flujos de trabajo secundarios como el número de elementos de la matriz presentes en el archivo JSON. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Un estado Distributed Map puede aceptar un archivo de líneas JSON almacenado en un bucket de Amazon S3 como conjunto de datos.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o líneas JSON si, a continuación, se aplica el campo ItemSelector opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

Para este ejemplo, imagine que tiene un archivo de líneas JSON llamado factcheck.jsonl. Ha almacenado este archivo en un prefijo llamado jsonlDataset en un bucket de Amazon S3. A continuación, se muestra un ejemplo del contenido de un archivo.

{"verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech"} {"verdict": "false", "statement_date": "6/7/2022", "statement_source": "television"} {"verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news"}
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSONL" }, "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "jsonlDataset/factcheck.jsonl" } }
Input to a child workflow execution

El estado Map Distributed inicia tantas ejecuciones de flujos de trabajo secundarios como el número de líneas presentes en el archivo JSON. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }
nota

Con el campo CSVDelimiter, ItemReader puede procesar archivos que estén delimitados por caracteres distintos de una coma. Las referencias a los “archivos CSV” también incluyen los archivos que utilizan delimitadores alternativos especificados por el campo CSVDelimiter.

Un estado Map Distributed puede aceptar un archivo delimitado por texto almacenado en un bucket de Amazon S3 como conjunto de datos. Si usa un archivo delimitado por texto como conjunto de datos, debe especificar un encabezado de columna. Para obtener información sobre cómo especificar un encabezado, consulte Contenido del campo ItemReader.

Step Functions analiza los archivos delimitados por texto según las siguientes reglas:

  • El delimitador que separa los campos lo especifica CSVDelimiter en ReaderConfig. El valor predeterminado del delimitador es COMMA.

  • Los retornos de carro son un delimitador que separa los registros.

  • Los campos se tratan como cadenas. Para las conversiones de tipos de datos, utilice la función intrínseca States.StringToJson en ItemSelector (Map).

  • No es necesario incluir comillas dobles (" ") para delimitar cadenas. No obstante, las cadenas entre comillas dobles pueden contener comas y retornos de carro sin que funcionen como delimitadores de registro.

  • Para conservar las comillas dobles en una secuencia de conservación, repítalas.

  • Las barras diagonales invertidas (\) son otra forma de evitar los caracteres especiales. Las barras diagonales invertidas solo funcionan con otras barras diagonales invertidas, comillas dobles y el separador de campos configurado, como una coma o una barra vertical. La barra diagonal invertida seguida de cualquier otro carácter se elimina en silencio.

  • Para conservar las barras diagonales invertidas repitiéndolas. Por ejemplo:

    path,size C:\\Program Files\\MyApp.exe,6534512
  • Las barras diagonales invertidas que evitan las comillas dobles (\") solo funcionan cuando se incluyen en pares, por lo que recomendamos evitar las comillas dobles repitiéndolas: ""

  • Si el número de campos de una fila es inferior al número de campos del encabezado, Step Functions proporciona cadenas vacías para los valores que faltan.

  • Si el número de campos de una fila es mayor que el número de campos del encabezado, Step Functions omite los campos adicionales.

Para obtener más información acerca de cómo Step Functions analiza un archivo delimitado por texto, consulte Example of parsing an input CSV file.

Cuando la ejecución del flujo de trabajo alcanza el estado Map, Step Functions invoca la acción de la API GetObject para recuperar el archivo especificado. A continuación, el estado Map recorre en iteración cada fila del archivo e inicia la ejecución de un flujo de trabajo secundario para procesar los elementos de cada fila. Por ejemplo, suponga que proporciona un archivo delimitado por texto que contiene 100 filas como entrada. Entonces, el intérprete pasa cada fila al estado Map. El estado Map procesa los elementos en orden de serie, a partir de la fila siguiente al encabezado.

nota
  • La entrada de ejecución utilizada para iniciar la ejecución de un flujo de trabajo secundario no puede superar los 256 KiB. Sin embargo, Step Functions permite leer un elemento de hasta 8 MB de un archivo delimitado por texto, un archivo JSON o líneas JSON si, a continuación, se aplica el campo ItemSelector opcional para reducir el tamaño del elemento.

  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en Amazon S3.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax

Por ejemplo, supongamos que tiene un archivo CSV llamado ratings.csv. A continuación, ha almacenado este archivo dentro de un prefijo llamado csvDataset en un bucket de Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW", "CSVDelimiter": "PIPE" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-bucket", "Key": "csvDataset/ratings.csv" } }
Input to a child workflow execution

El estado Map Distributed inicia tantas ejecuciones de flujos de trabajo secundarios como el número de filas presentes en el archivo CSV, excluida la fila del encabezado, si está en el archivo. El siguiente ejemplo muestra la entrada recibida por la ejecución de un flujo de trabajo secundario.

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Los archivos de Parquet se pueden utilizar como origen de entrada. Los archivos de Apache Parquet almacenados en Amazon S3 proporcionan un procesamiento de datos en columnas eficiente y a escala.

Cuando se utilicen archivos de Parquet, se aplicarán las siguientes condiciones:

  • 256 MB es el tamaño máximo de un grupo de filas y 5 MB es el tamaño máximo de pie de página. Si proporciona archivos de entrada que superan cualquiera de estos límites, la máquina de estado devolverá un error de tiempo de ejecución.

  • El campo VersionId no se admite para InputType=Parquet.

  • La compresión de datos interna con GZIP, ZSTD y Snappy es compatible de forma nativa. No son necesarias extensiones de nombre de archivo.

A continuación, se muestra un ejemplo de configuración de ASL para InputType establecido en Parquet:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "PARQUET" }, "Arguments": { "Bucket": "amzn-s3-demo-bucket", "Key": "my-parquet-data-file-1.parquet" } }
Procesamiento de trabajos a gran escala

Para trabajos a gran escala, Step Functions utilizará muchos lectores de entrada. Los lectores intercalan su procesamiento, lo que puede provocar que algunos lectores se detengan mientras otros avanzan. Se espera un progreso intermitente a gran escala.

Puede usar los archivos de manifiesto de Athena, generados a partir de los resultados de la consulta UNLOAD, para especificar el origen de los archivos de datos de su estado Map. Se establece ManifestType en ATHENA_DATA, y InputType en CSV, JSONL o Parquet.

Al ejecutar una consulta UNLOAD, Athena genera un archivo de manifiesto de datos además de los objetos de datos reales. El archivo de manifiesto proporciona una lista CSV estructurada de los archivos de datos. Los archivos de datos y de manifiesto se guardan en la ubicación de resultados de consulta de Athena en Amazon S3.

UNLOAD (<YOUR_SELECT_QUERY>) TO 'S3_URI_FOR_STORING_DATA_OBJECT' WITH (format = 'JSON')

Descripción conceptual del proceso, en resumen:

  1. Seleccione sus datos de una tabla mediante una consulta UNLOAD en Athena.

  2. Athena generará un archivo de manifiesto (CSV) y los objetos de datos en Amazon S3.

  3. Configure Step Functions para leer el archivo de manifiesto y procesar la entrada.

La característica puede procesar los formatos de salida CSV, JSONL y Parquet de Athena. Todos los objetos a los que se hace referencia en un único archivo de manifiesto deben tener el mismo formato de InputType. Tenga en cuenta que los objetos CSV exportados mediante una consulta UNLOAD no incluyen el encabezado en la primera línea. Compruebe CSVHeaderLocation si tiene que proporcionar los encabezados de columnas.

El contexto del mapa también incluirá una $states.context.Map.Item.Source para que pueda personalizar el procesamiento en función del origen de los datos.

A continuación se muestra un ejemplo de configuración de un ItemReader configurado para usar un manifiesto de Athena:

"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "ManifestType": "ATHENA_DATA", "InputType": "CSV | JSONL | PARQUET" }, "Arguments": { "Bucket": "<S3_BUCKET_NAME>", "Key": "<S3_KEY_PREFIX><QUERY_ID>-manifest.csv" } }
Uso del patrón de manifiesto de Athena en Workflow Studio

Un escenario común para el procesamiento de datos es aplicar un Map a los datos procedentes de una consulta UNLOAD de Athena. El Map invoca una función de Lambda para procesar cada elemento descrito en el manifiesto de Athena. Step Functions Workflow Studio proporciona un patrón listo para usar que combina todos estos componentes en un bloque que puede arrastrar al lienzo de su máquina de estado.

Un estado Map Distributed puede aceptar un archivo de manifiesto de inventario de Amazon S3 almacenado en un bucket de Amazon S3 como conjunto de datos.

Cuando la ejecución del flujo de trabajo alcanza el estado Map, Step Functions invoca la acción de la API GetObject para recuperar el archivo de manifiesto de inventario de Amazon S3 especificado.

De forma predeterminada, el estado Map recorre en iteración los objetos del inventario para devolver una matriz de metadatos de objetos de inventario de Amazon S3.

Si especifica que ManifestType es S3_INVENTORY, no se puede especificar InputType.

nota
  • Step Functions admite 10 GB como tamaño máximo de un archivo individual en un informe de inventario de Amazon S3 tras la descompresión. Sin embargo, Step Functions puede procesar más de 10 GB si cada archivo individual tiene menos de 10 GB.

  • Step Functions necesita los permisos adecuados para obtener acceso a los conjuntos de datos de Amazon S3 que utilice. Para obtener información sobre las políticas de IAM para los conjuntos de datos, consulte Recomendaciones de políticas de IAM para conjuntos de datos.

A continuación se muestra un ejemplo de un archivo de inventario de ejemplo en formato CSV: Este archivo incluye los objetos denominados csvDataset yimageDataset, que se almacenan en un bucket de Amazon S3 que lleva ese nombre amzn-s3-demo-source-bucket.

"amzn-s3-demo-source-bucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "amzn-s3-demo-source-bucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "amzn-s3-demo-source-bucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "amzn-s3-demo-source-bucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
importante

Step Functions no admite un informe de inventario de Amazon S3 definido por el usuario como conjunto de datos.

El formato de salida del informe de inventario de Amazon S3 debe ser CSV.

Para obtener más información sobre los inventarios de Amazon S3 y cómo configurarlos, consulte Inventario de Amazon S3.

El siguiente ejemplo de un archivo de manifiesto de inventario de Amazon S3 muestra los encabezados CSV de los metadatos de los objetos de inventario.

{ "sourceBucket" : "amzn-s3-demo-source-bucket", "destinationBucket" : "arn:aws:s3:::amzn-s3-demo-inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "amzn-s3-demo-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

En las siguientes pestañas se muestran ejemplos de la sintaxis del campo ItemReader y de la entrada que se transfiere a la ejecución de un flujo de trabajo secundario para este conjunto de datos.

ItemReader syntax
"ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "amzn-s3-demo-destination-bucket", "Key": "destination-prefix/amzn-s3-demo-bucket/config-id/YYYY-MM-DDTHH-MMZ/manifest.json" } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "amzn-s3-demo-source-bucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

En función de los campos que haya seleccionado al configurar el informe de inventario de Amazon S3, el contenido del archivo manifest.json puede variar con respecto al ejemplo.

Recomendaciones de políticas de IAM para conjuntos de datos

Al crear flujos de trabajo con la consola de Step Functions, Step Functions puede generar automáticamente políticas de IAM basadas en los recursos de la definición de flujo de trabajo. Las políticas generadas incluyen los privilegios mínimos necesarios para permitir que el rol de la máquina de estado invoque la acción de la API StartExecution para el estado Distributed Map y acceda a los recursos de AWS, como buckets y objetos de Amazon S3 y las funciones de Lambda.

Le recomendamos que incluya únicamente los permisos necesarios en las políticas de IAM. Por ejemplo, si el flujo de trabajo incluye un estado Map en modo distribuido, aplique las políticas al bucket y a la carpeta de Amazon S3 específicos que contengan los datos.

importante

Si especifica un bucket y un objeto de Amazon S3, o un prefijo, con una ruta de referencia a un par clave-valor existente en la entrada del estado Map Distributed, no olvide actualizar las políticas de IAM para el flujo de trabajo. Limite las políticas hasta los nombres de objeto y bucket a los que se dirige la ruta en tiempo de ejecución.

En los siguientes ejemplos se muestran técnicas para otorgar los privilegios mínimos necesarios para acceder a los conjuntos de datos de Amazon S3 mediante las acciones de la API ListObjectsV2 y GetObject.

ejemplo condición que usa un objeto de Amazon S3 como conjunto de datos

La siguiente condición otorga privilegios mínimos para acceder a los objetos de una carpeta processImages de un bucket de Amazon S3.

"Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } }
ejemplo uso de un archivo CSV como conjunto de datos

En el siguiente ejemplo se muestran las acciones necesarias para acceder a un archivo CSV denominado ratings.csv.

"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/csvDataset/ratings.csv" ]
ejemplo uso de un inventario de Amazon S3 como conjunto de datos

A continuación, se muestran ejemplos de recursos para un manifiesto de inventario y archivos de datos de Amazon S3.

"Resource": [ "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::myPrefix/amzn-s3-demo-bucket/myConfig-id/data/*" ]
ejemplo uso de ListObjectsV2 para restringirlo a un prefijo de carpeta

Al usar ListObjectsV2, se generarán dos políticas. Una es necesaria para poder mostrar el contenido del bucket (ListBucket) y otra política permitirá recuperar los objetos del bucket (GetObject).

A continuación, se muestran ejemplos de acciones, recursos y una condición:

"Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket" ], "Condition": { "StringLike": { "s3:prefix": [ "/path/to/your/json/" ] } }
"Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/path/to/your/json/*" ]

Tenga en cuenta que GetObject no se limitará al ámbito y utilizará un comodín (*) para el objeto.