本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
搭配機器學習離線批次推論使用 OpenSearch 擷取管道
Amazon OpenSearch Ingestion (OSI) 管道支援機器學習 (ML) 離線批次推論處理,以低成本有效率地充實大量資料。每當您有可非同步處理的大型資料集時,請使用離線批次推論。離線批次推論適用於 Amazon Bedrock 和 SageMaker 模型。此功能適用於所有支援 OpenSearch Service 2 AWS 區域 .17+ 網域的 OpenSearch Ingestion。
注意
對於即時推論處理,請使用 適用於第三方平台的 Amazon OpenSearch Service ML 連接器。
離線批次推論處理會利用稱為 ML Commons 的 OpenSearch 功能。ML Commons 透過傳輸和 REST API 呼叫提供 ML 演算法。這些呼叫會為每個 ML 請求選擇適當的節點和資源,並監控 ML 任務以確保正常執行時間。如此一來,ML Commons 可讓您利用現有的開放原始碼 ML 演算法,並減少開發新的 ML 功能所需的工作量。如需 ML Commons 的詳細資訊,請參閱 OpenSearch.org 文件中的機器學習
運作方式
您可以在 OpenSearch Ingestion 上建立離線批次推論管道,方法是將機器學習推論處理器
OpenSearch Ingestion 使用具有 ML Commons 的ml_inference處理器來建立離線批次推論任務。ML Commons 接著會使用 batch_predict
管道元件的運作方式如下:
管道 1 (資料準備和轉換)*:
-
來源:從 OpenSearch Ingestion 支援的外部來源掃描資料。
-
資料處理者:原始資料會經過處理,並轉換為正確的格式,以便在整合式 AI 服務上進行批次推論。
-
S3 (接收器):處理的資料會暫存在 Amazon S3 儲存貯體中,準備做為在整合 AI 服務上執行批次推論任務的輸入。
管道 2 (觸發 ML batch_inference):
-
來源:管道 1 輸出所建立之新檔案的自動化 S3 事件偵測。
-
Ml_inference 處理器:透過非同步批次工作產生 ML 推論的處理器。它透過在目標網域上執行的已設定 AI 連接器連接到 AI 服務。
-
任務 ID:每個批次任務都與 ml-commons 中的任務 ID 相關聯,以進行追蹤和管理。
-
OpenSearch ML Commons:ML Commons,託管模型以進行即時神經搜尋、管理遠端 AI 伺服器的連接器,並提供 APIs以進行批次推論和任務管理。
-
AI 服務:OpenSearch ML Commons 與 Amazon Bedrock 和 Amazon SageMaker 等 AI 服務互動,對資料執行批次推論,產生預測或洞見。結果會以非同步方式儲存至單獨的 S3 檔案。
管道 3 (大量擷取):
-
S3 (來源):批次任務的結果會儲存在 S3 中,這是此管道的來源。
-
資料轉換處理器:進一步的處理和轉換會在擷取之前套用至批次推論輸出。這可確保資料在 OpenSearch 索引中正確映射。
-
OpenSearch 索引 (Sink):處理的結果會編製索引到 OpenSearch 中,以進行儲存、搜尋和進一步分析。
注意
*管道 1 所述的程序是選用的。如果您願意,可以略過該程序,只需在 S3 接收器中上傳準備好的資料,即可建立批次任務。
關於 ml_inference 處理器
OpenSearch Ingestion 使用 S3 掃描來源和 ML 推論處理器之間的特殊整合來進行批次處理。S3 掃描以僅限中繼資料模式運作,可有效率地收集 S3 檔案資訊,而無需讀取實際的檔案內容。ml_inference 處理器使用 S3 檔案 URLs與 ML Commons 協調以進行批次處理。此設計可將掃描階段期間不必要的資料傳輸降至最低,以最佳化批次推論工作流程。您可以使用參數定義ml_inference處理器。請見此處範例:
processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWS test-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWS TestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # AWS configuration settings aws: # AWS 區域 where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml# Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"
使用 ml_inference 處理器改善擷取效能
OpenSearch Ingestion ml_inference 處理器可大幅增強啟用 ML 搜尋的資料擷取效能。處理器非常適合需要機器學習模型產生資料的使用案例,包括語意搜尋、多模式搜尋、文件擴充和查詢理解。在語意搜尋中,處理器可以按數量級加速大量、高維度向量的建立和擷取。
相較於即時模型調用,處理器的離線批次推論功能具有獨特的優勢。雖然即時處理需要容量限制的即時模型伺服器,但批次推論會隨需動態擴展運算資源,並平行處理資料。例如,當 OpenSearch Ingestion 管道收到十億個來源資料請求時,它會為 ML 批次推論輸入建立 100 個 S3 檔案。然後,ml_inference處理器使用 100 個 ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) 執行個體啟動 SageMaker 批次任務,在 14 小時內完成十億個請求的向量化,這項任務幾乎不可能在即時模式下完成。
設定 ml_inference 處理器以擷取語意搜尋的資料請求
下列程序會逐步引導您設定和設定 OpenSearch Ingestion ml_inference 處理器,以使用文字內嵌模型擷取 10 億個資料請求以進行語意搜尋。
主題
步驟 1:在 OpenSearch 中建立連接器並註冊模型
針對下列程序,請使用 ML Commons batch_inference_sagemaker_connector_blueprint
在 OpenSearch 中建立連接器並註冊模型
-
在 SageMaker 中建立用於批次轉換的 Deep Java Library (DJL) ML 模型。若要檢視其他 DJL 模型,請參閱 GitHub 上的 semantic_search_with_CFN_template_for_Sagemaker
: POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } } -
在
actions欄位中使用batch_predict作為新action類型建立連接器:POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] } -
使用傳回的連接器 ID 註冊 SageMaker 模型:
POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" } -
使用
batch_predict動作類型叫用模型:POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }回應包含批次任務的任務 ID:
{ "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" } -
使用任務 ID 呼叫 Get Task API 來檢查批次任務狀態:
GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567回應包含任務狀態:
{ "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }
(替代程序) 步驟 1:使用 CloudFormation 整合範本建立連接器和模型
如果您願意,您可以使用 AWS CloudFormation 自動建立 ML 推論所需的所有必要 Amazon SageMaker 連接器和模型。此方法使用 Amazon OpenSearch Service 主控台中可用的預先設定範本來簡化設定。如需詳細資訊,請參閱使用 CloudFormation 設定語意搜尋的遠端推論。
部署可建立所有必要 SageMaker 連接器和模型的 CloudFormation 堆疊
-
開啟 Amazon OpenSearch Service 主控台。
-
在導覽窗格中選擇整合。
-
在搜尋欄位中,輸入
SageMaker,然後選擇透過 Amazon SageMaker 與文字內嵌模型整合。 -
選擇設定網域,然後選擇設定 VPC 網域或設定公有網域。
-
在範本欄位中輸入資訊。針對啟用離線批次推論,選擇 true 來佈建資源以進行離線批次處理。
-
選擇建立以建立 CloudFormation 堆疊。
-
建立堆疊之後,在主控台中 CloudFormation 開啟輸出索引標籤 尋找 connector_id 和 model_id。稍後當您設定管道時,將需要這些值。
步驟 2:為 ML 離線批次推論建立 OpenSearch 擷取管道
使用下列範例建立適用於 ML 離線批次推論的 OpenSearch 擷取管道。如需建立 OpenSearch Ingestion 管道的詳細資訊,請參閱 建立 Amazon OpenSearch Ingestion 管道。
開始之前
在下列範例中,您會為 sts_role_arn 參數指定 IAM 角色 ARN。使用下列程序來驗證此角色是否對應至可存取 OpenSearch 中 ml-commons 的後端角色。
-
導覽至 OpenSearch Service 網域的 OpenSearch Dashboards 外掛程式。您可以在 OpenSearch Service 主控台的網域儀表板上找到儀表板端點。
-
從主選單選擇安全性、角色,然後選取 ml_full_access 角色。
-
選擇 Mapped users (已映射的使用者)、Manage mapping (管理映射)。
-
在後端角色下,輸入需要呼叫網域許可的 Lambda 角色 ARN。以下是範例:arn:aws:iam::
111122223333:role/lambda-role -
選擇 Map (映射),並確認使用者或角色顯示在 Mapped users (已映射的使用者) 中。
為 ML 離線批次推論建立 OpenSearch 擷取管道的範例
version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name:namedata_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name:namedata_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region:namedefault_bucket_owner:account_IDcodec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region:us-west-2bucket:batch-inference-dlqkey_path_prefix:bedrock-dlqsts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml- copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null
步驟 3:準備資料以供擷取
若要準備資料以進行 ML 離線批次推論處理,請使用您自己的工具或程序自行準備資料,或使用 OpenSearch Data Prepper
下列範例使用 MS MARCO
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...
若要使用 MS MARCO 資料集進行測試,假設您建構分佈於 100 個檔案的 10 億個輸入請求,每個請求都包含 1,000 萬個請求。這些檔案會存放在字首為 s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/ 的 Amazon S3 中。OpenSearch Ingestion 管道會同時掃描這 100 個檔案,並啟動具有 100 個工作者的 SageMaker 批次任務以進行平行處理,從而實現高效的向量化,並將十億份文件擷取到 OpenSearch。
在生產環境中,您可以使用 OpenSearch 擷取管道來產生用於批次推論輸入的 S3 檔案。管道支援各種資料來源
步驟 4:監控批次推論任務
您可以使用 SageMaker 主控台或 監控批次推論任務 AWS CLI。您也可以使用 Get Task API 來監控批次任務:
GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }
API 會傳回作用中批次任務的清單:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }
步驟 5:執行搜尋
監控批次推論任務並確認已完成之後,您可以執行各種類型的 AI 搜尋,包括語意、混合、對話 (使用 RAG)、神經稀疏和多模態。如需 OpenSearch Service 支援的 AI 搜尋詳細資訊,請參閱 AI 搜尋
若要搜尋原始向量,請使用knn查詢類型,提供vector陣列做為輸入,並指定傳回的結果k數目:
GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }
若要執行 AI 支援的搜尋,請使用 neural 查詢類型。指定query_text輸入、您在 OpenSearch Ingestion 管道中設定的內嵌模型model_id的 ,以及傳回的結果k數目。若要從搜尋結果中排除內嵌,請在 _source.excludes 參數中指定內嵌欄位的名稱:
GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }