Preprocessing and postprocessing
You can use custom preprocessing and postprocessing Python scripts to transform the input to your model monitor or to extend the code after a successful monitoring run. Upload these scripts to Amazon S3 and reference them when you create your model monitor.
The following example shows how you can customize monitoring schedules with preprocessing and postprocessing scripts. Replace the user placeholder text with your own information.
import os

import boto3
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload the pre and postprocessor scripts to the default SageMaker bucket.
# upload_file returns None, so build the S3 URIs for the scripts separately.
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "demo-sagemaker-model-monitor"

bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py")
pre_processor_script = "s3://{}/{}".format(bucket.name, os.path.join(prefix, "preprocessor.py"))

bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py")
post_processor_script = "s3://{}/{}".format(bucket.name, os.path.join(prefix, "postprocessor.py"))

# Get execution role
role = get_execution_role()  # can be an empty string

# Instance type
instance_type = "instance-type"
# instance_type = "ml.m5.xlarge"  # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Use the bucket name, not the Bucket object, when building the S3 URI
s3_report_path = "s3://{}/{}".format(bucket.name, "reports")
monitor_schedule_name = "monitor-schedule-name"
endpoint_name = "endpoint-name"

my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # use endpoint_input for a real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
Preprocessing script
Use a preprocessing script when you need to transform the inputs to your model monitor.
For example, suppose the output of your model is the array [1.0, 2.1]. The Amazon SageMaker Model Monitor container only works with tabular or flattened JSON structures, such as {"prediction0": 1.0, "prediction1": 2.1}. You could use a preprocessing script like the following to transform the array into the correct JSON structure.
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }
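To see what this handler produces, you can exercise it locally with a minimal stand-in for the inference record. The following sketch is for illustration only: the SimpleNamespace objects are hypothetical stand-ins, not part of the Model Monitor container, and only mimic the endpoint_input.data and endpoint_output.data attributes that the handler reads.

from types import SimpleNamespace

# Hypothetical stand-in for a captured inference record, for local testing only
fake_record = SimpleNamespace(
    endpoint_input=SimpleNamespace(data="132,25,113.2"),
    endpoint_output=SimpleNamespace(data="1.0,2.1\n"),
)

print(preprocess_handler(fake_record))
# Expected shape: {'00000000000000000000': '1.0', '00000000000000000001': '2.1',
#                  '00000000000000000002': '132', ...}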
In another example, suppose your model has optional features and you use -1 to denote that an optional feature has a missing value. If you have a data quality monitor, you may want to remove the -1 values from the input value array so that they are not included in the monitor's metric calculations. You could use a script like the following to remove those values.
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    # the CSV values arrive as strings, so compare against "-1"
    return {i : None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
Your preprocessing script receives inference_record as its only input. The following code snippet shows an example of an inference_record.
{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "
132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1
", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287
", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9
", "inferenceTime": "2019-11-20T23:33:12Z
" }, "eventVersion": "0
" }
The following code snippet shows the full class structure of an inference_record.
KEY_EVENT_METADATA = "eventMetadata"
KEY_EVENT_METADATA_EVENT_ID = "eventId"
KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime"
KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes"

KEY_EVENTDATA_ENCODING = "encoding"
KEY_EVENTDATA_DATA = "data"
KEY_GROUND_TRUTH_DATA = "groundTruthData"

KEY_EVENTDATA = "captureData"
KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput"
KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput"
KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput"
KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType"
KEY_EVENTDATA_MODE = "mode"

KEY_EVENT_VERSION = "eventVersion"


class EventConfig:
    def __init__(self, endpoint, variant, start_time, end_time):
        self.endpoint = endpoint
        self.variant = variant
        self.start_time = start_time
        self.end_time = end_time


class EventMetadata:
    def __init__(self, event_metadata_dict):
        self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None)
        self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None)
        self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None)


class EventData:
    def __init__(self, data_dict):
        self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None)
        self.data = data_dict.get(KEY_EVENTDATA_DATA, None)
        self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None)
        self.mode = data_dict.get(KEY_EVENTDATA_MODE, None)

    def as_dict(self):
        ret = {
            KEY_EVENTDATA_ENCODING: self.encoding,
            KEY_EVENTDATA_DATA: self.data,
            KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType,
        }
        return ret


class CapturedData:
    def __init__(self, event_dict):
        self.event_metadata = None
        self.endpoint_input = None
        self.endpoint_output = None
        self.batch_transform_output = None
        self.ground_truth = None
        self.event_version = None
        self.event_dict = event_dict
        self._event_dict_postprocessed = False
        if KEY_EVENT_METADATA in event_dict:
            self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA])
        if KEY_EVENTDATA in event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT])
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT])
            if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]:
                self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT])
        if KEY_GROUND_TRUTH_DATA in event_dict:
            self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA])
        if KEY_EVENT_VERSION in event_dict:
            self.event_version = event_dict[KEY_EVENT_VERSION]

    def as_dict(self):
        if self._event_dict_postprocessed is True:
            return self.event_dict
        if KEY_EVENTDATA in self.event_dict:
            if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict()
            if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT] = self.endpoint_output.as_dict()
            if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]:
                self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict()
        self._event_dict_postprocessed = True
        return self.event_dict

    def __str__(self):
        return str(self.as_dict())
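Putting the pieces together, you can parse a captured record like the earlier JSON example into a CapturedData object and feed it to your handler. The following is a minimal local-testing sketch assuming the class definitions above; raw_record is a hypothetical string holding one JSON line from a data capture file.

import json

# raw_record is assumed to hold one JSON line from a data capture file,
# in the same shape as the earlier example
raw_record = '{"captureData": {"endpointInput": {"observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2", "encoding": "CSV"}, "endpointOutput": {"observedContentType": "text/csv", "mode": "OUTPUT", "data": "0.01", "encoding": "CSV"}}, "eventMetadata": {"eventId": "example-id", "inferenceTime": "2019-11-20T23:33:12Z"}, "eventVersion": "0"}'

inference_record = CapturedData(json.loads(raw_record))
print(inference_record.endpoint_input.data)   # "132,25,113.2"
print(inference_record.endpoint_output.data)  # "0.01"
print(preprocess_handler(inference_record))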
Custom sampling
You can also apply a custom sampling strategy in your preprocessing script. To do so, configure Model Monitor's first-party, pre-built container to ignore a percentage of records according to your specified sampling rate. In the following example, the handler samples 10% of the records by returning the record in 10% of handler calls and an empty list otherwise.
import random

def preprocess_handler(inference_record):
    # we set up a sampling rate of 0.1
    if random.random() > 0.1:
        # return an empty list to skip this record
        return []
    input_data = inference_record.endpoint_input.data
    # the CSV values arrive as strings, so compare against "-1"
    return {i : None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
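Random sampling like this makes each run nondeterministic. If you prefer a stable decision per record, one option is to key the decision off the capture event ID instead, as in the following sketch. This is an illustrative variation, not part of the Model Monitor documentation; it assumes event_metadata.event_id is populated, as in the class structure shown earlier.

import zlib

def preprocess_handler(inference_record):
    # Hash the capture event ID so the same record is always either kept or dropped
    event_id = inference_record.event_metadata.event_id or ""
    if zlib.crc32(event_id.encode("utf-8")) % 10 != 0:  # keep roughly 10% of records
        return []
    input_data = inference_record.endpoint_input.data
    return {i : None if x == "-1" else x for i, x in enumerate(input_data.split(","))}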
Custom logging in your preprocessing script
If your preprocessing script returns an error, check the exception messages logged to CloudWatch to debug it. You can access the logger through the preprocess_handler interface and log any information you need from your script to CloudWatch. This is useful when debugging your preprocessing script. The following example shows how you can use the preprocess_handler interface to log to CloudWatch.
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
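Inside the container the logger is provided for you. To try the handler outside the container, you can pass in a standard Python logger, as in the following sketch; this is an assumption for local testing only and does not reproduce the container's log configuration.

import logging

logging.basicConfig(level=logging.DEBUG)
local_logger = logging.getLogger("preprocess-local-test")

# Hypothetical sample record, for local testing only
sample_record = {"captureData": {"endpointInput": {"data": "132,25,113.2"}}}
preprocess_handler(sample_record, local_logger)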
Postprocessing script
Use a postprocessing script if you want to extend the code following a successful monitoring run.
def postprocess_handler():
    print("Hello from post-proc script!")
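The handler can do more than print. As one illustration only, not something prescribed by Model Monitor, the following sketch writes a small completion marker to Amazon S3 after each successful run; the bucket and key prefix are placeholders you would replace with your own values.

import datetime

import boto3

def postprocess_handler():
    # Illustrative only: record when the monitoring run finished.
    # Replace the bucket and prefix placeholders with your own values.
    s3 = boto3.client("s3")
    timestamp = datetime.datetime.utcnow().isoformat()
    s3.put_object(
        Bucket="amzn-s3-demo-bucket",
        Key="model-monitor/postprocess-markers/{}.txt".format(timestamp),
        Body="Monitoring run completed at {}".format(timestamp).encode("utf-8"),
    )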