Usando um pipeline OpenSearch de ingestão com inferência de lote offline de aprendizado de máquina - OpenSearch Serviço Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando um pipeline OpenSearch de ingestão com inferência de lote offline de aprendizado de máquina

Os pipelines do Amazon OpenSearch Ingestion (OSI) oferecem suporte ao processamento de inferência em lote offline de aprendizado de máquina (ML) para enriquecer com eficiência grandes volumes de dados a baixo custo. Use a inferência de lote offline sempre que tiver grandes conjuntos de dados que possam ser processados de forma assíncrona. A inferência de lotes offline funciona com o Amazon Bedrock e SageMaker os modelos. Esse recurso está disponível em todos os domínios Regiões da AWS compatíveis com OpenSearch Ingestão com o OpenSearch Service 2.17+.

nota

Para processamento de inferência em tempo real, useConectores Amazon OpenSearch Service ML para plataformas de terceiros.

O processamento offline de inferência em lote aproveita um recurso OpenSearch chamado ML Commons. O ML Commons fornece algoritmos de ML por meio de chamadas de transporte e da API REST. Essas chamadas escolhem os nós e os recursos certos para cada solicitação de ML e monitoram as tarefas de ML para garantir o tempo de atividade. Dessa forma, o ML Commons permite que você aproveite os algoritmos de ML de código aberto existentes e reduza o esforço necessário para desenvolver novos recursos de ML. Para obter mais informações sobre o ML Commons, consulte Aprendizado de máquina na OpenSearch documentação.org.

Como funciona

Você pode criar um pipeline de inferência em lote offline no OpenSearch Ingestion adicionando um processador de inferência de aprendizado de máquina a um pipeline. Esse processador permite que seu pipeline se conecte a serviços de IA, como SageMaker executar trabalhos de inferência em lote. Você pode configurar seu processador para se conectar ao serviço de IA desejado por meio dos conectores de IA (com suporte a batch_predict) em execução no seu domínio de destino.

OpenSearch A ingestão usa o ml_inference processador com o ML Commons para criar trabalhos de inferência em lote offline. Em seguida, o ML Commons usa a API batch_predict, que realiza inferência em grandes conjuntos de dados em um modo assíncrono offline usando um modelo implantado em servidores de modelos externos no Amazon Bedrock, Amazon, Cohere e OpenAI. SageMaker O diagrama a seguir mostra um pipeline de OpenSearch ingestão que orquestra vários componentes para realizar esse processo de ponta a ponta:

Arquitetura de três canais de processamento de inferência de IA em lote.

Os componentes da tubulação funcionam da seguinte maneira:

Pipeline 1 (preparação e transformação de dados) *:

  • Fonte: Os dados são digitalizados de sua fonte externa suportada pela OpenSearch ingestão.

  • Processadores de dados: os dados brutos são processados e transformados no formato correto para inferência em lote no serviço de IA integrado.

  • S3 (Sink): os dados processados são armazenados em um bucket Amazon S3 pronto para servir como entrada para a execução de trabalhos de inferência em lote no serviço de IA integrado.

Pipeline 2 (Trigger ML batch_inference):

  • Fonte: Detecção automatizada de eventos do S3 de novos arquivos criados pela saída do Pipeline 1.

  • Processador ML_Inference: processador que gera inferências de ML por meio de um trabalho em lote assíncrono. Ele se conecta aos serviços de IA por meio do conector de IA configurado que está sendo executado no seu domínio de destino.

  • ID da tarefa: cada trabalho em lotes é associado a um ID de tarefa no ml-commons para rastreamento e gerenciamento.

  • OpenSearch ML Commons: O ML Commons, que hospeda o modelo para pesquisa neural em tempo real, gerencia os conectores para servidores remotos de IA e serve APIs para inferência em lote e gerenciamento de tarefas.

  • Serviços de IA: o OpenSearch ML Commons interage com serviços de IA como Amazon Bedrock e Amazon SageMaker para realizar inferência em lote nos dados, produzindo previsões ou insights. Os resultados são salvos de forma assíncrona em um arquivo S3 separado.

Pipeline 3 (ingestão em massa):

  • S3 (fonte): os resultados dos trabalhos em lotes são armazenados no S3, que é a origem desse pipeline.

  • Processadores de transformação de dados: processamento e transformação adicionais são aplicados à saída de inferência em lote antes da ingestão. Isso garante que os dados sejam mapeados corretamente no OpenSearch índice.

  • OpenSearch índice (Sink): os resultados processados são indexados OpenSearch para armazenamento, pesquisa e análise posterior.

nota

*O processo descrito pelo Pipeline 1 é opcional. Se preferir, você pode pular esse processo e simplesmente carregar os dados preparados no coletor do S3 para criar trabalhos em lotes.

