Descripción general de las características de canalización en Amazon OpenSearch Ingestion - Amazon OpenSearch Service

Descripción general de las características de canalización en Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion aprovisiona canalizaciones, que consisten en un origen, un búfer, cero o más procesadores y uno o más receptores. Las canalizaciones de incorporación funcionan con Data Prepper como motor de datos. Para obtener información general sobre los distintos componentes de una canalización, consulte Conceptos clave de Amazon OpenSearch Ingestion.

En las secciones siguientes se incluye información general sobre algunas de las características más utilizadas en Amazon OpenSearch Ingestion.

nota

No es una lista exhaustiva de las características disponibles para las canalización. Para obtener una documentación completa de todas las funciones de canalización disponibles, consulte Documentación de Data Prepper. Tenga en cuenta que OpenSearch Ingestion impone algunas restricciones a los complementos y opciones que puede usar. Para obtener más información, consulte Complementos y opciones compatibles para las canalizaciones de Amazon OpenSearch Ingestion.

Almacenamiento en búfer persistente

Un búfer persistente almacena los datos en un búfer basado en disco en varias zonas de disponibilidad para mejorar la durabilidad de los datos. Puede utilizar el almacenamiento en búfer persistente para ingerir datos de todos los orígenes basados en push compatibles sin configurar un búfer independiente. Estos orígenes incluyen HTTP y OpenTelemetry para los registros, los rastros y las métricas. Para habilitar el almacenamiento en búfer persistente, seleccione Habilitar búfer persistente al crear o actualizar una canalización. Para obtener más información, consulte Creación de canalizaciones de Amazon OpenSearch Ingestion.

OpenSearch Ingestion determina de forma dinámica el número de OCU que se van a utilizar para el almacenamiento en búfer persistente, teniendo en cuenta el origen de datos, las transformaciones de transmisión y el destino del receptor. Como asigna algunas OCU al almacenamiento en búfer, es posible que tenga que aumentar las OCU mínimas y máximas para mantener el mismo rendimiento de ingesta. Las canalizaciones conservan los datos en el búfer hasta 72 horas.

Si habilita el almacenamiento en búfer persistente para una canalización, los tamaños máximos de carga útil de solicitud predeterminados son los siguientes:

  • Orígenes HTTP: 10 MB

  • Orígenes de OpenTelemetry: 4 MB

En el caso de los orígenes HTTP, puede aumentar el tamaño máximo de carga útil a 20 MB. El tamaño de la carga útil de la solicitud incluye toda la solicitud HTTP, que normalmente contiene varios eventos. Cada evento no puede superar los 3,5 MB.

Las canalizaciones con el almacenamiento en búfer persistente dividen las unidades de canalización configuradas en unidades de computación y unidades de búfer. Si una canalización utiliza un procesador con un uso intensivo de la CPU, como grok, key-value o split string, asigna las unidades en una proporción de búfer/proceso de 1:1. De lo contrario, las asigna en una proporción de 3:1, lo que siempre favorece las unidades de cómputo.

Por ejemplo:

  • Canalización con grok y 2 unidades como máximo: 1 unidad de computación y 1 unidad de búfer

  • Canalización con grok y 5 unidades como máximo: 3 unidades de computación y 2 unidades de búfer

  • Canalización sin procesadores y 2 unidades como máximo: 1 unidad de computación y 1 unidad de búfer

  • Canalización sin procesadores y 4 unidades como máximo: 1 unidad de computación y 3 unidades de búfer

  • Canalización con grok y 5 unidades como máximo: 2 unidades de computación y 3 unidades de búfer

De forma predeterminada, las canalizaciones utilizan una Clave propiedad de AWS para cifrar los datos del búfer. Estas canalizaciones no necesitan ningún permiso adicional para el rol de canalización.

Como alternativa, se puede especificar una clave administrada por el cliente y agregar los siguientes permisos de IAM al rol de canalización:

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:us-east-1:111122223333:key/ASIAIOSFODNN7EXAMPLE" } ] }

Para más información, consulte las claves administradas por el cliente en la Guía para desarrolladores de AWS Key Management Service.

nota

Si deshabilita el almacenamiento en búfer persistente, su canalización se ejecuta por completo con el almacenamiento en búfer en memoria.

División

Puede configurar una canalización de OpenSearch Ingestion para dividir los eventos entrantes en una subcanalización, lo que le permitirá realizar diferentes tipos de procesamiento en el mismo evento entrante.

El siguiente ejemplo de canalización divide los eventos entrantes en dos subcanalizaciones. Cada subcanalización utiliza su propio procesador para enriquecer y manipular los datos y, a continuación, los envía a distintos índices de OpenSearch.

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Encadenar

