기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
실시간 추론을 위한 모델 호출
Amazon SageMaker AI를 사용하여 모델을 엔드포인트에 배포한 후 추론 요청을 보내 모델과 상호 작용할 수 있습니다. 모델에 추론 요청을 보내려면 추론 요청을 호스팅하는 엔드포인트를 호출합니다. 사용자는 Amazon SageMaker Studio, AWS, SDK 또는 AWS CLI를 사용하여 엔드포인트를 테스트할 수 있습니다.
Amazon SageMaker 스튜디오를 사용하여 모델 호출
모델을 엔드포인트에 배포한 후 Amazon SageMaker Studio를 통해 엔드포인트를 보고 단일 추론 요청을 전송하여 엔드포인트를 테스트할 수 있습니다.
참고
SageMaker AI는 실시간 엔드포인트에 대해 Studio에서 엔드포인트 테스트만 지원합니다.
엔드포인트에 테스트 추론 요청을 보내려면
-
Amazon SageMaker Studio를 실행합니다.
-
왼쪽 탐색 창에서 배포를 선택합니다.
-
드롭다운에서 엔드포인트를 선택합니다.
-
이름으로 엔드포인트를 찾고 표에서 이름을 선택합니다. 엔드포인트 패널에 나열된 엔드포인트 이름은 모델을 배포할 때 정의됩니다. Studio 작업 영역에서 엔드포인트 페이지가 새 탭에서 열립니다.
-
추론 테스트 탭을 선택합니다.
-
지정일에는 다음 옵션 중 하나를 선택합니다.
-
샘플 요청 테스트를 선택하여 즉시 엔드포인트로 요청을 보냅니다. JSON 편집기를 사용하여 샘플 데이터를 JSON 형식으로 제공하고 요청 전송을 선택하여 요청을 엔드포인트에 제출합니다. 요청 제출 후 Studio는 JSON 에디터 오른쪽에 있는 카드에 추론 출력을 표시합니다.
-
Python SDK 예제 코드 사용을 선택하여 엔드포인트에 요청을 보내기 위한 코드를 확인합니다. 그런 다음 추론 요청 예제 섹션에서 코드 예제를 복사하고 테스트 환경에서 코드를 실행합니다.
-
카드 상단에는 엔드포인트로 전송된 요청 유형이 표시됩니다 (JSON만 허용). 카드에는 다음 필드가 표시됩니다.
상태 - 다음 상태 유형 중 하나를 표시합니다.
Success– 요청이 성공했습니다.Failed– 요청이 실패했습니다. 실패 사유 아래에 응답이 표시됩니다.Pending- 추론 요청이 보류 중인 동안에는 상태에 회전하는 원형 아이콘이 표시됩니다.
실행 길이 - 호출에 걸린 시간(종료 시간에서 시작 시간을 뺀 값)을 밀리초 단위로 나타냅니다.
요청 시간 - 요청이 전송된 후 몇 분이 경과했는지입니다.
결과 시간 - 결과가 반환된 후 몇 분이 경과했는지를 나타냅니다.
AWS SDK for Python (Boto3)을 사용하여 모델 호출
애플리케이션 코드에서 모델 엔드포인트를 호출하려는 경우를 포함한 AWSSDKs 중 하나를 사용할 수 있습니다AWS SDK for Python (Boto3). 이 SDK로 엔드포인트를 호출하려면 다음 Python 메서드 중 하나를 사용합니다.
-
invoke_endpoint– 모델 엔드포인트에 추론 요청을 보내고 모델이 생성한 응답을 반환합니다.이 메서드는 모델이 추론 페이로드를 생성한 후 하나의 응답으로 추론 페이로드를 반환합니다. 자세한 내용은 Python용 AWSSDK(Boto3) API 참조의 invoke_endpoint
항목을 참조하세요. -
invoke_endpoint_with_response_stream- 모델 엔드포인트에 추론 요청을 보내고 모델이 추론을 생성하는 동안 응답을 점진적으로 스트리밍합니다.이 메서드를 사용하면 해당 부분을 사용할 수 있게 되는 즉시 애플리케이션이 응답의 일부를 수신하기 시작합니다. 자세한 내용은 Python용 AWSSDK(Boto3) API 참조의 invoke_endpoint
항목을 참조하세요. 이 메서드는 추론 스트리밍을 지원하는 모델을 호출할 때만 사용하세요.
애플리케이션 코드에서 이러한 메서드를 사용하려면 사용자는 먼저 SageMaker AI 런타임 클라이언트를 초기화하고 엔드포인트의 이름을 지정해야 합니다. 다음 예제는 이어지는 나머지 예제를 위해 클라이언트와 엔드포인트를 설정합니다.
import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'
호출하여 추론 응답 가져오기
다음 예시에서는 invoke_endpoint 메서드를 사용하여 AWS SDK for Python (Boto3)을 포함하는 엔드포인트를 호출합니다.
# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))
이 예시는 SageMaker AI가 모델로 전달할 Body 필드에 입력 데이터를 제공합니다. 이 데이터는 훈련에 사용한 것과 동일한 형식이어야 합니다. 이 예제에서는 response 변수에 응답을 할당합니다.
response 변수는 HTTP 상태, 배포된 모델 이름 및 기타 필드에 대한 액세스를 제공합니다. 다음 스니펫은 HTTP 상태 코드를 인쇄합니다.
print(response["HTTPStatusCode"])
호출하여 추론 응답을 스트리밍합니다.
추론 스트리밍을 지원하는 모델을 배포한 경우 사용자는 모델을 호출하여 추론 페이로드를 파트 스트림으로 수신할 수 있습니다. 모델은 모델이 이러한 부분을 생성할 때 이러한 부분을 점진적으로 제공합니다. 애플리케이션이 추론 스트림을 수신하면 애플리케이션이 모델이 전체 응답 페이로드를 생성할 때까지 기다릴 필요가 없습니다. 대신 애플리케이션은 응답이 제공되는 즉시 일부 응답을 받습니다.
애플리케이션에서 추론 스트림을 사용하면 사용자가 첫 번째 부분을 즉시 받기 때문에 추론이 빠르다고 인식할 수 있는 상호 작용을 만들 수 있습니다. 사용자는 스트리밍을 구현하여 챗봇, 가상 어시스턴트, 음악 생성기와 같은 빠른 대화형 환경을 지원할 수 있습니다. 예를 들어 대규모 언어 모델(LLM)에서 생성된 텍스트를 점진적으로 표시하는 챗봇을 만들 수 있습니다.
추론 스트림을 가져오려면 invoke_endpoint_with_response_stream 메서드를 사용할 수 있습니다. 응답 본문에서 SDK는 EventStream 객체를 제공하며, 이 객체는 추론을 일련의 PayloadPart 객체로 제공합니다.
예 추론 스트림
다음 예제는 PayloadPart JSON 객체의 스트림입니다.
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .
각 페이로드 부분에서 Bytes 필드는 모델의 추론 응답의 일부를 제공합니다. 이 부분은 텍스트, 이미지, 오디오 데이터 등 모델이 생성하는 모든 콘텐츠 유형일 수 있습니다. 이 예제에서 부분은 LLM에서 생성된 텍스트를 포함하는 JSON 객체입니다.
일반적으로 페이로드 부분에는 모델의 개별 데이터 청크가 포함됩니다. 이 예제에서는 불연속형 청크가 전체 JSON 객체입니다. 스트리밍 응답이 청크를 여러 페이로드 부분으로 분할하거나 여러 청크를 하나의 페이로드 부분으로 결합하는 경우가 있습니다. 다음 예제는 두 페이로드 부분으로 분할된 JSON 형식의 데이터 청크를 보여줍니다.
{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
추론 스트림을 처리하는 애플리케이션 코드를 작성할 때는 이러한 가끔 발생하는 데이터 분할 및 조합을 처리하는 로직을 포함하세요. 한 가지 전략으로는 애플리케이션이 페이로드 부분을 수신하는 동안 Bytes의 내용을 연결하는 코드를 작성할 수 있습니다. 여기에 있는 예제 JSON 데이터를 연결하면 데이터를 줄바꿈으로 구분된 JSON 본문으로 결합할 수 있습니다. 그러면 코드가 각 줄의 전체 JSON 객체를 파싱하여 스트림을 처리할 수 있습니다.
다음 예제는 Bytes의 예제 내용을 연결할 때 생성되는 줄바꿈으로 구분된 JSON을 보여줍니다.
{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
예 추론 스트림을 처리하는 코드
다음 예제 Python 클래스 SmrInferenceStream는 사용자가 텍스트 데이터를 JSON 형식으로 보내는 추론 스트림을 처리하는 방법을 보여줍니다.
import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]
이 예제에서는 다음을 수행하여 추론 스트림을 처리합니다.
-
SageMaker AI 런타임 클라이언트를 초기화하고 모델 엔드포인트의 이름을 설정합니다. 사용자가 인퍼런스 스트림을 가져오려면 엔드포인트가 호스트하는 모델이 인퍼런스 스트리밍을 지원해야 합니다.
-
예제
stream_inference메서드에서는 요청 본문을 수신하여 SDK의invoke_endpoint_with_response_stream메서드에 전달합니다. -
SDK가 반환하는
EventStream객체의 각 이벤트를 반복합니다. -
각 이벤트에서
PayloadPart객체에 있는Bytes객체의 내용을 가져옵니다. -
예제
_write메서드에서는 버퍼에 기록하여Bytes객체의 내용을 연결합니다. 결합된 내용은 줄바꿈으로 구분된 JSON 본문을 형성합니다. -
예제
_readlines메서드를 사용하여 반복 가능한 일련의 JSON 객체를 가져옵니다. -
각 JSON 객체에서 추론의 일부를 가져옵니다.
-
yield표현식을 사용하면 조각들을 점진적으로 반환합니다.
다음 예제에서는 SmrInferenceStream 객체를 만들고 사용합니다.
request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')
이 예제는 요청 본문을 stream_inference 메서드에 전달합니다. 응답을 반복하여 추론 스트림이 반환하는 각 부분을 인쇄합니다.
이 예제에서는 지정된 엔드포인트의 모델이 텍스트를 생성하는 LLM이라고 가정합니다. 이 예제의 출력은 점진적으로 인쇄되는 생성된 텍스트 본문입니다.
a challenging problem in machine learning. The goal is to . . .
AWS CLI를 사용하여 모델 호출
AWS Command Line Interface()를 사용하여 명령을 실행하여 모델 엔드포인트를 호출할 수 있습니다AWS CLI. AWS CLI는 invoke-endpoint 명령을 통한 표준 추론 요청을 지원하고 invoke-endpoint-async 명령을 통한 비동기 추론 요청을 지원합니다.
참고
AWS CLI는 스트리밍 추론 요청을 지원하지 않습니다.
다음 예시에서는 invoke-endpoint 명령을 사용하여 모델 엔드포인트에 추론 요청을 보냅니다.
aws sagemaker-runtime invoke-endpoint \ --endpoint-nameendpoint_name\ --bodyfileb://$file_name\output_file.txt
--endpoint-name 파라미터에는 엔드포인트를 생성할 때 지정한 엔드포인트 이름을 제공합니다. --body 파라미터의 경우 SageMaker AI가 모델로 전달할 입력 데이터를 제공합니다. 데이터는 훈련에 사용한 것과 동일한 형식이어야 합니다. 이 예제는 엔드포인트로 바이너리 데이터를 보내는 방법을 보여줍니다.
파일 콘텐츠를 파라미터에 전달할 fileb:// 때를 file:// 다시 사용해야 하는 경우에 대한 자세한 내용은 로컬 파일 파라미터 모범 사례를
자세한 내용과 전달할 수 있는 추가 파라미터를 보려면 사용자는 AWS CLI 명령 참조서의 invoke-endpoint 섹션을 참조하세요.
invoke-endpoint 명령이 성공하면 다음과 같은 응답이 반환됩니다.
{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }
명령이 성공하지 못하면 입력 페이로드가 올바른 형식인지 확인합니다.
파일 출력 파일(이 예제에서는 output_file.txt)을 확인하여 호출 결과를 확인합니다.
more output_file.txt
AWSSDK for Python을 사용하여 모델 호출
추론 요청 및 응답을 양방향으로 스트리밍하도록 호출
애플리케이션 코드에서 모델 엔드포인트를 호출하여 양방향 스트리밍을 지원하려는 경우 HTTP/2 지원과 함께 양방향 스트리밍 기능을 지원하는 새로운 실험용 SDK for Python
참고
새로운 실험용 SDK는 표준 Boto3 SDK와 다르며 데이터 교환을 위한 지속적인 양방향 연결을 지원합니다. 실험용 Python SDK를 사용하는 동안 실험적이지 않은 사용 사례에 대해서는 SDK 버전을 엄격하게 고정할 것을 강력히 권장합니다.
양방향 스트리밍으로 엔드포인트를 호출하려면 invoke_endpoint_with_bidirectional_stream 메서드를 사용합니다. 이 방법은 모델이 데이터를 처리할 때 실시간으로 응답을 수신하면서 여러 페이로드 청크를 모델에 스트리밍할 수 있는 영구 연결을 설정합니다. 입력 스트림을 명시적으로 닫거나 엔드포인트가 연결을 닫을 때까지 연결이 열린 상태로 유지되어 최대 30분의 연결 시간을 지원합니다.
사전 조건
애플리케이션 코드에서 양방향 스트리밍을 사용하려면 먼저 다음을 수행해야 합니다.
-
실험용 SageMaker 런타임 HTTP/2 SDK 설치
-
SageMaker 런타임 클라이언트에 대한 AWS자격 증명 설정
-
SageMaker 엔드포인트에 양방향 스트리밍을 지원하는 모델 배포
양방향 스트리밍 클라이언트 설정
다음 예제에서는 양방향 스트리밍에 필요한 구성 요소를 초기화하는 방법을 보여줍니다.
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)
전체 양방향 스트리밍 클라이언트
다음 예제에서는 여러 텍스트 페이로드를 SageMaker 엔드포인트로 보내고 실시간으로 응답을 처리하는 양방향 스트리밍 클라이언트를 생성하는 방법을 보여줍니다.
import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())
클라이언트는 양방향 스트리밍 연결에 필요한 포트 8443의 리전 엔드포인트 URI를 사용하여 SageMaker 런타임 HTTP/2 클라이언트를 초기화합니다. start_session() 메서드는를 호출invoke_endpoint_with_bidirectional_stream()하여 영구 연결을 설정하고 수신 응답을 동시에 처리하는 비동기 작업을 생성합니다.
send_event() 메서드는 적절한 요청 객체에 페이로드 데이터를 래핑하여 입력 스트림을 통해 전송하는 반면, _process_responses() 메서드는 엔드포인트가 도착할 때 엔드포인트에서 응답을 지속적으로 수신 대기하고 처리합니다. 이 양방향 접근 방식을 사용하면 동일한 연결을 통해 요청 전송과 응답 수신이 동시에 이루어지는 실시간 상호 작용이 가능합니다.