

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Exemplos e tutoriais para notebooks do Studio no Managed Service for Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutorial: crie um notebook do Studio no Managed Service for Apache Flink](example-notebook.md)
+ [Tutorial: implante um notebook do Studio como um aplicativo Managed Service for Apache Flink com estado durável](example-notebook-deploy.md)
+ [Veja exemplos de consultas para analisar dados em um notebook do Studio](how-zeppelin-sql-examples.md)

# Tutorial: crie um notebook do Studio no Managed Service for Apache Flink
<a name="example-notebook"></a>

O tutorial a seguir demonstra como criar um notebook do Studio que lê dados de um fluxo de dados do Kinesis ou de um cluster Amazon MSK.

**Topics**
+ [Concluir os pré-requisitos do .](#example-notebook-setup)
+ [Crie um AWS Glue banco de dados](#example-notebook-glue)
+ [Próximas etapas: crie um notebook do Studio com o Kinesis Data Streams ou o Amazon MSK](#examples-notebook-nextsteps)
+ [Criar um bloco de anotações do Studio com o Kinesis Data Streams](example-notebook-streams.md)
+ [Crie um bloco de anotações do Studio com o Amazon MSK](example-notebook-msk.md)
+ [Limpe seu aplicativo e os recursos dependentes](example-notebook-cleanup.md)

## Concluir os pré-requisitos do .
<a name="example-notebook-setup"></a>

Certifique-se de que sua versão AWS CLI seja 2 ou posterior. Para instalar a versão mais recente AWS CLI, consulte [Instalação, atualização e desinstalação da AWS CLI versão 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Crie um AWS Glue banco de dados
<a name="example-notebook-glue"></a>

Seu bloco de anotações do Studio usa um banco de dados [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) para metadados sobre sua fonte de dados Amazon MSK.

**Criar um AWS Glue banco de dados**

1. Abra o AWS Glue console em [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Selecione **Adicionar banco de dados**. Na janela **Adicionar banco de dados**, insira **default** para **Nome do banco de dados**. Escolha **Criar**. 

## Próximas etapas: crie um notebook do Studio com o Kinesis Data Streams ou o Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Com este tutorial, você pode criar um bloco de anotações Studio que usa o Kinesis Data Streams ou o Amazon MSK:
+ [Criar um bloco de anotações do Studio com o Kinesis Data Streams](example-notebook-streams.md): com o Kinesis Data Streams, você cria rapidamente um aplicativo que usa um fluxo de dados do Kinesis como uma fonte. Você só precisa criar um fluxo de dados do Kinesis como um recurso dependente.
+ [Crie um bloco de anotações do Studio com o Amazon MSK](example-notebook-msk.md): Com o Amazon MSK, você cria um aplicativo que usa um cluster do Amazon MSK como fonte. Você precisa criar uma Amazon VPC, uma instância cliente do Amazon EC2 e um cluster do Amazon MSK como recursos dependentes.

# Criar um bloco de anotações do Studio com o Kinesis Data Streams
<a name="example-notebook-streams"></a>

Este tutorial descreve como criar um bloco de anotações do Studio que usa um fluxo de dados do Kinesis como uma fonte.

**Topics**
+ [Concluir os pré-requisitos do .](#example-notebook-streams-setup)
+ [Crie uma AWS Glue tabela](#example-notebook-streams-glue)
+ [Criar um bloco de anotações do Studio com o Kinesis Data Streams](#example-notebook-streams-create)
+ [Envie dados para o Kinesis Data Streams](#example-notebook-streams-send)
+ [Teste seu bloco de anotações do Studio](#example-notebook-streams-test)

## Concluir os pré-requisitos do .
<a name="example-notebook-streams-setup"></a>

Antes de criar um bloco de anotações do Studio, crie um fluxo de dados do Kinesis (`ExampleInputStream`). Seu aplicativo usa esse fluxo para a fonte do aplicativo.

Você pode criar esses fluxos usando o console do Amazon Kinesis ou este comando AWS CLI . Para obter instruções sobre o console, consulte [Criar e atualizar fluxos de dados](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. Nomeie o fluxo **ExampleInputStream** e defina o **Número de fragmentos abertos** como **1**.

Para criar o stream (`ExampleInputStream`) usando o AWS CLI, use o seguinte comando do Amazon Kinesis `create-stream` AWS CLI .

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Crie uma AWS Glue tabela
<a name="example-notebook-streams-glue"></a>

Seu bloco de anotações do Studio usa um [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)banco de dados para metadados sobre sua fonte de dados do Kinesis Data Streams.

**nota**  
Você pode criar o banco de dados manualmente primeiro ou deixar que o Managed Service for Apache Flink o crie para você ao criar o bloco de anotações. Da mesma forma, você pode criar manualmente a tabela conforme descrito nesta seção ou usar o código do conector de criação de tabela para o Managed Service for Apache Flink em seu bloco de anotações no Apache Zeppelin para criar sua tabela por meio de uma instrução DDL. Em seguida, você pode fazer AWS Glue o check-in para verificar se a tabela foi criada corretamente.

**Criar uma tabela**

1. Faça login no Console de gerenciamento da AWS e abra o AWS Glue console em [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Se você ainda não tiver um AWS Glue banco de dados, escolha **Bancos de dados** na barra de navegação à esquerda. Selecione **Adicionar banco de dados**. Na janela **Adicionar banco de dados**, insira **default** para **Nome do banco de dados**. Selecione **Create** (Criar).

1. Na barra de navegação à esquerda, selecione **Tabelas**. Na página **Tabelas**, selecione **Adicionar tabelas**, **Adicionar tabela manualmente**.

1. Na página **Configurar propriedades da tabela**, insira **stock** para o **Nome da tabela**. Certifique-se de selecionar o banco de dados que você criou anteriormente. Selecione **Next** (Próximo).

1. Na página **Adicionar um armazenamento de dados**, selecione **Kinesis**. Para **Nome do fluxo**, insira **ExampleInputStream**. Para **URL de origem do Kinesis**, selecione inserir **https://kinesis.us-east-1.amazonaws.com**. Se você copiar e colar o **URL de origem do Kinesis**, certifique-se de excluir todos os espaços à esquerda ou à direita. Selecione **Next** (Próximo).

1. Na página **Classificação**, selecione **JSON**. Selecione **Next** (Próximo).

1. Na página **Definir um esquema**, selecione Adicionar coluna para adicionar uma coluna. Adicione colunas com as seguintes propriedades:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/example-notebook-streams.html)

   Escolha **Próximo**.

1. Na próxima página, verifique suas configurações e selecione **Concluir**.

1. Selecione a tabela recém-criada na lista de tabelas.

1. Selecione **Editar tabela** e adicione uma propriedade com a chave `managed-flink.proctime` e o valor `proctime`.

1. Selecione **Apply** (Aplicar).

## Criar um bloco de anotações do Studio com o Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Agora que você criou os recursos que o seu aplicativo usa, crie seu bloco de anotações do Studio. 

**Topics**
+ [Crie um notebook Studio usando o Console de gerenciamento da AWS](#example-notebook-create-streams-console)
+ [Crie um notebook Studio usando o AWS CLI](#example-notebook-msk-create-api)

### Crie um notebook Studio usando o Console de gerenciamento da AWS
<a name="example-notebook-create-streams-console"></a>

1. [Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/aplicativos/painel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard). 

1. Na página **Aplicativos do Managed Service for Apache Flink**, selecione a guia **Studio**. Selecione **Criar bloco de anotações do Studio**.
**nota**  
Você também pode criar um bloco de anotações do Studio a partir dos consoles Amazon MSK ou Kinesis Data Streams, selecionando seu cluster Amazon MSK de entrada ou Kinesis Data Streams e selecionando **Processar dados em tempo real**.

1. Na página **Criar bloco de anotações do Studio**, forneça as seguintes informações:
   + Digite **MyNotebook** como nome do bloco de anotações.
   + Selecione o **padrão** para o **banco de dados AWS Glue**.

   Selecione **Criar bloco de anotações do Studio**.

1. Na **MyNotebook**página, escolha **Executar**. Aguarde até que o **Status** mostre **Em execução**. As cobranças se aplicam quando o bloco de anotações está em execução.

### Crie um notebook Studio usando o AWS CLI
<a name="example-notebook-msk-create-api"></a>

Para criar seu notebook Studio usando o AWS CLI, faça o seguinte:

1. Verifique o seu ID da conta. Você precisa desse valor para criar seu aplicativo.

1. Crie a função `arn:aws:iam::AccountID:role/ZeppelinRole` e adicione as seguintes permissões à função criada automaticamente pelo console.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Crie um arquivo chamado `create.json` com o conteúdo a seguir. Substitua os valores de espaço reservado por suas próprias informações.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Para criar o seu aplicativo, execute o comando a seguir:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Quando o comando for concluído, você verá uma saída que mostra os detalhes do seu novo bloco de anotações do Studio. A seguir, veja um exemplo de saída.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Execute o comando a seguir para iniciar o aplicativo. Substitua o valor do exemplo pelo ID de sua conta.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envie dados para o Kinesis Data Streams
<a name="example-notebook-streams-send"></a>

Para enviar dados de teste para seu Kinesis Data Streams, faça o seguinte:

1. Use o [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Escolha **Criar um usuário Cognito com**. CloudFormation

1. O CloudFormation console é aberto com o modelo do Kinesis Data Generator. Escolha **Próximo**.

1. Na página **Especificar os detalhes da pilha**, insira o seu nome de usuário e a senha para seu usuário do Cognito. Selecione **Next** (Próximo).

1. Na página **Configurar opções de pilha**, selecione **Próximo**.

1. Na página **Review Kinesis-Data-Generator-Cognito -User**, escolha a opção **Eu reconheço que AWS CloudFormation pode criar recursos do IAM**. caixa de seleção. Selecione **Create Stack** (Criar pilha).

1. Aguarde até que a CloudFormation pilha termine de ser criada. **Depois que a pilha estiver concluída, abra a pilha **Kinesis-Data-Generator-Cognito-User** no console e escolha a guia Saídas. CloudFormation ** Abra o URL listado para o valor **KinesisDataGeneratorUrl**de saída.

1. Na página do **Amazon Kinesis Data Generator**, faça login com as credenciais que você criou na etapa 4.

1. Na página seguinte, forneça os valores a seguir:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/example-notebook-streams.html)

   Para **Modelo de registro**, cole o seguinte código:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Selecione **Enviar dados**.

1. O gerador enviará dados para o seu Kinesis Data Streams. 

   Deixe o gerador executando enquanto você conclui a próxima seção.

## Teste seu bloco de anotações do Studio
<a name="example-notebook-streams-test"></a>

Nesta seção, você usa seu bloco de anotações do Studio para consultar dados do seu Kinesis Data Streams.

1. [Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/aplicativos/painel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Na página **Aplicativos do Managed Service for Apache Flink**, selecione a guia **bloco de anotações do Studio**. Selecione **MyNotebook**.

1. Na **MyNotebook**página, escolha **Abrir no Apache Zeppelin**.

   A interface do Apache Zeppelin é aberta em uma nova guia.

1. Na página **Bem-vindo ao Zeppelin\$1**, selecione **Anotação do Zeppelin**.

1. Na página **Anotação do Zeppelin**, insira a seguinte consulta em uma nova anotação:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Selecione o ícone de execução.

   Depois de um curto período de tempo, a anotação exibe dados do Kinesis Data Streams.

Para abrir o painel do Apache Flink para que seu aplicativo visualize aspectos operacionais, selecione **TRABALHO FLINK**. Para obter mais informações sobre o painel do Flink, consulte o [painel do Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) no [Guia do desenvolvedor do Managed Service for Apache Flink](https://docs.aws.amazon.com/).

Para obter mais exemplos de consultas SQL do Flink Streaming, veja [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na [documentação do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Crie um bloco de anotações do Studio com o Amazon MSK
<a name="example-notebook-msk"></a>

Este tutorial descreve como criar um bloco de anotações do Studio que usa um cluster do Amazon MSK como fonte.

**Topics**
+ [Configurar um cluster Amazon MSK](#example-notebook-msk-setup)
+ [Adicione um gateway NAT à sua VPC](#example-notebook-msk-nat)
+ [Crie uma AWS Glue conexão e uma tabela](#example-notebook-msk-glue)
+ [Crie um bloco de anotações do Studio com o Amazon MSK](#example-notebook-msk-create)
+ [Envie dados para seu cluster Amazon MSK](#example-notebook-msk-send)
+ [Teste seu bloco de anotações do Studio](#example-notebook-msk-test)

## Configurar um cluster Amazon MSK
<a name="example-notebook-msk-setup"></a>

Para este tutorial, você precisa de um cluster do Amazon MSK que permita o acesso em texto sem formatação. Se você ainda não tiver um cluster Amazon MSK configurado, siga o tutorial [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) para criar uma Amazon VPC, um cluster Amazon MSK, um tópico e uma instância cliente da Amazon. EC2 

Ao seguir o tutorial, faça o seguinte:
+ Na [Etapa 3: Criar um cluster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), na etapa 4, altere o valor `ClientBroker` de `TLS` para **PLAINTEXT**.

## Adicione um gateway NAT à sua VPC
<a name="example-notebook-msk-nat"></a>

Se você criou um cluster Amazon MSK seguindo o tutorial [Conceitos básicos do uso do Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), ou se sua Amazon VPC existente ainda não tem um gateway NAT para suas sub-redes privadas, você deve adicionar um gateway NAT à sua Amazon VPC. O diagrama a seguir mostra a arquitetura. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/images/vpc_05.png)


Para criar um gateway NAT para sua Amazon VPC, faça o seguinte:

1. Abra o console da Amazon VPC em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Selecione **Gateways NAT** na barra de navegação à esquerda.

1. Na página **Gateways NATs**, escolha **Criar gateway NAT**.

1. Na página **Criar gateway NAT**, entre os valores a seguir:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/example-notebook-msk.html)

   Escolha **Criar um gateway NAT**.

1. Na barra de navegação à esquerda, escolha **Tabelas de rotas**.

1. Escolha **Criar tabela de rotas**.

1. Na página **Criar tabela de rotas**, forneça as seguintes informações:
   + **Nome da tag**: **ZeppelinRouteTable**
   + **VPC****: escolha sua VPC (por exemplo, VPC).AWS KafkaTutorial**

   Escolha **Criar**.

1. Na lista de tabelas de rotas, escolha **ZeppelinRouteTable**. Escolha a guia **Rotas** e selecione **Editar rotas**.

1. Na página **Editar rotas**, selecione **Adicionar rota**.

1. Em ****Para **Destino**, insira **0.0.0.0/0**. Para **Target**, escolha **NAT Gateway**, **ZeppelinGateway**. Selecione **Salvar rotas**. Escolha **Fechar**.

1. Na página Tabelas de rotas, com a **ZeppelinRouteTable**opção selecionada, escolha a guia **Associações de sub-rede**. Selecione **Editar associações de sub-rede**.

1. Na página **Editar associações de sub-rede**, escolha **AWS KafkaTutorialSubnet2** e **AWS KafkaTutorialSubnet3**. Escolha **Salvar**.

## Crie uma AWS Glue conexão e uma tabela
<a name="example-notebook-msk-glue"></a>

Seu bloco de anotações do Studio usa um banco de dados [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) para metadados sobre sua fonte de dados Amazon MSK. Nesta seção, você cria uma AWS Glue conexão que descreve como acessar seu cluster Amazon MSK e uma AWS Glue tabela que descreve como apresentar os dados em sua fonte de dados para clientes como seu notebook Studio. 

**Criar uma conexão**

1. Faça login no Console de gerenciamento da AWS e abra o AWS Glue console em [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Se você ainda não tiver um AWS Glue banco de dados, escolha **Bancos de dados** na barra de navegação à esquerda. Selecione **Adicionar banco de dados**. Na janela **Adicionar banco de dados**, insira **default** para **Nome do banco de dados**. Escolha **Criar**.

1. Selecione **Conexões** na barra de navegação à esquerda. Selecione **Adicionar conexão**.

1. Na janela **Adicionar conexão**, forneça os seguintes valores:
   + Em **Nome da conexão**, insira **ZeppelinConnection**.
   + Em **Tipo de conexão**, escolha **Kafka**.
   + Para o **servidor bootstrap Kafka URLs**, forneça a string do broker bootstrap para seu cluster. Você pode obter os corretores de bootstrap no console do MSK ou digitando o seguinte comando da CLI:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Desmarque a caixa de seleção **Exigir conexão SSL**.

   Escolha **Próximo**.

1. Na página **VPC**, forneça os valores a seguir:
   + **Para **VPC**, escolha o nome da sua VPC (por exemplo, VPC). AWS KafkaTutorial**
   + Em **Sub-rede**, escolha **AWS KafkaTutorialSubnet2**.
   + Em **Grupos de segurança**, escolha todos os grupos disponíveis.

   Escolha **Próximo**.

1. Na página **Propriedades da conexão** / **Acesso à conexão**, selecione **Concluir**.

**Criar uma tabela**
**nota**  
Você pode criar manualmente a tabela conforme descrito nas etapas a seguir ou usar o código do conector de criação de tabela para o Managed Service for Apache Flink em seu bloco de anotações no Apache Zeppelin para criar sua tabela por meio de uma instrução DDL. Em seguida, você pode fazer AWS Glue o check-in para verificar se a tabela foi criada corretamente.

1. Na barra de navegação à esquerda, selecione **Tabelas**. Na página **Tabelas**, selecione **Adicionar tabelas**, **Adicionar tabela manualmente**.

1. Na página **Configurar propriedades da tabela**, insira **stock** para o **Nome da tabela**. Certifique-se de selecionar o banco de dados que você criou anteriormente. Escolha **Próximo**.

1. Na página **Adicionar um armazenamento de dados**, selecione **Kafka**. Para o **nome do tópico**, insira o nome do tópico (por exemplo **AWS KafkaTutorialTopic**). Para **Conexão**, escolha **ZeppelinConnection**.

1. Na página **Classificação**, selecione **JSON**. Selecione **Next** (Próximo).

1. Na página **Definir um esquema**, selecione Adicionar coluna para adicionar uma coluna. Adicione colunas com as seguintes propriedades:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/pt_br/managed-flink/latest/java/example-notebook-msk.html)

   Escolha **Próximo**.

1. Na próxima página, verifique suas configurações e selecione **Concluir**.

1. Selecione a tabela recém-criada na lista de tabelas.

1. Escolha **Editar tabela** e adicione as seguintes propriedades:
   + chave: `managed-flink.proctime`, valor: `proctime`
   + chave: `flink.properties.group.id`, valor: `test-consumer-group`
   + chave: `flink.properties.auto.offset.reset`, valor: `latest`
   + chave: `classification`, valor: `json`

   Sem esses pares de chave/valor, o notebook Flink apresenta um erro. 

1. Selecione **Apply** (Aplicar).

## Crie um bloco de anotações do Studio com o Amazon MSK
<a name="example-notebook-msk-create"></a>

Agora que você criou os recursos que o seu aplicativo usa, crie seu bloco de anotações do Studio. 

**Topics**
+ [Crie um notebook Studio usando o Console de gerenciamento da AWS](#example-notebook-create-msk-console)
+ [Crie um notebook Studio usando o AWS CLI](#example-notebook-msk-create-api)

**nota**  
Você também pode criar um bloco de anotações do Studio a partir do console Amazon MSK escolhendo um cluster existente e, em seguida, escolhendo **Processar dados em tempo real**.

### Crie um notebook Studio usando o Console de gerenciamento da AWS
<a name="example-notebook-create-msk-console"></a>

1. [Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/aplicativos/painel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Na página **Aplicativos do Managed Service for Apache Flink**, selecione a guia **Studio**. Selecione **Criar bloco de anotações do Studio**.
**nota**  
Para criar um bloco de anotações do Studio a partir dos consoles Amazon MSK ou Kinesis Data Streams, selecione seu cluster Amazon MSK de entrada ou fluxo de dados do Kinesis e escolha **Processar dados em tempo real.**

1. Na página **‬Criar bloco de anotações do Studio**, forneça as seguintes informações:
   + Insira **MyNotebook** para **Nome do bloco de anotações do Studio**.
   + Selecione o **padrão** para o **banco de dados AWS Glue**.

   Selecione **Criar bloco de anotações do Studio**.

1. Na **MyNotebook**página, escolha a guia **Configuração**. Na seção **Redes**, selecione **Editar**.

1. Na MyNotebook página **Editar rede para**, escolha a **configuração de VPC com base no cluster Amazon MSK**. Selecione seu cluster Amazon MSK para **Cluster Amazon MSK**. Escolha **Salvar alterações**.

1. Na **MyNotebook**página, escolha **Executar**. Aguarde até que o **Status** mostre **Em execução**.

### Crie um notebook Studio usando o AWS CLI
<a name="example-notebook-msk-create-api"></a>

Para criar seu notebook Studio usando o AWS CLI, faça o seguinte:

1. Verifique se você tem as informações a seguir. Você precisa desses valores para criar seu aplicativo.
   + O ID da sua conta.
   + A sub-rede IDs e o ID do grupo de segurança da Amazon VPC que contém seu cluster Amazon MSK.

1. Crie um arquivo chamado `create.json` com o conteúdo a seguir. Substitua os valores de espaço reservado por suas próprias informações.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Para criar o seu aplicativo, execute o comando a seguir:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Quando o comando for concluído, você deverá ver um resultado semelhante ao apresentado a seguir, mostrando os detalhes do seu novo bloco de anotações do Studio:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Execute o comando a seguir para iniciar o aplicativo. Substitua o valor do exemplo pelo ID de sua conta.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envie dados para seu cluster Amazon MSK
<a name="example-notebook-msk-send"></a>

Nesta seção, você executa um script Python em seu EC2 cliente Amazon para enviar dados para sua fonte de dados Amazon MSK.

1. Conecte-se ao seu EC2 cliente Amazon.

1. Execute os comandos a seguir para instalar o Python versão 3, o Pip e o pacote Kafka para Python e confirme as ações:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configure o AWS CLI em sua máquina cliente digitando o seguinte comando:

   ```
   aws configure
   ```

   Forneça as credenciais da sua conta e **us-east-1** para `region`.

1. Crie um arquivo chamado `stock.py` com o conteúdo a seguir. Substitua o valor da amostra pela string Bootstrap Brokers do seu cluster Amazon MSK e atualize o nome do tópico se o tópico não for: **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Execute o script com o comando a seguir:

   ```
   $ python3 stock.py
   ```

1. Deixe o script em execução enquanto você conclui a seção a seguir.

## Teste seu bloco de anotações do Studio
<a name="example-notebook-msk-test"></a>

Nesta seção, você usa seu bloco de anotações do Studio para consultar dados do seu cluster Amazon MSK.

1. [Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1\$1/aplicativos/painel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Na página **Aplicativos do Managed Service for Apache Flink**, selecione a guia **bloco de anotações do Studio**. Selecione **MyNotebook**.

1. Na **MyNotebook**página, escolha **Abrir no Apache Zeppelin**.

   A interface do Apache Zeppelin é aberta em uma nova guia.

1. Na página **Bem-vindo ao Zeppelin\$1**, selecione **Nova anotação do Zeppelin**.

1. Na página **Anotação do Zeppelin**, insira a seguinte consulta em uma nova anotação:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Selecione o ícone de execução.

   O aplicativo exibe dados do cluster Amazon MSK.

Para abrir o painel do Apache Flink para que seu aplicativo visualize aspectos operacionais, selecione **TRABALHO FLINK**. Para obter mais informações sobre o painel do Flink, consulte o [painel do Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) no [Guia do desenvolvedor do Managed Service for Apache Flink](https://docs.aws.amazon.com/).

Para obter mais exemplos de consultas SQL do Flink Streaming, veja [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na [documentação do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Limpe seu aplicativo e os recursos dependentes
<a name="example-notebook-cleanup"></a>

## Exclua seu bloco de anotações do Studio
<a name="example-notebook-cleanup-app"></a>

1. Abra o console do Managed Service for Apache Flink.

1. Selecione **MyNotebook**.

1. Escolha **Ações** e **Excluir**.

## Exclua seu AWS Glue banco de dados e conexão
<a name="example-notebook-cleanup-glue"></a>

1. Abra o AWS Glue console em [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Selecione **Bancos de dados** na barra de navegação à esquerda. Marque a caixa de seleção ao lado de **Padrão** para selecioná-la. Selecione **Ação**, **Excluir banco de dados**. Confirme a seleção.

1. Selecione **Conexões** na barra de navegação à esquerda. Marque a caixa de seleção ao lado **ZeppelinConnection**para selecioná-la. Selecione **Ação**, **Excluir conexão**. Confirme a seleção.

## Exclua a função e a política do perfil do IAM
<a name="example-notebook-msk-cleanup-iam"></a>

1. Abra o console do IAM em [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. No menu de navegação à esquerda, selecione **Funções**.

1. Use a barra de pesquisa para pesquisar a **ZeppelinRole**função.

1. Escolha a **ZeppelinRole**função. Selecione **Excluir função**. Confirme a exclusão.

## Excluir seu grupo CloudWatch de registros
<a name="example-notebook-cleanup-cw"></a>

O console cria um grupo de CloudWatch registros e um stream de registros para você quando você cria seu aplicativo usando o console. Você não tem um grupo de logs e um fluxo de logs se tiver criado seu aplicativo usando o AWS CLI.

1. Abra o CloudWatch console em [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Selecione **Grupos de log** na barra de navegação à esquerda.

1. Escolha o grupo**/AWS/KinesisAnalytics/MyNotebook**log.

1. Selecione **Actions (Ações)**, **Delete log group(s) (Excluir grupo(s) de log)**. Confirme a exclusão.

## Limpe os recursos do Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

****Para excluir o seu fluxo do Kinesis, abra o console do Kinesis Data Streams, selecione seu fluxo do Kinesis e selecione Ações**, Excluir**.

## Limpar os recursos do MSK
<a name="example-notebook-cleanup-msk"></a>

Siga as etapas nesta seção se você criou um cluster Amazon MSK para este tutorial. Esta seção tem instruções para limpar sua instância cliente Amazon EC2, Amazon VPC e cluster Amazon MSK.

### Exclua seu cluster do Amazon MSK
<a name="example-notebook-msk-cleanup-msk"></a>

Siga estas etapas se você criou um cluster Amazon MSK para este tutorial.

1. Abra o console Amazon MSK em [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Selecione **AWS KafkaTutorialCluster**. Escolha **Excluir**. Insira **delete** na janela que aparece e confirme sua seleção.

### Encerrar sua instância cliente
<a name="example-notebook-msk-cleanup-client"></a>

Siga estas etapas se você criou uma instância cliente do Amazon EC2 para este tutorial.

1. Abra o console do Amazon EC2 em [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Selecione **Instâncias** na barra de navegação à esquerda.

1. Escolha a caixa de seleção ao lado **ZeppelinClient**para selecioná-la.

1. Selecione **Estado da instância** e **Encerrar instância**.

### Excluir sua Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Siga estas etapas se você criou uma Amazon VPC para este tutorial.

1. Abra o console do Amazon EC2 em [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Selecione **Interfaces de rede** na barra de navegação à esquerda.

1. Insira a sua ID de VPC na barra de pesquisa e pressione Enter para pesquisar.

1. Marque a caixa de seleção no cabeçalho da tabela para selecionar todas as interfaces de rede exibidas.

1. Clique em **Actions (Ações)** e em **Detach (Desanexar)**. Na janela exibida, selecione **Ativar** em **Forçar desanexação**. Selecione **Desanexar** e aguarde até que todas as interfaces de rede atinjam o status **Disponível**.

1. Marque a caixa de seleção no cabeçalho da tabela para selecionar novamente todas as interfaces de rede exibidas.

1. Selecione **Actions**, **Delete**. Confirme a ação.

1. Abra o console da Amazon VPC em [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Selecione **AWS KafkaTutorialVPC**. Selecione **Actions**, **Excluir VPC**. Entre **delete** e confirme a exclusão.

# Tutorial: implante um notebook do Studio como um aplicativo Managed Service for Apache Flink com estado durável
<a name="example-notebook-deploy"></a>

O tutorial a seguir demonstra como implantar um Studio notebook como um aplicativo Managed Service for Apache Flink com estado durável.

**Topics**
+ [Concluir os pré-requisitos](#example-notebook-durable-setup)
+ [Implemente um aplicativo com estado durável usando o Console de gerenciamento da AWS](#example-notebook-deploy-console)
+ [Implemente um aplicativo com estado durável usando o AWS CLI](#example-notebook-deploy-cli)

## Concluir os pré-requisitos
<a name="example-notebook-durable-setup"></a>

Crie um novo Studio notebook seguindo o [Tutorial: crie um notebook do Studio no Managed Service for Apache Flink](example-notebook.md), usando o fluxo de dados Kinesis ou o Amazon MSK. Dê um nome ao Studio notebook `ExampleTestDeploy`.

## Implemente um aplicativo com estado durável usando o Console de gerenciamento da AWS
<a name="example-notebook-deploy-console"></a>

1. Adicione um local de bucket do S3 onde deseja que o código empacotado seja armazenado em **Localização do código do aplicativo - *opcional*** no console. Isso habilita as etapas para implantar e executar seu aplicativo diretamente do notebook.

1. Adicione as permissões necessárias à função do aplicativo para habilitar a função que você está usando para ler e gravar em um bucket do Amazon S3 e para iniciar um aplicativo Managed Service for Apache Flink:
   + Amazon S3 FullAccess
   + Gerenciado pela Amazon - flinkFullAccess
   + Acesso às suas fontes, destinos e VPCs conforme aplicável. Para obter mais informações, consulte [Analise as permissões do IAM para notebooks do Studio](how-zeppelin-iam.md).

1. Use o seguinte código de exemplo:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Com o lançamento desse atributo, você verá uma nova lista suspensa no canto superior direito de cada nota em seu notebook com o nome do notebook. Você pode fazer o seguinte:
   + Veja as configurações do Studio notebook no Console de gerenciamento da AWS.
   + Crie seu Zeppelin Note e exporte-o para o Amazon S3. Nesse ponto, forneça um nome para seu aplicativo e selecione **Criar e exportar**. Você receberá uma notificação quando a exportação for concluída.
   + Se precisar, você pode visualizar e executar quaisquer testes adicionais no executável no Amazon S3.
   + Quando a compilação estiver concluída, você poderá implantar seu código como um aplicativo de transmissão do Kinesis com estado durável e escalabilidade automática.
   + Use o menu suspenso e selecione **Implantar o Zeppelin Note como aplicativo de transmissão do Kinesis**. Revise o nome do aplicativo e escolha **Implantar via AWS console**.
   + Isso levará você à Console de gerenciamento da AWS página de criação de um serviço gerenciado para o aplicativo Apache Flink. Observe que o nome do aplicativo, o paralelismo, a localização do código, as funções padrão do Glue DB, VPC (se aplicável) e IAM foram pré-preenchidas. Valide se as funções do IAM têm as permissões necessárias para suas fontes e destinos. Os snapshots são habilitados por padrão para um gerenciamento durável do estado do aplicativo.
   + Selecione **Create application** (Criar aplicativo).
   + Você pode selecionar **Configurar** e modificar qualquer configuração e selecionar **Executar** para iniciar seu aplicativo de transmissão.

## Implemente um aplicativo com estado durável usando o AWS CLI
<a name="example-notebook-deploy-cli"></a>

Para implantar um aplicativo usando o AWS CLI, você deve atualizá-lo AWS CLI para usar o modelo de serviço fornecido com as informações do Beta 2. Para obter informações sobre como usar o modelo de serviço atualizado, consulte [Concluir os pré-requisitos do .Concluir os pré-requisitos](example-notebook.md#example-notebook-setup).

O código de exemplo a seguir cria um novo Studio notebook:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

O código de exemplo a seguir inicia um Studio notebook:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

O código a seguir retorna a URL da página do Apache Zeppelin notebook de um aplicativo:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Veja exemplos de consultas para analisar dados em um notebook do Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Crie tabelas com o Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Crie tabelas com o Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Consulte uma janela em cascata](#how-zeppelin-examples-tumbling)
+ [Consulte uma janela deslizante](#how-zeppelin-examples-sliding)
+ [Use o SQL interativo](#how-zeppelin-examples-interactive-sql)
+ [Use o conector BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Use o Scala para gerar dados de amostra](#notebook-example-data-generator)
+ [Use o Scala interativo](#notebook-example-interactive-scala)
+ [Use o Python interativo](#notebook-example-interactive-python)
+ [Use uma combinação interativa de Python, SQL e Scala](#notebook-example-interactive-pythonsqlscala)
+ [Use um fluxo de dados do Kinesis entre contas](#notebook-example-crossaccount-kds)

Para obter informações sobre as configurações de consulta SQL do Apache Flink, consulte [Flink on Zeppelin](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html) Notebooks for Interactive Data Analysis.

Para visualizar seu aplicativo no painel do Apache Flink, selecione **FLINK JOB na página **Zeppelin Note **** do seu aplicativo.

Para obter mais informações sobre consultas de janela, consulte [Windows na documentação do](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

[Para obter mais exemplos de consultas SQL do Apache Flink Streaming, consulte [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na documentação do Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

## Crie tabelas com o Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Você pode usar o conector Amazon MSK Flink com o Managed Service for Apache Flink Studio para autenticar sua conexão com autenticação de texto simples, SSL ou IAM. Crie suas tabelas usando as propriedades específicas de acordo com seus requisitos.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Você pode combiná-las com outras propriedades no [Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Crie tabelas com o Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

No exemplo a seguir, você cria uma tabela usando o Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Para obter mais informações sobre outras propriedades que você pode usar, consulte [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Consulte uma janela em cascata
<a name="how-zeppelin-examples-tumbling"></a>

A consulta SQL do Flink Streaming a seguir seleciona o preço mais alto em cada janela em cascata de cinco segundos da tabela `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Consulte uma janela deslizante
<a name="how-zeppelin-examples-sliding"></a>

A consulta SQL do Flink Streaming a seguir seleciona o preço mais alto em cada janela deslizante de cinco segundos da tabela `ZeppelinTopic`:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Use o SQL interativo
<a name="how-zeppelin-examples-interactive-sql"></a>

Este exemplo imprime o tempo máximo do evento e o tempo de processamento e a soma dos valores da tabela de valores-chave. Certifique-se de ter o exemplo de script de geração de dados da execução [Use o Scala para gerar dados de amostra](#notebook-example-data-generator). Para experimentar outras consultas SQL, como filtragem e junções em seu notebook Studio, consulte a documentação do Apache Flink: [Consultas](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) na documentação do Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Use o conector BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

O conector BlackHole SQL não exige que você crie um stream de dados do Kinesis ou um cluster Amazon MSK para testar suas consultas. Para obter informações sobre o conector BlackHole SQL, consulte [Conector BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) na documentação do Apache Flink. Neste exemplo, o catálogo padrão é um catálogo na memória.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Use o Scala para gerar dados de amostra
<a name="notebook-example-data-generator"></a>

Este exemplo usa o Scala para gerar dados de amostra. Você pode usar esses dados de exemplo para testar várias consultas. Use a instrução criar tabela para criar a tabela de valores-chave.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Use o Scala interativo
<a name="notebook-example-interactive-scala"></a>

Esta é a tradução em Scala do [Use o SQL interativo](#how-zeppelin-examples-interactive-sql). Para ver mais exemplos de Scala, consulte [API de tabela](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) na documentação do Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use o Python interativo
<a name="notebook-example-interactive-python"></a>

Esta é a tradução em Python do [Use o SQL interativo](#how-zeppelin-examples-interactive-sql). Para ver mais exemplos de Python, consulte [API de tabela](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) na documentação do Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Use uma combinação interativa de Python, SQL e Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

Você pode usar qualquer combinação de SQL, Python e Scala em seu notebook para análise interativa. Em um notebook do Studio que você planeja implantar como um aplicativo com estado durável, você pode usar uma combinação de SQL e Scala. Este exemplo mostra as seções que são ignoradas e aquelas que são implantadas no aplicativo com estado durável.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Use um fluxo de dados do Kinesis entre contas
<a name="notebook-example-crossaccount-kds"></a>

Para usar um fluxo de dados do Kinesis que está em uma conta diferente da conta que tem seu notebook Studio, crie uma função de execução de serviço na conta em que seu notebook Studio está sendo executado e uma política de confiança de função na conta que tem o fluxo de dados. Use `aws.credentials.provider`, `aws.credentials.role.arn` e `aws.credentials.role.sessionName` no conector Kinesis em sua instrução criar tabela DDL para criar uma tabela em relação ao fluxo de dados.

Use a seguinte função de execução de serviço para a conta do notebook Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Use a política `AmazonKinesisFullAccess` e a seguinte política de confiança de função para a conta de fluxo de dados.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Use o parágrafo a seguir para a instrução de criação de tabela.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```