翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
機械学習オフラインバッチ推論での OpenSearch Ingestion パイプラインの使用
Amazon OpenSearch Ingestion (OSI) パイプラインは、機械学習 (ML) オフラインバッチ推論処理をサポートし、大量のデータを低コストで効率的に強化します。非同期で処理できる大規模なデータセットがある場合は、常にオフラインバッチ推論を使用してください。オフラインバッチ推論は、Amazon Bedrock モデルと SageMaker モデルで機能します。この機能は、OpenSearch Service 2.17 以降のドメインで OpenSearch Ingestion AWS リージョンをサポートするすべての で使用できます。
注記
リアルタイム推論処理には、サードパーティプラットフォーム用の Amazon OpenSearch Service ML コネクタ を使用します。
オフラインバッチ推論処理は、ML Commons と呼ばれる OpenSearch の機能を活用します。ML Commons は、トランスポートおよび REST API コールを通じて ML アルゴリズムを提供します。これらの呼び出しは、ML リクエストごとに適切なノードとリソースを選択し、ML タスクを監視してアップタイムを確保します。このように、ML Commons は既存のオープンソースの ML アルゴリズムを活用し、新しい ML 機能の開発に必要な労力を削減できます。ML Commons の詳細については、OpenSearch.org ドキュメントの「Machine learning
仕組み
パイプラインに adding a machine learning inference processor
OpenSearch Ingestion は、ML Commons で ml_inference プロセッサを使用してオフラインバッチ推論ジョブを作成します。次に、ML Commons は batch_predict
パイプラインコンポーネントは次のように動作します。
パイプライン 1 (データの準備と変換)*:
-
ソース: OpenSearch Ingestion でサポートされている外部ソースからデータがスキャンされます。
-
データ処理者: raw データは処理され、統合された AI サービスのバッチ推論に適した形式に変換されます。
-
S3 (シンク): 処理されたデータは、統合された AI サービスでバッチ推論ジョブを実行するための入力として機能する Amazon S3 バケットにステージングされます。
パイプライン 2 (トリガー ML batch_inference):
-
ソース: パイプライン 1 の出力によって作成された新しいファイルの S3 イベント自動検出。
-
Ml_inference プロセッサ: 非同期バッチジョブを介して ML 推論を生成するプロセッサ。ターゲットドメインで実行されている設定済みの AI コネクタを介して AI サービスに接続します。
-
タスク ID: 各バッチジョブは、追跡と管理のために ml-commons のタスク ID に関連付けられます。
-
OpenSearch ML Commons: ML Commons。リアルタイムニューラル検索用のモデルをホストし、リモート AI サーバーへのコネクタを管理し、バッチ推論とジョブ管理用の API を提供します。
-
AI サービス: OpenSearch ML Commons は、Amazon Bedrock や Amazon SageMaker などの AI サービスとやり取りしてデータに対してバッチ推論を実行し、予測やインサイトを生成します。結果は別の S3 ファイルに非同期的に保存されます。
パイプライン 3 (バルク取り込み):
-
S3 (ソース): バッチジョブの結果は、このパイプラインのソースである S3 に保存されます。
-
データ変換プロセッサ: 取り込み前に、バッチ推論出力にさらに処理と変換が適用されます。これにより、データが OpenSearch インデックスに正しくマッピングされます。
-
OpenSearch インデックス (シンク): 処理された結果は OpenSearch にインデックス化され、ストレージ、検索、およびさらなる分析が行われます。
注記
*パイプライン 1 で説明されているプロセスはオプションです。必要に応じて、そのプロセスをスキップし、準備したデータを S3 シンクにアップロードしてバッチジョブを作成できます。
ml_inference プロセッサについて
OpenSearch Ingestion は、バッチ処理に S3 スキャンソースと ML 推論プロセッサ間の特殊な統合を使用します。S3 スキャンはメタデータ専用モードで動作し、実際のファイルの内容を読み取らずに S3 ファイル情報を効率的に収集します。ml_inference プロセッサは S3 ファイル URL を使用して、バッチ処理のために ML Commons と調整します。この設計では、スキャンフェーズ中の不要なデータ転送を最小限に抑えることで、バッチ推論ワークフローを最適化します。パラメータを使用して ml_inference プロセッサを定義します。以下がその例です。
processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWStest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWSTestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" #AWSconfiguration settings aws: #AWS リージョンwhere the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml# Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"
ml_inference プロセッサを使用した取り込みパフォーマンスの向上
OpenSearch Ingestion ml_inference プロセッサは、ML 対応検索のデータインジェストパフォーマンスを大幅に強化します。プロセッサは、セマンティック検索、マルチモーダル検索、ドキュメントエンリッチメント、クエリ理解など、機械学習モデルによって生成されたデータを必要とするユースケースに最適です。セマンティック検索では、プロセッサは大量の高次元ベクトルの作成と取り込みを桁違いに高速化できます。
プロセッサのオフラインバッチ推論機能には、リアルタイムモデル呼び出しと比べて明確な利点があります。リアルタイム処理には容量制限のあるライブモデルサーバーが必要ですが、バッチ推論はコンピューティングリソースをオンデマンドで動的にスケーリングし、データを並行して処理します。例えば、OpenSearch Ingestion パイプラインが 10 億のソースデータリクエストを受信すると、ML バッチ推論入力用に 100 個の S3 ファイルが作成されます。次に、ml_inference プロセッサは 100 個の ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを使用して SageMaker バッチジョブを開始し、14 時間で 10 億件のリクエストのベクトル化を完了します。これは、リアルタイムモードで達成することが事実上不可能なタスクです。
セマンティック検索のデータリクエストを取り込むように ml_inference プロセッサを設定する
次の手順では、テキスト埋め込みモデルを使用してセマンティック検索用に 10 億のデータリクエストを取り込むように OpenSearch Ingestion ml_inference プロセッサを設定および設定するプロセスについて説明します。
トピック
ステップ 1: OpenSearch でコネクタを作成し、モデルを登録する
次の手順では、ML Commons batch_inference_sagemaker_connector_blueprint
OpenSearch でコネクタを作成し、モデルを登録するには
-
SageMaker でバッチ変換用の Deep Java Library (DJL) ML モデルを作成します。他の DJL モデルを表示するには、GitHub の「semantic_search_with_CFN_template_for_Sagemaker
」を参照してください。 POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } } -
actionsフィールドで新しいactionタイプとしてbatch_predictを使用してコネクタを作成します。POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] } -
返されたコネクタ ID を使用して SageMaker モデルを登録します。
POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" } -
batch_predictアクションタイプを使用してモデルを呼び出します。POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }レスポンスには、バッチジョブのタスク ID が含まれます。
{ "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" } -
タスク ID を使用して Get Task API を呼び出して、バッチジョブのステータスを確認します。
GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567レスポンスには、タスクのステータスが含まれます。
{ "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }
(代替手順) ステップ 1: CloudFormation統合テンプレートを使用してコネクタとモデルを作成する
必要に応じて、 AWS CloudFormationを使用して、ML 推論に必要なすべての Amazon SageMaker コネクタとモデルを自動的に作成できます。このアプローチでは、Amazon OpenSearch Service コンソールで使用できる事前設定されたテンプレートを使用することで、セットアップを簡素化します。詳細については、「CloudFormationを使用してセマンティック検索用のリモート推論をセットアップする」を参照してください。
必要なすべての SageMaker コネクタとモデルを作成する CloudFormationスタックをデプロイするには
-
Amazon OpenSearch Service コンソールを開きます。
-
ナビゲーションペインで、[統合] を選択します。
-
検索フィールドに
SageMakerと入力し、[Amazon SageMaker を介したテキスト埋め込みモデルとの統合] を選択します。 -
[ドメインの設定] を選択し、[VPC ドメインの設定] または [パブリックドメインの設定] を選択します。
-
テンプレートフィールドに情報を入力します。[オフラインバッチ推論を有効化] で、オフラインバッチ処理用にリソースをプロビジョニングするには [true] を選択します。
-
Create を選択してCloudFormationスタックを作成します。
-
スタックを作成したら、CloudFormation コンソールで [出力] タブを開き、connector_id と model_id を見つけます。これらの値は、後でパイプラインを設定するときに必要になります。
ステップ 2: ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成する
次のサンプルを使用して、ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成します。OpenSearch Ingestion のパイプラインの作成の詳細については、「Amazon OpenSearch Ingestion パイプラインの作成」を参照してください。
[開始する前に]
次の例では、sts_role_arn パラメータに IAM ロール ARN を指定します。次の手順を使用して、このロールが OpenSearch の ml-commons にアクセスできるバックエンドロールにマッピングされていることを確認します。
-
OpenSearch Service ドメインの OpenSearch Dashboards プラグインに移動します。OpenSearch Service コンソールのドメインダッシュボードに Dashboards エンドポイントがあります。
-
メインメニューから [セキュリティ]、[ロール] を選択し、[ml_full_access] ロールを選択します。
-
[マッピングされたユーザー]、[マッピングの管理] を選択します。
-
[バックエンドロール] で、ドメインを呼び出すアクセス許可を必要とする Lambda ロールの ARN を入力します。例: arn:aws:iam::
111122223333:role/lambda-role -
[マップ] を選択し、ユーザーまたはロールが [マッピングされたユーザー] の下に表示されていることを確認します。
ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成するためのサンプル
version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name:namedata_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name:namedata_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region:namedefault_bucket_owner:account_IDcodec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region:us-west-2bucket:batch-inference-dlqkey_path_prefix:bedrock-dlqsts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml- copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null
ステップ 3: 取り込み用にデータを準備する
ML オフラインバッチ推論処理用にデータを準備するには、独自のツールまたはプロセスを使用してデータを自分で準備するか、OpenSearch Data Prepper
次の例では、自然言語処理タスクの実際のユーザークエリのコレクションを含む MS MARCO
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...
MS MARCO データセットを使用してテストするには、それぞれ 1,000 万件のリクエストを含む 100 個のファイルに分散された 10 億件の入力リクエストを構築するシナリオを想像してください。ファイルは、プレフィックス s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/ で Amazon S3 に保存されます。OpenSearch Ingestion パイプラインは、これら 100 個のファイルを同時にスキャンし、並列処理のために 100 人のワーカーで SageMaker バッチジョブを開始するため、10 億個のドキュメントを効率的にベクトル化して OpenSearch に取り込むことができます。
本番環境では、OpenSearch Ingestion パイプラインを使用して、バッチ推論入力用の S3 ファイルを生成できます。パイプラインはさまざまなデータソース
ステップ 4: バッチ推論ジョブをモニタリングする
バッチ推論ジョブは、SageMaker コンソールまたは AWS CLI を使用してモニタリングできます。Get Task API を使用してバッチジョブをモニタリングすることもできます。
GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }
API はアクティブなバッチジョブタスクのリストを返します。
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }
ステップ 5: 検索を実行する
バッチ推論ジョブをモニタリングして完了を確認したら、セマンティック、ハイブリッド、会話型 (RAG を使用)、ニューラルスパース、マルチモーダルなど、さまざまなタイプの AI 検索を実行できます。OpenSearch Service でサポートされている AI 検索の詳細については、「AI search
raw ベクトルを検索するには、knn クエリタイプを使用し、vector 配列を入力として指定し、返される結果 k の数を指定します。
GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }
AI を活用した検索を実行するには、neural クエリタイプを使用します。query_text 入力、OpenSearch Ingestion パイプラインで設定した埋め込みモデルの model_id、および返された結果の k 数を指定します。埋め込みを検索結果から除外するには、_source.excludes パラメータで埋め込みフィールドの名前を指定します。
GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }