As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams
Use um pipeline OpenSearch de ingestão com o Amazon Kinesis Data Streams para ingerir dados de registros de stream de vários streams em domínios e coleções do Amazon Service. OpenSearch O pipeline OpenSearch de ingestão incorpora a infraestrutura de ingestão de streaming para fornecer uma forma de alta escala e baixa latência de ingerir continuamente registros de streaming do Kinesis.
Tópicos
Amazon Kinesis Data Streams como origem
Com o procedimento a seguir, você aprenderá a configurar um pipeline de OpenSearch ingestão que usa o Amazon Kinesis Data Streams como fonte de dados. Esta seção aborda os pré-requisitos necessários, como criar um domínio de OpenSearch serviço ou uma coleção OpenSearch sem servidor e percorrer as etapas para configurar a função do pipeline e criar o pipeline.
Pré-requisitos
Para configurar o pipeline, você precisa de um ou mais Kinesis Data Streams. Esses fluxos devem estar recebendo registros ou estar prontos para receber registros de outras origens. Para obter mais informações, consulte Visão geral da OpenSearch ingestão.
Para configurar o pipeline
-
Crie um domínio OpenSearch de serviço ou uma coleção OpenSearch sem servidor
Para criar um domínio ou uma coleção, consulte Introdução à OpenSearch ingestão.
Para criar um perfil do IAM com as permissões certas para acessar dados de gravação na coleção ou no domínio, consulte Políticas baseadas no recurso.
-
Configurar o perfil do pipeline com permissões
Configure o perfil de pipeline que você deseja usar na configuração do pipeline e adicione a ele as seguintes permissões. Substitua
placeholder valuespor suas próprias informações.Se a criptografia do lado do servidor estiver habilitada nos fluxos, a política da AWS KMS a seguir permitirá descriptografar os registros. Substitua
placeholder valuespor suas próprias informações.Para que um pipeline grave dados em um domínio, o domínio deve ter uma política de acesso em nível de domínio que permita que a função de pipeline sts_role_arn o acesse.
O exemplo de política de acesso a domínio a seguir permite que o perfil de pipeline denominado
pipeline-rolecriado na etapa anterior () grave dados no domínioingestion-domain. Substituaplaceholder valuespor suas próprias informações.{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::your-account-id:role/pipeline-role" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Região da AWS:account-id:domain/domain-name/*" } ] } -
Criar o pipeline
Configure um pipeline OpenSearch de ingestão especificando K inesis-data-streams como a origem. Você pode localizar um blueprint pronto disponível no Console OpenSearch de ingestão para criar esse pipeline. (Opcional) Para criar o pipeline usando o AWS CLI, você pode usar um blueprint chamado "
AWS-KinesisDataStreamsPipeline”. Substituaplaceholder valuespor suas próprias informações.version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the Data Stream. region: "us-east-1" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"Opções de configuração
Para ver as opções de configuração do Kinesis, consulte Opções de configuração
na OpenSearchdocumentação. Atributos de metadados disponíveis
-
stream_name: nome do Kinesis Data Streams de onde o registro foi ingerido
-
partition_key: chave de partição do registro do Kinesis Data Streams que está sendo ingerido
-
sequence_number: número da sequência do registro do Kinesis Data Streams que está sendo ingerido
-
sequence_number: número da subsequência do registro do Kinesis Data Streams que está sendo ingerido
-
-
(Opcional) Configure as unidades de computação recomendadas (OCUs) para o pipeline do Kinesis Data Streams
Um pipeline de origem do OpenSearch Kinesis Data Streams também pode ser configurado para ingerir registros de stream de mais de um stream. Para ter uma ingestão mais rápida, recomendamos que você adicione uma unidade computacional adicional por cada novo fluxo adicionado.
Consistência de dados
OpenSearch A ingestão suporta o end-to-end reconhecimento para garantir a durabilidade dos dados. Quando o pipeline lê registros de fluxo do Kinesis, ele distribui dinamicamente o trabalho de leitura de registros de fluxo com base nos fragmentos associados aos fluxos. O Pipeline verificará automaticamente os fluxos quando receber uma confirmação após ingerir todos os registros no domínio ou na coleção. OpenSearch Isso evitará o processamento duplicado dos registros de fluxo.
Para criar o índice com base no nome do fluxo, defina-o na seção do coletor do opensearch como “index_${getMetadata(\“stream_name\”)}”.
Várias contas do Amazon Kinesis Data Streams como origem
Você pode conceder acesso a várias contas com o Amazon Kinesis Data Streams OpenSearch para que os pipelines de ingestão possam acessar o Kinesis Data Streams em outra conta como fonte. Conclua as seguintes etapas para habilitar o acesso entre contas:
Configure o acesso entre contas
-
Defina a política de recursos na conta que tem o fluxo do Kinesis
Substitua
placeholder valuespor suas próprias informações. -
(Opcional) Configurar o consumidor e a política de recursos do consumidor
Essa é uma etapa opcional e apenas será necessária se você planejar usar a estratégia de consumidor de fanout avançado para ler registros de fluxo. Para ter mais informações, consulte Desenvolver consumidores de fanout avançado com throughput dedicado.
-
Configurar consumidor
Para reutilizar um consumidor existente, você pode ignorar esta etapa. Para obter mais informações, consulte a RegisterStreamConsumerReferência da API do Amazon Kinesis Data Streams.
No exemplo de comando CLI a seguir, substitua o por suas
placeholder valuespróprias informações.exemplo Exemplo de comando da CLI:
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:Região da AWS:account-id:stream/stream-name" \ --consumer-nameconsumer-name -
Configurar a política de recursos do consumidor
Na declaração a seguir,
placeholder valuessubstitua o por suas próprias informações.
-
-
Configuração do pipeline
Para ingestão entre contas, adicione os seguintes atributos em
kinesis_data_streamspara cada fluxo:-
stream_arn: o arn do fluxo que pertence à conta na qual o fluxo existe -
consumer_arn: esse é um atributo opcional e deve ser especificado se a estratégia padrão de consumidor de fanout avançado for escolhida. Especifique o arn do consumidor real para esse campo. Substituaplaceholder valuespor suas próprias informações.
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name" consumer_arn: "consumer arn" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" # Provide the Região da AWS of the domain. region: "us-east-1" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq" # Provide the Região da AWS of the bucket. region: "us-east-1" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role" -
-
Perfil do pipeline OSI no Kinesis Data Streams
-
Política do IAM
Adicione a seguinte política ao perfil do pipeline. Substitua
placeholder valuespor suas próprias informações. -
Política de confiança
Para ingerir dados da conta do fluxo, você precisará estabelecer uma relação de confiança entre o perfil de ingestão do pipeline e a conta do fluxo. Adicione o que se seque ao perfil do pipeline. Substitua
placeholder valuespor suas próprias informações.
-