

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Utilice una canalización OpenSearch de ingestión con Amazon Kinesis Data Streams
<a name="configure-client-kinesis"></a>

Utilice una canalización de OpenSearch ingestión con Amazon Kinesis Data Streams para incorporar datos de registros de transmisiones de varias transmisiones a dominios y colecciones de OpenSearch Amazon Service. La canalización de OpenSearch ingestión incorpora la infraestructura de ingesta de streaming para proporcionar una forma a gran escala y baja latencia de ingerir continuamente los registros de streaming de Kinesis.

**Topics**
+ [

## Amazon Kinesis Data Streams como origen
](#confluent-cloud-kinesis)
+ [

## Amazon Kinesis Data Streams como origen
](#kinesis-cross-account-source)

## Amazon Kinesis Data Streams como origen
<a name="confluent-cloud-kinesis"></a>

Con el siguiente procedimiento, aprenderá a configurar una canalización de OpenSearch ingestión que utilice Amazon Kinesis Data Streams como fuente de datos. En esta sección se describen los requisitos previos necesarios, como la creación de un dominio de OpenSearch servicio o una colección OpenSearch sin servidor, y se explican los pasos para configurar la función de canalización y crear la canalización.

### Requisitos previos
<a name="s3-prereqs"></a>

Para configurar su canalización, necesitará uno o varios Kinesis Data Streams activos. Estas transmisiones deben recibir registros o estar listas para recibir registros de otros orígenes. Para obtener más información, consulte [Descripción general de la OpenSearch ingestión](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html).

**Configuración de su canalización**

1. 

**Cree un dominio OpenSearch de servicio o una colección OpenSearch sin servidor**

   Para crear un dominio o una colección, consulta [Cómo empezar a usar OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html).

   Para crear un rol de IAM con los permisos correctos para acceder a los datos de escritura de la colección o dominio, consulte [Políticas basadas en recursos](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource).

1. 

**Configure el rol de canalización con permisos**

   [Configure el rol de canalización](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) que desee usar en la configuración de canalización y agregue los siguientes permisos. Sustituya *placeholder values* por su propia información.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   Si el cifrado del lado del servidor está habilitado en las transmisiones, la siguiente política de AWS KMS permite descifrar los registros. Sustituya *placeholder values* por su propia información.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   Para que una canalización escriba datos en un dominio, el dominio debe tener una [política de acceso a nivel de dominio](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) que permita al rol de canalización **sts\$1role\$1arn** acceder a ellos.

   El siguiente ejemplo de política de acceso al dominio permite que el rol de canalización que creó en el paso anterior, (`pipeline-role`), escriba datos en el dominio `ingestion-domain`. Sustituya *placeholder values* por su propia información.

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:Región de AWS:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**Creación de la canalización**

   Configure una canalización OpenSearch de ingestión especificando **K inesis-data-streams** como fuente. Puede encontrar un plano listo para usar en la consola de OpenSearch ingestión para crear dicha canalización. (Opcional) Para crear la canalización mediante el AWS CLI, puede utilizar un plano denominado "». **`AWS-KinesisDataStreamsPipeline`** Sustituya *placeholder values* por su propia información.

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the Región de AWS of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Serverless domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Ensure adding unique document id as a combination of the metadata attributes available.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the Región de AWS of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**Opciones de configuración**  
Para ver las opciones de configuración de Kinesis, consulte las [opciones de configuración](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options) en la *OpenSearch*documentación.

**Atributos de metadatos disponibles**
   + **stream\$1name: nombre** de las transmisiones de datos de Kinesis desde las que se ingirió el registro
   + **partition\$1key: clave** de partición del registro de Kinesis Data Streams que se está ingiriendo
   + **sequence\$1number: número de** secuencia del registro de Kinesis Data Streams que se está ingiriendo
   + **sub\$1sequence\$1number: número de** subsecuencia del registro de Kinesis Data Streams que se está ingiriendo

1. 

**(Opcional) Configure las unidades de cómputo recomendadas (OCUs) para la canalización de Kinesis Data Streams**

   También se puede configurar una canalización de origen de OpenSearch Kinesis Data Streams para ingerir registros de transmisión de más de una transmisión. Para una ingesta más rápida, le recomendamos que agregue una unidad de cómputo adicional por cada nueva transmisión que se agregue.

### Coherencia de datos
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch La ingestión permite el end-to-end reconocimiento para garantizar la durabilidad de los datos. Cuando la canalización lee los registros de transmisión de Kinesis, distribuye dinámicamente el trabajo de lectura de los registros de transmisión en función de los particiones asociadas a las transmisiones. Pipeline comprobará automáticamente las transmisiones de puntos cuando reciba un acuse de recibo tras haber ingerido todos los registros del dominio o la colección. OpenSearch Esto evitará el procesamiento duplicado de los registros de las transmisiones.

Para crear el índice en función del nombre del flujo, defina el índice en la sección del receptor de opensearch como **"index\$1\$1\$1getMetadata(\$1"stream\$1name\$1")\$1"**.

## Amazon Kinesis Data Streams como origen
<a name="kinesis-cross-account-source"></a>

Puede conceder acceso a todas las cuentas con Amazon Kinesis Data Streams para OpenSearch que las canalizaciones de ingestión puedan acceder a Kinesis Data Streams de otra cuenta como fuente. Complete los siguientes pasos para habilitar el acceso entre cuentas:

**Configuración del acceso entre cuentas**

1. 

**Establezca la política de recursos en la cuenta que tiene la transmisión de Kinesis**

   Sustituya *placeholder values* por su propia información.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**(Opcional) Configure una política para consumidores y recursos para consumidores**

   Este paso es opcional y solo será necesario si planea utilizar la estrategia Enhanced Fanout Consumer para leer los registros de las transmisiones. Para más información, consulte [Develop enhanced fan-out consumers with dedicated throughput](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html).

   1. 

**Configurar consumidor**

      Para reutilizar un consumidor existente, puede omitir este paso. Para obtener más información, consulte la [RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html)referencia de la *API de Amazon Kinesis Data Streams*.

      En el siguiente ejemplo de comando CLI, sustituya el *placeholder values* por su propia información.  
**Example Ejemplo de comando de la CLI:**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:Región de AWS:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**Configure la política de recursos para el consumidor**

      En la siguiente declaración, *placeholder values* sustitúyala por su propia información.

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**Configuración de canalización**

   Para la ingesta entre cuentas, agregue los siguientes atributos en `kinesis_data_streams` para cada transmisión:
   + `stream_arn`: el ARN de la transmisión que pertenece a la cuenta en la que existe la transmisión.
   + `consumer_arn`: se trata de un atributo opcional y debe especificarse si se opta por la estrategia de consumo de distribución ramificada mejorada de forma predeterminada. Especifique el nombre real del consumidor para este campo. Sustituya *placeholder values* por su propia información.

   ```
   version: "2"
        kinesis-pipeline:
          source:
            kinesis_data_streams:
              acknowledgments: true
              codec:
                newline:
              streams:
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                  # Enable this if ingestion should start from the start of the stream.
                  # initial_position: "EARLIEST"
                  # checkpoint_interval: "PT5M"
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                   # initial_position: "EARLIEST"
        
                # buffer_timeout: "1s"
                # records_to_accumulate: 100
                # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
                # consumer_strategy: "polling"
                # if consumer strategy is set to "polling", enable the polling config below.
                # polling:
                  # max_polling_records: 100
                  # idle_time_between_reads: "250ms"
              aws:
                # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                # Provide the Región de AWS of the domain.
                region: "us-east-1"
        
          sink:
            - opensearch:
                # Provide an OpenSearch Serverless domain endpoint
                hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
                index: "index_${getMetadata(\"stream_name\")}"
                # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
                document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
                aws:
                  # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
                  sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                  # Provide the Región de AWS of the domain.
                  region: "us-east-1"
                  # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
                  serverless: false
                    # serverless_options:
                    # Specify a name here to create or update network policy for the serverless collection
                  # network_policy_name: network-policy-name
                # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
                # distribution_version: "es6"
                # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
                # enable_request_compression: true/false
                # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
                dlq:
                  s3:
                    # Provide an Amazon S3 bucket
                    bucket: "your-dlq-bucket-name"
                    # Provide a key path prefix for the failed requests
                    # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
                    # Provide the Región de AWS of the bucket.
                    region: "us-east-1"
                    # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                    sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OSI Pipeline Role Kinesis Data Streams**

   1. 

**Política de IAM**

      Agregue la siguiente política al rol de canalización. Sustituya *placeholder values* por su propia información.

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**Política de confianza**

      Para poder ingerir datos de la cuenta de transmisión, tendrá que establecer una relación de confianza entre la función de ingesta de canalización y la cuenta de transmisión. Agregue lo siguiente al rol de canalización. Sustituya *placeholder values* por su propia información.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------