Sobre o processador ml_inference

OpenSearch A ingestão usa uma integração especializada entre a fonte do S3 Scan e o processador de inferência de ML para processamento em lote. O S3 Scan opera no modo somente de metadados para coletar com eficiência as informações do arquivo S3 sem ler o conteúdo real do arquivo. O ml_inference processador usa o arquivo S3 URLs para coordenar com o ML Commons o processamento em lote. Esse design otimiza o fluxo de trabalho de inferência em lote, minimizando a transferência desnecessária de dados durante a fase de digitalização. Você define o ml_inference processador usando parâmetros. Exemplo:

processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWS test-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWS TestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # AWS configuration settings aws: # Região da AWS where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml # Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"

Melhorias no desempenho de ingestão usando o processador ml_inference

O ml_inference processador OpenSearch de ingestão melhora significativamente o desempenho de ingestão de dados para pesquisas habilitadas para ML. O processador é ideal para casos de uso que exigem dados gerados pelo modelo de aprendizado de máquina, incluindo pesquisa semântica, pesquisa multimodal, enriquecimento de documentos e compreensão de consultas. Na busca semântica, o processador pode acelerar a criação e a ingestão de vetores de grande volume e alta dimensão em uma ordem de magnitude.

A capacidade de inferência de lote off-line do processador oferece vantagens distintas em relação à invocação do modelo em tempo real. Embora o processamento em tempo real exija um servidor de modelo ativo com limitações de capacidade, a inferência em lote dimensiona dinamicamente os recursos de computação sob demanda e processa os dados em paralelo. Por exemplo, quando o pipeline de OpenSearch ingestão recebe um bilhão de solicitações de dados de origem, ele cria 100 arquivos S3 para entrada de inferência em lote de ML. O ml_inference processador então inicia um trabalho SageMaker em lote usando 100 instâncias do ml.m4.xlarge Amazon Elastic Compute Cloud EC2 (Amazon), concluindo a vetorização de um bilhão de solicitações em 14 horas — uma tarefa que seria praticamente impossível de realizar em tempo real.

Configurar o processador ml_inference para ingerir solicitações de dados para uma pesquisa semântica

Os procedimentos a seguir orientam você no processo de instalação e configuração do ml_inference processador de OpenSearch ingestão para ingerir um bilhão de solicitações de dados para pesquisa semântica usando um modelo de incorporação de texto.

Etapa 1: criar conectores e registrar modelos no OpenSearch

Para o procedimento a seguir, use o ML Commons batch_inference_sagemaker_connector_blueprint para criar um conector e um modelo na Amazon. SageMaker Se você preferir usar modelos de OpenSearch CloudFormation integração, consulte (Procedimento alternativo) Etapa 1: criar conectores e modelos usando um modelo de CloudFormation integração mais adiante nesta seção.

