在 AWS Clean Rooms ML 中创建机器学习输入通道 - AWS Clean Rooms

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 AWS Clean Rooms ML 中创建机器学习输入通道

先决条件:

  • 可以 AWS 账户 访问的 AWS Clean Rooms

  • 您要在 AWS Clean Rooms 其中创建 ML 输入通道的协作设置

  • 在协作中查询数据和创建机器学习输入通道的权限。

  • (可选)用于与 ML 输入通道关联的现有模型算法,或创建新模型的权限

  • (可选)包含可以针对您的指定模型运行的分析规则的表。

  • (可选)用于生成数据集的现有 SQL 查询或分析模板

  • (可选)具有相应权限的现有服务角色,或创建新服务角色的权限

  • (可选)如果您想使用自己的加密 AWS KMS 密钥,请使用自定义密钥

  • 在协作中创建和管理机器学习模型的适当权限

机器学习输入通道是根据特定数据查询创建的数据集。能够查询数据的成员可以通过创建 ML 输入通道为训练和推理做好数据准备。创建 ML 输入通道允许在同一个协作中将这些数据用于不同的训练模型。您应该为训练和推理创建单独的 ML 输入通道。

要创建 ML 输入通道,必须指定用于查询输入数据和创建 ML 输入通道的 SQL 查询。此查询的结果永远不会与任何成员共享,并且保持在 Clean Rooms ML 的范围内。在接下来的步骤中,将使用引用 Amazon 资源名称 (ARN) 来训练模型或运行推理。

Console
创建 ML 输入频道(控制台)
  1. 登录 AWS 管理控制台 并在 https://console.aws.amazon.com/clean room AWS Clean Rooms s 上打开控制台。

  2. 在左侧导航窗格中,选择协作

  3. 协作页面上,选择要在其中创建机器学习输入渠道的协作。

  4. 协作打开后,选择机器学习模型选项卡。

  5. 在 “自定义 ML 模型” 下的 “机器学习输入通道” 部分中,选择 “创建 ML 输入通道”。

  6. 在 “创建 ML 输入频道” 页面上,要获取 ML 输入频道的详细信息,请执行以下操作:

    1. 在 “名称” 中,输入频道的唯一名称。

    2. (可选)在描述中,输入您的频道的描述。

    3. 在 “关联模型算法” 中,选择要使用的算法。

      选择 “关联模型算法” 以添加新算法。

  7. 对于数据集,选择一种生成训练数据集的方法:

    • 选择 SQL 查询可将 SQL 查询的结果用作训练数据集。

      如果您选择了 SQL 查询,请在 SQL 查询字段中输入您的查询。

      (可选)要导入您最近使用的查询,请选择从最近的查询中导入

    • 选择分析模板以使用分析模板的结果作为训练数据集。

      警告

      合成数据生成可以防止推断出个人属性,无论特定个体存在于原始数据集中,还是存在这些个体的学习属性。但是,它并不能阻止原始数据集中的文字值,包括个人身份信息 (PII) 出现在合成数据集中。

      我们建议避免输入数据集中仅与一个数据主体关联的值,因为这些值可能会重新识别数据主体。例如,如果只有一个用户居住在邮政编码中,则合成数据集中存在该邮政编码将确认该用户位于原始数据集中。诸如截断高精度值或用其他目录替换不常见的目录之类的技术可以用来降低这种风险。这些转换可以是用于创建 ML 输入通道的查询的一部分。

    1. 如果没有关联表,请选择 “关联表” 以添加具有可针对指定模型运行的分析规则的表。

    2. 选择创建此数据通道时要使用的工作器类型。默认的工作器类型为 CR.1X。指定要使用的员工人数。默认工作人员编号为 16。要指定火花属性,请执行以下操作:

      1. 展开 Spark 属性

      2. 选择 “添加 Spark 属性”。

      3. Spark 属性对话框中,从下拉列表中选择属性名称并输入

      下表提供了每个属性的定义。

      有关 Spark 属性的更多信息,请参阅 Apache Spark 文档中的 Spark 属性

      属性名称 说明 默认值

      spark.task.maxFa

      控制任务在失败之前可以连续失败多少次。需要一个大于或等于 1 的值。允许的重试次数等于该值减去 1。如果任何尝试成功,则失败计数将重置。不同任务的失败不会累积到这个极限。

      4

      spark.sql.files。 maxPartitionBytes

      设置从基于文件的源(例如 Parquet、JSON 和 ORC)读取数据时要打包到单个分区的最大字节数。

      128MB

      spark.hadoop.fs.s3.maxRetries

      设置 Amazon S3 文件操作的最大重试次数。

      spark.network.

      设置所有网络交互的默认超时时间。如果未配置,则覆盖以下超时设置:

      • Spark.storage。 blockManagerHeartbeatTimeoutMs

      • shuffle.io.connectionTimeout

      • Spark.rpc.askTimeout

      • spark.rpc.lookupTim

      120

      spark.rdd.com

      指定是否使用 spark.io.compression.codec 压缩序列化的 RDD 分区。适用于 Java 和 Scala 中的 StorageLevel .MEMORY_ONLY_SER,或 Python 中的.MEMORY_ONLY。 StorageLevel减少存储空间,但需要额外的 CPU 处理时间。

      FALSE

      Spark.shuffle.spill.compress

      指定是否使用 spark.io.compression.codec 压缩随机播放数据。

      TRUE

      spark.sql.自适应。 advisoryPartitionSizeInBytes

      当 spark.sql.adaptive.enabled 为真时,设置自适应优化期间洗牌分区的目标大小(以字节为单位)。控制合并小分区或拆分倾斜分区时的分区大小。

      (spark.sql.adaptive.shuffle 的值。 targetPostShuffleInputSize)

      spark.sql.自适应。 autoBroadcastJoin阈值

      设置在联接期间向工作节点广播的最大表大小(以字节为单位)。仅适用于自适应框架。使用与 spark.sql 相同的默认值。 autoBroadcastJoin阈值。设置为 -1 可禁用广播。

      (无)

      Spark.sql.adaptive.coalescePartitions.enabled

      指定是否根据 spark.sql.adaptive 合并连续的洗牌分区。 advisoryPartitionSizeInBytes 以优化任务规模。需要 spark.sql.adaptive.enabled 才为真。

      TRUE

      Spark.sql.adaptive.coalescePartive.coal initialPartitionNum

      定义合并前随机分区的初始数量。需要同时启用 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 才能成真。默认为 spark.sql.shuffle.partitions 的值。

      (无)

      Spark.sql.adaptive.coalescePartive.coal minPartitionSize

      设置合并后的随机分区的最小大小,以防止自适应优化期间分区变得太小。

      1 MB

      Spark.sql.adaptive.coalescePartitions.parallelism Firs

      指定是否根据集群并行度而不是 spark.sql.adaptive 来计算分区大小。 advisoryPartitionSizeInBytes 在分区合并期间。生成的分区大小小于配置的目标大小,以最大限度地提高并行度。我们建议在繁忙的群集上将其设置为 false,以通过防止过多的小任务来提高资源利用率。

      TRUE

      sql.adaptive.enabled

      指定是否启用自适应查询执行,以便在查询执行期间根据准确的运行时统计数据重新优化查询计划。

      TRUE

      spark.sql.自适应。 forceOptimizeSkewed加入

      指定是否强制启用, OptimizeSkewedJoin 即使它引入了额外的随机播放。

      FALSE

      spark.sql.自适应。 localShuffleReader. 已启用

      指定在不需要随机分区时(例如从排序合并联接转换为广播哈希联接之后)是否使用本地随机播放读取器。需要 spark.sql.adaptive.enabled 才为真。

      TRUE

      spark.sql.自适应。 maxShuffledHashJoinLocalMapThreshold

      设置用于构建本地哈希映射的最大分区大小(以字节为单位)。在以下情况下,优先考虑洗牌后的哈希联接而不是排序合并联接:

      • 此值等于或超过 spark.sql.adaptive。 advisoryPartitionSizeInBytes

      • 所有分区大小均在此限制范围内

      覆盖 spark.sql.join。 preferSortMerge加入设置。

      0 字节

      spark.sql.自适应。 optimizeSkewsInRebalancePartitions.enabled

      指定是否通过基于 spark.sql.adaptive 将倾斜的随机分区拆分为较小的分区来优化这些分区。 advisoryPartitionSizeInBytes。需要 spark.sql.adaptive.enabled 才为真。

      TRUE

      spark.sql.自适应。 rebalancePartitionsSmallPartitionFactor

      定义拆分期间合并分区的大小阈值系数。小于此因子的分区乘以 spark.sql.adaptive。 advisoryPartitionSizeInBytes 已合并。

      0.2

      Spark.sql.adaptive.skewjoin.enable

      指定是否通过拆分和可选复制倾斜的分区来处理洗牌联接中的数据倾斜。适用于排序合并和洗牌哈希联接。需要 spark.sql.adaptive.enabled 才为真。

      TRUE

      Spark.sql.adaptive.skewJoin。 skewedPartitionFactor

      确定决定分区偏斜的大小系数。当分区的大小超过两个分区时,分区就会出现偏差:

      • 该因子乘以分区大小中位数

      • spark.sql.adaptive.skewJoin 的值。 skewedPartitionThresholdInBytes

      5

      Spark.sql.adaptive.skewJoin。 skewedPartitionThresholdInBytes

      设置用于识别偏斜分区的大小阈值(以字节为单位)。当分区的大小超过两个分区时,分区就会出现偏差:

      • 这个门槛

      • 分区大小中位数乘以 spark.sql.adaptive.skewJoin。 skewedPartitionFactor

      我们建议将此值设置为大于 spark.sql.adaptive。 advisoryPartitionSizeInBytes。

      256MB

      spark.sql。 autoBroadcastJoin阈值

      设置在联接期间向工作节点广播的最大表大小(以字节为单位)。设置为 -1 可禁用广播。

      10MB

      sql.broadcastTimeout

      控制广播加入期间广播操作的超时时间(以秒为单位)。

      300 秒

      spark.sql.cbo.enabled

      指定是否为计划统计数据估算启用基于成本的优化 (CBO)。

      FALSE

      spark.sql.cbo.joinreorder.dp.star.Filter

      指定是否在基于开销的联接枚举期间应用星型联接过滤器启发式算法。

      FALSE

      spark.sql.cbo.joinreorder.dp.Thresh

      设置动态规划算法中允许的最大连接节点数。

      12

      Spark.sql.cbo.joinreorder.enabled

      指定是否在基于成本的优化 (CBO) 中启用联接重新排序。

      FALSE

      Spark.sql.cbo.planstats.enabled

      指定在逻辑计划生成期间是否从目录中提取行数和列统计信息。

      FALSE

      spark.sql.cbo。 starSchemaDetection

      指定是否启用基于星型架构检测的联接重新排序。

      FALSE

      spark.sql.files。 maxPartitionNum

      为基于文件的源(Parquet、JSON 和 ORC)设置拆分文件分区的目标最大数量。当初始计数超过此值时,重新缩放分区。这是建议的目标,而不是保证的上限。

      (无)

      spark.sql.files。 maxRecordsPer文件

      设置写入单个文件的最大记录数。如果设置为零或负值,则不适用任何限制。

      0

      spark.sql.files。 minPartitionNum

      为基于文件的源(Parquet、JSON 和 ORC)设置拆分文件分区的目标最小数量。默认为 spark.sql。 leafNodeDefault并行性。这是建议的目标,而不是保证的上限。

      (无)

      spark.sql。 inMemoryColumnarStorage.batchSize

      控制列式缓存的批次大小。增加大小可以提高内存利用率和压缩率,但会增加 out-of-memory出错的风险。

      10000

      spark.sql。 inMemoryColumnar存储. 已压缩

      指定是否根据数据统计信息自动为列选择压缩编解码器。

      TRUE

      spark.sql。 inMemoryColumnar存储。 enableVectorizedReader

      指定是否为列式缓存启用矢量化读取。

      TRUE

      Spark.sql.legacy。 allowHashOnMapType

      指定是否允许对地图类型数据结构进行哈希操作。此传统设置保持了与旧版 Spark 地图类型处理的兼容性。

      Spark.sql.legacy。 allowNegativeScaleOfDecimal

      指定是否允许在十进制类型定义中使用负比例值。此传统设置保持了与支持负十进制小数位数的旧 Spark 版本的兼容性。

      Spark.sql.legacy。 castComplexTypesToString.enabled

      指定是否启用将复杂类型转换为字符串的传统行为。保持与旧版 Spark 的类型转换规则的兼容性。

      Spark.sql.legacy。 charVarcharAs字符串

      指定是否将 CHAR 和 VARCHAR 类型视为字符串类型。此传统设置提供了与旧版 Spark 的字符串类型处理的兼容性。

      Spark.sql.legacy。 createEmptyCollectionUsingStringType

      指定是否使用字符串类型元素创建空集合。此传统设置保持了与旧版 Spark 的集合初始化行为的兼容性。

      Spark.sql.legacy。 exponentLiteralAsdecimal. 已启用

      指定是否将指数文字解释为十进制类型。此传统设置保持了与旧版 Spark 的数字文字处理的兼容性。

      spark.sql.legacy.json。 allowEmptyString. 已启用

      指定是否允许在 JSON 处理中使用空字符串。此传统设置保持了与旧版 Spark 的 JSON 解析行为的兼容性。

      spark.sql.legacy.parquet.int96 RebaseModelRead

      指定在读取 Parquet 文件时是否使用传统 INT96 的时间戳变基模式。此传统设置保持了与旧版 Spark 的时间戳处理的兼容性。

      Spark.sql.legacy。 timeParserPolicy

      控制时间解析行为以实现向后兼容。此传统设置决定了如何从字符串中解析时间戳和日期。

      Spark.sql.legacy.typeCorcion。 datetimeToString. 已启用

      指定在将日期时间值转换为字符串时是否启用传统类型强制行为。保持与旧版 Spark 版本的日期时间转换规则的兼容性。

      spark.sql。 maxSinglePartition字节

      设置最大分区大小(以字节为单位)。规划器为较大的分区引入了洗牌操作以提高并行度。

      128m

      Spark.sql.metadataCache TTLSeconds

      控制元数据缓存的 time-to-live (TTL)。适用于分区文件元数据和会话目录缓存。需要:

      • 大于零的正值

      • Spark.sql.catalogCatalog实现设置为蜂巢

      • spark.sql.hive。 filesourcePartitionFileCacheSize 大于零

      • spark.sql.hive。 manageFilesourcePartitions 设置为 true

      -1000 毫秒

      sql.optimizer。 collapseProjectAlways内联

      指定是否折叠相邻的投影和行内表达式,即使这会导致重复。

      FALSE

      sql.optimizer。 dynamicPartitionPruning. 已启用

      指定是否为用作联接键的分区列生成谓词。

      TRUE

      sql.optimizer。 enableCsvExpression优化

      指定是否通过从 from_csv 操作中删除不必要的列来优化 SQL 优化器中的 CSV 表达式。

      TRUE

      sql.optimizer。 enableJsonExpression优化

      通过以下方式指定是否优化 SQL 优化器中的 JSON 表达式:

      • 从 from_json 操作中删除不必要的列

      • 简化 from_json 和 to_json 的组合

      • 优化 named_struct 操作

      TRUE

      spark.sql.Optimizer.ExcludedR

      定义要禁用的优化器规则,由逗号分隔的规则名称标识。某些规则无法禁用,因为它们是正确性所必需的。优化器会记录哪些规则已成功禁用。

      (无)

      spark.sql.Optimizer.runtime.bloomFil applicationSideScanSizeThreshold

      设置在应用程序端注入 Bloom 过滤器所需的最小聚合扫描大小(以字节为单位)。

      10GB

      spark.sql.Optimizer.runtime.bloomFil creationSideThreshold

      定义在创建端注入 Bloom 滤镜的最大大小阈值。

      10MB

      Spark.sql.Optimizer.runtime.bloomFilter.enable

      指定当随机连接的一侧具有选择性谓词时,是否插入布隆过滤器以减少随机播放数据。

      TRUE

      spark.sql.Optimizer.runtime.bloomFil expectedNumItems

      定义运行时 Bloom 过滤器中预期项目的默认数量。

      1000000

      spark.sql.Optimizer.runtime.bloomFil maxNumBits

      设置运行时 Bloom 过滤器中允许的最大位数。

      67108864

      spark.sql.Optimizer.runtime.bloomFil maxNumItems

      设置运行时 Bloom 过滤器中允许的最大预期项目数。

      4000000

      spark.sql.Optimizer.runtime.bloomFilter.number.

      限制每次查询允许的非 DPP 运行时过滤器的最大数量,以防止驱动程序 out-of-memory出错。

      10

      Spark.sql.Optimizer.runtime.bloomfilter.numbit

      定义运行时布隆过滤器中使用的默认位数。

      8388608

      spark.sql.optimizer.runtim rowlevelOperationGroup过滤器. 已启用

      指定是否为行级操作启用运行时组筛选。允许数据源:

      • 使用数据源筛选器修剪整组数据(例如文件或分区)

      • 执行运行时查询以识别匹配的记录

      • 丢弃不必要的组以避免昂贵的重写

      限制:

      • 并非所有表达式都可以转换为数据源筛选器

      • 有些表达式需要 Spark 求值(例如子查询)

      TRUE

      Spark.sql.Optimizer.runtimeFilter semiJoinReduction. 已启用

      指定当随机连接的一侧具有选择性谓词时,是否插入半联接以减少随机播放数据。

      FALSE

      spark.sql.parquet.AgregatePus

      指定是否将聚合向下推送到 Parquet 进行优化。支持:

      • 布尔型、整数、浮点型和日期类型的最小值和最大值

      • 所有数据类型的计数

      如果任何 Parquet 文件页脚中缺少统计信息,则会抛出异常。

      FALSE

      sql.parquet。 columnarReaderBatch大小

      控制每个 Parquet 矢量化阅读器批次中的行数。选择一个平衡性能开销和内存使用量的值,以防止 out-of-memory出错。

      4096

      Spark.sql.session.time

      定义会话时区,用于处理字符串文字中的时间戳和 Java 对象转换。接受:

      • 以地区为基础 IDs 的 area/city 格式(例如美国/洛杉矶)

      • 区域偏移量采用 (+/-) HH、(+/-) HH: mm 或 (+/-) HH: mm: ss 格式(例如 -08 或 + 01:00)

      • UTC 或 Z 作为 + 00:00 的别名

      (当地时区的值)

      spark.sql.shuffle.part

      设置联接或聚合期间用于数据洗牌的默认分区数。无法在结构化流式查询从同一检查点位置重新启动之间进行修改。

      200

      spark.sql。 shuffledHashJoin因子

      定义用于确定 shuffle 哈希加入资格的乘法系数。当小边数据大小乘以此系数小于大边数据大小时,将选择随机哈希联接。

      3

      火花.sql.sources。 parallelPartitionDiscovery. 阈值

      使用基于文件的源(Parquet、JSON 和 ORC)设置驱动端文件列表的最大路径数。如果在分区发现期间超出限制,则使用单独的 Spark 分布式作业列出文件。

      32

      spark.sql.statistics.histics.h

      指定是否在列统计数据计算期间生成等高直方图以提高估计精度。除了基本列统计数据所需的扫描之外,还需要进行额外的表扫描。

      FALSE

    3. 对于以天为单位的数据保留,请输入数据的保留天数。

    4. 对于结果格式,选择 CSV 或 Par q uet 作为 ML 输入通道应使用的数据格式。

  8. 对于服务访问权限,请选择将用于访问此表的现有服务角色名称,或者选择创建并使用新的服务角色

  9. 对于加密,选择使用自定义 KMS 密钥加密密钥以指定您自己的 KMS 密钥和相关信息。否则,Clean Rooms ML 将管理加密。

  10. 选择 “创建 ML 输入通道”。

    创建 ML 输入通道需要几分钟。您可以在 “机器学习模型” 选项卡上查看机器学习输入通道列表。

注意

创建 ML 输入通道后,您无法对其进行编辑。

API

创建 ML 输入频道 (API)

使用您的特定参数运行以下代码:

import boto3 acr_client = boto3.client('cleanroomsml') acr_client.create_ml_input_channel( name="ml_input_channel_name", membershipIdentifier='membership_id', configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn], retentionInDays=1, inputChannel={ "dataSource": { "protectedQueryInputParameters": { "sqlParameters": { "queryString": "select * from table", "computeConfiguration": { "worker": { "type": "CR.1X", "number": 16, "properties": { "spark": { "spark configuration key": "spark configuration value", } } } }, "resultFormat": "PARQUET" } } }, "roleArn": "arn:aws:iam::111122223333:role/role_name" } ) channel_arn = resp['ML Input Channel ARN']