

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# ステップ 1: SageMaker の分散モデル並列ライブラリを使用して独自のトレーニングスクリプトを変更する
<a name="model-parallel-customize-training-script"></a>

このセクションでは、Amazon SageMaker AI モデル並列処理ライブラリのコア機能を使用するためにトレーニングスクリプトをカスタマイズする方法を学習します。ライブラリ固有の API 関数とパラメータを使用するには、このドキュメントを『*SageMaker Python SDK ドキュメント*』にある「[SageMaker モデル並列ライブラリ API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」と共に使用することをお勧めします。

これらのセクションで提供されるトレーニングスクリプトの例は簡略化されており、ライブラリを使うために必要な変更を強調するように考えられています。SageMaker モデル並列処理ライブラリと共に TensorFlow または PyTorch トレーニングスクリプトを使用する方法を示す、エンドツーエンドで実行可能なノートブックサンプルについては、「[Amazon SageMaker AI モデル並列処理ライブラリ v2 の例](distributed-model-parallel-v2-examples.md)」を参照してください。

**Topics**
+ [SageMaker モデル並列処理ライブラリを使用してトレーニングスクリプトのモデルを分割する](#model-parallel-model-splitting-using-smp-lib)
+ [TensorFlow トレーニングスクリプトを変更する](model-parallel-customize-training-script-tf.md)
+ [PyTorch トレーニングスクリプトを変更する](model-parallel-customize-training-script-pt.md)

## SageMaker モデル並列処理ライブラリを使用してトレーニングスクリプトのモデルを分割する
<a name="model-parallel-model-splitting-using-smp-lib"></a>

トレーニングスクリプトを変更してモデル分割を設定するには、自動分割と手動分割の 2 つの方法があります。

### 自動モデル分割
<a name="model-parallel-automated-model-splitting"></a>

SageMaker のモデル並列処理ライブラリを使用すると、自動モデル分割 (自動モデルパーティショニングとも呼ばれる) を活用できます。****このライブラリは、メモリのバランスを取り、デバイス間の通信を最小限に抑え、パフォーマンスを最適化するパーティショニングアルゴリズムを使用します。自動パーティショニングアルゴリズムを設定して、速度やメモリを最適化できます。

また、手動でモデルを分割することもできます。モデルアーキテクチャに精通していて、モデルを効率的に分割する方法について十分に理解している場合を除き、自動モデル分割をお勧めします。

#### 仕組み
<a name="model-parallel-automated-model-splitting-how-it-works"></a>

自動パーティショニングは、最初のトレーニングステップで、`smp.step` で修飾された関数が最初に呼び出されたときに行われます。この呼び出し中、ライブラリは最初に CPU RAM 上にモデルのバージョンを構築し (GPU メモリの制限を回避するため)、次にモデルグラフを分析してパーティショニングを決定します。この決定に基づいて、各モデルパーティションが GPU にロードされた後、最初のステップが実行されます。これらの分析とパーティショニングのステップのため、最初のトレーニングステップには時間がかかる場合があります。

どちらのフレームワークでも、ライブラリは、 AWS インフラストラクチャ用に最適化された独自のバックエンドを介してデバイス間の通信を管理します。

自動パーティション設計はフレームワークの特性に適応し、ライブラリは各フレームワークでより自然な粒度レベルでパーティショニングを行います。例えば、TensorFlow では、それぞれ固有のオペレーションを異なるデバイスに割り当てできますが、PyTorch では、割り当てはモジュールレベルで行われ、各モジュールは複数のオペレーションで構成されます。次のセクションでは、各フレームワークにおける設計の詳細を確認します。

##### PyTorch による自動モデル分割
<a name="model-parallel-auto-model-split-pt"></a>

最初のトレーニングステップでは、モデル並列処理ライブラリは、モデルグラフを構築し、テンソルとパラメータの形状を決定するためのトレースステップを内部的に実行します。このトレースステップの後、ライブラリは、モデル内のネストされた `nn.Module` オブジェクトと、保存されている `nn.Parameters` の量や各 `nn.Module` の実行時間などのトレースから収集された追加データからなるツリーを構築します。

次に、ライブラリは、ルートからこのツリーをトラバースし、各 `nn.Module` をデバイスに割り当てるパーティショニングアルゴリズムを実行し、計算負荷 (モジュール実行時間で測定) とメモリ使用量 (保存されている `nn.Parameter` サイズとアクティベーションの合計で測定) のバランスを取ります。複数の `nn.Modules` が同じ `nn.Parameter` を共有する場合、同じパラメータの複数のバージョンを維持しないように、これらのモジュールは同じデバイス上に配置されます。パーティショニングが決定されると、割り当てられたモジュールと重みがデバイスにロードされます。

`smp.step` デコレータを PyTorch トレーニングスクリプトに登録する方法については、「[PyTorch による自動分割](model-parallel-customize-training-script-pt.md#model-parallel-customize-training-script-pt-16)」を参照してください。

##### TensorFlow による自動モデル分割
<a name="model-parallel-auto-model-split-tf"></a>

モデル並列処理ライブラリは、トレーニング可能な変数のサイズとグラフ構造を分析し、内部でグラフパーティショニングアルゴリズムを使用します。このアルゴリズムは、デバイス間で必要な通信量を最小化することを目的に、次の 2 つの制約条件の下での、各オペレーションに対するデバイスの割り当てを導き出します。
+ 各デバイスに保存されている変数の数のバランスを取る
+ 各デバイスで実行されるオペレーションの数のバランスを取る

`optimize` に `speed` を指定した場合 (Python SDK のモデル並列処理パラメータで)、ライブラリは各デバイスにおけるオペレーションと `tf.Variable` オブジェクトの数のバランスを取ろうとします。それ以外の場合は、`tf.Variables` の合計サイズのバランスを取ろうとします。

パーティショニングの決定が行われると、ライブラリは各デバイスの実行に必要なサブグラフのシリアル化された表現を作成し、各デバイスにインポートします。パーティショニング中、ライブラリは同じ `tf.Variable` を使用するオペレーションと、同じ Keras レイヤーの一部であるオペレーションを同じデバイス上に配置します。また、TensorFlow によって課されるコロケーション制約にも対応します。これは、例えば、`tf.Variable` を共有する 2 つの Keras レイヤーがある場合、これらのレイヤーに含まれるすべてのオペレーションは、1 つのデバイスに配置されることを意味します。

`smp.step` デコレータを PyTorch トレーニングスクリプトに登録する方法については、「[TensorFlow による自動分割](model-parallel-customize-training-script-tf.md#model-parallel-customize-training-script-tf-23)」を参照してください。

##### フレームワーク間での自動モデル分割の比較
<a name="model-parallel-auto-model-split-comparison"></a>

TensorFlow では、計算の基本ユニットは `tf.Operation` であり、TensorFlow はモデルを `tf.Operation` の有向非巡回グラフ (DAG) として表すため、モデル並列処理ライブラリはこの DAG を分割して、各ノードが 1 つのデバイスに向かうようにします。非常に重要な点は、`tf.Operation` オブジェクトにはカスタマイズ可能な属性が十分豊富にあり、すべてのモデルがそのようなオブジェクトのグラフで構成されることが保証されているという意味で、それらのオブジェクトが普遍的であるということです。

一方、PyTorch には、十分に豊富で普遍的なオペレーションに相当する概念はありません。これらの特性を持つ PyTorch で最も近い計算ユニットは `nn.Module` であり、これははるかに高い粒度レベルです。ライブラリが PyTorch でこのレベルでパーティショニングを行うのはこのためです。

### 手動モデル分割
<a name="model-parallel-manual-model-splitting"></a>

デバイス間でモデルのパーティショニング方法を手動で指定する場合は、`smp.partition` コンテキストマネージャーを使用します。手動パーティショニング用にコンテキストマネージャーを設定する方法については、以下のページを参照してください。
+ [TensorFlow による手動分割](model-parallel-customize-training-script-tf.md#model-parallel-customize-training-script-tf-manual)
+ [PyTorch による手動分割](model-parallel-customize-training-script-pt.md#model-parallel-customize-training-script-pt-16-hvd)

変更を加えた後にこのオプションを使用するには、ステップ 2 で `auto_partition` を `False` に設定し、SageMaker Python SDK のフレームワーク推定器クラスで `default_partition` を定義する必要があります。`smp.partition` コンテキストマネージャーを介してパーティションに明示的に配置されていないオペレーションは、`default_partition` で実行されます。この場合、自動分割ロジックはバイパスされ、各オペレーションはユーザーの指定に基づいて配置されます。結果のグラフ構造に基づいて、モデル並列処理ライブラリがパイプライン化された実行スケジュールを自動的に作成します。

# TensorFlow トレーニングスクリプトを変更する
<a name="model-parallel-customize-training-script-tf"></a>

このセクションでは、TensorFlow トレーニングスクリプトを変更して、自動パーティショニングおよび手動パーティショニング用に SageMaker モデル並列処理ライブラリを設定する方法を学習します。例として選んだ中には、モデルとデータのハイブリッド並列処理のための Horovod との統合例も含まれています。

**注記**  
ライブラリでサポートされている TensorFlow のバージョンを確認するには、「[サポートされているフレームワークと AWS リージョン](distributed-model-parallel-support.md)」を参照してください。

ライブラリを使うためにトレーニングスクリプトに加える必要のある変更については、「[TensorFlow による自動分割](#model-parallel-customize-training-script-tf-23)」を参照してください。

Horovod でモデルとデータのハイブリッド並列処理を使うようにトレーニングスクリプトを変更する方法については、「[ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割](#model-parallel-customize-training-script-tf-2.3)」を参照してください。

手動パーティショニングを使用する場合は、「[TensorFlow による手動分割](#model-parallel-customize-training-script-tf-manual)」も確認してください。

次のトピックでは、TensorFlow モデルの自動パーティショニングおよび手動パーティショニング用の SageMaker のモデル並列処理ライブラリを設定するために使用できるトレーニングスクリプトの例を示します。

**注記**  
自動パーティショニングはデフォルトで有効になっています。特に指定がない限り、サンプルスクリプトでは自動パーティショニングを使用します。

**Topics**
+ [TensorFlow による自動分割](#model-parallel-customize-training-script-tf-23)
+ [ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割](#model-parallel-customize-training-script-tf-2.3)
+ [TensorFlow による手動分割](#model-parallel-customize-training-script-tf-manual)
+ [サポートされていないフレームワーク機能](#model-parallel-tf-unsupported-features)

## TensorFlow による自動分割
<a name="model-parallel-customize-training-script-tf-23"></a>

SageMaker のモデル並列処理ライブラリを使用して TensorFlow モデルを実行するには、次のトレーニングスクリプトの変更が必要です。

1. ライブラリをインポートし、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で初期化します。

1. Keras Model クラスの代わりに [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html) から継承することで、Keras モデルを定義します。`smp.DistributedModel` オブジェクトの call メソッドからのモデル出力を返します。call メソッドから返されたテンソルはどれもモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。

1. `tf.Dataset.batch()` メソッドで `drop_remainder=True` を設定します。これは、バッチサイズが常にマイクロバッチ数で割り切れるようにするためです。

1. データパイプラインのランダムオペレーションを `smp.dp_rank()` を使ってシードします (例: `shuffle(ds, seed=smp.dp_rank())`)。これにより、異なるモデルパーティションを保持する GPU 間でデータサンプルの一貫性が確保されます。

1. フォワードおよびバックワードロジックをステップ関数に入れ、`smp.step` で修飾します。

1. `reduce_mean` などの [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 関数には、`smp.DistributedModel` の出力に依存する戻り値が必要です。

1. 評価ステップがある場合は、同様にフォワードロジックを `smp.step` で修飾された関数内に配置し、[`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) を使って出力を後処理します。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

次の Python スクリプトは、変更後のトレーニングスクリプトの例です。

```
import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return the model output

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

トレーニングスクリプトの準備が完了したら、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) に進んでください。ハイブリッドモデルとデータの並列トレーニングジョブを実行する場合は、次のセクションに進んでください。

## ハイブリッドモデルおよびデータ並列処理のための TensorFlow および Horovod による自動分割
<a name="model-parallel-customize-training-script-tf-2.3"></a>

SageMaker モデル並列処理ライブラリを Horovod と共に使用することで、モデルとデータのハイブリッド並列処理を行うことができます。ハイブリッド並列処理用にライブラリがモデルを分割する方法の詳細については、「[パイプライン並列処理 (PyTorch と TensorFlow で利用可能)](model-parallel-intro.md#model-parallel-intro-pp)」を参照してください。

このステップでは、トレーニングスクリプトを変更して、SageMaker モデル並列処理ライブラリを適応させる方法に焦点を当てます。

トレーニングスクリプトを適切に設定して、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) で設定するハイブリッド並列処理設定を適用するには、データ並列ランクとモデル並列ランクをそれぞれ自動的に検出するライブラリのヘルパー関数 `smp.dp_rank()` および `smp.mp_rank()` を使用します。

ライブラリがサポートするすべての MPI プリミティブを検索するには、『SageMaker Python SDK ドキュメント』の「[MPI の基礎](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#mpi-basics)」を参照してください。

スクリプトに必要な変更点は次のとおりです。
+ `hvd.allreduce` を追加する
+ Horovod の要求に応じて、最初のバッチの後に変数をブロードキャストする
+ データパイプラインのシャッフルやシャードのオペレーションを `smp.dp_rank()` を使ってシードする。

**注記**  
Horovod を使用するときは、トレーニングスクリプトで `hvd.init` を直接呼び出すことはできません。代わりに、[ステップ 2: SageMaker Python SDK を使用してトレーニングジョブを起動する](model-parallel-sm-sdk.md) の SageMaker Python SDK の `modelparallel` パラメータで、`"horovod"` を `True` に設定する必要があります。これにより、ライブラリはモデルパーティションのデバイス割り当てに基づいて Horovod を内部的に初期化できます。`hvd.init()` をトレーニングスクリプト内で直接呼び出すと、問題が発生する可能性があります。

**注記**  
トレーニングスクリプト内で `hvd.DistributedOptimizer` API を直接使用すると、API が黙示的に `AllReduce` オペレーションを `smp.step` 内に配置するため、トレーニングのパフォーマンスと速度が低下する可能性があります。Horovod でモデル並列処理ライブラリを使用する場合は、次の例で示すように、`smp.step` から返された勾配に対して `accumulate()` または `reduce_mean()` を呼び出した後に `hvd.allreduce` を直接呼び出すことをお勧めします。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API 
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return model outputs


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch 
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))
```

## TensorFlow による手動分割
<a name="model-parallel-customize-training-script-tf-manual"></a>

`smp.partition` コンテキストマネージャーを使って、特定のパーティションにオペレーションを配置します。どの `smp.partition` コンテキストにも配置されていないオペレーションは、`default_partition` に配置されます。SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
         # define layers

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

## サポートされていないフレームワーク機能
<a name="model-parallel-tf-unsupported-features"></a>

次の TensorFlow 機能はライブラリではサポートされていません。
+ `tf.GradientTape()` は現在サポートされていません。代わりに `Optimizer.get_gradients()` または `Optimizer.compute_gradients()` を使って勾配を計算できます。
+ `tf.train.Checkpoint.restore()` API は現在サポートされていません。チェックポイントには、代わりに同じ API と機能を備えた `smp.CheckpointManager` を使用してください。`smp.CheckpointManager` によるチェックポイントの復元は、最初のステップの後で実行することに注意してください。

# PyTorch トレーニングスクリプトを変更する
<a name="model-parallel-customize-training-script-pt"></a>

このセクションでは、PyTorch トレーニングスクリプトを変更して、自動パーティショニングおよび手動パーティショニングに使用する SageMaker モデル並列処理ライブラリを設定する方法について学習します。

**注記**  
ライブラリでサポートされている PyTorch のバージョンを確認するには、「[サポートされているフレームワークと AWS リージョン](distributed-model-parallel-support.md)」を参照してください。

**ヒント**  
SageMaker モデル並列処理ライブラリで PyTorch トレーニングスクリプトを使用する方法を示すエンドツーエンドのノートブックサンプルについては、「[Amazon SageMaker AI モデル並列処理ライブラリ v1 の例](distributed-model-parallel-examples.md)」を参照してください。

自動パーティショニングはデフォルトで有効になっています。別に指定されていない限り、次のスクリプトでは自動パーティショニングが使用されます。

**Topics**
+ [PyTorch による自動分割](#model-parallel-customize-training-script-pt-16)
+ [PyTorch による手動分割](#model-parallel-customize-training-script-pt-16-hvd)
+ [考慮事項](#model-parallel-pt-considerations)
+ [サポートされていないフレームワーク機能](#model-parallel-pt-unsupported-features)

## PyTorch による自動分割
<a name="model-parallel-customize-training-script-pt-16"></a>

SageMaker のモデル並列処理ライブラリで PyTorch トレーニングスクリプトを実行するには、次のトレーニングスクリプトの変更が必要です。

1. ライブラリをインポートし、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で初期化します。

1. モデルを [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel) でラップします。基盤となる `nn.Module` オブジェクトの `forward` メソッドから返されたテンソルはモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。
**注記**  
FP16 のトレーニングでは、[smdistributed.modelparallel.torch.model\$1creation()](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html) コンテキストマネージャーを使用してモデルをラップする必要があります。詳細については、「[モデル並列処理による FP16 トレーニング](model-parallel-extended-features-pytorch-fp16.md)」を参照してください。

1. オプティマイザを [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) でラップします。
**注記**  
FP16 トレーニングでは、静的または動的な損失スケーリングを設定する必要があります。詳細については、「[モデル並列処理による FP16 トレーニング](model-parallel-extended-features-pytorch-fp16.md)」を参照してください。

1. ユーザーモデルの代わりに、返された `DistributedModel` オブジェクトを使用します。

1. フォワードおよびバックワードロジックをステップ関数に入れ、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で修飾します。

1. `torch.cuda.set_device(smp.local_rank())` を使って、各プロセスをそれ固有のデバイスに制限します。

1. `smp.step` 呼び出しの前に `.to()` APIを使って入力テンソルを GPU に移動します (次の例を参照)。

1. `torch.Tensor.backward` と `torch.autograd.backward` を `DistributedModel.backward` に置き換えます。

1. `reduce_mean` などの [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。

1. 評価ステップがある場合は、同様にフォワードロジックを `smp.step` で修飾された関数内に配置し、[`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) を使って出力を後処理します。

1. `DataLoader` で `drop_last=True` を設定します。または、バッチサイズがマイクロバッチ数で割り切れない場合は、トレーニングループ内でバッチを手動でスキップします。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        # define layers

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## PyTorch による手動分割
<a name="model-parallel-customize-training-script-pt-16-hvd"></a>

[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) コンテキストマネージャーを使って、特定のデバイスにモジュールを配置します。どの `smp.partition` コンテキストにも配置されていないモジュールは、`default_partition` に配置されます。`auto_partition` が `False` に設定されている場合は、`default_partition` を指定する必要があります。特定の `smp.partition` コンテキスト内に作成されたモジュールは、対応するパーティションに配置されます。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        with smp.partition(0):
            # define child modules on device 0
        with smp.partition(1):
            # define child modules on device 1

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## 考慮事項
<a name="model-parallel-pt-considerations"></a>

SageMaker のモデル並列処理ライブラリを使用して PyTorch トレーニングスクリプトを設定する場合、次の点に注意する必要があります。
+ グローバル勾配ノルム (例えば、LAMB オプティマイザの一部のバリアントやグローバル勾配クリッピングなど、モデル全体からの勾配ノルム) に依存する最適化手法を使っている場合は、正確性確保のために、モデルパーティション全体のすべてのノルムを収集する必要があります。これを行うには、ライブラリの通信基本データタイプを使用できます。
+ モデル内の `nn.Modules` の forward メソッドに渡すすべての `torch.Tensor` 引数は、モジュール出力の計算に使う必要があります。つまり、ライブラリは、モジュール出力が依存しないモジュールに渡す `torch.Tensor` 引数が存在するケースをサポートしていません。
+ `smp.DistributedModel.backward()` 呼び出しに渡す引数は、すべてのモデル出力に依存する必要があります。つまり、`smp.DistributedModel.backward` 呼び出しに供給されるテンソルの計算で使用されない `smp.DistributedModel.forward` 呼び出しからの出力はあり得ません。
+ コードに `torch.cuda.synchronize()` 呼び出しがある場合は、同期呼び出しの直前に `torch.cuda.set_device(smp.local_rank())` を呼び出す必要がある場合があります。そうしないと、デバイス 0 に不要な CUDA コンテキストが作成され、メモリを不必要に消費する場合があります。
+ ライブラリは `nn.Modules` を異なるデバイスに配置するため、モデル内のモジュールは、`smp.step` 内で変更されるグローバル状態に依存してはいけません。トレーニング中に固定されたままの状態や、すべてのプロセスに表示される方法で `smp.step` の外部で変更される状態は、すべて許可されます。
+ ライブラリを使う場合、GPU にモデルを移動する必要はありません (例えば、`model.to(device)` を使って)。モデルがパーティショニングされる前 (最初の `smp.step` 呼び出しの前) にモデルを GPU に移動しようとすると、move 呼び出しは無視されます。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。ライブラリによるトレーニングが開始したら、モデルを CPU に移動して使用しないでください。モデルにはモジュールの正しいパラメータがなく、プロセスによって保持されているパーティションに割り当てられないためです。モデル並列処理ライブラリを使用してトレーニングした後、モデルを再トレーニングしたり、ライブラリを使用せずに推論に使用したりする場合は、チェックポイント API を使ってモデル全体を保存し、通常の PyTorch モジュールにロードし直すことをお勧めします。
+ あるモジュールの出力が別のモジュールにフィードされるようなモジュールのリストがある場合、そのリストを `nn.Sequential` に置き換えると、パフォーマンスを大幅に向上させることができます。
+ 重みの更新 (`optimizer.step()`) は、`smp.step` の外部で実行される必要があります。このときに、バックワードパス全体が終了し、勾配が使用可能になるためです。モデル並列処理とデータ並列処理を実行するハイブリッドモデルを使用する場合、この時点で勾配の AllReduce も終了することが保証されます。
+ ライブラリをデータ並列処理と組み合わせて使用する場合、ステップに参加していないランクを待機して AllReduce がハングしないように、すべてのデータ並列ランクのバッチ数が同じになるようにしてください。
+ ml.p4d インスタンスタイプ (ml.p4d.24xlarge など) を使ってトレーニングジョブを起動する場合は、データローダー変数 `num_workers=0` を設定する必要があります。例えば、`DataLoader` を次のように定義できます。

  ```
  dataloader = torch.utils.data.DataLoader(
              data,
              batch_size=batch_size,
              num_workers=0,
              pin_memory=True,
              drop_last=True,
              shuffle=shuffle,
          )
  ```
+ `smp.step` への入力は、`DataLoader` によって生成されたモデル入力である必要があります。これは、`smp.step` が内部で入力テンソルをバッチディメンションに沿って分割し、パイプライン化するためです。つまり、`DataLoader` 自体を `smp.step` 関数に渡して、内部でモデル入力を生成しようとしてもうまく行きません。

  例えば、`DataLoader` を次のように定義する場合:

  ```
  train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)
  ```

  `train_loader` によって生成されたモデル入力にアクセスし、それらを `smp.step` で修飾された関数に渡してください。`train_loader` を `smp.step` 関数に直接渡さないでください。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          ...
          _, loss_mb = train_step(model, data, target)
          ...
  
  @smp.step
  def train_step(model, data, target):
      ...
      return output, loss
  ```
+ `smp.step` への入力テンソルは、`.to()` API を使って現在のデバイスに移動する必要があります。これは、`torch.cuda.set_device(local_rank())` 呼び出しの後に実行する必要があります。

  例えば、`train` 関数を次のように定義します。この関数は、`.to()` API を使って現在のデバイスに `data` と `target` を追加してから、これらの入力テンソルを使って `train_step` を呼び出します。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          # smdistributed: Move input tensors to the GPU ID used by the current process,
          # based on the set_device call.
          data, target = data.to(device), target.to(device)
          optimizer.zero_grad()
          # Return value, loss_mb is a StepOutput object
          _, loss_mb = train_step(model, data, target)
  
          # smdistributed: Average the loss across microbatches.
          loss = loss_mb.reduce_mean()
  
          optimizer.step()
  ```

  この `smp.set` で修飾された関数への入力テンソルは、上記の `train` 関数で現在のデバイスに移動されました。モデルを現在のデバイスに移動する必要はありません**。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。

  ```
  @smp.step
  def train_step(model, data, target):
      output = model(data)
      loss = F.nll_loss(output, target, reduction="mean")
      model.backward(loss)
      return output, loss
  ```

## サポートされていないフレームワーク機能
<a name="model-parallel-pt-unsupported-features"></a>

次の PyTorch 機能は、SageMaker のモデル並列処理ライブラリではサポートされていません。
+ ネイティブの [PyTorch DDP](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) のデータ並列処理を使う場合、[https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) ラッパーモジュールは、ライブラリでサポートされていません。ライブラリは、パラメータブロードキャストや勾配 AllReduce など、PyTorch DDP との統合を内部的に管理しています。ライブラリを使う場合、モジュールバッファはトレーニングの開始時に 1 回だけブロードキャストされます。モデルにモジュールバッファがあり、各ステップでデータ並列グループ間で同期させる必要がある場合は、`smp.get_dp_process_group()` で取得できるプロセスグループを使って、`torch.distributed` API を使ってそれを行えます。
+ 混合精度トレーニングでは、`apex.amp` モジュールはサポートされていません。自動混合精度でライブラリを使う推奨方法は、`torch.cuda.amp` を使用することです。ただし、Torch の実装の代わりに `smp.amp.GradScaler` を使用することを除きます。
+ `torch.jit.ScriptModules` または `ScriptFunctions` は、`smp.DistributedModel` ではサポートされていません。
+ `apex` : `apex` からの `FusedLayerNorm`、`FusedAdam`、`FusedLAMB`、`FusedNovoGrad` はサポートされていません。代わりに、`smp.optimizers` および `smp.nn` API を介してこれらのライブラリ実装を使用できます。