调用模型进行实时推理 - 亚马逊 SageMaker AI

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

调用模型进行实时推理

使用 Amazon SageMaker AI 将模型部署到终端节点后,您可以通过向模型发送推理请求来与模型进行交互。要向模型发送推理请求,您需要调用承载该模型的端点。您可以使用 Amazon SageMaker Studio AWS SDKs、或调用您的终端节点AWS CLI。

使用 Amazon SageMaker Studio 调用您的模型

将模型部署到终端节点后,您可以通过 Amazon SageMaker Studio 查看终端节点,并通过发送单个推理请求来测试您的终端节点。

注意

SageMaker AI 仅支持在 Studio 中对实时端点进行端点测试。

向端点发送测试推理请求
  1. 启动 Amazon SageMaker Studio。

  2. 在左侧导航窗格中,选择部署

  3. 从下拉菜单中选择端点

  4. 按名称查找端点,然后在表中选择名称。端点面板中列出的端点名称是在部署模型时定义的。Studio 工作区将在新选项卡中打开端点页面。

  5. 选择测试推理选项卡。

  6. 测试选项中,选择以下选项之一:

    1. 选择测试示例请求,立即向端点发送请求。使用 JSON 编辑器提供 JSON 格式的示例数据,然后选择发送请求向端点提交请求。提交请求后,Studio 会在 JSON 编辑器右侧的卡片中显示推理输出。

    2. 选择使用 Python SDK 示例代码,查看向端点发送请求的代码。然后,从推理请求示例部分复制代码示例,并在测试环境中运行代码。

卡片顶部显示了发送到端点的请求类型(仅接受 JSON)。卡片中显示了以下字段:

  • 状态 – 显示以下状态类型之一:

    • Success – 请求成功。

    • Failed – 请求失败。响应显示在失败原因下方。

    • Pending – 当推理请求处于待处理状态时,状态会显示一个旋转的圆形图标。

  • 执行时长 – 调用耗费的时间(结束时间减去开始时间),以毫秒为单位。

  • 请求时间 – 自发送请求以来过去的分钟数。

  • 结果时间 – 自返回结果以来过去的分钟数。

使用 适用于 Python (Boto3) 的 AWS SDK 调用模型

如果要在应用程序代码中调用模型端点,则可以使用其中之一 AWSSDKs,包括适用于 Python (Boto3) 的 AWS SDK。使用该 SDK 调用端点时,您需要使用以下 Python 方法之一:

  • invoke_endpoint:向模型端点发送推理请求,并返回模型生成的响应。

    在模型完成生成推理负载后,此方法将其作为一个响应返回。有关更多信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 invoke_endpoint

  • invoke_endpoint_with_response_stream:向模型端点发送推理请求,并在模型生成响应时以增量方式流式传输响应。

    使用这种方法,您的应用程序会在响应部分可用时立即收到这些部分。有关更多信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 invoke_endpoint

    此方法仅用于调用支持推理流的模型。

在应用程序代码中使用这些方法之前,必须初始化 A SageMaker I Runtime 客户端,并且必须指定终端节点的名称。以下示例为接下来的示例设置了客户端和端点:

import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'

调用以获取推理响应

以下示例使用 invoke_endpoint 方法,通过 适用于 Python (Boto3) 的 AWS SDK 调用端点:

# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))

此示例提供Body字段中的输入数据, SageMaker 让 AI 传递给模型。此数据的格式必须与用于训练的数据格式相同。示例将响应赋值给 response 变量。

response 变量提供了对 HTTP 状态、已部署模型的名称以及其他字段的访问。以下代码段将打印 HTTP 状态代码:

print(response["HTTPStatusCode"])

调用以流式处理推理响应

如果您部署了支持推理流的模型,则可调用该模型以流的形式接收其推理负载部分。模型在生成推理响应时,会以增量方式交付这些部分。应用程序在接收推理流时,无需等待模型生成整个响应负载。取而代之的是,当响应的部分内容可用时,应用程序会立即收到。

通过在应用程序中使用推理流,您可以创建交互,在交互中用户会认为推理速度很快,因为他们能立即获得第一部分。您可以实施流式处理以支持快速的交互式体验,例如聊天机器人、虚拟助手和音乐生成器。例如,您可以创建一个聊天机器人,以增量方式显示大型语言模型 (LLM) 生成的文本。

要获取推理流,您可以使用 invoke_endpoint_with_response_stream 方法。在响应正文中,SDK 提供了 EventStream 对象,该对象以一系列 PayloadPart 对象的形式给出推理。

例 推理流

以下示例是 PayloadPart 对象流:

{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .

在每个负载部分中,Bytes 字段提供模型推理响应的一部分。此部分可以是模型生成的任何内容类型,如文本、图像或音频数据。在此示例中,这些部分是 JSON 对象,其中包含 LLM 生成的文本。

通常,负载部分包含来自模型的离散数据块。在本示例中,离散块是整个 JSON 对象。有时,流媒式响应会将数据块分成多个负载部分,或者将多个数据块组合成一个负载部分。以下示例显示了一个 JSON 格式的数据块,该数据块分为两个负载部分:

{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}

在编写处理推理流的应用程序代码时,应包括处理这些偶尔的数据拆分和组合的逻辑。作为一种策略,您可以编写代码,在应用程序接收负载部分的同时,连接 Bytes 的内容。通过连接此处的示例 JSON 数据,可以将这些数据组合成一个以换行符分隔的 JSON 正文。然后,您的代码可以通过解析每行上的整个 JSON 对象来处理流。

以下示例显示了您在连接 Bytes 的以下示例内容时,创建的以换行符分隔的 JSON:

{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
例 用于处理推理流的代码

以下示例 Python 类 SmrInferenceStream 演示了如何处理以 JSON 格式发送文本数据的推理流:

import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]

此示例通过执行以下操作处理推理流:

  • 初始化 A SageMaker I 运行时客户端并设置模型端点的名称。在获得推理流之前,端点托管的模型必须支持推理流。

  • 在示例 stream_inference 方法中,接收请求正文并将其传递给 SDK 的 invoke_endpoint_with_response_stream 方法。

  • 遍历 SDK 返回的 EventStream 对象中的每个事件。

  • 从每个事件中获取 PayloadPart 对象中 Bytes 对象的内容。

  • 在示例 _write 方法中,写入缓冲区以连接 Bytes 对象的内容。组合后的内容构成以换行符分隔的 JSON 正文。

  • 使用示例 _readlines 方法获取一系列可迭代的 JSON 对象。

  • 在每个 JSON 对象中,获取推理的一部分。

  • 使用 yield 表达式,以增量方式返回这些部分。

以下示例创建并使用了 SmrInferenceStream 对象:

request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')

此示例将请求正文传递给 stream_inference 方法。该方法将遍历响应,以打印推理流返回的每个部分。

此示例假设指定端点处的模型是生成文本的 LLM。此示例的输出是生成的文本正文,文本以增量方式打印:

a challenging problem in machine learning. The goal is to . . .

使用 AWS CLI 调用模型

您可以通过使用 AWS Command Line Interface (AWS CLI) 运行命令来调用您的模型端点。AWS CLI 支持使用 invoke-endpoint 命令发送标准推理请求,并支持使用 invoke-endpoint-async 命令发送异步推理请求。

注意

AWS CLI不支持流式推理请求。

以下示例使用 invoke-endpoint 命令,向模型端点发送推理请求:

aws sagemaker-runtime invoke-endpoint \ --endpoint-name endpoint_name \ --body fileb://$file_name \ output_file.txt

对于 --endpoint-name 参数,请提供创建端点时指定的端点名称。对于--body参数,提供 SageMaker AI 要传递给模型的输入数据。数据的格式必须与用于训练的数据格式相同。此示例显示了如何向端点发送二进制数据。

有关在将文件内容传递给的参数fileb://时何时使用 file:// over 的更多信息AWS CLI,请参阅本地文件参数的最佳实践

有关更多信息以及可以传递的其他参数,请参阅《AWS CLI命令参考》中的 invoke-endpoint

如果 invoke-endpoint 命令成功,则将返回如下所示的响应:

{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }

如果命令不成功,请检查输入负载的格式是否正确。

可通过检查文件输出文件(在此例中为 output_file.txt),查看调用的输出。

more output_file.txt

使用适用于 Python 的AWS软件开发工具包调用您的模型

调用以双向流式传输推理请求和响应

如果您想在应用程序代码中调用模型端点以支持双向流式传输,则可以使用新的实验性 Python SDK,该软件开发工具包支持双向流传输功能并支持 HTTP/2。此 SDK 支持您的客户端应用程序和 SageMaker 终端节点之间的实时、双向通信,使您能够以增量方式发送推理请求,同时在模型生成流响应时接收流式响应。这对于交互式应用程序特别有用,在这些应用程序中,客户端和服务器都需要通过持久连接持续交换数据。

注意

新的实验性 SDK 不同于标准 Boto3 SDK,它支持用于数据交换的永久双向连接。在使用实验性 Python SDK 时,对于任何非实验性用例,我们强烈建议严格固定到某个 SDK 版本。

要通过双向流媒体调用您的终端节点,请使用invoke_endpoint_with_bidirectional_stream方法。此方法可建立持久连接,允许您将多个有效载荷区块流式传输到模型,同时在模型处理数据时实时接收响应。在您明确关闭输入流或端点关闭连接之前,连接将保持打开状态,最多支持 30 分钟的连接时间。

先决条件

在应用程序代码中使用双向流媒体之前,您必须:

  1. 安装实验 SageMaker 运行时 HTTP/2 软件开发工具包

  2. 为您的 SageMaker Runtime 客户端设置AWS凭据

  3. 部署支持向端点进行双向流式传输的 SageMaker 模型

设置双向流媒体客户端

以下示例显示如何初始化双向流式传输所需的组件:

from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)

完整的双向流媒体客户端

以下示例演示如何创建双向流式传输客户端,该客户端将多个文本负载发送到 SageMaker 端点并实时处理响应:

import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())

客户端使用端口 8443 上的区域端点 URI 初始化 SageMaker Runtime HTTP/2 客户端,这是双向流媒体连接所必需的。start_ session() 方法调用invoke_endpoint_with_bidirectional_stream()以建立持久连接,并创建一个异步任务来同时处理传入的响应。

send_event()方法将负载数据封装在相应的请求对象中,并通过输入流发送它们,而该_process_responses()方法则在端点到达时持续监听和处理来自端点的响应。这种双向方法可以实现实时交互,在这种交互中,发送请求和接收响应都通过同一个连接同时发生。