Creating an ML input channel in AWS Clean Rooms ML

Prerequisites:

  • An AWS account with access to AWS Clean Rooms

  • A collaboration set up in AWS Clean Rooms where you want to create the ML input channel

  • Permissions to query data and create ML input channels in the collaboration

  • (Optional) An existing model algorithm to associate with the ML input channel, or permissions to create a new one

  • (Optional) Tables with analysis rules that can be run for your specified model

  • (Optional) An existing SQL query or analysis template to use for generating the dataset

  • (Optional) An existing service role with appropriate permissions, or permissions to create a new service role

  • (Optional) A custom AWS KMS key if you want to use your own encryption key

  • Appropriate permissions to create and manage ML models in the collaboration

An ML input channel is a dataset that is created from a specific data query. Members with the ability to query data can prepare their data for training and inference by creating an ML input channel. Creating an ML input channel allows that data to be used in different training models within the same collaboration. You should create separate ML input channels for training and inference.

To create an ML input channel, you must specify the SQL query that is used to query the input data and create the ML input channel. The results of this query are never shared with any member and remain within the boundaries of Clean Rooms ML. The reference Amazon Resource Name (ARN) is used in the next steps to train a model or run inference.
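The query you specify defines exactly what data enters the channel, so it is also the natural place to apply privacy-protective transformations. The following sketch shows what such a query string might look like; the table and column names are hypothetical, and the `SUBSTRING` call truncates a high-precision value (a zip code) as recommended in the synthetic-data guidance later on this page.

```python
# Hypothetical example: the table and column names below are illustrative only.
# Truncating the high-precision zip_code value helps avoid values that are
# associated with only one data subject.
training_query = """
SELECT
    hashed_user_id,
    age_band,
    SUBSTRING(zip_code, 1, 3) AS zip3,   -- keep only the 3-digit prefix
    purchase_count
FROM consumer_table
WHERE purchase_count > 0
""".strip()
```

This string would then be passed as the SQL query when creating the channel, either in the console's SQL query field or as the `queryString` parameter of the API.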

Console
To create an ML input channel (console)
  1. Sign in to the AWS Management Console and open the AWS Clean Rooms console at https://console.aws.amazon.com/cleanrooms/.

  2. In the left navigation pane, choose Collaborations.

  3. On the Collaborations page, choose the collaboration where you want to create an ML input channel.

  4. After the collaboration opens, choose the ML models tab.

  5. Under Custom ML models, in the ML input channels section, choose Create ML input channel.

  6. On the Create ML input channel page, for ML input channel details, do the following:

    1. For Name, enter a unique name for your channel.

    2. (Optional) For Description, enter a description of your channel.

    3. For Associated model algorithm, select the algorithm to use.

      Choose Associate model algorithm to add a new one.

  7. For Dataset, choose a method to generate the training dataset:

    • Choose SQL query to use the results of a SQL query as the training dataset.

      If you chose SQL query, enter your query in the SQL query field.

      (Optional) To import a query you've used recently, choose Import from recent queries.

    • Choose Analysis template to use the results of an analysis template as the training dataset.

      Warning

      Synthetic data generation protects against inferring whether specific individuals are present in the original dataset and against learning the attributes of those individuals. However, it doesn't prevent literal values from the original dataset, including personally identifiable information (PII), from appearing in the synthetic dataset.

      We recommend avoiding values in the input dataset that are associated with only one data subject, because these may re-identify that data subject. For example, if only one user lives in a zip code, the presence of that zip code in the synthetic dataset would confirm that the user was in the original dataset. Techniques like truncating high-precision values or replacing uncommon categorical values with an "other" category can be used to mitigate this risk. These transformations can be part of the query used to create the ML input channel.

    1. If no tables are associated, choose Associate table to add tables with an analysis rule that can be run for the specified model.

    2. For Worker type, choose the type of worker to use when creating this data channel. The default worker type is CR.1X. For Number of workers, specify how many workers to use. The default is 16. To specify Spark properties:

      1. Expand Spark properties.

      2. Choose Add Spark properties.

      3. On the Spark properties dialog box, choose a Property name from the dropdown list and enter a Value.

      The following list provides the name, description, and default value for each property. For more information about Spark properties, see Spark Properties in the Apache Spark documentation.

      spark.task.maxFailures

      Controls how many consecutive times a task can fail before the job fails. Requires a value greater than or equal to 1. The number of allowed retries equals this value minus 1. The failure count resets if any attempt succeeds. Failures across different tasks don't accumulate toward this limit.

      4

      spark.sql.files.maxPartitionBytes

      Sets the maximum number of bytes to pack into a single partition when reading from file-based sources such as Parquet, JSON, and ORC.

      128MB

      spark.hadoop.fs.s3.maxRetries

      Sets the maximum number of retry attempts for Amazon S3 file operations.

      spark.network.timeout

      Sets the default timeout for all network interactions. Overrides the following timeout settings if they aren't configured:

      • spark.storage.blockManagerHeartbeatTimeoutMs

      • spark.shuffle.io.connectionTimeout

      • spark.rpc.askTimeout

      • spark.rpc.lookupTimeout

      120s

      spark.rdd.compress

      Specifies whether to compress serialized RDD partitions using spark.io.compression.codec. Applies to StorageLevel.MEMORY_ONLY_SER in Java and Scala, or StorageLevel.MEMORY_ONLY in Python. Reduces storage space but requires additional CPU processing time.

      FALSE

      spark.shuffle.spill.compress

      Specifies whether to compress shuffle spill data using spark.io.compression.codec.

      TRUE

      spark.sql.adaptive.advisoryPartitionSizeInBytes

      Sets the target size in bytes for shuffle partitions during adaptive optimization when spark.sql.adaptive.enabled is true. Controls partition size when coalescing small partitions or splitting skewed partitions.

      (value of spark.sql.adaptive.shuffle.targetPostShuffleInputSize)

      spark.sql.adaptive.autoBroadcastJoinThreshold

      Sets the maximum table size in bytes for broadcasting to worker nodes during joins. Applies only in adaptive framework. Uses the same default value as spark.sql.autoBroadcastJoinThreshold. Set to -1 to disable broadcasting.

      (none)

      spark.sql.adaptive.coalescePartitions.enabled

      Specifies whether to coalesce contiguous shuffle partitions based on spark.sql.adaptive.advisoryPartitionSizeInBytes to optimize task size. Requires spark.sql.adaptive.enabled to be true.

      TRUE

      spark.sql.adaptive.coalescePartitions.initialPartitionNum

      Defines the initial number of shuffle partitions before coalescing. Requires both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled to be true. Defaults to the value of spark.sql.shuffle.partitions.

      (none)

      spark.sql.adaptive.coalescePartitions.minPartitionSize

      Sets the minimum size for coalesced shuffle partitions to prevent partitions from becoming too small during adaptive optimization.

      1 MB

      spark.sql.adaptive.coalescePartitions.parallelismFirst

      Specifies whether to calculate partition sizes based on cluster parallelism instead of spark.sql.adaptive.advisoryPartitionSizeInBytes during partition coalescing. Generates smaller partition sizes than the configured target size to maximize parallelism. We recommend setting this to false on busy clusters to improve resource utilization by preventing excessive small tasks.

      TRUE

      spark.sql.adaptive.enabled

      Specifies whether to enable adaptive query execution to re-optimize query plans during query execution, based on accurate runtime statistics.

      TRUE

      spark.sql.adaptive.forceOptimizeSkewedJoin

      Specifies whether to force enable OptimizeSkewedJoin even if it introduces extra shuffle.

      FALSE

      spark.sql.adaptive.localShuffleReader.enabled

      Specifies whether to use local shuffle readers when shuffle partitioning isn't required, such as after converting from sort-merge joins to broadcast-hash joins. Requires spark.sql.adaptive.enabled to be true.

      TRUE

      spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

      Sets the maximum partition size in bytes for building local hash maps. Prioritizes shuffled hash joins over sort-merge joins when:

      • This value equals or exceeds spark.sql.adaptive.advisoryPartitionSizeInBytes

      • All partition sizes are within this limit

      Overrides spark.sql.join.preferSortMergeJoin setting.

      0 bytes

      spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled

      Specifies whether to optimize skewed shuffle partitions by splitting them into smaller partitions based on spark.sql.adaptive.advisoryPartitionSizeInBytes. Requires spark.sql.adaptive.enabled to be true.

      TRUE

      spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor

      Defines the size threshold factor for merging partitions during splitting. Partitions smaller than this factor multiplied by spark.sql.adaptive.advisoryPartitionSizeInBytes are merged.

      0.2

      spark.sql.adaptive.skewJoin.enabled

      Specifies whether to handle data skew in shuffled joins by splitting and optionally replicating skewed partitions. Applies to sort-merge and shuffled hash joins. Requires spark.sql.adaptive.enabled to be true.

      TRUE

      spark.sql.adaptive.skewJoin.skewedPartitionFactor

      Sets the size factor used to determine partition skew. A partition is skewed when its size exceeds both:

      • This factor multiplied by the median partition size

      • The value of spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

      5

      spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

      Sets the size threshold in bytes for identifying skewed partitions. A partition is skewed when its size exceeds both:

      • This threshold

      • The median partition size multiplied by spark.sql.adaptive.skewJoin.skewedPartitionFactor

      We recommend setting this value larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.

      256MB

      spark.sql.autoBroadcastJoinThreshold

      Sets the maximum table size in bytes for broadcasting to worker nodes during joins. Set to -1 to disable broadcasting.

      10MB

      spark.sql.broadcastTimeout

      Controls the timeout period in seconds for the broadcast operations during broadcast joins.

      300 seconds

      spark.sql.cbo.enabled

      Specifies whether to enable cost-based optimization (CBO) for plan statistics estimation.

      FALSE

      spark.sql.cbo.joinReorder.dp.star.filter

      Specifies whether to apply star-join filter heuristics during cost-based join enumeration.

      FALSE

      spark.sql.cbo.joinReorder.dp.threshold

      Sets the maximum number of joined nodes allowed in the dynamic programming algorithm.

      12

      spark.sql.cbo.joinReorder.enabled

      Specifies whether to enable join reordering in cost-based optimization (CBO).

      FALSE

      spark.sql.cbo.planStats.enabled

      Specifies whether to fetch row counts and column statistics from the catalog during logical plan generation.

      FALSE

      spark.sql.cbo.starSchemaDetection

      Specifies whether to enable join reordering based on star schema detection.

      FALSE

      spark.sql.files.maxPartitionNum

      Sets the target maximum number of split file partitions for file-based sources (Parquet, JSON, and ORC). Rescales partitions when the initial count exceeds this value. This is a suggested target, not a guaranteed limit.

      (none)

      spark.sql.files.maxRecordsPerFile

      Sets the maximum number of records to write to a single file. No limit applies when set to zero or a negative value.

      0

      spark.sql.files.minPartitionNum

      Sets the target minimum number of split file partitions for file-based sources (Parquet, JSON, and ORC). Defaults to spark.sql.leafNodeDefaultParallelism. This is a suggested target, not a guaranteed limit.

      (none)

      spark.sql.inMemoryColumnarStorage.batchSize

      Controls the batch size for columnar caching. Increasing the size improves memory utilization and compression but increases the risk of out-of-memory errors.

      10000

      spark.sql.inMemoryColumnarStorage.compressed

      Specifies whether to automatically select compression codecs for columns based on data statistics.

      TRUE

      spark.sql.inMemoryColumnarStorage.enableVectorizedReader

      Specifies whether to enable vectorized reading for columnar caching.

      TRUE

      spark.sql.legacy.allowHashOnMapType

      Specifies whether to allow hash operations on map type data structures. This legacy setting maintains compatibility with older Spark versions' map type handling.

      spark.sql.legacy.allowNegativeScaleOfDecimal

      Specifies whether to allow negative scale values in decimal type definitions. This legacy setting maintains compatibility with older Spark versions that supported negative decimal scales.

      spark.sql.legacy.castComplexTypesToString.enabled

      Specifies whether to enable legacy behavior for casting complex types to strings. Maintains compatibility with older Spark versions' type conversion rules.

      spark.sql.legacy.charVarcharAsString

      Specifies whether to treat CHAR and VARCHAR types as STRING types. This legacy setting provides compatibility with older Spark versions' string type handling.

      spark.sql.legacy.createEmptyCollectionUsingStringType

      Specifies whether to create empty collections using string type elements. This legacy setting maintains compatibility with older Spark versions' collection initialization behavior.

      spark.sql.legacy.exponentLiteralAsDecimal.enabled

      Specifies whether to interpret exponential literals as decimal types. This legacy setting maintains compatibility with older Spark versions' numeric literal handling.

      spark.sql.legacy.json.allowEmptyString.enabled

      Specifies whether to allow empty strings in JSON processing. This legacy setting maintains compatibility with older Spark versions' JSON parsing behavior.

      spark.sql.legacy.parquet.int96RebaseModeInRead

      Specifies whether to use legacy INT96 timestamp rebase mode when reading Parquet files. This legacy setting maintains compatibility with older Spark versions' timestamp handling.

      spark.sql.legacy.timeParserPolicy

      Controls the time parsing behavior for backwards compatibility. This legacy setting determines how timestamps and dates are parsed from strings.

      spark.sql.legacy.typeCoercion.datetimeToString.enabled

      Specifies whether to enable legacy type coercion behavior when converting datetime values to strings. Maintains compatibility with older Spark versions' datetime conversion rules.

      spark.sql.maxSinglePartitionBytes

      Sets the maximum partition size in bytes. The planner introduces shuffle operations for larger partitions to improve parallelism.

      128m

      spark.sql.metadataCacheTTLSeconds

      Controls the time-to-live (TTL) for metadata caches. Applies to partition file metadata and session catalog caches. Requires:

      • A value greater than zero

      • spark.sql.catalogImplementation set to hive

      • spark.sql.hive.filesourcePartitionFileCacheSize greater than zero

      • spark.sql.hive.manageFilesourcePartitions set to true

      -1000ms

      spark.sql.optimizer.collapseProjectAlwaysInline

      Specifies whether to collapse adjacent projections and inline expressions, even when it causes duplication.

      FALSE

      spark.sql.optimizer.dynamicPartitionPruning.enabled

      Specifies whether to generate predicates for partition columns used as join keys.

      TRUE

      spark.sql.optimizer.enableCsvExpressionOptimization

      Specifies whether to optimize CSV expressions in SQL optimizer by pruning unnecessary columns from from_csv operations.

      TRUE

      spark.sql.optimizer.enableJsonExpressionOptimization

      Specifies whether to optimize JSON expressions in SQL optimizer by:

      • Pruning unnecessary columns from from_json operations

      • Simplifying from_json and to_json combinations

      • Optimizing named_struct operations

      TRUE

      spark.sql.optimizer.excludedRules

      Defines optimizer rules to disable, identified by comma-separated rule names. Some rules cannot be disabled as they are required for correctness. The optimizer logs which rules are successfully disabled.

      (none)

      spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold

      Sets the minimum aggregated scan size in bytes required to inject a Bloom filter on the application side.

      10GB

      spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold

      Defines the maximum size threshold for injecting a Bloom filter on the creation side.

      10MB

      spark.sql.optimizer.runtime.bloomFilter.enabled

      Specifies whether to insert a Bloom filter to reduce shuffle data when one side of a shuffle join has a selective predicate.

      TRUE

      spark.sql.optimizer.runtime.bloomFilter.expectedNumItems

      Defines the default number of expected items in the runtime Bloom filter.

      1000000

      spark.sql.optimizer.runtime.bloomFilter.maxNumBits

      Sets the maximum number of bits allowed in the runtime Bloom filter.

      67108864

      spark.sql.optimizer.runtime.bloomFilter.maxNumItems

      Sets the maximum number of expected items allowed in the runtime Bloom filter.

      4000000

      spark.sql.optimizer.runtime.bloomFilter.number.threshold

      Limits the maximum number of non-DPP runtime filters allowed per query to prevent out-of-memory errors in the driver.

      10

      spark.sql.optimizer.runtime.bloomFilter.numBits

      Defines the default number of bits used in the runtime Bloom filter.

      8388608

      spark.sql.optimizer.runtime.rowlevelOperationGroupFilter.enabled

      Specifies whether to enable runtime group filtering for row-level operations. Allows data sources to:

      • Prune entire groups of data (such as files or partitions) using data source filters

      • Execute runtime queries to identify matching records

      • Discard unnecessary groups to avoid expensive rewrites

      Limitations:

      • Not all expressions can convert to data source filters

      • Some expressions require Spark evaluation (such as subqueries)

      TRUE

      spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled

      Specifies whether to insert a semi-join to reduce shuffle data when one side of a shuffle join has a selective predicate.

      FALSE

      spark.sql.parquet.aggregatePushdown

      Specifies whether to push down aggregates to Parquet for optimization. Supports:

      • MIN and MAX for boolean, integer, float, and date types

      • COUNT for all data types

      Throws an exception if statistics are missing from any Parquet file footer.

      FALSE

      spark.sql.parquet.columnarReaderBatchSize

      Controls the number of rows in each Parquet vectorized reader batch. Choose a value that balances performance overhead and memory usage to prevent out-of-memory errors.

      4096

      spark.sql.session.timeZone

      Defines the session time zone for handling timestamps in string literals and Java object conversion. Accepts:

      • Region-based IDs in area/city format (such as America/Los_Angeles)

      • Zone offsets in (+/-)HH, (+/-)HH:mm, or (+/-)HH:mm:ss format (such as -08 or +01:00)

      • UTC or Z as aliases for +00:00

      (value of local timezone)

      spark.sql.shuffle.partitions

      Sets the default number of partitions for data shuffling during joins or aggregations. Cannot be modified between structured streaming query restarts from the same checkpoint location.

      200

      spark.sql.shuffledHashJoinFactor

      Defines the multiplication factor used to determine shuffle hash join eligibility. A shuffle hash join is selected when the small-side data size multiplied by this factor is less than the large-side data size.

      3

      spark.sql.sources.parallelPartitionDiscovery.threshold

      Sets the maximum number of paths for driver-side file listing with file-based sources (Parquet, JSON, and ORC). When exceeded during partition discovery, files are listed using a separate Spark distributed job.

      32

      spark.sql.statistics.histogram.enabled

      Specifies whether to generate equi-height histograms during column statistics computation to improve estimation accuracy. Requires an additional table scan beyond the one needed for basic column statistics.

      FALSE

    3. For Data retention in days, enter the number of days to keep the data.

    4. For Result format, choose either CSV or Parquet as the data format the ML input channel should use.

  8. For Service access, choose the Existing service role name that will be used to access this table, or choose Create and use a new service role.

  9. For Encryption, select Encrypt secret with a custom KMS key to specify your own KMS key and related information. Otherwise, Clean Rooms ML manages the encryption.

  10. Choose Create ML input channel.

    It will take a few minutes to create the ML input channel. You can see a list of ML input channels on the ML models tab.

Note

After the ML input channel is created, you can't edit it.
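The worker type, worker count, and Spark properties chosen in the console correspond to the compute configuration structure that the API accepts. The following is a minimal sketch of that structure, using the defaults stated above (CR.1X, 16 workers) and two properties from the table; the specific properties chosen here are illustrative, not required.

```python
# Sketch of a compute configuration using this page's defaults plus two
# illustrative Spark properties from the table above. Spark property values
# are passed as strings.
compute_configuration = {
    "worker": {
        "type": "CR.1X",   # default worker type
        "number": 16,      # default number of workers
        "properties": {
            "spark": {
                "spark.sql.shuffle.partitions": "200",
                "spark.task.maxFailures": "4",
            }
        }
    }
}
```

This dictionary is what the API example in the next section passes as `computeConfiguration`.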

API

To create an ML input channel (API)

Run the following code with your specific parameters:

import boto3

acr_client = boto3.client('cleanroomsml')

resp = acr_client.create_ml_input_channel(
    name="ml_input_channel_name",
    membershipIdentifier='membership_id',
    configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn],
    retentionInDays=1,
    inputChannel={
        "dataSource": {
            "protectedQueryInputParameters": {
                "sqlParameters": {
                    "queryString": "select * from table",
                    "computeConfiguration": {
                        "worker": {
                            "type": "CR.1X",
                            "number": 16,
                            "properties": {
                                "spark": {
                                    "spark configuration key": "spark configuration value",
                                }
                            }
                        }
                    },
                    "resultFormat": "PARQUET"
                }
            }
        },
        "roleArn": "arn:aws:iam::111122223333:role/role_name"
    }
)

channel_arn = resp['mlInputChannelArn']
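When the same channel is created from several places, it can help to assemble the request dictionary separately from the API call. The helper below is our own convenience sketch, not part of the Clean Rooms ML API; the key names mirror the request shown above, and the compute configuration is omitted here for brevity (it can be added under `sqlParameters` as shown in the example).

```python
def build_create_channel_request(name, membership_id, algorithm_association_arn,
                                 role_arn, query, retention_days=1,
                                 result_format="PARQUET"):
    """Assemble kwargs for create_ml_input_channel (hypothetical helper).

    Key names mirror the request in the example above.
    """
    return {
        "name": name,
        "membershipIdentifier": membership_id,
        "configuredModelAlgorithmAssociations": [algorithm_association_arn],
        "retentionInDays": retention_days,
        "inputChannel": {
            "dataSource": {
                "protectedQueryInputParameters": {
                    "sqlParameters": {
                        "queryString": query,
                        # computeConfiguration may be added here if needed
                        "resultFormat": result_format,
                    }
                }
            },
            "roleArn": role_arn,
        },
    }

# Placeholder values; substitute your own identifiers and ARNs.
req = build_create_channel_request(
    "ml_input_channel_name", "membership_id",
    "configured_model_algorithm_association_arn",
    "arn:aws:iam::111122223333:role/role_name",
    "select * from table",
)
```

The result can then be passed to the client with `acr_client.create_ml_input_channel(**req)`.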