

# Deploy large models for inference with TorchServe
<a name="large-model-inference-tutorials-torchserve"></a>

This tutorial demonstrates how to deploy large models and serve inference in Amazon SageMaker AI with TorchServe on GPUs. This example deploys the [OPT-30b](https://huggingface.co/facebook/opt-30b) model to an `ml.g5` instance. You can modify this to work with other models and instance types. Replace the `italicized placeholder text` in the examples with your own information.

TorchServe is a powerful open platform for large distributed model inference. By supporting popular libraries like PyTorch, native PiPPy, DeepSpeed, and HuggingFace Accelerate, it offers uniform handler APIs that remain consistent across distributed large model and non-distributed model inference scenarios. For more information, see [TorchServe’s large model inference documentation](https://pytorch.org/serve/large_model_inference.html#).

## Deep learning containers with TorchServe
<a name="large-model-inference-tutorials-torchserve-dlcs"></a>

To deploy a large model with TorchServe on SageMaker AI, you can use one of the SageMaker AI deep learning containers (DLCs). By default, TorchServe is installed in all AWS PyTorch DLCs. During model loading, TorchServe can install specialized libraries tailored for large models, such as PiPPy, DeepSpeed, and Accelerate.

The following table lists all of the [SageMaker AI DLCs with TorchServe](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#sagemaker-framework-containers-sm-support-only).


| DLC category | Framework | Hardware | Example URL | 
| --- | --- | --- | --- | 
| [SageMaker AI Framework Containers](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#sagemaker-framework-containers-sm-support-only) |  PyTorch 2.0.0+  | CPU, GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker | 
| [SageMaker AI Framework Graviton Containers](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#sagemaker-framework-graviton-containers-sm-support-only) |  PyTorch 2.0.0+  | CPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference-graviton:2.0.1-cpu-py310-ubuntu20.04-sagemaker | 
| [StabilityAI Inference Containers](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#stabilityai-inference-containers) |  PyTorch 2.0.0+  | GPU | 763104351884.dkr.ecr.us-east-1.amazonaws.com/stabilityai-pytorch-inference:2.0.1-sgm0.1.0-gpu-py310-cu118-ubuntu20.04-sagemaker | 
| [Neuron Containers](https://github.com/aws/deep-learning-containers/blob/master/available_images.md#neuron-containers) | PyTorch 1.13.1 | Neuronx | 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.12.0-ubuntu20.04 | 

## Getting started
<a name="large-model-inference-tutorials-torchserve-getting-started"></a>

Before deploying your model, complete the prerequisites. You can also configure your model parameters and customize the handler code.

### Prerequisites
<a name="large-model-inference-tutorials-torchserve-getting-started-prereqs"></a>

To get started, ensure that you have the following prerequisites:

1. Ensure you have access to an AWS account. [Set up your environment](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html) so that the AWS CLI can access your account through either an AWS IAM user or an IAM role. We recommend using an IAM role. For the purposes of testing in your personal account, you can attach the following managed permissions policies to the IAM role:
   + [AmazonEC2ContainerRegistryFullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess)
   + [AmazonEC2FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonEC2FullAccess)
   + [AWSServiceRoleForAmazonEKSNodegroup](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AWSServiceRoleForAmazonEKSNodegroup)
   + [AmazonSageMakerFullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonSageMakerFullAccess)
   + [AmazonS3FullAccess](https://console.aws.amazon.com/iam/home#policies/arn:aws:iam::aws:policy/AmazonS3FullAccess)

   For more information about attaching IAM policies to a role, see [Adding and removing IAM identity permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) in the *AWS IAM User Guide*.

1. Locally configure your dependencies, as shown in the following examples.

   1. Install version 2 of the AWS CLI:

      ```
      # Install the latest AWS CLI v2 if it is not installed
      !curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
      !unzip awscliv2.zip
      # Follow the instructions to install v2 on the terminal
      !cat aws/README.md
      ```

   1. Install SageMaker AI and the Boto3 client:

      ```
      # If already installed, update your client
      #%pip install sagemaker pip --upgrade --quiet
      !pip install -U sagemaker
      !pip install -U botocore
      !pip install -U boto3
      ```
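
After installing, you may want to confirm which versions ended up in your environment. The following is a minimal sketch that uses only the Python standard library; `installed_versions` is a hypothetical helper name, and the package list is an assumption based on the install commands in this step.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The distribution is not installed in this environment
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(["sagemaker", "boto3", "botocore"]).items():
        print(f"{name}: {version or 'not installed'}")
```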

### Configure model settings and parameters
<a name="large-model-inference-tutorials-torchserve-getting-started-config"></a>

TorchServe uses [torchrun](https://pytorch.org/docs/stable/elastic/run.html) to set up the distributed environment for model parallel processing. TorchServe can support multiple workers for a large model. By default, TorchServe uses a round-robin algorithm to assign GPUs to a worker on a host. For large model inference, the number of GPUs assigned to each worker is automatically calculated based on the number of GPUs specified in the `model_config.yaml` file. The environment variable `CUDA_VISIBLE_DEVICES`, which specifies the GPU device IDs that are visible at a given time, is set based on this number.

For example, suppose there are 8 GPUs on a node and each worker needs 4 GPUs (`nproc_per_node=4`). In this case, TorchServe assigns four GPUs to the first worker (`CUDA_VISIBLE_DEVICES="0,1,2,3"`) and four GPUs to the second worker (`CUDA_VISIBLE_DEVICES="4,5,6,7"`).

In addition to this default behavior, TorchServe provides the flexibility for users to specify GPUs for a worker. For instance, if you set the variable `deviceIds: [2,3,4,5]` in the [model config YAML file](https://github.com/pytorch/serve/blob/5ee02e4f050c9b349025d87405b246e970ee710b/model-archiver/README.md?plain=1#L164), and set `nproc_per_node=2`, then TorchServe assigns `CUDA_VISIBLE_DEVICES="2,3"` to the first worker and `CUDA_VISIBLE_DEVICES="4,5"` to the second worker.
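
The two assignments described above can be reproduced with a few lines of Python. This is only an illustrative sketch of the grouping arithmetic, not TorchServe's actual implementation; `assign_gpus` is a hypothetical helper, and it assumes that devices are handed out in consecutive groups in the order they are listed.

```python
def assign_gpus(device_ids, nproc_per_node):
    """Split device_ids into consecutive groups of nproc_per_node, one per worker."""
    if nproc_per_node <= 0:
        raise ValueError("nproc_per_node must be positive")
    groups = []
    # Only full groups are returned; leftover devices stay unassigned.
    for start in range(0, len(device_ids) - nproc_per_node + 1, nproc_per_node):
        group = device_ids[start:start + nproc_per_node]
        groups.append(",".join(str(d) for d in group))
    return groups


# Default case: 8 GPUs on the node, 4 GPUs per worker
for worker, visible in enumerate(assign_gpus(list(range(8)), 4)):
    print(f'worker {worker}: CUDA_VISIBLE_DEVICES="{visible}"')

# User-specified case: deviceIds [2,3,4,5], 2 GPUs per worker
for worker, visible in enumerate(assign_gpus([2, 3, 4, 5], 2)):
    print(f'worker {worker}: CUDA_VISIBLE_DEVICES="{visible}"')
```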

In the following `model_config.yaml` example, we configure both front-end and back-end parameters for the [OPT-30b](https://huggingface.co/facebook/opt-30b) model. The configured front-end parameters are `parallelType`, `deviceType`, `deviceIds`, and `torchrun`. For more detailed information about the front-end parameters you can configure, see the [PyTorch GitHub documentation](https://github.com/pytorch/serve/blob/2bf505bae3046b0f7d0900727ec36e611bb5dca3/docs/configuration.md?plain=1#L267). The back-end configuration is based on a YAML map that allows for free-style customization. For the back-end parameters, we define the DeepSpeed configuration and additional parameters used by custom handler code.

```
# TorchServe front-end parameters
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 1200
parallelType: "tp"
deviceType: "gpu"
# example of user specified GPU deviceIds
deviceIds: [0,1,2,3] # sets CUDA_VISIBLE_DEVICES

torchrun:
    nproc-per-node: 4

# TorchServe back-end parameters
deepspeed:
    config: ds-config.json
    checkpoint: checkpoints.json

handler: # parameters for custom handler code
    model_name: "facebook/opt-30b"
    model_path: "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546"
    max_length: 50
    max_new_tokens: 10
    manual_seed: 40
```
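
Because the back-end section is a free-form map, custom handler code reads it with ordinary dictionary lookups. The following sketch shows one way to pull typed values out of the `handler` section; `read_handler_params` is a hypothetical helper, and `example_config` is a hand-written stand-in for the mapping that TorchServe parses from the YAML file.

```python
def read_handler_params(model_yaml_config):
    """Return typed handler settings from a parsed model config mapping."""
    handler = model_yaml_config["handler"]
    return {
        "model_name": handler["model_name"],
        "model_path": handler["model_path"],
        # Cast numeric settings explicitly so quoted YAML values also work
        "max_length": int(handler["max_length"]),
        "max_new_tokens": int(handler["max_new_tokens"]),
        "manual_seed": int(handler["manual_seed"]),
    }


# Stand-in for the mapping parsed from the YAML file above
example_config = {
    "torchrun": {"nproc-per-node": 4},
    "handler": {
        "model_name": "facebook/opt-30b",
        "model_path": "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546",
        "max_length": 50,
        "max_new_tokens": 10,
        "manual_seed": 40,
    },
}
params = read_handler_params(example_config)
print(params["model_name"], params["max_length"], params["max_new_tokens"])
```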

### Customize handlers
<a name="large-model-inference-tutorials-torchserve-getting-started-handlers"></a>

TorchServe offers [base handlers](https://github.com/pytorch/serve/tree/master/ts/torch_handler/distributed) and [handler utilities](https://github.com/pytorch/serve/tree/master/ts/handler_utils) for large model inference built with popular libraries. The following example demonstrates how the custom handler class [TransformersSeqClassifierHandler](https://github.com/pytorch/serve/blob/ab69b69a59d6ca6074df7e6d4014f07eb48dedba/examples/large_models/deepspeed/custom_handler.py#L16C7-L16C39) extends [BaseDeepSpeedHandler](https://github.com/pytorch/serve/blob/ab69b69a59d6ca6074df7e6d4014f07eb48dedba/ts/torch_handler/distributed/base_deepspeed_handler.py#L8) and uses the [handler utilities](https://github.com/pytorch/serve/blob/master/ts/handler_utils/distributed/deepspeed.py). For a full code example, see [`custom_handler.py` in the TorchServe GitHub repository](https://github.com/pytorch/serve/blob/master/examples/large_models/deepspeed/custom_handler.py).

```
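import logging
from abc import ABC

import torch
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

from ts.context import Context
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler

logger = logging.getLogger(__name__)


class TransformersSeqClassifierHandler(BaseDeepSpeedHandler, ABC):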
    """
    Transformers handler class for sequence, token classification and question answering.
    """

    def __init__(self):
        super(TransformersSeqClassifierHandler, self).__init__()
        self.max_length = None
        self.max_new_tokens = None
        self.tokenizer = None
        self.initialized = False

    def initialize(self, ctx: Context):
        """In this initialize function, the HF large model is loaded and
        partitioned using DeepSpeed.
        Args:
            ctx (context): It is a JSON Object containing information
            pertaining to the model artifacts parameters.
        """
        super().initialize(ctx)
        model_dir = ctx.system_properties.get("model_dir")
        self.max_length = int(ctx.model_yaml_config["handler"]["max_length"])
        self.max_new_tokens = int(ctx.model_yaml_config["handler"]["max_new_tokens"])
        model_name = ctx.model_yaml_config["handler"]["model_name"]
        model_path = ctx.model_yaml_config["handler"]["model_path"]
        seed = int(ctx.model_yaml_config["handler"]["manual_seed"])
        torch.manual_seed(seed)

        logger.info("Model %s loading tokenizer", ctx.model_name)

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.tokenizer.pad_token = self.tokenizer.eos_token
        config = AutoConfig.from_pretrained(model_name)
        with torch.device("meta"):
            self.model = AutoModelForCausalLM.from_config(
                config, torch_dtype=torch.float16
            )
        self.model = self.model.eval()

        ds_engine = get_ds_engine(self.model, ctx)
        self.model = ds_engine.module
        logger.info("Model %s loaded successfully", ctx.model_name)
        self.initialized = True

    def preprocess(self, requests):
        """
        Basic text preprocessing, based on the user's choice of application mode.
        Args:
            requests (list): A list of dictionaries with a "data" or "body" field, each
                            containing the input text to be processed.
        Returns:
            tuple: A tuple with two tensors: the batch of input ids and the batch of
                attention masks.
        """

    def inference(self, input_batch):
        """
        Predicts the class (or classes) of the received text using the serialized transformers
        checkpoint.
        Args:
            input_batch (tuple): A tuple with two tensors: the batch of input ids and the batch
                                of attention masks, as returned by the preprocess function.
        Returns:
            list: A list of strings with the predicted values for each input text in the batch.
        """
        
    def postprocess(self, inference_output):
        """Post Process Function converts the predicted response into Torchserve readable format.
        Args:
            inference_output (list): It contains the predicted response of the input text.
        Returns:
            (list): Returns a list of the Predictions and Explanations.
        """
```
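
The `preprocess` docstring describes each request as a dictionary with a `"data"` or `"body"` field. As a rough illustration of that first step, the following sketch extracts the input text from such requests before tokenization. `extract_texts` is a hypothetical helper, not the body of the upstream `preprocess` method, and it assumes that payloads arrive either as `str` or as UTF-8 encoded bytes.

```python
def extract_texts(requests):
    """Return the input text of each request as a str."""
    texts = []
    for request in requests:
        # Prefer the "data" field and fall back to "body"
        payload = request.get("data")
        if payload is None:
            payload = request.get("body")
        # Raw HTTP payloads typically arrive as bytes
        if isinstance(payload, (bytes, bytearray)):
            payload = payload.decode("utf-8")
        texts.append(payload)
    return texts


batch = [{"data": b"Today the weather is really nice"}, {"body": "and I am planning on"}]
print(extract_texts(batch))
```

In a full handler, the returned strings would then be passed to the tokenizer to build the input ID and attention mask tensors that `inference` expects.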

## Prepare your model artifacts
<a name="large-model-inference-tutorials-torchserve-artifacts"></a>

Before deploying your model on SageMaker AI, you must package your model artifacts. For large models, we recommend that you use the PyTorch [torch-model-archiver](https://github.com/pytorch/serve/blob/master/model-archiver/README.md) tool with the argument `--archive-format no-archive`, which skips compressing model artifacts. The following example saves all of the model artifacts to a new folder named `opt/`.

```
torch-model-archiver --model-name opt --version 1.0 --handler custom_handler.py --extra-files ds-config.json -r requirements.txt --config-file opt/model-config.yaml --archive-format no-archive
```

After the `opt/` folder is created, download the OPT-30b model to the folder using the PyTorch [Download_model](https://github.com/pytorch/serve/blob/master/examples/large_models/utils/Download_model.py) tool.

```
cd opt
python path_to/Download_model.py --model_path model --model_name facebook/opt-30b --revision main
```

Lastly, upload the model artifacts to an Amazon S3 bucket. 

```
aws s3 cp opt {your_s3_bucket}/opt --recursive
```
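
If you prefer to upload from Python instead of the AWS CLI, the same recursive copy can be sketched with Boto3. The helper names `iter_upload_targets` and `upload_dir` are hypothetical, and the sketch assumes a non-empty key prefix such as `opt` and a bucket name that you supply. It preserves the folder layout under the prefix, which is what the uncompressed model deployment relies on.

```python
import os


def iter_upload_targets(local_dir, key_prefix):
    """Yield (local_path, s3_key) pairs for every file under local_dir."""
    for root, _dirs, files in os.walk(local_dir):
        for filename in sorted(files):
            local_path = os.path.join(root, filename)
            relative = os.path.relpath(local_path, local_dir)
            # S3 keys always use forward slashes, regardless of the local OS
            key = "/".join([key_prefix.strip("/")] + relative.split(os.sep))
            yield local_path, key


def upload_dir(local_dir, bucket, key_prefix):
    """Upload local_dir to s3://bucket/key_prefix/ (needs boto3 and AWS credentials)."""
    import boto3  # imported here so the key-mapping helper works without boto3

    s3 = boto3.client("s3")
    for local_path, key in iter_upload_targets(local_dir, key_prefix):
        s3.upload_file(local_path, bucket, key)
```

For example, `upload_dir("opt", "amzn-s3-demo-bucket", "opt")` would mirror the local `opt/` folder under the `opt/` prefix, where the bucket name is a placeholder.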

You should now have model artifacts stored in Amazon S3 that are ready to deploy to a SageMaker AI endpoint.

## Deploy the model using the SageMaker Python SDK
<a name="large-model-inference-tutorials-torchserve-deploy"></a>

After preparing your model artifacts, you can deploy your model to a SageMaker AI Hosting endpoint. This section describes how to deploy a single large model to an endpoint and make streaming response predictions. For more information about streaming responses from endpoints, see [Invoke real-time endpoints](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-test-endpoints.html).

To deploy your model, complete the following steps:

1. Create a SageMaker AI session, as shown in the following example.

   ```
   import boto3
   import sagemaker
   from sagemaker import Model, image_uris, serializers, deserializers
   
   boto3_session = boto3.session.Session(region_name="us-west-2")
   smr = boto3_session.client("sagemaker-runtime")  # runtime client used to invoke the endpoint
   sm = boto3_session.client("sagemaker")
   role = sagemaker.get_execution_role()  # execution role for the endpoint
   sess = sagemaker.session.Session(boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr)  # SageMaker AI session for interacting with different AWS APIs
   region = sess.boto_region_name  # region name of the current SageMaker Studio Classic environment
   account = sess.account_id()  # account_id of the current SageMaker Studio Classic environment
   
   # Configuration:
   bucket_name = sess.default_bucket()
   prefix = "torchserve"
   output_path = f"s3://{bucket_name}/{prefix}"
   print(f'account={account}, region={region}, role={role}, output_path={output_path}')
   ```

1. Create an uncompressed model in SageMaker AI, as shown in the following example.

   ```
   from datetime import datetime
   
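   instance_type = "ml.g5.24xlarge"
   endpoint_name = sagemaker.utils.name_from_base("ts-opt-30b")
   # Replace the placeholder with your bucket; the S3 prefix for uncompressed artifacts ends with "/"
   s3_uri = "{your_s3_bucket}/opt/"
   # PyTorch inference DLC with TorchServe (see the table of DLCs); adjust the tag to the image you want to use
   container = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker"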
   
   model = Model(
       name="torchserve-opt-30b" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"),
       # Enable SageMaker uncompressed model artifacts
       model_data={
           "S3DataSource": {
                   "S3Uri": s3_uri,
                   "S3DataType": "S3Prefix",
                   "CompressionType": "None",
           }
       },
       image_uri=container,
       role=role,
       sagemaker_session=sess,
       env={"TS_INSTALL_PY_DEP_PER_MODEL": "true"},
   )
   print(model)
   ```

1. Deploy the model to an Amazon EC2 instance, as shown in the following example.

   ```
   model.deploy(
       initial_instance_count=1,
       instance_type=instance_type,
       endpoint_name=endpoint_name,
       volume_size=512, # increase the size to store large model
       model_data_download_timeout=3600, # increase the timeout to download large model
       container_startup_health_check_timeout=600, # increase the timeout to load large model
   )
   ```

1. Initialize a class to process the streaming response, as shown in the following example.

   ```
   import io
   
   class Parser:
       """
       A helper class for parsing the byte stream input. 
       
       The output of the model will be in the following format:
       ```
       b'{"outputs": [" a"]}\n'
       b'{"outputs": [" challenging"]}\n'
       b'{"outputs": [" problem"]}\n'
       ...
       ```
       
       While usually each PayloadPart event from the event stream will contain a byte array 
       with a full json, this is not guaranteed and some of the json objects may be split across
       PayloadPart events. For example:
       ```
       {'PayloadPart': {'Bytes': b'{"outputs": '}}
       {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
       ```
       
       This class accounts for this by concatenating bytes written via the 'write' function
       and then exposing a method which will return lines (ending with a '\n' character) within
       the buffer via the 'scan_lines' function. It tracks the position of the last read so
       that bytes that were already returned are not exposed again.
       """
       
       def __init__(self):
           self.buff = io.BytesIO()
           self.read_pos = 0
           
       def write(self, content):
           # Append the new bytes at the end of the buffer
           self.buff.seek(0, io.SEEK_END)
           self.buff.write(content)
           
       def scan_lines(self):
           # Yield only complete lines; a trailing partial line stays buffered
           self.buff.seek(self.read_pos)
           for line in self.buff.readlines():
               if line[-1:] == b'\n':
                   self.read_pos += len(line)
                   yield line[:-1]
                   
       def reset(self):
           self.read_pos = 0
   ```

1. Test a streaming response prediction, as shown in the following example.

   ```
   import json
   
   body = "Today the weather is really nice and I am planning on".encode('utf-8')
   resp = smr.invoke_endpoint_with_response_stream(EndpointName=endpoint_name, Body=body, ContentType="application/json")
   event_stream = resp['Body']
   parser = Parser()
   for event in event_stream:
       parser.write(event['PayloadPart']['Bytes'])
       for line in parser.scan_lines():
           print(line.decode("utf-8"), end=' ')
   ```
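
Each streamed line is a JSON object with an `outputs` list, so you may want the generated text itself instead of the raw lines. The following sketch buffers byte chunks, handles JSON objects that are split across chunks, and yields the `outputs` list of every complete line. `iter_outputs` is a hypothetical helper, and the sample chunks are hand-written to mimic the format described in the `Parser` docstring rather than captured from a real endpoint.

```python
import json


def iter_outputs(chunks):
    """Yield the "outputs" list from each complete JSON line in a byte stream."""
    pending = b""
    for chunk in chunks:
        pending += chunk
        # Everything before the last newline is complete; the rest waits for more bytes
        *complete, pending = pending.split(b"\n")
        for line in complete:
            if line:
                yield json.loads(line)["outputs"]


# The second JSON object is split across two chunks on purpose
chunks = [b'{"outputs": [" a"]}\n', b'{"outputs": ', b'[" problem"]}\n']
tokens = [text for outputs in iter_outputs(chunks) for text in outputs]
print("".join(tokens))
```

With a live endpoint, the chunks would come from the event stream, for example `(event["PayloadPart"]["Bytes"] for event in resp["Body"] if "PayloadPart" in event)`.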

You have now deployed your model to a SageMaker AI endpoint and should be able to invoke it for responses. For more information about SageMaker AI real-time endpoints, see [Single-model endpoints](realtime-single-model.md).