Usar um pipeline do OpenSearch Ingestion com o Amazon DynamoDB
Você pode usar o plug-in do DynamoDB
Você pode processar dados do DynamoDB com ou sem um snapshot inicial completo.
-
Com um snapshot completo: o DynamoDB usa a recuperação para um ponto no tempo (PITR) para criar um backup e carregá-lo no Amazon S3. Depois, o OpenSearch Ingestion indexa o snapshot em um ou vários índices do OpenSearch. Para manter a consistência, o pipeline sincroniza todas as alterações do DynamoDB com o OpenSearch. Essa opção exige que você habilite PITR e DynamoDB Streams.
-
Sem snapshot: o OpenSearch Ingestion transmite somente os eventos novos do DynamoDB. Escolha essa opção se você já tiver um snapshot ou precisar de transmissão contínua em tempo real sem dados históricos. Essa opção exige que você habilite apenas o DynamoDB Streams.
Para obter mais informações, consulte Integração ETL zero do DynamoDB com o Amazon OpenSearch Service no Guia do desenvolvedor do Amazon DynamoDB.
Tópicos
Pré-requisitos
Para configurar o pipeline, você precisa ter uma tabela do DynamoDB com o DynamoDB Streams habilitado. Seu fluxo deve usar o tipo de visualização de fluxo NEW_IMAGE. Porém, os pipelines do OpenSearch Ingestion também podem transmitir eventos com NEW_AND_OLD_IMAGES se esse tipo de visualização de fluxo for apropriado para o seu caso de uso.
Se estiver usando snapshots, você também deverá habilitar a recuperação para um ponto no tempo na sua tabela. Para obter mais informações, consulte Criar uma tabela, Habilitar a recuperação para um ponto no tempo e Habilitar um fluxo, no Guia do Desenvolvedor do Amazon DynamoDB.
Etapa 1: configurar a função do pipeline
Depois de configurar a tabela do DynamoDB, defina o perfil de pipeline que você deseja usar na configuração do pipeline e adicione as seguintes permissões do DynamoDB nesse perfil:
Também é possível usar uma chave gerenciada pelo cliente do AWS KMS para criptografar os arquivos de dados de exportação. Para descriptografar os objetos exportados, especifique s3_sse_kms_key_id para o ID da chave na configuração de exportação do pipeline, com o seguinte formato: arn:aws:kms:. A política a seguir inclui as permissões necessárias para usar uma chave gerenciada pelo cliente:region:account-id:key/my-key-id
{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource":arn:aws:kms:}region:account-id:key/my-key-id
Etapa 2: Criar o pipeline
Em seguida, é possível configurar um pipeline da Ingestão do OpenSearch como o seguinte, que especifica o DynamoDB como origem. Essa amostra de pipeline ingere dados de table-a com o snapshot de PITR, seguido por eventos do DynamoDB Streams. Uma posição inicial de LATEST indica que o pipeline deve ler os dados mais recentes do DynamoDB Streams.
version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:region:account-id:table/table-a" export: s3_bucket: "my-bucket" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-east-1" sink: - opensearch: hosts: ["https://search-mydomain.region.es.amazonaws.com"] index: "${getMetadata(\"table-name\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"
Você pode usar um esquema do DynamoDB pré-configurado para criar esse pipeline. Para obter mais informações, consulte Trabalhar com esquemas.
Consistência de dados
O OpenSearch Ingestion oferece suporte ao reconhecimento de ponta a ponta para garantir a durabilidade dos dados. Quando um pipeline lê snapshots ou fluxos, ele cria partições dinamicamente para processamento paralelo. O pipeline marca uma partição como concluída quando recebe uma confirmação depois de ingerir todos os registros no domínio ou na coleção do OpenSearch.
Se você quiser fazer a ingestão em uma coleção de pesquisa do OpenSearch sem Servidor, gere um ID de documento no pipeline. Se quiser fazer a ingestão em uma coleção de séries temporais do OpenSearch sem Servidor, observe que o pipeline não gera um ID de documento.
Um pipeline do OpenSearch Ingestion também mapeia ações de eventos de entrada para ações de indexação em massa correspondentes para ajudar a ingerir documentos. Isso mantém os dados consistentes, de modo que cada alteração de dados no DynamoDB seja reconciliada com as alterações correspondentes do documento no OpenSearch.
Mapear tipo de dados
O OpenSearch Service mapeia tipos de dados dinamicamente em cada documento recebido para o tipo de dados correspondente no DynamoDB. A tabela a seguir mostra como o OpenSearch Service mapeia automaticamente vários tipos de dados.
| Tipo de dados | OpenSearch | DynamoDB |
|---|---|---|
| Número |
O OpenSearch mapeia dados numéricos automaticamente. Se for um número inteiro, o OpenSearch o mapeará como um valor longo. Se for um número fracionário, o OpenSearch o mapeará como um valor flutuante. O OpenSearch mapeia vários atributos dinamicamente com base no primeiro documento enviado. Se houver uma combinação de tipos de dados para o mesmo atributo no DynamoDB, como um número inteiro e um fracionário, o mapeamento poderá falhar. Por exemplo, se o primeiro documento tiver um atributo como um número inteiro, e um documento posterior tiver o mesmo atributo como um número fracionário, o OpenSearch não conseguirá ingerir o segundo documento. Nesses casos, é necessário fornecer um modelo de mapeamento explícito, como o seguinte:
Se precisar de precisão dupla, use o mapeamento de campo do tipo string. Não há um tipo numérico equivalente com suporte para 38 dígitos de precisão no OpenSearch. |
O DynamoDB é compatível com números. |
| Number set | O OpenSearch mapeia automaticamente um conjunto de números em uma matriz de valores longos ou flutuantes. Assim como os números escalares, isso depende de o primeiro número ingerido ser um número inteiro ou fracionário. É possível fornecer mapeamentos para conjuntos de números da mesma maneira que você mapeia strings escalares. |
O DynamoDB oferece suporte a tipos que representam conjuntos de números. |
| String |
O OpenSearch mapeia valores de string automaticamente como texto. Em algumas situações, como valores enumerados, é possível mapear para o tipo de palavra-chave. O exemplo a seguir mostra como mapear um atributo do DynamoDB denominado
|
O DynamoDB é compatível com strings. |
| String set |
O OpenSearch mapeia um conjunto de strings automaticamente para uma matriz de strings. É possível fornecer mapeamentos para conjuntos de strings da mesma maneira que você mapeia strings escalares. |
O DynamoDB oferece suporte a tipos que representam conjuntos de strings. |
| Binário |
O OpenSearch mapeia dados binários automaticamente como texto. Você pode fornecer um mapeamento para escrevê-los como campos binários no OpenSearch. O exemplo a seguir mostra como mapear um atributo do DynamoDB denominado
|
O DynamoDB oferece suporte a atributos de tipo binário. |
| Binary Set |
O OpenSearch mapeia um conjunto binário automaticamente para uma matriz de dados binários como texto. É possível fornecer mapeamentos para conjuntos de números da mesma maneira que você mapeia binários escalares. |
O DynamoDB oferece suporte a tipos que representam conjuntos de valores binários. |
| Booleano |
O OpenSearch mapeia um tipo booliano do DynamoDB para um tipo booliano do OpenSearch. |
O DynamoDB é compatível com atributos do tipo booliano. |
| Null |
O OpenSearch pode ingerir documentos do tipo null do DynamoDB. Ele salva o valor como um valor nulo no documento. Não há mapeamento para esse tipo, e esse campo não é indexado nem pesquisável. Se o mesmo nome de atributo for usado para um tipo nulo e depois mudar para um tipo diferente, como string, o OpenSearch criará um mapeamento dinâmico para o primeiro valor não nulo. Os valores subsequentes ainda podem ser valores nulos do DynamoDB. |
O DynamoDB oferece suporte a atributos de tipo nulo. |
| Mapa |
O OpenSearch mapeia os atributos de mapa do DynamoDB para campos aninhados. Os mesmos mapeamentos são aplicáveis a um campo aninhado. O exemplo a seguir mapeia uma string em um campo aninhado para um tipo de palavra-chave no OpenSearch:
|
O DynamoDB oferece suporte a atributos de tipo de mapa. |
| Lista |
O OpenSearch fornece resultados diferentes para listas do DynamoDB, dependendo do que está nessas listas. Quando uma lista contém todos os mesmos tipos de tipos escalares (por exemplo, uma lista de todas as strings), o OpenSearch a ingere como uma matriz desse tipo. Isso funciona para os tipos string, número, booliano e null. As restrições para cada um desses tipos são iguais às restrições para um escalar do mesmo tipo. Também é possível fornecer mapeamentos para listas de mapas usando o mesmo mapeamento que você usaria para um mapa. Você não pode fornecer uma lista de tipos mistos. |
O DynamoDB oferece suporte para atributos de tipo de lista. |
| Defina |
O OpenSearch fornece resultados diferentes para conjuntos do DynamoDB, dependendo do que está nessas listas. Quando um conjunto contém todos os mesmos tipos de tipos escalares (por exemplo, um conjunto de todas as strings), o OpenSearch o ingere como uma matriz desse tipo. Isso funciona para os tipos string, número, booliano e null. As restrições para cada um desses tipos são iguais às restrições para um escalar do mesmo tipo. Também é possível fornecer mapeamentos para conjuntos de mapas usando o mesmo mapeamento que você usaria para um mapa. Você não pode fornecer um conjunto de tipos mistos. |
O DynamoDB oferece suporte a tipos que representam conjuntos. |
Recomendamos que você configure a fila de mensagens não entregues (DLQ) no pipeline do OpenSearch Ingestion. Se você tiver configurado a fila, o OpenSearch Service enviará todos os documentos com falha que não puderem ser ingeridos devido a falhas de mapeamento dinâmico para a fila.
Se os mapeamentos automáticos falharem, será possível usar template_type e template_content na configuração do pipeline para definir regras de mapeamento explícitas. Como alternativa, é possível criar modelos de mapeamento diretamente no seu domínio de pesquisa ou na sua coleção antes de iniciar o pipeline.
Limitações
Considere as seguintes limitações ao configurar um pipeline do OpenSearch Ingestion para o DynamoDB:
-
Atualmente, a integração do OpenSearch Ingestion com o DynamoDB não oferece suporte para ingestão entre regiões. Sua tabela do DynamoDB e o pipeline do OpenSearch Ingestion devem estar na mesma Região da AWS.
-
Sua tabela do DynamoDB e o pipeline do OpenSearch Ingestion devem estar na mesma Conta da AWS.
-
Um pipeline do OpenSearch Ingestion oferece suporte para somente uma tabela do DynamoDB como origem.
-
O DynamoDB Streams apenas armazena dados em log por até 24 horas. Se a ingestão de um snapshot inicial de uma tabela grande levar 24 horas ou mais, haverá uma certa perda inicial de dados. Para mitigar essa perda de dados, estime o tamanho da tabela e configure as unidades de computação apropriadas dos pipelines do OpenSearch Ingestion.
Alarmes do CloudWatch recomendados para DynamoDB
As seguintes métricas do CloudWatch são recomendadas para monitorar o desempenho do seu pipeline de ingestão. Essas métricas podem ajudar você a identificar a quantidade de dados processados nas exportações, a quantidade de eventos processados nos fluxos, os erros no processamento de exportações e eventos de fluxo e o número de documentos gravados no destino. Você pode configurar alarmes do CloudWatch para executar uma ação quando uma dessas métricas excede um valor especificado por um período especificado.
| Métrica | Descrição |
|---|---|
dynamodb-pipeline.blockingbuffer.bufferUsage.value |
Indica quanto do buffer está sendo utilizado. |
dynamodb-pipeline.dynamodb.activeExports3ObjectConsumers.value
|
Mostra o número total de OCUs que estão processando ativamente objetos do Amazon S3 para a exportação. |
dynamodb-pipeline.dynamodb.bytesprocessed.count
|
Contagem de bytes processados a partir da fonte do DynamoDB. |
dynamodb-pipeline.dynamodb.changeeventsprocessed.count
|
Número de eventos de alteração processados no fluxo do DynamoDB. |
dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count
|
Número de erros de eventos de alteração processados no DynamoDB. |
dynamodb-pipeline.dynamodb.exportJobFailure.count
|
Number of export job submission attempts that have failed. |
dynamodb-pipeline.dynamodb.exportJobSuccess.count
|
Number of export jobs that have been submitted successfully. |
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count
|
Número total de registros processados a partir da exportação. |
dynamodb-pipeline.dynamodb.exportRecordsTotal.count
|
Número total de registros exportados do DynamoDB, essencial para acompanhar volumes de exportação de dados. |
dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count
|
Total number of export data files that have been processed successfully from Amazon S3. |
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count
|
Count of errors during bulk requests due to malformed request. |
Dynamodb-pipeline.opensearch.bulkRequestLatency.avg
|
Average latency for bulk write requests made to OpenSearch. |
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count
|
Number of bulk requests that failed because the target data could not be found. |
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count
|
Number of retries by OpenSearch Ingestion pipelines to write OpenSearch cluster. |
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum
|
Total size in bytes of all bulk requests made to OpenSearch. |
dynamodb-pipeline.opensearch.documentErrors.count
|
Number of errors when sending documents to OpenSearch. The documents causing the errors witll be sent to DLQ. |
dynamodb-pipeline.opensearch.documentsSuccess.count
|
Number of documents successfully written to an OpenSearch cluster or collection. |
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count
|
Number of documents successfully indexed in OpenSearch on the first attempt. |
|
|
Count of errors due to version conflicts in documents during processing. |
|
|
Average latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writint to the destination. |
dynamodb-pipeline.opensearch.PipelineLatency.max
|
Maximum latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writing the destination. |
dynamodb-pipeline.opensearch.recordsIn.count
|
Count of records successfully ingested into OpenSearch. This metric is essential for tracking the volume of data being processed and stored. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count
|
Number of records that failed to write to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count
|
Number of records that are written to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count
|
Count of latency measurements for requests to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum
|
Total latency for all requests to the Amazon S3 dead-letter queue |
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum
|
Total size in bytes of all requests made to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.recordsProcessed.count
|
Total number of records processed in the pipeline, a key metric for overal throughput. |
dynamodb.changeEventsProcessed.count
|
No records are being gathered from DynamoDB streams. This could be due to no activitiy on the table, an export being in progress, or an issue accessing the DynamoDB streams. |
|
|
The attempt to trigger an export to S3 failed. |
|
|
Count of bulk request errors in OpenSearch due to invalid input, crucial for monitoring data quality and operational issues. |
opensearch.EndToEndLatency.avg
|
The end to end latnecy is higher than desired for reading from DynamoDB streams. This could be due to an underscaled OpenSearch cluster or a maximum pipeline OCU capacity that is too low for the WCU throughput on the DynamoDB table. This end to end latency will be high after an export and should decrease over time as it catches up to the latest DynamoDB streams. |