

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# SageMaker AI Spark for Scala の例
<a name="apache-spark-example1"></a>

Amazon SageMaker AI は、Apache Spark アプリケーションを SageMaker と統合するために使用できる Apache AI Spark ライブラリ ([SageMaker AI Spark](https://github.com/aws/sagemaker-spark/tree/master/sagemaker-spark-sdk)) を提供しています。このトピックでは、Scala を使用した SageMaker AI Spark の使用を開始するのに役立つ例を提供しています。SageMaker AI Apache Spark ライブラリの詳細については、「[Amazon SageMaker AI での Apache Spark](apache-spark.md)」を参照してください。

**Spark for Scala をダウンロードする**

Python Spark (PySpark) と Scala ライブラリのソースコードと例はともに、[SageMaker AI Spark](https://github.com/aws/sagemaker-spark) GitHub リポジトリからダウンロードできます。

SageMaker AI Spark ライブラリのインストール方法の詳細については、「[SageMaker Spark](https://github.com/aws/sagemaker-spark/tree/master/sagemaker-spark-sdk)」を参照してください。

SageMaker AI Spark SDK for Scala は Maven の一元化されたリポジトリで提供されています。プロジェクトに Spark ライブラリを追加するため、`pom.xml` ファイルに次の依存関係を追加します。
+  プロジェクトが Maven で構築されている場合は、pom.xml ファイルに以下を追加します。

  ```
  <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>sagemaker-spark_2.11</artifactId>
      <version>spark_2.2.0-1.0</version>
  </dependency>
  ```
+ プロジェクトが Spark 2.1 を基盤としている場合は、pom.xml ファイルに以下を追加します。

  ```
  <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>sagemaker-spark_2.11</artifactId>
      <version>spark_2.1.1-1.0</version>
  </dependency>
  ```

**Spark for Scala の例**

このセクションには、SageMaker AI が提供する Apache Spark Scala ライブラリを使用したサンプルコードが用意されており、Spark クラスター内で `DataFrame` を使って SageMaker AI でモデルをトレーニングできます。その後、[Apache Spark を使う Amazon SageMaker AI でのモデルトレーニングとホスティングにカスタムアルゴリズムを使用する](apache-spark-example1-cust-algo.md) の方法と、[Spark Pipeline で SageMakerEstimator を使用する](apache-spark-example1-extend-pipeline.md) の方法の例を提供しています。

また、このサンプルは SageMaker AI ホスティングサービスを使用して、結果のモデルアーティファクトをホストします。この例の詳細については、「[開始方法: SageMaker AI Spark SDK を使用した SageMaker AI での K-Means クラスタリング](https://github.com/aws/sagemaker-spark?tab=readme-ov-file#getting-started-k-means-clustering-on-sagemaker-with-sagemaker-spark-sdk)」を参照してください。具体的には、この例では次を実行します。
+ `KMeansSageMakerEstimator` を使用してデータに対してモデルの適合 (またはトレーニング) を行います。

  この例では SageMaker AI 提供の k-means アルゴリズムを使ってモデルをトレーニングするため、`KMeansSageMakerEstimator` を使います。モデルは (MNIST データセットの) 手書きの 1 桁の数字の画像を使用してトレーニングを行います。画像は入力 `DataFrame` として指定します。利用しやすいように、SageMaker AI はこのデータセットを Amazon S3 バケットで提供しています。

  レスポンスとして、推定器は `SageMakerModel` オブジェクトを返します。
+ トレーニング済みの `SageMakerModel` を使用して推論を取得する

  SageMaker AI でホストされているモデルから推論を取得するには、`SageMakerModel.transform` メソッドを呼び出します。入力として `DataFrame` を渡します。このメソッドは入力 `DataFrame` を、そのモデルから取得した推論が含まれる別の `DataFrame` に変換します。

  推論は、入力した指定の手書きの 1 桁の数字の画像が属するクラスターを特定します。詳細については、「[K-Means アルゴリズム](k-means.md)」を参照してください。

```
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.sagemaker.sparksdk.IAMRole
import com.amazonaws.services.sagemaker.sparksdk.algorithms
import com.amazonaws.services.sagemaker.sparksdk.algorithms.KMeansSageMakerEstimator

val spark = SparkSession.builder.getOrCreate

// load mnist data as a dataframe from libsvm
val region = "us-east-1"
val trainingData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/")
val testData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/")

val roleArn = "arn:aws:iam::account-id:role/rolename"

val estimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(784)

// train
val model = estimator.fit(trainingData)

val transformedData = model.transform(testData)
transformedData.show
```

このコード例では、以下を行います。
+ SageMaker AI 提供の S3 バケットから MNIST データセット (`awsai-sparksdk-dataset`) を Spark `DataFrame` (`mnistTrainingDataFrame`) にロードする。

  ```
  // Get a Spark session.
  
  val spark = SparkSession.builder.getOrCreate
  
  // load mnist data as a dataframe from libsvm
  val region = "us-east-1"
  val trainingData = spark.read.format("libsvm")
    .option("numFeatures", "784")
    .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/")
  val testData = spark.read.format("libsvm")
    .option("numFeatures", "784")
    .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/")
  
  val roleArn = "arn:aws:iam::account-id:role/rolename"
  trainingData.show()
  ```

  `show` メソッドは、データフレーム内の最初の 20 行のデータを表示します。

  ```
  +-----+--------------------+
  |label|            features|
  +-----+--------------------+
  |  5.0|(784,[152,153,154...|
  |  0.0|(784,[127,128,129...|
  |  4.0|(784,[160,161,162...|
  |  1.0|(784,[158,159,160...|
  |  9.0|(784,[208,209,210...|
  |  2.0|(784,[155,156,157...|
  |  1.0|(784,[124,125,126...|
  |  3.0|(784,[151,152,153...|
  |  1.0|(784,[152,153,154...|
  |  4.0|(784,[134,135,161...|
  |  3.0|(784,[123,124,125...|
  |  5.0|(784,[216,217,218...|
  |  3.0|(784,[143,144,145...|
  |  6.0|(784,[72,73,74,99...|
  |  1.0|(784,[151,152,153...|
  |  7.0|(784,[211,212,213...|
  |  2.0|(784,[151,152,153...|
  |  8.0|(784,[159,160,161...|
  |  6.0|(784,[100,101,102...|
  |  9.0|(784,[209,210,211...|
  +-----+--------------------+
  only showing top 20 rows
  ```

  各行の構成は次の通りです。
  + `label` 列は画像のラベルを特定します。例えば、手書きの数字の画像が 5 桁の場合、ラベルの値は 5 になります。
  + `features` 列には `org.apache.spark.ml.linalg.Vector` 値のベクトル (`Double`) が格納されます。これらは手書きの数字の 784 の特徴です (各手書きの数字は 28 x 28 ピクセルの画像で、784 の特徴があります)。
+ SageMaker AI 推定ツール (`KMeansSageMakerEstimator`) を作成する 

  この推定ツールの `fit` メソッドは、SageMaker AI 提供の k-means アルゴリズムを使用し、入力 `DataFrame` を使ってモデルをトレーニングします。レスポンスとして、推論の取得に使用できる `SageMakerModel` オブジェクトを返します。
**注記**  
`KMeansSageMakerEstimator` は SageMaker AI の `SageMakerEstimator` を拡張し、これが Apache Spark の `Estimator` を拡張します。

  ```
  val estimator = new KMeansSageMakerEstimator(
    sagemakerRole = IAMRole(roleArn),
    trainingInstanceType = "ml.p2.xlarge",
    trainingInstanceCount = 1,
    endpointInstanceType = "ml.c4.xlarge",
    endpointInitialInstanceCount = 1)
    .setK(10).setFeatureDim(784)
  ```

   

  コンストラクターのパラメータが、SageMaker AI でのモデルのトレーニングとデプロイに使用される情報を示します。
  + `trainingInstanceType` と `trainingInstanceCount` - モデルトレーニングに使用される ML コンピューティングインスタンスのタイプと数を指定します。
  + `endpointInstanceType` - SageMaker AI にモデルをホストする際に使用する ML コンピューティングインスタンスタイプを指定します。デフォルトでは、1 つの ML コンピューティングインスタンスが引き受けられます。
  + `endpointInitialInstanceCount` - SageMaker AI 内でモデルをホストするエンドポイントを最初にバックアップする ML コンピューティングインスタンスの数を指定します。
  + `sagemakerRole` - SageMaker AI はこの IAM ロールを継承し、代理でタスクを実行します。たとえば、モデルのトレーニングの場合、S3 からデータを読み取り、トレーニングの結果 (モデルアーティファクト) を S3 に書き込みます。
**注記**  
この例では暗黙的に SageMaker AI クライアントを作成します。このクライアントを作成するには、認証情報を指定する必要があります。API は、これらの認証情報を使って、SageMaker AI へのリクエストを認証します。例えば、認証情報を使って、トレーニングジョブ作成のリクエストや、SageMaker AI ホスティングサービスを使うモデルのデプロイのための API コールを認証します。
  + `KMeansSageMakerEstimator` オブジェクトが作成された後、モデルのトレーニングで使用される次のパラメータを設定します。
    + k-means アルゴリズムによってモデルのトレーニング中に作成される必要があるクラスターの数。0 ～ 9 のそれぞれの桁に 1 つ、合計で 10 のクラスターを指定します。
    + 各入力画像に 784 の特徴があることを特定します (各手書きの数字は 28 x 28 ピクセルの画像で、784 の特徴があります)。
+ 推定器の `fit` メソッドを呼び出す

  ```
  // train
  val model = estimator.fit(trainingData)
  ```

  入力 `DataFrame` をパラメータとして渡します。このモデルは、モデルのトレーニングと SageMaker AI へのデプロイの、すべての処理を行います。詳細については、[Apache Spark アプリケーションを SageMaker AI と統合する](apache-spark.md#spark-sdk-common-process) を参照してください。レスポンスで `SageMakerModel` オブジェクトを取得します。これは、SageMaker AI にデプロイされたモデルからの推論の取得に使うことができます。

  入力 `DataFrame` のみを指定します。モデルのトレーニングに使用される k-means アルゴリズムへのレジストリパスについては、`KMeansSageMakerEstimator` はすでに把握しているため、指定する必要はありません。
+ `SageMakerModel.transform` メソッドを呼び出し、SageMaker AI にデプロイされたモデルから推論を取得します。

  `transform` メソッドは入力として `DataFrame` を取り、それを変換し、そのモデルから取得した推論が含まれる別の `DataFrame` を返します。

  ```
  val transformedData = model.transform(testData)
  transformedData.show
  ```

  わかりやすいように、この例でモデルのトレーニングに使用した同じ `DataFrame` を `transform` の入力として使用します。`transform` メソッドは、次のような処理を実行します。
  + 入力 `DataFrame` 内の `features` 列を protobuf でシリアル化し、それを推論のために SageMaker AI エンドポイントに送信します。
  + プロトコルバッファーのレスポンスを、変換された `distance_to_cluster` 内の追加の 2 列 (`closest_cluster` と `DataFrame`) に逆シリアル化します。

  `show` メソッドは入力 `DataFrame` 内の最初の 20 行の推論を取得します。

  ```
  +-----+--------------------+-------------------+---------------+
  |label|            features|distance_to_cluster|closest_cluster|
  +-----+--------------------+-------------------+---------------+
  |  5.0|(784,[152,153,154...|  1767.897705078125|            4.0|
  |  0.0|(784,[127,128,129...|  1392.157470703125|            5.0|
  |  4.0|(784,[160,161,162...| 1671.5711669921875|            9.0|
  |  1.0|(784,[158,159,160...| 1182.6082763671875|            6.0|
  |  9.0|(784,[208,209,210...| 1390.4002685546875|            0.0|
  |  2.0|(784,[155,156,157...|  1713.988037109375|            1.0|
  |  1.0|(784,[124,125,126...| 1246.3016357421875|            2.0|
  |  3.0|(784,[151,152,153...|  1753.229248046875|            4.0|
  |  1.0|(784,[152,153,154...|  978.8394165039062|            2.0|
  |  4.0|(784,[134,135,161...|  1623.176513671875|            3.0|
  |  3.0|(784,[123,124,125...|  1533.863525390625|            4.0|
  |  5.0|(784,[216,217,218...|  1469.357177734375|            6.0|
  |  3.0|(784,[143,144,145...|  1736.765869140625|            4.0|
  |  6.0|(784,[72,73,74,99...|   1473.69384765625|            8.0|
  |  1.0|(784,[151,152,153...|    944.88720703125|            2.0|
  |  7.0|(784,[211,212,213...| 1285.9071044921875|            3.0|
  |  2.0|(784,[151,152,153...| 1635.0125732421875|            1.0|
  |  8.0|(784,[159,160,161...| 1436.3162841796875|            6.0|
  |  6.0|(784,[100,101,102...| 1499.7366943359375|            7.0|
  |  9.0|(784,[209,210,211...| 1364.6319580078125|            6.0|
  +-----+--------------------+-------------------+---------------+
  ```

  次のようにデータを解釈できます。
  + `label` が 5 の手書き数字は、クラスター 4 (`closest_cluster`) に属します。
  + `label` が 0 の手書き数字は、クラスター 5 に属します。
  + `label` が 4 の手書き数字は、クラスター 9 に属します。
  + `label` が 1 の手書き数字は、クラスター 6 に属します。

**Topics**
+ [Apache Spark を使う Amazon SageMaker AI でのモデルトレーニングとホスティングにカスタムアルゴリズムを使用する](apache-spark-example1-cust-algo.md)
+ [Spark Pipeline で SageMakerEstimator を使用する](apache-spark-example1-extend-pipeline.md)

# Apache Spark を使う Amazon SageMaker AI でのモデルトレーニングとホスティングにカスタムアルゴリズムを使用する
<a name="apache-spark-example1-cust-algo"></a>

[SageMaker AI Spark for Scala の例](apache-spark-example1.md)では、モデルのトレーニングに Amazon SageMaker AI が提供する k-means アルゴリズムを使用しているため、`kMeansSageMakerEstimator` を使用します。代わりに、モデルのトレーニングに独自のカスタムアルゴリズムを使用してもかまいません。Docker イメージが作成済みである場合、独自の `SageMakerEstimator` を作成でき、カスタムイメージの Amazon Elastic Container Registry パスを指定できます。

次のサンプルは、`SageMakerEstimator` から `KMeansSageMakerEstimator` を作成する方法を示しています。新しい推定器で、トレーニングや推論のコードイメージへの Docker レジストリパスを明示的に指定します。

```
import com.amazonaws.services.sagemaker.sparksdk.IAMRole
import com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator
import com.amazonaws.services.sagemaker.sparksdk.transformation.serializers.ProtobufRequestRowSerializer
import com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.KMeansProtobufResponseRowDeserializer

val estimator = new SageMakerEstimator(
  trainingImage =
    "811284229777.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
  modelImage =
    "811284229777.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
  requestRowSerializer = new ProtobufRequestRowSerializer(),
  responseRowDeserializer = new KMeansProtobufResponseRowDeserializer(),
  hyperParameters = Map("k" -> "10", "feature_dim" -> "784"),
  sagemakerRole = IAMRole(roleArn),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1,
  trainingSparkDataFormat = "sagemaker")
```

コード内の `SageMakerEstimator` コンストラクターのパラメータは次のとおりです。
+ `trainingImage` - カスタムコードが含まれるトレーニングイメージへの Docker レジストリパスを特定します。
+ `modelImage` - 推論コードが含まれるイメージへの Docker レジストリパスを特定します。
+ `requestRowSerializer` - `com.amazonaws.services.sagemaker.sparksdk.transformation.RequestRowSerializer` を実装します。

  このパラメータは入力 `DataFrame` 内の行をシリアル化して、推論のためにそれを SageMaker AI にホストされたモデルに送信します。
+ `responseRowDeserializer` - 次を実装します。

  `com.amazonaws.services.sagemaker.sparksdk.transformation.ResponseRowDeserializer`.

  このパラメータは、SageMaker AI にホストされたモデルからのレスポンスを逆シリアル化し、`DataFrame` に戻します。
+ `trainingSparkDataFormat` - トレーニングデータを `DataFrame` から S3 にアップロードするときに Spark が使うデータ形式を指定します。たとえば、protobuf 形式の場合は `"sagemaker"`、カンマ区切り値の場合は `"csv"`、LibSVM 形式の場合は `"libsvm"` と指定します。

独自の `RequestRowSerializer` と `ResponseRowDeserializer` を実装し、推論コードが対応しているデータ形式 (.libsvm、.csv など) から行を逆シリアル化できます。

# Spark Pipeline で SageMakerEstimator を使用する
<a name="apache-spark-example1-extend-pipeline"></a>

次の例に示すように、`org.apache.spark.ml.Estimator` 推定器と `org.apache.spark.ml.Model` モデル、および `SageMakerEstimator` 推定器と `SageMakerModel` モデルを `org.apache.spark.ml.Pipeline`パイプラインで使用できます。

```
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.PCA
import org.apache.spark.sql.SparkSession
import com.amazonaws.services.sagemaker.sparksdk.IAMRole
import com.amazonaws.services.sagemaker.sparksdk.algorithms
import com.amazonaws.services.sagemaker.sparksdk.algorithms.KMeansSageMakerEstimator

val spark = SparkSession.builder.getOrCreate

// load mnist data as a dataframe from libsvm
val region = "us-east-1"
val trainingData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/train/")
val testData = spark.read.format("libsvm")
  .option("numFeatures", "784")
  .load(s"s3://sagemaker-sample-data-$region/spark/mnist/test/")

// substitute your SageMaker IAM role here
val roleArn = "arn:aws:iam::account-id:role/rolename"

val pcaEstimator = new PCA()
  .setInputCol("features")
  .setOutputCol("projectedFeatures")
  .setK(50)

val kMeansSageMakerEstimator = new KMeansSageMakerEstimator(
  sagemakerRole = IAMRole(integTestingRole),
  requestRowSerializer =
    new ProtobufRequestRowSerializer(featuresColumnName = "projectedFeatures"),
  trainingSparkDataFormatOptions = Map("featuresColumnName" -> "projectedFeatures"),
  trainingInstanceType = "ml.p2.xlarge",
  trainingInstanceCount = 1,
  endpointInstanceType = "ml.c4.xlarge",
  endpointInitialInstanceCount = 1)
  .setK(10).setFeatureDim(50)

val pipeline = new Pipeline().setStages(Array(pcaEstimator, kMeansSageMakerEstimator))

// train
val pipelineModel = pipeline.fit(trainingData)

val transformedData = pipelineModel.transform(testData)
transformedData.show()
```

`trainingSparkDataFormatOptions` パラメータは、モデルのトレーニングを行うために、"projectedFeatures" 列をプロトコルバッファーにシリアル化するように Spark を構成します。さらに、Spark はデフォルトで "label" 列をプロトコルバッファーにシリアル化します。

"projectedFeatures" 列を使用して推論を作成するため、その列名を `ProtobufRequestRowSerializer` に渡します。

次の例は変換された `DataFrame` を示します。

```
+-----+--------------------+--------------------+-------------------+---------------+
|label|            features|   projectedFeatures|distance_to_cluster|closest_cluster|
+-----+--------------------+--------------------+-------------------+---------------+
|  5.0|(784,[152,153,154...|[880.731433034386...|     1500.470703125|            0.0|
|  0.0|(784,[127,128,129...|[1768.51722024166...|      1142.18359375|            4.0|
|  4.0|(784,[160,161,162...|[704.949236329314...|  1386.246826171875|            9.0|
|  1.0|(784,[158,159,160...|[-42.328192193771...| 1277.0736083984375|            5.0|
|  9.0|(784,[208,209,210...|[374.043902028333...|   1211.00927734375|            3.0|
|  2.0|(784,[155,156,157...|[941.267714528850...|  1496.157958984375|            8.0|
|  1.0|(784,[124,125,126...|[30.2848596410594...| 1327.6766357421875|            5.0|
|  3.0|(784,[151,152,153...|[1270.14374062052...| 1570.7674560546875|            0.0|
|  1.0|(784,[152,153,154...|[-112.10792566485...|     1037.568359375|            5.0|
|  4.0|(784,[134,135,161...|[452.068280676606...| 1165.1236572265625|            3.0|
|  3.0|(784,[123,124,125...|[610.596447285397...|  1325.953369140625|            7.0|
|  5.0|(784,[216,217,218...|[142.959601818422...| 1353.4930419921875|            5.0|
|  3.0|(784,[143,144,145...|[1036.71862533658...| 1460.4315185546875|            7.0|
|  6.0|(784,[72,73,74,99...|[996.740157435754...| 1159.8631591796875|            2.0|
|  1.0|(784,[151,152,153...|[-107.26076167417...|   960.963623046875|            5.0|
|  7.0|(784,[211,212,213...|[619.771820430940...|   1245.13623046875|            6.0|
|  2.0|(784,[151,152,153...|[850.152101817161...|  1304.437744140625|            8.0|
|  8.0|(784,[159,160,161...|[370.041887230547...| 1192.4781494140625|            0.0|
|  6.0|(784,[100,101,102...|[546.674328209335...|    1277.0908203125|            2.0|
|  9.0|(784,[209,210,211...|[-29.259112927426...| 1245.8182373046875|            6.0|
+-----+--------------------+--------------------+-------------------+---------------+
```