Para criar conectores e registrar modelos no OpenSearch
  1. Crie um modelo de ML da Deep Java Library (DJL) SageMaker para transformação em lote. Para ver outros modelos de DJL, consulte Semantic_search_with_cfn_template_for_SageMaker em: GitHub

    POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } }
  2. Crie um conector com batch_predict o novo action tipo no actions campo:

    POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] }
  3. Use o ID do conector retornado para registrar o SageMaker modelo:

    POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" }
  4. Invoque o modelo com o tipo de batch_predict ação:

    POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }

    A resposta contém uma ID de tarefa para o trabalho em lotes:

    { "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" }
  5. Verifique o status do trabalho em lotes chamando a API Get Task usando o ID da tarefa:

    GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567

    A resposta contém o status da tarefa:

    { "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }

(Procedimento alternativo) Etapa 1: criar conectores e modelos usando um modelo de CloudFormation integração

Se preferir, você pode usar AWS CloudFormation para criar automaticamente todos os SageMaker conectores e modelos Amazon necessários para inferência de ML. Essa abordagem simplifica a configuração usando um modelo pré-configurado disponível no console do Amazon OpenSearch Service. Para obter mais informações, consulte Usando CloudFormation para configurar a inferência remota para pesquisa semântica.

Para implantar uma CloudFormation pilha que cria todos os SageMaker conectores e modelos necessários
  1. Abra o console do Amazon OpenSearch Service.

  2. No painel de navegação, selecione Integrações.

  3. No campo PesquisarSageMaker, insira e escolha Integração com modelos de incorporação de texto por meio da Amazon SageMaker.

  4. Escolha Configurar domínio e, em seguida, escolha Configurar domínio VPC ou Configurar domínio público.

  5. Insira as informações nos campos do modelo. Em Enable Offline Batch Inference, escolha true para provisionar recursos para processamento em lote offline.

  6. Escolha Criar para criar a CloudFormation pilha.

  7. Depois que a pilha for criada, abra a guia Saídas no CloudFormation console Localize o connector_id e o model_id. Você precisará desses valores posteriormente ao configurar o pipeline.

Etapa 2: criar um pipeline OpenSearch de ingestão para inferência de lote offline de ML

Use o exemplo a seguir para criar um pipeline OpenSearch de ingestão para inferência de lote offline de ML. Para obter mais informações sobre como criar um pipeline para OpenSearch ingestão, consulteCriação de pipelines OpenSearch de ingestão da Amazon.

Antes de começar

No exemplo a seguir, você especifica um ARN de função do IAM para o sts_role_arn parâmetro. Use o procedimento a seguir para verificar se essa função está mapeada para a função de back-end que tem acesso ao ml-commons em. OpenSearch

  1. Navegue até o plug-in OpenSearch Dashboards do seu domínio OpenSearch de serviço. Você pode encontrar o endpoint do painel no painel do seu domínio no console de OpenSearch serviço.

  2. No menu principal, escolha Segurança, Funções e selecione a função ml_full_access.

  3. Escolha Usuários mapeados e Gerenciar mapeamento.

  4. Em Funções de back-end, insira o ARN da função Lambda que precisa de permissão para chamar seu domínio. Aqui está um exemplo: arn:aws:iam: ::role/ 111122223333 lambda-role

  5. Selecione Mapa e confirme se o usuário ou função aparece em Usuários mapeados.

Exemplo para criar um pipeline OpenSearch de ingestão para inferência de lote offline de ML

version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name: name data_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name: name data_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region: name default_bucket_owner: account_ID codec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml - copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null

Etapa 3: preparar seus dados para ingestão

Para preparar seus dados para o processamento de inferência em lote offline de ML, prepare você mesmo os dados usando suas próprias ferramentas ou processos ou use o OpenSearch Data Prepper. Verifique se os dados estão organizados no formato correto usando um pipeline para consumir os dados da sua fonte de dados ou criando um conjunto de dados de aprendizado de máquina.

O exemplo a seguir usa o conjunto de dados MS MARCO, que inclui uma coleção de consultas reais de usuários para tarefas de processamento de linguagem natural. O conjunto de dados é estruturado no formato JSONL, em que cada linha representa uma solicitação enviada ao modelo de incorporação de ML:

{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...

Para testar usando o conjunto de dados MS MARCO, imagine um cenário em que você constrói um bilhão de solicitações de entrada distribuídas em 100 arquivos, cada um contendo 10 milhões de solicitações. Os arquivos seriam armazenados no Amazon S3 com o prefixo s3://_djl_batch_input/. offlinebatch/sagemaker/sagemaker O pipeline de OpenSearch ingestão digitalizaria esses 100 arquivos simultaneamente e iniciaria um trabalho SageMaker em lote com 100 trabalhadores para processamento paralelo, permitindo a vetorização e a ingestão eficientes de um bilhão de documentos. OpenSearch

Em ambientes de produção, você pode usar um pipeline de OpenSearch ingestão para gerar arquivos S3 para entrada de inferência em lote. O pipeline suporta várias fontes de dados e opera em um cronograma para transformar continuamente os dados de origem em arquivos S3. Esses arquivos são então processados automaticamente pelos servidores de IA por meio de trabalhos em lote off-line agendados, garantindo o processamento e a ingestão contínuos de dados.

Etapa 4: monitorar o trabalho de inferência em lote

Você pode monitorar os trabalhos de inferência em lote usando o SageMaker console ou o. AWS CLI Você também pode usar a API Get Task para monitorar trabalhos em lotes:

GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }

A API retorna uma lista de tarefas de trabalho em lotes ativas:

{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }

Depois de monitorar o trabalho de inferência em lote e confirmar sua conclusão, você pode executar vários tipos de pesquisas de IA, incluindo semânticas, híbridas, conversacionais (com RAG), esparsas neurais e multimodais. Para obter mais informações sobre pesquisas de IA suportadas pelo OpenSearch Serviço, consulte Pesquisa de IA.

Para pesquisar vetores brutos, use o tipo de knn consulta, forneça a vector matriz como entrada e especifique o k número de resultados retornados:

GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }

Para executar uma pesquisa baseada em IA, use o tipo de neural consulta. Especifique a query_text entrada, o modelo model_id de incorporação que você configurou no pipeline de OpenSearch ingestão e o k número de resultados retornados. Para excluir incorporações dos resultados da pesquisa, especifique o nome do campo de incorporação no _source.excludes parâmetro:

GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }