

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 调整训练脚本以使用 SMDDP 集体操作
<a name="data-parallel-modify-sdp-select-framework"></a>

本节提供的训练脚本示例经过简化，仅突出显示了在训练脚本中启用 SageMaker AI 分布式数据并行性 (SMDDP) 库所需的更改。有关演示如何使用 SMDDP 库运行分布式训练作业的 end-to-end Jupyter 笔记本示例，请参阅。[Amazon SageMaker AI 数据并行库示例](distributed-data-parallel-v2-examples.md)

**Topics**
+ [在训练脚本中使用 SMDDP 库 PyTorch](data-parallel-modify-sdp-pt.md)
+ [在 L PyTorch ightning 训练脚本中使用 SMDDP 库](data-parallel-modify-sdp-pt-lightning.md)
+ [在 TensorFlow 训练脚本中使用 SMDDP 库（已弃用）](data-parallel-modify-sdp-tf2.md)

# 在训练脚本中使用 SMDDP 库 PyTorch
<a name="data-parallel-modify-sdp-pt"></a>

[从 SageMaker AI 分布式数据并行度 (SMDDP) 库 v1.4.0 开始，您可以将该库用作分布式包的后端选项。PyTorch ](https://pytorch.org/tutorials/beginner/dist_overview.html)要使用 SMDDP `AllReduce` 和`AllGather`集合操作，您只需要在训练脚本的开头导入 SMDDP 库，并在进程组初始化期间将 SMDDP 设置为 PyTorch 分布式模块的后端即可。使用单行后端规范，您可以保持所有原生 PyTorch 分布式模块和整个训练脚本不变。[以下代码片段展示了如何使用 SMDDP 库作为 PyTorch基于分布式训练包的后端：分布式[PyTorch 数据并行 (DDP)、PyTorch 完全分片数据并行](https://pytorch.org/docs/stable/notes/ddp.html)[性 (FSDP) 和威震](https://pytorch.org/docs/stable/fsdp.html)天-。[DeepSpeed](https://github.com/microsoft/DeepSpeed)DeepSpeed](https://github.com/microsoft/Megatron-DeepSpeed)

## 适用于 PyTorch DDP 或 FSDP
<a name="data-parallel-enable-for-ptddp-ptfsdp"></a>

进程组初始化如下。

```
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp

dist.init_process_group(backend="smddp")
```

**注意**  
（仅适用于 PyTorch DDP 作业）`smddp`后端目前不支持使用 API 创建子流程组。`torch.distributed.new_group()`您也不能同时使用 `smddp` 后端和其他进程组后端，如 `NCCL` 和 `Gloo`。

## 对于我们的 DeepSpeed 威震天-DeepSpeed
<a name="data-parallel-enable-for-deepspeed"></a>

进程组初始化如下。

```
import deepspeed
import smdistributed.dataparallel.torch.torch_smddp

deepspeed.init_distributed(dist_backend="smddp")
```

**注意**  
要将 SMDDP `AllGather` 与 [使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业](data-parallel-use-api.md) 中基于 `mpirun` 的启动器（`smdistributed` 和 `pytorchddp`）配合使用，还需要在训练脚本中设置以下环境变量。  

```
export SMDATAPARALLEL_OPTIMIZE_SDP=true
```

有关编写 PyTorch FSDP 训练脚本的一般指导，请参阅文档中的[使用完全分片数据并行 (FSDP) 进行高级模型训练](https://pytorch.org/tutorials/intermediate/FSDP_adavnced_tutorial.html)。 PyTorch

有关编写 PyTorch DDP 训练脚本的一般指导，请参阅 PyTorch 文档中的[分布式并行数据入门](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)。

调整完训练脚本后，继续到 [使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业](data-parallel-use-api.md)。

# 在 L PyTorch ightning 训练脚本中使用 SMDDP 库
<a name="data-parallel-modify-sdp-pt-lightning"></a>

如果您想使用 [PyTorchLightning](https://pytorch-lightning.readthedocs.io/en/latest/starter/introduction.html) 训练脚本并在 SageMaker AI 中运行分布式数据并行训练作业，则只需对训练脚本进行最少更改即可运行训练作业。必要的更改包括：导入`smdistributed.dataparallel`库的 PyTorch 模块，设置 L PyTorch ightning 的环境变量以接受 SageMaker 训练工具包预设的 SageMaker AI 环境变量，以及通过将流程组后端设置为来激活 SMDDP 库。`"smddp"`要了解详情，请仔细阅读以下分别介绍了各个步骤并提供代码示例的说明。

**注意**  
 PyTorch Lightning 支持在 SageMaker AI 数据并行库 v1.5.0 及更高版本中可用。

## PyTorch Lightning == v2.1.0 和 == 2.0.1 PyTorch
<a name="smddp-pt-201-lightning-210"></a>

1. 导入 `pytorch_lightning` 库和 `smdistributed.dataparallel.torch` 模块。

   ```
   import lightning as pl
   import smdistributed.dataparallel.torch.torch_smddp
   ```

1. 实例化。[LightningEnvironment](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.plugins.environments.LightningEnvironment.html)

   ```
   from lightning.fabric.plugins.environments.lightning import LightningEnvironment
   
   env = LightningEnvironment()
   env.world_size = lambda: int(os.environ["WORLD_SIZE"])
   env.global_rank = lambda: int(os.environ["RANK"])
   ```

1. **对于 PyTorch DDP** — 使用 for `process_group_backend` 和 `"gpu"` for 创建[DDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html)类`"smddp"`的对象`accelerator`，然后将其传递给 T [rainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) 类。

   ```
   import lightning as pl
   from lightning.pytorch.strategies import DDPStrategy
   
   ddp = DDPStrategy(
       cluster_environment=env, 
       process_group_backend="smddp", 
       accelerator="gpu"
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=ddp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

   **对于 PyTorch FSDP** — 使用 for `process_group_backend` 和 `"gpu"` for 创建[FSDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html)类的对象（可选择[包装策略](https://pytorch.org/docs/stable/fsdp.html)）`accelerator`，然后将其传递给 T [rainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) 类。`"smddp"`

   ```
   import lightning as pl
   from lightning.pytorch.strategies import FSDPStrategy
   
   from functools import partial
   from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
   
   policy = partial(
       size_based_auto_wrap_policy, 
       min_num_params=10000
   )
   
   fsdp = FSDPStrategy(
       auto_wrap_policy=policy,
       process_group_backend="smddp", 
       cluster_environment=env
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=fsdp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

调整完训练脚本后，继续到 [使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业](data-parallel-use-api.md)。

**注意**  
构建 A SageMaker I PyTorch 估算器并在中提交训练任务请求时[使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业](data-parallel-use-api.md)，需要在 SageMaker AI PyTorch 训练`requirements.txt`容器`lightning-bolts`中提供安装`pytorch-lightning`和。  

```
# requirements.txt
pytorch-lightning
lightning-bolts
```
有关指定存放`requirements.txt`文件以及训练脚本和作业提交的源目录的更多信息，请参阅 *Amazon A SageMaker I Python SDK 文档*中的[使用第三方库](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id12)。

# 在 TensorFlow 训练脚本中使用 SMDDP 库（已弃用）
<a name="data-parallel-modify-sdp-tf2"></a>

**重要**  
在 v2.11.0 之后，SMDDP 库已停止支持， TensorFlow 并且在 DLCs v2.11.0 TensorFlow 之后不再可用。要查找以前安装了 TensorFlow DLCs SMDDP 库的情况，请参阅。[支持的框架](distributed-data-parallel-support.md#distributed-data-parallel-supported-frameworks)

以下步骤向您展示如何修改 TensorFlow 训练脚本以利用 SageMaker AI 的分布式数据 parallel 库。  

该库 APIs 的设计类似于 Horovod APIs。有关该库提供的每个 API 的更多详细信息 TensorFlow，请参阅 [SageMaker AI 分布式数据 parallel TensorFlow API 文档](https://sagemaker.readthedocs.io/en/stable/api/training/smd_data_parallel.html#api-documentation)。

**注意**  
SageMaker AI 分布式数据 parallel 适用于由除`tf.keras`模块之外的`tf`核心模块组成的 TensorFlow 训练脚本。 SageMaker 人工智能分布式数据 parallel 不支持 Ker TensorFlow as 实现。

**注意**  
 SageMaker AI 分布式数据并行度库支持开箱即用的自动混合精度 (AMP)。除了对训练脚本进行框架级别的修改之外，您无需执行额外操作即可启用 AMP。如果有梯度 FP16，则 SageMaker AI 数据并行度库将在中运行其操作。`AllReduce` FP16有关在训练脚本中实现 AMP APIs 的更多信息，请参阅以下资源：  
[框架- TensorFlow](https://docs.nvidia.com/deeplearning/performance/mixed-precision-training/index.html#tensorflow) 在 *NVIDIA 深度学习性能文档*中
*NVIDIA 开发人员文档*中的[适用于深度学习的自动混合精度](https://developer.nvidia.com/automatic-mixed-precision)
TensorFlow *TensorFlow文档 APIs*中的@@ [精度好坏参半](https://www.tensorflow.org/guide/mixed_precision)

1. 导入库的 TensorFlow 客户端并对其进行初始化。

   ```
   import smdistributed.dataparallel.tensorflow as sdp 
   sdp.init()
   ```

1. 使用 `local_rank` 将每个 GPU 固定到一个 `smdistributed.dataparallel` 进程，这表示进程在给定节点中的相对秩。`sdp.tensorflow.local_rank()` API 向您提供设备的局部秩。领导节点的秩为 0，Worker 节点的秩为 1、2、3，依此类推。在以下代码块中将其调用为`sdp.local_rank()`。 `set_memory_growth`与分布式 SageMaker AI 没有直接关系，但必须将其设置为使用进行分布式训练 TensorFlow。

   ```
   gpus = tf.config.experimental.list_physical_devices('GPU')
   for gpu in gpus:
       tf.config.experimental.set_memory_growth(gpu, True)
   if gpus:
       tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
   ```

1. 根据工作线程数扩展学习率。`sdp.tensorflow.size()` API 为您提供集群中工作线程的数量。以下代码块中将其作为 `sdp.size()` 调用。

   ```
   learning_rate = learning_rate * sdp.size()
   ```

1. 在训练期间，使用库的 `DistributedGradientTape` 来优化 `AllReduce` 操作。这会包装 `tf.GradientTape`。  

   ```
   with tf.GradientTape() as tape:
         output = model(input)
         loss_value = loss(label, output)
       
   # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
   tape = sdp.DistributedGradientTape(tape)
   ```

1. 将初始模型变量从领导节点（秩 0）广播到所有 Worker 节点（秩 1 到 n）。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化后使用 `sdp.tensorflow.broadcast_variables` API。以下代码块中将其作为 `sdp.broadcast_variables()` 调用。

   ```
   sdp.broadcast_variables(model.variables, root_rank=0)
   sdp.broadcast_variables(opt.variables(), root_rank=0)
   ```

1. 最后，修改脚本，仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。

   ```
   if sdp.rank() == 0:
       checkpoint.save(checkpoint_dir)
   ```

以下是使用库进行分布式 TensorFlow 训练的示例训练脚本。

```
import tensorflow as tf

# SageMaker AI data parallel: Import the library TF API
import smdistributed.dataparallel.tensorflow as sdp

# SageMaker AI data parallel: Initialize the library
sdp.init()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    # SageMaker AI data parallel: Pin GPUs to a single library process
    tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')

# Prepare Dataset
dataset = tf.data.Dataset.from_tensor_slices(...)

# Define Model
mnist_model = tf.keras.Sequential(...)
loss = tf.losses.SparseCategoricalCrossentropy()

# SageMaker AI data parallel: Scale Learning Rate
# LR for 8 node run : 0.000125
# LR for single node run : 0.001
opt = tf.optimizers.Adam(0.000125 * sdp.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
    tape = sdp.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    if first_batch:
       # SageMaker AI data parallel: Broadcast model and optimizer variables
       sdp.broadcast_variables(mnist_model.variables, root_rank=0)
       sdp.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

...

# SageMaker AI data parallel: Save checkpoints only from master node.
if sdp.rank() == 0:
    checkpoint.save(checkpoint_dir)
```

调整完训练脚本后，请继续到[使用 Python SageMaker SDK 使用 SMDDP 启动分布式训练作业](data-parallel-use-api.md)。