

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 調整訓練指令碼以使用 SMDDP 集體操作
<a name="data-parallel-modify-sdp-select-framework"></a>

本節提供的訓練指令碼範例經過簡化，並僅強調在訓練指令碼中啟用 SageMaker AI 分散式資料平行化 (SMDDP) 程式庫所需的變更。如需示範如何使用 SMDDP 程式庫執行分散式訓練作業的端對端 Jupyter 筆記本範例，請參閱 [Amazon SageMaker AI 資料平行程式庫範例](distributed-data-parallel-v2-examples.md)。

**Topics**
+ [在 PyTorch 訓練指令碼中使用 SMDDP 程式庫](data-parallel-modify-sdp-pt.md)
+ [在 PyTorch Lightning 訓練指令碼中使用 SMDDP 程式庫](data-parallel-modify-sdp-pt-lightning.md)
+ [在 TensorFlow 訓練指令碼中使用 SMDDP 程式庫 (已棄用)](data-parallel-modify-sdp-tf2.md)

# 在 PyTorch 訓練指令碼中使用 SMDDP 程式庫
<a name="data-parallel-modify-sdp-pt"></a>

從 SageMaker AI 分散式資料平行化 (SMDDP) 程式庫 v1.4.0 開始，您可以使用程式庫做為 [PyTorch 分散式套件](https://pytorch.org/tutorials/beginner/dist_overview.html)的後端選項。若要使用 SMDDP `AllReduce` 和 `AllGather` 集體操作，您只需在訓練指令碼開頭匯入 SMDDP 程式庫，並在程序群組初始化期間將 SMDDP 設定為 PyTorch 分散式模組的後端。只需一行後端規範，您就可以保持所有原生 PyTorch 分散式模組和整個訓練指令碼不變。下列程式碼片段示範如何使用 SMDDP 程式庫做為 PyTorch 型分散式訓練套件的後端：[PyTorch 分散式資料平行化 (DDP)](https://pytorch.org/docs/stable/notes/ddp.html)、[PyTorch 全碎片資料平行化 (FSDP)](https://pytorch.org/docs/stable/fsdp.html)、[DeepSpeed](https://github.com/microsoft/DeepSpeed) 和 [Megatron-DeepSpeed](https://github.com/microsoft/Megatron-DeepSpeed)。

## 對於 PyTorch DDP 或 FSDP
<a name="data-parallel-enable-for-ptddp-ptfsdp"></a>

初始化程序群組，如下所示。

```
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp

dist.init_process_group(backend="smddp")
```

**注意**  
(僅適用於 PyTorch DDP 任務) `smddp` 後端目前不支援使用 `torch.distributed.new_group()` API 建立子程序群組。您也無法與其他程序群組後端 (例如 `NCCL` 和 `Gloo`) 同時使用 `smddp` 後端。

## 對於 DeepSpeed 或 Megatron-DeepSpeed
<a name="data-parallel-enable-for-deepspeed"></a>

初始化程序群組，如下所示。

```
import deepspeed
import smdistributed.dataparallel.torch.torch_smddp

deepspeed.init_distributed(dist_backend="smddp")
```

**注意**  
若要在 [使用 SageMaker Python SDK 透過 SMDDP 啟動分散式訓練任務](data-parallel-use-api.md) 中使用 SMDDP `AllGather` 搭配 `mpirun` 型啟動器 (`smdistributed` 和 `pytorchddp`)，您也需要在訓練指令碼中設定下列環境變數。  

```
export SMDATAPARALLEL_OPTIMIZE_SDP=true
```

如需撰寫 PyTorch FSDP 訓練指令碼的一般指引，請參閱 PyTorch 文件中的[使用全碎片資料平行化 (FSDP) 的進階模型訓練](https://pytorch.org/tutorials/intermediate/FSDP_adavnced_tutorial.html)。

如需撰寫 PyTorch DDP 訓練指令碼的一般指引，請參閱 PyTorch 文件中的[分散式資料平行化入門](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)。

完成訓練指令碼的調整後，請繼續前往[使用 SageMaker Python SDK 透過 SMDDP 啟動分散式訓練任務](data-parallel-use-api.md)。

# 在 PyTorch Lightning 訓練指令碼中使用 SMDDP 程式庫
<a name="data-parallel-modify-sdp-pt-lightning"></a>

如果您想使用 [PyTorch Lightning](https://pytorch-lightning.readthedocs.io/en/latest/starter/introduction.html) 訓練指令碼，並在 SageMaker AI 執行分散式資料平行化訓練任務，您只需對訓練指令碼進行最少的修改即可執行訓練任務。必要的變更包如下：匯入 `smdistributed.dataparallel` 程式庫的 PyTorch 模組、設定 PyTorch Lightning 的環境變數以接受 SageMaker AI 訓練工具組預設的 SageMaker AI 環境變數，以及將程序群組後端設定為 `"smddp"` 以啟動 SMDDP 程式庫。若要進一步了解，請逐步執行以程式碼範例分解步驟的下列指示。

**注意**  
SageMaker AI 資料平行化程式庫 v1.5.0 及更新版本提供 PyTorch Lightning 支援。

## PyTorch Lightning == v2.1.0 和 PyTorch == 2.0.1
<a name="smddp-pt-201-lightning-210"></a>

1. 匯入 `pytorch_lightning` 程式庫和 `smdistributed.dataparallel.torch` 模組。

   ```
   import lightning as pl
   import smdistributed.dataparallel.torch.torch_smddp
   ```

1. 執行個體化 [LightningEnvironment](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.plugins.environments.LightningEnvironment.html)。

   ```
   from lightning.fabric.plugins.environments.lightning import LightningEnvironment
   
   env = LightningEnvironment()
   env.world_size = lambda: int(os.environ["WORLD_SIZE"])
   env.global_rank = lambda: int(os.environ["RANK"])
   ```

1. **對於 PyTorch DDP** – 建立 [DDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.DDPStrategy.html) 類別的物件並將 `process_group_backend` 設為 `"smddp"`、將 `accelerator` 設為 `"gpu"`，然後將其傳遞給 [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) 類別。

   ```
   import lightning as pl
   from lightning.pytorch.strategies import DDPStrategy
   
   ddp = DDPStrategy(
       cluster_environment=env, 
       process_group_backend="smddp", 
       accelerator="gpu"
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=ddp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

   **對於 PyTorch FSDP** – 建立 [FSDPStrategy](https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.strategies.FSDPStrategy.html) 類別的物件 (搭配自行選擇的[包裝政策](https://pytorch.org/docs/stable/fsdp.html)) 並將 `process_group_backend` 設為 `"smddp"`、將 `accelerator` 設為 `"gpu"`，然後將其傳遞給 [Trainer](https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html) 類別。

   ```
   import lightning as pl
   from lightning.pytorch.strategies import FSDPStrategy
   
   from functools import partial
   from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
   
   policy = partial(
       size_based_auto_wrap_policy, 
       min_num_params=10000
   )
   
   fsdp = FSDPStrategy(
       auto_wrap_policy=policy,
       process_group_backend="smddp", 
       cluster_environment=env
   )
   
   trainer = pl.Trainer(
       max_epochs=200, 
       strategy=fsdp, 
       devices=num_gpus, 
       num_nodes=num_nodes
   )
   ```

完成訓練指令碼的調整後，請繼續前往[使用 SageMaker Python SDK 透過 SMDDP 啟動分散式訓練任務](data-parallel-use-api.md)。

**注意**  
在 [使用 SageMaker Python SDK 透過 SMDDP 啟動分散式訓練任務](data-parallel-use-api.md) 中建構 SageMaker AI PyTorch 估算器並提交訓練任務請求時，您必須提供 `requirements.txt` 以在 SageMaker AI PyTorch 訓練容器中安裝 `pytorch-lightning` 與 `lightning-bolts`。  

```
# requirements.txt
pytorch-lightning
lightning-bolts
```
如需指定來源目錄以放置 `requirements.txt` 檔案和訓練指令碼以及工作提交的更多相關資訊，請參閱 *Amazon SageMaker AI Python SDK 文件*中的[使用第三方程式庫](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id12)。

# 在 TensorFlow 訓練指令碼中使用 SMDDP 程式庫 (已棄用)
<a name="data-parallel-modify-sdp-tf2"></a>

**重要**  
SMDDP 程式庫已停止對 TensorFlow 的支援，且不再於 v2.11.0 之後的 TensorFlow DLC 中提供。若要尋找已安裝 SMDDP 程式庫的舊版 TensorFlow DLC，請參閱[支援的架構](distributed-data-parallel-support.md#distributed-data-parallel-supported-frameworks)。

下列步驟說明如何修改 TensorFlow 訓練指令碼，利用 SageMaker AI 的分散式資料平行化資料庫。  

程式庫 API 的設計旨在近似 Horovod API。請參閱 [SageMaker AI 分散式資料平行化 TensorFlow API 文件](https://sagemaker.readthedocs.io/en/stable/api/training/smd_data_parallel.html#api-documentation)，了解程式庫為 TensorFlow 提供之每個 API 的其他詳細資訊。

**注意**  
SageMaker AI 分散式資料平行化可配合包含 `tf` 核心模組的 TensorFlow 訓練指令碼 (`tf.keras` 模組除外) 調整。SageMaker AI 分散式資料平行化不支援包含 Keras 實作的 TensorFlow。

**注意**  
SageMaker AI 分散式資料平行化程式庫支援開箱即用的自動混合精確度 (AMP)。除了對訓練指令碼進行架構層級修改之外，無需執行任何額外操作即可啟用 AMP。如果梯度是 FP16，SageMaker AI 資料平行化程式庫會以 FP16 執行其 `AllReduce` 操作。有關將 AMP API 實施到訓練腳本的詳細資訊，請參閱以下資源：  
[架構 - ](https://docs.nvidia.com/deeplearning/performance/mixed-precision-training/index.html#tensorflow)*NVIDIA 深度學習效能文件中的 TensorFlow*
*NVIDIA 開發人員文件*中[適用於深度學習的自動混合精確度](https://developer.nvidia.com/automatic-mixed-precision)
*TensorFlow 文件*中的 [TensorFlow 混合精確度 API](https://www.tensorflow.org/guide/mixed_precision)

1. 匯入程式庫的 TensorFlow 用戶端並將其初始化。

   ```
   import smdistributed.dataparallel.tensorflow as sdp 
   sdp.init()
   ```

1. 使用 `local_rank` 將每個 GPU 固定到單一 `smdistributed.dataparallel` 程序-這是指特定節點內程序的相對等級。`sdp.tensorflow.local_rank()` API 為您提供裝置的本地排名。領導節點是等級 0，工作者節點是等級 1、2、3，依此類推。這在下列程式碼區塊中調用為 `sdp.local_rank()`。`set_memory_growth` 與分散式 SageMaker AI 沒有直接相關，但必須針對使用 TensorFlow 的分散式訓練進行設定。

   ```
   gpus = tf.config.experimental.list_physical_devices('GPU')
   for gpu in gpus:
       tf.config.experimental.set_memory_growth(gpu, True)
   if gpus:
       tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
   ```

1. 按工作者數量調整學習速率。`sdp.tensorflow.size()` API 提供叢集中的工作者數量。這在下列程式碼區塊中調用為 `sdp.size()`。

   ```
   learning_rate = learning_rate * sdp.size()
   ```

1. 使用程式庫的 `DistributedGradientTape`，在訓練期間最佳化 `AllReduce` 操作。這包含 `tf.GradientTape`。  

   ```
   with tf.GradientTape() as tape:
         output = model(input)
         loss_value = loss(label, output)
       
   # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
   tape = sdp.DistributedGradientTape(tape)
   ```

1. 廣播從領導節點 (排名 0) 到所有工作者節點 (排名 1 到 n) 的初始模型變數。為了確保所有工作者排名初始化一致，必須這麼做。初始化模型和最佳化工具變數後，請使用 `sdp.tensorflow.broadcast_variables` API。這在下面的程式碼塊中調用為 `sdp.broadcast_variables()`。

   ```
   sdp.broadcast_variables(model.variables, root_rank=0)
   sdp.broadcast_variables(opt.variables(), root_rank=0)
   ```

1. 最後，修改指令碼，僅在領導節點保存檢查點。領導節點有同步化的模型。這也可避免工作者節點覆寫檢查點，以及可能損壞檢查點。

   ```
   if sdp.rank() == 0:
       checkpoint.save(checkpoint_dir)
   ```

以下是 TensorFlow 訓練指令碼範例，適用於使用程式庫進行分散式訓練。

```
import tensorflow as tf

# SageMaker AI data parallel: Import the library TF API
import smdistributed.dataparallel.tensorflow as sdp

# SageMaker AI data parallel: Initialize the library
sdp.init()

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    # SageMaker AI data parallel: Pin GPUs to a single library process
    tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')

# Prepare Dataset
dataset = tf.data.Dataset.from_tensor_slices(...)

# Define Model
mnist_model = tf.keras.Sequential(...)
loss = tf.losses.SparseCategoricalCrossentropy()

# SageMaker AI data parallel: Scale Learning Rate
# LR for 8 node run : 0.000125
# LR for single node run : 0.001
opt = tf.optimizers.Adam(0.000125 * sdp.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # SageMaker AI data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape
    tape = sdp.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    if first_batch:
       # SageMaker AI data parallel: Broadcast model and optimizer variables
       sdp.broadcast_variables(mnist_model.variables, root_rank=0)
       sdp.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

...

# SageMaker AI data parallel: Save checkpoints only from master node.
if sdp.rank() == 0:
    checkpoint.save(checkpoint_dir)
```

完成訓練指令碼的調整後，請繼續前往 [使用 SageMaker Python SDK 透過 SMDDP 啟動分散式訓練任務](data-parallel-use-api.md)。