Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Creación de un canal de entrada de ML en AWS Clean Rooms ML
Requisitos previos:
-
Y Cuenta de AWS con acceso a AWS Clean Rooms
-
Una colaboración configurada en la AWS Clean Rooms que desea crear el canal de entrada ML
-
Permisos para consultar datos y crear canales de entrada de aprendizaje automático en la colaboración.
-
(Opcional) Un algoritmo modelo existente para asociarlo al canal de entrada de ML o permisos para crear uno nuevo
-
(Opcional) Tablas con reglas de análisis que se pueden ejecutar para el modelo especificado.
-
(Opcional) Una plantilla de consulta o análisis SQL existente que se utilizará para generar el conjunto de datos
-
(Opcional) Un rol de servicio existente con los permisos adecuados o permisos para crear un nuevo rol de servicio
-
(Opcional) Una AWS KMS clave personalizada si quieres usar tu propia clave de cifrado
-
Permisos adecuados para crear y administrar modelos de aprendizaje automático en la colaboración
Un canal de entrada de ML es un conjunto de datos que se crea a partir de una consulta de datos específica. Los miembros con la capacidad de consultar datos pueden preparar sus datos para el entrenamiento y la inferencia mediante la creación de un canal de entrada de aprendizaje automático. La creación de un canal de entrada de aprendizaje automático permite que los datos se utilicen en diferentes modelos de entrenamiento dentro de la misma colaboración. Debe crear canales de entrada de aprendizaje automático independientes para el entrenamiento y la inferencia.
Para crear un canal de entrada de ML, debe especificar la consulta SQL que se utiliza para consultar los datos de entrada y crear el canal de entrada de ML. Los resultados de esta consulta nunca se comparten con ningún miembro y permanecen dentro de los límites de Clean Rooms ML. El nombre de recurso de Amazon (ARN) de referencia se utiliza en los siguientes pasos para entrenar un modelo o ejecutar una inferencia.
- Console
-
Para crear un canal de entrada de aprendizaje automático (consola)
-
Inicie sesión Consola de administración de AWS y abra la AWS Clean Rooms consola en https://console.aws.amazon.com/cleanrooms.
-
En el panel de navegación izquierdo, elija Colaboraciones.
-
En la página de colaboraciones, elija la colaboración en la que desee crear un canal de entrada de aprendizaje automático.
-
Cuando se abra la colaboración, selecciona la pestaña de modelos de aprendizaje automático.
-
En Modelos ML personalizados, en la sección Canales de entrada ML, selecciona Crear canal de entrada ML.
-
En la página Crear canal de entrada ML, para ver los detalles del canal de entrada ML, haga lo siguiente:
-
En Nombre, introduce un nombre único para tu canal.
-
(Opcional) En Descripción, introduce una descripción de tu canal.
-
En Algoritmo del modelo asociado, seleccione el algoritmo que desee utilizar.
Elija Asociar algoritmo de modelo para agregar uno nuevo.
-
-
En Dataset, elige un método para generar el conjunto de datos de entrenamiento:
-
Elija una consulta SQL para usar los resultados de una consulta SQL como conjunto de datos de entrenamiento.
Si eligió una consulta SQL, introduzca la consulta en el campo de consulta SQL.
(Opcional) Para importar una consulta que haya utilizado recientemente, seleccione Importar de consultas recientes.
-
Elija una plantilla de análisis para usar los resultados de una plantilla de análisis como conjunto de datos de entrenamiento.
aviso
La generación de datos sintéticos protege contra la inferencia de atributos individuales, ya sea que haya personas específicas presentes en el conjunto de datos original o que estén presentes los atributos de aprendizaje de esas personas. Sin embargo, no impide que los valores literales del conjunto de datos original, incluida la información de identificación personal (PII), aparezcan en el conjunto de datos sintético.
Recomendamos evitar los valores en el conjunto de datos de entrada que estén asociados a un solo sujeto de datos, ya que pueden volver a identificar a un sujeto de datos. Por ejemplo, si solo un usuario vive en un código postal, la presencia de ese código postal en el conjunto de datos sintético confirmaría que el usuario estaba en el conjunto de datos original. Para mitigar este riesgo, se pueden utilizar técnicas como truncar valores de alta precisión o reemplazar catálogos poco comunes por otros. Estas transformaciones pueden formar parte de la consulta utilizada para crear el canal de entrada de ML.
-
Si no hay tablas asociadas, elija Asociar tabla para agregar tablas con una regla de análisis que se pueda ejecutar para el modelo especificado.
-
Elija el tipo de trabajador que se utilizará al crear este canal de datos. El tipo de trabajador predeterminado es CR.1X. Especifique el número de trabajadores que se van a utilizar. El número de trabajador predeterminado es 16. Para especificar las propiedades de Spark:
-
Amplíe Propiedades de Spark.
-
Selecciona Añadir propiedades de Spark.
-
En el cuadro de diálogo de propiedades de Spark, elija un nombre de propiedad en la lista desplegable e introduzca un valor.
En la siguiente tabla se proporciona una definición para cada propiedad.
Para obtener más información sobre las propiedades de Spark, consulte Propiedades de Spark
en la documentación de Apache Spark. Nombre de propiedad Description (Descripción) Valor predeterminado Fallos en Spark.task.max
Controla cuántas veces consecutivas puede fallar una tarea antes de que el trabajo falle. Requiere un valor mayor o igual a 1. El número de reintentos permitidos es igual a este valor menos 1. El recuento de errores se restablece si algún intento tiene éxito. Los errores en las distintas tareas no se acumulan hasta alcanzar este límite.
4
spark.sql.files. maxPartitionBytes
Establece el número máximo de bytes que se deben empaquetar en una sola partición al leer fuentes basadas en archivos, como Parquet, JSON y ORC.
128 MB
Spark.Hadoop.FS.S3.max se reintenta
Establece el número máximo de reintentos para las operaciones con archivos de Amazon S3.
spark.network.timeout
Establece el tiempo de espera predeterminado para todas las interacciones de la red. Anula los siguientes ajustes de tiempo de espera si no están configurados:
-
spark.storage. blockManagerHeartbeatTimeoutMs
-
Tiempo de espera de conexión de Spark.Shuffle.io
-
Spark.rpc.askTimeout
-
Se ha agotado el tiempo de espera de Spark.rpc Lookup
120 s
spark.rdd.com press
Especifica si se deben comprimir las particiones RDD serializadas mediante spark.io.compression.codec. Se aplica a StorageLevel .MEMORY_ONLY_SER en Java y Scala, o a .MEMORY_ONLY en Python. StorageLevel Reduce el espacio de almacenamiento, pero requiere más tiempo de procesamiento de la CPU.
FALSO
spark.shuffle.spill.compress
Especifica si se deben comprimir los datos de Shuffle Spill mediante spark.io.compression.codec.
TRUE
spark.sql.adaptive. advisoryPartitionSizeInBytes
Establece el tamaño objetivo en bytes para las particiones aleatorias durante la optimización adaptativa cuando spark.sql.adaptive.enabled es true. Controla el tamaño de las particiones al unir particiones pequeñas o dividir particiones asimétricas.
(valor de spark.sql.adaptive.shuffle. targetPostShuffleInputSize)
spark.sql.adaptive. autoBroadcastJoinUmbral
Establece el tamaño máximo de la tabla en bytes para la transmisión a los nodos trabajadores durante las uniones. Se aplica solo en un marco adaptativo. Utiliza el mismo valor predeterminado que spark.sql. autoBroadcastJoinUmbral. Configúrelo en -1 para deshabilitar la transmisión.
(ninguno)
spark.sql.adaptive.coalescePartitions.enabled
Especifica si se van a unir particiones aleatorias contiguas en función de spark.sql.adaptive. advisoryPartitionSizeInBytes para optimizar el tamaño de la tarea. Requiere que spark.sql.adaptive.enabled sea verdadero.
TRUE
Particiones spark.sql.Adaptive.Coalesce. initialPartitionNum
Define el número inicial de particiones de mezcla antes de la fusión. Requiere que spark.sql.adaptive.enabled y spark.sql.Adaptive.coalescePartitions.enabled sean verdaderos. El valor predeterminado es spark.sql.shuffle.partition.
(ninguno)
Spark.sql.Adaptive.CoalescePartitions. minPartitionSize
Establece el tamaño mínimo de las particiones aleatorias combinadas para evitar que las particiones se vuelvan demasiado pequeñas durante la optimización adaptativa.
1 MB
spark.sql.adaptive.coalescePartitions.ParalelismFirst
Especifica si se deben calcular los tamaños de las particiones en función del paralelismo del clúster en lugar de en spark.sql.adaptive. advisoryPartitionSizeInBytes durante la fusión de particiones. Genera tamaños de partición más pequeños que el tamaño objetivo configurado para maximizar el paralelismo. Se recomienda establecer este valor en false en los clústeres ocupados para mejorar la utilización de los recursos y evitar el exceso de tareas pequeñas.
TRUE
spark.sql.adaptive.enabled
Especifica si se debe habilitar la ejecución adaptativa de consultas para volver a optimizar los planes de consultas durante la ejecución de las consultas, en función de estadísticas de tiempo de ejecución precisas.
TRUE
spark.sql.adaptive. forceOptimizeSkewed¡Únete!
Especifica si se debe forzar la activación OptimizeSkewedJoin incluso si se introduce una mezcla adicional.
FALSO
spark.sql.adaptive. localShuffleReader.habilitado
Especifica si se deben utilizar lectores aleatorios locales cuando no se requiera la creación de particiones aleatorias, por ejemplo, después de la conversión de uniones por ordenación y combinación a uniones por hash de difusión. Requiere que spark.sql.adaptive.enabled sea verdadero.
TRUE
spark.sql.adaptive. maxShuffledHashJoinLocalMapThreshold
Establece el tamaño máximo de partición en bytes para crear mapas hash locales. Prioriza las uniones hash mezcladas sobre las uniones por ordenación y combinación cuando:
-
Este valor es igual o superior a spark.sql.adaptive. advisoryPartitionSizeInBytes
-
Todos los tamaños de partición están dentro de este límite
Anula spark.sql.join. preferSortMergeUnirse a la configuración.
0 bytes
spark.sql.adaptive. optimizeSkewsInRebalancePartitions.habilitado
Especifica si se deben optimizar las particiones aleatorias asimétricas dividiéndolas en particiones más pequeñas según spark.sql.adaptive. advisoryPartitionSizeInBytes. Requiere que spark.sql.adaptive.enabled sea verdadero.
TRUE
spark.sql.adaptive. rebalancePartitionsSmallPartitionFactor
Define el factor de umbral de tamaño para la fusión de particiones durante la división. Las particiones más pequeñas que este factor se multiplican por spark.sql.adaptive. advisoryPartitionSizeInBytes están fusionadas.
0.2
spark.sql.adaptive.skewJoin.enabled
Especifica si se debe tratar la asimetría de datos en las uniones mezcladas dividiendo y, opcionalmente, replicando las particiones asimétricas. Se aplica a las uniones hash ordenadas, combinadas y barajadas. Requiere que spark.sql.adaptive.enabled sea verdadero.
TRUE
spark.sql.adaptive.skewJoin. skewedPartitionFactor
Determina el factor de tamaño que determina el sesgo de la partición. Una partición está sesgada cuando su tamaño supera las dos condiciones:
-
Este factor se multiplica por la mediana del tamaño de la partición
-
El valor de spark.sql.adaptive.skewJoin. skewedPartitionThresholdInBytes
5
spark.sql.adaptive.skewJoin. skewedPartitionThresholdInBytes
Establece el umbral de tamaño en bytes para identificar las particiones asimétricas. Una partición está sesgada cuando su tamaño supera las dos condiciones:
-
Este umbral
-
El tamaño medio de la partición multiplicado por spark.sql.adaptive.skewJoin. skewedPartitionFactor
Recomendamos establecer este valor por encima de spark.sql.adaptive. advisoryPartitionSizeInBytes.
256 MB
spark.sql. autoBroadcastJoinUmbral
Establece el tamaño máximo de la tabla en bytes para la transmisión a los nodos trabajadores durante las uniones. Configúrelo en -1 para deshabilitar la transmisión.
10 MB
Spark.sql.Tiempo de espera de emisión
Controla el tiempo de espera en segundos para las operaciones de transmisión durante las uniones de transmisión.
300 segundos
spark.sql.cbo.enabled
Especifica si se debe habilitar la optimización basada en costes (CBO) para la estimación de las estadísticas del plan.
FALSO
spark.sql.cbo.joinreorder.dp.star.filter
Especifica si se debe aplicar la heurística del filtro de unión en estrella durante la enumeración de uniones basada en el coste.
FALSO
spark.sql.cbo.joinReorder.dp.Threshold
Establece el número máximo de nodos unidos permitidos en el algoritmo de programación dinámica.
12
spark.sql.cbo.joinReorder.Enabled
Especifica si se debe habilitar el reordenamiento de las uniones en la optimización basada en costes (CBO).
FALSO
Spark.sql.cbo.PlanStats.Enabled
Especifica si se deben obtener los recuentos de filas y las estadísticas de las columnas del catálogo durante la generación del plan lógico.
FALSO
spark.sql.cbo. starSchemaDetection
Especifica si se debe habilitar el reordenamiento de las uniones en función de la detección del esquema en estrella.
FALSO
spark.sql.files. maxPartitionNum
Establece el número máximo objetivo de particiones de archivos divididas para fuentes basadas en archivos (Parquet, JSON y ORC). Cambia la escala de las particiones cuando el recuento inicial supera este valor. Se trata de un objetivo sugerido, no de un límite garantizado.
(ninguno)
spark.sql.files. maxRecordsPerFichero
Establece el número máximo de registros que se van a escribir en un único archivo. No se aplica ningún límite cuando se establece en cero o en un valor negativo.
0
spark.sql.files. minPartitionNum
Establece el número mínimo objetivo de particiones de archivos divididas para las fuentes basadas en archivos (Parquet, JSON y ORC). El valor predeterminado es spark.sql. leafNodeDefaultParalelismo. Se trata de un objetivo sugerido, no de un límite garantizado.
(ninguno)
spark.sql. inMemoryColumnarAlmacenamiento. Tamaño del lote
Controla el tamaño del lote para el almacenamiento en caché en columnas. El aumento del tamaño mejora la utilización y la compresión de la memoria, pero aumenta el riesgo de out-of-memory errores.
10000
spark.sql. inMemoryColumnarAlmacenamiento. Comprimido
Especifica si se seleccionan automáticamente los códecs de compresión para las columnas en función de las estadísticas de datos.
TRUE
spark.sql. inMemoryColumnarAlmacenamiento. enableVectorizedReader
Especifica si se debe habilitar la lectura vectorizada para el almacenamiento en caché de columnas.
TRUE
spark.sql.legacy. allowHashOnMapType
Especifica si se permiten operaciones de hash en estructuras de datos de tipo mapa. Esta configuración antigua mantiene la compatibilidad con el manejo de tipos de mapas de las versiones anteriores de Spark.
spark.sql.legacy. allowNegativeScaleOfDecimal
Especifica si se permiten valores de escala negativos en las definiciones de tipo decimal. Esta configuración antigua mantiene la compatibilidad con las versiones anteriores de Spark que admitían escalas decimales negativas.
spark.sql.legacy. castComplexTypesToString.habilitado
Especifica si se debe habilitar el comportamiento heredado para convertir tipos complejos en cadenas. Mantiene la compatibilidad con las reglas de conversión de tipos de las versiones anteriores de Spark.
spark.sql.legacy. charVarcharAsCadena
Especifica si se deben tratar los tipos CHAR y VARCHAR como tipos STRING. Esta configuración antigua proporciona compatibilidad con el manejo de tipos de cadenas de las versiones anteriores de Spark.
spark.sql.legacy. createEmptyCollectionUsingStringType
Especifica si se van a crear colecciones vacías mediante elementos de tipo cadena. Esta configuración antigua mantiene la compatibilidad con el comportamiento de inicialización de colecciones de versiones anteriores de Spark.
spark.sql.legacy. exponentLiteralAsDecimal. Habilitado
Especifica si se deben interpretar los literales exponenciales como tipos decimales. Esta configuración antigua mantiene la compatibilidad con el manejo literal numérico de las versiones anteriores de Spark.
spark.sql.legacy.json. allowEmptyString.habilitado
Especifica si se permiten cadenas vacías en el procesamiento de JSON. Esta configuración antigua mantiene la compatibilidad con el comportamiento de análisis de JSON de las versiones anteriores de Spark.
spark.sql.legacy.parquet.int96 RebaseModelRead
Especifica si se debe utilizar el modo de rebase de marcas de tiempo heredado al leer los archivos de Parquet. INT96 Esta configuración antigua mantiene la compatibilidad con el manejo de las marcas de tiempo de las versiones anteriores de Spark.
spark.sql.legacy. timeParserPolicy
Controla el comportamiento del análisis temporal para garantizar la compatibilidad con versiones anteriores. Esta configuración antigua determina cómo se analizan las marcas de tiempo y las fechas a partir de las cadenas.
spark.sql.legacy.TypeCoercion. datetimeToString.habilitado
Especifica si se debe habilitar el comportamiento de coerción de tipo heredado al convertir valores de fecha y hora en cadenas. Mantiene la compatibilidad con las reglas de conversión de fecha y hora de las versiones anteriores de Spark.
spark.sql. maxSinglePartitionBytes
Establece el tamaño máximo de la partición en bytes. El planificador introduce operaciones de mezcla para particiones más grandes a fin de mejorar el paralelismo.
128 m
Caché de metadatos Spark.sql. TTLSeconds
Controla el time-to-live (TTL) de las cachés de metadatos. Se aplica a los metadatos de los archivos de partición y a las cachés del catálogo de sesiones. Requiere:
-
Un valor positivo superior a cero
-
Spark.sql.CatalogImplementation está configurado como hive
-
spark.sql.hive. filesourcePartitionFileCacheSize mayor que cero
-
spark.sql.hive. manageFilesourcePartitions establecido en verdadero
-1000 ms
spark.sql.optimizer. collapseProjectAlwaysEn línea
Especifica si se deben contraer las proyecciones adyacentes y las expresiones en línea, incluso cuando esto provoque duplicación.
FALSO
spark.sql.optimizer. dynamicPartitionPruning.habilitado
Especifica si se van a generar predicados para las columnas de partición utilizadas como claves de unión.
TRUE
spark.sql.optimizer. enableCsvExpressionOptimización
Especifica si se deben optimizar las expresiones CSV en el optimizador de SQL eliminando las columnas innecesarias de las operaciones from_csv.
TRUE
spark.sql.optimizer. enableJsonExpressionOptimización
Especifica si se deben optimizar las expresiones JSON en el optimizador de SQL de la siguiente manera:
-
Eliminar columnas innecesarias de las operaciones de from_json
-
Simplificar las combinaciones from_json y to_json
-
Optimización de las operaciones de named_struct
TRUE
Spark.sql.Optimizer.ExcludedRules
Define las reglas del optimizador que se van a deshabilitar, identificadas mediante nombres de reglas separados por comas. Algunas reglas no se pueden deshabilitar porque son necesarias para que sean correctas. El optimizador registra qué reglas se han desactivado correctamente.
(ninguno)
spark.sql.Optimizer.Runtime.BloomFilter. applicationSideScanSizeThreshold
Establece el tamaño mínimo de escaneo agregado en bytes necesario para inyectar un filtro Bloom en el lado de la aplicación.
10 GB
Filtro Spark.Sql.Optimizer.Runtime.BloomFilter. creationSideThreshold
Define el umbral de tamaño máximo para inyectar un filtro Bloom en el lado de la creación.
10 MB
Spark.Sql.Optimizer.Runtime.BloomFilter.Enabled
Especifica si se debe insertar un filtro Bloom para reducir los datos de mezcla aleatoria cuando un lado de una unión aleatoria tiene un predicado selectivo.
TRUE
spark.sql.optimizer.Runtime.BloomFilter. expectedNumItems
Define el número predeterminado de elementos esperados en el filtro Bloom en tiempo de ejecución.
1000000
spark.sql.Optimizer.Runtime.BloomFilter. maxNumBits
Establece el número máximo de bits permitido en el filtro Bloom en tiempo de ejecución.
67108864
Spark.Sql.Optimizer.Runtime.BloomFilter. maxNumItems
Establece el número máximo de elementos esperados permitidos en el filtro Bloom en tiempo de ejecución.
4000000
spark.sql.Optimizer.Runtime.BloomFilter.Number.Threshold
Limita el número máximo de filtros de tiempo de ejecución distintos de DPP permitidos por consulta para evitar errores en el controlador. out-of-memory
10
spark.sql.Optimizer.Runtime.BloomFilter.NumBits
Define el número predeterminado de bits que se utilizan en el filtro Bloom en tiempo de ejecución.
8388608
spark.sql.optimizer.runtime. rowlevelOperationGroupFiltro. Habilitado
Especifica si se debe habilitar el filtrado de grupos en tiempo de ejecución para las operaciones a nivel de fila. Permite que las fuentes de datos:
-
Elimine grupos completos de datos (como archivos o particiones) mediante filtros de fuentes de datos
-
Ejecute consultas en tiempo de ejecución para identificar los registros coincidentes
-
Deseche los grupos innecesarios para evitar costosas reescrituras
Limitaciones:
-
No todas las expresiones se pueden convertir en filtros de fuentes de datos
-
Algunas expresiones requieren la evaluación de Spark (como las subconsultas)
TRUE
spark.sql.Optimizer.RuntimeFilter. semiJoinReduction.habilitado
Especifica si se debe insertar una semiunión para reducir los datos de mezcla aleatoria cuando un lado de una unión aleatoria tiene un predicado selectivo.
FALSO
spark.sql.parquet.aggregatePushdown
Especifica si se deben enviar los agregados a Parquet para su optimización. Soporta:
-
MIN y MAX para los tipos booleano, entero, flotante y de fecha
-
COUNT para todos los tipos de datos
Lanza una excepción si faltan estadísticas en el pie de página de un archivo de Parquet.
FALSO
spark.sql.parquet. columnarReaderBatchTamaño
Controla el número de filas de cada lote de lectores vectorizados Parquet. Elija un valor que equilibre la sobrecarga de rendimiento y el uso de memoria para evitar out-of-memory errores.
4096
Spark.sql.Session.TimeZone
Define la zona horaria de la sesión para gestionar las marcas de tiempo en los literales de cadena y la conversión de objetos Java. Acepta:
-
area/city Formato basado IDs en regiones (como America/Los_Angeles)
-
Desplazamientos de zona en formato (+/-) HH, (+/-) HH:mm o (+/-) HH:mm:ss (como -08 o + 01:00)
-
UTC o Z como alias para + 00:00
(valor de la zona horaria local)
spark.sql.shuffle.partition
Establece el número predeterminado de particiones para la mezcla de datos durante las uniones o agregaciones. No se puede modificar entre reinicios de consultas de streaming estructuradas desde la misma ubicación del punto de control.
200
spark.sql. shuffledHashJoinFactor
Define el factor de multiplicación utilizado para determinar la idoneidad de las combinaciones de hash aleatorio. Se selecciona una combinación de hash aleatorio cuando el tamaño de los datos del lado pequeño multiplicado por este factor es menor que el tamaño de los datos del lado grande.
3
spark.sql.sources. parallelPartitionDiscovery.umbral
Establece el número máximo de rutas para la lista de archivos del lado del conductor con fuentes basadas en archivos (Parquet, JSON y ORC). Si se supera durante la detección de particiones, los archivos se enumeran mediante un trabajo distribuido de Spark independiente.
32
spark.sql.statistics.histogram.enabled
Especifica si se deben generar histogramas con la misma altura durante el cálculo de las estadísticas de columnas para mejorar la precisión de la estimación. Requiere un escaneo de tabla adicional al necesario para las estadísticas de columnas básicas.
FALSO
-
-
En Retención de datos en días, introduzca el número de días que desea conservar los datos.
-
En el formato de resultado, elija CSV o Parquet como formato de datos que debe usar el canal de entrada ML.
-
-
Para acceder al servicio, elija el nombre del rol de servicio existente que se usará para acceder a esta tabla o elija Crear y usar un nuevo rol de servicio.
-
Para el cifrado, elija el secreto de cifrado con una clave de KMS personalizada para especificar su propia clave de KMS y la información relacionada. De lo contrario, Clean Rooms ML gestionará el cifrado.
-
Elija Crear canal de entrada ML.
La creación del canal de entrada ML tardará unos minutos. Puede ver una lista de canales de entrada ML en la pestaña de modelos ML.
nota
Una vez creado el canal de entrada ML, no podrá editarlo.
-
- API
-
Para crear un canal de entrada (API) de ML
Ejecute el siguiente código con sus parámetros específicos:
import boto3 acr_client = boto3.client('cleanroomsml') acr_client.create_ml_input_channel( name="ml_input_channel_name", membershipIdentifier='membership_id', configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn], retentionInDays=1, inputChannel={ "dataSource": { "protectedQueryInputParameters": { "sqlParameters": { "queryString": "select * fromtable", "computeConfiguration": { "worker": { "type": "CR.1X", "number":16, "properties": { "spark": { "spark configuration key": "spark configuration value", } } } }, "resultFormat": "PARQUET" } } }, "roleArn": "arn:aws:iam::111122223333:role/role_name" } ) channel_arn = resp['ML Input Channel ARN']