調用模型以進行即時推論 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

調用模型以進行即時推論

使用 Amazon SageMaker AI 將模型部署到端點後,您可以傳送推論請求來與模型互動。若要將推論請求傳送至模型,您可以調用負責託管該模型的端點。您可以使用 Amazon SageMaker Studio、AWS SDK 或 AWS CLI 來調用端點。

使用 Amazon SageMaker Studio 調用您的模型

將模型部署至端點後,您可以透過 Amazon SageMaker Studio 檢視端點,並傳送單一推論請求來測試端點。

注意

SageMaker AI 僅支援針對即時端點在 Studio 進行端點測試。

將測試推論請求傳送至您的端點
  1. 啟動 Amazon SageMaker Studio。

  2. 在左側導覽窗格中選擇部署

  3. 從下拉式清單中,選擇端點

  4. 依名稱尋找您的端點,然後在資料表中選擇名稱。端點面板中列出的端點名稱是在部署模型時所定義的。Studio工作區會在新索引標籤中開啟端點頁面。

  5. 選擇測試推論索引標籤。

  6. 針對測試選項選取以下任一選項:

    1. 選取測試範例請求,以立即將請求傳送至端點。使用 JSON 編輯器,以 JSON 格式提供範例資料,然後選擇傳送請求,將請求提交至端點。提交請求之後,Studio 會在 JSON 編輯器右側的卡片中顯示推論輸出。

    2. 選取使用 Python SDK 範例程式碼,檢視傳送請求至端點的程式碼。接著,從範例推論請求區段複製程式碼範例,並從測試環境執行程式碼。

卡片頂端會顯示傳送至端點的請求類型 (僅接受 JSON)。卡片會顯示下列欄位:

  • 狀態 — 會顯示以下其中一項狀態類型:

    • Success — 請求已成功。

    • Failed — 請求失敗。回應會顯示在 Failure Reason (失敗原因) 底下。

    • Pending — 當推論請求處於待處理時,狀態會顯示旋轉的圓形圖示。

  • 執行長度 — 調用所花費的時間 (結束時間減去開始時間),以毫秒為單位。

  • 請求時間 — 自發送請求以來已過了多少分鐘。

  • 結果時間 — 自傳回結果後已過了多少分鐘。

使用 適用於 Python (Boto3) 的 AWS SDK 調用您的模型

如果您想要在應用程式程式碼中叫用模型端點,您可以使用其中一個 AWSSDKs,包括 適用於 Python (Boto3) 的 AWS SDK。若要使用此 SDK 調用您的端點,請使用下列其中一種 Python 方法:

  • invoke_endpoint — 將推論請求傳送至模型端點,並傳回模型產生的回應。

    此方法會傳回推論承載,以作為模型完成生成後的一個回應。如需更多資訊,請參閱適用於 Python 的 AWS SDK (Boto3) API 參考中的 invoke_endpoint

  • invoke_endpoint_with_response_stream — 將推論請求傳送至模型端點,並在模型產生推論時以累加方式串流回應。

    透過這種方法,應用程式會在回應的某部分可用時即接收到該回應部分。如需更多資訊,請參閱適用於 Python 的 AWS SDK (Boto3) API 參考中的 invoke_endpoint

    只能使用此方法來調用支援推論串流的模型。

您必須先啟用 SageMaker AI 執行時期用戶端,並且指定端點名稱,才能在應用程式程式碼中使用這些方法。下列範例會針對隨後其餘範例設定用戶端和端點:

import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'

調用以取得推論回應

下列範例會使用 invoke_endpoint 方法透過 適用於 Python (Boto3) 的 AWS SDK 來調用端點:

# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))

此範例在 Body 欄位中提供輸入資料,供 SageMaker AI 傳遞至模型。此資料的格式必須與用於訓練的格式相同。此範例會將回應指派至 response 變數。

response 變數可讓您存取 HTTP 狀態、已部署模型的名稱以及其他欄位。下列程式碼片段會列印 HTTP 狀態程式碼:

print(response["HTTPStatusCode"])

調用以串流推論回應

如果您部署了支援推論串流的模型,您可以調用模型來接收其推論承載作為串流的部分。模型會在模型產生這些部分時累加地交付這些部分。當應用程式收到推論串流時,應用程式不需要等待模型產生整個回應承載。相反地,應用程式會在可用時接收部分回應。

藉由在應用程式中使用推論串流,您可以建立互動,讓使用者認為推論速度很快,因為他們會立即取得第一部分。您可以實作串流以支援快速互動體驗,例如聊天機器人、虛擬助理和音樂產生器。例如,您可以建立一個聊天機器人,以累加方式顯示由大型語言模型 (LLM) 產生的文字。

若要取得推論串流,您可以使用 invoke_endpoint_with_response_stream 方法。在回應內文中,SDK 提供了一個 EventStream 物件,它會將推論作為一系列 PayloadPart 物件來提供。

範例 推論串流

下列為 PayloadPart 物件串流的範例。

