

Após uma análise cuidadosa, decidimos descontinuar o Amazon Kinesis Data Analytics para aplicativos SQL:

1. A partir de **1º de setembro de 2025,** não forneceremos nenhuma correção de bug para aplicativos do Amazon Kinesis Data Analytics para SQL porque teremos suporte limitado para ele, devido à próxima descontinuação.

2. A partir **de 15 de outubro de 2025,** você não poderá criar novos aplicativos Kinesis Data Analytics para SQL.

3. Excluiremos as aplicações a partir de **27 de janeiro de 2026**. Você não poderá mais iniciar nem operar as aplicações do Amazon Kinesis Data Analytics para SQL. A partir dessa data, não haverá mais suporte ao Amazon Kinesis Data Analytics para SQL. Para obter mais informações, consulte [Descontinuação de aplicações do Amazon Kinesis Data Analytics para SQL](discontinuation.md).

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Configuração de entrada do aplicativo
<a name="how-it-works-input"></a>

Seu aplicativo Amazon Kinesis Data Analytics pode receber entrada de uma única fonte de streaming e, opcionalmente, usar uma fonte de dados de referência. Para obter mais informações, consulte [Amazon Kinesis Data Analytics para aplicativos SQL: como funciona](how-it-works.md). As seções neste tópico descrevem as fontes de entrada do aplicativo.

**Topics**
+ [Configuração de uma fonte de streaming](#source-streaming)
+ [Configuração de uma fonte de referência](#source-reference)
+ [Trabalhando com JSONPath](about-json-path.md)
+ [Mapeamento de elementos de fonte de streaming para colunas de entrada do SQL](sch-mapping.md)
+ [Usar o recurso de descoberta de esquema em dados em streaming](sch-dis.md)
+ [Usar o recurso de descoberta de esquema em dados estáticos](sch-dis-ref.md)
+ [Pré-processar dados usando uma função do Lambda](lambda-preprocessing.md)
+ [Paralelização dos streams de entrada para aumentar a taxa de transferência](input-parallelism.md)

## Configuração de uma fonte de streaming
<a name="source-streaming"></a>

No momento em que você cria um aplicativo, você especifica uma fonte de streaming. Também é possível modificar uma entrada depois de criar o aplicativo. O Amazon Kinesis Data Analytics é compatível com as seguintes fontes de streaming do aplicativo:
+ Um stream de dados do Kinesis 
+ Um fluxo de entrega do Firehose

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Os clientes atuais que usam o Kinesis Data Analytics para aplicativos SQL com `KinesisFirehoseInput` podem continuar adicionando aplicativos `KinesisFirehoseInput` em uma conta existente usando o Kinesis Data Analytics. Se você já é cliente e deseja criar uma nova conta com os aplicativos Kinesis Data Analytics para SQL com `KinesisFirehoseInput`, você pode criar um caso usando o formulário de aumento do limite de serviço. Para obter mais informações, consulte a [Central AWS Support](https://console.aws.amazon.com/support/home#/). Recomendamos sempre testar os novos aplicativos antes de passá-los à produção.

**nota**  
Se o fluxo de dados do Kinesis estiver criptografado, o Kinesis Data Analytics acessa os dados no fluxo criptografado perfeitamente sem precisar de outras configurações. O Kinesis Data Analytics não armazena dados não criptografados lidos dos fluxos de dados Kinesis. Para obter mais informações, consulte [O que é criptografia no lado do servidor para streamings de dados do Kinesis?](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html).

O Kinesis Data Analytics apura continuamente a fonte de streaming para novos dados e os ingere em fluxos de aplicativos de acordo com a configuração de entrada. 

**nota**  
A adição de um fluxo do Kinesis como entrada do seu aplicativo não afeta os dados do fluxo. Se outro recurso, como um fluxo de entrega do Firehose, também acessasse o mesmo fluxo do Kinesis, tanto o fluxo de entrega do Firehose quanto a aplicação do Kinesis Data Analytics receberiam os mesmos dados. Entretanto, o throughput e o controle de utilização podem ser afetados.

O código de aplicativo pode consultar o stream de aplicativos. Como parte da configuração de entrada, forneça o seguinte:
+ **Fonte de streaming** – forneça o nome do recurso da Amazon (ARN) do fluxo e um perfil do IAM que o Kinesis Data Analytics pode assumir para acessar o fluxo em seu nome. 
+ **Prefixo do nome de fluxo no aplicativo** – quando você inicia o aplicativo, o Kinesis Data Analytics cria o fluxo no aplicativo especificado. No código do aplicativo, acesse o stream no aplicativo usando o mesmo nome. 

  Ou você pode mapear uma fonte de streaming para vários fluxos no aplicativo. Para obter mais informações, consulte [Limites](limits.md). Nesse caso, o Amazon Kinesis Data Analytics cria o número especificado de streams no aplicativo com os seguintes nomes *prefix*`_001`: *prefix*`_002`,, e. *prefix* `_003` Por padrão, o Kinesis Data Analytics mapeia a fonte de streaming para um stream no aplicativo chamado. *prefix* `_001`

  Há um limite na velocidade que você pode inserir linhas em um stream no aplicativo. Portanto, o Kinesis Data Analytics oferece suporte a vários streams no aplicativo para que você possa trazer registros para seu aplicativo a uma velocidade muito mais alta. Se você considerar que o aplicativo não está acompanhando o ritmo dos dados na fonte de streaming, adicione unidades de paralelismo para melhorar o desempenho. 
+ **Mapeamento de esquema** – você descreve o formato de registro (JSON, CSV) na fonte de streaming. Você também descreve como cada registro no stream mapeia para colunas no stream do aplicativo criado. É aqui você fornece nomes de colunas e tipos de dados. 

**nota**  
O Kinesis Data Analytics adiciona aspas em torno dos identificadores (nome de stream e nomes de coluna) ao criar o stream no aplicativo de entrada. Ao consultar esse stream e as colunas, você deve especificá-los entre aspas usando a mesma capitalização (letras maiúsculas e minúsculas exatamente). Para obter mais informações sobre identificadores, consulte [Identificadores](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html) no *Amazon Managed Service for Apache Flink SQL Reference*.

É possível criar um aplicativo e configurar entradas no console do Amazon Kinesis Data Analytics. O console então faz as chamadas de API necessárias. Você pode configurar a entrada do aplicativo ao criar uma nova API de aplicativo ou adicionar uma configuração de entrada a um aplicativo existente. Para obter mais informações, consulte [CreateApplication](API_CreateApplication.md) e [AddApplicationInput](API_AddApplicationInput.md). Veja a seguir a parte da configuração de entrada do corpo da solicitação da API `Createapplication`:

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## Configuração de uma fonte de referência
<a name="source-reference"></a>

Opcionalmente, você também pode adicionar uma fonte de dados de referência a um aplicativo existente para enriquecer os dados provenientes de fontes de streaming. Você deve armazenar os dados de referência como um objeto no bucket do Amazon S3. Quando o aplicativo é iniciado, o Amazon Kinesis Data Analytics lê o objeto do Amazon S3 e cria uma tabela de referência no aplicativo. O código do aplicativo pode, então, associar-se a um streaming no aplicativo. 

Armazene os dados de referência no objeto do Amazon S3 usando formatos compatíveis (CSV, JSON). Por exemplo, suponha que o seu aplicativo realiza análises em pedidos de ações. Assuma o seguinte formato de registro na fonte de streaming:

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

Neste caso, você pode considerar a manutenção de uma fonte de dados de referência para fornecer detalhes para cada marcador de ações, como nome da empresa.

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

Você pode adicionar uma fonte de dados de referência do aplicativo com a API ou com o console. O Amazon Kinesis Data Analytics oferece as seguintes ações de API para gerenciar fontes de dados de referência:
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

Para obter informações sobre como adicionar dados de referência usando o console, consulte [Exemplo: adição de dados de referência a um aplicativo do Kinesis Data Analytics](app-add-reference-data.md).

Observe o seguinte:
+ Se o aplicativo estiver em execução, o Kinesis Data Analytics criará uma tabela de referência no aplicativo e, em seguida, carregará os dados de referência imediatamente.
+ Se o aplicativo não estiver em execução (por exemplo, no estado de prontidão), o Kinesis Data Analytics salvará apenas a configuração de entrada atualizada. Quando o aplicativo começa a executar, o Kinesis Data Analytics carrega os dados de referência no aplicativo como uma tabela.

Suponha que você deseje atualizar os dados depois que o Kinesis Data Analytics criar a tabela de referência no aplicativo. Talvez você tenha atualizado o objeto do Amazon S3 ou queira usar outro objeto do Amazon S3. Nesse caso, você pode chamar explicitamente [UpdateApplication](API_UpdateApplication.md) ou selecionar **Ações**, **Sincronizar tabela de dados de referência** no console. O Kinesis Data Analytics não atualiza a tabela de referência no aplicativo automaticamente. 

Há um limite para o tamanho do objeto do Amazon S3 que você pode criar como uma fonte de dados de referência. Para obter mais informações, consulte [Limites](limits.md). Se o tamanho do objeto exceder o limite, o Kinesis Data Analytics não poderá carregar os dados. O estado do aplicativo aparecerá como em execução, mas os dados não serão lido.

Quando você adiciona uma fonte de referência de dados, fornece as seguintes informações: 
+ **Nome da chave do objeto e bucket do S3** – além do nome do bucket e da chave do objeto, forneça também um perfil do IAM que o Kinesis Data Analytics possa assumir para ler o objeto em seu nome. 
+ **Nome da tabela de referência no aplicativo** – o Kinesis Data Analytics cria essa tabela no aplicativo e a preenche lendo o objeto do Amazon S3. Esse é o nome da tabela que você especifica no código do aplicativo.
+ **Mapeamento de esquema** – descreva o formato de registro (JSON, CSV), a codificação de dados armazenados no objeto do Amazon S3. Descreva também como cada elemento de dados é mapeado para colunas na tabela de referência no aplicativo. 

A tabela a seguir mostra o corpo de solicitação na solicitação da API `AddApplicationReferenceDataSource`.

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```

# Trabalhando com JSONPath
<a name="about-json-path"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

JSONPath é uma forma padronizada de consultar elementos de um objeto JSON. JSONPath usa expressões de caminho para navegar por elementos, elementos aninhados e matrizes em um documento JSON. Para obter mais informações sobre JSON, consulte [Introdução ao JSON](http://www.json.org/).

O Amazon Kinesis Data Analytics JSONPath usa expressões no esquema de origem do aplicativo para identificar elementos de dados em uma fonte de streaming que contém dados no formato JSON.

Para obter mais informações sobre como mapear dados de streaming para o fluxo de entrada do aplicativo, consulte [Mapeamento de elementos de fonte de streaming para colunas de entrada do SQL](sch-mapping.md).

## Acessando elementos JSON com JSONPath
<a name="about-json-path-elements"></a>

A seguir, você pode descobrir como usar JSONPath expressões para acessar vários elementos em dados formatados em JSON. Para os exemplos nesta seção, suponha que o stream de origem contenha o seguinte registro JSON:

```
{
  "customerName":"John Doe",
  "address":
  {
    "streetAddress":
    [
      "number":"123",
      "street":"AnyStreet"
    ],
    "city":"Anytown"
  }
  "orders":
  [
    { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
    { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
    { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
  ]
}
```

### Acesso a elementos JSON
<a name="about-json-path-firstlevel"></a>

Para consultar um elemento em dados JSON usando JSONPath, use a sintaxe a seguir. Aqui, `$` representa a raiz da hierarquia de dados e `elementName` é o nome do nó do elemento de consulta.

```
$.elementName
```

A expressão a seguir consulta o elemento `customerName` no exemplo de JSON anterior.

```
$.customerName
```

A expressão anterior retorna o seguinte do registro de JSON anterior.

```
John Doe
```

**nota**  
As expressões do caminho diferenciam maiúsculas e minúsculas. A expressão `$.customername` retorna `null` do exemplo de JSON anterior.

**nota**  
Se nenhum elemento aparecer no local onde a expressão do caminho especifica, a expressão retornará `null`. A expressão a seguir retorna `null` do exemplo de JSON anterior, porque não há elemento correspondente.  

```
$.customerId
```

### Acesso a elementos JSON aninhados
<a name="about-json-path-nested"></a>

Para consultar um elemento JSON aninhado, use a seguinte sintaxe.

```
$.parentElement.element
```

A expressão a seguir consulta o elemento `city` no exemplo de JSON anterior.

```
$.address.city
```

A expressão anterior retorna o seguinte do registro de JSON anterior.

```
Anytown
```

Você pode consultar mais níveis de subelementos usando a sintaxe a seguir.

```
$.parentElement.element.subElement
```

A expressão a seguir consulta o elemento `street` no exemplo de JSON anterior.

```
$.address.streetAddress.street
```

A expressão anterior retorna o seguinte do registro de JSON anterior.

```
AnyStreet
```

### Acesso a matrizes
<a name="about-json-path-arrays"></a>

Você pode acessar os dados em uma matriz JSON das seguintes formas:
+ Recupere todos os elementos na matriz como uma única linha.
+ Recupere cada elemento na matriz como uma linha separada.

#### Recuperar todos os elementos em uma matriz em uma única linha
<a name="about-json-path-arrays-row"></a>

Para consultar todo o conteúdo de uma matriz como uma única linha, use a sintaxe a seguir.

```
$.arrayObject[0:]
```

A expressão a seguir consulta todo o conteúdo do elemento `orders` no exemplo de JSON anterior usado nesta seção. Ela retorna o conteúdo da matriz em uma única coluna em uma única linha.

```
$.orders[0:]
```

A expressão anterior retorna o seguinte do registro JSON de exemplo usado nesta seção.

```
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
```

#### Recuperar todos os elementos em uma matriz em linhas separadas
<a name="about-json-path-arrays-separate"></a>

Para consultar os elementos individuais em uma matriz como linhas separadas, use a seguinte sintaxe.

```
$.arrayObject[0:].element
```

A expressão a seguir consulta os elementos `orderId` no exemplo de JSON anterior e retorna cada elemento de matriz como uma linha separada.

```
$.orders[0:].orderId
```

A expressão anterior retorna o seguinte do registro de JSON anterior, com cada item de dados retornado como uma linha separada.


****  

|  | 
| --- |
|  23284  | 
|  63122  | 
|  77284  | 

**nota**  
Se expressões que consultam elementos não matriz estiverem incluídos em um esquema que consulta elementos de matriz individuais, os elementos não matriz serão repetidos para cada elemento na matriz. Por exemplo, suponha que um esquema para o exemplo de JSON anterior inclua as seguintes expressões:  
\$1.customerName
\$1.orders[0:].orderId
Nesse caso, as linhas de dados retornados do elemento de stream de entrada de amostra se parecem com o seguinte, com o elemento `name` repetido para cada elemento `orderId`.  


****  

|  |  | 
| --- |--- |
|  John Doe  |  23284  | 
|  John Doe  |  63122  | 
|  John Doe  |  77284  | 

**nota**  
As seguintes limitações se aplicam a expressões em matriz no Amazon Kinesis Data Analytics:  
Apenas um nível de deferência conta com suporte em uma expressão. O formato de expressão a seguir não conta com suporte.  

  ```
  $.arrayObject[0:].element[0:].subElement
  ```
Apenas uma matriz pode ser achatada em um esquema. Várias matrizes podem ser referenciadas-retornadas como uma linha contendo todos os elementos na matriz. No entanto, apenas uma matriz pode ter cada um de seus elementos retornados como linhas individuais.  
Um esquema que contém elementos no seguinte formato é válido. Este formato retorna o conteúdo da segunda matriz como uma única coluna, repetido para cada elemento na primeira matriz.  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:]
  ```
Um esquema que contém elementos no seguinte formato não é válido.  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:].element
  ```

## Outras considerações
<a name="about-json-path-other"></a>

As considerações adicionais para trabalhar com JSONPath são as seguintes:
+ Se nenhuma matriz for acessada por um elemento individual nas JSONPath expressões no esquema do aplicativo, uma única linha será criada no fluxo de entrada do aplicativo para cada registro JSON processado. 
+ Quando uma matriz é achatada (ou seja, seus elementos são retornados como linhas individuais), quaisquer elementos ausentes resultam na criação de um valor nulo no stream no aplicativo. 
+ Uma matriz é sempre achatada para pelo menos uma linha. Se nenhum valor for retornado (ou seja, a matriz estiver vazia ou nenhum de seus elementos for consultado), uma única linha com todos os valores nulos será retornada.

  A expressão a seguir retorna registros com valores nulos de exemplo de JSON anterior, porque não há elemento correspondente no caminho especificado.

  ```
  $.orders[0:].itemId
  ```

  A expressão anterior retorna o seguinte do registro de exemplo de JSON anterior.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/about-json-path.html)

## Related Topics
<a name="about-json-path.Related"></a>
+ [Apresentação do JSON](http://www.json.org/)

# Mapeamento de elementos de fonte de streaming para colunas de entrada do SQL
<a name="sch-mapping"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Com o Amazon Kinesis Data Analytics, é possível processar e analisar dados de streaming nos formatos JSON ou CSV usando o SQL padrão. 
+ Para processar e analisar dados CSV de streaming, atribua nomes de colunas e tipos de dados às colunas do stream de entrada. O aplicativo importa uma coluna do stream de entrada por definição de coluna, em ordem. 

  Não é necessário incluir todas as colunas do stream de entrada de aplicativo, mas você não pode ignorar colunas do stream de origem. Por exemplo, é possível importar as três primeiras colunas de um stream de entrada que contém cinco elementos, mas não importar apenas as colunas 1, 2 e 4.
+ Para processar e analisar dados JSON de streaming, você usa JSONPath expressões para mapear elementos JSON de uma fonte de streaming para colunas SQL em um fluxo de entrada. Para obter mais informações sobre o uso JSONPath com o Amazon Kinesis Data Analytics, [Trabalhando com JSONPath](about-json-path.md) consulte. As colunas na tabela SQL têm tipos de dados que são mapeados de tipos de JSON. Para obter os tipos de dados compatíveis, consulte [Tipos de dados](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html). Para obter detalhes sobre como converter dados JSON em dados SQL, consulte [Mapeamento de tipos de dados JSON para tipos de dados SQL](#sch-mapping-datatypes).

Para obter mais informações sobre como configurar fluxos de entrada, consulte [Configuração de entrada do aplicativo](how-it-works-input.md).

## Mapeamento de dados JSON para colunas SQL
<a name="sch-mapping-json"></a>

 Você pode mapear elementos JSON para colunas de entrada usando a API Kinesis Data Console de gerenciamento da AWS Analytics ou a Kinesis Data Analytics. 
+ Para mapear elementos para colunas usando o console, consulte [Trabalho com o editor de esquema](console-summary-edit-schema.md).
+ Para mapear elementos para colunas usando a API do Kinesis Data Analytics, consulte a seção a seguir.

Para mapear elementos JSON para colunas no stream de entrada no aplicativo, você precisa de um esquema com as seguintes informações para cada coluna:
+ **Expressão de origem:** a JSONPath expressão que identifica a localização dos dados da coluna.
+ **Nome da coluna:** o nome que as consultas do SQL usam para referenciar os dados.
+ **Tipo de dados: **o tipo de dados do SQL para a coluna.

## Usar a API
<a name="sf-map-api"></a>

Para mapear elementos de uma fonte de streaming para colunas de entrada, use a ação [CreateApplication](API_CreateApplication.md) da API do Kinesis Data Analytics. Para criar o stream no aplicativo, especifique um esquema para transformar seus dados em uma versão esquematizada usada no SQL. A ação [CreateApplication](API_CreateApplication.md) configura o aplicativo para receber a entrada de uma única fonte de streaming. Para mapear elementos JSON ou colunas CSV para colunas SQL, crie um objeto [RecordColumn](API_RecordColumn.md) na matriz [SourceSchema](API_SourceSchema.md) `RecordColumns`. O objeto [RecordColumn](API_RecordColumn.md) tem o seguinte esquema:

```
{ 
    "Mapping": "String",
    "Name": "String",
    "SqlType": "String"
}
```

Os campos do objeto [RecordColumn](API_RecordColumn.md) têm os seguintes valores:
+ `Mapping`: a JSONPath expressão que identifica a localização dos dados no registro do fluxo de entrada. Esse valor não está presente em um esquema de entrada para um stream de origem no formato CSV.
+ `Name`: o nome da coluna no streaming de dados SQL no aplicativo.
+ `SqlType`: o tipo de dados dos dados no stream de dados SQL no aplicativo.

### Exemplo de esquema de entrada JSON
<a name="sf-map-api-json-example"></a>

O exemplo a seguir demonstra o formato do valor `InputSchema` para um esquema JSON.

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(4)",
            "Name": "TICKER_SYMBOL",
            "Mapping": "$.TICKER_SYMBOL"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "SECTOR",
            "Mapping": "$.SECTOR"
        },
        {
            "SqlType": "TINYINT",
            "Name": "CHANGE",
            "Mapping": "$.CHANGE"
        },
        {
            "SqlType": "DECIMAL(5,2)",
            "Name": "PRICE",
            "Mapping": "$.PRICE"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "JSONMappingParameters": {
                "RecordRowPath": "$"
            }
        },
        "RecordFormatType": "JSON"
    },
    "RecordEncoding": "UTF-8"
}
```

### Exemplo de esquema de entrada CSV
<a name="sf-map-api-csv-example"></a>

O exemplo a seguir demonstra o formato do valor `InputSchema` para um esquema no formato CSV (valores separados por vírgulas).

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(16)",
            "Name": "LastName"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "FirstName"
        },
        {
            "SqlType": "INTEGER",
             "Name": "CustomerId"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": ",",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}
```

## Mapeamento de tipos de dados JSON para tipos de dados SQL
<a name="sch-mapping-datatypes"></a>

Os tipos de dados JSON são convertidos em tipos de dados SQL correspondentes de acordo com o esquema de entrada do aplicativo. Para obter informações sobre os tipos de dados SQL com suporte, consulte [Tipos de dados](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html). O Amazon Kinesis Data Analytics converte tipos de dados JSON em tipos de dados SQL de acordo com as seguintes regras.

### Nulo literal
<a name="sch-mapping-datatypes-null"></a>

Um nulo literal no stream de entrada JSON (ou seja, `"City":null`) é convertido em um SQL nulo, independentemente do tipo de dados de destino.

### Booleano literal
<a name="sch-mapping-datatypes-boolean"></a>

Um Booleano literal no stream de entrada JSON (ou seja, `"Contacted":true`) é convertido nos seguintes dados SQL:
+ Numérico (DECIMAL, INT e assim por diante): `true` é convertido em 1; `false` é convertido em 0.
+ Binário (BINARY ou VARBINARY): 
  + `true`: o resultado tem o menor conjunto de bits e os bits restantes são desmarcados.
  + `false`: o resultado tem todos os bits desmarcados.

  A conversão em VARBINARY resulta em um valor de 1 byte de comprimento.
+ BOOLEANO: é convertido no valor SQL BOOLEAN correspondente.
+ Caractere (CHAR ou VARCHAR): é convertido no valor da string correspondente (`true` ou `false`). O valor é truncado para se ajustar ao tamanho do campo.
+ Data e hora (DATE, TIME ou TIMESTAMP): a conversão falha e um erro de coerção é gravado no stream de erro.

### Número
<a name="sch-mapping-datatypes-number"></a>

Um número literal no stream de entrada JSON (ou seja, `"CustomerId":67321`) é convertido nos seguintes dados SQL:
+ Numérico (DECIMAL, INT e assim por diante): é convertido diretamente. Se o valor convertido exceder o tamanho ou a precisão do tipo de dados de destino (ou seja, convertendo `123.4` em INT), a conversão falha e um erro de coerção é gravado no stream de erro. 
+ Binário (BINARY ou VARBINARY): a conversão falha e um erro de coerção é gravado no stream de erro.
+ BOOLEAN: 
  + `0`: é convertido em `false`.
  + Todos os outros números: são convertidos em `true`.
+ Caractere (CHAR ou VARCHAR): é convertido em uma representação do número em string.
+ Data e hora (DATE, TIME ou TIMESTAMP): a conversão falha e um erro de coerção é gravado no stream de erro.

### String
<a name="sch-mapping-datatypes-string"></a>

O valor de uma string no stream de entrada JSON (ou seja, `"CustomerName":"John Doe"`) é convertido nos seguintes dados SQL:
+ Numérico (DECIMAL, INT e assim por diante): o Amazon Kinesis Data Analytics tenta converter o valor no tipo de dados de destino. Se o valor não puder ser convertido, a conversão falha e um erro de coerção será gravado no stream de erro.
+ rio (BINARY ou VARBINARY): se a string de origem for um binário literal (ou seja, `X'3F67A23A'`, com um número par de f), o valor será convertido no tipo de dados de destino. Caso contrário, a conversão falha e um erro de coerção será gravado no stream de erro.
+ BOOLEANO: se a string de origem for `"true"`, será convertida em `true`. Essa comparação diferencia maiúsculas de minúsculas. Caso contrário, será convertida em `false`.
+ Caractere (CHAR ou VARCHAR): é convertido no valor da string na entrada. Se o valor for maior do que o tipo de dados de destino, ele estará truncado e nenhum erro será gravado no stream de erro.
+ Data e hora (DATE, TIME ou TIMESTAMP): se a string de origem estiver em um formato que pode ser convertido no valor de destino, o valor será convertido. Caso contrário, a conversão falha e um erro de coerção será gravado no stream de erro.

  Os formatos de data e hora válidos incluem:
  + "1992-02-14"
  + "1992-02-14 18:35:44.0"

### Matriz ou objeto
<a name="sch-mapping-datatypes-array"></a>

Uma matriz ou objeto no stream de entrada JSON é convertido em dados SQL da seguinte forma:
+ Caractere (CHAR ou VARCHAR): é convertido no texto de origem da matriz ou objeto. Consulte [Acesso a matrizes](about-json-path.md#about-json-path-arrays).
+ Todos os outros tipos de dados: a conversão falha e um erro de coerção é gravado no stream de erro.

Para obter um exemplo de uma matriz JSON, consulte [Trabalhando com JSONPath](about-json-path.md).

## Related Topics
<a name="sch-mapping.Related"></a>
+ [Configuração de entrada do aplicativo](how-it-works-input.md)
+ [Tipos de dados](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)
+ [Trabalho com o editor de esquema](console-summary-edit-schema.md)
+ [CreateApplication](API_CreateApplication.md)
+ [RecordColumn](API_RecordColumn.md)
+ [SourceSchema](API_SourceSchema.md)

# Usar o recurso de descoberta de esquema em dados em streaming
<a name="sch-dis"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Fornecer um esquema de entrada que descreva como os registros no mapa de entrada de streaming para um stream no aplicativo pode ser complicado e propenso a erros. Use a API [DiscoverInputSchema](API_DiscoverInputSchema.md) (chamada de *API de descoberta*) para inferir um esquema. Usando amostras aleatórias de registros na fonte de streaming, a API pode inferir um esquema (ou seja, nomes de colunas, tipos de dados e posição do elemento de dados nos dados recebidos). 

**nota**  
Para usar a API de descoberta para gerar um esquema de um arquivo armazenado no Amazon S3, consulte [Usar o recurso de descoberta de esquema em dados estáticos](sch-dis-ref.md). 

O console usa a API Discovery para gerar um esquema para uma fonte de streaming especificada. Usando o console, você também pode atualizar o esquema, incluindo adicionar ou remover colunas, alterar nomes de colunas ou tipos de dados e assim por diante. No entanto, faça alterações cuidadosamente para garantir que um esquema inválido não seja criado. 

Após finalizar um esquema para o stream no aplicativo, será possível usar funções para manipular strings e valores de data e hora. Use essas funções no código do aplicativo ao trabalhar com linhas no stream no aplicativo resultante. Para obter mais informações, consulte [Exemplo: Transformação de valores DateTime](app-string-datetime-manipulation.md).

## Nomeação de colunas durante a descoberta de esquema
<a name="sch-dis-column-names"></a>

Durante a descoberta de esquema, o Amazon Kinesis Data Analytics tenta reter o máximo possível do nome da coluna original da fonte de entrada de streaming, exceto nos seguintes casos:
+ O nome do stream de origem é uma palavra-chave reservada do SQL, como `TIMESTAMP`, `USER`, `VALUES` ou `YEAR`. 
+ O nome da coluna de stream de origem contém caracteres não suportados. Somente letras, números e o caractere de sublinhado (\$1) são suportados.
+ O nome da coluna de stream de origem começa com um número.
+ O nome da coluna de stream de origem tem mais de 100 caracteres.

Se uma coluna for renomeada, o nome da coluna do esquema renomeada começará com `COL_`. Em alguns casos, nenhum nome de coluna original pode ser retido, por exemplo, se todo o nome tiver caracteres não suportados. Nesse caso, a coluna será chamada `COL_#`, com \$1 sendo um número que indica o local da coluna na ordem da coluna.

Após a conclusão da descoberta, você poderá atualizar o esquema usando o console para adicionar ou remover colunas ou alterar nomes de coluna, tipos de dados ou tamanho de dados. 

### Exemplos de nomes de coluna sugeridos pela descoberta
<a name="sch-dis-column-names-examples"></a>


****  

| Nome da coluna de stream de origem | Nome da coluna sugerido pela descoberta | 
| --- | --- | 
|  USER  |  COL\$1USER  | 
|  USER@DOMAIN  |  COL\$1USERDOMAIN  | 
|  @@  |  COL\$10  | 

## Problemas de descoberta de esquema
<a name="sch-dis-when-fails"></a>

O que acontecerá se o Kinesis Data Analytics não inferir um esquema para uma determinada fonte de streaming? 

O Kinesis Data Analytics infere o esquema para formatos comuns, como CSV e JSON, que são codificados em UTF-8. O Kinesis Data Analytics oferece suporte a quaisquer registros codificados em UTF-8 (incluindo texto bruto, como logs e registros de aplicativos) com colunas personalizadas e delimitadores de linhas. Se o Kinesis Data Analytics não inferir um esquema, você poderá definir um esquema manualmente usando o editor de esquema no console (ou usando a API).

 Se os dados não seguirem um padrão (que você pode especificar usando o editor de esquema), defina um esquema como uma única coluna do tipo VARCHAR(N), em que N é o maior número de caracteres que você espera que seu registro inclua. A partir daí, será possível usar a manipulação de strings e de data e hora para estruturar os dados a partir do momento que estiverem no stream no aplicativo. Para obter exemplos, consulte [Exemplo: Transformação de valores DateTime](app-string-datetime-manipulation.md).

# Usar o recurso de descoberta de esquema em dados estáticos
<a name="sch-dis-ref"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

O recurso de descoberta de esquema pode gerar um esquema dos dados em um stream ou em um arquivo estático armazenado em um bucket do Amazon S3. Suponha que você deseje gerar um esquema para um aplicativo do Kinesis Data Analytics para fins de referência ou quando os dados em streaming ao vivo não estiverem disponíveis. Você pode usar o atributo de descoberta de esquema em um arquivo estático que contém uma amostra dos dados no formato esperado de dados em streaming ou de referência. O Kinesis Data Analytics pode executar a descoberta de esquema em dados de amostra de um arquivo JSON ou CSV armazenado em um bucket do Amazon S3. Executar a descoberta de esquema em um arquivo de dados usa o console ou a API [DiscoverInputSchema](API_DiscoverInputSchema.md) com o parâmetro `S3Configuration` especificado.

## Execução de descoberta de esquema usando o console
<a name="sch-dis-ref-console"></a>

Para executar a descoberta em um arquivo estático usando o console, faça o seguinte:

1. Adicione um objeto de dados de referência a um bucket do S3.

1. Selecione **Conectar dados de referência** na página principal do aplicativo no console do Kinesis Data Analytics.

1. Forneça os dados do bucket, do caminho e do perfil do IAM para acessar o objeto do Amazon S3 que contém os dados de referência.

1. Escolha **Discover schema (Descobrir esquema)**.

Para obter mais informações sobre como adicionar dados de referência e descobrir o esquema no console, consulte [Exemplo: adição de dados de referência a um aplicativo do Kinesis Data Analytics](app-add-reference-data.md).

## Execução de descoberta de esquema usando a API
<a name="sch-dis-ref-api"></a>

Para executar a descoberta em um arquivo estático usando a API, forneça à API uma estrutura `S3Configuration` com as seguintes informações:
+ `BucketARN`: o nome do recurso da Amazon (ARN) do bucket do Amazon S3 que contém o arquivo. Para o formato de um ARN de bucket do Amazon S3, consulte Amazon [Resource Names () ARNs e Amazon Service Namespaces: Amazon Simple Storage Service (Amazon S3](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-s3)).
+ `RoleARN`: o ARN de um perfil do IAM com a política `AmazonS3ReadOnlyAccess`. Para obter mais informações sobre como adicionar uma política a um perfil, consulte [Modificação de um perfil](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html).
+ `FileKey`: o nome de arquivo do objeto.

**Para gerar um esquema de um objeto do Amazon S3 usando a API `DiscoverInputSchema`**

1. Verifique se você tem a AWS CLI configuração. Para obter mais informações, consulte [Etapa 2: configurar o AWS Command Line Interface (AWS CLI)](setup-awscli.md) na seção Conceitos básicos.

1. Crie um arquivo denominado `data.csv` com o conteúdo a seguir:

   ```
   year,month,state,producer_type,energy_source,units,consumption
   2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
   2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
   2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
   2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
   2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
   ```

1. Faça login no console do Amazon S3 em. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Crie um bucket do Amazon S3 e faça upload do arquivo `data.csv` que você criou. Anote o ARN do bucket criado. Para obter informações sobre como criar um bucket do Amazon S3 e fazer upload de um arquivo, consulte [Conceitos básicos do Amazon Simple Storage Service](https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html). 

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/). Crie uma função com a política `AmazonS3ReadOnlyAccess`. Anote o ARN da nova função. Para obter informações sobre como criar um perfil, consulte [Criação de um perfil para delegar permissões a um serviço Amazon](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html). Para obter mais informações sobre como adicionar uma política a um perfil, consulte [Modificação de um perfil](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html).

1. Execute o seguinte `DiscoverInputSchema` comando no AWS CLI, substituindo o bucket do ARNs Amazon S3 e a função do IAM:

   ```
   $aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN": "arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }' 
   ```

1. A resposta é semelhante ao seguinte:

   ```
   {
       "InputSchema": {
           "RecordEncoding": "UTF-8",
           "RecordColumns": [
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_year"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_month"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "state"
               },
               {
                   "SqlType": "VARCHAR(64)",
                   "Name": "producer_type"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "energy_source"
               },
               {
                   "SqlType": "VARCHAR(16)",
                   "Name": "units"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "consumption"
               }
           ],
           "RecordFormat": {
               "RecordFormatType": "CSV",
               "MappingParameters": {
                   "CSVMappingParameters": {
                       "RecordRowDelimiter": "\r\n",
                       "RecordColumnDelimiter": ","
                   }
               }
           }
       },
       "RawInputRecords": [
           "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
       ],
       "ParsedInputRecords": [
           [
               null,
               null,
               "state",
               "producer_type",
               "energy_source",
               "units",
               null
           ],
           [
               "2001",
               "1",
               "AK",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "47615"
           ],
           [
               "2001",
               "1",
               "AK",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "16535"
           ],
           [
               "2001",
               "1",
               "AK",
               "CombinedHeatandPowerElectricPower",
               "Coal",
               "ShortTons",
               "22890"
           ],
           [
               "2001",
               "1",
               "AL",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "3020601"
           ],
           [
               "2001",
               "1",
               "AL",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "2987681"
           ]
       ]
   }
   ```

# Pré-processar dados usando uma função do Lambda
<a name="lambda-preprocessing"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

Se os dados em seu stream precisarem de conversão, transformação, enriquecimento ou filtragem de formato, você poderá pré-processar os dados usando uma função. AWS Lambda Você pode fazer isso antes que o código SQL do aplicativo seja executado ou antes que seu aplicativo crie um esquema para seu streaming de dados. 

Usar uma função do Lambda para pré-processamento de registros é útil nos seguintes cenários:
+ Transformar registros de outros formatos (como KPL ou GZIP) em formatos que o Kinesis Data Analytics pode analisar. O Kinesis Data Analytics atualmente é compatível com formatos de dados JSON ou CSV.
+ Expandir dados em um formato mais acessível para operações como agregação ou detecção de anomalias. Por exemplo, se vários valores de dados são armazenados juntos em uma string, você pode expandir os dados em colunas separadas.
+ Enriquecer dados com outros serviços da Amazon, como extrapolação ou correção de erros.
+ Aplicar transformação complexa de string em campos de registro.
+ Filtrar dados para limpar os dados.

## Usar uma função do Lambda para pré-processamento de registros
<a name="lambda-preprocessing-use"></a>

Ao criar o aplicativo do Kinesis Data Analytics, você habilita o pré-processamento do Lambda na página **Conectar a uma fonte**.

**Para usar uma função do Lambda para pré-processar registros em um aplicativo do Kinesis Data Analytics**

1. [Faça login Console de gerenciamento da AWS e abra o console do Managed Service for Apache Flink em https://console.aws.amazon.com /kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Na página **Conectar a uma fonte** do seu aplicativo, escolha **Habilitada** na seção **Registrar pré-processamento com AWS Lambda**.

1. Para usar uma função do Lambda que você já tenha criado, escolha a função na lista suspensa **Função do Lambda**.

1. Para criar uma nova função do Lambda de um dos modelos de pré-processamento do Lambda, escolha o modelo na lista suspensa. Em seguida, escolha **View template name > in Lambda (Exibir <nome do modelo> no Lambda)** para editar a função.

1. Para criar uma nova função do Lambda, selecione **Criar nova**. *Para obter informações sobre a criação de uma função Lambda, consulte [Criar uma função HelloWorld Lambda e explorar o console no Guia do desenvolvedor](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html).AWS Lambda *

1. Escolha a versão da função do Lambda a ser usada. Para usar a versão mais recente, escolha **\$1LATEST**.

Quando você escolhe ou cria uma função do Lambda para pré-processamento de registros, os registros são pré-processados antes da execução do código SQL do seu aplicativo ou o aplicativo gera um esquema dos registros.

## Permissões de pré-processamento do Lambda
<a name="lambda-preprocessing-policy"></a>

Para usar o pré-processamento do Lambda, o perfil do IAM do aplicativo requer a seguinte política de permissões:

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Métricas de pré-processamento do Lambda
<a name="lambda-preprocessing-metrics"></a>

Você pode usar CloudWatch a Amazon para monitorar o número de invocações do Lambda, bytes processados, sucessos e falhas, e assim por diante. [Para obter informações sobre CloudWatch métricas emitidas pelo pré-processamento do Kinesis Data Analytics Lambda, consulte Métricas do Amazon Kinesis Analytics.](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)

## Usando AWS Lambda com a Kinesis Producer Library
<a name="lambda-preprocessing-deaggregation"></a>

A [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) agrega pequenos registros formatados pelo usuário em registros maiores de até 1 MB para usar melhor a throughput do Amazon Kinesis Data Streams. A Kinesis Client Library (KCL) para Java é compatível com a desagregação desses registros. No entanto, você deve usar um módulo especial para desagregar os registros ao usar AWS Lambda como consumidor de seus streams. 

Para obter o código e as instruções do projeto necessários, consulte os Módulos de [desagregação da Kinesis Producer Library](https://github.com/awslabs/kinesis-deaggregation) para saber mais. AWS Lambda GitHub Você pode usar os componentes desse projeto para processar dados serializados KPL AWS Lambda em Java, Node.js e Python. Você também pode usar esses componentes como parte de um [aplicativo da KCL de várias linguagens](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java).

## Modelo de Model/Record resposta de dados de entrada de eventos de pré-processamento de dados
<a name="lambda-preprocessing-data-model"></a>

Para pré-processar registros, sua função do Lambda precisa estar em conformidade com os modelos de dados de entrada de eventos e modelos de resposta de registros exigidos. 

### Modelo de dados de entrada de eventos
<a name="lambda-preprocessing-request-model"></a>

O Kinesis Data Analytics lê continuamente os dados do fluxo de dados do Kinesis ou fluxo de entrega do Firehose. Para cada lote de registros recuperado, o serviço gerencia como esse lote é enviado à sua função do Lambda. Sua função recebe uma lista de registros como entrada. Dentro da função, você segue a lista e aplica a lógica de negócios para cumprir requisitos de pré-processamento (como enriquecimento ou conversão de formato de dados). 

O modelo de entrada da sua função de pré-processamento varia um pouco se os dados são recebidos de um fluxo de dados do Kinesis ou de um fluxo de entrega do Firehose. 

Se a origem for um fluxo de entrega do Firehose, o modelo de dados de entrada de eventos será o seguinte:

**Modelo de solicitação de dados do Kinesis Data Firehose**


| Campo | Description | 
| --- | --- | 
| Campo | Description | 
| --- | --- | 
| Campo | Description | 
| --- | --- | 
| invocationId | O Id de invocação do Lambda (GUID aleatório). | 
| applicationArn | O nome do recurso da Amazon (ARN) do aplicativo Kinesis Data Analytics | 
| streamArn | ARN do fluxo de entrega | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID de registro (GUID aleatório) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Carga útil de registros de origem codificada em Base64 | 
| approximateArrivalTimestamp | Tempo de chegada aproximado do registro do fluxo de entrega | 

O exemplo a seguir mostra a entrada de um fluxo de entrega do Firehose:

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

Se a origem for um fluxo de dados do Kinesis, o modelo de dados de entrada de eventos será o seguinte:

**Modelo de solicitação de dados do Kinesis Streams**


| Campo | Description | 
| --- | --- | 
| Campo | Description | 
| --- | --- | 
| Campo | Description | 
| --- | --- | 
| invocationId | O Id de invocação do Lambda (GUID aleatório). | 
| applicationArn | ARN do aplicativo Kinesis Data Analytics | 
| streamArn | ARN do fluxo de entrega | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | ID de registro baseado no número de sequência de registro do Kinesis | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| dados | Carga útil de registros de origem codificada em Base64 | 
| sequenceNumber | Número de sequência do registro do stream do Kinesis | 
| partitionKey | Chave de partição do registro do stream do Kinesis | 
| shardId | ShardId do registro do fluxo do Kinesis | 
| approximateArrivalTimestamp | Tempo de chegada aproximado do registro do fluxo de entrega | 

O exemplo a seguir mostra a entrada de um streaming de dados do Kinesis:

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### Modelo de resposta de registros
<a name="lambda-preprocessing-response-model"></a>

Todos os registros retornados da sua função de pré-processamento do Lambda (com registro IDs) que são enviados para a função Lambda devem ser retornados. Eles devem conter os seguintes parâmetros, ou o Kinesis Data Analytics os rejeita e aponta uma falha de pré-processamento de dados. A parte de carga útil de dados do registro pode ser transformada para cumprir requisitos de pré-processamento.

**Modelo de resposta de dados**


| Campo | Description | 
| --- | --- | 
| registros [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | O ID do registro é transmitido do Kinesis Data Analytics para o Lambda durante a invocação. O registro transformado deve conter o mesmo ID de registro. Qualquer incompatibilidade entre o ID do registro original e o ID do registro transformado é considerada uma falha de pré-processamento de dados. | 
| result | O status da transformação de dados do registro. Os valores possíveis são: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | A carga útil dos dados transformados, após a codificação base64. Cada carga de dados pode conter vários documentos JSON se o formato de dados de ingestão do aplicativo for JSON. Ou cada carga pode conter várias linhas CSV (com um delimitador de linha especificado em cada linha) se o formato de dados de ingestão do aplicativo for CSV. O serviço Kinesis Data Analytics analisa e processa dados com êxito com vários documentos JSON ou linhas CSV na mesma carga útil de dados.  | 

O exemplo a seguir mostra a saída de uma função Lambda:

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## Falhas de pré-processamento de dados comuns
<a name="lambda-preprocessing-failures"></a>

Veja a seguir os motivos comuns pelos quais o pré-processamento pode falhar.
+ Nem todos os registros (com registro IDs) em um lote que são enviados para a função Lambda são retornados ao serviço Kinesis Data Analytics. 
+ A resposta não tem o ID do registro, o status ou o campo de carga útil de dados. O campo de carga útil de dados é opcional para um registro `Dropped` ou `ProcessingFailed`.
+ O tempo limite da função do Lambda não é suficiente para pré-processar os dados.
+ A resposta da função do Lambda excede os limites de resposta impostos pelo serviço AWS Lambda .

Para falhas de processamento de dados, o Kinesis Data Analytics continua a fazer novas tentativas de invocação do Lambda para o mesmo conjunto de registros até que tenha êxito. Você pode monitorar as CloudWatch métricas a seguir para obter informações sobre falhas.
+ Aplicativo do Kinesis Data Analytics `MillisBehindLatest`: indica o tempo de atraso do aplicativo na leitura da origem de streaming. 
+ Métricas do `InputPreprocessing` CloudWatch aplicativo Kinesis Data Analytics: indica o número de sucessos e fracassos, entre outras estatísticas. Para obter mais informações, consulte [Métricas do Amazon Kinesis Analytics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html).
+ AWS Lambda CloudWatch métricas e registros de funções.

# Criar funções do Lambda para pré-processamento
<a name="lambda-preprocessing-functions"></a>

O aplicativo Amazon Kinesis Data Analytics pode usar funções do Lambda para pré-processamento de registros à medida que são ingeridos no aplicativo. O Kinesis Data Analytics fornece os modelos a seguir no console para usar como ponto de partida para pré-processamento de seus dados.

**Topics**
+ [Criar uma função do Lambda de pré-processamento em Node.js](#lambda-preprocessing-functions-nodejs)
+ [Criar uma função do Lambda de pré-processamento em Python](#lambda-preprocessing-functions-python)
+ [Criar uma função do Lambda de pré-processamento em Java](#lambda-preprocessing-functions-java)
+ [Criar uma função do Lambda de pré-processamento em .NET](#lambda-preprocessing-functions-net)

## Criar uma função do Lambda de pré-processamento em Node.js
<a name="lambda-preprocessing-functions-nodejs"></a>

Os modelos a seguir para criar uma função do Lambda de pré-processamento em Node.js estão disponíveis no console do Kinesis Data Analytics:


| Lambda Blueprint | Linguagem e versão | Description | 
| --- | --- | --- | 
| Processamento de entrada geral do Kinesis Data Analytics  | Node.js 6.10 |  Um pré-processador de registros do Kinesis Data Analytics que recebe registros JSON ou CSV como entrada e os retorna com um status de processamento. Use esse processador como ponto de partida para a lógica de transformação personalizada.  | 
| Processamento de entrada compactado | Node.js 6.10 | Um processador de registros do Kinesis Data Analytics que recebe registros JSON ou CSV compactados (GZIP ou Deflate) como entrada e retorna registros descompactados com um status de processamento. | 

## Criar uma função do Lambda de pré-processamento em Python
<a name="lambda-preprocessing-functions-python"></a>

Os modelos a seguir para criar uma função do Lambda de pré-processamento em Python estão disponíveis no console:


| Lambda Blueprint | Linguagem e versão | Description | 
| --- | --- | --- | 
| Processamento de entrada geral do Kinesis Analytics  | Python 2.7 |  Um pré-processador de registros do Kinesis Data Analytics que recebe registros JSON ou CSV como entrada e os retorna com um status de processamento. Use esse processador como ponto de partida para a lógica de transformação personalizada.  | 
| Processamento de entrada do KPL | Python 2.7 | Um processador de registros do Kinesis Data Analytics que recebe agregados de registros JSON ou CSV do Kinesis Producer Library (KPL) como entrada e retorna registros desagregados com um status de processamento.  | 

## Criar uma função do Lambda de pré-processamento em Java
<a name="lambda-preprocessing-functions-java"></a>

Para criar uma função do Lambda em Java para pré-processar registros, use as classes de [eventos Java](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events).

O código a seguir demonstra um exemplo de função do Lambda que pré-processa registros usando Java:

```
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
             // Add your record.data pre-processing logic here.                               
            // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }

}
```

## Criar uma função do Lambda de pré-processamento em .NET
<a name="lambda-preprocessing-functions-net"></a>

Para criar uma função do Lambda em .NET para pré-processar registros, use as classes de [eventos .NET](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents).

O código a seguir demonstra um exemplo de função do Lambda que pré-processa registros usando C\$1:

```
public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessig logic here.

                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }
```

Para obter mais informações sobre como criar funções do Lambda para pré-processamento e destinos em .NET, consulte [https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents).

# Paralelização dos streams de entrada para aumentar a taxa de transferência
<a name="input-parallelism"></a>

**nota**  
Depois de 12 de setembro de 2023, você não poderá criar novos aplicativos usando o Kinesis Data Firehose como fonte se ainda não estiver usando o Kinesis Data Analytics para SQL. Para saber mais, consulte [Limites](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html).

As aplicações do Amazon Kinesis Data Analytics podem oferecer suporte a vários fluxos de entrada na aplicação para escalar uma aplicação além do throughput de um único fluxo de entrada na aplicação. Para obter mais informações sobre streams de entrada no aplicativo, consulte [Amazon Kinesis Data Analytics para aplicativos SQL: como funciona](how-it-works.md).

Em quase todos os casos, o Amazon Kinesis Data Analytics escala a aplicação para lidar com a capacidade dos fluxos do Kinesis ou fluxos de origem do Firehose que alimentam a aplicação. No entanto, se a taxa de transferência do stream de origem exceder a taxa de transferência de um único stream de entrada no aplicativo, aumente explicitamente o número de streams de entrada no aplicativo usado pelo aplicativo. Isso é feito com o parâmetro `InputParallelism`.

Quando o parâmetro `InputParallelism` for maior que um, o Amazon Kinesis Data Analytics divide uniformemente as partições do stream de origem entre os streams no aplicativo. Por exemplo, se o fluxo de origem tiver 50 estilhaços e você definiu `InputParallelism` para `2`, cada fluxo de entrada no aplicativo receberá a entrada de 25 estilhaços do fluxo de origem. 

Quando você aumentar o número de streams no aplicativo, este deverá acessar os dados em cada stream explicitamente. Para obter informações sobre como acessar vários fluxos no aplicativo no seu código, consulte [Acesso a streams no aplicativo separados no aplicativo do Amazon Kinesis Data Analytics](#input-parallelism-code-example).

Embora os fragmentos de fluxo do Kinesis Data Streams e do Firehose sejam divididos entre fluxos na aplicação da mesma maneira, a forma como eles aparecem na aplicação difere:
+ Os registros de um fluxo de dados do Kinesis incluem um campo `shard_id` que pode ser usado para identificar o fragmento de origem do registro.
+ Os registros de um fluxo de entrega do Firehose não incluem um campo que identifica o fragmento ou a partição de origem do registro. Isso ocorre porque o Firehose abstrai essas informações da aplicação.

## Avaliações para saber se o número de streams de entrada no aplicativo deve ser aumentado
<a name="input-parallelism-evaluating"></a>

Na maioria dos casos, um único stream de entrada no aplicativo pode lidar com a taxa de transferência de um único stream de origem, de acordo com a complexidade e o volume de dados dos streams de entrada. Para determinar se você precisa aumentar o número de fluxos de entrada no aplicativo, você pode monitorar as `MillisBehindLatest` métricas `InputBytes` e na Amazon. CloudWatch 

Se a `InputBytes` métrica for maior que 100 MB/sec (ou você prevê que será maior do que essa taxa), isso pode causar um aumento `MillisBehindLatest` e aumentar o impacto dos problemas do aplicativo. Para resolver essa situação, recomendamos que sejam feitas as seguintes escolhas de linguagem para o aplicativo:
+ Use vários fluxos e aplicativos Kinesis Data Analytics para SQL se o aplicativo tiver necessidades de escalar além de 100 MB/segundo.
+ Use [Kinesis Data Analytics para aplicativos Java](/managed-flink/latest/java/what-is.html) se quiser usar um único fluxo e aplicativo.

Se a métrica `MillisBehindLatest` tiver uma das seguintes características, aumente a configuração `InputParallelism` do seu aplicativo:
+ A métrica `MillisBehindLatest` aumentar gradualmente, indicando que o aplicativo está atrasado em relação aos dados mais recentes no stream.
+ A métrica `MillisBehindLatest` estiver consistentemente acima de 1.000 (um segundo).

Não será preciso aumentar a configuração `InputParallelism` do aplicativo se:
+ A métrica `MillisBehindLatest` diminuir gradualmente, indicando que o aplicativo está alcançando os dados mais recentes no stream.
+ A métrica `MillisBehindLatest` estiver abaixo de 1.000 (um segundo).

Para obter mais informações sobre o uso CloudWatch, consulte o [Guia CloudWatch do usuário](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).

## Implementação de vários streams de entrada no aplicativo
<a name="input-parallelism-implementing"></a>

Defina o número de streams de entrada no aplicativo quando um aplicativo for criado usando [CreateApplication](API_CreateApplication.md). Defina esse número após criar um aplicativo usando [UpdateApplication](API_UpdateApplication.md). 

**nota**  
Você só pode definir a configuração `InputParallelism` usando a API do Amazon Kinesis Data Analytics ou o. AWS CLI Você não pode definir essa configuração usando Console de gerenciamento da AWS o. Para obter informações sobre como configurar o AWS CLI, consulte[Etapa 2: configurar o AWS Command Line Interface (AWS CLI)](setup-awscli.md).

### Configuração da contagem do stream de entrada de um aplicativo novo
<a name="input-parallelism-implementing-create"></a>

O exemplo a seguir demonstra como usar a ação da API `CreateApplication` para definir a contagem do stream de entrada de um aplicativo novo para 2. 

Para saber mais sobre o `CreateApplication`, consulte [CreateApplication](API_CreateApplication.md).

```
{
   "ApplicationCode": "<The SQL code the new application will run on the input stream>",
   "ApplicationDescription": "<A friendly description for the new application>",
   "ApplicationName": "<The name for the new application>",
   "Inputs": [ 
    { 
      "InputId": "ID for the new input stream",
      "InputParallelism": { 
        "Count": 2
    }],
   "Outputs": [ ... ],
	}]
}
```

### Configuração da contagem do stream de entrada de um aplicativo existente
<a name="input-parallelism-implementing-update"></a>

O exemplo a seguir demonstra como usar a ação da API `UpdateApplication` para definir a contagem do stream de entrada de um aplicativo existente para 2.

Para saber mais sobre o `Update_Application`, consulte [UpdateApplication](API_UpdateApplication.md).

```
{
   "InputUpdates": [ 
      { 
         "InputId": "yourInputId",
         "InputParallelismUpdate": { 
            "CountUpdate": 2
         }
      }
   ],
}
```

## Acesso a streams no aplicativo separados no aplicativo do Amazon Kinesis Data Analytics
<a name="input-parallelism-code-example"></a>

Para usar vários streams de entrada no aplicativo em seu aplicativo, selecione explicitamente entre streams diferentes. O exemplo de código a seguir demonstra como consultar vários streams de entrada no aplicativo criado no tutorial de Conceitos básicos. 

No exemplo a seguir, cada fluxo de origem é agregado usando [COUNT](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-count.html) antes de serem combinados em um único fluxo no aplicativo chamado `in_application_stream001`. Agregar streams de origem de antemão ajuda a garantir que o stream no aplicativo combinado lide com o tráfego de vários streams sem ficar sobrecarregado. 

**nota**  
Para executar esse exemplo e obter resultados de ambos os fluxos de entrada no aplicativo, atualize o número de estilhaços do fluxo de origem e o parâmetro `InputParallelism` do aplicativo.

```
CREATE OR REPLACE STREAM in_application_stream_001 (
    ticker VARCHAR(64),
    ticker_count INTEGER
);

CREATE OR REPLACE PUMP pump001 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol; 
        
CREATE OR REPLACE PUMP pump002 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol;
```

O exemplo de código anterior produz saída em `in_application_stream001` semelhante a:

![\[Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.\]](http://docs.aws.amazon.com/pt_br/kinesisanalytics/latest/dev/images/input-parallelism-results.png)


## Considerações adicionais
<a name="input-parallelism-considerations"></a>

Ao usar vários streams de entrada, esteja ciente do seguinte:
+ O número máximo de streams de entrada no aplicativo é 64.
+ Os streams de entrada no aplicativo são distribuídos uniformemente entre os estilhaços do stream de entrada do aplicativo.
+ Os ganhos de desempenho da adição de streams no aplicativo não escalam de forma linear. Ou seja, dobrar o número de streams no aplicativo não dobra a taxa de transferência. Com um tamanho de linha típico, cada stream no aplicativo pode alcançar a taxa de transferência de aproximadamente 5.000 a 15.000 linhas por segundo. Ao aumentar a contagem de streams no aplicativo para 10, é possível alcançar uma taxa de transferência de 20.000 a 30.000 linhas por segundo. A velocidade da taxa de transferência depende da contagem, tipos de dados e volume de dados de campos no fluxo de entrada.
+ Algumas funções de agregação (como [AVG](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-avg.html)) podem produzir resultados inesperados quando aplicadas a fluxos de entrada particionados em diferentes estilhaços. Como é necessário executar a operação agregada em estilhaços individuais combinando-os em um stream agregado, os resultados podem ser ponderados em relação ao stream que contiver mais registros.
+ Se seu aplicativo continuar apresentando baixo desempenho (refletido por uma `MillisBehindLatest` métrica alta) depois de aumentar o número de fluxos de entrada, você pode ter atingido o limite de unidades KPUs de processamento do Kinesis (). Para obter mais informações, consulte [Escalabilidade automática de aplicativos para aumentar a taxa de transferência](how-it-works-autoscaling.md).