

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Managed Service for Apache Flink: como funciona
<a name="how-it-works"></a>

O Managed Service for Apache Flink é um serviço Amazon totalmente gerenciado que permite usar um aplicativo Apache Flink para processar dados de transmissão. Em primeiro lugar, programe o aplicativo Apache Flink e, em seguida, crie seu aplicativo Managed Service for Apache Flink.

## Programe seu aplicativo Apache Flink
<a name="how-it-works-programming"></a>

Um aplicativo Apache Flink é um aplicativo Java ou Scala criado com a estrutura Apache Flink. Você cria e constrói seu aplicativo Apache Flink localmente. 

Os aplicativos usam principalmente a [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) ou a [API de tabela](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/). Os outros Apache Flink também APIs estão disponíveis para você usar, mas são menos usados na criação de aplicativos de streaming.

As características dos dois APIs são as seguintes:

### DataStream API
<a name="how-it-works-prog-datastream"></a>

O modelo de programação da DataStream API Apache Flink é baseado em dois componentes:
+ **Fluxo de dados:** a representação estruturada de um fluxo contínuo de registros de dados.
+ **Operador de transformação:** usa um ou mais fluxos de dados como entrada e produz um ou mais fluxos de dados como saída.

Os aplicativos criados com a DataStream API fazem o seguinte:
+ Leem dados de uma fonte de dados (como um fluxo do Kinesis ou um tópico do Amazon MSK).
+ Aplicam transformações aos dados, como filtragem, agregação ou enriquecimento.
+ Gravam os dados transformados em um coletor de dados.

Os aplicativos que usam a DataStream API podem ser escritos em Java ou Scala e podem ser lidos de um stream de dados do Kinesis, de um tópico do Amazon MSK ou de uma fonte personalizada.

Seu aplicativo processa dados usando um *conector*. O Apache Flink usa os tipos de conectores a seguir: 
+ **Fonte**: um conector usado para ler dados externos.
+ **Coletor**: um conector usado para gravar em locais externos. 
+ **Operador**: um conector usado para processar dados dentro do aplicativo.

Um aplicativo típico consiste em pelo menos um fluxo de dados com uma fonte, um fluxo de dados com um ou mais operadores e pelo menos um coletor de dados.

Para obter mais informações sobre como usar a DataStream API, consulte[Revise os componentes DataStream da API](how-datastream.md).

### API de tabela
<a name="how-it-works-prog-table"></a>

O modelo de programação da API de tabela do Apache Flink é baseado nos seguintes componentes:
+ **Ambiente de tabela:** uma interface para dados subjacentes usado para criar e hospedar uma ou mais tabelas. 
+ **Tabela:** um objeto que fornece acesso a uma tabela ou exibição SQL.
+ **Fonte da tabela:** usada para ler dados de uma fonte externa, como um tópico do Amazon MSK.
+ **Função de tabela:** uma consulta SQL ou chamada de API usada para transformar dados.
+ **Coletor de tabela:** usado para gravar dados em um local externo, como um bucket do Amazon S3.

Os aplicativos criados com a API de tabela fazem o seguinte:
+ Criam um `TableEnvironment` conectando-se a um `Table Source`. 
+ Criam uma tabela no `TableEnvironment` usando as funções de consultas SQL ou API de tabela.
+ Executam uma consulta na tabela usando a API de tabela ou SQL
+ Aplicam transformações nos resultados da consulta usando funções de API de tabela ou consultas SQL.
+ Gravam os resultados da consulta ou função em um `Table Sink`.

Os aplicativos que usam a API de tabela podem ser escritos em Java ou Scala e podem consultar dados usando chamadas de API ou consultas SQL. 

Para obter mais informações sobre como usar a API de tabela, consulte [Analisar os componentes da API Table](how-table.md).

## Crie seu aplicativo Managed Service for Apache Flink
<a name="how-it-works-app"></a>

O Managed Service for Apache Flink é um AWS serviço que cria um ambiente para hospedar seu aplicativo Apache Flink e fornece as seguintes configurações:
+ **[Use as propriedades de runtime](how-properties.md):** parâmetros que você pode fornecer ao seu aplicativo. Você pode alterar esses parâmetros sem recompilar o código do aplicativo.
+ **[Implemente a tolerância a falhas](how-fault.md)**: como seu aplicativo se recupera de interrupções e reinicializações.
+ **[Registro em log e monitoramento no Amazon Managed Service for Apache Flink](monitoring-overview.md)**: como seu aplicativo registra eventos no CloudWatch Logs. 
+ **[Implemente a escalabilidade de aplicativos](how-scaling.md)**: como seu aplicativo provisiona recursos de computação.

Você pode criar seu aplicativo Managed Service for Apache Flink usando o console ou o AWS CLI. Para começar a criar um aplicativo Managed Service for Apache Flink, consulte [Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink](getting-started.md).

# Crie um aplicativo Managed Service for Apache Flink
<a name="how-creating-apps"></a>

Este tópico contém informações sobre como criar um aplicativo Managed Service for Apache Flink.