{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .

在每個承載部分中,Bytes 欄位會提供來自模型的推論回應之一部分。此部分可以是模型產生的任何內容類型,例如文字、影像或音訊資料。在此範例中,這些部分是 JSON 物件,其中包含來自 LLM 的生成文字。

通常,承載部分包含來自模型的離散資料區塊。在此範例中,離散區塊是整個 JSON 物件。有時候,回應會將區塊拆分為多個承載部分,或者將多個區塊合併為一個承載部分。下列範例會顯示 JSON 格式的資料區塊,這些資料分割為兩個承載部分:

{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}

當您撰寫處理推論串流的應用程式程式碼時,請包含處理這些偶爾分割和組合資料的邏輯。作為一種策略,您可以編寫代碼來串連應用程式接收承載部分的 Bytes 內容。透過在此處串連 JSON 資料範例,您可以將資料合併為以換行符號分隔的 JSON 主體。接著,您的程式碼可透過剖析每行上的整個 JSON 物件來處理串流。

下列範例會顯示當您串連 Bytes 範例內容時所建立的以換行符號分隔之 JSON:

{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
範例 處理推論串流的程式碼

下列範例 Python 類別 (SmrInferenceStream) 示範如何處理以 JSON 格式傳送文字資料的推論串流:

import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]

此範例會執行下列動作來處理推論串流:

  • 啟用 SageMaker AI 執行時期用戶端並命名模型端點。取得推論串流之前,端點託管的模型必須支援推論串流。

  • 在範例 stream_inference 方法中,接收請求內文並將其傳遞給 SDK 的 invoke_endpoint_with_response_stream 方法。

  • 迭代 SDK 傳回的 EventStream 物件中之每個事件。

  • 從每個事件中,取得 PayloadPart 物件中 Bytes 物件的內容。

  • 在範例 _write 方法中,寫入緩衝區以串連 Bytes 物件的內容。合併的內容會形成以換行符號分隔的 JSON 主體。

  • 使用範例 _readlines 方法來獲取可迭代的一系列 JSON 物件。

  • 在每個 JSON 物件中,您都會取得一段推論。

  • 使用 yield 表達式,以累加方式傳回片段。

下列範例會建立並使用 SmrInferenceStream 物件:

request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')

此範例會將請求內容傳遞給 stream_inference 方法。它會不斷回應以印出推論串流傳回的每個部分。

此範例假設位於指定端點的模型是產生文字的 LLM。此範例的輸出是以累加方式列印的文字產生內文:

a challenging problem in machine learning. The goal is to . . .

使用 AWS CLI 調用您的模型

您可以使用 AWS Command Line Interface() 執行命令來叫用模型端點AWS CLI。AWS CLI 支援使用 invoke-endpoint 命令的標準推論請求,並支援使用 invoke-endpoint-async 命令的非同步推論請求。

注意

AWS CLI不支援串流推論請求。

下列範例會使用 invoke-endpoint 命令,將推論請求傳送至模型端點:

aws sagemaker-runtime invoke-endpoint \ --endpoint-name endpoint_name \ --body fileb://$file_name \ output_file.txt

請針對 --endpoint-name 參數提供您在建立端點時為端點指定的名稱。請針對 --body 參數提供要傳遞至模型之 SageMaker AI 的輸入資料。資料的格式必須與用於訓練的格式相同。此範例顯示如何將二進位資料傳送至您的端點。

如需將檔案內容傳遞至 參數fileb://時何時使用 file:// 的詳細資訊AWS CLI,請參閱本機檔案參數的最佳實務

若要取得更多資訊,並查看您可以傳遞的其他參數,請參閱 AWS CLI 命令推論中的 invoke-endpoint

如果 invoke-endpoint 命令成功,它會傳回類似以下的回應:

{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }

如果命令未成功,請檢查輸入承載的格式是否正確。

檢查檔案輸出檔案來檢視調用的輸出 (在此範例中為 output_file.txt)。

more output_file.txt

使用適用於 Python 的 AWSSDK 叫用您的模型

叫用 以雙向串流推論請求和回應

如果您想要在應用程式程式碼中調用模型端點以支援雙向串流,您可以使用新的適用於 Python 的實驗性 SDK,其支援雙向串流功能,並支援 HTTP/2。此 SDK 可讓您在用戶端應用程式與 SageMaker 端點之間進行即時雙向通訊,讓您可以在模型產生推論請求的同時接收串流回應。這對於用戶端和伺服器都需要透過持久性連線持續交換資料的互動式應用程式特別有用。

注意

新的實驗性 SDK 與標準 Boto3 SDK 不同,並支援資料交換的持久性雙向連線。使用實驗性 Python 開發套件時,我們強烈建議對任何非實驗性使用案例嚴格鎖定開發套件版本。

若要使用雙向串流叫用您的端點,請使用 invoke_endpoint_with_bidirectional_stream方法。此方法會建立持久性連線,可讓您將多個承載區塊串流至模型,同時在模型處理資料時即時接收回應。在您明確關閉輸入串流或端點關閉連線之前,連線會保持開啟狀態,最多支援 30 分鐘的連線時間。

先決條件

在應用程式程式碼中使用雙向串流之前,您必須:

  1. 安裝實驗性 SageMaker 執行期 HTTP/2 SDK

  2. 為您的 SageMaker 執行期用戶端設定AWS登入資料

  3. 部署支援雙向串流到 SageMaker 端點的模型

設定雙向串流用戶端

下列範例示範如何初始化雙向串流所需的元件:

from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)

完成雙向串流用戶端

下列範例示範如何建立雙向串流用戶端,將多個文字承載傳送至 SageMaker 端點,並即時處理回應:

import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())

用戶端會使用連接埠 8443 上的區域端點 URI 來初始化 SageMaker 執行期 HTTP/2 用戶端,這是雙向串流連線所需的。start_session() 方法會呼叫 invoke_endpoint_with_bidirectional_stream() 來建立持久性連線,並建立非同步任務來同時處理傳入的回應。

send_event() 方法會在適當的請求物件中包裝承載資料,並透過輸入串流傳送資料,同時_process_responses()方法會在端點到達時持續接聽和處理來自端點的回應。這種雙向方法可啟用即時互動,其中傳送請求和接收回應都會同時透過相同的連線進行。