Puede encadenar varias subcanalizaciones para procesar y enriquecer los datos por partes. En otras palabras, puede enriquecer un evento entrante con ciertas capacidades de procesamiento en una subcanalización, luego enviarlo a otra subcanalización para un enriquecimiento adicional con un procesador diferente, y finalmente enviarlo a su receptor de OpenSearch.

En el siguiente ejemplo, la subcanalización log_pipeline enriquece un evento de registro entrante con un conjunto de procesadores y, a continuación, envía el evento a un índice de OpenSearch denominado enriched_logs. La canalización envía el mismo evento a la subcanalización de log_advanced_pipeline, que lo procesa y lo envía a un índice de OpenSearch diferente denominado enriched_advanced_logs.

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Colas de mensajes fallidos

Las colas de mensajes fallidos (DLQ) son los destinos de los eventos que una canalización no logra registrar en un receptor. En OpenSearch Ingestion, debe especificar un bucket de Amazon S3 con los permisos de escritura adecuados para usarlo como DLQ. Puede añadir una configuración de DLQ a cada receptor de una canalización. Cuando una canalización detecta errores de escritura, crea objetos de DLQ en el bucket de S3 configurado. Los objetos DLQ existen dentro de un archivo JSON como una matriz de eventos fallidos.

Una canalización escribe eventos en la DLQ cuando se cumple alguna de las siguientes condiciones:

  • Se ha agotado el número máximo de reintentos del receptor de OpenSearch. OpenSearch Ingestion requiere un mínimo de 16 para esta configuración.

  • El receptor rechaza los eventos debido a una condición de error.

Configuración

Para configurar una cola de mensajes fallidos para una subcanalización, elija Habilitar DLQ de S3 al configurar el destino de su receptor. A continuación, especifique los ajustes necesarios para la cola. Para obtener más información, consulte Configuración en la documentación de Data Prepper DLQ.

Los archivos que se escriben en este DLQ de S3 tienen el siguiente patrón de nomenclatura:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Para obtener instrucciones acerca de cómo configurar manualmente el rol de canalización para permitir el acceso al bucket de S3 en el que escribe la DLQ, consulte Permisos para escribir en Amazon S3 o en una cola de mensajes fallidos.

Ejemplo

Considere el siguiente ejemplo de archivo DLQ:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Este es un ejemplo de datos que no se pudieron escribir en el receptor y que se envían al bucket DLQ de S3 para su posterior análisis:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Administración de índices

Amazon OpenSearch Ingestion cuenta con numerosas funciones de administración de índices, entre las que se incluyen las siguientes.

Creación de índices

Puede especificar un nombre de índice en un receptor de canalización y OpenSearch Ingestion crea el índice cuando aprovisiona la canalización. Si ya existe un índice, la canalización lo usa para indexar los eventos entrantes. Si detiene y reinicia una canalización, o si actualiza su configuración de YAML, la canalización intentará crear nuevos índices si aún no existen. Una canalización nunca puede eliminar un índice.

En el siguiente ejemplo, los receptores crean dos índices cuando se aprovisiona la canalización:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Creación de nombres y patrones de índice

Puede generar nombres de índice dinámicos mediante variables de los campos de los eventos entrantes. En la configuración del receptor, utilice el formato string${} para indicar la interpolación de cadenas y utilice un puntero JSON para extraer los campos de los eventos. Las opciones para index_type son custom o management_disabled. Como index_type se revierte a custom para los dominios de OpenSearch y management_disabled para las colecciones de OpenSearch sin servidor, se puede dejar sin configurar.

Por ejemplo, la siguiente canalización selecciona el campo metadataType entre los eventos entrantes para generar nombres de índice.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

La siguiente configuración sigue generando un índice nuevo cada día o cada hora.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

El nombre del índice también puede ser una cadena simple con un patrón de fecha y hora como sufijo, por ejemplo my-index-%{yyyy.MM.dd}. Cuando el receptor envía datos a OpenSearch, reemplaza el patrón de fecha y hora por el horario UTC y crea un índice nuevo para cada día, por ejemplo my-index-2022.01.25. Para obtener más información, consulte la clase DateTimeFormatter.

Este nombre de índice también puede ser una cadena formateada (con o sin un sufijo de patrón de fecha y hora), como my-${index}-name. Cuando el receptor envía datos a OpenSearch, reemplaza la parte "${index}" por el valor en caso de que se esté procesando. Si el formato lo es "${index1/index2/index3}", reemplaza el campo index1/index2/index3 por su valor en el evento.

Creación de identificadores de documentos

Una canalización puede generar un identificador de documento al indexar los documentos en OpenSearch. Puede deducir estos identificadores de documentos a partir de los campos incluidos en los eventos entrantes.

En este ejemplo, se utiliza el campo uuid de un evento entrante para generar un identificador de documento.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" "document_id": "uuid"

