Creating an ML input channel in AWS Clean Rooms ML
Prerequisites:
-
An AWS account with access to AWS Clean Rooms
-
A collaboration set up in AWS Clean Rooms where you want to create the ML input channel
-
Permissions to query data and create ML input channels in the collaboration.
-
(Optional) An existing model algorithm to associate with the ML input channel, or permissions to create a new one
-
(Optional) Tables with analysis rules that can be run for your specified model.
-
(Optional) An existing SQL query or analysis template to use for generating the dataset
-
(Optional) An existing service role with appropriate permissions, or permissions to create a new service role
-
(Optional) A custom AWS KMS key if you want to use your own encryption key
-
Appropriate permissions to create and manage ML models in the collaboration
An ML input channel is a dataset that is created from a specific data query. Members with the ability to query data can prepare their data for training and inference by creating an ML input channel. Creating an ML input channel allows that data to be used in different training models within the same collaboration. You should create separate ML input channels for training and inference.
To create an ML input channel, you must specify the SQL query that is used to query the input data and create the ML input channel. The results of this query are never shared with any member and remain within the boundaries of Clean Rooms ML. The Amazon Resource Name (ARN) of the ML input channel is used in the next steps to train a model or run inference.
- Console
-
To create an ML input channel (console)
-
Sign in to the AWS Management Console and open the AWS Clean Rooms console at https://console.aws.amazon.com/cleanrooms.
-
In the left navigation pane, choose Collaborations.
-
On the Collaborations page, choose the collaboration where you want to create an ML input channel.
-
After the collaboration opens, choose the ML models tab.
-
Under Custom ML models, in the ML input channels section, choose Create ML input channel.
-
On the Create ML input channel page, for ML input channel details, do the following:
-
For Name, enter a unique name for your channel.
-
(Optional) For Description, enter a description of your channel.
-
For Associated model algorithm, select the algorithm to use.
Choose Associate model algorithm to add a new one.
-
-
For Dataset, choose a method to generate the training dataset:
-
Choose SQL query to use the results of a SQL query as the training dataset.
If you chose SQL query, enter your query in the SQL query field.
(Optional) To import a query you've used recently, choose Import from recent queries.
-
Choose Analysis template to use the results of an analysis template as the training dataset.
Warning
Synthetic data generation protects against inferring whether specific individuals are present in the original dataset and against learning attributes of those individuals. However, it doesn't prevent literal values from the original dataset, including personally identifiable information (PII), from appearing in the synthetic dataset.
We recommend avoiding values in the input dataset that are associated with only one data subject because these may re-identify a data subject. For example, if only one user lives in a zip code, the presence of that zip code in the synthetic dataset would confirm that user was in the original dataset. Techniques like truncating high-precision values or replacing uncommon categories with "other" can be used to mitigate this risk. These transformations can be part of the query used to create the ML input channel.
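The two mitigations named in the warning can be illustrated with a small sketch. This is not Clean Rooms ML code: the helper names `generalize_rows` and `truncate_zip`, the sample rows, and the minimum count of 2 are all hypothetical, and in practice the same logic would normally be written into the SQL query that creates the ML input channel. The Python below only shows the idea on in-memory rows.

```python
from collections import Counter


def generalize_rows(rows, column, min_count=2, placeholder="other"):
    """Replace values of `column` that occur fewer than `min_count` times."""
    counts = Counter(row[column] for row in rows)
    return [
        {**row, column: row[column] if counts[row[column]] >= min_count else placeholder}
        for row in rows
    ]


def truncate_zip(zip_code, keep_digits=3):
    """Keep only the leading digits of a zip code to reduce its precision."""
    return zip_code[:keep_digits]


# Hypothetical sample data: only one user lives in zip code 10001.
rows = [
    {"user": "a", "zip": "98101"},
    {"user": "b", "zip": "98101"},
    {"user": "c", "zip": "10001"},
]

safe_rows = generalize_rows(rows, "zip")
print([row["zip"] for row in safe_rows])  # ['98101', '98101', 'other']
print(truncate_zip("98101"))              # 981
```

After generalization, the zip code that identified a single user no longer appears as a literal value, so it cannot surface in a dataset derived from these rows.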
-
If no tables are associated, choose Associate table to add tables with an analysis rule that can be run for the specified model.
-
Choose the Worker type to use when creating this data channel. The default worker type is CR.1X. Specify the Number of workers to use. The default number of workers is 16.
To specify Spark properties:
-
Expand Spark properties.
-
Choose Add Spark properties.
-
On the Spark properties dialog box, choose a Property name from the dropdown list and enter a Value.
The following table provides a definition for each property.
For more information about Spark properties, see Spark Properties in the Apache Spark documentation.
Property Name
Description
Default Value
spark.task.maxFailures
Controls how many consecutive times a task can fail before the job fails. Requires a value greater than or equal to 1. The number of allowed retries equals this value minus 1. The failure count resets if any attempt succeeds. Failures across different tasks don't accumulate toward this limit.
4
spark.sql.files.maxPartitionBytes
Sets the maximum number of bytes to pack into a single partition when reading from file-based sources such as Parquet, JSON, and ORC.
128MB
spark.hadoop.fs.s3.maxRetries
Sets the maximum number of retry attempts for Amazon S3 file operations.
spark.network.timeout
Sets the default timeout for all network interactions. Overrides the following timeout settings if they aren't configured:
-
spark.storage.blockManagerHeartbeatTimeoutMs
-
spark.shuffle.io.connectionTimeout
-
spark.rpc.askTimeout
-
spark.rpc.lookupTimeout
120s
spark.rdd.compress
Specifies whether to compress serialized RDD partitions using spark.io.compression.codec. Applies to StorageLevel.MEMORY_ONLY_SER in Java and Scala, or StorageLevel.MEMORY_ONLY in Python. Reduces storage space but requires additional CPU processing time.
FALSE
spark.shuffle.spill.compress
Specifies whether to compress shuffle spill data using spark.io.compression.codec.
TRUE
spark.sql.adaptive.advisoryPartitionSizeInBytes
Sets the target size in bytes for shuffle partitions during adaptive optimization when spark.sql.adaptive.enabled is true. Controls partition size when coalescing small partitions or splitting skewed partitions.
(value of spark.sql.adaptive.shuffle.targetPostShuffleInputSize)
spark.sql.adaptive.autoBroadcastJoinThreshold
Sets the maximum table size in bytes for broadcasting to worker nodes during joins. Applies only in the adaptive framework. Uses the same default value as spark.sql.autoBroadcastJoinThreshold. Set to -1 to disable broadcasting.
(none)
spark.sql.adaptive.coalescePartitions.enabled
Specifies whether to coalesce contiguous shuffle partitions based on spark.sql.adaptive.advisoryPartitionSizeInBytes to optimize task size. Requires spark.sql.adaptive.enabled to be true.
TRUE
spark.sql.adaptive.coalescePartitions.initialPartitionNum
Defines the initial number of shuffle partitions before coalescing. Requires both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled to be true. Defaults to the value of spark.sql.shuffle.partitions.
(none)
spark.sql.adaptive.coalescePartitions.minPartitionSize
Sets the minimum size for coalesced shuffle partitions to prevent partitions from becoming too small during adaptive optimization.
1 MB
spark.sql.adaptive.coalescePartitions.parallelismFirst
Specifies whether to calculate partition sizes based on cluster parallelism instead of spark.sql.adaptive.advisoryPartitionSizeInBytes during partition coalescing. Generates smaller partition sizes than the configured target size to maximize parallelism. We recommend setting this to false on busy clusters to improve resource utilization by preventing excessive small tasks.
TRUE
spark.sql.adaptive.enabled
Specifies whether to enable adaptive query execution to re-optimize query plans during query execution, based on accurate runtime statistics.
TRUE
spark.sql.adaptive.forceOptimizeSkewedJoin
Specifies whether to force enable OptimizeSkewedJoin even if it introduces extra shuffle.
FALSE
spark.sql.adaptive.localShuffleReader.enabled
Specifies whether to use local shuffle readers when shuffle partitioning isn't required, such as after converting from sort-merge joins to broadcast-hash joins. Requires spark.sql.adaptive.enabled to be true.
TRUE
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
Sets the maximum partition size in bytes for building local hash maps. Prioritizes shuffled hash joins over sort-merge joins when:
-
This value equals or exceeds spark.sql.adaptive.advisoryPartitionSizeInBytes
-
All partition sizes are within this limit
Overrides spark.sql.join.preferSortMergeJoin setting.
0 bytes
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled
Specifies whether to optimize skewed shuffle partitions by splitting them into smaller partitions based on spark.sql.adaptive.advisoryPartitionSizeInBytes. Requires spark.sql.adaptive.enabled to be true.
TRUE
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
Defines the size threshold factor for merging partitions during splitting. Partitions smaller than this factor multiplied by spark.sql.adaptive.advisoryPartitionSizeInBytes are merged.
0.2
spark.sql.adaptive.skewJoin.enabled
Specifies whether to handle data skew in shuffled joins by splitting and optionally replicating skewed partitions. Applies to sort-merge and shuffled hash joins. Requires spark.sql.adaptive.enabled to be true.
TRUE
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Defines the size factor used to identify partition skew. A partition is skewed when its size exceeds both:
-
This factor multiplied by the median partition size
-
The value of spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Sets the size threshold in bytes for identifying skewed partitions. A partition is skewed when its size exceeds both:
-
This threshold
-
The median partition size multiplied by spark.sql.adaptive.skewJoin.skewedPartitionFactor
We recommend setting this value larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.
256MB
spark.sql.autoBroadcastJoinThreshold
Sets the maximum table size in bytes for broadcasting to worker nodes during joins. Set to -1 to disable broadcasting.
10MB
spark.sql.broadcastTimeout
Controls the timeout period in seconds for the broadcast operations during broadcast joins.
300 seconds
spark.sql.cbo.enabled
Specifies whether to enable cost-based optimization (CBO) for plan statistics estimation.
FALSE
spark.sql.cbo.joinReorder.dp.star.filter
Specifies whether to apply star-join filter heuristics during cost-based join enumeration.
FALSE
spark.sql.cbo.joinReorder.dp.threshold
Sets the maximum number of joined nodes allowed in the dynamic programming algorithm.
12
spark.sql.cbo.joinReorder.enabled
Specifies whether to enable join reordering in cost-based optimization (CBO).
FALSE
spark.sql.cbo.planStats.enabled
Specifies whether to fetch row counts and column statistics from the catalog during logical plan generation.
FALSE
spark.sql.cbo.starSchemaDetection
Specifies whether to enable join reordering based on star schema detection.
FALSE
spark.sql.files.maxPartitionNum
Sets the target maximum number of split file partitions for file-based sources (Parquet, JSON, and ORC). Rescales partitions when the initial count exceeds this value. This is a suggested target, not a guaranteed limit.
(none)
spark.sql.files.maxRecordsPerFile
Sets the maximum number of records to write to a single file. No limit applies when set to zero or a negative value.
0
spark.sql.files.minPartitionNum
Sets the target minimum number of split file partitions for file-based sources (Parquet, JSON, and ORC). Defaults to spark.sql.leafNodeDefaultParallelism. This is a suggested target, not a guaranteed limit.
(none)
spark.sql.inMemoryColumnarStorage.batchSize
Controls the batch size for columnar caching. Increasing the size improves memory utilization and compression but increases the risk of out-of-memory errors.
10000
spark.sql.inMemoryColumnarStorage.compressed
Specifies whether to automatically select compression codecs for columns based on data statistics.
TRUE
spark.sql.inMemoryColumnarStorage.enableVectorizedReader
Specifies whether to enable vectorized reading for columnar caching.
TRUE
spark.sql.legacy.allowHashOnMapType
Specifies whether to allow hash operations on map type data structures. This legacy setting maintains compatibility with older Spark versions' map type handling.
spark.sql.legacy.allowNegativeScaleOfDecimal
Specifies whether to allow negative scale values in decimal type definitions. This legacy setting maintains compatibility with older Spark versions that supported negative decimal scales.
spark.sql.legacy.castComplexTypesToString.enabled
Specifies whether to enable legacy behavior for casting complex types to strings. Maintains compatibility with older Spark versions' type conversion rules.
spark.sql.legacy.charVarcharAsString
Specifies whether to treat CHAR and VARCHAR types as STRING types. This legacy setting provides compatibility with older Spark versions' string type handling.
spark.sql.legacy.createEmptyCollectionUsingStringType
Specifies whether to create empty collections using string type elements. This legacy setting maintains compatibility with older Spark versions' collection initialization behavior.
spark.sql.legacy.exponentLiteralAsDecimal.enabled
Specifies whether to interpret exponential literals as decimal types. This legacy setting maintains compatibility with older Spark versions' numeric literal handling.
spark.sql.legacy.json.allowEmptyString.enabled
Specifies whether to allow empty strings in JSON processing. This legacy setting maintains compatibility with older Spark versions' JSON parsing behavior.
spark.sql.legacy.parquet.int96RebaseModelRead
Specifies whether to use legacy INT96 timestamp rebase mode when reading Parquet files. This legacy setting maintains compatibility with older Spark versions' timestamp handling.
spark.sql.legacy.timeParserPolicy
Controls the time parsing behavior for backwards compatibility. This legacy setting determines how timestamps and dates are parsed from strings.
spark.sql.legacy.typeCoercion.datetimeToString.enabled
Specifies whether to enable legacy type coercion behavior when converting datetime values to strings. Maintains compatibility with older Spark versions' datetime conversion rules.
spark.sql.maxSinglePartitionBytes
Sets the maximum partition size in bytes. The planner introduces shuffle operations for larger partitions to improve parallelism.
128m
spark.sql.metadataCacheTTLSeconds
Controls the time-to-live (TTL) for metadata caches. Applies to partition file metadata and session catalog caches. Requires:
-
A positive value greater than zero
-
spark.sql.catalogImplementation set to hive
-
spark.sql.hive.filesourcePartitionFileCacheSize greater than zero
-
spark.sql.hive.manageFilesourcePartitions set to true
-1000ms
spark.sql.optimizer.collapseProjectAlwaysInline
Specifies whether to collapse adjacent projections and inline expressions, even when it causes duplication.
FALSE
spark.sql.optimizer.dynamicPartitionPruning.enabled
Specifies whether to generate predicates for partition columns used as join keys.
TRUE
spark.sql.optimizer.enableCsvExpressionOptimization
Specifies whether to optimize CSV expressions in SQL optimizer by pruning unnecessary columns from from_csv operations.
TRUE
spark.sql.optimizer.enableJsonExpressionOptimization
Specifies whether to optimize JSON expressions in SQL optimizer by:
-
Pruning unnecessary columns from from_json operations
-
Simplifying from_json and to_json combinations
-
Optimizing named_struct operations
TRUE
spark.sql.optimizer.excludedRules
Defines optimizer rules to disable, identified by comma-separated rule names. Some rules cannot be disabled as they are required for correctness. The optimizer logs which rules are successfully disabled.
(none)
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold
Sets the minimum aggregated scan size in bytes required to inject a Bloom filter on the application side.
10GB
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold
Defines the maximum size threshold for injecting a Bloom filter on the creation side.
10MB
spark.sql.optimizer.runtime.bloomFilter.enabled
Specifies whether to insert a Bloom filter to reduce shuffle data when one side of a shuffle join has a selective predicate.
TRUE
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems
Defines the default number of expected items in the runtime Bloom filter.
1000000
spark.sql.optimizer.runtime.bloomFilter.maxNumBits
Sets the maximum number of bits allowed in the runtime Bloom filter.
67108864
spark.sql.optimizer.runtime.bloomFilter.maxNumItems
Sets the maximum number of expected items allowed in the runtime Bloom filter.
4000000
spark.sql.optimizer.runtime.bloomFilter.number.threshold
Limits the maximum number of non-DPP runtime filters allowed per query to prevent out-of-memory errors in the driver.
10
spark.sql.optimizer.runtime.bloomFilter.numBits
Defines the default number of bits used in the runtime Bloom filter.
8388608
spark.sql.optimizer.runtime.rowlevelOperationGroupFilter.enabled
Specifies whether to enable runtime group filtering for row-level operations. Allows data sources to:
-
Prune entire groups of data (such as files or partitions) using data source filters
-
Execute runtime queries to identify matching records
-
Discard unnecessary groups to avoid expensive rewrites
Limitations:
-
Not all expressions can convert to data source filters
-
Some expressions require Spark evaluation (such as subqueries)
TRUE
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled
Specifies whether to insert a semi-join to reduce shuffle data when one side of a shuffle join has a selective predicate.
FALSE
spark.sql.parquet.aggregatePushdown
Specifies whether to push down aggregates to Parquet for optimization. Supports:
-
MIN and MAX for boolean, integer, float, and date types
-
COUNT for all data types
Throws an exception if statistics are missing from any Parquet file footer.
FALSE
spark.sql.parquet.columnarReaderBatchSize
Controls the number of rows in each Parquet vectorized reader batch. Choose a value that balances performance overhead and memory usage to prevent out-of-memory errors.
4096
spark.sql.session.timeZone
Defines the session time zone for handling timestamps in string literals and Java object conversion. Accepts:
-
Region-based IDs in area/city format (such as America/Los_Angeles)
-
Zone offsets in (+/-)HH, (+/-)HH:mm, or (+/-)HH:mm:ss format (such as -08 or +01:00)
-
UTC or Z as aliases for +00:00
(value of local timezone)
spark.sql.shuffle.partitions
Sets the default number of partitions for data shuffling during joins or aggregations. Cannot be modified between structured streaming query restarts from the same checkpoint location.
200
spark.sql.shuffledHashJoinFactor
Defines the multiplication factor used to determine shuffle hash join eligibility. A shuffle hash join is selected when the small-side data size multiplied by this factor is less than the large-side data size.
3
spark.sql.sources.parallelPartitionDiscovery.threshold
Sets the maximum number of paths for driver-side file listing with file-based sources (Parquet, JSON, and ORC). When exceeded during partition discovery, files are listed using a separate Spark distributed job.
32
spark.sql.statistics.histogram.enabled
Specifies whether to generate equi-height histograms during column statistics computation to improve estimation accuracy. Requires an additional table scan beyond the one needed for basic column statistics.
FALSE
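The skew rule described in the table for spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes can be made concrete with a short sketch. It only restates the two documented conditions in Python using the default values from the table (5 and 256MB). The function name `skewed_partitions` and the sample partition sizes are hypothetical, and this is not how Spark itself is implemented.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_in_bytes, factor=5, threshold_in_bytes=256 * MB):
    """Return the indexes of partitions that exceed both skew conditions."""
    median_size = median(sizes_in_bytes)
    return [
        index
        for index, size in enumerate(sizes_in_bytes)
        # A partition is skewed only when it exceeds BOTH limits.
        if size > factor * median_size and size > threshold_in_bytes
    ]


# Hypothetical shuffle partition sizes; the median is 80 MB.
sizes = [64 * MB, 80 * MB, 70 * MB, 900 * MB, 300 * MB]
print(skewed_partitions(sizes))  # [3]
```

The 300 MB partition is above the 256MB threshold but below 5 times the median (400 MB), so only the 900 MB partition counts as skewed.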
-
-
For Data retention in days, enter the number of days to keep the data.
-
For Result format, choose either CSV or Parquet as the data format the ML input channel should use.
-
-
For Service access, choose the Existing service role name that will be used to access this table or choose Create and use a new service role.
-
For Encryption, choose Encrypt secret with a custom KMS key to specify your own KMS key and related information. Otherwise, Clean Rooms ML manages the encryption.
-
Choose Create ML input channel.
It takes a few minutes to create the ML input channel. You can see a list of ML input channels on the ML models tab.
Note
After the ML input channel is created, you can't edit it.
-
- API
-
To create an ML input channel (API)
Run the following code with your specific parameters:
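import boto3

acr_client = boto3.client('cleanroomsml')

# Placeholder: replace with the ARN of your configured model algorithm association.
configured_model_algorithm_association_arn = "configured_model_algorithm_association_arn"

# Keep the response so that the channel ARN can be read from it afterward.
resp = acr_client.create_ml_input_channel(
    name="ml_input_channel_name",
    membershipIdentifier='membership_id',
    configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn],
    retentionInDays=1,
    inputChannel={
        "dataSource": {
            "protectedQueryInputParameters": {
                "sqlParameters": {
                    "queryString": "select * from table"
                },
                # computeConfiguration and resultFormat are siblings of sqlParameters.
                "computeConfiguration": {
                    "worker": {
                        "type": "CR.1X",
                        "number": 16,
                        "properties": {
                            "spark": {
                                "spark configuration key": "spark configuration value",
                            }
                        }
                    }
                },
                "resultFormat": "PARQUET"
            }
        },
        "roleArn": "arn:aws:iam::111122223333:role/role_name"
    }
)

# The ARN of the new ML input channel is used to train a model or run inference.
channel_arn = resp['mlInputChannelArn']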
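Because creating the ML input channel takes a few minutes, a caller may want to wait until creation finishes before using the ARN. The following is a minimal sketch of one way to do that, not an official pattern. The helper name `wait_for_ml_input_channel`, the polling interval, and the attempt limit are hypothetical. The sketch assumes the `get_ml_input_channel` operation of the `cleanroomsml` client returns a `status` field, and that `CREATE_PENDING` and `CREATE_IN_PROGRESS` are the in-progress values; check both against the current API reference.

```python
import time


def wait_for_ml_input_channel(client, channel_arn, membership_id,
                              poll_seconds=30, max_attempts=40):
    """Poll until the channel leaves its creation states; return the final status."""
    # Assumed in-progress status values; verify against the API reference.
    in_progress = ("CREATE_PENDING", "CREATE_IN_PROGRESS")
    for _ in range(max_attempts):
        resp = client.get_ml_input_channel(
            mlInputChannelArn=channel_arn,
            membershipIdentifier=membership_id,
        )
        status = resp["status"]
        if status not in in_progress:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"ML input channel still not ready after {max_attempts} checks")


# Example usage (requires AWS credentials and an existing channel):
#   import boto3
#   client = boto3.client("cleanroomsml")
#   status = wait_for_ml_input_channel(client, channel_arn, "membership_id")
```

The function returns whatever status the channel reaches, so the caller still needs to check that the result indicates success before training or running inference.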