AWS Clean Rooms ML での ML 入力チャネルの作成 - AWS Clean Rooms

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Clean Rooms ML での ML 入力チャネルの作成

前提条件:

  • にアクセスできる AWS アカウント 。 AWS Clean Rooms

  • ML 入力チャネルを作成する AWS Clean Rooms にセットアップされたコラボレーション

  • コラボレーションでデータをクエリし、ML 入力チャネルを作成するアクセス許可。

  • (オプション) ML 入力チャネルに関連付ける既存のモデルアルゴリズム、または新しい入力チャネルを作成するアクセス許可

  • (オプション) 指定したモデルに対して実行できる分析ルールを含むテーブル。

  • (オプション) データセットの生成に使用する既存の SQL クエリまたは分析テンプレート

  • (オプション) 適切なアクセス許可、または新しいサービスロールを作成するためのアクセス許可を持つ既存のサービスロール

  • (オプション) 独自の暗号化 AWS KMS キーを使用する場合のカスタムキー

  • コラボレーションで ML モデルを作成および管理するための適切なアクセス許可

ML 入力チャネルは、特定のデータクエリから作成されるデータセットです。データをクエリできるメンバーは、ML 入力チャネルを作成することで、トレーニングと推論のためにデータを準備できます。ML 入力チャネルを作成すると、そのデータを同じコラボレーション内のさまざまなトレーニングモデルで使用できます。トレーニングと推論用に個別の ML 入力チャネルを作成する必要があります。

ML 入力チャネルを作成するには、入力データのクエリと ML 入力チャネルの作成に使用される SQL クエリを指定する必要があります。このクエリの結果はメンバーと共有されることはなく、Clean Rooms ML の境界内にとどまります。リファレンス Amazon リソースネーム (ARN) は、次のステップでモデルのトレーニングや推論の実行に使用されます。

Console
ML 入力チャネルを作成するには (コンソール)
  1. にサインイン AWS マネジメントコンソール し、https://console.aws.amazon.com/cleanrooms で AWS Clean Rooms コンソールを開きます。

  2. 左のナビゲーションペインで、[コラボレーション] を選択します。

  3. コラボレーションページで、ML 入力チャネルを作成するコラボレーションを選択します。

  4. コラボレーションが開いたら、ML モデルタブを選択します。

  5. カスタム ML モデルML 入力チャネル セクションで、ML 入力チャネルの作成 を選択します。

  6. ML 入力チャネルの作成ページで、ML 入力チャネルの詳細について、次の操作を行います。

    1. 名前 に、チャネルの一意の名前を入力します。

    2. (オプション) 説明 に、チャネルの説明を入力します。

    3. 関連付けられたモデルアルゴリズムで、使用するアルゴリズムを選択します。

      モデルアルゴリズムを関連付けを選択して新しいアルゴリズムを追加します。

  7. データセット で、トレーニングデータセットを生成する方法を選択します。

    • SQL クエリを選択して、SQL クエリの結果をトレーニングデータセットとして使用します。

      SQL クエリを選択した場合は、SQL クエリフィールドにクエリを入力します。

      (オプション) 最近使用したクエリをインポートするには、最近のクエリからインポートを選択します。

    • 分析テンプレートを選択して、分析テンプレートの結果をトレーニングデータセットとして使用します。

      警告

      合成データ生成は、特定の個人が元のデータセットに存在するか、それらの個人の学習属性が存在するかにかかわらず、個々の属性の推測から保護します。ただし、個人を特定できる情報 (PII) を含む元のデータセットのリテラル値が合成データセットに表示されるのを防ぐことはできません。

      1 つのデータサブジェクトのみに関連付けられている入力データセット内の値は、データセットを再識別する可能性があるため、避けることをお勧めします。たとえば、郵便番号にユーザーが 1 人しかいない場合、合成データセットにその郵便番号が存在すると、そのユーザーが元のデータセットに属していたことが確認されます。高精度値の切り捨てや、まれなカタログを他のカタログに置き換えるなどの手法を使用して、このリスクを軽減できます。これらの変換は、ML 入力チャネルの作成に使用されるクエリの一部にすることができます。

    1. テーブルが関連付けられていない場合は、テーブルを関連付けて、指定されたモデルに対して実行できる分析ルールでテーブルを追加します。

    2. このデータチャネルを作成するときに使用するワーカータイプを選択します。デフォルトのワーカータイプは CR.1X です。使用するワーカーの数を指定します。デフォルトのワーカー番号は 16 です。Spark プロパティを指定するには:

      1. [Spark のプロパティ] を拡張します。

      2. Spark プロパティの追加を選択します。

      3. Spark プロパティダイアログボックスで、ドロップダウンリストからプロパティ名を選択し、を入力します。

      次の表は、各プロパティの定義を示しています。

      Spark プロパティの詳細については、Apache Spark ドキュメントの「Spark Properties」を参照してください。

      プロパティ名 説明 デフォルト値

      spark.task.maxFailures

      ジョブが失敗するまでにタスクが失敗する連続回数を制御します。1 以上の値が必要です。許可される再試行回数は、この値から 1 を引いた値に等しくなります。試行が成功すると、失敗数はリセットされます。さまざまなタスクにまたがる障害は、この制限に累積されません。

      4

      spark.sql.files.maxPartitionBytes

      Parquet、JSON、ORC などのファイルベースのソースから読み取るときに 1 つのパーティションにパックする最大バイト数を設定します。

      128MB

      spark.hadoop.fs.s3.maxRetries

      Amazon S3 ファイルオペレーションの再試行の最大回数を設定します。

      spark.network.timeout

      すべてのネットワークインタラクションのデフォルトのタイムアウトを設定します。設定されていない場合、次のタイムアウト設定を上書きします。

      • spark.storage.blockManagerHeartbeatTimeoutMs

      • spark.shuffle.io.connectionTimeout

      • spark.rpc.askTimeout

      • spark.rpc.lookupTimeout

      120 秒

      spark.rdd.compress

      spark.io.compression.codec を使用してシリアル化された RDD パーティションを圧縮するかどうかを指定します。Java および Scala の StorageLevel.MEMORY_ONLY_SER、または Python の StorageLevel.MEMORY_ONLY に適用されます。ストレージ領域を削減しますが、追加の CPU 処理時間が必要です。

      FALSE

      spark.shuffle.spill.compress

      spark.io.compression.codec を使用してシャッフルスピルデータを圧縮するかどうかを指定します。

      TRUE

      spark.sql.adaptive.advisoryPartitionSizeInBytes

      spark.sql.adaptive.enabled が true の場合、適応最適化中のシャッフルパーティションのターゲットサイズをバイト単位で設定します。小さなパーティションを結合するとき、または歪んだパーティションを分割するときのパーティションサイズを制御します。

      (spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値)

      spark.sql.adaptive.autoBroadcastJoinThreshold

      結合中にワーカーノードにブロードキャストするための最大テーブルサイズをバイト単位で設定します。適応フレームワークにのみ適用されます。spark.sql.autoBroadcastJoinThreshold と同じデフォルト値を使用します。ブロードキャストを無効にするには、-1 に設定します。

      (none)

      spark.sql.adaptive.coalescePartitions.enabled

      spark.sql.adaptive.advisoryPartitionSizeInBytes に基づいて連続するシャッフルパーティションを結合してタスクサイズを最適化するかどうかを指定します。spark.sql.adaptive.enabled を true にする必要があります。

      TRUE

      spark.sql.adaptive.coalescePartitions.initialPartitionNum

      結合前のシャッフルパーティションの初期数を定義します。spark.sql.adaptive.enabled と spark.sql.adaptive.coalescePartitions.enabled の両方が true である必要があります。デフォルトは spark.sql.shuffle.partitions の値です。

      (none)

      spark.sql.adaptive.coalescePartitions.minPartitionSize

      アダプティブ最適化中にパーティションが小さすぎるのを防ぐために、結合されたシャッフルパーティションの最小サイズを設定します。

      1 MB

      spark.sql.adaptive.coalescePartitions.parallelismFirst

      パーティションの結合中に spark.sql.adaptive.advisoryPartitionSizeInBytes ではなく、クラスター並列処理に基づいてパーティションサイズを計算するかどうかを指定します。並列処理を最大化するために、設定されたターゲットサイズよりも小さいパーティションサイズを生成します。これをビジー状態のクラスターで false に設定して、過剰な小さなタスクを防止することでリソース使用率を向上させることをお勧めします。

      TRUE

      spark.sql.adaptive.enabled

      正確なランタイム統計に基づいて、アダプティブクエリの実行を有効にして、クエリの実行中にクエリプランを再最適化するかどうかを指定します。

      TRUE

      spark.sql.adaptive.forceOptimizeSkewedJoin

      追加のシャッフルを導入した場合でも、OptimizeSkewedJoin を強制的に有効にするかどうかを指定します。

      FALSE

      spark.sql.adaptive.localShuffleReader.enabled

      ソートマージ結合からブロードキャストハッシュ結合に変換した後など、シャッフルパーティショニングが必要ない場合にローカルシャッフルリーダーを使用するかどうかを指定します。spark.sql.adaptive.enabled を true にする必要があります。

      TRUE

      spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

      ローカルハッシュマップを構築するための最大パーティションサイズをバイト単位で設定します。次の場合に、シャッフルされたハッシュ結合をソートマージ結合よりも優先します。

      • この値は spark.sql.adaptive.advisoryPartitionSizeInBytes 以上

      • すべてのパーティションサイズがこの制限内です

      spark.sql.join.preferSortMergeJoin 設定を上書きします。

      0 バイト

      spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled

      spark.sql.adaptive.advisoryPartitionSizeInBytes に基づいて小さなパーティションに分割することで、歪んだシャッフルパーティションを最適化するかどうかを指定します。spark.sql.adaptive.enabled を true にする必要があります。

      TRUE

      spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor

      分割中にパーティションをマージするためのサイズしきい値係数を定義します。この係数より小さいパーティションに spark.sql.adaptive.advisoryPartitionSizeInBytes を掛けるとマージされます。

      0.2

      spark.sql.adaptive.skewJoin.enabled

      スキューされたパーティションを分割してオプションでレプリケートすることで、シャッフルされた結合でデータスキューを処理するかどうかを指定します。ソートマージおよびシャッフルされたハッシュ結合に適用されます。spark.sql.adaptive.enabled を true にする必要があります。

      TRUE

      spark.sql.adaptive.skewJoin.skewedPartitionFactor

      パーティションスキューを決定するサイズ係数を決定します。パーティションのサイズが両方を超えると、パーティションが歪みます。

      • この係数をパーティションサイズの中央値で乗算

      • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes の値

      5

      spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

      歪んだパーティションを識別するためのサイズしきい値をバイト単位で設定します。パーティションのサイズが両方を超えると、パーティションが歪みます。

      • このしきい値

      • パーティションサイズの中央値に spark.sql.adaptive.skewJoin.skewedPartitionFactor を掛けた値

      この値は spark.sql.adaptive.advisoryPartitionSizeInBytes よりも大きく設定することをお勧めします。

      256MB

      spark.sql.autoBroadcastJoinThreshold

      結合中にワーカーノードにブロードキャストするための最大テーブルサイズをバイト単位で設定します。ブロードキャストを無効にするには、-1 に設定します。

      10MB

      spark.sql.broadcastTimeout

      ブロードキャスト結合中のブロードキャストオペレーションのタイムアウト期間を秒単位で制御します。

      300 秒

      spark.sql.cbo.enabled

      計画統計の推定でコストベースの最適化 (CBO) を有効にするかどうかを指定します。

      FALSE

      spark.sql.cbo.joinReorder.dp.star.filter

      コストベースの結合列挙中にスター結合フィルターヒューリスティックを適用するかどうかを指定します。

      FALSE

      spark.sql.cbo.joinReorder.dp.threshold

      動的プログラミングアルゴリズムで許可される結合ノードの最大数を設定します。

      12

      spark.sql.cbo.joinReorder.enabled

      コストベースの最適化 (CBO) で結合順序変更を有効にするかどうかを指定します。

      FALSE

      spark.sql.cbo.planStats.enabled

      論理プランの生成中にカタログから行数と列統計を取得するかどうかを指定します。

      FALSE

      spark.sql.cbo.starSchemaDetection

      スタースキーマ検出に基づいて結合順序変更を有効にするかどうかを指定します。

      FALSE

      spark.sql.files.maxPartitionNum

      ファイルベースのソース (Parquet、JSON、ORC) の分割ファイルパーティションのターゲット最大数を設定します。初期数がこの値を超えたときにパーティションを再スケーリングします。これは推奨されるターゲットであり、保証された制限ではありません。

      (none)

      spark.sql.files.maxRecordsPerFile

      1 つのファイルに書き込むレコードの最大数を設定します。ゼロまたは負の値に設定されている場合、制限は適用されません。

      0

      spark.sql.files.minPartitionNum

      ファイルベースのソース (Parquet、JSON、ORC) の分割ファイルパーティションのターゲット最小数を設定します。デフォルトは spark.sql.leafNodeDefaultParallelism です。これは推奨されるターゲットであり、保証された制限ではありません。

      (none)

      spark.sql.inMemoryColumnarStorage.batchSize

      列キャッシュのバッチサイズを制御します。サイズを大きくすると、メモリ使用率と圧縮が向上しますが、out-of-memoryエラーのリスクが高まります。

      10000

      spark.sql.inMemoryColumnarStorage.compressed

      データ統計に基づいて列の圧縮コーデックを自動的に選択するかどうかを指定します。

      TRUE

      spark.sql.inMemoryColumnarStorage.enableVectorizedReader

      列キャッシュのベクトル化された読み取りを有効にするかどうかを指定します。

      TRUE

      spark.sql.legacy.allowHashOnMapType

      マップタイプデータ構造でハッシュオペレーションを許可するかどうかを指定します。このレガシー設定は、古い Spark バージョンのマップタイプの処理との互換性を維持します。

      spark.sql.legacy.allowNegativeScaleOfDecimal

      10 進数型定義で負のスケール値を許可するかどうかを指定します。このレガシー設定は、負の 10 進スケールをサポートした古い Spark バージョンとの互換性を維持します。

      spark.sql.legacy.castComplexTypesToString.enabled

      複雑な型を文字列にキャストするためのレガシー動作を有効にするかどうかを指定します。古い Spark バージョンのタイプ変換ルールとの互換性を維持します。

      spark.sql.legacy.charVarcharAsString

      CHAR 型と VARCHAR 型を STRING 型として扱うかどうかを指定します。このレガシー設定は、古い Spark バージョンの文字列タイプの処理との互換性を提供します。

      spark.sql.legacy.createEmptyCollectionUsingStringType

      文字列タイプの要素を使用して空のコレクションを作成するかどうかを指定します。このレガシー設定は、古い Spark バージョンのコレクションの初期化動作との互換性を維持します。

      spark.sql.legacy.exponentLiteralAsDecimal.enabled

      指数リテラルを 10 進数型として解釈するかどうかを指定します。このレガシー設定は、古い Spark バージョンの数値リテラル処理との互換性を維持します。

      spark.sql.legacy.json.allowEmptyString.enabled

      JSON 処理で空の文字列を許可するかどうかを指定します。このレガシー設定は、古い Spark バージョンの JSON 解析動作との互換性を維持します。

      spark.sql.legacy.parquet.int96RebaseModelRead

      Parquet ファイルの読み取り時にレガシー INT96 タイムスタンプリベースモードを使用するかどうかを指定します。このレガシー設定は、古い Spark バージョンのタイムスタンプ処理との互換性を維持します。

      spark.sql.legacy.timeParserPolicy

      後方互換性のための解析動作を制御します。このレガシー設定により、文字列からタイムスタンプと日付を解析する方法が決まります。

      spark.sql.legacy.typeCoercion.datetimeToString.enabled

      日時値を文字列に変換するときにレガシー型の強制動作を有効にするかどうかを指定します。古い Spark バージョンの日時変換ルールとの互換性を維持します。

      spark.sql.maxSinglePartitionBytes

      最大パーティションサイズをバイト単位で設定します。プランナーは、並列処理を改善するために、大きなパーティションのシャッフルオペレーションを導入します。

      128 メートル

      spark.sql.metadataCacheTTLSeconds

      メタデータキャッシュtime-to-live (TTL) を制御します。パーティションファイルメタデータとセッションカタログキャッシュに適用されます。以下が必要です。

      • 0 より大きい正の値

      • spark.sql.catalogImplementationを hive に設定

      • spark.sql.hive.filesourcePartitionFileCacheSize が 0 より大きい

      • spark.sql.hive.manageFilesourcePartitions を true に設定

      -1000 ミリ秒

      spark.sql.optimizer.collapseProjectAlwaysInline

      重複が発生した場合でも、隣接する射影とインライン式を折りたたむかどうかを指定します。

      FALSE

      spark.sql.optimizer.dynamicPartitionPruning.enabled

      結合キーとして使用されるパーティション列の述語を生成するかどうかを指定します。

      TRUE

      spark.sql.optimizer.enableCsvExpressionOptimization

      from_csv オペレーションから不要な列を削除して、SQL オプティマイザの CSV 式を最適化するかどうかを指定します。

      TRUE

      spark.sql.optimizer.enableJsonExpressionOptimization

      SQL オプティマイザで JSON 式を最適化するかどうかを次のように指定します。

      • from_json オペレーションから不要な列を削除する

      • from_json と to_json の組み合わせの簡素化

      • named_struct オペレーションの最適化

      TRUE

      spark.sql.optimizer.excludedRules

      無効にするオプティマイザルールを定義し、カンマ区切りのルール名で識別します。一部のルールは、正確性のために必要であるため、無効にすることはできません。オプティマイザは、どのルールが正常に無効化されたかをログに記録します。

      (none)

      spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold

      アプリケーション側に Bloom フィルターを挿入するために必要な最小集約スキャンサイズをバイト単位で設定します。

      10GB

      spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold

      作成側で Bloom フィルターを挿入するための最大サイズしきい値を定義します。

      10MB

      spark.sql.optimizer.runtime.bloomFilter.enabled

      シャッフル結合の片側に選択的述語がある場合に、シャッフルデータを減らすために Bloom フィルターを挿入するかどうかを指定します。

      TRUE

      spark.sql.optimizer.runtime.bloomFilter.expectedNumItems

      ランタイム Bloom フィルターで予想される項目のデフォルトの数を定義します。

      1000000

      spark.sql.optimizer.runtime.bloomFilter.maxNumBits

      ランタイム Bloom フィルターで許可される最大ビット数を設定します。

      67108864

      spark.sql.optimizer.runtime.bloomFilter.maxNumItems

      ランタイム Bloom フィルターで許可される予想項目の最大数を設定します。

      4000000

      spark.sql.optimizer.runtime.bloomFilter.number.threshold

      ドライバーのout-of-memoryエラーを防ぐために、クエリごとに許可される非 DPP ランタイムフィルターの最大数を制限します。

      10

      spark.sql.optimizer.runtime.bloomFilter.numBits

      ランタイム Bloom フィルターで使用されるデフォルトのビット数を定義します。

      8388608

      spark.sql.optimizer.runtime.rowlevelOperationGroupFilter.enabled

      行レベルのオペレーションでランタイムグループのフィルタリングを有効にするかどうかを指定します。データソースに以下を許可します。

      • データソースフィルターを使用してデータグループ全体 (ファイルやパーティションなど) を削除する

      • ランタイムクエリを実行して一致するレコードを特定する

      • 不要なグループを破棄して、高価な書き換えを避ける

      機能制限:

      • すべての式がデータソースフィルターに変換できるわけではありません

      • 一部の式では、Spark 評価 (サブクエリなど) が必要です。

      TRUE

      spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled

      シャッフル結合の片側に選択述語がある場合に、シャッフルデータを減らすためにセミ結合を挿入するかどうかを指定します。

      FALSE

      spark.sql.parquet.aggregatePushdown

      最適化のために集計を Parquet にプッシュダウンするかどうかを指定します。以下をサポートします。

      • ブール型、整数型、浮動小数点型、日付型の MIN と MAX

      • すべてのデータ型の COUNT

      Parquet ファイルフッターに統計情報がない場合、例外をスローします。

      FALSE

      spark.sql.parquet.columnarReaderBatchSize

      Parquet ベクトル化された各リーダーバッチの行数を制御します。out-of-memoryエラーを防ぐために、パフォーマンスのオーバーヘッドとメモリ使用量のバランスを取る値を選択します。

      4096

      spark.sql.session.timeZone

      文字列リテラルおよび Java オブジェクト変換のタイムスタンプを処理するセッションタイムゾーンを定義します。以下を受け入れます。

      • 地域/都市形式のリージョンベースの IDs (アメリカ/Los_Angeles など)

      • (+/-)HH、(+/-)HH:mm、または (+/-)HH:mm:ss 形式のゾーンオフセット (-08 や +01:00 など)

      • +00:00 のエイリアスとしての UTC または Z

      (ローカルタイムゾーンの値)

      spark.sql.shuffle.partitions

      結合または集約中のデータシャッフルのパーティションのデフォルト数を設定します。同じチェックポイントの場所からの構造化ストリーミングクエリの再起動の間に変更することはできません。

      200

      spark.sql.shuffledHashJoinFactor

      シャッフルハッシュ結合の適格性を判断するために使用される乗算係数を定義します。シャッフルハッシュ結合は、スモールサイドのデータサイズにこの係数を掛けた値がラージサイドのデータサイズより小さい場合に選択されます。

      3

      spark.sql.sources.parallelPartitionDiscovery.threshold

      ファイルベースのソース (Parquet、JSON、ORC) を使用したドライバー側のファイルリストの最大パス数を設定します。パーティション検出中に超過すると、ファイルは別の Spark 分散ジョブを使用して一覧表示されます。

      32

      spark.sql.statistics.histogram.enabled

      推定精度を向上させるために、列統計の計算中に等高ヒストグラムを生成するかどうかを指定します。基本的な列統計に必要な範囲を超える追加のテーブルスキャンが必要です。

      FALSE

    3. データ保持日数には、データを保持する日数を入力します。

    4. 結果形式では、ML 入力チャネルが使用するデータ形式として CSV または Parquet を選択します。

  8. サービスアクセスでは、このテーブルへのアクセスに使用する既存のサービスロール名を選択するか、新しいサービスロールを作成して使用します

  9. 暗号化 では、カスタム KMS キーを使用して Encrypt シークレットを選択し、独自の KMS キーと関連情報を指定します。それ以外の場合、Clean Rooms ML が暗号化を管理します。

  10. ML 入力チャネルの作成 を選択します。

    ML 入力チャネルの作成には数分かかります。ML 入力チャネルのリストは、ML モデルタブで確認できます。

注記

ML 入力チャネルを作成した後は、編集できません。

API

ML 入力チャネルを作成するには (API)

特定のパラメータで次のコードを実行します。

import boto3 acr_client = boto3.client('cleanroomsml') acr_client.create_ml_input_channel( name="ml_input_channel_name", membershipIdentifier='membership_id', configuredModelAlgorithmAssociations=[configured_model_algorithm_association_arn], retentionInDays=1, inputChannel={ "dataSource": { "protectedQueryInputParameters": { "sqlParameters": { "queryString": "select * from table", "computeConfiguration": { "worker": { "type": "CR.1X", "number": 16, "properties": { "spark": { "spark configuration key": "spark configuration value", } } } }, "resultFormat": "PARQUET" } } }, "roleArn": "arn:aws:iam::111122223333:role/role_name" } ) channel_arn = resp['ML Input Channel ARN']