En el ejemplo siguiente, el procesador Add entries combina los campos uuid y other_field del evento entrante para generar un identificador de documento.

La acción create garantiza que los documentos con identificadores idénticos no se sobrescriban. La canalización elimina los documentos duplicados sin ningún tipo de reintento ni ningún evento de DLQ. Esta es una expectativa razonable para los autores de canalización que utilizan esta acción, ya que el objetivo es evitar actualizar los documentos existentes.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id: "my_doc_id"

Es posible que desee establecer el identificador del documento de un evento en un campo de un subobjeto. En el siguiente ejemplo, el complemento receptor de OpenSearch utiliza el subobjeto info/id para generar un identificador de documento.

sink: - opensearch: ... document_id: info/id

Si se produce el siguiente evento, la canalización generará un documento con el campo _id establecido en json001:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Creación de identificadores de enrutamiento

Puede usar la opción routing_field del complemento del receptor de OpenSearch para establecer el valor de una propiedad de enrutamiento de documentos (_routing) en el valor de un evento entrante.

El enrutamiento admite la sintaxis de punteros JSON, por lo que también están disponibles campos anidados, no solo campos de nivel superior.

sink: - opensearch: ... routing_field: metadata/id document_id: id

Si se produce el siguiente evento, el complemento genera un documento con el campo _routing establecido en abcd:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Para obtener instrucciones sobre cómo crear plantillas de índice que las canalizaciones puedan utilizar durante la creación del índice, consulte Plantillas de índice.

Reconocimiento integral

OpenSearch Ingestion garantiza la durabilidad y la fiabilidad de los datos mediante el seguimiento de su entrega desde el origen hasta los receptores en las canalizaciones sin estado mediante un reconocimiento integral.

nota

Actualmente, solo el complemento fuente de S3 admite el reconocimiento integral.

Con el reconocimiento integral, el complemento de origen de la canalización crea un conjunto de reconocimiento para monitorear un lote de eventos. Recibe un reconocimiento positivo cuando esos eventos se envían correctamente a sus receptores, o un reconocimiento negativo cuando alguno de los eventos no se ha podido enviar a sus receptores.

En caso de fallo o caída de un componente de la canalización, o si un origen no recibe un reconocimiento, se agota el tiempo de espera y toma las medidas necesarias, como volver a intentarlo o registrar el fallo. Si la canalización tiene configurados varios receptores o subcanalizaciones, las confirmaciones a nivel de evento solo se envían después de que el evento se envíe a todos los receptores de todas las subcanalizaciones. Si un receptor tiene una DLQ configurada, los reconocimientos integrales también rastrean los eventos escritos en la DLQ.

Para habilitar el reconocimiento de extremo a extremo, expanda Opciones adicionales en la configuración de origen de Amazon S3 y elija Habilitar el reconocimiento de mensajes de extremo a extremo.

Contrapresión del origen

Una canalización puede sufrir una contrapresión cuando está ocupada procesando datos o si sus receptores están temporalmente inactivos o tardan en incorporar datos. OpenSearch Ingestion tiene diferentes formas de gestionar la contrapresión en función del complemento origen que utilice la canalización.

Origen de HTTP

Las canalizaciones que utilizan el complemento origen de HTTP gestionan la contrapresión de forma diferente en función del componente de la canalización que esté congestionado:

  • Búferes: cuando los búferes están llenos, la canalización comienza a devolver el estado HTTP REQUEST_TIMEOUT con el código de error 408 al punto de conexión de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos HTTP.

  • Subprocesos de origen: cuando todos los subprocesos de origen HTTP están ocupados ejecutando solicitudes y el tamaño de la cola de solicitudes sin procesar ha superado el número máximo de solicitudes permitido, la canalización comienza a devolver el estado HTTP TOO_MANY_REQUESTS con el código de error 429 al punto de conexión de origen. Cuando la cola de solicitudes cae por debajo del tamaño máximo permitido, la canalización vuelve a procesar las solicitudes.

Origen de OTel

Cuando los búferes están llenos para las canalizaciones que utilizan orígenes de OpenTelemetry (registros de OTel, métricas de OTel y trazas de OTel), la canalización comienza a devolver el estado HTTP REQUEST_TIMEOUT con el código de error 408 al punto de conexión de origen. A medida que se van liberando los búferes, la canalización vuelve a procesar los eventos.

Fuente de S3

Cuando los búferes están llenos para las canalizaciones con un origen de S3, las canalizaciones dejan de procesar las notificaciones de SQS. A medida que se van liberando los búferes, la canalización vuelve a procesar las notificaciones.

Si un receptor está inactivo o no puede incorporar datos y está activado el reconocimiento integral para el origen, la canalización deja de procesar las notificaciones de SQS hasta que reciba un reconocimiento correcto de todos los receptores.