As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Invocar modelos para inferência em tempo real
Depois de usar o Amazon SageMaker AI para implantar um modelo em um endpoint, você pode interagir com o modelo enviando solicitações de inferência para ele. Para enviar uma solicitação de inferência para um modelo, você invoca o endpoint que o hospeda. Você pode invocar seus endpoints usando o Amazon SageMaker Studio AWS SDKs, o ou o. AWS CLI
Invoque seu modelo usando o Amazon Studio SageMaker
Depois de implantar seu modelo em um endpoint, você pode visualizar o endpoint por meio do Amazon SageMaker Studio e testar seu endpoint enviando solicitações de inferência únicas.
nota
SageMaker A IA só oferece suporte a testes de endpoint no Studio para endpoints em tempo real.
Para enviar uma solicitação de inferência de teste para seu endpoint
-
Inicie o Amazon SageMaker Studio.
-
No painel de navegação à esquerda, escolha Implantações.
-
No menu suspenso, escolha Endpoints.
-
Encontre seu endpoint pelo nome e escolha o nome na tabela. Os nomes dos endpoints listados no painel Endpoints são definidos quando você implanta um modelo. A área de trabalho do Studio abre a página de Endpoint em uma nova guia.
-
Escolha a guia Testar inferência.
-
Em Opções de teste, escolha uma das seguintes opções:
-
Selecione Testar a solicitação de amostra para enviar imediatamente uma solicitação ao endpoint. Use o editor JSON para fornecer dados de amostra no formato JSON e escolha Enviar solicitaçãopara enviar a solicitação ao endpoint. Depois de enviar a solicitação, o Studio mostra a saída da inferência em um cartão à direita do editor JSON.
-
Selecione Usar código de exemplo do Python SDK para visualizar o código e enviar uma solicitação ao endpoint. Em seguida, copie o exemplo de código da seção Exemplo de solicitação de inferência e execute o código no ambiente de teste.
-
A parte superior do cartão mostra o tipo de solicitação que foi enviada ao endpoint (somente JSON é aceito). O cartão mostra os seguintes campos:
Status: exibe um dos seguintes tipos de status:
Success: a solicitação foi bem-sucedida.Failed: a solicitação falhou. Uma resposta aparece em Motivo da falha.Pending: enquanto a solicitação de inferência está pendente, o status mostra um ícone circular giratório.
Duração da execução: quanto tempo demorou a invocação (hora de término menos a hora de início) em milissegundos.
Tempo da solicitação: quantos minutos se passaram desde que a solicitação foi enviada.
Tempo do resultado: quantos minutos se passaram desde que o resultado foi devolvido.
Invocar o modelo usando o AWS SDK para Python (Boto3)
Se quiser invocar um endpoint de modelo no código do seu aplicativo, você pode usar um dos AWSSDKs, incluindo o. AWS SDK para Python (Boto3) Para invocar o endpoint com esse SDK, use um dos seguintes métodos Python:
-
invoke_endpoint: envia uma solicitação de inferência para um endpoint do modelo e exibe a resposta que o modelo gera.Esse método retorna a carga útil de inferência como uma resposta depois que o modelo termina de gerá-la. Para mais informações, consulte a invoke_endpoint
no AWSSDK para Referência API Python (Boto3). -
invoke_endpoint_with_response_stream: envia uma solicitação de inferência para um endpoint do modelo e transmite a resposta de forma incremental enquanto o modelo a gera.Com esse método, a aplicação recebe partes da resposta à medida que se tornam disponíveis. Para mais informações, consulte a invoke_endpoint
no AWSSDK para Referência API Python (Boto3). Use esse método somente para invocar modelos que seja compatível comm streaming de inferência.
Antes de usar esses métodos no código do aplicativo, você deve inicializar um cliente do SageMaker AI Runtime e especificar o nome do seu endpoint. O seguinte exemplo configura o cliente e o endpoint para os demais seguintes exemplos:
import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'
Invoque para obter uma resposta de inferência
O seguinte exemplo usa o método invoke_endpoint para invocar um endpoint com o AWS SDK para Python (Boto3):
# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))
Este exemplo fornece dados de entrada no Body campo para a SageMaker IA passar para o modelo. Esses dados devem estar no mesmo formato usado para treinamento. O exemplo designa a resposta para a variável response.
A variável response fornece acesso ao status HTTP, ao nome do modelo implantado e a outros campos. O trecho a seguir imprime o código de status de HTTP:
print(response["HTTPStatusCode"])
Invoque para obter uma resposta de fluxo
Se você implantou um modelo que é compatível com streaming de inferência, você pode invocar o modelo para receber sua carga útil de inferência como um fluxo de partes. O modelo entrega essas peças de forma incremental à medida que o modelo as gera. Quando uma aplicação recebe um fluxo de inferência, a aplicação não precisa esperar que o modelo gere toda a carga útil da resposta. Em vez disso, a aplicação recebe imediatamente partes da resposta à medida que se tornam disponíveis.
Ao consumir um fluxo de inferência em sua aplicação, você pode criar interações em que seus usuários percebam que a inferência é rápida porque obtêm a primeira parte imediatamente. Você pode implementar o streaming para oferecer apoio a experiências interativas rápidas, como chatbots, assistentes virtuais e geradores de música. Por exemplo, você pode criar um chatbot que mostre incrementalmente o texto gerado por um modelo de linguagem grande (LLM).
Para obter um fluxo de inferência, você pode usar o método invoke_endpoint_with_response_stream. No corpo da resposta, o SDK fornece um objeto EventStream, que fornece a inferência como uma série de objetos PayloadPart.
exemplo Fluxo de inferência
O seguinte exemplo é um fluxo de objetos PayloadPart:
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .
Em cada parte da carga útil, o campo Bytes fornece uma parte da resposta de inferência do modelo. Essa parte pode ser qualquer tipo de conteúdo gerado por um modelo, como texto, imagem ou dados de áudio. Neste exemplo, as partes são objetos JSON que contêm texto gerado de um LLM.
Normalmente, a parte da carga útil contém um bloco discreto de dados do modelo. Neste exemplo, os blocos discretos são objetos JSON inteiros. Ocasionalmente, a resposta de streaming divide as partes em várias partes da carga útil ou combina várias partes em uma parte da carga útil. O seguinte exemplo mostra um bloco de dados no formato JSON dividido em duas partes da carga útil:
{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
Ao escrever um código de aplicação que processa um fluxo de inferência, inclua uma lógica que manipule essas divisões e combinações ocasionais de dados. Como uma estratégia, você pode escrever um código que concatene o conteúdo Bytes enquanto sua aplicação recebe as partes da carga útil. Ao concatenar os dados JSON de exemplo aqui, você combinaria os dados em um corpo JSON delimitado por nova linha. Em seguida, seu código poderia processar o fluxo analisando todo o objeto JSON em cada linha.
O seguinte exemplo mostra o JSON delimitado por nova linha que você criaria ao concatenar o conteúdo de exemplo de Bytes:
{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
exemplo Código para processar um fluxo de inferência
O seguinte exemplo de classe Python, SmrInferenceStream, demonstra como você pode processar um fluxo de inferência que envia dados de texto no formato JSON:
import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]
Este exemplo processa o fluxo de inferência fazendo o seguinte:
-
Inicializa um cliente SageMaker AI Runtime e define o nome de um endpoint do modelo. Antes de obter um fluxo de inferência, o modelo que o endpoint hospeda deve oferecer compatibilidade com o streaming de inferência.
-
No método de exemplo
stream_inference, recebe um corpo de solicitação e o passa para o métodoinvoke_endpoint_with_response_streamdo SDK. -
Itera sobre cada evento no objeto
EventStreamque o SDK retorna. -
De cada evento, obtém o conteúdo do objeto
Bytesno objetoPayloadPart. -
No método
_writede exemplo, grava em um buffer para concatenar o conteúdo dos objetosBytes. O conteúdo combinado forma um corpo JSON delimitado por nova linha. -
Usa o método
_readlinesde exemplo para obter uma série iterável de objetos JSON. -
Em cada objeto JSON, obtém uma parte da inferência.
-
Com a expressão
yield, retorna as peças de forma incremental.
O seguinte exemplo cria e usa um objeto SmrInferenceStream:
request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')
Este exemplo passa um corpo de solicitação para o método stream_inference. Ele itera sobre a resposta para imprimir cada peça que o fluxo de inferência retorna.
O exemplo pressupõe que o modelo no endpoint especificado é um LLM que gera texto. A saída desse exemplo é um corpo de texto gerado que é impresso de forma incremental:
a challenging problem in machine learning. The goal is to . . .
Invocar o modelo usando o AWS CLI
Você pode invocar o endpoint do seu modelo executando comandos com o AWS Command Line Interface ()AWS CLI. A AWS CLI oferece apoio a solicitações de inferência padrão com o comando invoke-endpoint e oferece apoio a solicitações de inferência assíncronas com o comando invoke-endpoint-async.
nota
O AWS CLI não suporta solicitações de inferência de streaming.
O seguinte exemplo usa o comando invoke-endpoint para enviar uma solicitação de inferência para um endpoint do modelo:
aws sagemaker-runtime invoke-endpoint \ --endpoint-nameendpoint_name\ --bodyfileb://$file_name\output_file.txt
Para o parâmetro --endpoint-name, forneça o nome do endpoint que você especificou quando o criou. Para o --body parâmetro, forneça dados de entrada para que a SageMaker IA passe para o modelo. Os dados devem estar no mesmo formato usado para treinamento. Este exemplo mostra como enviar dados binários para o seu endpoint.
Para obter mais informações sobre quando usar file:// over fileb:// ao passar o conteúdo de um arquivo para um parâmetro doAWS CLI, consulte Melhores práticas para parâmetros de arquivos locais
Para obter mais informações e ver parâmetros adicionais que você pode passar, consulte invoke-endpoint na Referência de Comandos AWS CLI.
Se o comando invoke-endpoint for bem-sucedido, ele retornará uma resposta como a seguinte:
{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }
Se o comando não for bem-sucedido, verifique se a carga útil de entrada está no formato correto.
Visualize a saída da invocação verificando o arquivo de saída do arquivo (output_file.txt neste exemplo).
more output_file.txt
Invoque seu modelo usando o AWS SDK para Python
Invocar para transmitir bidirecionalmente uma solicitação e resposta de inferência
Se você quiser invocar um endpoint de modelo no código do seu aplicativo para suportar streaming bidirecional, você pode usar o novo SDK experimental para Python
nota
O novo SDK experimental é diferente do SDK Boto3 padrão e oferece suporte a conexões bidirecionais persistentes para troca de dados. Ao usar o SDK experimental do Python, recomendamos fortemente a fixação estrita em uma versão do SDK para qualquer caso de uso não experimental.
Para invocar seu endpoint com streaming bidirecional, use o método. invoke_endpoint_with_bidirectional_stream Esse método estabelece uma conexão persistente que permite transmitir vários fragmentos de carga útil para seu modelo enquanto recebe respostas em tempo real enquanto o modelo processa os dados. A conexão permanece aberta até que você feche explicitamente o fluxo de entrada ou o endpoint feche a conexão, suportando até 30 minutos de tempo de conexão.
Pré-requisitos
Antes de usar o streaming bidirecional no código do aplicativo, você deve:
-
Instale o SDK experimental SageMaker Runtime HTTP/2
-
Configurar AWS credenciais para seu cliente SageMaker Runtime
-
Implemente um modelo que suporte streaming bidirecional para um endpoint SageMaker
Configurar o cliente de streaming bidirecional
O exemplo a seguir mostra como inicializar os componentes necessários para streaming bidirecional:
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)
Cliente de streaming bidirecional completo
O exemplo a seguir demonstra como criar um cliente de streaming bidirecional que envia várias cargas de texto para um SageMaker endpoint e processa as respostas em tempo real:
import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())
O cliente inicializa o cliente SageMaker Runtime HTTP/2 com o URI do endpoint regional na porta 8443, que é necessário para conexões de streaming bidirecionais. O session() método start_ chama invoke_endpoint_with_bidirectional_stream() para estabelecer a conexão persistente e cria uma tarefa assíncrona para processar as respostas recebidas simultaneamente.
O send_event() método agrupa os dados da carga nos objetos de solicitação apropriados e os envia pelo fluxo de entrada, enquanto escuta e processa _process_responses() continuamente as respostas do endpoint à medida que elas chegam. Essa abordagem bidirecional permite a interação em tempo real, na qual o envio de solicitações e o recebimento de respostas acontecem simultaneamente na mesma conexão.