

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 步骤 1：使用 SageMaker分布式模型并行库修改自己的训练脚本
<a name="model-parallel-customize-training-script"></a>

使用本节学习如何自定义训练脚本以使用 Amazon A SageMaker I 模型并行度库的核心功能。要使用特定于库的 API 函数和参数，我们建议您将本文档与 Pyth *SageMaker on SDK* 文档 APIs中的[模型并SageMaker 行库](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html)一起使用。

这些部分中提供的训练脚本示例经过简化，旨在重点介绍使用库时必须进行的更改。有关 end-to-end演示如何在 SageMaker 模型并行度库中使用 TensorFlow 或 PyTorch 训练脚本的可运行笔记本示例，请参阅。[亚马逊 SageMaker AI 模型并行度库 v2 示例](distributed-model-parallel-v2-examples.md)

**Topics**
+ [使用模型并行度库拆分训练脚本的 SageMaker 模型](#model-parallel-model-splitting-using-smp-lib)
+ [修改 TensorFlow 训练脚本](model-parallel-customize-training-script-tf.md)
+ [修改 PyTorch 训练脚本](model-parallel-customize-training-script-pt.md)

## 使用模型并行度库拆分训练脚本的 SageMaker 模型
<a name="model-parallel-model-splitting-using-smp-lib"></a>

有两种方法可以修改训练脚本以设置模型拆分：自动拆分或手动拆分。

### 自动模型拆分
<a name="model-parallel-automated-model-splitting"></a>

使用 SageMaker模型并行度库时，可以利用*自动模型拆分*，也称为*自动模型*分区。该库使用分区算法来平衡内存，最大限度地减少设备之间的通信并优化性能。您可以配置自动分区算法以优化速度或内存。

或者，您可以使用手动模型拆分。除非您非常熟悉模型架构，并且很好地掌握了如何有效地对模型进行分区的知识，否则我们建议自动拆分模型。

#### 工作原理
<a name="model-parallel-automated-model-splitting-how-it-works"></a>

自动分区发生在第一个训练步骤中，也就是首次调用 `smp.step` 修饰的函数时。在此调用期间，库首先在 CPU RAM 上构造模型的版本（以避免 GPU 内存限制），然后分析模型图形并做出分区决策。根据这个决策，每个模型分区加载到一个 GPU 上，然后才会执行第一步。由于这些分析和分区步骤，第一个训练步骤可能需要较长的时间。

在这两个框架中，库都通过自己的后端管理设备之间的通信，后端针对 AWS 基础架构进行了优化。

自动分区设计适应了框架的特性，库按照在每个框架中更自然的粒度级别进行分区。例如，在中 TensorFlow，可以将每个特定操作分配给不同的设备，而在模块级别中 PyTorch，分配是在模块级别完成的，其中每个模块由多个操作组成。以下部分介绍了每个框架中的具体设计。

##### 自动拆分模型 PyTorch
<a name="model-parallel-auto-model-split-pt"></a>

在第一个训练步骤中，模型并行性库在内部运行一个跟踪步骤，用于构造模型图形并确定张量和参数配置。在此跟踪步骤之后，库将构造一个树，该树包含模型中的嵌套 `nn.Module` 对象，以及通过跟踪收集的其他数据，例如所存储的 `nn.Parameters` 数量，以及每个 `nn.Module` 的执行时间。

接下来，库从根目录遍历此树，并运行分区算法，将每个 `nn.Module` 分配给一个设备，这可以平衡计算负载（按模块执行时间来衡量）和内存使用量（按总共存储的 `nn.Parameter` 大小和激活来衡量）。如果多个 `nn.Modules` 共享相同的 `nn.Parameter`，则将这些模块放置在相同设备上，以避免维护同一个参数的多个版本。在做出分区决策后，分配的模块和权重就会加载到它们的设备上。

有关如何将`smp.step`装饰器注册到 PyTorch 训练脚本的说明，请参阅[使用自动拆分 PyTorch](model-parallel-customize-training-script-pt.md#model-parallel-customize-training-script-pt-16)。

##### 自动拆分模型 TensorFlow
<a name="model-parallel-auto-model-split-tf"></a>

模型并行性库分析可训练变量的大小和图形结构，并在内部使用图形分区算法。该算法为每个操作提供了设备分配，目的是最大限度地减少所需的设备间通信量，但要遵守两个限制：
+ 平衡每个设备中存储的变量数量
+ 平衡每个设备中执行的操作数量

如果您为 `optimize` 指定 `speed`（在 Python SDK 的模型并行性参数中），则库会尝试平衡每个设备中的操作数和 `tf.Variable` 对象数量。否则，它会尝试平衡 `tf.Variables` 的总大小。

做出分区决策后，库会创建每个设备需要执行的子图形的串行化表示形式，并将其导入到每个设备上。在分区时，库会将使用相同 `tf.Variable` 的操作，以及属于同一 Keras 层的操作放在同一个设备上。它还尊重由 TensorFlow施加的主机托管限制。举例而言，这意味着如果有两个 Keras 层共享一个 `tf.Variable`，则属于这些层的所有操作都将放在单个设备上。

有关如何将`smp.step`装饰器注册到 PyTorch 训练脚本的说明，请参阅[使用自动拆分 TensorFlow](model-parallel-customize-training-script-tf.md#model-parallel-customize-training-script-tf-23)。

##### 各个框架的自动模型拆分的比较
<a name="model-parallel-auto-model-split-comparison"></a>

在中 TensorFlow，计算的基本单位是 a`tf.Operation`，它将模型 TensorFlow 表示为 `tf.Operation` s 的有向无环图 (DAG)，因此，模型并行度库对此 DAG 进行了分区，使每个节点都转到一台设备。至关重要的是，`tf.Operation` 对象具有足够丰富的可自定义属性，而且它们是通用的，因为每个模型都确保由此类对象的图形组成。

PyTorch 另一方面, 却没有一个足够丰富和普遍的同等行动概念. 具有这些特征的最接近的 PyTorch 计算单位是`nn.Module`，它的粒度级别要高得多，这就是库在此级别上进行分区的原因。 PyTorch

### 手动模型拆分
<a name="model-parallel-manual-model-splitting"></a>

如果要手动指定如何跨设备对模型进行分区，请使用 `smp.partition` 上下文管理器。有关如何设置上下文管理器以进行手动分区的说明，请参阅以下页面。
+ [使用手动拆分 TensorFlow](model-parallel-customize-training-script-tf.md#model-parallel-customize-training-script-tf-manual)
+ [使用手动拆分 PyTorch](model-parallel-customize-training-script-pt.md#model-parallel-customize-training-script-pt-16-hvd)

要在修改后使用此选项，在步骤 2 中，您需要在 Pyth SageMaker on SDK 的框架估算器类`default_partition`中设置`auto_partition`并定义一个。`False`任何未通过 `smp.partition` 上下文管理器明确放置在分区上的操作都将在 `default_partition` 上执行。在这种情况下，将绕过自动拆分逻辑，并根据您的规范放置每个操作。根据生成的图形结构，模型并行性库会自动创建管道执行计划。

# 修改 TensorFlow 训练脚本
<a name="model-parallel-customize-training-script-tf"></a>

在本节中，您将学习如何修改 TensorFlow 训练脚本以配置用于自动分区和手动分区的 SageMaker模型并行度库。这些示例还包括与 Horovod 集成的示例，用于混合模型和数据并行性。

**注意**  
要了解该库支持哪些 TensorFlow 版本，请参阅[支持的框架和 AWS 区域](distributed-model-parallel-support.md)。

[使用自动拆分 TensorFlow](#model-parallel-customize-training-script-tf-23)中列出了为使用库而必须对训练脚本进行的修改。

要了解如何修改训练脚本以在 Horovod 中使用混合模型和数据并行性，请参阅 [使用 TensorFlow 和 Horovod 自动拆分，实现混合模型和数据并行性](#model-parallel-customize-training-script-tf-2.3)。

如果您要使用手动分区，另请参阅 [使用手动拆分 TensorFlow](#model-parallel-customize-training-script-tf-manual)。

以下主题显示了训练脚本的示例，您可以使用这些脚本来配置自动分区和手动分区模型 SageMaker的模型并行度库。 TensorFlow

**注意**  
默认情况下启用自动分区。除非另行指定，否则示例脚本使用自动分区。

**Topics**
+ [使用自动拆分 TensorFlow](#model-parallel-customize-training-script-tf-23)
+ [使用 TensorFlow 和 Horovod 自动拆分，实现混合模型和数据并行性](#model-parallel-customize-training-script-tf-2.3)
+ [使用手动拆分 TensorFlow](#model-parallel-customize-training-script-tf-manual)
+ [不支持的框架功能](#model-parallel-tf-unsupported-features)

## 使用自动拆分 TensorFlow
<a name="model-parallel-customize-training-script-tf-23"></a>

要使用模型并行度库运行 TensorFlow 模型，需要对训练脚本进行 SageMaker以下更改：

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 导入和初始化库。

1. 通过从 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_tensorflow.html) 继承来定义 Keras 模型，而不是从 Keras 模型类继承。从 `smp.DistributedModel` 对象的调用方法返回模型输出。请注意，从调用方法返回的任何张量都将在模型并行设备之间广播，这会产生通信开销，因此在调用方法之外不需要的任何张量（例如中间激活）都不应返回。

1. 在 `tf.Dataset.batch()` 方法中设置 `drop_remainder=True`。这是为了确保批次大小始终可以被微批次数量整除。

1. 例如，使用`smp.dp_rank()`以下方法在数据管道中播种随机操作，`shuffle(ds, seed=smp.dp_rank())`以确保包含不同模型分区的数据样本的一致性。 GPUs 

1. 将向前和向后逻辑放在步进函数中，然后用 `smp.step` 进行修饰。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 方法（例如 `reduce_mean`）对微批次的输出进行后处理。[https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 函数必须具有一个取决于 `smp.DistributedModel` 的输出的返回值。

1. 如果有评估步骤，则同样将向前逻辑放在 `smp.step` 修饰的函数中，然后使用 [`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 对输出进行后处理。

要了解有关模型并行度库 API SageMaker 的更多信息，请参阅 AP [I](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html) 文档。

以下 Python 脚本是进行更改后的训练脚本的示例。

```
import tensorflow as tf

# smdistributed: Import TF2.x API
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return the model output

model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

如果您已准备好训练脚本，请继续到[第 2 步：使用 SageMaker Python 软件开发工具包启动训练 Job](model-parallel-sm-sdk.md)。如果要运行混合模型和数据并行训练作业，请继续到下一个部分。

## 使用 TensorFlow 和 Horovod 自动拆分，实现混合模型和数据并行性
<a name="model-parallel-customize-training-script-tf-2.3"></a>

您可以将 SageMaker 模型并行度库与 Horovod 配合使用，实现混合模型和数据并行性。要详细了解库如何拆分模型以用于混合并行性，请参阅[管道并行度（适用于 PyTorch 和） TensorFlow](model-parallel-intro.md#model-parallel-intro-pp)。

在这一步中，我们将重点介绍如何修改训练脚本以适应 SageMaker模型并行度库。

要正确设置训练脚本，以便选取要在 [第 2 步：使用 SageMaker Python 软件开发工具包启动训练 Job](model-parallel-sm-sdk.md) 中设置的混合并行度配置，请使用库的帮助程序函数 `smp.dp_rank()` 和 `smp.mp_rank()`，它们分别自动检测数据并行秩和模型并行秩。

要查找该库支持的所有 MPI 原语，请参阅 [Pyth SageMaker on SDK 文档中的 MPI 基础知识](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#mpi-basics)。

脚本中需要进行以下更改：
+ 添加 `hvd.allreduce`
+ 按照 Horovod 的要求，在第一个批次之后广播变量
+ 使用. 在数据管道中播种洗牌 and/or 分片操作。`smp.dp_rank()`

**注意**  
使用 Horovod 时，您不可在训练脚本中直接调用 `hvd.init`。相反，你必须在中的 SageMaker Python SDK `modelparallel` 参数`True`中`"horovod"`将其设置为[第 2 步：使用 SageMaker Python 软件开发工具包启动训练 Job](model-parallel-sm-sdk.md)。这使得库可以根据模型分区的设备分配，在内部初始化 Horovod。直接在训练脚本中调用 `hvd.init()` 可能会导致问题。

**注意**  
在训练脚本中直接使用 `hvd.DistributedOptimizer` API 可能会导致训练性能和速度不佳，因为 API 会隐式地将 `AllReduce` 操作放入 `smp.step` 中。我们建议您在 `smp.step` 返回的梯度上调用 `accumulate()` 或 `reduce_mean()` 后，直接调用 `hvd.allreduce`，以将模型并行性库与 Horovod 一起使用，如下例所示。

要了解有关模型并行度库 API SageMaker 的更多信息，请参阅 AP [I](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html) 文档。

```
import tensorflow as tf
import horovod.tensorflow as hvd

# smdistributed: Import TF2.x API 
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API 
class MyModel(smp.DistributedModel):
    def __init__(self):
        super(MyModel, self).__init__()
        # define layers

    def call(self, x, training=None):
        # define forward pass and return model outputs


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels, first_batch):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    # Horovod: AllReduce the accumulated gradients
    gradients = [hvd.allreduce(g.accumulate()) for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Horovod: Broadcast the variables after first batch 
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)

    # smdistributed: Merge predictions across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()

    for batch, (images, labels) in enumerate(train_ds):
        loss = train_step(images, labels, tf.constant(batch == 0))
```

## 使用手动拆分 TensorFlow
<a name="model-parallel-customize-training-script-tf-manual"></a>

使用 `smp.partition` 上下文管理器将操作放在特定的分区中。未放在任何 `smp.partition` 上下文中的任何操作都放在 `default_partition` 中。要了解有关模型并行度库 API SageMaker 的更多信息，请参阅 AP [I](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html) 文档。

```
import tensorflow as tf

# smdistributed: Import TF2.x API.
import smdistributed.modelparallel.tensorflow as smp

# smdistributed: Initialize
smp.init()

# Download and load MNIST dataset.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    "MNIST-data-%d" % smp.rank()
)
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder
# in batching to make sure batch size is always divisible by number of microbatches.
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000, seed=smp.dp_rank())
    .batch(256, drop_remainder=True)
)

# smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API.
class MyModel(smp.DistributedModel):
    def __init__(self):
         # define layers

    def call(self, x):
        with smp.partition(0):
            x = self.layer0(x)
        with smp.partition(1):
            return self.layer1(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

# smdistributed: Define smp.step. Return any tensors needed outside
@smp.step
def get_grads(images, labels):
    predictions = model(images, training=True)
    loss = loss_object(labels, predictions)

    grads = optimizer.get_gradients(loss, model.trainable_variables)
    return grads, loss, predictions


@tf.function
def train_step(images, labels):
    gradients, loss, predictions = get_grads(images, labels)

    # smdistributed: Accumulate the gradients across microbatches
    gradients = [g.accumulate() for g in gradients]
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # smdistributed: Merge predictions and average losses across microbatches
    train_accuracy(labels, predictions.merge())
    return loss.reduce_mean()


for epoch in range(5):
    # Reset the metrics at the start of the next epoch
    train_accuracy.reset_states()
    for images, labels in train_ds:
        loss = train_step(images, labels)
    accuracy = train_accuracy.result()
```

## 不支持的框架功能
<a name="model-parallel-tf-unsupported-features"></a>

该库不支持以下 TensorFlow 功能：
+ 当前不支持 `tf.GradientTape()`。您可以改用 `Optimizer.get_gradients()` 或 `Optimizer.compute_gradients()` 来计算梯度。
+ 目前不支持 `tf.train.Checkpoint.restore()` API。对于检查点操作，请改用 `smp.CheckpointManager`，它提供相同的 API 和功能。请注意，`smp.CheckpointManager` 的检查点还原应在第一步之后进行。

# 修改 PyTorch 训练脚本
<a name="model-parallel-customize-training-script-pt"></a>

在本节中，您将学习如何修改 PyTorch 训练脚本以配置用于自动分区和手动分区的 SageMaker 模型并行度库。

**注意**  
要了解该库支持哪些 PyTorch 版本，请参阅[支持的框架和 AWS 区域](distributed-model-parallel-support.md)。

**提示**  
有关演示如何在 SageMaker 模型并行度库中使用 PyTorch 训练脚本的 end-to-end笔记本示例，请参阅。[亚马逊 SageMaker AI 模型并行库 v1 示例](distributed-model-parallel-examples.md)

请注意，默认情况下启用自动分区。除非另行指定，否则以下脚本使用自动分区。

**Topics**
+ [使用自动拆分 PyTorch](#model-parallel-customize-training-script-pt-16)
+ [使用手动拆分 PyTorch](#model-parallel-customize-training-script-pt-16-hvd)
+ [注意事项](#model-parallel-pt-considerations)
+ [不支持的框架功能](#model-parallel-pt-unsupported-features)

## 使用自动拆分 PyTorch
<a name="model-parallel-customize-training-script-pt-16"></a>

要使用模型并行度库运行 PyTorch 训练脚本，需要进行 SageMaker以下训练脚本更改：

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 导入和初始化库。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedModel) 包装模型。请注意，从底层 `nn.Module` 对象的 `forward` 方法返回的任何张量都将在模型并行设备之间广播，这会产生通信开销，因此在调用方法之外不需要的任何张量（例如中间激活）都不应返回。
**注意**  
要进行 FP16 训练，你需要使用 smdistrib [uted.modelparallel.torch.model\$1creation () 上下文管理器来封装模型。](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html)有关更多信息，请参阅 [FP16 使用模型并行度进行训练](model-parallel-extended-features-pytorch-fp16.md)。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) 包装优化器。
**注意**  
要进行 FP16 训练，您需要设置静态或动态损失比例。有关更多信息，请参阅 [FP16 使用模型并行度进行训练](model-parallel-extended-features-pytorch-fp16.md)。

1. 使用返回的 `DistributedModel` 对象而不是用户模型。

1. 将向前和向后逻辑放在步进函数中，然后用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#smp.init) 进行修饰。

1. 通过 `torch.cuda.set_device(smp.local_rank())` 将每个进程限制在自己的设备上。

1. 在 `smp.step` 调用之前，使用 `.to()` API 将输入张量移至 GPU（参见下面的示例）。

1. 将 `torch.Tensor.backward` 和 `torch.autograd.backward` 替换为 `DistributedModel.backward`。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 方法（例如 `reduce_mean`）对微批次的输出进行后处理。

1. 如果有评估步骤，则同样将向前逻辑放在 `smp.step` 修饰的函数中，然后使用 [`StepOutput` API](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_common_api.html#StepOutput) 对输出进行后处理。

1. 在 `DataLoader` 中设置 `drop_last=True`。或者，如果批次大小不能被微批次数量整除，则可以手动跳过训练循环中的批次。

要了解有关模型并行度库 API SageMaker 的更多信息，请参阅 AP [I](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html) 文档。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        # define layers

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## 使用手动拆分 PyTorch
<a name="model-parallel-customize-training-script-pt-16-hvd"></a>

使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/v1.2.0/smd_model_parallel_pytorch.html#smp.DistributedOptimizer) 上下文管理器将模块放置在特定设备中。未放在任何 `smp.partition` 上下文中的任何模块都放在 `default_partition` 中。如果 `auto_partition` 设置为 `False`，则需要提供 `default_partition`。在特定 `smp.partition` 上下文中创建的模块将放置在对应的分区上。

要了解有关模型并行度库 API SageMaker 的更多信息，请参阅 AP [I](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smd_model_parallel.html) 文档。

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp

class GroupedNet(nn.Module):
    def __init__(self):
        super(GroupedNet, self).__init__()
        with smp.partition(0):
            # define child modules on device 0
        with smp.partition(1):
            # define child modules on device 1

    def forward(self, x):
        # define forward pass and return model outputs


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by the current process,
        # based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)

        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()

        optimizer.step()

# smdistributed: initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data-parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = GroupedNet()
optimizer = optim.Adadelta(model.parameters(), lr=4.0)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
```

## 注意事项
<a name="model-parallel-pt-considerations"></a>

使用 SageMaker模型并行度库配置 PyTorch 训练脚本时，应注意以下几点：
+ 如果您使用的优化技术依赖于全局梯度范数，例如整个模型的梯度范数（如 LAMB 优化器或全局梯度剪裁的某些变体），则需要收集模型分区中的所有范数以确保正确性。您可以使用库的通信基本数据类型来完成此操作。
+ 模型中 `nn.Modules` 的所有向前方法的所有 `torch.Tensor` 参数都必须在模块输出的计算中使用。换而言之，该库不支持向模块传递模块输出所不依赖的 `torch.Tensor` 参数。
+ 传递给 `smp.DistributedModel.backward()` 调用的参数必须依赖于所有模型输出。换而言之，`smp.DistributedModel.forward` 调用的输出，必须在计算提供给 `smp.DistributedModel.backward` 的张量时全部使用。
+ 如果您的代码中有 `torch.cuda.synchronize()` 调用，则可能需要在同步调用之前立即调用 `torch.cuda.set_device(smp.local_rank())`。否则，可能会在设备 0 中创建不必要的 CUDA 上下文，这将不必要地消耗内存。
+ 由于库放置在不同的设备上的 `nn.Modules` 中，因此模型中的模块不得依赖于任何在 `smp.step` 内部修改的全局状态。在整个训练过程中保持固定的任何状态，或者在 `smp.step` 之外以对所有进程都可见的方式修改的任何状态，都是允许的。
+ 使用库时，您无需将模型移至 GPU（例如，使用 `model.to(device)`）。如果您尝试在模型分区之前（在第一次 `smp.step` 调用之前）将模型移动到 GPU，则会忽略移动调用。对于模型中分配给某个秩的部分，库会自动将这个部分移动到其 GPU。一旦开始使用库进行训练，请不要将模型移至 CPU 并使用它，因为对于未分配给进程所持有的分区的模块，它不会有正确的参数。如果您想在使用模型并行度库训练模型后重新训练模型或在没有库的情况下将其用于推理，则推荐的方法是使用我们的检查点 API 保存完整模型并将其加载回常规模块。 PyTorch 
+ 如果您有模块列表，这样一个模块的输出就会传送到另一个模块中，而使用 `nn.Sequential` 替换该列表可以显著提高性能。
+ 权重更新 (`optimizer.step()`) 需要在 `smp.step` 外部进行，因为此时整个向后传递已完成并且梯度准备就绪。当使用具有模型和数据并行性的混合模型时，此时，梯 AllReduce 度也可以保证完成。
+ 将该库与数据并行度结合使用时，请确保所有数据并行等级上的批次数相同，这样在等待 AllReduce 未参与该步骤的排名时就不会挂起。
+ 如果您使用 ml.p4d 实例类型（例如 ml.p4d.24xlarge）启动训练作业，则必须设置数据加载程序变量 `num_workers=0`。例如，您可以如下所示定义 `DataLoader`：

  ```
  dataloader = torch.utils.data.DataLoader(
              data,
              batch_size=batch_size,
              num_workers=0,
              pin_memory=True,
              drop_last=True,
              shuffle=shuffle,
          )
  ```
+ `smp.step` 的输入必须是 `DataLoader` 生成的模型输入。这是因为在 `smp.step` 内部按照批次维度拆分输入张量，并对其进行管道传输。这意味着将 `DataLoader` 本身传递给 `smp.step` 函数以在其中生成模型输入，这种方法行不通。

  例如，如果您如下所示定义 `DataLoader`：

  ```
  train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)
  ```

  您应该访问 `train_loader` 生成的模型输入，并将这些输入传递给 `smp.step` 修饰的函数。不要将 `train_loader` 直接传递给 `smp.step` 函数。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          ...
          _, loss_mb = train_step(model, data, target)
          ...
  
  @smp.step
  def train_step(model, data, target):
      ...
      return output, loss
  ```
+ `smp.step` 的输入张量必须使用 `.to()` API 移动到当前设备，此操作必须在 `torch.cuda.set_device(local_rank())` 调用后进行。

  例如，您可以如下所示定义 `train` 函数。在使用这些输入张量调用 `train_step` 之前，此函数使用 `.to()` API 将 `data` 和 `target` 添加到当前设备。

  ```
  def train(model, device, train_loader, optimizer):
      model.train()
      for batch_idx, (data, target) in enumerate(train_loader):
          # smdistributed: Move input tensors to the GPU ID used by the current process,
          # based on the set_device call.
          data, target = data.to(device), target.to(device)
          optimizer.zero_grad()
          # Return value, loss_mb is a StepOutput object
          _, loss_mb = train_step(model, data, target)
  
          # smdistributed: Average the loss across microbatches.
          loss = loss_mb.reduce_mean()
  
          optimizer.step()
  ```

  在上面的 `train` 函数中，此 `smp.set` 修饰的函数的输入张量已移至当前设备。模型*无需*移至当前设备。对于模型中分配给某个秩的部分，库会自动将这个部分移动到其 GPU。

  ```
  @smp.step
  def train_step(model, data, target):
      output = model(data)
      loss = F.nll_loss(output, target, reduction="mean")
      model.backward(loss)
      return output, loss
  ```

## 不支持的框架功能
<a name="model-parallel-pt-unsupported-features"></a>

的模型并行度库不支持以下 PyTorch 功能： SageMaker
+ 如果将数据并行性与本机 [PyTorch DDP](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) 一起使用，则该库不支持[https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html)包装器模块。该库在内部管理与 PyTorch DDP 的集成，包括参数广播和渐变 AllReduce。使用库时，模块缓冲区在训练开始时仅广播一次。如果您的模型具有的模块缓冲区需要在每个步骤中跨数据并行组同步，则可以通过 `torch.distributed` API，使用可以通过 `smp.get_dp_process_group()` 获取的进程组进行同步。
+ 对于混合精度训练，不支持 `apex.amp` 模块。使用具有自动混合精度的库的推荐方法是使用 `torch.cuda.amp`，唯一的不同是使用 `smp.amp.GradScaler` 而不是在 Torch 中实施。
+ `smp.DistributedModel` 不支持 `torch.jit.ScriptModules` 或 `ScriptFunctions`。
+ 不支持 `apex` : `FusedLayerNorm`、`FusedAdam`、`FusedLAMB` 以及来自 `apex` 的 `FusedNovoGrad`。你可以通过`smp.optimizers`和`smp.nn` APIs 改用它们的库实现。