**Topics**
+ [Compile seu código de aplicativo Managed Service for Apache Flink](#how-creating-apps-building)
+ [Crie seu aplicativo Managed Service for Apache Flink](#how-creating-apps-creating)
+ [Use chaves gerenciadas pelo cliente.](#how-creating-apps-use-cmk)
+ [Inicie seu aplicativo Managed Service for Apache Flink](#how-creating-apps-starting)
+ [Verifique seu aplicativo Managed Service for Apache Flink](#how-creating-apps-verifying)
+ [Habilite reversões de sistema para seu aplicativo Managed Service for Apache Flink](how-system-rollbacks.md)

## Compile seu código de aplicativo Managed Service for Apache Flink
<a name="how-creating-apps-building"></a>

Esta seção descreve os componentes que você usa para criar o código do aplicativo Managed Service for Apache Flink. 

Recomendamos que você use a versão mais recente suportada do Apache Flink para o seu código do aplicativo. Para obter informações sobre a atualização de aplicativos Managed Service for Apache Flink, consulte [Use atualizações de versão in-place para o Apache Flink](how-in-place-version-upgrades.md). 

Você cria o código do seu aplicativo usando o [Apache Maven](https://maven.apache.org/). Um projeto Apache Maven usa um arquivo `pom.xml` para especificar as versões dos componentes que ele usa. 

**nota**  
O Managed Service for Apache Flink suporta arquivos JAR de até 512 MB de tamanho. Se você usar um arquivo JAR maior do que isso, seu aplicativo falhará ao iniciar.

Agora, os aplicativos podem usar a API Java de qualquer versão do Scala. Você precisará empacotar a biblioteca padrão Scala de sua escolha em seus aplicativos Scala.

Para obter informações sobre como criar um aplicativo Managed Service for Apache Flink que usa **Apache Beam**, consulte [Use o Apache Beam com aplicativos Managed Service for Apache Flink](how-creating-apps-beam.md).

### Especifique a versão do Apache Flink do seu aplicativo
<a name="how-creating-apps-building-flink"></a>

Ao usar o runtime do Managed Service for Apache Flink versão 1.1.0 e posterior, você especifica a versão do Apache Flink que seu aplicativo usa ao compilar seu aplicativo. Você fornece a versão do Apache Flink com o parâmetro `-Dflink.version`. Por exemplo, se você estiver usando o Apache Flink 2.2.0, forneça o seguinte:

```
mvn package -Dflink.version=2.2.0
```

Para compilar aplicativos com versões mais antigas do Apache Flink, consulte [Versões anteriores](earlier.md).

## Crie seu aplicativo Managed Service for Apache Flink
<a name="how-creating-apps-creating"></a>

Depois de compilar o código do aplicativo, faça o seguinte para criar seu aplicativo Managed Service for Apache Flink (Amazon MSF):
+ **Faça upload do código do aplicativo**: faça upload do código do aplicativo em um bucket do Amazon S3. Ao criar o aplicativo, você especifica o nome do bucket do S3 e o nome do objeto do código do aplicativo. Para ver um tutorial que mostra como fazer upload do código do seu aplicativo, consulte o tutorial [Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink](getting-started.md).
+ **Crie seu aplicativo Managed Service for Apache Flink**: use um dos métodos a seguir para criar seu aplicativo Amazon MSF:
**nota**  
O Amazon MSF criptografa seu aplicativo por padrão usando. Chaves pertencentes à AWS Você também pode criar seu novo aplicativo usando chaves gerenciadas pelo AWS KMS cliente (CMKs) para criar, possuir e gerenciar suas chaves você mesmo. Para obter informações sobre CMKs, consulte[Gerenciamento de chaves no Amazon Managed Service for Apache Flink](key-management-flink.md).
  + **Crie seu aplicativo Amazon MSF usando o AWS console:** Você pode criar e configurar seu aplicativo usando o AWS console. 

    Quando você cria seu aplicativo usando o console, os recursos dependentes do seu aplicativo (como fluxos de CloudWatch registros, funções do IAM e políticas do IAM) são criados para você. 

    Ao criar seu aplicativo usando o console, você especifica qual versão do Apache Flink seu aplicativo usa selecionando-a no menu suspenso na página **Managed Service for Apache Flink - Criar aplicativo**. 

    Para obter um tutorial sobre como usar o console para criar um aplicativo, consulte o tutorial [Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink](getting-started.md).
  + **Crie seu aplicativo Amazon MSF usando a AWS CLI**: Você pode criar e configurar seu aplicativo usando a AWS CLI. 

    Ao criar seu aplicativo usando a CLI, você também deve criar manualmente os recursos dependentes do aplicativo (como fluxos de CloudWatch registros, funções do IAM e políticas do IAM).

    Ao criar seu aplicativo usando o CLI, você especifica qual versão do Apache Flink seu aplicativo usa usando o parâmetro `RuntimeEnvironment` da ação `CreateApplication`.
**nota**  
É possível alterar o `RuntimeEnvironment` de um aplicativo existente. Para saber como, consulte [Use atualizações de versão in-place para o Apache Flink](how-in-place-version-upgrades.md).

## Use chaves gerenciadas pelo cliente.
<a name="how-creating-apps-use-cmk"></a>

No Amazon MSF, chaves gerenciadas pelo cliente (CMKs) são um recurso com o qual você pode criptografar os dados do seu aplicativo com uma chave que você cria, possui e gerencia em AWS Key Management Service ()AWS KMS. Para um aplicativo Amazon MSF, isso significa que todos os dados sujeitos a um [ponto de verificação](how-fault.md) ou [snapshot](how-snapshots.md) do Flink são criptografados com uma CMK que você define para esse aplicativo.

Para usar a CMK com seu aplicativo, primeiro você deve [criar seu novo aplicativo](#how-creating-apps-creating) e, em seguida, aplicar uma CMK. Para obter mais informações sobre o uso CMKs, consulte[Gerenciamento de chaves no Amazon Managed Service for Apache Flink](key-management-flink.md).

## Inicie seu aplicativo Managed Service for Apache Flink
<a name="how-creating-apps-starting"></a>

Depois de criar o código do aplicativo, carregá-lo no S3 e criar seu aplicativo Managed Service for Apache Flink, você inicia o aplicativo. O início de um aplicativo Managed Service for Apache Flink normalmente leva vários minutos.

Use um dos métodos a seguir para iniciar o aplicativo:
+ **Inicie seu aplicativo Managed Service for Apache Flink usando o AWS console:** Você pode executar seu aplicativo escolhendo **Executar** na página do seu aplicativo no AWS console.
+ **Inicie seu serviço gerenciado para o aplicativo Apache Flink usando a AWS API:** você pode executar seu aplicativo usando a [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)ação. 

## Verifique seu aplicativo Managed Service for Apache Flink
<a name="how-creating-apps-verifying"></a>

Você pode verificar se o aplicativo está funcionando das seguintes maneiras:
+ **Usando CloudWatch registros:** você pode usar o CloudWatch Logs e o CloudWatch Logs Insights para verificar se o aplicativo está funcionando corretamente. Para obter informações sobre como usar o CloudWatch Logs com seu aplicativo Managed Service for Apache Flink, consulte. [Registro em log e monitoramento no Amazon Managed Service for Apache Flink](monitoring-overview.md)
+ **Usando CloudWatch métricas:** você pode usar CloudWatch métricas para monitorar a atividade do seu aplicativo ou a atividade nos recursos que seu aplicativo usa para entrada ou saída (como streams do Kinesis, streams Firehose ou buckets do Amazon S3). Para obter mais informações sobre CloudWatch métricas, consulte Como [trabalhar com métricas](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) no Guia CloudWatch do usuário da Amazon.
+ **Monitoramento de locais de saída:** se seu aplicativo grava a saída em um local (como um bucket ou banco de dados do Amazon S3), você pode monitorar esse local para dados gravados.

# Habilite reversões de sistema para seu aplicativo Managed Service for Apache Flink
<a name="how-system-rollbacks"></a>

Com a capacidade de reversão do sistema, você pode obter maior disponibilidade do seu aplicativo Apache Flink em execução no Amazon Managed Service for Apache Flink. A opção por essa configuração permite que o serviço reverta automaticamente o aplicativo para a versão em execução anterior quando uma ação, como `UpdateApplication` ou `autoscaling`, causa falha no código ou na configuração.

**nota**  
Para usar o recurso de reversão do sistema, você precisa aderir à atualização do aplicativo. Os aplicativos existentes não usarão automaticamente a reversão do sistema por padrão.

## Como funciona
<a name="how-rollback-works"></a>

Quando você inicia a operação de um aplicativo, como uma atualização ou escalamento, o Amazon Managed Service for Apache Flink primeiro tenta executá-la. Se ele detectar problemas que impeçam a operação de ser bem-sucedida, como falhas de código ou permissões insuficientes, o serviço iniciará automaticamente uma operação `RollbackApplication`.

A reversão tenta restaurar o aplicativo para a versão anterior que estava sendo executada com êxito, junto com o estado do aplicativo associado. Se a reversão for feita com êxito, o aplicativo continuará processando dados com o mínimo de tempo de inatividade, usando a versão anterior. Se a reversão automática também falhar, o Amazon Managed Service for Apache Flink fará a transição do aplicativo para o status `READY`, para que você possa corrigir o erro e repetir a operação. 

Você deve aderir ao uso de reversões automáticas do sistema. Você pode habilitar isso usando o console ou a API para todas as operações em seu aplicativo a partir de agora. 

O exemplo de solicitação a seguir para a ação `UpdateApplication` habilita reversões de sistema para um aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Analise cenários comuns para reversão automática do sistema
<a name="common-scenarios"></a>

Os cenários a seguir ilustram onde as reversões automáticas do sistema são benéficas: 
+ **Atualizações do aplicativo:** se você atualizar seu aplicativo com um novo código que contém falhas ao inicializar a tarefa do Flink por meio do método principal, a reversão automática permite que a versão anterior funcional seja restaurada. Outros cenários de atualização em que as reversões do sistema são úteis incluem: 
  + Se seu aplicativo for atualizado para ser executado com um paralelismo maior que [maxParallelism](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto).
  + Se seu aplicativo for atualizado para ser executado com sub-redes incorretas para um aplicativo na VPC, isso resultará em uma falha durante o startup da tarefa do Flink. 
+ **Atualizações da versão do Flink:** quando você atualiza para uma nova versão do Apache Flink e o aplicativo atualizado encontra um problema de compatibilidade de snapshots, a reversão do sistema permite que você reverta automaticamente para a versão anterior do Flink. 
+ **AutoScaling:** quando o aplicativo se expande, mas apresenta problemas de restauração a partir de um ponto de salvamento, devido à incompatibilidade do operador entre o instantâneo e o gráfico de tarefas do Flink.

## Use a operação APIs para reversões do sistema
<a name="operation-apis"></a>

Para fornecer melhor visibilidade, o Amazon Managed Service para Apache Flink tem duas operações APIs relacionadas a aplicativos que podem ajudá-lo a rastrear falhas e reversões de sistema relacionadas.

`ListApplicationOperations`

Essa API lista todas as operações realizadas no aplicativo, incluindo `UpdateApplication`, `Maintenance`, `RollbackApplication` e outras em ordem cronológica inversa. O exemplo a seguir solicita que a ação `ListApplicationOperations` liste as primeiras 10 operações do aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

O seguinte exemplo de solicitação de `ListApplicationOperations` ajuda a filtrar a lista para atualizações anteriores no aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

Essa API fornece informações detalhadas sobre uma operação específica listada por `ListApplicationOperations`, incluindo o motivo da falha, se aplicável. O exemplo de solicitação a seguir para a ação `DescribeApplicationOperation` lista detalhes de uma operação específica do aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

Para obter informações sobre a solução de problemas, consulte [Práticas recomendadas de reversão do sistema](troubleshooting-system-rollback.md).

# Execute um aplicativo Managed Service for Apache Flink
<a name="how-running-apps"></a>

Este tópico contém informações sobre como executar o Managed Service for Apache Flink.

Quando você executa o seu Managed Service for Apache Flink, o serviço cria um trabalho do Apache Flink. Um trabalho do Apache Flink é o ciclo de vida de execução do seu aplicativo Managed Service for Apache Flink. A execução do trabalho e os recursos que ele usa são gerenciados pelo Job Manager. O Job Manager divide a execução do aplicativo em tarefas. Cada tarefa é gerenciada por um gerenciador de tarefas. Ao monitorar o desempenho do seu aplicativo, você pode examinar o desempenho de cada gerenciador de tarefas ou do Job Manager como um todo. 

Para obter informações sobre os trabalhos do Apache Flink, consulte [Trabalhos e programação](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) na documentação do Apache Flink.

## Identifique o status do aplicativo e do trabalho
<a name="how-running-job-status"></a>

Tanto seu aplicativo quanto o trabalho do aplicativo têm um status de execução atual:
+ **Status do aplicativo:** seu aplicativo tem um status atual que descreve a sua fase de execução. Os status do aplicativo incluem o seguinte:
  + **Status de aplicativo estacionário:** seu aplicativo normalmente permanece nesses status até que você faça uma alteração de status:
    + **READY:** Um aplicativo novo ou interrompido está no status PRONTO até que você o execute.
    + **RUNNING:** Um aplicativo que foi iniciado com sucesso está no status EXECUTANDO.
  + **Status transitórios de aplicativos:** Um aplicativo nesses status normalmente está em processo de transição para outro status. Se um aplicativo permanecer em um status transitório por um período de tempo, você poderá interromper o aplicativo usando a [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)ação com o `Force` parâmetro definido como. `true` Esses status incluem o seguinte:
    + `STARTING:`Ocorre após a [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)ação. O aplicativo está passando do status `READY` para `RUNNING`.
    + `STOPPING:`Ocorre após a [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)ação. O aplicativo está passando do status `RUNNING` para `READY`.
    + `DELETING:`Ocorre após a [DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)ação. O aplicativo está em processo de ser excluído.
    + `UPDATING:`Ocorre após a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação. O aplicativo está sendo atualizado e voltará ao status `RUNNING` ou `READY`.
    + `AUTOSCALING:`O aplicativo tem a `AutoScalingEnabled` propriedade de [ ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)definir como`true`, e o serviço está aumentando o paralelismo do aplicativo. Quando o aplicativo está nesse status, a única ação de API válida que você pode usar é a [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)ação com o `Force` parâmetro definido como`true`. Para obter mais informações sobre escalabilidade, consulte [Use a escalabilidade automática no Managed Service for Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING:`Ocorre depois que a [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)ação é chamada com o `Force` parâmetro definido como`true`. O aplicativo está em processo de ser interrompido à força. O aplicativo faz a transição do status `STARTING`, `UPDATING`, `STOPPING`, ou `AUTOSCALING` para o status`READY`.
    + `ROLLING_BACK:`Ocorre depois que a [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)ação é chamada. O aplicativo está em processo de ser revertido para uma versão anterior. O aplicativo faz a transição do status `UPDATING` ou `AUTOSCALING` para o status `RUNNING`.
    + `MAINTENANCE:` Ocorre enquanto o Managed Service for Apache Flink aplica patches ao seu aplicativo. Para obter mais informações, consulte [Gerencie as tarefas de manutenção do Managed Service for Apache Flink.](maintenance.md).

  Você pode verificar o status do seu aplicativo usando o console ou usando a [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)ação.
+ **Status do trabalho:** quando seu aplicativo está no status `RUNNING`, o seu trabalho tem um status que descreve a fase de execução atual. Um trabalho começa no status `CREATED` e, em seguida, passa para o status `RUNNING` quando é iniciado. Se ocorrerem condições de erro, o seu aplicativo entrará no seguinte status: 
  + Para aplicativos que usam o Apache Flink 1.11 e versões posteriores, seu aplicativo entra no status `RESTARTING`.
  + Para aplicativos que usam o Apache Flink 1.8 e versões anteriores, seu aplicativo entra no status `FAILING`.

  Em seguida, o aplicativo passa para o status `RESTARTING` ou `FAILED`, dependendo se o trabalho pode ser reiniciado. 

  Você pode verificar o status do trabalho examinando o CloudWatch registro de sua inscrição em busca de alterações de status.

## Execute workloads em lote
<a name="batch-workloads"></a>

O Managed Service for Apache Flink suporta a execução de workloads em batch do Apache Flink. **Em um trabalho em lote, quando um trabalho do Apache Flink atinge o status **FINALIZADO**, o status do aplicativo Managed Service for Apache Flink é definido como PRONTO**. Para obter mais informações sobre os status de trabalho do Flink, consulte [Trabalhos e programação](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Analise os recursos do aplicativo Managed Service for Apache Flink
<a name="how-resources"></a>

Esta seção descreve os recursos do sistema que seu aplicativo usa. Entender como o Managed Service for Apache Flink provisiona e usa recursos ajudará você a projetar, criar e manter um Managed Service estável e com desempenho para o aplicativo Apache Flink.

## Recursos do aplicativo Managed Service for Apache Flink
<a name="how-resources-kda"></a>

O Managed Service for Apache Flink é um AWS serviço que cria um ambiente para hospedar seu aplicativo Apache Flink. O serviço Managed Service for Apache Flink fornece recursos usando unidades chamadas **Kinesis Processing** Units (). KPUs

Uma KPU representa os seguintes recursos do sistema:
+ Um Núcleo de CPU
+ 4 GB de memória, dos quais um GB é memória nativa e três GB são memória heap
+ 50 GB de espaço em disco

KPUs execute aplicativos em unidades de execução distintas chamadas **tarefas** e **subtarefas**. Você pode pensar em uma subtarefa como o equivalente a um thread.

O número de KPUs disponíveis para um aplicativo é igual à `Parallelism` configuração do aplicativo, dividido pela `ParallelismPerKPU` configuração do aplicativo. 

Para obter mais informações sobre paralelismo de aplicativo, consulte [Implemente a escalabilidade de aplicativos](how-scaling.md).

## Recursos do aplicativo Apache Flink
<a name="how-resources-flink"></a>

O ambiente Apache Flink aloca recursos para seu aplicativo usando unidades chamadas **slots de tarefas**. Quando o Managed Service for Apache Flink aloca recursos para seu aplicativo, ele atribui um ou mais slots de tarefas do Apache Flink a uma única KPU. O número de slots atribuídos a uma única KPU é igual à configuração `ParallelismPerKPU` do seu aplicativo. Para obter mais informações sobre slots de tarefas, consulte [ Programação de trabalho](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) na documentação do Apache Flink.

### Paralelismo do operador
<a name="how-resources-flink-operatorparallelism"></a>

Você pode definir o número máximo de subtarefas que um operador pode usar. Esse valor é chamado de **Paralelismo do operador**. Por padrão, o paralelismo de cada operador em seu aplicativo é igual ao paralelismo do aplicativo. Isso significa que, por padrão, cada operador em seu aplicativo pode usar todas as subtarefas disponíveis no aplicativo, se necessário.

Você pode definir o paralelismo dos operadores em seu aplicativo usando o método `setParallelism`. Usando esse método, você pode controlar o número de subtarefas que cada operador pode usar ao mesmo tempo.

Para obter mais informações sobre operadores, consulte [ Operadores](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) na documentação do Apache Flink.

### Encadeamento de operadores
<a name="how-resources-flink-operatorchaining"></a>

Normalmente, cada operador usa uma subtarefa separada para executar, mas se vários operadores sempre forem executados em sequência, o runtime poderá atribuir todos à mesma tarefa. Esse processo é chamado de **Encadeamento de operadores**.

Vários operadores sequenciais podem ser encadeados em uma única tarefa se todos operarem com os mesmos dados. A seguir encontram-se alguns dos critérios necessários para que isso ocorra:
+ Os operadores fazem um encaminhamento simples de 1 para 1.
+ Todos os operadores têm o mesmo paralelismo de operadores.

Quando seu aplicativo encadeia operadores em uma única subtarefa, ele conserva os recursos do sistema, porque o serviço não precisa realizar operações de rede e alocar subtarefas para cada operador. Para determinar se seu aplicativo está usando o encadeamento de operadores, veja o gráfico de tarefas no console do Managed Service for Apache Flink. Cada vértice no aplicativo representa um ou mais operadores. O gráfico mostra operadores que foram encadeados como um único vértice.

# Cobrança por segundo no Managed Service for Apache Flink
<a name="how-pricing"></a>

Agora, o Managed Service for Apache Flink é cobrado em incrementos de um segundo. Há uma cobrança mínima de dez minutos por aplicativo. A cobrança por segundo é aplicável a aplicativos recém-lançados ou já em execução. Esta seção descreve como o Managed Service for Apache Flink contabiliza e cobra seu uso. Para saber mais sobre os preços do Apache Flink, consulte [Preços do Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). 

## Como funciona
<a name="how-resources-kda"></a>

O Managed Service for Apache Flink cobra pela duração e pelo número de **Kinesis Processing Units (KPUs) que são cobradas em incrementos de um segundo nas unidades** suportadas. Regiões da AWS Cada KPU é composta por 1 vCPU de computação e 4 GB de memória. É cobrada uma taxa horária com base no número de pessoas KPUs usadas para executar seus aplicativos. 

Por exemplo, um aplicativo executado por 20 minutos e 10 segundos será cobrado por esse tempo multiplicado pelos recursos usados. Um aplicativo executado por 5 minutos pagará o mínimo de dez minutos multiplicado pelos recursos usados.

O Managed Service for Apache Flink informa o uso em horas. Por exemplo, 15 minutos corresponde a 0,25 horas. 

Para aplicativos Apache Flink, é cobrada uma única KPU adicional por aplicativo, usada para orquestração. Os aplicativos também são cobrados pela execução do armazenamento e pelos backups duráveis. O armazenamento de aplicativos em execução é usado para recursos de processamento de estado no Managed Service for Apache Flink e é cobrado por. GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month 

No modo de streaming, o Managed Service for Apache Flink dimensiona automaticamente o número KPUs exigido pelo seu aplicativo de processamento de stream à medida que as demandas de memória e computação flutuam. Você pode optar por provisionar seu aplicativo com o número necessário de KPUs. 

## Região da AWS disponibilidade
<a name="how-pricing-regions"></a>

**nota**  
No momento, o faturamento por segundo não está disponível nas seguintes regiões: AWS GovCloud (Leste dos EUA), AWS GovCloud (Oeste dos EUA), China (Pequim) e China (Ningxia).

A cobrança por segundo está disponível nas Regiões da AWS seguintes: 
+ Leste dos EUA (Norte da Virgínia) – us-east-1
+ Leste dos EUA (Ohio): us-east-2
+ Oeste dos EUA (N. da Califórnia): us-west-1
+ Oeste dos EUA (Oregon): us-west-2
+ África (Cidade do Cabo): af-south-1
+ Ásia-Pacífico (Hong Kong): ap-east-1
+ Ásia-Pacífico (Hyderabad) - ap-south-1
+ Ásia-Pacífico (Jacarta): ap-southeast-3
+ Ásia-Pacífico (Melbourne) - ap-southeast-4
+ Ásia-Pacífico (Mumbai): ap-south-1
+ Asia Pacific (Osaka): ap-northeast-3
+ Ásia-Pacífico (Seul): ap-northeast-2
+ Ásia-Pacífico (Singapura): ap-southeast-1
+ Ásia-Pacífico (Sydney) – ap-southeast-2
+ Ásia-Pacífico (Tóquio) – ap-northeast-1
+ Canadá (Central): ca-central-1
+ Oeste do Canadá (Calgary): ca-west-1
+ Europa (Frankfurt): eu-central-1
+ Europa (Irlanda): eu-west-1
+ Europa (Londres): eu-west-2
+ Europa (Milão): eu-south-1
+ Europa (Paris): eu-west-3
+ Europa (Espanha): eu-south-2
+ Europa (Estocolmo): eu-north-1
+ Europa (Zurique): eu-central-2
+ Israel (Tel Aviv) - il-central-1
+ Oriente Médio (Bahrein): me-south-1
+ Oriente Médio (EAU): me-central-1
+ América do Sul (São Paulo): sa-east-1

## Exemplos de preços
<a name="how-pricing-examples"></a>

É possível encontrar exemplos de preços na página de preços do Managed Service for Apache Flink. Para obter mais informações, consulte [Preços do Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). A seguir estão mais alguns exemplos com ilustrações do Relatório de Uso de Custos para cada um.

### Uma workload pesada e de longa duração
<a name="pricing-example-1"></a>

Você tem um grande serviço de transmissão de vídeo e gostaria de criar uma recomendação de vídeo em tempo real com base nas interações de seus usuários. Você usa um aplicativo Apache Flink no Managed Service for Apache Flink para ingerir continuamente eventos de interação do usuário de vários fluxos de dados do Kinesis e processar eventos em tempo real antes de enviá-los para um sistema subsequente. Os eventos de interação do usuário são transformados usando vários operadores. Isso inclui particionar dados por tipo de evento, enriquecer dados com metadados adicionais, classificar dados por data e hora e armazenar dados em buffer por 5 minutos antes da entrega. O aplicativo tem várias etapas de transformação que exigem muita computação e são paralelizáveis. Seu aplicativo Flink está configurado para ser executado com 20 KPUs para acomodar a carga de trabalho. Seu aplicativo usa 1 GB de backup durável de aplicativos todos os dias. As cobranças mensais do Managed Service for Apache Flink são calculadas da seguinte forma:

**Cobranças mensais**

O preço na região Leste dos EUA (Norte da Virgínia) é de 0,11 dólares por hora de KPU. O Managed Service para Apache Flink aloca 50 GB de armazenamento de aplicativos em execução por KPU e cobra 0,10 dólares por GB/mês.
+ Cobranças mensais de KPU: 24 horas \$1 30 dias\$1 (20 KPUs \$1 1 KPU adicional para aplicativo de streaming) \$1 0,11 USD/hora = 1.584,00 USD
+ Cobranças mensais de execução do armazenamento de aplicativos: 30 dias \$1 20 \$1 50 KPUs GB/KPUs \$1 0,10 USD/GB por mês = 100,00 USD
+ Taxas mensais de armazenamento durável de aplicativos: 30 dias \$1 1 GB \$1 US\$1 0,023/GB/mês = US\$1 0,03
+ Total de cobranças: US\$1 1.584,00 \$1 US\$1 100 \$1 US\$1 0,03 = **US\$1 1.684,03**

**Relatório de uso de custos do Managed Service for Apache Flink no console de Gerenciamento de Cobranças e Custos do mês**

Kinesis Analytics
+ US\$1 1.684,03 - Leste dos EUA (Norte da Virgínia)
+ Amazon Kinesis Analytics CreateSnapshot
  + US\$1 0,023 por GB/mês de backups duráveis de aplicativos
    + 1 GB por mês - US\$1 0,03
+ Amazon Kinesis Analytics StartApplication
  + US\$1 0,10 por GB por mês de armazenamento de aplicativos em execução
    + 1.000 GB por mês - US\$1 100
  + US\$1 0,11 por unidade de processamento Kinesis por hora para aplicativos Apache Flink
    + 15.120 KPU por hora - US\$1 1.584

### Uma workload em lote que é executada por aproximadamente 15 minutos todos os dias
<a name="pricing-example-2"></a>

Você usa um aplicativo Apache Flink no Managed Service for Apache Flink para transformar dados de log no Amazon Simple Storage Service (Amazon S3) em modo de lote. Os dados de log são transformados usando vários operadores. Isso inclui aplicar um esquema aos diferentes eventos de log, particionar dados por tipo de evento e classificar dados por data e hora. O aplicativo tem muitas etapas de transformação, mas nenhuma é computacionalmente intensiva. Esse aplicativo ingere dados a 2.000 records/second por 15 minutos todos os dias em um mês de 30 dias. Você não cria nenhum backup durável de aplicativos. As cobranças mensais do Managed Service for Apache Flink são calculadas da seguinte forma:

**Cobranças mensais**

O preço na região Leste dos EUA (Norte da Virgínia) é de 0,11 dólares por hora de KPU. O Managed Service para Apache Flink aloca 50 GB de armazenamento de aplicativos em execução por KPU e cobra 0,10 dólares por GB/mês.
+ Carga de trabalho em lote: durante os 15 minutos por dia, o serviço gerenciado para o aplicativo Apache Flink está processando 2.000 records/second, which takes 2KPUs. 30 days/month \$1 15 minutes/day = 450 minutes/month
+ Cobranças mensais de KPU: 450 minutes/month \$1 (2 KPUs \$1 1 KPU adicional para aplicativo de streaming) \$1 0,11 USD/hora = 2,48 USD
+ Cobranças mensais de execução do armazenamento de aplicativos: 450 minutes/month \$1 2 \$1 50 KPUs GB/KPUs \$1 0,10 USD/GB por mês = 0,11 USD
+ Total de cobranças: US\$1 2,48 \$1 US\$1 0,11 = **US\$1 2,59**

**Relatório de uso de custos do Managed Service for Apache Flink no console de Gerenciamento de Cobranças e Custos do mês**

Kinesis Analytics
+ US\$1 2,59 - Leste dos EUA (Norte da Virgínia)
+ Amazon Kinesis Analytics StartApplication
  + US\$1 0,10 por GB por mês de backups de aplicativos em execução
    + 1,042 GB por mês - US\$1 0,11
  + US\$1 0,11 por unidade de processamento Kinesis por hora para aplicativos Apache Flink
    + 22,5 KPU por hora - US\$1 2,48

### Um aplicativo de teste que para e inicia continuamente na mesma hora, causando várias cobranças mínimas
<a name="pricing-example-3"></a>

Você tem uma grande plataforma de comércio eletrônico que processa milhões de transações todos os dias. Você quer desenvolver detecção de fraudes em tempo real. Você usa um aplicativo Apache Flink no Managed Service for Apache Flink para ingerir eventos de transação do Kinesis Data Streams e processar eventos em tempo real com diferentes etapas de transformação. Isso inclui usar uma janela deslizante para agregar eventos, particionar eventos por tipos e aplicar regras de detecção específicas para diferentes tipos de eventos. Durante o desenvolvimento, você inicia e interrompe o aplicativo várias vezes para testar e depurar seu comportamento. Há ocasiões em que seu aplicativo é executado apenas por alguns minutos. Há uma hora em que você está testando seu aplicativo com 4 KPUs e seu aplicativo não usa nenhum backup de aplicativo durável:
+ Às 10h05, você inicia o aplicativo, que é executado por 30 minutos antes de ser interrompido às 10h35.
+ Às 10h40, você inicia o aplicativo novamente, que é executado por 5 minutos antes de ser interrompido às 10h45.
+ Às 10h50, você reinicia o aplicativo, que é executado por 2 minutos antes de ser interrompido às 10h52.

O Managed Service for Apache Flink cobra um mínimo de 10 minutos de uso cada vez que um aplicativo começa a ser executado. O uso mensal do Managed Service for Apache Flink para o aplicativo será calculado da seguinte forma:
+ Primeira vez que o aplicativo é iniciado e interrompido: 30 minutos de uso
+ Segunda vez que o aplicativo é iniciado e interrompido: 10 minutos de uso (seu aplicativo é executado por 5 minutos arredondados para a carga mínima de 10 minutos)
+ Terceira vez que o aplicativo é iniciado e interrompido: 10 minutos de uso (seu aplicativo é executado por 2 minutos, arredondado para a carga mínima de 10 minutos)

No total, seriam cobrados 50 minutos de uso do seu aplicativo. Se não houver outras vezes no mês em que o aplicativo está em execução, as cobranças mensais do Managed Service for Apache Flink serão calculadas da seguinte forma:

**Cobranças mensais**

O preço na região Leste dos EUA (Norte da Virgínia) é de 0,11 dólares por hora de KPU. O Managed Service para Apache Flink aloca 50 GB de armazenamento de aplicativos em execução por KPU e cobra 0,10 dólares por GB/mês.
+ Cobranças mensais de KPU: 50 minutos\$1 (4 KPUs \$1 1 KPU adicional para aplicativo de streaming) \$1 0,11 USD/hora = 0,46 USD (arredondado para o centavo mais próximo)
+ Taxas mensais de armazenamento de aplicativos em execução: 50 minutos \$1 4 \$1 50 KPUs GB/KPUs \$1 0,10 USD por GB por mês = 0,03 USD (arredondado para o centavo mais próximo)
+ Total de cobranças: US\$1 0,46 \$1 US\$1 0,03 = **US\$1 0,49**

**Relatório de uso de custos do Managed Service for Apache Flink no console de Gerenciamento de Cobranças e Custos do mês**

Kinesis Analytics
+ US\$1 0,49 - Leste dos EUA (Norte da Virgínia)
+ Amazon Kinesis Analytics StartApplication
  + US\$1 0,10 por GB por mês de armazenamento de aplicativos em execução
    + 0,232 GB por mês - US\$1 0,03
  + US\$1 0,11 por unidade de processamento Kinesis por hora para aplicativos Apache Flink
    + 4,167 KPU por hora - US\$1 0,46

# Revise os componentes DataStream da API
<a name="how-datastream"></a>

Seu aplicativo Apache Flink usa a [ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) para transformar dados em um fluxo de dados. 

Esta seção descreve os diferentes componentes que movem, transformam e rastreiam os dados:
+ [Use conectores para mover dados no Managed Service for Apache Flink com a API DataStream](how-connectors.md): esses componentes movem dados entre seu aplicativo e fontes de dados e destinos externos.
+ [Transforme dados usando operadores no Managed Service for Apache Flink com a API DataStream](how-operators.md): esses componentes transformam ou agrupam elementos de dados em seu aplicativo.
+ [Acompanhe eventos no Managed Service para Apache Flink usando a API DataStream](how-time.md): este tópico descreve como o Managed Service for Apache Flink rastreia eventos ao usar a DataStream API.

# Use conectores para mover dados no Managed Service for Apache Flink com a API DataStream
<a name="how-connectors"></a>

Na DataStream API Amazon Managed Service for Apache Flink, *conectores* são componentes de software que movem dados para dentro e para fora de um aplicativo Managed Service for Apache Flink. Os conectores são integrações flexíveis que permitem a leitura de arquivos e diretórios. Os conectores consistem em módulos completos para interagir com os serviços da Amazon e sistemas de terceiros.

Os tipos de conectores incluem o seguinte:
+ [Adicione fontes de dados de transmissão](how-sources.md): forneça dados para seu aplicativo a partir de um fluxo de dados do Kinesis, arquivo ou de outra fonte de dados.
+ [Grave dados usando coletores ](how-sinks.md): envie dados do seu aplicativo para um fluxo de dados do Kinesis, fluxo do Firehose ou outro destino de dados.
+ [Use E/S assíncrona](how-async.md): fornece acesso assíncrono a uma fonte de dados (como um banco de dados) para enriquecer os eventos de fluxo. 

## Conectores disponíveis
<a name="how-connectors-list"></a>

A estrutura do Apache Flink contém conectores para acessar dados de várias fontes. Para obter informações sobre conectores disponíveis na estrutura do Apache Flink, consulte [Conectores](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) na [Documentação do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Atenção**  
Se você tem aplicativos em execução no Flink 1.6, 1.8, 1.11 ou 1.13 e gostaria de executar nas regiões do Oriente Médio (EAU), Ásia-Pacífico (Hyderabad), Israel (Tel Aviv), Europa (Zurique), Oriente Médio (EAU), Ásia-Pacífico (Melbourne) e Ásia-Pacífico (Jacarta), talvez seja necessário recompilar seu archive de aplicativos com um conector atualizado ou fazer o upgrade para o Flink 1.18.   
Os conectores Apache Flink são armazenados em seus próprios repositórios de código aberto. Se você estiver atualizando para a versão 1.18 ou posterior, deverá atualizar suas dependências. Para acessar o repositório dos AWS conectores Apache Flink, consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
A antiga fonte `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` do Kinesis foi descontinuada e pode ser removida com uma versão futura do Flink. Em vez disso, use o [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Não há compatibilidade de estado entre `FlinkKinesisConsumer` e `KinesisStreamsSource`. Para obter detalhes, consulte [Migração de trabalhos existentes para a nova fonte do Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) na documentação do Apache Flink.  
 A seguir estão as diretrizes recomendadas:   


**Atualizações de conectores**  

| Versão do Flink | Conector usado | Resolução | 
| --- | --- | --- | 
| 1.19, 1.20 | Fonte do Kinesis  |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector mais recente do Kinesis Data Streams. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Conector do Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Coletor do Kinesis |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Kinesis Data Streams. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Coletor de fluxos do Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | Fonte do DynamoDB Streams |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de fonte mais recente do DynamoDB Streams. Ele deve ser da versão 5.0.0 e posteriores. Para ter mais informações, consulte [conector do Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Coletor do DynamoDB  | Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do DynamoDB. Ele deve ser da versão 5.0.0 e posteriores. Para ter mais informações, consulte [conector do Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Coletor do Amazon SQS |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Amazon SQS. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Coletor do Amazon SQS](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1.19, 1.20 | Coletor do Amazon Managed Service para Prometheus |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Amazon Managed Service for Prometheus. Ele deve ser da versão 1.0.0 e posteriores. Para obter mais informações, consulte [Coletor do Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Adicione fontes de dados de transmissão ao Managed Service for Apache Flink
<a name="how-sources"></a>

O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma [fonte do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para receber dados de um fluxo. Esta seção descreve as fontes que estão disponíveis para os serviços da Amazon.

## Use fluxos de dados do Kinesis
<a name="input-streams"></a>

A `KinesisStreamsSource` fornece dados de transmissão para seu aplicativo a partir de um fluxo de dados do Amazon Kinesis. 

### Criar uma `KinesisStreamsSource`
<a name="input-streams-create"></a>

O exemplo de código a seguir demonstra como criar um `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obter mais informações sobre o uso de um`KinesisStreamsSource`, consulte o [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector na [documentação do Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) nosso exemplo público no Github.

### Crie um `KinesisStreamsSource` que usa um consumidor EFO
<a name="input-streams-efo"></a>

O `KinesisStreamsSource` agora oferece suporte ao [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.

Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, [consulte FLIP-128: Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
+ **READER\$1TYPE: ** defina esse parâmetro como **EFO** para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME:** defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído. 

Para configurar um `KinesisStreamsSource` para usar o EFO, adicione os seguintes parâmetros ao consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para obter um exemplo de um aplicativo do Managed Service for Apache Flink que usa um consumidor EFO, consulte [nosso exemplo público de Conectores do Kinesis no Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Use o Amazon MSK
<a name="input-msk"></a>

A fonte `KafkaSource` fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK. 

### Criar uma `KafkaSource`
<a name="input-msk-create"></a>

O exemplo de código a seguir demonstra como criar um `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obter mais informações sobre como usar uma `KafkaSource`, consulte [Replicação do MSK](earlier.md#example-msk).

# Grave dados com coletores no Managed Service for Apache Flink
<a name="how-sinks"></a>

No código do seu aplicativo, você pode usar qualquer conector de [coletor do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) para gravar em sistemas externos, incluindo serviços do AWS , como Kinesis Data Streams e DynamoDB.

O Apache Flink também fornece coletores para arquivos e soquetes, e você pode implementar coletores personalizados. Entre os vários coletores suportados, os seguintes são usados com frequência: 

## Use fluxos de dados do Kinesis
<a name="sinks-streams"></a>

O Apache Flink fornece informações sobre o [conector do Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) na documentação do Apache Flink.

Para obter um exemplo de um aplicativo que usa um fluxo de dados Kinesis para entrada e saída, consulte [Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink](getting-started.md).

## Use Amazon Kafka e Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

O [conector do Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) fornece amplo suporte para publicação de dados no Apache Kafka e no Amazon MSK, incluindo garantias “exatamente uma vez”. Para aprender a gravar no Kafka, consulte [exemplos de conectores do Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) na documentação do Apache Flink.

## Use o Amazon S3.
<a name="sinks-s3"></a>

É possível utilizar o `StreamingFileSink` do Apache Flink para gravar objetos em um bucket do Amazon S3.

Para obter um exemplo sobre como gravar objetos no S3, consulte [Exemplo: gravação em um bucket do Amazon S3](earlier.md#examples-s3). 

## Use o Firehose
<a name="sinks-firehose"></a>

O `FlinkKinesisFirehoseProducer` é um coletor do Apache Flink confiável e escalável para armazenar a saída do aplicativo usando o serviço [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). Esta seção descreve como configurar um projeto do Maven para criar e utilizar um `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Criar uma `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Exemplo de código `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Criar uma `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

O exemplo de código a seguir demonstra como criar um `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Exemplo de código `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

O exemplo de código a seguir demonstra como criar e configurar um `FlinkKinesisFirehoseProducer` e enviar dados de um fluxo de dados do Apache Flink para o serviço Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Para ver um tutorial completo sobre como usar o coletor do Firehose, consulte [Exemplo: gravação no Firehose](earlier.md#get-started-exercise-fh).

# Use I/O assíncrono no serviço gerenciado para Apache Flink
<a name="how-async"></a>

Um I/O operador assíncrono enriquece os dados do stream usando uma fonte de dados externa, como um banco de dados. O Managed Service for Apache Flink enriquece os eventos de transmissão de forma assíncrona para que as solicitações possam ser agrupadas em lotes para maior eficiência. 

Para obter mais informações, consulte [E/S assíncrona](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) na Documentação do Apache Flink.

# Transforme dados usando operadores no Managed Service for Apache Flink com a API DataStream
<a name="how-operators"></a>

Para transformar os dados recebidos em um Managed Service for Apache Flink, você usa um *operador* do Apache Flink. Um operador do Apache Flink transforma um ou mais fluxos de dados em um novo fluxo de dados. O novo fluxo de dados contém dados modificados do fluxo de dados original. O Apache Flink fornece mais de 25 operadores de processamento de fluxo pré-definidos. Para obter mais informações, consulte [Operadores](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) na Documentação do Apache Flink.

**Topics**
+ [Use operadores de transformação](#how-operators-transform)
+ [Use operadores de agregação](#how-operators-agg)

## Use operadores de transformação
<a name="how-operators-transform"></a>

Veja a seguir um exemplo de uma transformação de texto simples em um dos campos de um fluxo de dados JSON. 

Esse código cria um fluxo de dados transformado. O novo fluxo de dados tem os mesmos dados do fluxo original, com a string “` Company`" anexada ao conteúdo do campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Use operadores de agregação
<a name="how-operators-agg"></a>

Este é um exemplo de um operador de agregação. O código cria um fluxo de dados agregado. O operador cria uma janela em cascata de 5 segundos e retorna a soma dos valores de `PRICE` dos registros na janela com o mesmo valor de `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Para obter mais exemplos de código, consulte [Exemplos de como criar e trabalhar com aplicativos no Managed Service for Apache Flink.](examples-collapsibles.md). 

# Acompanhe eventos no Managed Service para Apache Flink usando a API DataStream
<a name="how-time"></a>

O Managed Service for Apache Flink rastreia eventos usando os seguintes timestamps:
+ **Tempo de processamento:** refere-se à hora do sistema da máquina que está executando a respectiva operação.
+ **Hora do evento:** refere-se à hora em que cada evento individual ocorreu em seu dispositivo produtor.
+ **Tempo de ingestão:** refere-se à hora em que os eventos entram no serviço Managed Service for Apache Flink.

Você define o tempo usado pelo ambiente de streaming usando`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Para obter mais informações sobre timestamps, consulte [Geração de marcas d’água](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) na Documentação do Apache Flink.

# Analisar os componentes da API Table
<a name="how-table"></a>

Seu aplicativo Apache Flink usa a [API de tabela do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) para interagir com dados em um fluxo usando um modelo relacional. Você usa a API de tabela para acessar dados usando fontes de tabela e, em seguida, usa funções de tabela para transformar e filtrar dados da tabela. Você pode transformar e filtrar dados tabulares usando funções de API ou comandos SQL. 

Esta seção contém os seguintes tópicos:
+ [Conectores da API Table](how-table-connectors.md): esses componentes movem dados entre seu aplicativo e fontes de dados e destinos externos.
+ [Atributos de tempo da API Table](how-table-timeattributes.md): este tópico descreve como o Managed Service for Apache Flink rastreia eventos ao usar a API de tabela.

# Conectores da API Table
<a name="how-table-connectors"></a>

No modelo de programação do Apache Flink, os conectores são componentes que seu aplicativo usa para ler ou gravar dados de fontes externas, como outros serviços. AWS 

Com a API de tabela do Apache Flink, você pode usar os seguintes tipos de conectores:
+ [Fontes da API Table](#how-table-connectors-source): use conectores de origem da API de tabela para criar tabelas dentro da sua `TableEnvironment` usando chamadas de API ou consultas SQL.
+ [Coletores da API Table](#how-table-connectors-sink): use comandos SQL para gravar dados de tabela em fontes externas, como um tópico do Amazon MSK ou um bucket do Amazon S3.

## Fontes da API Table
<a name="how-table-connectors-source"></a>

Você cria uma fonte de tabela a partir de um fluxo de dados. O código a seguir cria uma tabela a partir de um tópico do Amazon MSK:

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

Para obter mais informações sobre fontes de tabelas, consulte [Tabela e conectores SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) na Documentação do Apache Flink.

## Coletores da API Table
<a name="how-table-connectors-sink"></a>

Para gravar dados da tabela em um coletor, você cria o coletor em SQL e, em seguida, executa o coletor baseado em SQL no objeto `StreamTableEnvironment`.

O exemplo de código a seguir demonstra como gravar dados de tabela em um coletor do Amazon S3:

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 Você pode usar o parâmetro `format` para controlar qual formato o Managed Service for Apache Flink usa para gravar a saída no coletor. Para obter informações sobre formatos, consulte [ Conectores suportados](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) na Documentação do Apache Flink.

## Fontes e coletores definidos pelo usuário
<a name="how-table-connectors-userdef"></a>

Você pode usar os conectores Apache Kafka existentes para enviar dados de e para outros serviços de AWS , como Amazon MSK e Amazon S3. Para interagir com outras fontes de dados e destinos, você pode definir suas próprias fontes e coletores. Para obter mais informações, consulte [ Fontes e coletores definidos pelo usuário](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) na Documentação do Apache Flink.

# Atributos de tempo da API Table
<a name="how-table-timeattributes"></a>

Cada registro em um fluxo de dados tem vários timestamps que definem quando os eventos relacionados ao registro ocorreram:
+ **Hora do evento**: um timestamp definido pelo usuário que define quando o evento que criou o registro ocorreu.
+ **Tempo de ingestão**: o momento em que seu aplicativo recuperou o registro do fluxo de dados.
+ **Tempo de processamento**: o momento em que seu aplicativo processou o registro.

Quando a API Table do Apache Flink cria janelas com base nas horas registradas, você define quais desses timestamps ela usa usando o método `setStreamTimeCharacteristic`. 

Para obter mais informações sobre o uso de timestamps com a API Table, consulte [ Atributos de tempo](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) e [ Processamento oportuno de fluxos](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) na Documentação do Apache Flink.

# Use o Python com Managed Service for Apache Flink
<a name="how-python"></a>

**nota**  
Se você estiver desenvolvendo o aplicativo Python Flink em um novo Mac com o chip Apple Silicon, poderá encontrar alguns problemas [conhecidos com](https://issues.apache.org/jira/browse/FLINK-26981) as dependências do Python da versão 1.15. PyFlink Nesse caso, recomendamos executar o interpretador Python no Docker. Para step-by-step obter instruções, consulte [Desenvolvimento da PyFlink versão 1.15 no Apple Silicon Mac](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon).

A versão 2.2 do Apache Flink inclui suporte para criar aplicativos usando Python versão 3.12; o suporte para Python versão 3.8 foi removido. Para obter mais informações, consulte [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). Você cria um serviço gerenciado para o aplicativo Apache Flink usando Python fazendo o seguinte:
+ Crie o código do seu aplicativo Python como um arquivo de texto com um `main` método.
+ Empacote o arquivo de código do seu aplicativo e todas as dependências do Python ou Java em um arquivo zip e faça o upload para um bucket do Amazon S3.
+ Crie seu serviço gerenciado para o aplicativo Apache Flink, especificando a localização do código do Amazon S3, as propriedades do aplicativo e as configurações do aplicativo.

Em um alto nível, a Python Table API é um invólucro em torno da API Java Table. Para obter informações sobre a API Table do Python, consulte o [ Tutorial da API Table](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) na documentação do Apache Flink.

# Programe seu aplicativo Managed Service for Apache Flink Python
<a name="how-python-programming"></a>

Você codifica seu aplicativo Python para o Managed Service for Apache Flink usando a API Apache Flink Python Table. O mecanismo Apache Flink traduz as instruções da Python Table API (em execução na Python VM) em instruções da Java Table API (executadas na Java VM). 

Para isso, você usa a Python Table API da seguinte maneira:
+ Crie uma referência para o `StreamTableEnvironment`.
+ Crie `table` objetos a partir dos dados de streaming de origem executando consultas na `StreamTableEnvironment` referência.
+ Execute consultas em seus `table` objetos para criar tabelas de saída.
+ Grave suas tabelas de saída em seus destinos usando um`StatementSet`.

Para começar a usar a Python Table API no Managed Service for Apache Flink, consulte. [Conceitos básicos do Amazon Managed Service for Apache Flink para Python](gs-python.md)

## Leia e grave dados de transmissão
<a name="how-python-programming-readwrite"></a>

Para ler e gravar dados de streaming, você executa consultas SQL no ambiente de tabela.

### Criar uma tabela
<a name="how-python-programming-readwrite-createtable"></a>

O exemplo de código a seguir demonstra uma função definida pelo usuário que cria uma consulta SQL. A consulta SQL cria uma tabela que interage com um stream do Kinesis:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Leia dados de transmissão
<a name="how-python-programming-readwrite-read"></a>

O exemplo de código a seguir demonstra como usar a consulta `CreateTable` SQL anterior em uma referência de ambiente de tabela para ler dados:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Grave dados de transmissão
<a name="how-python-programming-readwrite-write"></a>

O exemplo de código a seguir demonstra como usar a consulta SQL do `CreateTable` exemplo para criar uma referência de tabela de saída e como usar a `StatementSet` para interagir com as tabelas para escrever dados em um stream do Kinesis de destino:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Leia propriedades de runtime
<a name="how-python-programming-properties"></a>

Você pode usar propriedades de runtime para configurar seu aplicativo sem alterar o código do aplicativo.

Você especifica as propriedades do aplicativo da mesma forma que com um Managed Service for Apache Flink para aplicativo Java. É possível especificar as propriedades do runtime das seguintes maneiras:
+ Usando a [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)ação.
+ Usando a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação.
+ Usar seu aplicativo usando o console.

Você recupera as propriedades do aplicativo no código lendo um arquivo json chamado `application_properties.json` aquele criado pelo runtime do Managed Service for Apache Flink.

O exemplo de código a seguir demonstra a leitura das propriedades do aplicativo a partir do arquivo `application_properties.json`:

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

O exemplo de código de função definido pelo usuário a seguir demonstra a leitura de um grupo de propriedades do objeto de propriedades do aplicativo: recupera:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

O exemplo de código a seguir demonstra a leitura de uma propriedade chamada INPUT\$1STREAM\$1KEY de um grupo de propriedades que o exemplo anterior retorna:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Crie o pacote de código do seu aplicativo
<a name="how-python-programming-package"></a>

Depois de criar seu aplicativo Python, você empacota seu arquivo de código e dependências em um arquivo zip.

Seu arquivo zip deve conter um script python com um método `main` e, opcionalmente, pode conter o seguinte:
+ Arquivos de código Python adicionais
+ Código Java definido pelo usuário em arquivos JAR
+ Bibliotecas Java em arquivos JAR

**nota**  
O arquivo zip do aplicativo deve conter todas as dependências do aplicativo. Você não pode referenciar bibliotecas de outras fontes para seu aplicativo.

# Crie seu aplicativo Managed Service for Apache Flink Python
<a name="how-python-creating"></a>

## Especifique seus arquivos de código
<a name="how-python-creating-code"></a>

Quando você tiver criado o pacote de código do seu aplicativo, você deve carregá-lo em um bucket do Amazon S3. Em seguida, você cria seu aplicativo usando o console ou a [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)ação.

Ao criar seu aplicativo usando a [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)ação, você especifica os arquivos de código e arquivamentos em seu arquivo zip usando um grupo especial de propriedades do aplicativo chamado`kinesis.analytics.flink.run.options`. Você pode definir os seguintes tipos de arquivos:
+ **python**: um arquivo de texto contendo um método principal do Python.
+ **jarfile**: um arquivo Java JAR contendo funções Java definidas pelo usuário.
+ **pyFiles**: um arquivo de recursos do Python contendo recursos a serem usados pelo aplicativo.
+ **pyArchives**: um arquivo zip contendo arquivos de recursos para o aplicativo.

Para obter mais informações sobre os tipos de arquivo de código do Apache Flink Python, consulte [ Interface da linha de comando](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/) na documentação do Apache Flink.

**nota**  
O Managed Service for Apache Flink não suporta os tipos de arquivo `pyModule`, `pyExecutable` ou `pyRequirements`. Todo o código, requisitos e dependências devem estar em seu arquivo zip. Você não pode especificar dependências a serem instaladas usando pip. 

O exemplo de trecho json a seguir demonstra como especificar a localização dos arquivos no arquivo zip do seu aplicativo:

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Monitore seu aplicativo Managed Service for Apache Flink Python
<a name="how-python-monitoring"></a>

Você usa o CloudWatch log do seu aplicativo para monitorar seu serviço gerenciado para o aplicativo Apache Flink Python.

O Managed Service for Apache Flink registra as seguintes mensagens para aplicativos Python:
+ Mensagens escritas no console usando o método do `print()`aplicativo`main`.
+ Mensagens enviadas em funções definidas pelo usuário usando o pacote `logging`. O exemplo de código a seguir demonstra a gravação no log do aplicativo a partir de uma função definida pelo usuário:

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Mensagens de erro lançadas pelo aplicativo.

  Se o aplicativo gerar uma exceção na função `main`, ela aparecerá nos registros do seu aplicativo.

  O exemplo a seguir demonstra uma entrada de registro para uma exceção lançada a partir do código Python:

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**nota**  
Devido a problemas de desempenho, recomendamos que você use somente mensagens de log personalizadas durante o desenvolvimento do aplicativo. 

## Registros de consulta com o CloudWatch Insights
<a name="how-python-monitoring-insights"></a>

A consulta do CloudWatch Insights a seguir pesquisa registros criados pelo ponto de entrada do Python enquanto executa a função principal do seu aplicativo:

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Use as propriedades de runtime no Managed Service for Apache Flink
<a name="how-properties"></a>

Você pode usar *propriedades de runtime* para configurar seu aplicativo sem recompilar o código do aplicativo. 

**Topics**
+ [Gerencie propriedades de runtime usando o console](#how-properties-console)
+ [Gerencie propriedades de runtime usando a CLI](#how-properties-cli)
+ [Acesse propriedades de runtime em um aplicativo Managed Service for Apache Flink](#how-properties-access)

## Gerencie propriedades de runtime usando o console
<a name="how-properties-console"></a>

Você pode adicionar, atualizar ou remover propriedades de runtime do seu aplicativo Managed Service for Apache Flink usando o Console de gerenciamento da AWS.

**nota**  
Se você estiver usando uma versão anterior compatível do Apache Flink e quiser atualizar seus aplicativos existentes para o Apache Flink 1.19.1, pode fazer isso usando atualizações de versão do Apache Flink in-place. Com as atualizações de versão in-place, você mantém a rastreabilidade do aplicativo em relação a um único ARN nas versões do Apache Flink, incluindo snapshots, registros, métricas, tags, configurações do Flink e muito mais. Você pode usar esse recurso em um estado `RUNNING` e `READY`. Para obter mais informações, consulte [Use atualizações de versão in-place para o Apache Flink](how-in-place-version-upgrades.md).

**Atualizar propriedades de runtime for Managed Service for Apache Flink**

1. Faça login no e abra Console de gerenciamento da AWS o console Amazon MSF em https://console.aws.amazon.com /flink.

1. Selecione o seu aplicativo Managed Service for Apache Flink. Selecione **Detalhes do aplicativo**.

1. Na página do seu aplicativo, selecione **Configurar**.

1. Expanda a seção **Propriedades**.

1. Use os controles na seção **Propriedades** para definir um grupo de propriedades com pares de valor-chave. Use esses controles para adicionar, atualizar ou remover grupos de propriedades e propriedades de runtime.

1. Selecione **Atualizar**.

## Gerencie propriedades de runtime usando a CLI
<a name="how-properties-cli"></a>

É possível adicionar, atualizar ou remover propriedades de runtime usando [AWS CLI](https://docs.aws.amazon.com/cli). 

Esta seção inclui exemplos de solicitações de ações de API para configurar propriedades de runtime para um aplicativo. Para obter informações sobre como usar um arquivo JSON como entrada de uma ação da API, consulte [Exemplo de código de API para o Managed Service for Apache Flink](api-examples.md).

**nota**  
Substitua o exemplo de ID da conta (*`012345678901`*) nos exemplos a seguir pelo ID da sua conta.

### Adicione propriedades de runtime ao criar um aplicativo
<a name="how-properties-create"></a>

O exemplo a seguir de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) adiciona dois grupos de propriedades de runtime (`ProducerConfigProperties` e `ConsumerConfigProperties`) quando você cria um aplicativo:

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Adicione e atualize propriedades de runtime em um aplicativo existente
<a name="how-properties-update"></a>

O exemplo a seguir de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) adiciona ou atualiza as propriedades de runtime de um aplicativo existente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**nota**  
Se você usar uma chave que não tenha uma propriedade de runtime correspondente em um grupo de propriedades, o Managed Service for Apache Flink adicionará o par de chave-valor como uma nova propriedade. Se você usar uma chave para uma propriedade de runtime existente em um grupo de propriedades, o Managed Service for Apache Flink atualizará o valor da propriedade. 

### Remova propriedades de runtime
<a name="how-properties-remove"></a>

O exemplo a seguir de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) remove todas as propriedades de runtime e grupos de propriedades de um aplicativo existente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**Importante**  
Se você omitir um grupo de propriedades existente ou uma chave de propriedade existente em um grupo de propriedades, esse grupo de propriedades ou propriedade será removido.

## Acesse propriedades de runtime em um aplicativo Managed Service for Apache Flink
<a name="how-properties-access"></a>

Você recupera as propriedades de runtime no código do aplicativo Java usando o método estático `KinesisAnalyticsRuntime.getApplicationProperties()`, que retorna um objeto `Map<String, Properties>`.

O exemplo de código Java a seguir recupera as propriedades de runtime do seu aplicativo:

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

Você recupera um grupo de propriedades (como um objeto `Java.Util.Properties`) da seguinte forma:

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

Normalmente, você configura uma fonte ou coletor do Apache Flink passando no objeto `Properties` sem precisar recuperar as propriedades individuais. O exemplo de código a seguir demonstra como criar uma fonte do Flink transmitindo em um objeto `Properties` recuperado das propriedades de runtime:

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

Para obter exemplos de código, consulte [Exemplos de como criar e trabalhar com aplicativos no Managed Service for Apache Flink.](examples-collapsibles.md).

# Use os conectores do Apache Flink com o Managed Service for Apache Flink
<a name="how-flink-connectors"></a>

Os conectores do Apache Flink são componentes de software que movem dados para dentro e para fora de um aplicativo do Amazon Managed Service for Apache Flink. Os conectores são integrações flexíveis que permitem a leitura de arquivos e diretórios. Os conectores consistem em módulos completos para interagir com os serviços da Amazon e sistemas de terceiros.

Os tipos de conectores incluem o seguinte:
+ **Fontes:** forneça dados para seu aplicativo a partir de um fluxo de dados do Kinesis, arquivo, tópico do Apache Kafka ou de outras fontes de dados.
+ **Coletores:** envie dados do seu aplicativo para um fluxo de dados do Kinesis, fluxo do Firehose, tópico do Apache Kafka ou outros destinos de dados.
+ **E/S assíncrona:** fornece acesso assíncrono a uma fonte de dados como um banco de dados para enriquecer os fluxos. 

Os conectores do Apache Flink são armazenados em seus próprios repositórios de origem. A versão e o artefato dos conectores Apache Flink mudam dependendo da versão do Apache Flink que você está usando e se você está usando a API DataStream, Tabela ou SQL. 

O Amazon Managed Service for Apache Flink oferece suporte a mais de 40 conectores de origem e coletor pré-construídos do Apache Flink. A tabela a seguir apresenta um resumo dos conectores mais populares e das versões associadas. Também é possível criar coletores personalizados usando a estrutura Async-sink. Para obter mais informações, consulte o título [The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) na documentação do Apache Flink.

 Para acessar o repositório dos AWS conectores Apache Flink, consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)

## Conectores para Flink 2.2
<a name="connectors-flink-2-2"></a>

Ao atualizar para o Flink 2.2, você precisa atualizar as dependências do conector para versões compatíveis com o tempo de execução do Flink 2.x. Os conectores Flink são lançados independentemente do tempo de execução do Flink, e nem todos os conectores têm uma versão compatível com o Flink 2.x ainda. A tabela a seguir resume a disponibilidade dos conectores comumente usados no Amazon Managed Service para Apache Flink até o momento em que este artigo foi escrito:


**Conectores para Flink 2.2**  

| Conector | Versão Flink 2.0\$1 | Observações | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Recomendado para o Flink 2.2 | 
| Kinesis Data Streams (fonte) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recomendado para o Flink 2.2 | 
| Kinesis Data Streams (coletor) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recomendado para o Flink 2.2 | 
| FileSystem (S3, HDFS) | Empacotado com o Flink | Integrado à distribuição do Flink — sempre disponível | 
| JDBC | Ainda não foi lançado para 2.x | Nenhuma versão compatível com o Flink 2.x disponível | 
| OpenSearch | Ainda não foi lançado para 2.x | Nenhuma versão compatível com o Flink 2.x disponível | 
| Elasticsearch | Ainda não foi lançado para 2.x | Considere migrar para o conector OpenSearch  | 
| Amazon Managed Service for Prometheus | Ainda não foi lançado para 2.x | Nenhuma versão compatível com o Flink 2.x no momento em que este artigo foi escrito | 

Se seu aplicativo depende de um conector que ainda não tem uma versão 2.2 do Flink, você tem duas opções: esperar que o conector lance uma versão compatível ou avaliar se você pode substituí-lo por uma alternativa (por exemplo, usando o catálogo JDBC ou um coletor personalizado).

**Problemas conhecidos**
+ Os aplicativos que usam o caminho `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) introduzido nos conectores v5.0.0 e v6.0.0 podem falhar quando os streams do Kinesis são refragmentados. Esse é um problema conhecido na comunidade. Para obter mais informações, consulte [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Os aplicativos que usam o caminho `KinesisStreamsSource` com EFO (Enhanced Fan-Out/ SubscribeToShard) introduzido nos conectores v5.0.0 e v6.0.0 `KinesisStreamsSink` podem enfrentar impasses se o aplicativo Flink estiver sob contrapressão, resultando na interrupção completa do processamento de dados em um ou mais. TaskManagers Uma operação de parada forçada e uma operação de inicialização do aplicativo são necessárias para recuperar o aplicativo. Este é um subcaso do problema conhecido na comunidade: [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Conectores para versões mais antigas do Flink
<a name="connectors-older-versions"></a>


**Conectores para versões mais antigas do Flink**  

| Conector | Flink versão 1.15 | Flink versão 1.18 | Flink versões 1.19 | Flink versões 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream — API de origem DataStream e tabela | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4.3.0-1.18 | flink-connector-kinesis, 5.0.0-1.19 | flink-connector-kinesis, 5.0.0-1.20 | 
| Kinesis Data Stream — Sink — DataStream e API de tabelas | flink-connector-aws-kinesis-streams, 1.15.4 | flink-connector-aws-kinesis-streams, 4.3.0-1.18 | flink-connector-aws-kinesis-streams, 5.0.0-1.19 | flink-connector-aws-kinesis-streams, 5.0.0-1.20 | 
| Kinesis Data Source/Sink Streams - - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4.3.0-1.18 | flink-sql-connector-kinesis, 5.0.0-1.19 | flink-sql-connector-kinesis-streams, 5.0.0-1.20 | 
| Kafka - DataStream e API de tabela | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3.2.0-1.18 | flink-connector-kafka, 3.3.0-1.19 | flink-connector-kafka, 3.3.0-1.20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1.15.4 | flink-sql-connector-kafka, 3.2.0-1.18 | flink-sql-connector-kafka, 3.3.0-1.19 | flink-sql-connector-kafka, 3.3.0-1.20 | 
| Firehose - DataStream e API de tabelas | flink-connector-aws-kinesis- mangueira de incêndio, 1.15.4 | flink-connector-aws-firehose, 4.3.0-1.18 | flink-connector-aws-firehose, 5.0.0-1.19 | flink-connector-aws-firehose, 5.0.0-1.20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firemangueira, 1.15.4 | flink-sql-connector-aws-mangueira de incêndio, 4.3.0-1.18 | flink-sql-connector-aws-mangueira de incêndio, 5.0.0-1.19 | flink-sql-connector-aws-mangueira de incêndio, 5.0.0-1.20 | 
| DynamoDB DataStream — e API de tabelas | flink-connector-dynamodb, 3.0.0-1.15 | flink-connector-dynamodb, 4.3.0-1.18 | flink-connector-dynamodb, 5.0.0-1.19 | flink-connector-dynamodb, 5.0.0-1.20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3.0.0-1.15 | flink-sql-connector-dynamodb, 4.3.0-1.18 | flink-sql-connector-dynamodb, 5.0.0-1.19 | flink-sql-connector-dynamodb, 5.0.0-1.20 | 
| OpenSearch - DataStream e API de tabela | - | flink-connector-opensearch, 1.2.0-1.18 | flink-connector-opensearch, 1.2.0-1.19 | flink-connector-opensearch, 1.2.0-1.19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-sql-connector-opensearch, 1.2.0-1.19 | flink-sql-connector-opensearch, 1.2.0-1.19 | 
| Amazon Managed Service para Prometheus DataStream | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-prometheus, 1.0.0-1.19 | flink-connector-prometheus, 1,0,0-1,20 | 
| Amazon SQS DataStream e API de tabelas | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-sqs, 5.0.0-1.19 | flink-connector-sqs, 5.0.0-1.20 | 

Para saber mais sobre conectores no Amazon Managed Service for Apache Flink, consulte:
+ [DataStream Conectores API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Conectores da API Table](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Problemas conhecidos
<a name="connectors-known-issues"></a>

Há um problema conhecido do Apache Flink de código aberto com o conector Apache Kafka no Apache Flink versão 1.15. Esse problema foi resolvido em versões posteriores do Apache Flink. 

Para obter mais informações, consulte [Problemas conhecidos](flink-1-15-2.md#flink-1-15-known-issues). 

# Implemente a tolerância a falhas no Managed Service for Apache Flink
<a name="how-fault"></a>

O ponto de verificação é o método usado para implementar a tolerância a falhas no Amazon Managed Service for Apache Flink. Um *ponto de verificação* é um up-to-date backup de um aplicativo em execução que é usado para se recuperar imediatamente de uma interrupção ou failover inesperado do aplicativo. 

Para obter detalhes sobre pontos de verificação em aplicativos do Apache Flink, consulte [Pontos de verificação](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/) na Documentação do Apache Flink.

Um *snapshot* é um backup do estado do aplicativo criado e gerenciado manualmente. Os snapshots permitem que você restaure seu aplicativo para um estado anterior chamando [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html). Para obter mais informações, consulte [Gerenciar backups de aplicativos usando snapshots](how-snapshots.md).

Se o ponto de verificação estiver habilitado para seu aplicativo, o serviço fornecerá tolerância a falhas criando e carregando backups dos dados do aplicativo no caso de reinicializações inesperadas do aplicativo. Essas reinicializações inesperadas de aplicativos podem ser causadas por reinicializações inesperadas de tarefas, falhas de instância etc. Isso dá ao aplicativo a mesma semântica da execução sem falhas durante essas reinicializações. 

Se os instantâneos estiverem habilitados para o aplicativo e configurados usando os do aplicativo [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html), o serviço fornecerá uma semântica de processamento exatamente uma vez durante as atualizações do aplicativo ou durante a escalabilidade ou a manutenção relacionados ao serviço.

## Configure pontos de verificação no Managed Service for Apache Flink
<a name="how-fault-configure"></a>

Você pode configurar o comportamento de ponto de verificação do seu aplicativo. Você pode definir se ele persiste no estado de ponto de verificação, com que frequência ele salva seu estado nos pontos de verificação e o intervalo mínimo entre o final de uma operação de ponto de verificação e o início de outra.

Você define as seguintes configurações usando as operações de API [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html):
+ `CheckpointingEnabled`: indica se o ponto de verificação está ativado no aplicativo.
+ `CheckpointInterval`: contém o tempo em milissegundos entre as operações do ponto de verificação (persistência).
+ `ConfigurationType`: defina esse valor para `DEFAULT` para usar o comportamento do ponto de verificação padrão. Defina esse valor para `CUSTOM` para configurar outros valores.
**nota**  
O comportamento padrão do ponto de verificação é o seguinte:  
**CheckpointingEnabled:** verdadeiro
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints:** 5000
Se **ConfigurationType**estiver definido como`DEFAULT`, os valores anteriores serão usados, mesmo que sejam definidos para outros valores usando o AWS Command Line Interface ou definindo os valores no código do aplicativo.
**nota**  
Para o Flink 1.15 em diante, o Managed Service for Apache Flink usará `stop-with-savepoint` durante a criação automática de instantâneos, ou seja, a atualização, a escalabilidade ou a parada do aplicativo. 
+ `MinPauseBetweenCheckpoints`: o tempo mínimo em milissegundos entre o final de uma operação de ponto de verificação e o início de outra. Definir esse valor impede o aplicativo de verificar continuamente quando uma operação de ponto de verificação levar mais tempo do que `CheckpointInterval`.

## Analise os exemplos de API de ponto de verificação
<a name="how-fault-examples"></a>

Esta seção inclui exemplos de solicitações de ações de API para configurar pontos de verificação para um aplicativo. Para obter informações sobre como usar um arquivo JSON como entrada de uma ação da API, consulte [Exemplo de código de API para o Managed Service for Apache Flink](api-examples.md).

### Configure o ponto de verificação para um novo aplicativo
<a name="how-fault-examples-create-config"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) configura o ponto de verificação quando você está criando um aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### Desative o ponto de verificação para um novo aplicativo
<a name="how-fault-examples-create-disable"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) desativa o ponto de verificação quando você está criando um aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### Configure o ponto de verificação para um aplicativo existente
<a name="how-fault-examples-update-config"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) configura o ponto de verificação para um aplicativo existente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Desative o ponto de verificação para um aplicativo existente
<a name="how-fault-examples-update-update-disable"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) desativa o ponto de verificação para um aplicativo existente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# Gerenciar backups de aplicativos usando snapshots
<a name="how-snapshots"></a>

Um *snapshot* é a implementação Managed Service for Apache Flink de um *Ponto de salvamento* do Apache Flink. Um instantâneo é um backup do estado do aplicativo acionado, criado e gerenciado pelo usuário ou serviço. Para obter informações sobre os pontos de salvamento do Apache Flink, consulte [Pontos de salvamento](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) na Documentação do Apache Flink. Usando snapshots, você pode reiniciar um aplicativo a partir de um instantâneo específico do estado do aplicativo.

**nota**  
Recomendamos que seu aplicativo crie um instantâneo várias vezes ao dia para reiniciar adequadamente com os dados de estado corretos. A frequência correta para seus instantâneos depende da lógica de negócios do seu aplicativo. Tirar snapshots com frequência permite recuperar dados mais recentes, mas aumenta os custos e exige mais recursos do sistema.

No Managed Service for Apache Flink, você gerencia snapshots usando as seguintes ações de API:
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

Para saber o limite do número de snapshots por aplicativo, consulte [Quota do notebook do Managed Service for Apache Flink e Studio](limits.md). Se seu aplicativo atingir o limite de snapshots, a criação manual de um snapshot falhará com um `LimitExceededException`. 

O Managed Service for Apache Flink nunca exclui snapshots. Será necessário excluir manualmente seus snapshots usando a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html).

Para carregar um snapshot salvo do estado do aplicativo ao iniciar um aplicativo, use o parâmetro [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) da ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html).

**Topics**
+ [Gerenciar criação automática de snapshots](#how-fault-snapshot-update)
+ [Restaurar a partir de um snapshot que contém dados de estado incompatíveis](#how-fault-snapshot-restore)
+ [Analisar exemplos da API de snapshot](#how-fault-snapshot-examples)

## Gerenciar criação automática de snapshots
<a name="how-fault-snapshot-update"></a>

Se `SnapshotsEnabled` estiver definido como `true` no [ ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)para o aplicativo, o Managed Service for Apache Flink cria e usa automaticamente instantâneos quando o aplicativo é atualizado, escalado ou interrompido para fornecer uma semântica de processamento exatamente uma vez.

**nota**  
Definir `ApplicationSnapshotConfiguration::SnapshotsEnabled` como `false` levará à perda de dados durante as atualizações do aplicativo.

**nota**  
O Managed Service for Apache Flink aciona pontos de salvamento intermediários durante a criação do snapshot. Para a versão 1.15 ou superior do Flink, os pontos de salvamento intermediários não causam mais efeitos secundários. Consulte [Acionamento de pontos de salvamento](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Os snapshots criados automaticamente têm as seguintes qualidades:
+ O instantâneo é gerenciado pelo serviço, mas você pode ver o instantâneo usando a [ ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)ação. Os snapshots criados automaticamente são contabilizados no seu limite de snapshots.
+ Se seu aplicativo exceder o limite de snapshots, os snapshots criados manualmente falharão, mas o serviço Managed Service for Apache Flink ainda criará snapshots com êxito quando o aplicativo for atualizado, escalado ou interrompido. Você deve excluir manualmente os instantâneos usando a [ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)ação antes de criar mais instantâneos manualmente.

## Restaurar a partir de um snapshot que contém dados de estado incompatíveis
<a name="how-fault-snapshot-restore"></a>

Como os snapshots contêm informações sobre operadores, a restauração dos dados de estado de um snapshot para um operador que foi alterado desde a versão anterior do aplicativo pode ter resultados inesperados. Um aplicativo falhará se tentar restaurar dados de estado de um snapshot que não corresponda ao operador atual. O aplicativo com falha ficará preso no estado `STOPPING` ou `UPDATING`. 

Para permitir que um aplicativo restaure a partir de um snapshot que contém dados de estado incompatíveis, defina o `AllowNonRestoredState` parâmetro do [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)para `true` usar a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação.

Você verá o seguinte comportamento quando um aplicativo for restaurado a partir de um snapshot obsoleto:
+ **Operador adicionado:** se um novo operador for adicionado, o ponto de salvamento não terá dados de estado para o novo operador. Nenhuma falha ocorrerá e não é necessário configurar `AllowNonRestoredState`.
+ **Operador excluído:** se um operador existente for excluído, o ponto de salvamento terá dados de estado do operador ausente. Ocorrerá uma falha, a menos que `AllowNonRestoredState` esteja configurada como`true`.
+ **Operador modificado:** se forem feitas alterações compatíveis, como alterar o tipo de um parâmetro para um tipo compatível, o aplicativo poderá restaurar a partir do snapshot obsoleto. Para obter mais informações sobre restauração a partir de snapshot, consulte [Pontos de salvamento](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) na Documentação do Apache Flink. Um aplicativo que usa o Apache Flink versão 1.8 ou posterior pode ser restaurado a partir de um snapshot com um esquema diferente. Um aplicativo que usa o Apache Flink versão 1.6 não pode ser restaurado. Para two-phase-commit coletores, recomendamos usar o snapshot do sistema (SWs) em vez do snapshot criado pelo usuário (). CreateApplicationSnapshot

  Para Flink, o Managed Service for Apache Flink aciona pontos de salvamento intermediários durante a criação do snapshot. Para o Flink 1.15 ou superior, os pontos de salvamento intermediários não causam mais efeitos secundários. Consulte [Acionamento de pontos de salvamento](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Se você precisar retomar um aplicativo incompatível com os dados existentes do ponto de salvamento, recomendamos que você ignore a restauração a partir do snapshot definindo o parâmetro da `ApplicationRestoreType` ação como. [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)`SKIP_RESTORE_FROM_SNAPSHOT`

Para obter mais informações sobre como o Apache Flink lida com dados de estado incompatíveis, consulte [Evolução do esquema do estado](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) na *Documentação do Apache Flink*.

## Analisar exemplos da API de snapshot
<a name="how-fault-snapshot-examples"></a>

Esta seção inclui exemplos de solicitações de ações de API para usar snapshot com um aplicativo. Para obter informações sobre como usar um arquivo JSON como entrada de uma ação da API, consulte [Exemplo de código de API para o Managed Service for Apache Flink](api-examples.md).

### Habilitar snapshots para um aplicativo
<a name="how-fault-savepoint-examples-enable"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) habilita snapshots para um aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### Criar um snapshot
<a name="how-fault-savepoint-examples-create"></a>

O exemplo de solicitação da ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) a seguir cria um snapshot do estado atual do aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Listar snapshots de um aplicativo
<a name="how-fault-snapshot-examples-list"></a>

O exemplo de solicitação da ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) a seguir lista os primeiros 50 snapshots do estado atual do aplicativo:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### Listar detalhes de um snapshot de aplicativo
<a name="how-fault-snapshot-examples-describe"></a>

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) lista detalhes de um snapshot de aplicativo específico:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Excluir um snapshot
<a name="how-fault-snapshot-examples-delete"></a>

O exemplo de solicitação da ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) a seguir exclui um snapshot salvo anteriormente. Você pode obter o valor `SnapshotCreationTimestamp` usando um [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html):

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### Reiniciar um aplicativo usando um snapshot nomeado
<a name="how-fault-snapshot-examples-load-custom"></a>

O exemplo a seguir de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) inicia o aplicativo usando o estado salvo de um snapshot específico:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Reiniciar um aplicativo usando o snapshot mais recente
<a name="how-fault-snapshot-examples-load-recent"></a>

O exemplo de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) a seguir inicia o aplicativo usando o snapshot mais recente:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Reiniciar um aplicativo sem snapshot
<a name="how-fault-snapshot-examples-load-none"></a>

O exemplo de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) a seguir inicia o aplicativo sem carregar o estado do aplicativo, mesmo que um snapshot esteja presente:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Use atualizações de versão in-place para o Apache Flink
<a name="how-in-place-version-upgrades"></a>

Com as atualizações de versão in-place para o Apache Flink, você mantém a rastreabilidade do aplicativo em relação a um único ARN em todas as versões do Apache Flink. Isso inclui instantâneos, registros, métricas, tags, configurações do Flink, aumentos no limite de recursos e muito mais. VPCs 

Você pode realizar atualizações de versão in-place para o Apache Flink para atualizar aplicativos existentes para uma nova versão do Flink no Amazon Managed Service for Apache Flink. Para realizar essa tarefa, você pode usar o AWS CLI AWS CloudFormation,, AWS SDK ou o. Console de gerenciamento da AWS

**nota**  
Não é possível usar atualizações de versão in-place do Apache Flink com o Amazon Managed Service for Apache Flink Studio.

**Topics**
+ [Atualize aplicativos usando atualizações de versão in-place do Apache Flink](upgrading-applications.md)
+ [Atualize o aplicativo para uma nova versão do Apache Flink](upgrading-application-new-version.md)
+ [Reverta as atualizações de aplicativos](rollback.md)
+ [Práticas recomendadas e orientações gerais sobre as atualizações de aplicativos](best-practices-recommendations.md)
+ [Precauções e problemas conhecidos com atualizações de aplicativos](precautions.md)
+ [Atualizando para o Flink 2.2: guia completo](flink-2-2-upgrade-guide.md)
+ [Guia de compatibilidade estadual para atualizações do Flink 2.2](state-compatibility.md)

# Atualize aplicativos usando atualizações de versão in-place do Apache Flink
<a name="upgrading-applications"></a>

Antes de começar, recomendamos que você assista a este vídeo: [Atualizações de versão in-place](https://www.youtube.com/watch?v=f1qGGdaP2XI).

Para realizar atualizações de versão in-loco para o Apache Flink, você pode usar o AWS CLI, AWS CloudFormation, AWS SDK ou o. Console de gerenciamento da AWS Você pode usar esse recurso com qualquer aplicativo existente que você usa com o Managed Service for Apache Flink em um estado `READY` ou `RUNNING`. Ele usa a UpdateApplication API para adicionar a capacidade de alterar o tempo de execução do Flink.

## Antes de atualizar: atualize seu aplicativo Apache Flink
<a name="before-upgrading"></a>

Ao criar seus aplicativos Flink, você os empacota com suas dependências em um JAR de aplicativo e carrega o JAR no seu bucket do Amazon S3. A partir daí, o Amazon Managed Service for Apache Flink executa o trabalho no novo runtime do Flink que você selecionou. Talvez seja necessário atualizar seus aplicativos para obter compatibilidade com o runtime do Flink para o qual você deseja fazer o upgrade. Pode haver inconsistências entre as versões do Flink que causam falha na atualização da versão. Normalmente, isso ocorre com conectores para fontes (entrada) ou destinos (coletores, saída) e dependências do Scala. O Flink 1.15 e versões posteriores no Managed Service for Apache Flink são independentes do Scala, e seu JAR deve conter a versão do Scala que você planeja usar.

**Para atualizar seu aplicativo**

1. Leia os conselhos da comunidade do Flink sobre como atualizar aplicativos com o estado. Consulte [Atualização dos aplicativos e versões do Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Leia a lista de problemas e limitações conhecidos. Consulte [Precauções e problemas conhecidos com atualizações de aplicativos](precautions.md).

1. Atualize as dependências e teste seus aplicativos localmente. Normalmente, essas dependências são:

   1. O runtime e a API do Flink.

   1. Conectores recomendados para o novo runtime do Flink. Você pode encontrá-los em [Versões lançadas](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) para o runtime específico para o qual deseja atualizar.

   1. Scala – O Apache Flink é independente de Scala, a partir do Flink 1.15. É necessário incluir as dependências do Scala que deseja usar no JAR do aplicativo.

1. Crie um novo aplicativo JAR no zipfile e carregue no Amazon S3. Recomendamos que você use um nome diferente do JAR/zipfile anterior. Se precisar reverter, você usará essas informações.

1. Se estiver executando aplicativos com estado, é altamente recomendável que você tire um snapshot do aplicativo atual. Isso permite fazer uma reversão com estado preservador se você encontrar problemas durante ou após a atualização. 

# Atualize o aplicativo para uma nova versão do Apache Flink
<a name="upgrading-application-new-version"></a>

Você pode atualizar seu aplicativo Flink usando a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação.

Você pode chamar a API `UpdateApplication` de várias maneiras:
+ Use o fluxo de trabalho de **configuração** existente no Console de gerenciamento da AWS.
  + Acesse seu aplicativo no Console de gerenciamento da AWS.
  + Selecione **Configurar**.
  + Selecione o novo runtime e o snapshot a partir do qual você deseja começar, também conhecido como configuração de restauração. Use a configuração mais recente como configuração de restauração para iniciar o aplicativo a partir do snapshot mais recente. Aponte para o novo aplicativo atualizado JAR/zip no Amazon S3.
+ Use a ação AWS CLI [de atualização do aplicativo](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html).
+ Uso CloudFormation (CFN).
  + Atualize o [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment)campo. Anteriormente, CloudFormation excluía o aplicativo e criava um novo, fazendo com que seus instantâneos e outros históricos do aplicativo fossem perdidos. Agora CloudFormation atualiza seu RuntimeEnvironment local e não exclui seu aplicativo. 
+ Use o AWS SDK.
  + Consulte a documentação do SDK para obter a linguagem de programação de sua preferência. Consulte [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

Você pode realizar a atualização enquanto o aplicativo estiver no estado `RUNNING` ou enquanto o aplicativo estiver parado no estado `READY`. O Amazon Managed Service for Apache Flink faz a validação para verificar a compatibilidade entre a versão original do runtime e a versão do runtime de destino. Essa verificação de compatibilidade é executada quando você executa [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)enquanto estiver no `RUNNING` estado ou na próxima, [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)se você atualizar enquanto estiver no `READY` estado. 

## Atualize um aplicativo no estado `RUNNING`
<a name="upgrading-running"></a>

O exemplo a seguir mostra a atualização de um aplicativo no `RUNNING` estado chamado `UpgradeTest` Flink 1.18 no Leste dos EUA (Norte da Virgínia) usando AWS CLI e iniciando o aplicativo atualizado a partir do snapshot mais recente. 

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ Se você habilitou os snapshots do serviço e deseja continuar o aplicativo a partir do snapshot mais recente, o Amazon Managed Service for Apache Flink verifica se o runtime do aplicativo `RUNNING` atual é compatível com o runtime de destino selecionado.
+ Se você especificou um snapshot a partir do qual continuar o runtime de destino, o Amazon Managed Service for Apache Flink verifica se o runtime de destino é compatível com o snapshot especificado. Se a verificação de compatibilidade falhar, sua solicitação de atualização será rejeitada e seu aplicativo permanecerá inalterado no estado `RUNNING`.
+ Se você optar por iniciar seu aplicativo sem um snapshot, o Amazon Managed Service for Apache Flink não executará nenhuma verificação de compatibilidade.
+ Se o aplicativo atualizado falhar ou ficar preso em um estado `UPDATING` transitivo, siga as instruções na seção [Reverta as atualizações de aplicativos](rollback.md) para retornar ao estado íntegro. 

**Fluxo de processo para execução de aplicativos com estado**

![\[O diagrama a seguir apresenta o fluxo de trabalho recomendado para atualizar o aplicativo durante a execução. Supomos que o aplicativo tem um estado e que você ativou os snapshots. Na atualização para esse fluxo de trabalho, você restaura o aplicativo a partir do snapshot mais recente que foi automaticamente obtido pelo Amazon Managed Service for Apache Flink antes da atualização.\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/images/in-place-update-while-running.png)


## Atualize um aplicativo no estado **READY**
<a name="upgrading-ready"></a>

O exemplo a seguir mostra a atualização de um aplicativo no estado `READY` chamado `UpgradeTest` para o Flink 1.18 no Leste dos EUA (Norte da Virgínia) usando o AWS CLI. Não há um instantâneo especificado para iniciar o aplicativo porque o aplicativo não está em execução. Você pode especificar um instantâneo ao emitir a solicitação de inicialização do aplicativo.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ Você pode atualizar o runtime de seus aplicativos no estado `READY` para qualquer versão do Flink. O Amazon Managed Service for Apache Flink não executa nenhuma verificação até você iniciar seu aplicativo.
+  O Amazon Managed Service for Apache Flink só executa verificações de compatibilidade com o snapshot que você selecionou para iniciar o aplicativo. Estas são verificações básicas de compatibilidade de acordo com a [Tabela de Compatibilidade do Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). Elas verificam apenas a versão do Flink em que o snapshot foi tirado e a versão do Flink que você deseja. Se o runtime do Flink do snapshot selecionado for incompatível com o novo runtime do aplicativo, a solicitação inicial poderá ser rejeitada.

**Fluxo de processo para aplicativos com estado “ready”**

![\[O diagrama a seguir representa o fluxo de trabalho recomendado para atualizar o aplicativo durante o estado ready. Supomos que o aplicativo tem um estado e que você ativou os snapshots. Na atualização para esse fluxo de trabalho, você restaura o aplicativo a partir do snapshot mais recente que foi automaticamente obtido pelo Amazon Managed Service for Apache Flink no momento em que o aplicativo foi interrompido.\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Reverta as atualizações de aplicativos
<a name="rollback"></a>

Se você tiver problemas com seu aplicativo ou encontrar inconsistências no código do aplicativo entre as versões do Flink, poderá reverter usando o AWS CLI, AWS CloudFormation, AWS SDK ou o. Console de gerenciamento da AWS Os exemplos a seguir mostram como é a reversão em diferentes cenários de falha.

## A atualização do runtime foi bem-sucedida, o aplicativo está funcionando no estado `RUNNING`, mas o trabalho está falhando e está sendo reiniciado continuamente
<a name="succeeded-restarting"></a>

Suponha que você esteja tentando atualizar um aplicativo com estado chamado `TestApplication` do Flink 1.15 para o Flink 1.18 no Leste dos EUA (Norte da Virgínia). No entanto, o aplicativo Flink 1.18 atualizado está falhando ao iniciar ou está sendo reiniciado constantemente, mesmo que o aplicativo esteja no estado `RUNNING`. Este é um cenário de falha comum. Para evitar mais tempo de inatividade, recomendamos que você reverta seu aplicativo imediatamente para a versão anterior que estava funcional (Flink 1.15) e diagnostique o problema posteriormente.

Para reverter o aplicativo para a versão em execução anterior, use o AWS CLI comando [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) ou a ação da API. [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) Essa ação da API reverte as alterações feitas que resultaram na versão mais recente. Em seguida, ela reinicia seu aplicativo usando o último snapshot bem-sucedido. 

É altamente recomendável que você faça um snapshot do seu aplicativo existente antes de tentar fazer a atualização. Isso ajuda a evitar a perda de dados ou a necessidade de reprocessar dados. 

Nesse cenário de falha, não CloudFormation reverterá o aplicativo para você. Você deve atualizar o CloudFormation modelo para apontar para o tempo de execução anterior e para o código anterior para CloudFormation forçar a atualização do aplicativo. Caso contrário, CloudFormation presume que seu aplicativo tenha sido atualizado ao fazer a transição para o `RUNNING` estado.

## Como reverter um aplicativo que está travado em `UPDATING`
<a name="stuck-updating"></a>

Se seu aplicativo ficar preso no `AUTOSCALING` estado `UPDATING` or após uma tentativa de atualização, o Amazon Managed Service para Apache Flink oferece o AWS CLI comando [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html), ou a ação de [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API que pode reverter o aplicativo para a versão anterior ao travamento ou estado bloqueado. `UPDATING` `AUTOSCALING` Essa API reverte as alterações feitas que causaram o travamento do aplicativo no estado transitivo `UPDATING` ou `AUTOSCALING`.

# Práticas recomendadas e orientações gerais sobre as atualizações de aplicativos
<a name="best-practices-recommendations"></a>
+ Teste o novo job/runtime sem estado em um ambiente que não seja de produção antes de tentar uma atualização de produção.
+ Considere primeiro testar a atualização com estado em um aplicativo que não seja de produção.
+ Certifique-se de que seu novo gráfico de tarefas tenha um estado compatível com o snapshot que você usará para iniciar seu aplicativo atualizado.
  + Verifique se os tipos armazenados nos estados do operador permanecem os mesmos. Se o tipo mudou, o Apache Flink não consegue restaurar o estado do operador.
  + Certifique-se de que o Operador que IDs você definiu usando o `uid` método permaneça o mesmo. O Apache Flink tem uma forte recomendação para atribuir itens exclusivos IDs aos operadores. Para obter mais informações, consulte [Atribuição de operador IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) na documentação do Apache Flink.

    Se você não atribuir IDs aos seus operadores, o Flink os gera automaticamente. Nesse caso, eles podem depender da estrutura do programa e, caso alterados, podem causar problemas de compatibilidade. O Flink usa o Operador IDs para combinar o estado do instantâneo com o operador. A alteração do operador IDs faz com que o aplicativo não seja iniciado ou que o estado armazenado no instantâneo seja descartado e o novo operador seja iniciado sem estado.
  + Não altere a chave usada para armazenar o estado da chave.
  + Não modifique o tipo de entrada de operadores com estado, como janela ou junção. Isso altera implicitamente o tipo do estado interno do operador, causando uma incompatibilidade de estado.

# Precauções e problemas conhecidos com atualizações de aplicativos
<a name="precautions"></a>

## O Kafka Commit no ponto de verificação falha repetidamente após a reinicialização do agente
<a name="apache-kafka-connector"></a>

Há um problema conhecido do Apache Flink de código aberto com o conector do Apache Kafka no Flink versão 1.15 causado por um bug crítico do Kafka Client de código aberto no Kafka Client 2.8.1. Para obter mais informações, consulte [Kafka Commit on checkpoint falha repetidamente após a reinicialização do broker](https://issues.apache.org/jira/browse/FLINK-28060) e [KafkaConsumer não consegue recuperar a conexão com o coordenador do grupo após a exceção](https://issues.apache.org/jira/browse/KAFKA-13840). commitOffsetAsync 

Para evitar esse problema, recomendamos usar o Apache Flink 1.18 ou posterior no Amazon Managed Service for Apache Flink.

## Limitações conhecidas da compatibilidade de estados
<a name="state-precautions"></a>
+ Se estiver usando a API Table, o Apache Flink não garante a compatibilidade de estados entre as versões do Flink. Para obter mais informações, consulte [Atualizações e evolução](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) com estado na Documentação do Apache Flink.
+ Os estados do Flink 1.6 são incompatíveis com o Flink 1.18. A API rejeita sua solicitação se você tentar atualizar da versão 1.6 para a versão 1.18 e posterior com estado. Você pode atualizar para 1.8, 1.11, 1.13 e 1.15, fazer um snapshot e, em seguida, atualizar para 1.18 e versões posteriores. Para obter mais informações, consulte [Atualizar aplicativos e versões do Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/) na documentação do Apache Flink.

## Problemas conhecidos com o Flink Kinesis Connector
<a name="kinesis-connector-precautions"></a>
+ Se você estiver usando o Flink 1.11 ou anterior e usando o `amazon-kinesis-connector-flink` conector para suporte Enhanced-fan-out (EFO), deverá tomar medidas adicionais para uma atualização contínua para o Flink 1.13 ou posterior. Isso ocorre devido à alteração no nome do pacote do conector. Para obter mais informações, consulte [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  O conector `amazon-kinesis-connector-flink` para Flink 1.11 e versões anteriores usa o empacotamento `software.amazon.kinesis`, enquanto o conector Kinesis para Flink 1.13 e versões posteriores usa `org.apache.flink.streaming.connectors.kinesis`. Use essa ferramenta para apoiar sua migração: [amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator).
+ Se você estiver usando o Flink 1.13 ou anterior com `FlinkKinesisProducer` e atualizando para o Flink 1.15 ou posterior, você deve continuar a usar o `FlinkKinesisProducer` no Flink 1.15 ou posterior em vez do `KinesisStreamsSink` mais recente para conseguir fazer uma atualização com estado. No entanto, se você já tiver um conjunto `uid` personalizado em sua coletor, poderá alternar para `KinesisStreamsSink` porque o `FlinkKinesisProducer` não mantém o estado. O Flink o tratará como o mesmo operador porque um `uid` personalizado foi definido.

## Aplicativos Flink criados no Scala
<a name="scala-precautions"></a>
+ A partir do Flink 1.15, o Apache Flink não inclui o Scala no runtime. Você deve incluir a versão do Scala que deseja usar e outras dependências do Scala em seu código JAR/zip ao atualizar para o Flink 1.15 ou posterior. Para obter mais informações, consulte [Amazon Managed Service for Apache Flink com a versão 1.15.2 do Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ Se seu aplicativo usa o Scala e o Flink 1.11 ou anterior (Scala 2.11) está sendo atualizado para o Flink 1.13 (Scala 2.12), certifique-se de que seu código use o Scala 2.12. Caso contrário, seu aplicativo Flink 1.13 pode falhar ao encontrar classes Scala 2.11 no runtime do Flink 1.13.

## Considerações ao reverter o aplicativo Flink
<a name="downgrading-precautions"></a>
+ É possível fazer o downgrade dos aplicativos Flink, mas isso fica limitado aos casos em que o aplicativo estava sendo executado anteriormente com a versão mais antiga do Flink. Para uma atualização com estado, o Managed Service para Apache Flink exigirá o uso de um snapshot obtido de uma versão correspondente ou anterior para o downgrade.
+ Se você estiver atualizando seu tempo de execução do Flink 1.13 ou posterior para o Flink 1.11 ou anterior e se seu aplicativo usar o back-end de HashMap estado, seu aplicativo falhará continuamente.

# Atualizando para o Flink 2.2: guia completo
<a name="flink-2-2-upgrade-guide"></a>

Este guia fornece step-by-step instruções para atualizar seu aplicativo Amazon Managed Service for Apache Flink do Flink 1.x para o Flink 2.2. Esta é uma atualização de versão importante com alterações significativas que exigem planejamento e testes cuidadosos.

**A atualização da versão principal é unidirecional**  
A operação de atualização pode mover seu aplicativo do Flink 1.x para 2.2 com preservação de estado, mas você não pode voltar de 2.2 para 1.x com o estado 2.2. Se seu aplicativo ficar insalubre após a atualização, use a API Rollback para retornar à versão 1.x com seu estado 1.x original a partir do snapshot mais recente.

## Pré-requisitos
<a name="upgrade-guide-prerequisites"></a>

Antes de começar sua atualização:
+ Resenha [Alterações e suspensões significativas](flink-2-2.md#flink-2-2-breaking-changes)
+ Resenha [Guia de compatibilidade estadual para atualizações do Flink 2.2](state-compatibility.md)
+ Garanta que você tenha um ambiente não produtivo para testes
+ Documente a configuração e as dependências atuais do aplicativo

## Entendendo seus caminhos de migração
<a name="upgrade-guide-migration-paths"></a>

Sua experiência de atualização depende da compatibilidade do seu aplicativo com o Flink 2.2. Compreender esses caminhos ajuda você a se preparar adequadamente e definir expectativas realistas.

**Caminho 1: binário compatível e estado do aplicativo**

**O que esperar:**
+ Invocar a operação de atualização
+ Conclua a migração para 2.2 com a transição do status do aplicativo: → → `RUNNING` `UPDATING` `RUNNING`
+ Preserve todo o estado do aplicativo sem perda ou reprocessamento de dados
+ Mesma experiência das migrações de versões secundárias

Ideal para: aplicativos sem estado ou aplicativos que usam serialização compatível (Avro, esquemas Protobuf compatíveis, sem coleções) POJOs 

**Caminho 2: incompatibilidades binárias**

**O que esperar:**
+ Invocar a operação de atualização
+ A operação falha e revela a incompatibilidade binária por meio da API de operações e dos registros
+ Com a reversão automática ativada: os aplicativos são revertidos automaticamente em minutos sem sua intervenção
+ Com a reversão automática desativada: os aplicativos permanecem em execução sem processamento de dados; você reverte manualmente para a versão anterior
+ Depois que o binário for corrigido, use a [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) para uma experiência semelhante ao Caminho 1

Ideal para: aplicativos removidos APIs que são detectados durante a inicialização do trabalho do Flink

**Caminho 3: estado incompatível do aplicativo**

**O que esperar:**
+ Invocar a operação de atualização
+ A migração parece ter sido bem-sucedida inicialmente
+ Os aplicativos entram em ciclos de reinicialização em segundos quando a restauração do estado falha
+ Detecte falhas por meio de CloudWatch métricas que mostram reinicializações contínuas
+ Invocar manualmente a operação Rollback
+ Retorne à produção em poucos minutos após o início da reversão
+ Analise [Migração estadual](state-compatibility.md#state-compat-migration) sua inscrição

Ideal para: aplicativos com incompatibilidades de serialização de estado (POJOs com coleções, determinado estado serializado do Kryo)

**nota**  
É altamente recomendável criar uma réplica do seu aplicativo de produção e testar cada uma das seguintes fases da atualização na réplica antes de seguir as mesmas etapas do seu aplicativo de produção.

## Fase 1: Preparação
<a name="upgrade-guide-phase-1"></a>

**Atualizar o código do aplicativo**

Atualize o código do seu aplicativo para ser compatível com o Flink 2.2:
+ **Atualize as dependências do Flink** para a versão 2.2.0 em seu ou `pom.xml` `build.gradle`
+ **Atualize as dependências do conector** para versões compatíveis com o Flink 2.2 (consulte) [disponibilidade do conector](flink-2-2.md#flink-2-2-connectors)
+ **Remova o uso obsoleto** da API:
  + Substitua a DataSet API por DataStream API ou API de tabela/SQL
  + Substitua o legacy`SourceFunction`/`SinkFunction`por FLIP-27 Source e FLIP-143 Sink APIs
  + Substitua o uso da API Scala pela API Java
+ **Atualização para o Java 17**

**Carregar código de aplicativo atualizado**
+ Crie seu aplicativo JAR com dependências do Flink 2.2
+ Faça o upload para o Amazon S3 com um **nome de arquivo diferente** do seu JAR atual (por exemplo,) `my-app-flink-2.2.jar`
+ Observe o bucket e a chave do S3 para uso na etapa de atualização

## Fase 2: ativar a reversão automática
<a name="upgrade-guide-phase-2"></a>

A reversão automática permite que o Amazon Managed Service para Apache Flink reverta automaticamente para a versão anterior se a atualização falhar.

**Verifique o status da reversão automática**

*Console de gerenciamento da AWS:*

1. Navegue até seu aplicativo

1. Escolher **configuração**

1. Em **Configurações do aplicativo**, verifique se a **reversão do sistema** está ativada

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Ativar a reversão automática (se não estiver ativada)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Fase 3: tirar uma foto (opcional)
<a name="upgrade-guide-phase-3"></a>

Se os instantâneos automáticos estiverem habilitados para seu aplicativo, você poderá pular esta etapa, caso contrário, tire um instantâneo do seu aplicativo para salvar o estado do seu aplicativo antes da atualização.

**Tire um instantâneo do aplicativo em execução**

*Console de gerenciamento da AWS:*

1. Navegue até seu aplicativo

1. Escolha **instantâneos**

1. Escolha **Criar instantâneo**

1. Insira um nome de instantâneo (por exemplo,`pre-flink-2.2-upgrade`)

1. Selecione **Criar**

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Verificar a criação do snapshot**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Espere até que `SnapshotStatus` seja `READY` antes de continuar.

## Fase 4: Atualizar o aplicativo
<a name="upgrade-guide-phase-4"></a>

Você pode atualizar seu aplicativo Flink usando a [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação.

Você pode chamar a API `UpdateApplication` de várias maneiras:
+ **Use o Console de gerenciamento da AWS**
  + Acesse seu aplicativo no Console de gerenciamento da AWS.
  + Selecione **Configurar**.
  + Selecione o novo runtime e o snapshot a partir do qual você deseja começar, também conhecido como configuração de restauração. Use a configuração mais recente como configuração de restauração para iniciar o aplicativo a partir do snapshot mais recente. Aponte para o novo aplicativo atualizado JAR/zip no Amazon S3.
+ **Use a AWS CLI[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)**ação.
+ **Use CloudFormation.**
  + Atualize o `RuntimeEnvironment` campo. Anteriormente, o CloudFormation excluía o aplicativo e criava um novo, fazendo com que seus snapshots e outros históricos do aplicativo fossem perdidos. Agora CloudFormation atualiza seu `RuntimeEnvironment` local e não exclui seu aplicativo.
+ **Use o AWS SDK.**
  + Consulte a documentação do SDK para obter a linguagem de programação de sua preferência. Consulte [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

Você pode realizar a atualização enquanto o aplicativo estiver no estado `RUNNING` ou enquanto o aplicativo estiver parado no estado `READY`. O Amazon Managed Service para Apache Flink valida a compatibilidade entre a versão original do tempo de execução e a versão do tempo de execução de destino. Essa verificação de compatibilidade é executada quando você executa `UpdateApplication` enquanto está no `RUNNING` estado ou na próxima, `StartApplication` se você atualiza enquanto está no `READY` estado.

**Atualização do estado RUNNING**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Atualização do estado READY**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Fase 5: atualização do monitor
<a name="upgrade-guide-phase-5"></a>

**Verificação de compatibilidade**
+ Use a API de operações para verificar o status da atualização. Se houver incompatibilidades binárias ou problemas com a inicialização do trabalho, a operação de atualização falhará com os registros.
+ Se a operação de atualização tiver sido bem-sucedida, mas o aplicativo estiver preso em ciclos de reinicialização, isso significa que o estado é incompatível com a nova versão do Flink ou que há um problema com o código atualizado. Análise [Guia de compatibilidade estadual para atualizações do Flink 2.2](state-compatibility.md) sobre como identificar problemas de incompatibilidade de estado.

**Monitore a integridade do aplicativo**

*Estado do aplicativo:*
+ O status do aplicativo deve ser alterado: `RUNNING` → `UPDATING` → `RUNNING`
+ Verifique o tempo de execução do aplicativo. Se for 2.2, a operação de atualização foi bem-sucedida.
+ Se seu aplicativo estiver ativo`RUNNING`, mas ainda estiver no tempo de execução antigo, a reversão automática foi iniciada. A API de operações mostrará a operação como`FAILED`. Verifique os registros para encontrar a exceção em caso de falha.

Além disso, monitore essas métricas em CloudWatch:

*Métrica de reinicialização:*
+ `numRestarts`: Monitore reinicializações inesperadas — a atualização será bem-sucedida se `numRestarts` for zero `uptime` e/ou `runningTime` estiver aumentando.

*Métricas do ponto de verificação:*
+ `lastCheckpointDuration`: devem ser semelhantes aos valores de pré-atualização
+ `numberOfFailedCheckpoints`: Deve permanecer em 0

## Fase 6: Validar o comportamento do aplicativo
<a name="upgrade-guide-phase-6"></a>

Depois que o aplicativo estiver sendo executado no Flink 2.2:

**Validação funcional**
+ Verifique se os dados estão sendo lidos das fontes
+ Verifique se os dados estão sendo gravados em coletores
+ Verifique se a lógica de negócios produz os resultados esperados
+ Compare a saída com a linha de base de pré-atualização

**Validação de desempenho**
+ Monitore as métricas de latência (tempo end-to-end de processamento)
+ Monitore as métricas de produtividade (registros por segundo)
+ Monitore a duração e o tamanho do ponto de verificação
+ Monitore a utilização da memória e da CPU

**Corra por mais de 24 horas**

Permita que o aplicativo seja executado por pelo menos 24 horas em produção para garantir:
+ Sem vazamentos de memória
+ Comportamento estável do ponto de verificação
+ Sem reinicializações inesperadas
+ Taxa de transferência consistente

## Fase 7: procedimentos de reversão
<a name="upgrade-guide-phase-7"></a>

Se a atualização falhar ou o aplicativo estiver em execução, mas não estiver íntegro, volte para a versão anterior.

**Reversão automática**

Se a reversão automática estiver ativada e a atualização falhar durante a inicialização, o Amazon Managed Service para Apache Flink reverte automaticamente para a versão anterior.

**Reversão manual**

Se o aplicativo estiver em execução, mas não estiver íntegro, use a `RollbackApplication` API:

*Console de gerenciamento da AWS:*

1. Navegue até seu aplicativo

1. Escolha **Ações** → **Reverter**

1. Confirme a reversão

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**O que acontece durante a reversão:**
+ O aplicativo é interrompido
+ O tempo de execução reverte para a versão anterior do Flink
+ O código do aplicativo é revertido para o JAR anterior
+ O aplicativo é reiniciado a partir do último snapshot bem-sucedido obtido **antes do upgrade**

**Importante**  
Você não pode restaurar um instantâneo do Flink 2.2 no Flink 1.x
A reversão usa o instantâneo obtido antes da atualização
Sempre tire um instantâneo antes da atualização (Fase 3)

## Próximas etapas
<a name="upgrade-guide-next-steps"></a>

Em caso de dúvidas ou problemas durante a atualização, consulte [Solucionar problemas do Managed Service for Apache Flink](troubleshooting.md) ou entre em contato com o AWS Support.

# Guia de compatibilidade estadual para atualizações do Flink 2.2
<a name="state-compatibility"></a>

Ao atualizar do Flink 1.x para o Flink 2.2, problemas de compatibilidade de estado podem impedir que seu aplicativo seja restaurado a partir de instantâneos. Este guia ajuda você a identificar possíveis problemas de compatibilidade e fornece estratégias de migração.

## Entendendo as mudanças de compatibilidade de estado
<a name="state-compat-understanding"></a>

O Amazon Managed Service para Apache Flink 2.2 introduz várias alterações de serialização que afetam a compatibilidade de estados. Os seguintes são os principais:
+ **Atualização da versão Kryo**: O Apache Flink 2.2 atualiza o serializador Kryo incluído da versão 2 para a versão 5. Como o Kryo v5 usa um formato de codificação binária diferente do Kryo v2, qualquer estado do operador serializado via Kryo em um ponto de salvamento Flink 1.x não pode ser restaurado no Flink 2.2.
+ **Serialização de coleções Java**: no Flink 1.x, as coleções Java (como `HashMap``ArrayList`, e`HashSet`) POJOs foram serializadas usando o Kryo. O Flink 2.2 introduz serializadores otimizados específicos da coleção que são incompatíveis com o estado serializado Kryo da versão 1.x. Aplicativos que usam coleções Java com serializadores POJO ou Kryo na versão 1.x não podem restaurar esse estado no Flink 2.2. Consulte a [documentação](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) do Flink para obter mais detalhes sobre tipos de dados e serialização.
+ **Compatibilidade do Kinesis Connector**: a versão do conector Kinesis Data Streams (KDS) inferior a 5.0 mantém um estado que não é compatível com o conector Kinesis Flink 2.2 versão 6.0. Você deve migrar para a versão 5.0 ou superior do conector antes da atualização.

## Referência de compatibilidade de serialização
<a name="state-compat-reference"></a>

Analise todas as declarações de estado em seu aplicativo e combine os tipos de serialização com a tabela abaixo. Se algum tipo de estado for incompatível, consulte a [Migração estadual](#state-compat-migration) seção antes de continuar com a atualização.


**Referência de compatibilidade de serialização**  

| Tipo de serialização | Compatível? | Detalhes | 
| --- | --- | --- | 
| abril (SpecificRecord,GenericRecord) | Sim | Usa seu próprio formato binário independente do Kryo. Certifique-se de usar as informações do tipo Avro nativo do Flink, não o Avro registrado como um serializador Kryo. | 
| Protobuf | Sim | Usa sua própria codificação binária independente do Kryo. Verifique se as alterações do esquema seguem as regras de evolução compatíveis com versões anteriores. | 
| POJOs sem coleções | Sim | Gerenciado pelo serializador POJO do Flink — mas somente se a classe atender a todos os critérios do POJO: classe pública, construtor público sem arg, todos os campos públicos ou acessíveis por meio de getters/setters e todos os tipos de campo próprios serializáveis pelo Flink. Um POJO que viole qualquer um desses itens volta silenciosamente para Kryo e se torna incompatível. | 
| Personalizado TypeSerializers | Sim | Compatível somente se seu serializador não delegar internamente ao Kryo. | 
| Estado da API do SQL e da tabela | Sim (com ressalva) | Usa os serializadores internos do Flink. No entanto, o Apache Flink não garante a compatibilidade de estado entre as versões principais dos aplicativos da API de tabela. Teste primeiro em um ambiente que não seja de produção. | 
| POJOs com coleções Java (HashMap,ArrayList,HashSet) | Não | No Flink 1.x, as coleções internas POJOs foram serializadas via Kryo v2. O Flink 2.2 introduz serializadores de coleção dedicados cujo formato binário é incompatível com o formato Kryo v2. | 
| Classes de casos Scala | Não | Serializado via Kryo no Flink 1.x. A atualização do Kryo v2 para v5 altera o formato binário. | 
| Registros Java | Não | Normalmente, retorne à serialização Kryo no Flink 1.x. Verifique testando comdisableGenericTypes(). | 
| Tipos de bibliotecas de terceiros | Não | Os tipos sem um serializador personalizado registrado retornam ao Kryo. A alteração do formato binário Kryo v2 para v5 quebra a compatibilidade. | 
| Qualquer tipo usando o Kryo fallback | Não | Se o Flink não conseguir lidar com um tipo com um serializador embutido ou registrado, ele retornará ao Kryo. Todo estado serializado do Kryo a partir de 1.x é incompatível com 2.2. | 

## Métodos diagnósticos
<a name="state-compat-diagnostics"></a>

Você pode identificar problemas de compatibilidade de estado de forma proativa examinando os registros do aplicativo ou inspecionando os registros após a operação da [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

**Identifique o fallback do Kryo em seu aplicativo**

Você pode usar o seguinte padrão de regex em seus registros para identificar o fallback do Kryo em seu aplicativo:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Log de amostra:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

Se a atualização falhar usando a UpdateApplication API, as exceções a seguir podem indicar que você está enfrentando uma incompatibilidade de estado baseada em serializador:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Lista de verificação de pré-atualização
<a name="state-compat-checklist"></a>
+ Revise todas as declarações estaduais em sua inscrição
+ Verifique POJOs com coleções (`HashMap`,`ArrayList`,`HashSet`)
+ Verifique os métodos de serialização para cada tipo de estado
+ Crie um aplicativo de réplica de produção e teste a compatibilidade do estado usando a UpdateApplication API nesta réplica
+ Se o estado for incompatível, selecione uma estratégia em [Migração estadual](#state-compat-migration)
+ Ative a reversão automática na configuração do aplicativo Flink de produção

## Migração estadual
<a name="state-compat-migration"></a>

**Reconstruir o estado completo**

Ideal para aplicativos em que o estado pode ser reconstruído a partir dos dados de origem.

Se seu aplicativo puder reconstruir o estado a partir dos dados de origem:

1. Pare o aplicativo Flink 1.x

1. Atualize para o Flink 2.x com código atualizado

1. Comece com `SKIP_RESTORE_FROM_SNAPSHOT`

1. Permitir que o aplicativo reconstrua o estado

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Práticas recomendadas
<a name="state-compat-best-practices"></a>

1. **Sempre use Avro ou Protobuf para estados complexos** — eles fornecem a evolução do esquema e são independentes do Kryo

1. **Evite coleções em POJOs** — Use o Flink nativo `ListState` e `MapState` em vez disso

1. **Teste a restauração do estado localmente** — antes da atualização da produção, teste com instantâneos reais

1. **Tire instantâneos com frequência** — especialmente antes das principais atualizações de versões

1. **Ativar reversão automática** — Configure seu aplicativo MSF para reverter automaticamente em caso de falha

1. **Documente seus tipos de estado** — mantenha a documentação de todos os tipos de estados e seus métodos de serialização

1. **Monitore os tamanhos dos pontos de verificação** — O aumento dos tamanhos dos pontos de verificação pode indicar problemas de serialização

## Próximas etapas
<a name="state-compat-next-steps"></a>

**Planeje sua atualização**: Veja[Atualizando para o Flink 2.2: guia completo](flink-2-2-upgrade-guide.md).

Em caso de dúvidas ou problemas durante a migração, consulte [Solucionar problemas do Managed Service for Apache Flink](troubleshooting.md) ou entre em contato com o AWS Support.

# Implemente a escalabilidade de aplicativos no Managed Service for Apache Flink
<a name="how-scaling"></a>

Você pode configurar a execução paralela de tarefas e a alocação de recursos para que o Amazon Managed Service for Apache Flink implemente a escalabilidade. Para obter mais informações sobre como o Apache Flink programa instâncias paralelas de tarefas, consulte [Execução paralela](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/) na Documentação do Apache Flink.

**Topics**
+ [Configurar o paralelismo de aplicativos e a KPU ParallelismPer](#how-parallelism)
+ [Aloque unidades de processamento do Kinesis](#how-scaling-kpus)
+ [Atualize o paralelismo do seu aplicativo](#how-scaling-howto)
+ [Use a escalabilidade automática no Managed Service for Apache Flink](how-scaling-auto.md)
+ [Considerações sobre o MaxParallelism](#how-scaling-auto-max-parallelism)

## Configurar o paralelismo de aplicativos e a KPU ParallelismPer
<a name="how-parallelism"></a>

Você configura a execução paralela das tarefas do aplicativo Managed Service for Apache Flink (como ler de uma fonte ou executar um operador) usando as seguintes [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html) propriedades: 
+ `Parallelism`: use esta propriedade para definir o paralelismo padrão do aplicativo Apache Flink. Todos os operadores, fontes e coletores são executados com esse paralelismo, a menos que sejam substituídos no código do aplicativo. O valor padrão é `1` e o máximo padrão é `256`.
+ `ParallelismPerKPU`: use esta propriedade para definir o número de tarefas em paralelo que podem ser programadas por unidade de processamento do Kinesis (Kinesis Processing Unit, KPU) do seu aplicativo. O padrão é `1` e o máximo é `8`. Para aplicativos que têm operações de bloqueio (por exemplo, E/S), um valor maior que `ParallelismPerKPU` leva à utilização total dos recursos da KPU.

**nota**  
O limite para `Parallelism` é igual às `ParallelismPerKPU` vezes o limite para KPUs (que tem um padrão de 64). O KPUs limite pode ser aumentado solicitando um aumento de limite. Para obter instruções sobre como solicitar um aumento de limite, consulte “Para solicitar um aumento de limite” em [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

Para obter informações sobre como definir o paralelismo de tarefas para um operador específico, consulte [ Configurar o paralelismo: operador](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) na documentação do Apache Flink.

## Aloque unidades de processamento do Kinesis
<a name="how-scaling-kpus"></a>

O serviço gerenciado para Apache Flink provisiona a capacidade como. KPUs Uma única KPU oferece 1 vCPU e 4 GB de memória. Para cada KPU alocada, também são fornecidos 50 GB de armazenamento de aplicativos em execução. 

O Managed Service for Apache Flink calcula o KPUs que é necessário para executar seu aplicativo usando as `ParallelismPerKPU` propriedades `Parallelism` e, da seguinte forma:

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

O Managed Service for Apache Flink fornece rapidamente aos seus aplicativos recursos em resposta a picos no throughput ou na atividade de processamento. Ele remove recursos do seu aplicativo gradualmente após o pico de atividade ter passado. Para desativar a alocação automática de recursos, defina o valor `AutoScalingEnabled` como `false`, conforme descrito posteriormente em [Atualize o paralelismo do seu aplicativo](#how-scaling-howto). 

O limite padrão KPUs para seu aplicativo é 64. Para obter instruções sobre como solicitar um aumento desse limite, consulte “Para solicitar um aumento de limite” em [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**nota**  
Uma KPU adicional é cobrada para fins de orquestração. Para obter mais informações, consulte [Preço do Managed Service for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Atualize o paralelismo do seu aplicativo
<a name="how-scaling-howto"></a>

Esta seção contém exemplos de solicitações de ações de API que definem o paralelismo de um aplicativo. Para ver mais exemplos e instruções sobre como usar blocos de solicitação com ações de API, consulte [Exemplo de código de API para o Managed Service for Apache Flink](api-examples.md).

O exemplo a seguir de solicitação para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) define o paralelismo quando você está criando um aplicativo:

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) define o paralelismo para um aplicativo existente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

O exemplo de solicitação a seguir para a ação [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) desativa o paralelismo para um aplicativo existente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Use a escalabilidade automática no Managed Service for Apache Flink
<a name="how-scaling-auto"></a>

O Managed Service for Apache Flink dimensiona elasticamente o paralelismo de seu aplicativo para acomodar o throughput de dados de sua fonte e a complexidade de seu operador na maioria dos cenários. O ajuste de escala automático está habilitado por padrão. O Managed Service for Apache Flink monitora o uso de recursos (CPU) do seu aplicativo e aumenta ou diminui elasticamente o paralelismo do seu aplicativo de acordo com:
+ Seu aplicativo aumenta (aumenta o paralelismo) se o máximo da CloudWatch métrica `containerCPUUtilization` for maior que 75 por cento ou mais por 15 minutos. Isso significa que a ação `ScaleUp` é iniciada quando há 15 pontos de dados consecutivos com um período de 1 minuto igual ou superior a 75%. Uma ação `ScaleUp` dobra o `CurrentParallelism` do seu aplicativo. O `ParallelismPerKPU` não é modificado. Como consequência, o número de alocados KPUs também dobra. 
+ Seu aplicativo reduz a escala verticalmente (diminui o paralelismo) quando o uso da CPU permanece abaixo de 10% por seis horas. Isso significa que a ação `ScaleDown` é iniciada quando há 360 pontos de dados consecutivos com um período de 1 minuto inferior a 10%. Uma `ScaleDown` ação divide pela metade (arredondando para cima) o paralelismo do aplicativo. `ParallelismPerKPU`não é modificado, e o número de alocados KPUs também é reduzido pela metade (arredondado para cima). 

**nota**  
Um período máximo de `containerCPUUtilization` acima de 1 minuto pode ser referenciado para encontrar a correlação com um ponto de dados usado para a ação de escalabilidade, mas não é necessário refletir o momento exato em que a ação é iniciada.

O Managed Service for Apache Flink não reduzirá o valor `CurrentParallelism` do seu aplicativo para menos do que a configuração `Parallelism` do seu aplicativo.

Quando o serviço do Managed Service for Apache Flink estiver escalando seu aplicativo, ele estará no status `AUTOSCALING`. Você pode verificar o status atual da sua inscrição usando as [ ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)ações [ DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html)ou. Enquanto o serviço está escalando seu aplicativo, a única ação de API válida que você pode usar é [ StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)com o `Force` parâmetro definido como. `true`

Você pode usar a propriedade `AutoScalingEnabled` (parte de [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)) para ativar ou desativar o comportamento de ajuste de escala automático. Sua AWS conta é cobrada pelas KPUs provisões do Managed Service for Apache Flink, que é uma função do seu aplicativo `parallelism` e `parallelismPerKPU` das configurações. Um pico de atividade aumenta os custos do Managed Service for Apache Flink.

Para obter mais informações sobre preços, consulte [Preço do Amazon Managed Service for Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

Observe o seguinte sobre escalonamento de aplicativo:
+ O ajuste de escala automático está habilitado por padrão.
+ O escalonamento não se aplica aos blocos de anotações do Studio. No entanto, se você implantar um bloco de anotações do Studio como um aplicativo de estado durável, o escalonamento será aplicado ao aplicativo implantado.
+ Seu aplicativo tem um limite padrão de 64 KPUs. Para obter mais informações, consulte [Quota do notebook do Managed Service for Apache Flink e Studio](limits.md).
+ Quando o ajuste automático de escala automático atualiza o paralelismo do aplicativo, o aplicativo passa por um tempo de inatividade. Para evitar esse tempo de inatividade, faça o seguinte:
  + Desabilitar o ajuste de escala automático
  + Configure seu aplicativo `parallelism` e `parallelismPerKPU` com a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação. Para obter mais informações sobre como definir as configurações de paralelismo do aplicativo, consulte [Atualize o paralelismo do seu aplicativo](how-scaling.md#how-scaling-howto).
  + Monitore periodicamente o uso de recursos do seu aplicativo para verificar se ele tem as configurações de paralelismo corretas para seu workload. Para obter informações sobre monitoramento de alocação de recursos, consulte [Métricas e dimensões no Managed Service for Apache Flink](metrics-dimensions.md).

## Implemente a escalabilidade automática personalizada
<a name="how-scaling-custom-autoscaling"></a>

Se você quiser um controle mais refinado sobre a escalabilidade automática ou usar outras métricas diferentes de `containerCPUUtilization`, você pode usar este exemplo: 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  Esses exemplos ilustram como escalar seu aplicativo Managed Service for Apache Flink usando uma CloudWatch métrica diferente da aplicação Apache Flink, incluindo métricas do Amazon MSK e do Amazon Kinesis Data Streams, usadas como fontes ou coletores.

Para obter informações adicionais, consulte [Monitoramento aprimorado e escalabilidade automática para o Apache Flink](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/).

## Implemente a escalabilidade automática programada
<a name="how-scaling-scheduled-autoscaling"></a>

Se sua workload seguir um perfil previsível ao longo do tempo, talvez você prefira escalar seu aplicativo Apache Flink preventivamente. Isso escala seu aplicativo em um horário programado, em vez de escalar reativamente com base em uma métrica. Para configurar a escalabilidade vertical e horizontal em horários fixos do dia, você pode usar este exemplo:
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## Considerações sobre o MaxParallelism
<a name="how-scaling-auto-max-parallelism"></a>

O paralelismo máximo que uma tarefa do Flink pode escalar é limitado pelo `maxParallelism` *mínimo* em todos os operadores da tarefa. Por exemplo, se você tiver um trabalho simples com apenas uma origem e um coletor, e a origem tiver um `maxParallelism` de 16 e coletor tiver 8, o aplicativo não poderá escalar além do paralelismo de 8.

Para saber como o `maxParallelism` padrão de um operador é calculado e como substituir o padrão, consulte [Como definir o paralelismo máximo](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) na documentação do Apache Flink.

Como regra básica, lembre-se de que, se você não definir `maxParallelism` para nenhum operador e iniciar seu aplicativo com um paralelismo menor ou igual a 128, todos os operadores terão um `maxParallelism` de 128.

**nota**  
O paralelismo máximo do trabalho é o limite superior do paralelismo para escalar seu aplicativo mantendo o estado.   
Se você modificar o `maxParallelism` de um aplicativo existente, o aplicativo não poderá ser reiniciado a partir de um snapshot anterior tirado com o `maxParallelism` antigo. Você só pode reiniciar o aplicativo sem um snapshot.   
Se planeja escalar seu aplicativo para um paralelismo maior que 128, você deve definir explicitamente o `maxParallelism` em seu aplicativo.
+ A lógica de escalabilidade automática evitará a escalar uma tarefa do Flink para um paralelismo que excederá o paralelismo máximo da tarefa.
+ Se você usar uma escalabilidade automática personalizada ou escalabilidade programada, configure-as para que não excedam o paralelismo máximo do trabalho.
+ Se você escalar manualmente seu aplicativo além do paralelismo máximo, o aplicativo falhará ao iniciar.

# Adicionar tags nos aplicativos do Managed Service for Apache Flink
<a name="how-tagging"></a>



Esta seção descreve como adicionar tags de metadados de valor-chave para aplicativos do Managed Service for Apache Flink. Essas tags podem ser usadas para as seguintes finalidades:
+ Determinar o faturamento para um aplicativo individual do Managed Service for Apache Flink. Para obter mais informações, consulte [Como usar tags de alocação](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) no *Guia de Gerenciamento de custos e faturamento*.
+ Controlar o acesso a recursos de aplicativos com base em tags. Para obter mais informações, consulte [Controlar o acesso usando tags](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) no *Guia do usuário do AWS Identity and Access Management *.
+ Finalidades definidas pelo usuário. Você pode definir a funcionalidade do aplicativo com base na presença de tags do usuário.

Observe as seguintes informações sobre a marcação:
+ O número máximo de tags do aplicativo inclui tags de sistema. O número máximo de tags do aplicativo definidas pelo usuário é de 50.
+ Se uma ação inclui uma lista de tags que tem valores `Key` duplicados, o serviço lançará um `InvalidArgumentException`.

**Topics**
+ [Adicionar tags quando um aplicativo é criado](how-tagging-create.md)
+ [Adicionar ou atualizar tags para um aplicativo existente](how-tagging-add.md)
+ [Listar tags para um aplicativo](how-tagging-list.md)
+ [Remover tags de um aplicativo](how-tagging-remove.md)

# Adicionar tags quando um aplicativo é criado
<a name="how-tagging-create"></a>

Você adiciona tags ao criar um aplicativo usando o `tags` parâmetro da [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)ação.

O exemplo de solicitação a seguir mostra o nó `Tags` para uma solicitação `CreateApplication`:

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Adicionar ou atualizar tags para um aplicativo existente
<a name="how-tagging-add"></a>

Você adiciona tags a um aplicativo usando a [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html)ação. Você não pode adicionar tags a um aplicativo usando a [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)ação.

Para atualizar uma tag existente, adicione uma tag com a mesma chave da tag existente.

O exemplo de solicitação a seguir para a ação `TagResource` adiciona novas tags ou atualiza tags existentes:

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# Listar tags para um aplicativo
<a name="how-tagging-list"></a>

Para listar as tags existentes, você usa a [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html)ação.

O exemplo de solicitação a seguir para a ação `ListTagsForResource` lista as tags de um aplicativo:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# Remover tags de um aplicativo
<a name="how-tagging-remove"></a>

Para remover tags de um aplicativo, você usa a [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html)ação.

O exemplo de solicitação a seguir para a ação `UntagResource` remove tags de um aplicativo:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Use CloudFormation com o serviço gerenciado para Apache Flink
<a name="lambda-cfn-flink"></a>

O exercício a seguir mostra como iniciar um aplicativo Flink criado com o CloudFormation uso de uma função Lambda na mesma pilha. 

## Antes de começar
<a name="before-you-begin"></a>

Antes de começar este exercício, siga as etapas para criar um aplicativo Flink usando CloudFormation at [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html).

## Grave uma função do Lambda
<a name="write-lambda-function"></a>

Para iniciar um aplicativo Flink após a criação ou atualização, usamos a API kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html). A chamada será acionada por um CloudFormation evento após a criação do aplicativo Flink. Discutiremos como configurar a pilha para acionar a função do Lambda posteriormente neste exercício, mas primeiro nos concentraremos na declaração da função do Lambda e em seu código. Usamos o runtime `Python3.8` neste exemplo. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

No código anterior, o Lambda processa os eventos CloudFormation recebidos, filtra tudo `Create` além disso `Update` e obtém o estado do aplicativo e o inicia se o estado for. `READY` Para obter o estado do aplicativo, você precisa criar a função do Lambda, conforme mostrado a seguir.

## Crie uma função do Lambda
<a name="create-lambda-role"></a>

Você cria uma função para que o Lambda “converse” com sucesso com o aplicativo e grave logs. Essa função usa políticas gerenciadas padrão, mas talvez você queira restringi-la usando políticas personalizadas.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Observe que os recursos do Lambda serão criados após a criação do aplicativo Flink na mesma pilha porque dependem dele.

## Invocar a função do Lambda
<a name="invoking-lambda-function"></a>

Agora, tudo o que resta é invocar a função do Lambda. Você pode fazer isso usando um [recurso personalizado](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html).

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

Isso é tudo o que você precisa para iniciar seu aplicativo Flink usando o Lambda. Agora você está pronto para criar sua própria pilha ou usar o exemplo completo abaixo para ver como todas essas etapas funcionam na prática.

## Analisar um exemplo estendido
<a name="lambda-cfn-flink-full-example"></a>

O exemplo a seguir é uma versão ligeiramente estendida das etapas anteriores com um ajuste `RunConfiguration` adicional feito por meio dos [parâmetros do modelo](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). Esta é uma pilha funcional para você experimentar. Não deixe de ler as notas anexas: 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

Novamente, talvez você queira ajustar as funções do Lambda e do próprio aplicativo.

Antes de criar a pilha acima, não se esqueça de especificar seus parâmetros.

parâmetros.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Substitua `YOUR_BUCKET_ARN` e `YOUR_JAR` pelos seus requisitos específicos. Você pode seguir este [guia](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) para criar um bucket do Amazon S3 e um jar de aplicativos.

Agora crie a pilha (substitua YOUR\$1REGION por uma região de sua escolha, por exemplo, us-east-1):

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

Agora você pode navegar até [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) e ver o progresso. Depois de criado, você deverá ver seu aplicativo Flink no estado `Starting`. Pode demorar alguns minutos até que ele comece a `Running`. 

Para saber mais, consulte:
+ [Quatro maneiras de recuperar qualquer propriedade AWS de serviço usando AWS CloudFormation (Parte 1 de 3)](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/).
+ [Passo a passo: Pesquisando a Amazon Machine Image](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html). IDs

# Use o painel do Apache Flink com o Managed Service for Apache Flink
<a name="how-dashboard"></a>

Você pode usar o painel do Apache Flink do seu aplicativo para monitorar seu serviço gerenciado quanto à integridade do aplicativo Apache Flink. O painel do seu aplicativo mostra as seguintes informações:
+ Recursos em uso, incluindo gerenciadores de tarefas e slots de tarefas. 
+ Informações sobre trabalhos, incluindo aqueles que estão em execução, concluídos, cancelados e com falha. 

Para obter informações sobre gerenciadores de tarefas, slots de tarefas e trabalhos do Apache Flink, consulte [Arquitetura do Apache Flink](https://flink.apache.org/what-is-flink/flink-architecture/) no site do Apache Flink. 

Observe o seguinte sobre o uso do painel do Apache Flink com os aplicativos do Managed Service para Apache Flink:
+ O painel do Apache Flink para aplicativos do Managed Service for Apache Flink é somente para leitura. Você não pode alterar o seu aplicativo do Managed Service for Apache Flink usando o painel do Apache Flink.
+ O painel do Apache Flink não é compatível com o Microsoft Internet Explorer.

## Acesse o painel do Apache Flink do seu aplicativo
<a name="how-dashboard-accessing"></a>

Você pode acessar o painel do Apache Flink do seu aplicativo por meio do console do Managed Service for Apache Flink ou solicitando um endpoint de URL seguro usando a CLI.

### Acesse o painel do Apache Flink do seu aplicativo usando o console do Managed Service for Apache Flink
<a name="how-dashboard-accessing-console"></a>

Para acessar o painel do Apache Flink do seu aplicativo a partir do console, escolha **Painel do Apache Flink** na página do seu aplicativo.

**nota**  
Quando você abre o painel do console do Managed Service for Apache Flink, a URL gerada pelo console será válida por 12 horas.

### Acesse o painel do Apache Flink do seu aplicativo usando a CLI do Managed Service for Apache Flink
<a name="how-dashboard-accessing-cli"></a>

Você pode usar a CLI do Managed Service for Apache Flink para gerar uma URL para acessar o painel do seu aplicativo. A URL que você gera é válida por um período especificado.

**nota**  
Se você não acessar a URL gerada em três minutos, ela não será mais válida.

Você gera o URL do seu painel usando a [ CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html)ação. Você especifica os seguintes parâmetros para a ação: 
+ O nome do aplicativo
+ O tempo em segundos em que a URL será válida
+ Você especifica `FLINK_DASHBOARD_URL` como o tipo de URL.