

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 對具有模型並行性的模型執行檢查點和微調
<a name="distributed-model-parallel-checkpointing-and-finetuning"></a>

SageMaker 模型平行處理程式庫提供檢查點 API，可儲存模型狀態和依據各種模型平行處理策略分割的最佳化工具狀態，並從您要重新開始訓練和微調的位置載入繼續訓練的檢查點。這些 API 還支援部分或全部儲存模型和最佳化工具狀態的選項。

**Topics**
+ [對分散式模型進行檢查點](#distributed-model-parallel-checkpoint)
+ [微調分散式模型](#distributed-model-parallel-fine-tuning)

## 對分散式模型進行檢查點
<a name="distributed-model-parallel-checkpoint"></a>

根據 PyTorch 和 TensorFlow 之間的架構，以及您使用的 SageMaker 模型平行處理程式庫版本，選擇下列其中一個主題。

**Topics**
+ [對分散式 PyTorch 模型執行檢查點 (適用於 SageMaker 模型平行處理程式庫 1.10.0 版及更新版本)](#model-parallel-extended-features-pytorch-checkpoint)
+ [對分散式 PyTorch 模型執行檢查點 (適用於 1.6.0 版和 1.9.0 版之間的 SageMaker 模型平行處理程式庫)](#model-parallel-extended-features-pytorch-saving-loading-checkpoints)
+ [對分散式 TensorFlow 模型執行檢查點](#distributed-model-parallel-checkpoint-tensorflow)

### 對分散式 PyTorch 模型執行檢查點 (適用於 SageMaker 模型平行處理程式庫 1.10.0 版及更新版本)
<a name="model-parallel-extended-features-pytorch-checkpoint"></a>

SageMaker 模型平行處理程式庫提供檢查點 API，以儲存和載入分散式模型狀態及其最佳化工具狀態的完整或部分檢查點。

**注意**  
如果您使用 PyTorch 和 SageMaker 模型平行處理程式庫 1.10.0 版或更新版本，則建議使用此檢查點方法。

**部分檢查點**

若要儲存使用模型平行處理訓練的模型檢查點，請使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save_checkpoint](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save_checkpoint)API，並將部分檢查點選項設定為 true (`partial=True`)。這會個別儲存每個模型分割區。除了模型和最佳化工具狀態之外，您還可以透過 `user_content` 引數儲存任何其他自訂資料。檢查點模型、最佳化工具和使用者內容都儲存為個別檔案。`save_checkpoint` API 呼叫會以下列結構建立檢查點資料夾。

```
- path
  - ${tag}_partial (folder for partial checkpoints)
    - model_rankinfo.pt
    - optimizer_rankinfo.pt
    - fp16_states_rankinfo.pt
    - user_content.pt
  - $tag (checkpoint file for full checkpoints)
  - user_content_$tag (user_content file for full checkpoints)
  - newest (a file that indicates the newest checkpoint)
```

若要從部分檢查點繼續訓練，請將 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.resume_from_checkpoint](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.resume_from_checkpoint) API 與 `partial=True` 搭配使用，並指定檢查點目錄和儲存部分檢查點時使用的標籤。請注意，模型權重的實際載入發生在模型分割之後，在 `smdistributed.modelparallel.torch.step` 裝飾訓練 Step Function 初次執行期間。

儲存部分檢查點時，程式庫也會將模型分割區決定儲存為具有 `.pt` 副檔名的檔案。相反地，從部分檢查點恢復時，程式庫會將分割決定檔案載入在一起。一旦已載入分割區決定，就無法變更分割區。

下列程式碼片段示範如何在 PyTorch 訓練指令碼中設定檢查點 API。

```
import smdistributed.modelparallel.torch as smp

model = ...
model = smp.DistributedModel(model)
optimizer = ...
optimizer = smp.DistributedOptimizer(optimizer)
user_content = ...     # additional custom data
checkpoint_path = "/opt/ml/checkpoint/model_parallel"

# Save a checkpoint.
smp.save_checkpoint(
    path=checkpoint_path,
    tag=f"total_steps{total_steps}",
    partial=True,
    model=model,
    optimizer=optimizer,
    user_content=user_content
    num_kept_partial_checkpoints=5
)

# Load a checkpoint.
# This automatically loads the most recently saved checkpoint.
smp_checkpoint = smp.resume_from_checkpoint(
    path=checkpoint_path, 
    partial=True
)
```

**完整檢查點**

若要儲存最終模型成品以供推論，請使用 `smdistributed.modelparallel.torch.save_checkpoint` API 搭配 `partial=False`，該 API 結合模型分割區以建立單一模型成品。請注意，這不會結合最佳化工具狀態。

若要初始化具有特定權重的訓練，只要提供一個完整模型檢查點，您可以使用 `smdistributed.modelparallel.torch.resume_from_checkpoint` API 搭配 `partial=False`。請注意，這不會載入最佳化工具狀態。

**注意**  
一般而言，使用張量平行處理，`state_dict` 必須在原始模型實作和 `DistributedModel` 實作之間進行轉譯。或者，您可以為 `smdistributed.modelparallel.torch.resume_from_checkpoint` 提供 `state_dict` 平移函式作為引數。但是，對於 [開箱即用的支援模型](model-parallel-extended-features-pytorch-hugging-face.md#model-parallel-extended-features-pytorch-hugging-face-out-of-the-box)，程式庫會自動處理此轉譯。

下列程式碼示範如何使用檢查點 API 對使用模型平行處理訓練的 PyTorch 模型執行完整檢查點。

```
import smdistributed.modelparallel.torch as smp

model = ...
model = smp.DistributedModel(model)
optimizer = ...
optimizer = smp.DistributedOptimizer(optimizer)
user_content = ...     # additional custom data
checkpoint_path = "/opt/ml/checkpoint/model_parallel"

# Save a checkpoint.
smp.save_checkpoint(
    path=checkpoint_path,
    tag=f"total_steps{total_steps}",
    partial=False,
    model=model,
    optimizer=optimizer,
    user_content=user_content
    num_kept_partial_checkpoints=5
)

# Load a checkpoint.
# This automatically loads the most recently saved checkpoint.
smp_checkpoint = smp.resume_from_checkpoint(
    path=checkpoint_path, 
    partial=False
)
```

### 對分散式 PyTorch 模型執行檢查點 (適用於 1.6.0 版和 1.9.0 版之間的 SageMaker 模型平行處理程式庫)
<a name="model-parallel-extended-features-pytorch-saving-loading-checkpoints"></a>

SageMaker 模型平行處理程式庫提供 Python 函式，用於儲存具有張量平行處理的部分或完整訓練任務檢查點。下列程序顯示當您使用張量平行處理時，如何使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save) 和 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.load](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.load) 儲存及載入檢查點。

**注意**  
如果您使用 PyTorch、[張量平行處理](model-parallel-extended-features-pytorch-tensor-parallelism.md)，以及 1.6.0 版和 1.9.0 版之間的 SageMaker 模型平行處理程式庫，建議使用此檢查點方法。

1. 準備模型物件，並使用程式庫包裝函式 `smp.DistributedModel()` 包裝。

   ```
   model = MyModel(...)
   model = smp.DistributedModel(model)
   ```

1. 準備模型的最佳化工具。一組模型參數是最佳化工具函式所需的可迭代引數。若要準備一組模型參數，您必須處理 `model.parameters()` 以將不重複的 ID 指派給個別模型參數。

   如果模型參數可迭代中存在具有重複 ID 的參數，則載入檢查點最佳化工具狀態將失敗。若要為最佳化工具建立具有不重複的 ID 的模型參數可迭代，請參閱以下內容：

   ```
   unique_params = []
   unique_params_set = set()
   for p in model.parameters():
     if p not in unique_params_set:
       unique_params.append(p)
       unique_params_set.add(p)
   del unique_params_set
   
   optimizer = MyOpt(unique_params, ...)
   ```

1. 使用程式庫的包裝函式 `smp.DistributedOptimizer()` 包裝最佳化工具。

   ```
   optimizer = smp.DistributedOptimizer(optimizer)
   ```

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.save) 儲存模型和最佳化工具狀態。根據您想要儲存檢查點的方式，選擇下列兩個選項其中之一：
   + **選項 1：**為單一 `MP_GROUP` 在每個 `mp_rank` 上儲存部分模型。

     ```
     model_dict = model.local_state_dict() # save a partial model
     opt_dict = optimizer.local_state_dict() # save a partial optimizer state
     # Save the dictionaries at rdp_rank 0 as a checkpoint
     if smp.rdp_rank() == 0:
         smp.save(
             {"model_state_dict": model_dict, "optimizer_state_dict": opt_dict},
             f"/checkpoint.pt",
             partial=True,
         )
     ```

     透過張量平行處理，程式庫會儲存以下列格式命名的檢查點檔案：`checkpoint.pt_{pp_rank}_{tp_rank}`。
**注意**  
使用張量平行處理，確保將 if 陳述式設定為 `if smp.rdp_rank() == 0` 而不是 `if smp.dp_rank() == 0`。當最佳化工具狀態以張量平行處理進行分割時，所有縮減的資料平行排名都必須儲存自己的最佳化工具狀態分割。使用錯誤的 *if* 陳述式執行檢查點，可能導致訓練任務停滯。如需在沒有張量平行處理的狀況下使用 `if smp.dp_rank() == 0` 的更多相關資訊，請參閱 *SageMaker Python SDK 文件*中的[儲存和載入的一般指示](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#general-instruction-for-saving-and-loading)。
   + **選項 2：**儲存完整模型。

     ```
     if smp.rdp_rank() == 0:
         model_dict = model.state_dict(gather_to_rank0=True) # save the full model
         if smp.rank() == 0:
             smp.save(
                 {"model_state_dict": model_dict},
                 "/checkpoint.pt",
                 partial=False,
             )
     ```
**注意**  
對於完整檢查點，請考量以下事項：  
如果設定 `gather_to_rank0=True`，除了 `0` 以外的所有排名會傳回空白字典。
對於完整檢查點，您只能對模型執行檢查點。目前不支援最佳化工具狀態的完整檢查點。
完整模型只需要儲存於 `smp.rank() == 0`。

1. 使用 [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.load](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.load) 載入檢查點。根據上一步中執行檢查點的方法，選擇下列兩個選項其中之一：
   + **選項 1：**載入部分檢查點。

     ```
     checkpoint = smp.load("/checkpoint.pt", partial=True)
     model.load_state_dict(checkpoint["model_state_dict"], same_partition_load=False)
     optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
     ```

     如果您知道分割區不會變更，則可以設定 `model.load_state_dict()` 中的 `same_partition_load=True` 以取得更快的載入。
   + **選項 2：**載入完整檢查點。

     ```
     if smp.rdp_rank() == 0:
         checkpoint = smp.load("/checkpoint.pt", partial=False)
         model.load_state_dict(checkpoint["model_state_dict"])
     ```

     `if smp.rdp_rank() == 0` 條件為非必要，但它可以協助避免不同 `MP_GROUP` 之間的多餘載入。張量平行處理目前不支援完整檢查點最佳化工具狀態字典。

### 對分散式 TensorFlow 模型執行檢查點
<a name="distributed-model-parallel-checkpoint-tensorflow"></a>

若要在使用模型平行處理進行訓練時儲存 TensorFlow 模型，請使用 SageMaker 模型平行處理程式庫提供的下列功能。
+ [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_tensorflow.html#smp.DistributedModel.save_model](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_tensorflow.html#smp.DistributedModel.save_model)
+ [https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_tensorflow.html#smp.CheckpointManager](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_tensorflow.html#smp.CheckpointManager)

## 微調分散式模型
<a name="distributed-model-parallel-fine-tuning"></a>

微調需要在訓練指令碼中設定。下列程式碼片段顯示使用 [AutoModelForCausalLM](https://huggingface.co/docs/transformers/main/en/model_doc/auto#transformers.AutoModelForCausalLM) 類別 Hugging Face 轉換器的訓練指令碼的範例結構，其中包含註冊 `smdistributed.model.parallel.torch` 模組和設定以進行微調的修改。

**注意**  
使用已啟動的 [smp.delayed\$1param\$1initialization](https://sagemaker.readthedocs.io/en/v2.199.0/api/training/smp_versions/latest/smd_model_parallel_pytorch.html#smdistributed.modelparallel.torch.delay_param_initialization) 功能對分散式轉換器 (由 `smp.DistributedModel()` 包裝的轉換器模型) 進行微調，需要使用 FSx for Lustre 檔案系統來設定微調任務。如果您想要使用已延遲的參數初始化選項微調大規模模型，則應設定 FSx for Lustre 檔案系統。

```
import argparse
from transformers import AutoModelForCausalLM
import smdistributed.modelparallel
import smdistributed.modelparallel.torch as smp

def parse_args():

    parser = argparse.ArgumentParser()

    # set an arg group for model
    model_grp = parser.add_argument_group(
        title="model", description="arguments to describe model configuration"
    )

    ... # set up numerous args to parse from the configuration dictionary to the script for training

    # add arg for activating fine-tuning
    model_grp.add_argument(
        "--fine_tune",
        type=int,
        default=0,
        help="Fine-tune model from checkpoint or pretrained model",
    )

def main():
    """Main function to train GPT."""
    args = parse_args()

    ... # parse numerous args

    if args.fine_tune > 0 and args.delayed_param > 0 and smp.rank() == 0:
        pretrained_model = AutoModelForCausalLM.from_pretrained(
            args.model_name or args.model_dir
        )
        model_state_dict = pretrained_model.state_dict()
        path = os.path.join(args.model_dir, "fullmodel.pt")
        torch.save(model_state_dict, path)

    # create a Transformer model and wrap by smp.model_creation() 
    # with options to configure model parallelism parameters offered by SageMaker AI
    with smp.model_creation(
        tensor_parallelism=smp.tp_size() > 1 or args.use_distributed_transformer > 0,
        zero_init=args.use_distributed_transformer == 0,
        dtype=dtype,
        distribute_embedding=args.sharded_data_parallel_degree > 1 and smp.tp_size() > 1,
        use_alibi=args.alibi > 0,
        attention_in_fp32=args.attention_in_fp32 > 0,
        fp32_residual_addition=args.residual_addition_in_fp32 > 0,
        query_key_layer_scaling=args.query_key_layer_scaling > 0 and args.bf16 < 1,
        fused_softmax=args.fused_softmax > 0,
        fused_dropout=args.fused_dropout > 0,
        fused_bias_gelu=args.fused_bias_gelu > 0,
        flash_attention=args.flash_attention > 0,
    ):
        if args.fine_tune > 0 and args.delayed_param == 0:
            model = AutoModelForCausalLM.from_pretrained(
                args.model_name or args.model_dir
            )
        else:
            model = AutoModelForCausalLM.from_config(model_config)

    # wrap the model by smp.DistributedModel() to apply SageMaker model parallelism
    model = smp.DistributedModel(
        model, trace_device="gpu", backward_passes_per_step=args.gradient_accumulation
    )

    # wrap the optimizer by smp.DistributedOptimizer() to apply SageMaker model parallelism
    optimizer= ... # define an optimizer
    optimizer = smp.DistributedOptimizer(
        optimizer,
        static_loss_scale=None,
        dynamic_loss_scale=True,
        dynamic_loss_args={"scale_window": 1000, "min_scale": 1, "delayed_shift": 2},
    )

    # for fine-tuning, use smp.resume_from_checkpoint() to load a pre-trained model
    if args.fine_tune > 0 and args.delayed_param > 0:
        smp.resume_from_checkpoint(args.model_dir, tag="fullmodel.pt", partial=False)
```

如需訓練指令碼和 Jupyter 筆記本的完整範例，請參閱 *SageMaker AI 範例 GitHub 儲存庫*中的 [PyTorch GPT-2 範例](https://github.com/aws/amazon-sagemaker-examples/tree/main/training/distributed_training/pytorch/model_parallel/gpt2)。