本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
調用模型以進行即時推論
使用 Amazon SageMaker AI 將模型部署到端點後,您可以傳送推論請求來與模型互動。若要將推論請求傳送至模型,您可以調用負責託管該模型的端點。您可以使用 Amazon SageMaker Studio、AWS SDK 或 AWS CLI 來調用端點。
使用 Amazon SageMaker Studio 調用您的模型
將模型部署至端點後,您可以透過 Amazon SageMaker Studio 檢視端點,並傳送單一推論請求來測試端點。
注意
SageMaker AI 僅支援針對即時端點在 Studio 進行端點測試。
將測試推論請求傳送至您的端點
-
啟動 Amazon SageMaker Studio。
-
在左側導覽窗格中選擇部署。
-
從下拉式清單中,選擇端點。
-
依名稱尋找您的端點,然後在資料表中選擇名稱。端點面板中列出的端點名稱是在部署模型時所定義的。Studio工作區會在新索引標籤中開啟端點頁面。
-
選擇測試推論索引標籤。
-
針對測試選項選取以下任一選項:
-
選取測試範例請求,以立即將請求傳送至端點。使用 JSON 編輯器,以 JSON 格式提供範例資料,然後選擇傳送請求,將請求提交至端點。提交請求之後,Studio 會在 JSON 編輯器右側的卡片中顯示推論輸出。
-
選取使用 Python SDK 範例程式碼,檢視傳送請求至端點的程式碼。接著,從範例推論請求區段複製程式碼範例,並從測試環境執行程式碼。
-
卡片頂端會顯示傳送至端點的請求類型 (僅接受 JSON)。卡片會顯示下列欄位:
狀態 — 會顯示以下其中一項狀態類型:
Success— 請求已成功。Failed— 請求失敗。回應會顯示在 Failure Reason (失敗原因) 底下。Pending— 當推論請求處於待處理時,狀態會顯示旋轉的圓形圖示。
執行長度 — 調用所花費的時間 (結束時間減去開始時間),以毫秒為單位。
請求時間 — 自發送請求以來已過了多少分鐘。
結果時間 — 自傳回結果後已過了多少分鐘。
使用 適用於 Python (Boto3) 的 AWS SDK 調用您的模型
如果您想要在應用程式程式碼中叫用模型端點,您可以使用其中一個 AWSSDKs,包括 適用於 Python (Boto3) 的 AWS SDK。若要使用此 SDK 調用您的端點,請使用下列其中一種 Python 方法:
-
invoke_endpoint— 將推論請求傳送至模型端點,並傳回模型產生的回應。此方法會傳回推論承載,以作為模型完成生成後的一個回應。如需更多資訊,請參閱適用於 Python 的 AWS SDK (Boto3) API 參考中的 invoke_endpoint
。 -
invoke_endpoint_with_response_stream— 將推論請求傳送至模型端點,並在模型產生推論時以累加方式串流回應。透過這種方法,應用程式會在回應的某部分可用時即接收到該回應部分。如需更多資訊,請參閱適用於 Python 的 AWS SDK (Boto3) API 參考中的 invoke_endpoint
。 只能使用此方法來調用支援推論串流的模型。
您必須先啟用 SageMaker AI 執行時期用戶端,並且指定端點名稱,才能在應用程式程式碼中使用這些方法。下列範例會針對隨後其餘範例設定用戶端和端點:
import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'
調用以取得推論回應
下列範例會使用 invoke_endpoint 方法透過 適用於 Python (Boto3) 的 AWS SDK 來調用端點:
# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))
此範例在 Body 欄位中提供輸入資料,供 SageMaker AI 傳遞至模型。此資料的格式必須與用於訓練的格式相同。此範例會將回應指派至 response 變數。
response 變數可讓您存取 HTTP 狀態、已部署模型的名稱以及其他欄位。下列程式碼片段會列印 HTTP 狀態程式碼:
print(response["HTTPStatusCode"])
調用以串流推論回應
如果您部署了支援推論串流的模型,您可以調用模型來接收其推論承載作為串流的部分。模型會在模型產生這些部分時累加地交付這些部分。當應用程式收到推論串流時,應用程式不需要等待模型產生整個回應承載。相反地,應用程式會在可用時接收部分回應。
藉由在應用程式中使用推論串流,您可以建立互動,讓使用者認為推論速度很快,因為他們會立即取得第一部分。您可以實作串流以支援快速互動體驗,例如聊天機器人、虛擬助理和音樂產生器。例如,您可以建立一個聊天機器人,以累加方式顯示由大型語言模型 (LLM) 產生的文字。
若要取得推論串流,您可以使用 invoke_endpoint_with_response_stream 方法。在回應內文中,SDK 提供了一個 EventStream 物件,它會將推論作為一系列 PayloadPart 物件來提供。
範例 推論串流
下列為 PayloadPart 物件串流的範例。
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .
在每個承載部分中,Bytes 欄位會提供來自模型的推論回應之一部分。此部分可以是模型產生的任何內容類型,例如文字、影像或音訊資料。在此範例中,這些部分是 JSON 物件,其中包含來自 LLM 的生成文字。
通常,承載部分包含來自模型的離散資料區塊。在此範例中,離散區塊是整個 JSON 物件。有時候,回應會將區塊拆分為多個承載部分,或者將多個區塊合併為一個承載部分。下列範例會顯示 JSON 格式的資料區塊,這些資料分割為兩個承載部分:
{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
當您撰寫處理推論串流的應用程式程式碼時,請包含處理這些偶爾分割和組合資料的邏輯。作為一種策略,您可以編寫代碼來串連應用程式接收承載部分的 Bytes 內容。透過在此處串連 JSON 資料範例,您可以將資料合併為以換行符號分隔的 JSON 主體。接著,您的程式碼可透過剖析每行上的整個 JSON 物件來處理串流。
下列範例會顯示當您串連 Bytes 範例內容時所建立的以換行符號分隔之 JSON:
{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
範例 處理推論串流的程式碼
下列範例 Python 類別 (SmrInferenceStream) 示範如何處理以 JSON 格式傳送文字資料的推論串流:
import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]
此範例會執行下列動作來處理推論串流:
-
啟用 SageMaker AI 執行時期用戶端並命名模型端點。取得推論串流之前,端點託管的模型必須支援推論串流。
-
在範例
stream_inference方法中,接收請求內文並將其傳遞給 SDK 的invoke_endpoint_with_response_stream方法。 -
迭代 SDK 傳回的
EventStream物件中之每個事件。 -
從每個事件中,取得
PayloadPart物件中Bytes物件的內容。 -
在範例
_write方法中,寫入緩衝區以串連Bytes物件的內容。合併的內容會形成以換行符號分隔的 JSON 主體。 -
使用範例
_readlines方法來獲取可迭代的一系列 JSON 物件。 -
在每個 JSON 物件中,您都會取得一段推論。
-
使用
yield表達式,以累加方式傳回片段。
下列範例會建立並使用 SmrInferenceStream 物件:
request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')
此範例會將請求內容傳遞給 stream_inference 方法。它會不斷回應以印出推論串流傳回的每個部分。
此範例假設位於指定端點的模型是產生文字的 LLM。此範例的輸出是以累加方式列印的文字產生內文:
a challenging problem in machine learning. The goal is to . . .
使用 AWS CLI 調用您的模型
您可以使用 AWS Command Line Interface() 執行命令來叫用模型端點AWS CLI。AWS CLI 支援使用 invoke-endpoint 命令的標準推論請求,並支援使用 invoke-endpoint-async 命令的非同步推論請求。
注意
AWS CLI不支援串流推論請求。
下列範例會使用 invoke-endpoint 命令,將推論請求傳送至模型端點:
aws sagemaker-runtime invoke-endpoint \ --endpoint-nameendpoint_name\ --bodyfileb://$file_name\output_file.txt
請針對 --endpoint-name 參數提供您在建立端點時為端點指定的名稱。請針對 --body 參數提供要傳遞至模型之 SageMaker AI 的輸入資料。資料的格式必須與用於訓練的格式相同。此範例顯示如何將二進位資料傳送至您的端點。
如需將檔案內容傳遞至 參數fileb://時何時使用 file:// 的詳細資訊AWS CLI,請參閱本機檔案參數的最佳實務
若要取得更多資訊,並查看您可以傳遞的其他參數,請參閱 AWS CLI 命令推論中的 invoke-endpoint。
如果 invoke-endpoint 命令成功,它會傳回類似以下的回應:
{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }
如果命令未成功,請檢查輸入承載的格式是否正確。
檢查檔案輸出檔案來檢視調用的輸出 (在此範例中為 output_file.txt)。
more output_file.txt
使用適用於 Python 的 AWSSDK 叫用您的模型
叫用 以雙向串流推論請求和回應
如果您想要在應用程式程式碼中調用模型端點以支援雙向串流,您可以使用新的適用於 Python 的實驗性 SDK
注意
新的實驗性 SDK 與標準 Boto3 SDK 不同,並支援資料交換的持久性雙向連線。使用實驗性 Python 開發套件時,我們強烈建議對任何非實驗性使用案例嚴格鎖定開發套件版本。
若要使用雙向串流叫用您的端點,請使用 invoke_endpoint_with_bidirectional_stream方法。此方法會建立持久性連線,可讓您將多個承載區塊串流至模型,同時在模型處理資料時即時接收回應。在您明確關閉輸入串流或端點關閉連線之前,連線會保持開啟狀態,最多支援 30 分鐘的連線時間。
先決條件
在應用程式程式碼中使用雙向串流之前,您必須:
-
安裝實驗性 SageMaker 執行期 HTTP/2 SDK
-
為您的 SageMaker 執行期用戶端設定AWS登入資料
-
部署支援雙向串流到 SageMaker 端點的模型
設定雙向串流用戶端
下列範例示範如何初始化雙向串流所需的元件:
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)
完成雙向串流用戶端
下列範例示範如何建立雙向串流用戶端,將多個文字承載傳送至 SageMaker 端點,並即時處理回應:
import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())
用戶端會使用連接埠 8443 上的區域端點 URI 來初始化 SageMaker 執行期 HTTP/2 用戶端,這是雙向串流連線所需的。start_session() 方法會呼叫 invoke_endpoint_with_bidirectional_stream() 來建立持久性連線,並建立非同步任務來同時處理傳入的回應。
send_event() 方法會在適當的請求物件中包裝承載資料,並透過輸入串流傳送資料,同時_process_responses()方法會在端點到達時持續接聽和處理來自端點的回應。這種雙向方法可啟用即時互動,其中傳送請求和接收回應都會同時透過相同的連線進行。