本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
搭配機器學習離線批次推論使用 OpenSearch 擷取管道
Amazon OpenSearch Ingestion (OSI) 管道支援機器學習 (ML) 離線批次推論處理,以低成本有效率地充實大量資料。每當您有可非同步處理的大型資料集時,請使用離線批次推論。離線批次推論適用於 Amazon Bedrock 和 SageMaker 模型。此功能適用於所有支援 OpenSearch Service 2AWS 區域.17+ 網域的 OpenSearch Ingestion。
注意
對於即時推論處理,請使用 適用於第三方平台的 Amazon OpenSearch Service ML 連接器。
離線批次推論處理會利用稱為 ML Commons 的 OpenSearch 功能。ML Commons 透過傳輸和 REST API 呼叫提供 ML 演算法。這些呼叫會為每個 ML 請求選擇適當的節點和資源,並監控 ML 任務以確保正常執行時間。如此一來,ML Commons 可讓您利用現有的開放原始碼 ML 演算法,並減少開發新的 ML 功能所需的工作量。如需 ML Commons 的詳細資訊,請參閱 OpenSearch.org 文件中的機器學習
運作方式
您可以透過將機器學習推論處理器新增至管道,在 OpenSearch Ingestion 上建立離線批次推論管道。 https://docs.opensearch.org/latest/ingest-pipelines/processors/ml-inference/
OpenSearch Ingestion 使用具有 ML Commons 的ml_inference處理器來建立離線批次推論任務。ML Commons 接著會使用 batch_predict
管道元件的運作方式如下:
管道 1 (資料準備和轉換)*:
-
來源:從 OpenSearch Ingestion 支援的外部來源掃描資料。
-
資料處理者:原始資料會經過處理,並轉換為正確的格式,以便在整合式 AI 服務上進行批次推論。
-
S3 (接收器):處理的資料會暫存在 Amazon S3 儲存貯體中,準備做為在整合 AI 服務上執行批次推論任務的輸入。
管道 2 (觸發 ML batch_inference):
-
來源:管道 1 輸出所建立新檔案的自動化 S3 事件偵測。
-
Ml_inference 處理器:透過非同步批次工作產生 ML 推論的處理器。它透過在目標網域上執行的已設定 AI 連接器連接到 AI 服務。
-
任務 ID:每個批次任務都與 ml-commons 中的任務 ID 相關聯,以進行追蹤和管理。
-
OpenSearch ML Commons:ML Commons,託管模型以進行即時神經搜尋、管理遠端 AI 伺服器的連接器,並提供 APIs以進行批次推論和任務管理。
-
AI 服務:OpenSearch ML Commons 與 Amazon Bedrock 和 Amazon SageMaker 等 AI 服務互動,對資料執行批次推論,產生預測或洞察。結果會以非同步方式儲存至單獨的 S3 檔案。
管道 3 (大量擷取):
-
S3 (來源):批次任務的結果會儲存在 S3 中,這是此管道的來源。
-
資料轉換處理器:進一步的處理和轉換會在擷取之前套用至批次推論輸出。這可確保資料在 OpenSearch 索引中正確映射。
-
OpenSearch 索引 (Sink):處理的結果會索引到 OpenSearch 中,以進行儲存、搜尋和進一步分析。
注意
*管道 1 所述的程序是選用的。如果您願意,可以略過該程序,只需在 S3 接收器中上傳準備好的資料,即可建立批次任務。
關於 ml_inference 處理器
OpenSearch Ingestion 使用 S3 掃描來源和 ML 推論處理器之間的特殊整合來進行批次處理。S3 Scan 以僅限中繼資料模式運作,可有效率地收集 S3 檔案資訊,而無需讀取實際的檔案內容。ml_inference 處理器使用 S3 檔案 URLs 與 ML Commons 協調以進行批次處理。此設計可將掃描階段期間不必要的資料傳輸降至最低,以最佳化批次推論工作流程。您可以使用參數定義ml_inference處理器。請見此處範例:
processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWStest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWSTestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" #AWSconfiguration settings aws: #AWS 區域where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml# Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"
使用 ml_inference 處理器改善擷取效能
OpenSearch Ingestion ml_inference 處理器可大幅增強啟用 ML 搜尋的資料擷取效能。處理器非常適合需要機器學習模型產生資料的使用案例,包括語意搜尋、多模式搜尋、文件擴充和查詢理解。在語意搜尋中,處理器可以依數量級加速大量、高維度向量的建立和擷取。
相較於即時模型調用,處理器的離線批次推論功能具有獨特的優勢。雖然即時處理需要容量限制的即時模型伺服器,但批次推論會隨需動態擴展運算資源,並平行處理資料。例如,當 OpenSearch Ingestion 管道收到十億個來源資料請求時,它會為 ML 批次推論輸入建立 100 個 S3 檔案。然後,ml_inference處理器使用 100 個 ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) 執行個體啟動 SageMaker 批次任務,在 14 小時內完成十億個請求的向量化,這項任務幾乎不可能在即時模式下完成。
設定 ml_inference 處理器以擷取語意搜尋的資料請求
下列程序會逐步引導您設定和設定 OpenSearch Ingestion ml_inference 處理器,以使用文字內嵌模型擷取 10 億個資料請求以進行語意搜尋。
主題
步驟 1:在 OpenSearch 中建立連接器並註冊模型
針對下列程序,請使用 ML Commons batch_inference_sagemaker_connector_blueprint
在 OpenSearch 中建立連接器並註冊模型
-
在 SageMaker 中建立用於批次轉換的 Deep Java Library (DJL) ML 模型。若要檢視其他 DJL 模型,請參閱 GitHub 上的 semantic_search_with_CFN_template_for_Sagemaker
: POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } } -
在
actions欄位中使用batch_predict作為新action類型建立連接器:POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] } -
使用傳回的連接器 ID 註冊 SageMaker 模型:
POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" } -
使用
batch_predict動作類型叫用模型:POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }回應包含批次任務的任務 ID:
{ "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" } -
使用任務 ID 呼叫 Get Task API 來檢查批次任務狀態:
GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567回應包含任務狀態:
{ "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }
(替代程序) 步驟 1:使用CloudFormation整合範本建立連接器和模型
如果您願意,您可以使用 AWS CloudFormation自動建立 ML 推論所需的所有必要 Amazon SageMaker 連接器和模型。此方法使用 Amazon OpenSearch Service 主控台中可用的預先設定範本來簡化設定。如需詳細資訊,請參閱使用 CloudFormation設定語意搜尋的遠端推論。
部署可建立所有必要 SageMaker 連接器和模型的CloudFormation堆疊
-
開啟 Amazon OpenSearch Service 主控台。
-
在導覽窗格中選擇整合。
-
在搜尋欄位中,輸入
SageMaker,然後選擇透過 Amazon SageMaker 與文字內嵌模型整合。 -
選擇設定網域,然後選擇設定 VPC 網域或設定公有網域。
-
在範本欄位中輸入資訊。針對啟用離線批次推論,選擇 true 來佈建資源以進行離線批次處理。
-
選擇建立以建立CloudFormation堆疊。
-
建立堆疊後,在主控台中CloudFormation開啟輸出索引標籤 尋找 connector_id 和 model_id。稍後當您設定管道時,將需要這些值。
步驟 2:為 ML 離線批次推論建立 OpenSearch 擷取管道
使用下列範例建立適用於 ML 離線批次推論的 OpenSearch 擷取管道。如需建立 OpenSearch Ingestion 管道的詳細資訊,請參閱 建立 Amazon OpenSearch 擷取管道。
開始之前
在下列範例中,您可以指定 sts_role_arn 參數的 IAM 角色 ARN。使用下列程序來驗證此角色是否對應至可存取 OpenSearch 中 ml-commons 的後端角色。
-
導覽至 OpenSearch Service 網域的 OpenSearch Dashboards 外掛程式。您可以在 OpenSearch Service 主控台的網域儀表板上找到儀表板端點。
-
從主選單選擇安全性、角色,然後選取 ml_full_access 角色。
-
選擇 Mapped users (已映射的使用者)、Manage mapping (管理映射)。
-
在後端角色下,輸入需要呼叫網域許可的 Lambda 角色 ARN。以下是範例:arn:aws:iam::
111122223333:role/lambda-role -
選擇 Map (映射),並確認使用者或角色顯示在 Mapped users (已映射的使用者) 中。
為 ML 離線批次推論建立 OpenSearch 擷取管道的範例
version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name:namedata_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name:namedata_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region:namedefault_bucket_owner:account_IDcodec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region:us-west-2bucket:batch-inference-dlqkey_path_prefix:bedrock-dlqsts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml- copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null
步驟 3:準備資料以供擷取
若要準備資料以進行 ML 離線批次推論處理,請使用您自己的工具或程序自行準備資料,或使用 OpenSearch Data Prepper
下列範例使用 MS MARCO
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...
若要使用 MS MARCO 資料集進行測試,假設您建構分佈於 100 個檔案的 10 億個輸入請求,每個請求都包含 1,000 萬個請求。這些檔案會存放在字首為 s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/ 的 Amazon S3 中。OpenSearch Ingestion 管道會同時掃描這 100 個檔案,並啟動具有 100 個工作者的 SageMaker 批次任務以進行平行處理,讓 10 億份文件能夠有效率地向 OpenSearch 進行向量化和擷取。
在生產環境中,您可以使用 OpenSearch 擷取管道來產生用於批次推論輸入的 S3 檔案。管道支援各種資料來源
步驟 4:監控批次推論任務
您可以使用 SageMaker 主控台或 監控批次推論任務AWS CLI。您也可以使用 Get Task API 來監控批次任務:
GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }
API 會傳回作用中批次任務的清單:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }
步驟 5:執行搜尋
監控批次推論任務並確認已完成之後,您可以執行各種類型的 AI 搜尋,包括語意、混合、對話 (使用 RAG)、神經稀疏和多模態。如需 OpenSearch Service 支援的 AI 搜尋詳細資訊,請參閱 AI 搜尋
若要搜尋原始向量,請使用knn查詢類型,提供vector陣列做為輸入,並指定傳回的結果k數目:
GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }
若要執行 AI 支援的搜尋,請使用 neural 查詢類型。指定query_text輸入、您在 OpenSearch Ingestion 管道中設定的內嵌模型model_id的 ,以及傳回的結果k數目。若要從搜尋結果中排除內嵌,請在 _source.excludes 參數中指定內嵌欄位的名稱:
GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }