機械学習オフラインバッチ推論での OpenSearch Ingestion パイプラインの使用 - Amazon OpenSearch Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

機械学習オフラインバッチ推論での OpenSearch Ingestion パイプラインの使用

Amazon OpenSearch Ingestion (OSI) パイプラインは、機械学習 (ML) オフラインバッチ推論処理をサポートし、大量のデータを低コストで効率的に強化します。非同期で処理できる大規模なデータセットがある場合は、常にオフラインバッチ推論を使用します。オフラインバッチ推論は、Amazon Bedrock モデルと SageMaker モデルで機能します。この機能は、OpenSearch Service 2.17 以降のドメインで OpenSearch Ingestion AWS リージョン をサポートするすべての で使用できます。

注記

オフラインバッチ推論処理は、ML Commons と呼ばれる OpenSearch の機能を活用します。ML Commons は、トランスポートおよび REST API コールを通じて ML アルゴリズムを提供します。これらの呼び出しでは、ML リクエストごとに適切なノードとリソースを選択し、ML タスクをモニタリングして稼働時間を確保します。これにより、ML Commons では、既存のオープンソース ML アルゴリズムを活用し、新しい ML 機能の開発に必要な労力を削減できます。ML Commons の詳細については、OpenSearch.org ドキュメントのhttps://docs.opensearch.org/latest/ml-commons-plugin/「機械学習」を参照してください。

仕組み

OpenSearch Ingestion でオフラインバッチ推論パイプラインを作成するには、機械学習推論プロセッサをパイプラインに追加します。このプロセッサを使用すると、パイプラインを SageMaker などの AI サービスに接続してバッチ推論ジョブを実行できます。ターゲットドメインで実行されている AI コネクタ ( batch_predict サポート) を介して目的の AI サービスに接続するようにプロセッサを設定できます。

OpenSearch Ingestion は、ML Commons でml_inferenceプロセッサを使用してオフラインバッチ推論ジョブを作成します。次に、ML Commons は batch_predict API を使用します。この API は、Amazon Bedrock、Amazon SageMaker、Cohere、OpenAI の外部モデルサーバーにデプロイされたモデルを使用して、オフライン非同期モードで大規模なデータセットに対して推論を実行します。次の図は、複数のコンポーネントをオーケストレーションしてこのプロセスをエンドツーエンドで実行する OpenSearch Ingestion パイプラインを示しています。

バッチ AI 推論処理の 3 パイプラインアーキテクチャ。

パイプラインコンポーネントは次のように動作します。

パイプライン 1 (データの準備と変換)*:

  • ソース: OpenSearch Ingestion でサポートされている外部ソースからデータがスキャンされます。

  • データ処理者: raw データは処理され、統合された AI サービスのバッチ推論に適した形式に変換されます。

  • S3 (シンク): 処理されたデータは、統合された AI サービスでバッチ推論ジョブを実行するための入力として機能する Amazon S3 バケットにステージングされます。

パイプライン 2 (トリガー ML batch_inference):

  • ソース: Pipeline 1 の出力によって作成された新しいファイルの S3 イベント自動検出。

  • Ml_inference プロセッサ: 非同期バッチジョブを介して ML 推論を生成するプロセッサ。ターゲットドメインで実行されている設定済みの AI コネクタを介して AI サービスに接続します。

  • タスク ID: 各バッチジョブは、追跡と管理のために ml-commons のタスク ID に関連付けられます。

  • OpenSearch ML Commons: ML Commons。リアルタイムニューラル検索用のモデルをホストし、リモート AI サーバーへのコネクタを管理し、バッチ推論とジョブ管理用の APIs を提供します。

  • AI サービス: OpenSearch ML Commons は、Amazon Bedrock や Amazon SageMaker などの AI サービスとやり取りしてデータに対してバッチ推論を実行し、予測やインサイトを生成します。結果は別の S3 ファイルに非同期的に保存されます。

パイプライン 3 (バルク取り込み):

  • S3 (ソース): バッチジョブの結果は、このパイプラインのソースである S3 に保存されます。

  • データ変換プロセッサ: 取り込み前に、バッチ推論出力にさらに処理と変換が適用されます。これにより、データが OpenSearch インデックスに正しくマッピングされます。

  • OpenSearch インデックス (シンク): 処理された結果は OpenSearch にインデックス化され、ストレージ、検索、およびさらなる分析が行われます。

注記

*パイプライン 1 で説明されているプロセスはオプションです。必要に応じて、そのプロセスをスキップし、準備したデータを S3 シンクにアップロードしてバッチジョブを作成できます。

ml_inference プロセッサについて

OpenSearch Ingestion は、バッチ処理に S3 スキャンソースと ML 推論プロセッサ間の特殊な統合を使用します。S3 スキャンはメタデータ専用モードで動作し、実際のファイルの内容を読み取らずに S3 ファイル情報を効率的に収集します。ml_inference プロセッサは S3 ファイル URLs、バッチ処理のために ML Commons と調整します。この設計では、スキャンフェーズ中の不要なデータ転送を最小限に抑えることで、バッチ推論ワークフローを最適化します。パラメータを使用してml_inferenceプロセッサを定義します。以下がその例です。

processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWS test-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWS TestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # AWS configuration settings aws: # AWS リージョン where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml # Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"

ml_inference プロセッサを使用した取り込みパフォーマンスの向上

OpenSearch Ingestion ml_inferenceプロセッサは、ML 対応検索のデータ取り込みパフォーマンスを大幅に強化します。プロセッサは、セマンティック検索、マルチモーダル検索、ドキュメントエンリッチメント、クエリ理解など、機械学習モデルによって生成されたデータを必要とするユースケースに最適です。セマンティック検索では、プロセッサは大量の高次元ベクトルの作成と取り込みを桁違いに高速化できます。

プロセッサのオフラインバッチ推論機能には、リアルタイムモデル呼び出しと比べて明確な利点があります。リアルタイム処理には容量制限のあるライブモデルサーバーが必要ですが、バッチ推論はコンピューティングリソースをオンデマンドで動的にスケーリングし、データを並行して処理します。例えば、OpenSearch Ingestion パイプラインが 10 億のソースデータリクエストを受信すると、ML バッチ推論入力用に 100 個の S3 ファイルが作成されます。次に、ml_inferenceプロセッサは 100 個の ml.m4.xlarge Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを使用して SageMaker バッチジョブを開始し、14 時間で 10 億件のリクエストのベクトル化を完了します。これは、リアルタイムモードで達成することが実質的に不可能なタスクです。

セマンティック検索のデータリクエストを取り込むように ml_inference プロセッサを設定する

次の手順では、テキスト埋め込みモデルを使用してセマンティック検索用に 10 億のデータリクエストを取り込むように OpenSearch Ingestion ml_inferenceプロセッサを設定および設定するプロセスについて説明します。

ステップ 1: OpenSearch でコネクタを作成し、モデルを登録する

次の手順では、ML Commons batch_inference_sagemaker_connector_blueprint を使用して、Amazon SageMaker でコネクタとモデルを作成します。OpenSearch CloudFormation 統合テンプレートを使用する場合は、このセクションの(代替手順) ステップ 1: CloudFormation 統合テンプレートを使用してコネクタとモデルを作成する後半の「」を参照してください。

OpenSearch でコネクタを作成し、モデルを登録するには
  1. SageMaker でバッチ変換用の Deep Java Library (DJL) ML モデルを作成します。他の DJL モデルを表示するには、GitHub のsemantic_search_with_CFN_template_for_Sagemaker」を参照してください。

    POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } }
  2. actions フィールドで新しいactionタイプbatch_predictとして を使用してコネクタを作成します。

    POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] }
  3. 返されたコネクタ ID を使用して SageMaker モデルを登録します。

    POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" }
  4. batch_predict アクションタイプを使用してモデルを呼び出します。

    POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }

    レスポンスには、バッチジョブのタスク ID が含まれます。

    { "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" }
  5. タスク ID を使用して Get Task API を呼び出して、バッチジョブのステータスを確認します。

    GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567

    レスポンスには、タスクのステータスが含まれます。

    { "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }

(代替手順) ステップ 1: CloudFormation 統合テンプレートを使用してコネクタとモデルを作成する

必要に応じて、 AWS CloudFormation を使用して、ML 推論に必要なすべての Amazon SageMaker コネクタとモデルを自動的に作成できます。このアプローチでは、Amazon OpenSearch Service コンソールで使用できる事前設定されたテンプレートを使用することで、セットアップを簡素化します。詳細については、「CloudFormation を使用してセマンティック検索用のリモート推論をセットアップする」を参照してください。

必要なすべての SageMaker コネクタとモデルを作成する CloudFormation スタックをデプロイするには
  1. Amazon OpenSearch Service コンソールを開きます。

  2. ナビゲーションペインで、[統合] を選択します。

  3. 検索フィールドに と入力しSageMakerAmazon SageMaker を介したテキスト埋め込みモデルとの統合を選択します。

  4. Configure domain を選択し、Configure VPC domain または Configure public domain を選択します。

  5. テンプレートフィールドに情報を入力します。オフラインバッチ推論を有効にする で、オフラインバッチ処理用にリソースをプロビジョニングするには true を選択します。

  6. Create を選択して CloudFormation スタックを作成します。

  7. スタックを作成したら、コンソールで CloudFormation 出力タブを開き、 connector_idmodel_id を見つけます。これらの値は、後でパイプラインを設定するときに必要になります。

ステップ 2: ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成する

次のサンプルを使用して、ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成します。OpenSearch Ingestion のパイプラインの作成の詳細については、「」を参照してくださいAmazon OpenSearch Ingestion パイプラインの作成

[開始する前に]

次の例では、 sts_role_arnパラメータに IAM ロール ARN を指定します。次の手順を使用して、このロールが OpenSearch の ml-commons にアクセスできるバックエンドロールにマッピングされていることを確認します。

  1. OpenSearch Service ドメインの OpenSearch Dashboards プラグインに移動します。ダッシュボードエンドポイントは、OpenSearch Service コンソールのドメインダッシュボードにあります。

  2. メインメニューから [セキュリティ]、[ロール] を選択し、[ml_full_access] ロールを選択します。

  3. [マッピングされたユーザー][マッピングの管理] を選択します。

  4. バックエンドロールで、ドメインを呼び出すアクセス許可が必要な Lambda ロールの ARN を入力します。例: arn:aws:iam::111122223333:role/lambda-role

  5. [マップ] を選択し、ユーザーまたはロールが [マッピングされたユーザー] の下に表示されていることを確認します。

ML オフラインバッチ推論用の OpenSearch Ingestion パイプラインを作成するためのサンプル

version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name: name data_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name: name data_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region: name default_bucket_owner: account_ID codec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml - copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null

ステップ 3: 取り込み用にデータを準備する

ML オフラインバッチ推論処理用にデータを準備するには、独自のツールまたはプロセスを使用してデータを自分で準備するか、OpenSearch Data Prepper を使用します。パイプラインを使用してデータソースのデータを消費するか、機械学習データセットを作成して、データが正しい形式に整理されていることを確認します。

次の例では、自然言語処理タスクの実際のユーザークエリのコレクションを含む MS MARCO データセットを使用しています。データセットは JSONL 形式で構造化され、各行は ML 埋め込みモデルに送信されたリクエストを表します。

{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...

MS MARCO データセットを使用してテストするには、それぞれ 1,000 万件のリクエストを含む 100 個のファイルに分散された 10 億件の入力リクエストを構築するシナリオを想像してください。ファイルは、プレフィックス s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/ で Amazon S3 に保存されます。OpenSearch Ingestion パイプラインは、これらの 100 個のファイルを同時にスキャンし、並列処理のために 100 人のワーカーで SageMaker バッチジョブを開始するため、10 億個のドキュメントを効率的にベクトル化して OpenSearch に取り込むことができます。

本番環境では、OpenSearch Ingestion パイプラインを使用して、バッチ推論入力用の S3 ファイルを生成できます。パイプラインはさまざまなデータソースをサポートし、ソースデータを S3 ファイルに継続的に変換するようにスケジュールに従って動作します。これらのファイルは、スケジュールされたオフラインバッチジョブを通じて AI サーバーによって自動的に処理され、継続的なデータ処理と取り込みが保証されます。

ステップ 4: バッチ推論ジョブをモニタリングする

バッチ推論ジョブは、SageMaker コンソールまたは を使用してモニタリングできます AWS CLI。Get Task API を使用してバッチジョブをモニタリングすることもできます。

GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }

API はアクティブなバッチジョブタスクのリストを返します。

{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }

バッチ推論ジョブをモニタリングして完了を確認したら、セマンティック、ハイブリッド、会話型 (RAG を使用)、ニューラルスパース、マルチモーダルなど、さまざまなタイプの AI 検索を実行できます。OpenSearch Service でサポートされている AI 検索の詳細については、「AI 検索」を参照してください。

raw ベクトルを検索するには、knnクエリタイプを使用し、vector配列を入力として指定し、返される結果kの数を指定します。

GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }

AI を活用した検索を実行するには、 neuralクエリタイプを使用します。query_text 入力、OpenSearch Ingestion model_id パイプラインで設定した埋め込みモデルの 、および返された結果kの数を指定します。埋め込みを検索結果から除外するには、 _source.excludesパラメータで埋め込みフィールドの名前を指定します。

GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }