在 AWS Clean Rooms ML 中建立 ML 輸入通道 - AWS Clean Rooms

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 AWS Clean Rooms ML 中建立 ML 輸入通道

先決條件:

  • AWS 帳戶 可存取 的 AWS Clean Rooms

  • 您要 AWS Clean Rooms 在其中建立 ML 輸入通道的協同合作設定

  • 在協同合作中查詢資料和建立 ML 輸入通道的許可。

  • (選用) 要與 ML 輸入通道建立關聯的現有模型演算法,或建立新的模型演算法的許可

  • (選用) 具有分析規則的資料表,可針對您指定的模型執行。

  • (選用) 用於產生資料集的現有 SQL 查詢或分析範本

  • (選用) 具有適當許可的現有服務角色,或建立新服務角色的許可

  • (選用) 如果您想要使用自己的加密 AWS KMS 金鑰的自訂金鑰

  • 在協同合作中建立和管理 ML 模型的適當許可

ML 輸入通道是從特定資料查詢建立的資料集。能夠查詢資料的成員可以透過建立 ML 輸入通道來準備其資料以進行訓練和推論。建立 ML 輸入通道可讓該資料在相同的協同合作中用於不同的訓練模型。您應該為訓練和推論建立個別的 ML 輸入通道。

若要建立 ML 輸入通道,您必須指定用來查詢輸入資料的 SQL 查詢,並建立 ML 輸入通道。此查詢的結果絕不會與任何成員共用,並保持在 Clean Rooms ML 的界限內。參考 Amazon Resource Name (ARN) 用於後續步驟,以訓練模型或執行推論。

Console
建立 ML 輸入通道 (主控台)
  1. 登入 AWS 管理主控台 ,並在 https://https://console.aws.amazon.com/cleanrooms 開啟 AWS Clean Rooms 主控台。

  2. 在左側導覽窗格中,選擇協同合作

  3. 協同合作頁面上,選擇您要建立 ML 輸入通道的協同合作。

  4. 協同合作開啟後,選擇 ML 模型索引標籤。

  5. 自訂 ML 模型ML 輸入通道區段中,選擇建立 ML 輸入通道

  6. 建立 ML 輸入通道頁面上,針對 ML 輸入通道詳細資訊,執行下列動作:

    1. 名稱中,輸入頻道的唯一名稱。

    2. (選用) 針對描述,輸入頻道的描述。

    3. 針對關聯的模型演算法,選取要使用的演算法。

      選擇關聯模型演算法以新增。

  7. 針對資料集,選擇產生訓練資料集的方法:

    • 選擇 SQL 查詢以使用 SQL 查詢的結果做為訓練資料集。

      如果您選擇 SQL 查詢,請在 SQL 查詢欄位中輸入您的查詢

      (選用) 若要匯入您最近使用的查詢,請選擇從最近的查詢匯入

    • 選擇分析範本以使用分析範本的結果做為訓練資料集。

      警告

      合成資料產生可防止推斷個別屬性,無論原始資料集中是否存在特定個人,或是否存在這些個人的學習屬性。不過,它不會阻止原始資料集的常值出現,包括個人身分識別資訊 (PII) 出現在合成資料集中。

      建議您避免輸入資料集中僅與一個資料主體相關聯的值,因為這些值可能會重新識別資料主體。例如,如果只有一個使用者住在郵遞區號中,則合成資料集中是否存在該郵遞區號,會確認使用者位於原始資料集中。可以使用截斷高精確度值或將不常見目錄取代為其他 等技術來降低此風險。這些轉換可以是用來建立 ML 輸入通道之查詢的一部分。

    1. 如果沒有相關聯的資料表,請選擇關聯資料表以新增具有可對指定模型執行之分析規則的資料表。

    2. 選擇建立此資料通道時要使用的工作者類型。預設工作者類型為 CR.1X。指定要使用的工作者數量。預設工作者編號為 16。若要指定 Spark 屬性

      1. 展開 Spark 屬性

      2. 選擇新增 Spark 屬性

      3. Spark 屬性對話方塊中,從下拉式清單中選擇屬性名稱,然後輸入

      下表提供每個屬性的定義。

      如需 Spark 屬性的詳細資訊,請參閱 Apache Spark 文件中的 Spark 屬性

      屬性名稱 Description 預設值

      spark.task.maxFailures

      控制任務在任務失敗之前可以失敗的連續次數。需要大於或等於 1 的值。允許重試次數等於此值減去 1。如果任何嘗試成功,就會重設失敗計數。不同任務的故障不會累積到此限制。

      4

      spark.sql.files.maxPartitionBytes

      設定從 Parquet、JSON 和 ORC 等檔案型來源讀取時要封裝到單一分割區的最大位元組數。

      128MB

      spark.hadoop.fs.s3.maxRetries

      設定 Amazon S3 檔案操作的重試嘗試次數上限。

      spark.network.timeout

      設定所有網路互動的預設逾時。如果未設定,則覆寫下列逾時設定:

      • spark.storage.blockManagerHeartbeatTimeoutMs

      • spark.shuffle.io.connectionTimeout

      • spark.rpc.askTimeout

      • spark.rpc.lookupTimeout

      120 秒

      spark.rdd.compress

      指定是否使用 spark.io.compression.codec 壓縮序列化 RDD 分割區。適用於 Java 和 Scala 中的 StorageLevel.MEMORY_ONLY_SER,或 Python 中的 StorageLevel.MEMORY_ONLY。減少儲存空間,但需要額外的 CPU 處理時間。

      FALSE

      spark.shuffle.spill.compress

      指定是否使用 spark.io.compression.codec 壓縮隨機溢出資料。

      TRUE

      spark.sql.adaptive.advisoryPartitionSizeInBytes

      在 spark.sql.adaptive.enabled 為 true 的適應性最佳化期間,設定隨機播放分割區的目標大小,以位元組為單位。合併小型分割區或分割扭曲分割區時,控制分割區大小。

      ( spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值)

      spark.sql.adaptive.autoBroadcastJoinThreshold

      設定在聯結期間廣播至工作者節點的資料表大小上限,以位元組為單位。僅適用於自適應架構。使用與 spark.sql.autoBroadcastJoinThreshold 相同的預設值。設定為 -1 以停用廣播。

      (無)

      spark.sql.adaptive.coalescePartitions.enabled

      指定是否要根據 spark.sql.adaptive.advisoryPartitionSizeInBytes 來合併連續隨機播放分割區,以最佳化任務大小。需要 spark.sql.adaptive.enabled 才能成立。

      TRUE

      spark.sql.adaptive.coalescePartitions.initialPartitionNum

      在合併之前定義隨機播放分割區的初始數量。同時需要 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 為 true。預設為 spark.sql.shuffle.partitions 的值。

      (無)

      spark.sql.adaptive.coalescePartitions.minPartitionSize

      設定合併隨機播放分割區的大小下限,以防止分割區在適應性最佳化期間變得太小。

      1 MB

      spark.sql.adaptive.coalescePartitions.parallelismFirst

      指定是否要在分割區合併期間根據叢集平行處理而非 spark.sql.adaptive.advisoryPartitionSizeInBytes 計算分割區大小。產生的分割區大小小於設定的目標大小,以最大化平行處理。我們建議在忙碌叢集上將此設定為 false,以透過防止過多的小型任務來改善資源使用率。

      TRUE

      spark.sql.adaptive.enabled

      指定是否啟用自適應查詢執行,以在查詢執行期間根據準確的執行時間統計資料重新最佳化查詢計劃。

      TRUE

      spark.sql.adaptive.forceOptimizeSkewedJoin

      指定是否強制啟用 OptimizeSkewedJoin,即使它引入額外的隨機播放。

      FALSE

      spark.sql.adaptive.localShuffleReader.enabled

      指定是否在不需要隨機分割時使用本機隨機讀取器,例如從排序合併聯結轉換為廣播雜湊聯結之後。需要 spark.sql.adaptive.enabled 才能成立。

      TRUE

      spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

      設定用於建置本機雜湊映射的分割區大小上限,以位元組為單位。在下列情況下,將隨機雜湊聯結優先於排序合併聯結:

      • 此值等於或超過 spark.sql.adaptive.advisoryPartitionSizeInBytes

      • 所有分割區大小都在此限制內

      覆寫 spark.sql.join.preferSortMergeJoin 設定。

      0 個位元組

      spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled

      指定是否根據 spark.sql.adaptive.advisoryPartitionSizeInBytes 將扭曲隨機分割區分割為較小的分割區,以最佳化扭曲隨機分割區。需要 spark.sql.adaptive.enabled 才能成立。

      TRUE

      spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor

      定義分割期間合併分割區的大小閾值因素。小於此因素的分割區乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes 會合併。

      0.2

      spark.sql.adaptive.skewJoin.enabled

      指定是否透過分割和選擇性複寫扭曲的分割區來處理隨機聯結中的資料扭曲。適用於排序合併和隨機雜湊聯結。需要 spark.sql.adaptive.enabled 才能成立。

      TRUE

      spark.sql.adaptive.skewJoin.skewedPartitionFactor

      決定決定分割區扭曲的大小因素。當分割區的大小超過以下兩者時,就會扭曲:

      • 此係數乘以中位數分割區大小

      • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 的值

      5

      spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

      設定識別扭曲分割區的大小閾值,以位元組為單位。當分割區的大小超過以下兩者時,就會扭曲:

      • 此閾值

      • 中位數分割區大小乘以 spark.sql.adaptive.skewJoin.skewedPartitionFactor

      我們建議您設定大於 spark.sql.adaptive.advisoryPartitionSizeInBytes 的值。

      256MB

      spark.sql.autoBroadcastJoinThreshold

      設定在聯結期間廣播至工作者節點的資料表大小上限,以位元組為單位。設定為 -1 以停用廣播。

      10MB

      spark.sql.broadcastTimeout

      控制廣播聯結期間廣播操作的逾時期間,以秒為單位。

      300 秒

      spark.sql.cbo.enabled

      指定是否要為計劃統計資料估算啟用成本型最佳化 (CBO)。

      FALSE

      spark.sql.cbo.joinReorder.dp.star.filter

      指定是否要在成本型聯結列舉期間套用星狀聯結篩選條件啟發式。

      FALSE

      spark.sql.cbo.joinReorder.dp.threshold

      設定動態程式設計演算法中允許的聯結節點數目上限。

      12

      spark.sql.cbo.joinReorder.enabled

      指定是否要在成本型最佳化 (CBO) 中啟用聯結重新排序。

      FALSE

      spark.sql.cbo.planStats.enabled

      指定是否要在邏輯計畫產生期間從目錄擷取資料列計數和資料欄統計資料。

      FALSE

      spark.sql.cbo.starSchemaDetection

      指定是否根據星狀結構描述偵測啟用聯結重新排序。

      FALSE

      spark.sql.files.maxPartitionNum

      設定檔案型來源 (Parquet、JSON 和 ORC) 的目標分割檔案分割區數量上限。當初始計數超過此值時,重新擴展分割區。這是建議的目標,而不是保證的限制。

      (無)

      spark.sql.files.maxRecordsPerFile

      設定寫入單一檔案的記錄數目上限。設定為零或負值時不會套用任何限制。

      0

      spark.sql.files.minPartitionNum

      設定檔案型來源 (Parquet、JSON 和 ORC) 的目標分割檔案分割區數量下限。預設為 spark.sql.leafNodeDefaultParallelism。這是建議的目標,而不是保證的限制。

      (無)

      spark.sql.inMemoryColumnarStorage.batchSize

      控制單欄式快取的批次大小。增加大小可改善記憶體使用率和壓縮,但會增加out-of-memory錯誤的風險。

      10000

      spark.sql.inMemoryColumnarStorage.compressed

      指定是否根據資料統計資料自動選取資料欄的壓縮轉碼器。

      TRUE

      spark.sql.inMemoryColumnarStorage.enableVectorizedReader

      指定是否要為單欄式快取啟用向量化讀取。

      TRUE

      spark.sql.legacy.allowHashOnMapType

      指定是否允許對映射類型資料結構執行雜湊操作。此舊版設定可維持與舊版 Spark 版本的映射類型處理的相容性。

      spark.sql.legacy.allowNegativeScaleOfDecimal

      指定是否在十進位類型定義中允許負縮放值。此舊版設定可維持與支援負小數位數擴展之較舊 Spark 版本的相容性。

      spark.sql.legacy.castComplexTypesToString.enabled

      指定是否啟用舊版行為,將複雜類型轉換為字串。維持與舊版 Spark 版本類型轉換規則的相容性。

      spark.sql.legacy.charVarcharAsString

      指定是否將 CHAR 和 VARCHAR 類型視為 STRING 類型。此舊版設定提供與較舊 Spark 版本的字串類型處理相容性。

      spark.sql.legacy.createEmptyCollectionUsingStringType

      指定是否使用字串類型元素建立空集合。此舊版設定可維持與較舊 Spark 版本的集合初始化行為的相容性。

      spark.sql.legacy.exponentLiteralAsDecimal.enabled

      指定是否要將指數常值解譯為十進位類型。此舊版設定可維持與舊版 Spark 版本數值常值處理的相容性。

      spark.sql.legacy.json.allowEmptyString.enabled

      指定是否要在 JSON 處理中允許空字串。此舊版設定可維持與舊版 Spark 版本的 JSON 剖析行為的相容性。

      spark.sql.legacy.parquet.int96RebaseModelRead

      指定是否在讀取 Parquet 檔案時使用舊版 INT96 時間戳記重新基礎模式。此舊版設定可維持與較舊 Spark 版本時間戳記處理的相容性。

      spark.sql.legacy.timeParserPolicy

      控制時間剖析行為以實現回溯相容性。此舊版設定會決定如何從字串剖析時間戳記和日期。

      spark.sql.legacy.typeCoercion.datetimeToString.enabled

      指定是否在將日期時間值轉換為字串時啟用舊版類型強制行為。維持與較舊 Spark 版本日期時間轉換規則的相容性。

      spark.sql.maxSinglePartitionBytes

      設定以位元組為單位的分割區大小上限。規劃器為較大的分割區引進隨機播放操作,以改善平行處理。

      128 公尺

      spark.sql.metadataCacheTTLSeconds

      控制中繼資料快取的time-to-live (TTL)。適用於分割區檔案中繼資料和工作階段目錄快取。需要:

      • 大於零的正值

      • spark.sql.catalogImplementation 設定為 hive

      • spark.sql.hive.filesourcePartitionFileCacheSize 大於零

      • spark.sql.hive.manageFilesourcePartitions 設定為 true

      -1000 毫秒

      spark.sql.optimizer.collapseProjectAlwaysInline

      指定是否要摺疊相鄰的投影和內嵌表達式,即使它會導致重複。

      FALSE

      spark.sql.optimizer.dynamicPartitionPruning.enabled

      指定是否要為用作聯結索引鍵的分割區資料欄產生述詞。

      TRUE

      spark.sql.optimizer.enableCsvExpressionOptimization

      指定是否要透過從_csv 操作剪除不必要的資料欄來最佳化 SQL 最佳化工具中的 CSV 表達式。

      TRUE

      spark.sql.optimizer.enableJsonExpressionOptimization

      指定是否要透過下列方式最佳化 SQL 最佳化工具中的 JSON 表達式:

      • 從_json 操作刪除不必要的資料欄

      • 簡化從_json 和 到_json 的組合

      • 最佳化 named_struct 操作

      TRUE

      spark.sql.optimizer.excludedRules

      定義要停用的最佳化工具規則,以逗號分隔的規則名稱識別。某些規則無法停用,因為其為正確性的必要條件。最佳化工具會記錄哪些規則已成功停用。

      (無)

      spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold

      設定在應用程式端插入 Bloom 篩選條件所需的最小彙總掃描大小,以位元組為單位。

      10GB

      spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold

      定義在建立端注入 Bloom 篩選條件的大小閾值上限。

      10MB

      spark.sql.optimizer.runtime.bloomFilter.enabled

      指定是否要插入 Bloom 篩選條件,以便在隨機聯結的一側具有選擇性述詞時減少隨機播放資料。

      TRUE

      spark.sql.optimizer.runtime.bloomFilter.expectedNumItems

      定義執行時間 Bloom 篩選條件中預期項目的預設數量。

      1000000

      spark.sql.optimizer.runtime.bloomFilter.maxNumBits

      設定執行時間 Bloom 篩選條件中允許的位元數上限。

      67108864

      spark.sql.optimizer.runtime.bloomFilter.maxNumItems

      設定執行時間 Bloom 篩選條件中允許的預期項目數量上限。

      4000000

      spark.sql.optimizer.runtime.bloomFilter.number.threshold

      限制每個查詢允許的非 DPP 執行時間篩選條件數量上限,以防止out-of-memory錯誤。

      10

      spark.sql.optimizer.runtime.bloomFilter.numBits

      定義執行時間 Bloom 篩選條件中使用的預設位元數。

      8388608

      spark.sql.optimizer.runtime.rowlevelOperationGroupFilter.enabled

      指定是否要為資料列層級操作啟用執行時間群組篩選。允許資料來源:

      • 使用資料來源篩選條件刪除整個資料群組 (例如檔案或分割區)

      • 執行執行時間查詢以識別相符的記錄

      • 捨棄不必要的群組,以避免昂貴的重寫

      限制:

      • 並非所有表達式都可以轉換為資料來源篩選條件

      • 有些表達式需要 Spark 評估 (例如子查詢)

      TRUE

      spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled

      指定是否要插入半聯結,以在隨機聯結的一側具有選擇性述詞時減少隨機資料。

      FALSE

      spark.sql.parquet.aggregatePushdown

      指定是否將彙總下推至 Parquet 以進行最佳化。支援:

      • 布林值、整數、浮點數和日期類型的 MIN 和 MAX

      • 所有資料類型的 COUNT

      如果任何 Parquet 檔案頁尾缺少統計資料,則擲回例外狀況。

      FALSE

      spark.sql.parquet.columnarReaderBatchSize

      控制每個 Parquet 向量化讀取器批次中的資料列數。選擇平衡效能額外負荷和記憶體用量的值,以防止out-of-memory錯誤。

      4096

      spark.sql.session.timeZone

      定義工作階段時區,以處理字串常值和 Java 物件轉換中的時間戳記。接受:

      • 區域/城市格式的區域型 IDs(例如 America/Los_Angeles)

      • 區域位移,以 (+/-)HH、(+/-)HH:mm 或 (+/-)HH:mm:ss 格式顯示 (例如 -08 或 +01:00)

      • +00:00 的 UTC 或 Z 別名

      (本機時區的值)

      spark.sql.shuffle.partitions

      設定聯結或彙總期間資料隨機播放的預設分割區數量。結構式串流查詢從相同檢查點位置重新啟動之間無法修改。

      200

      spark.sql.shuffledHashJoinFactor

      定義用於判斷隨機雜湊聯結資格的乘數係數。當小端資料大小乘以此因素小於大型端資料大小時,會選取隨機雜湊聯結。

      3

      spark.sql.sources.parallelPartitionDiscovery.threshold

      使用檔案型來源 (Parquet、JSON 和 ORC) 設定驅動程式端檔案清單的路徑數目上限。在分割區探索期間超過 時,會使用個別的 Spark 分散式任務列出檔案。

      32

      spark.sql.statistics.histogram.enabled

      指定是否要在資料欄統計資料運算期間產生等高長長條圖,以改善估算準確度。需要額外的資料表掃描,超過基本資料欄統計資料所需的掃描。

      FALSE

    3. 對於以天為單位的資料保留,請輸入要保留資料的天數。

    4. 針對結果格式,選擇 CSVParquet 做為 ML 輸入通道應使用的資料格式。

  8. 針對服務存取,選擇將用於存取此表格的現有服務角色名稱,或選擇建立並使用新的服務角色

  9. 針對加密,選擇使用自訂 KMS 金鑰加密秘密,以指定您自己的 KMS 金鑰和相關資訊。否則,Clean Rooms ML 會管理加密。

  10. 選擇建立 ML 輸入通道

    建立 ML 輸入通道需要幾分鐘的時間。您可以在 ML 模型索引標籤上查看 ML 輸入通道的清單。

注意

建立 ML 輸入通道之後,您就無法編輯它。

API

建立 ML 輸入通道 (API)

使用特定參數執行下列程式碼:

import boto3 acr_client = boto3.client('cleanroomsml') acr_client.create_ml_input_channel( name="ml_input_channel_name", membershipIdentifier='membership_id', configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn], retentionInDays=1, inputChannel={ "dataSource": { "protectedQueryInputParameters": { "sqlParameters": { "queryString": "select * from table", "computeConfiguration": { "worker": { "type": "CR.1X", "number": 16, "properties": { "spark": { "spark configuration key": "spark configuration value", } } } }, "resultFormat": "PARQUET" } } }, "roleArn": "arn:aws:iam::111122223333:role/role_name" } ) channel_arn = resp['ML Input Channel ARN']