本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
调用模型进行实时推理
使用 Amazon SageMaker AI 将模型部署到终端节点后,您可以通过向模型发送推理请求来与模型进行交互。要向模型发送推理请求,您需要调用承载该模型的端点。您可以使用 Amazon SageMaker Studio AWS SDKs、或调用您的终端节点AWS CLI。
使用 Amazon SageMaker Studio 调用您的模型
将模型部署到终端节点后,您可以通过 Amazon SageMaker Studio 查看终端节点,并通过发送单个推理请求来测试您的终端节点。
注意
SageMaker AI 仅支持在 Studio 中对实时端点进行端点测试。
向端点发送测试推理请求
-
启动 Amazon SageMaker Studio。
-
在左侧导航窗格中,选择部署。
-
从下拉菜单中选择端点。
-
按名称查找端点,然后在表中选择名称。端点面板中列出的端点名称是在部署模型时定义的。Studio 工作区将在新选项卡中打开端点页面。
-
选择测试推理选项卡。
-
在测试选项中,选择以下选项之一:
-
选择测试示例请求,立即向端点发送请求。使用 JSON 编辑器提供 JSON 格式的示例数据,然后选择发送请求向端点提交请求。提交请求后,Studio 会在 JSON 编辑器右侧的卡片中显示推理输出。
-
选择使用 Python SDK 示例代码,查看向端点发送请求的代码。然后,从推理请求示例部分复制代码示例,并在测试环境中运行代码。
-
卡片顶部显示了发送到端点的请求类型(仅接受 JSON)。卡片中显示了以下字段:
状态 – 显示以下状态类型之一:
Success– 请求成功。Failed– 请求失败。响应显示在失败原因下方。Pending– 当推理请求处于待处理状态时,状态会显示一个旋转的圆形图标。
执行时长 – 调用耗费的时间(结束时间减去开始时间),以毫秒为单位。
请求时间 – 自发送请求以来过去的分钟数。
结果时间 – 自返回结果以来过去的分钟数。
使用 适用于 Python (Boto3) 的 AWS SDK 调用模型
如果要在应用程序代码中调用模型端点,则可以使用其中之一 AWSSDKs,包括适用于 Python (Boto3) 的 AWS SDK。使用该 SDK 调用端点时,您需要使用以下 Python 方法之一:
-
invoke_endpoint:向模型端点发送推理请求,并返回模型生成的响应。在模型完成生成推理负载后,此方法将其作为一个响应返回。有关更多信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 invoke_endpoint
。 -
invoke_endpoint_with_response_stream:向模型端点发送推理请求,并在模型生成响应时以增量方式流式传输响应。使用这种方法,您的应用程序会在响应部分可用时立即收到这些部分。有关更多信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 invoke_endpoint
。 此方法仅用于调用支持推理流的模型。
在应用程序代码中使用这些方法之前,必须初始化 A SageMaker I Runtime 客户端,并且必须指定终端节点的名称。以下示例为接下来的示例设置了客户端和端点:
import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'
调用以获取推理响应
以下示例使用 invoke_endpoint 方法,通过 适用于 Python (Boto3) 的 AWS SDK 调用端点:
# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))
此示例提供Body字段中的输入数据, SageMaker 让 AI 传递给模型。此数据的格式必须与用于训练的数据格式相同。示例将响应赋值给 response 变量。
response 变量提供了对 HTTP 状态、已部署模型的名称以及其他字段的访问。以下代码段将打印 HTTP 状态代码:
print(response["HTTPStatusCode"])
调用以流式处理推理响应
如果您部署了支持推理流的模型,则可调用该模型以流的形式接收其推理负载部分。模型在生成推理响应时,会以增量方式交付这些部分。应用程序在接收推理流时,无需等待模型生成整个响应负载。取而代之的是,当响应的部分内容可用时,应用程序会立即收到。
通过在应用程序中使用推理流,您可以创建交互,在交互中用户会认为推理速度很快,因为他们能立即获得第一部分。您可以实施流式处理以支持快速的交互式体验,例如聊天机器人、虚拟助手和音乐生成器。例如,您可以创建一个聊天机器人,以增量方式显示大型语言模型 (LLM) 生成的文本。
要获取推理流,您可以使用 invoke_endpoint_with_response_stream 方法。在响应正文中,SDK 提供了 EventStream 对象,该对象以一系列 PayloadPart 对象的形式给出推理。
例 推理流
以下示例是 PayloadPart 对象流:
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .
在每个负载部分中,Bytes 字段提供模型推理响应的一部分。此部分可以是模型生成的任何内容类型,如文本、图像或音频数据。在此示例中,这些部分是 JSON 对象,其中包含 LLM 生成的文本。
通常,负载部分包含来自模型的离散数据块。在本示例中,离散块是整个 JSON 对象。有时,流媒式响应会将数据块分成多个负载部分,或者将多个数据块组合成一个负载部分。以下示例显示了一个 JSON 格式的数据块,该数据块分为两个负载部分:
{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
在编写处理推理流的应用程序代码时,应包括处理这些偶尔的数据拆分和组合的逻辑。作为一种策略,您可以编写代码,在应用程序接收负载部分的同时,连接 Bytes 的内容。通过连接此处的示例 JSON 数据,可以将这些数据组合成一个以换行符分隔的 JSON 正文。然后,您的代码可以通过解析每行上的整个 JSON 对象来处理流。
以下示例显示了您在连接 Bytes 的以下示例内容时,创建的以换行符分隔的 JSON:
{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
例 用于处理推理流的代码
以下示例 Python 类 SmrInferenceStream 演示了如何处理以 JSON 格式发送文本数据的推理流:
import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]
此示例通过执行以下操作处理推理流:
-
初始化 A SageMaker I 运行时客户端并设置模型端点的名称。在获得推理流之前,端点托管的模型必须支持推理流。
-
在示例
stream_inference方法中,接收请求正文并将其传递给 SDK 的invoke_endpoint_with_response_stream方法。 -
遍历 SDK 返回的
EventStream对象中的每个事件。 -
从每个事件中获取
PayloadPart对象中Bytes对象的内容。 -
在示例
_write方法中,写入缓冲区以连接Bytes对象的内容。组合后的内容构成以换行符分隔的 JSON 正文。 -
使用示例
_readlines方法获取一系列可迭代的 JSON 对象。 -
在每个 JSON 对象中,获取推理的一部分。
-
使用
yield表达式,以增量方式返回这些部分。
以下示例创建并使用了 SmrInferenceStream 对象:
request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')
此示例将请求正文传递给 stream_inference 方法。该方法将遍历响应,以打印推理流返回的每个部分。
此示例假设指定端点处的模型是生成文本的 LLM。此示例的输出是生成的文本正文,文本以增量方式打印:
a challenging problem in machine learning. The goal is to . . .
使用 AWS CLI 调用模型
您可以通过使用 AWS Command Line Interface (AWS CLI) 运行命令来调用您的模型端点。AWS CLI 支持使用 invoke-endpoint 命令发送标准推理请求,并支持使用 invoke-endpoint-async 命令发送异步推理请求。
注意
AWS CLI不支持流式推理请求。
以下示例使用 invoke-endpoint 命令,向模型端点发送推理请求:
aws sagemaker-runtime invoke-endpoint \ --endpoint-nameendpoint_name\ --bodyfileb://$file_name\output_file.txt
对于 --endpoint-name 参数,请提供创建端点时指定的端点名称。对于--body参数,提供 SageMaker AI 要传递给模型的输入数据。数据的格式必须与用于训练的数据格式相同。此示例显示了如何向端点发送二进制数据。
有关在将文件内容传递给的参数fileb://时何时使用 file:// over 的更多信息AWS CLI,请参阅本地文件参数的最佳实践
有关更多信息以及可以传递的其他参数,请参阅《AWS CLI命令参考》中的 invoke-endpoint。
如果 invoke-endpoint 命令成功,则将返回如下所示的响应:
{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }
如果命令不成功,请检查输入负载的格式是否正确。
可通过检查文件输出文件(在此例中为 output_file.txt),查看调用的输出。
more output_file.txt
使用适用于 Python 的AWS软件开发工具包调用您的模型
调用以双向流式传输推理请求和响应
如果您想在应用程序代码中调用模型端点以支持双向流式传输,则可以使用新的实验性 Python SDK
注意
新的实验性 SDK 不同于标准 Boto3 SDK,它支持用于数据交换的永久双向连接。在使用实验性 Python SDK 时,对于任何非实验性用例,我们强烈建议严格固定到某个 SDK 版本。
要通过双向流媒体调用您的终端节点,请使用invoke_endpoint_with_bidirectional_stream方法。此方法可建立持久连接,允许您将多个有效载荷区块流式传输到模型,同时在模型处理数据时实时接收响应。在您明确关闭输入流或端点关闭连接之前,连接将保持打开状态,最多支持 30 分钟的连接时间。
先决条件
在应用程序代码中使用双向流媒体之前,您必须:
-
安装实验 SageMaker 运行时 HTTP/2 软件开发工具包
-
为您的 SageMaker Runtime 客户端设置AWS凭据
-
部署支持向端点进行双向流式传输的 SageMaker 模型
设置双向流媒体客户端
以下示例显示如何初始化双向流式传输所需的组件:
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)
完整的双向流媒体客户端
以下示例演示如何创建双向流式传输客户端,该客户端将多个文本负载发送到 SageMaker 端点并实时处理响应:
import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())
客户端使用端口 8443 上的区域端点 URI 初始化 SageMaker Runtime HTTP/2 客户端,这是双向流媒体连接所必需的。start_ session() 方法调用invoke_endpoint_with_bidirectional_stream()以建立持久连接,并创建一个异步任务来同时处理传入的响应。
该send_event()方法将负载数据封装在相应的请求对象中,并通过输入流发送它们,而该_process_responses()方法则在端点到达时持续监听和处理来自端点的响应。这种双向方法可以实现实时交互,在这种交互中,发送请求和接收响应都通过同一个连接同时发生。