

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# PyTorch トレーニングスクリプトを変更する
<a name="model-parallel-customize-training-script-pt"></a>

このセクションでは、PyTorch トレーニングスクリプトを変更して、自動パーティショニングおよび手動パーティショニングに使用する SageMaker モデル並列処理ライブラリを設定する方法について学習します。

**注記**  
ライブラリでサポートされている PyTorch のバージョンを確認するには、「[サポートされているフレームワークと AWS リージョン](distributed-model-parallel-support.md)」を参照してください。

**ヒント**  
SageMaker モデル並列処理ライブラリで PyTorch トレーニングスクリプトを使用する方法を示すエンドツーエンドのノートブックサンプルについては、「[Amazon SageMaker AI モデル並列処理ライブラリ v1 の例](distributed-model-parallel-examples.md)」を参照してください。

自動パーティショニングはデフォルトで有効になっています。別に指定されていない限り、次のスクリプトでは自動パーティショニングが使用されます。

**Topics**
+ [PyTorch による自動分割](#model-parallel-customize-training-script-pt-16)
+ [PyTorch による手動分割](#model-parallel-customize-training-script-pt-16-hvd)
+ [考慮事項](#model-parallel-pt-considerations)
+ [サポートされていないフレームワーク機能](#model-parallel-pt-unsupported-features)

## PyTorch による自動分割
<a name="model-parallel-customize-training-script-pt-16"></a>

SageMaker のモデル並列処理ライブラリで PyTorch トレーニングスクリプトを実行するには、次のトレーニングスクリプトの変更が必要です。

1. ライブラリをインポートし、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で初期化します。

1. モデルを [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel) でラップします。基盤となる `nn.Module` オブジェクトの `forward` メソッドから返されたテンソルはモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。
**注記**  
FP16 のトレーニングでは、[smdistributed.modelparallel.torch.model\$1creation()](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html) コンテキストマネージャーを使用してモデルをラップする必要があります。詳細については、「[モデル並列処理による FP16 トレーニング](model-parallel-extended-features-pytorch-fp16.md)」を参照してください。

1. オプティマイザを [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) でラップします。
**注記**  
FP16 トレーニングでは、静的または動的な損失スケーリングを設定する必要があります。詳細については、「[モデル並列処理による FP16 トレーニング](model-parallel-extended-features-pytorch-fp16.md)」を参照してください。

1. ユーザーモデルの代わりに、返された `DistributedModel` オブジェクトを使用します。

1. フォワードおよびバックワードロジックをステップ関数に入れ、[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) で修飾します。

1. `torch.cuda.set_device(smp.local_rank())` を使って、各プロセスをそれ固有のデバイスに制限します。

1. `smp.step` 呼び出しの前に `.to()` APIを使って入力テンソルを GPU に移動します (次の例を参照)。

1. `torch.Tensor.backward` と `torch.autograd.backward` を `DistributedModel.backward` に置き換えます。

1. `reduce_mean` などの [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。

1. 評価ステップがある場合は、同様にフォワードロジックを `smp.step` で修飾された関数内に配置し、[`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) を使って出力を後処理します。

1. `DataLoader` で `drop_last=True` を設定します。または、バッチサイズがマイクロバッチ数で割り切れない場合は、トレーニングループ内でバッチを手動でスキップします。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        # define layers

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## PyTorch による手動分割
<a name="model-parallel-customize-training-script-pt-16-hvd"></a>

[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) コンテキストマネージャーを使って、特定のデバイスにモジュールを配置します。どの `smp.partition` コンテキストにも配置されていないモジュールは、`default_partition` に配置されます。`auto_partition` が `False` に設定されている場合は、`default_partition` を指定する必要があります。特定の `smp.partition` コンテキスト内に作成されたモジュールは、対応するパーティションに配置されます。

SageMaker のモデル並列処理ライブラリ API の詳細については、「[API ドキュメント](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)」を参照してください。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        with smp.partition(0):
            # define child modules on device 0
        with smp.partition(1):
            # define child modules on device 1

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## 考慮事項
<a name="model-parallel-pt-considerations"></a>

SageMaker のモデル並列処理ライブラリを使用して PyTorch トレーニングスクリプトを設定する場合、次の点に注意する必要があります。
+ グローバル勾配ノルム (例えば、LAMB オプティマイザの一部のバリアントやグローバル勾配クリッピングなど、モデル全体からの勾配ノルム) に依存する最適化手法を使っている場合は、正確性確保のために、モデルパーティション全体のすべてのノルムを収集する必要があります。これを行うには、ライブラリの通信基本データタイプを使用できます。
+ モデル内の `nn.Modules` の forward メソッドに渡すすべての `torch.Tensor` 引数は、モジュール出力の計算に使う必要があります。つまり、ライブラリは、モジュール出力が依存しないモジュールに渡す `torch.Tensor` 引数が存在するケースをサポートしていません。
+ `smp.DistributedModel.backward()` 呼び出しに渡す引数は、すべてのモデル出力に依存する必要があります。つまり、`smp.DistributedModel.backward` 呼び出しに供給されるテンソルの計算で使用されない `smp.DistributedModel.forward` 呼び出しからの出力はあり得ません。
+ コードに `torch.cuda.synchronize()` 呼び出しがある場合は、同期呼び出しの直前に `torch.cuda.set_device(smp.local_rank())` を呼び出す必要がある場合があります。そうしないと、デバイス 0 に不要な CUDA コンテキストが作成され、メモリを不必要に消費する場合があります。
+ ライブラリは `nn.Modules` を異なるデバイスに配置するため、モデル内のモジュールは、`smp.step` 内で変更されるグローバル状態に依存してはいけません。トレーニング中に固定されたままの状態や、すべてのプロセスに表示される方法で `smp.step` の外部で変更される状態は、すべて許可されます。
+ ライブラリを使う場合、GPU にモデルを移動する必要はありません (例えば、`model.to(device)` を使って)。モデルがパーティショニングされる前 (最初の `smp.step` 呼び出しの前) にモデルを GPU に移動しようとすると、move 呼び出しは無視されます。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。ライブラリによるトレーニングが開始したら、モデルを CPU に移動して使用しないでください。モデルにはモジュールの正しいパラメータがなく、プロセスによって保持されているパーティションに割り当てられないためです。モデル並列処理ライブラリを使用してトレーニングした後、モデルを再トレーニングしたり、ライブラリを使用せずに推論に使用したりする場合は、チェックポイント API を使ってモデル全体を保存し、通常の PyTorch モジュールにロードし直すことをお勧めします。
+ あるモジュールの出力が別のモジュールにフィードされるようなモジュールのリストがある場合、そのリストを `nn.Sequential` に置き換えると、パフォーマンスを大幅に向上させることができます。
+ 重みの更新 (`optimizer.step()`) は、`smp.step` の外部で実行される必要があります。このときに、バックワードパス全体が終了し、勾配が使用可能になるためです。モデル並列処理とデータ並列処理を実行するハイブリッドモデルを使用する場合、この時点で勾配の AllReduce も終了することが保証されます。
+ ライブラリをデータ並列処理と組み合わせて使用する場合、ステップに参加していないランクを待機して AllReduce がハングしないように、すべてのデータ並列ランクのバッチ数が同じになるようにしてください。
+ ml.p4d インスタンスタイプ (ml.p4d.24xlarge など) を使ってトレーニングジョブを起動する場合は、データローダー変数 `num_workers=0` を設定する必要があります。例えば、`DataLoader` を次のように定義できます。

  ```
  dataloader = torch.utils.data.DataLoader(
              data,
              batch_size=batch_size,
              num_workers=0,
              pin_memory=True,
              drop_last=True,
              shuffle=shuffle,
          )
  ```
+ `smp.step` への入力は、`DataLoader` によって生成されたモデル入力である必要があります。これは、`smp.step` が内部で入力テンソルをバッチディメンションに沿って分割し、パイプライン化するためです。つまり、`DataLoader` 自体を `smp.step` 関数に渡して、内部でモデル入力を生成しようとしてもうまく行きません。

  例えば、`DataLoader` を次のように定義する場合:

  ```
  train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)
  ```

  `train_loader` によって生成されたモデル入力にアクセスし、それらを `smp.step` で修飾された関数に渡してください。`train_loader` を `smp.step` 関数に直接渡さないでください。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          ...
          _, loss_mb = train_step(model, data, target)
          ...
  
  @smp.step
  def train_step(model, data, target):
      ...
      return output, loss
  ```
+ `smp.step` への入力テンソルは、`.to()` API を使って現在のデバイスに移動する必要があります。これは、`torch.cuda.set_device(local_rank())` 呼び出しの後に実行する必要があります。

  例えば、`train` 関数を次のように定義します。この関数は、`.to()` API を使って現在のデバイスに `data` と `target` を追加してから、これらの入力テンソルを使って `train_step` を呼び出します。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          # smdistributed: Move input tensors to the GPU ID used by the current process,
          # based on the set_device call.
          data, target = data.to(device), target.to(device)
          optimizer.zero_grad()
          # Return value, loss_mb is a StepOutput object
          _, loss_mb = train_step(model, data, target)
  
          # smdistributed: Average the loss across microbatches.
          loss = loss_mb.reduce_mean()
  
          optimizer.step()
  ```

  この `smp.set` で修飾された関数への入力テンソルは、上記の `train` 関数で現在のデバイスに移動されました。モデルを現在のデバイスに移動する必要はありません**。ライブラリは、ランクに割り当てられたモデルの一部を GPU に自動的に移動します。

  ```
  @smp.step
  def train_step(model, data, target):
      output = model(data)
      loss = F.nll_loss(output, target, reduction="mean")
      model.backward(loss)
      return output, loss
  ```

## サポートされていないフレームワーク機能
<a name="model-parallel-pt-unsupported-features"></a>

次の PyTorch 機能は、SageMaker のモデル並列処理ライブラリではサポートされていません。
+ ネイティブの [PyTorch DDP](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) のデータ並列処理を使う場合、[https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) ラッパーモジュールは、ライブラリでサポートされていません。ライブラリは、パラメータブロードキャストや勾配 AllReduce など、PyTorch DDP との統合を内部的に管理しています。ライブラリを使う場合、モジュールバッファはトレーニングの開始時に 1 回だけブロードキャストされます。モデルにモジュールバッファがあり、各ステップでデータ並列グループ間で同期させる必要がある場合は、`smp.get_dp_process_group()` で取得できるプロセスグループを使って、`torch.distributed` API を使ってそれを行えます。
+ 混合精度トレーニングでは、`apex.amp` モジュールはサポートされていません。自動混合精度でライブラリを使う推奨方法は、`torch.cuda.amp` を使用することです。ただし、Torch の実装の代わりに `smp.amp.GradScaler` を使用することを除きます。
+ `torch.jit.ScriptModules` または `ScriptFunctions` は、`smp.DistributedModel` ではサポートされていません。
+ `apex` : `apex` からの `FusedLayerNorm`、`FusedAdam`、`FusedLAMB`、`FusedNovoGrad` はサポートされていません。代わりに、`smp.optimizers` および `smp.nn` API を介してこれらのライブラリ実装を使用できます。