

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemples et didacticiels pour les blocs-notes Studio dans Managed Service for Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink](example-notebook.md)
+ [Tutoriel : Déployer un bloc-notes Studio en tant que service géré pour une application Apache Flink à état durable](example-notebook-deploy.md)
+ [Afficher des exemples de requêtes pour analyser des données dans un bloc-notes Studio](how-zeppelin-sql-examples.md)

# Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink
<a name="example-notebook"></a>

Le didacticiel suivant explique comment créer un bloc-notes Studio qui lit les données d'un flux de données Kinesis ou d'un cluster Amazon MSK.

**Topics**
+ [Remplir les conditions préalables](#example-notebook-setup)
+ [Création d'une AWS Glue base de données](#example-notebook-glue)
+ [Étapes suivantes : créer un bloc-notes Studio avec Kinesis Data Streams ou Amazon MSK](#examples-notebook-nextsteps)
+ [Créer un bloc-notes Studio avec Kinesis Data Streams](example-notebook-streams.md)
+ [Créer un bloc-notes Studio avec Amazon MSK](example-notebook-msk.md)
+ [Nettoyez votre application et les ressources dépendantes](example-notebook-cleanup.md)

## Remplir les conditions préalables
<a name="example-notebook-setup"></a>

Assurez-vous qu'il s' AWS CLI agit de la version 2 ou d'une version ultérieure. Pour installer la dernière version AWS CLI, voir [Installation, mise à jour et désinstallation de la AWS CLI version 2.](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)

## Création d'une AWS Glue base de données
<a name="example-notebook-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Amazon MSK.

**Création d'une AWS Glue base de données**

1. Ouvrez la AWS Glue console à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Choisissez **Créer**. 

## Étapes suivantes : créer un bloc-notes Studio avec Kinesis Data Streams ou Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Avec ce didacticiel, vous pouvez créer un bloc-notes Studio qui utilise Kinesis Data Streams ou Amazon MSK :
+ [Créer un bloc-notes Studio avec Kinesis Data Streams](example-notebook-streams.md) : avec Kinesis Data Streams, vous créez rapidement une application qui utilise un flux de données Kinesis comme source. Il vous suffit de créer un flux de données Kinesis en tant que ressource dépendante.
+ [Créer un bloc-notes Studio avec Amazon MSK](example-notebook-msk.md) : avec Amazon MSK, vous créez une application qui utilise un cluster Amazon MSK comme source. Vous devez créer un Amazon VPC, une instance client Amazon EC2 et un cluster Amazon MSK en tant que ressources dépendantes.

# Créer un bloc-notes Studio avec Kinesis Data Streams
<a name="example-notebook-streams"></a>

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un flux de données Kinesis comme source.

**Topics**
+ [Remplir les conditions préalables](#example-notebook-streams-setup)
+ [Création d'une AWS Glue table](#example-notebook-streams-glue)
+ [Créer un bloc-notes Studio avec Kinesis Data Streams](#example-notebook-streams-create)
+ [Envoyer des données vers votre flux de données Kinesis](#example-notebook-streams-send)
+ [Tester votre bloc-notes Studio](#example-notebook-streams-test)

## Remplir les conditions préalables
<a name="example-notebook-streams-setup"></a>

Avant de créer un bloc-notes Studio, créez un flux de données Kinesis (`ExampleInputStream`). Votre application utilise ce flux comme source de l’application.

Vous pouvez créer ce flux à l’aide de la console Amazon Kinesis ou de la commande AWS CLI suivante. Pour obtenir des instructions sur la console, consultez [Création et mise à jour de flux de données](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) dans le *Guide du développeur Amazon Kinesis Data Streams*. Nommez le flux **ExampleInputStream** et définissez le **Nombre de partitions ouvertes** sur **1**.

Pour créer le stream (`ExampleInputStream`) à l'aide de AWS CLI, utilisez la commande Amazon Kinesis `create-stream` AWS CLI suivante.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Création d'une AWS Glue table
<a name="example-notebook-streams-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Kinesis Data Streams.

**Note**  
Vous pouvez d’abord créer la base de données manuellement ou laisser le service géré pour Apache Flink la créer pour vous lors de la création du bloc-notes. De même, vous pouvez soit créer la table manuellement comme décrit dans cette section, soit utiliser le code du connecteur de création de table pour le service géré pour Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction DDL. Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

**Création d’une table**

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si vous n'avez pas encore de AWS Glue base de données, choisissez **Bases de données** dans la barre de navigation de gauche. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Sélectionnez **Create** (Créer).

1. Dans la barre de navigation de gauche, choisissez **Tables**. Sur la page **Tables**, choisissez **Ajouter des tables**, **Ajouter une table manuellement**.

1. Sur la page **Configurer les propriétés de votre table**, saisissez **stock** pour **Nom de la table**. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez **Suivant**.

1. Sur la page **Ajouter un magasin de données**, choisissez **Kinesis**. Pour le **Nom du flux**, saisissez **ExampleInputStream**. Pour **URL source Kinesis**, saisissez **https://kinesis.us-east-1.amazonaws.com**. Si vous copiez et collez l’**URL source Kinesis**, veillez à supprimer les espaces de début ou de fin. Choisissez **Suivant**.

1. Sur la page **Classification**, choisissez **JSON**. Choisissez **Suivant**.

1. Sur la page **Définir un schéma**, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-streams.html)

   Choisissez **Suivant**.

1. Sur la page suivante, vérifiez vos paramètres, puis choisissez **Terminer**.

1. Choisissez la table que vous venez de créer dans la liste des tables.

1. Choisissez **Modifier la table** et ajoutez une propriété avec la clé `managed-flink.proctime` et la valeur `proctime`.

1. Choisissez **Appliquer**.

## Créer un bloc-notes Studio avec Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio. 

**Topics**
+ [Créez un bloc-notes Studio à l'aide du AWS Management Console](#example-notebook-create-streams-console)
+ [Créez un bloc-notes Studio à l'aide du AWS CLI](#example-notebook-msk-create-api)

### Créez un bloc-notes Studio à l'aide du AWS Management Console
<a name="example-notebook-create-streams-console"></a>

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord. 

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Studio**. Choisissez **Créer un bloc-notes Studio**.
**Note**  
Vous pouvez aussi créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams en sélectionnant votre cluster Amazon MSK ou votre flux de données Kinesis d’entrée, puis en choisissant **Traiter les données en temps réel**.

1. Sur la page **Créer un bloc-notes Studio**, fournissez les informations suivantes :
   + Saisissez **MyNotebook** comme nom du bloc-notes.
   + Pour **Base de données AWS Glue**, choisissez **Par défaut**.

   Choisissez **Créer un bloc-notes Studio**.

1. Sur la **MyNotebook**page, choisissez **Exécuter**. Attendez que **État** indique **En cours d’exécution**. Des frais s’appliquent lorsque le bloc-notes fonctionne.

### Créez un bloc-notes Studio à l'aide du AWS CLI
<a name="example-notebook-msk-create-api"></a>

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

1. Vérifiez votre identifiant de compte. Vous avez besoin de cette valeur pour créer votre application.

1. Créez le rôle `arn:aws:iam::AccountID:role/ZeppelinRole` et ajoutez les autorisations suivantes au rôle créé automatiquement par la console.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Créez un fichier nommé `create.json` avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Pour créer votre application, exécutez la commande suivante.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Lorsque la commande est terminée, vous voyez une sortie contenant les détails de votre nouveau bloc-notes Studio. Voici un exemple de la sortie.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envoyer des données vers votre flux de données Kinesis
<a name="example-notebook-streams-send"></a>

Pour envoyer des données de test vers votre flux de données Kinesis, procédez comme suit :

1. Utilisez le [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Choisissez **Créer un utilisateur Cognito** avec. CloudFormation

1. La CloudFormation console s'ouvre avec le modèle Kinesis Data Generator. Choisissez **Suivant**.

1. Sur la page **Spécifier les détails de la pile**, saisissez le nom d’utilisateur et le mot de passe de votre utilisateur Cognito. Choisissez **Suivant**.

1. Sur la page **Configurer les options de pile**, choisissez **Suivant**.

1. Sur la page **Review Kinesis-Data-Generator-Cognito -User**, sélectionnez l'option **J'accuse réception AWS CloudFormation susceptible de créer des ressources IAM**. case à cocher. Sélectionnez **Créer une pile**.

1. Attendez la fin de la création de la CloudFormation pile. **Une fois la pile terminée, ouvrez la pile **Kinesis-Data-Generator-Cognito-User** dans la console et choisissez l'onglet Sorties. CloudFormation ** Ouvrez l'URL répertoriée pour la valeur **KinesisDataGeneratorUrl**de sortie.

1. Sur la page **Amazon Kinesis Data Generator**, connectez-vous avec les informations d’identification que vous avez créées à l’étape 4.

1. Sur la page suivante, renseignez les valeurs suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-streams.html)

   Pour **Modèle d’enregistrement**, collez le code suivant :

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Choisissez **Envoyer les données**.

1. Le générateur enverra les données à votre flux de données Kinesis. 

   Laissez le générateur tourner pendant que vous terminez la section suivante.

## Tester votre bloc-notes Studio
<a name="example-notebook-streams-test"></a>

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre flux de données Kinesis.

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Bloc-notes Studio**. Sélectionnez **MyNotebook**.

1. Sur la **MyNotebook**page, choisissez **Ouvrir dans Apache Zeppelin**.

   L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

1. Sur la page **Bienvenue sur Zeppelin \$1**, choisissez **Note Zeppelin**.

1. Sur la page **Note Zeppelin**, entrez la requête suivante dans une nouvelle note :

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choisissez l’icône d’exécution.

   Après un court instant, la note affiche les données du flux de données Kinesis.

Pour ouvrir le tableau de bord Apache Flink de votre application afin de visualiser les aspects opérationnels, choisissez **FLINK JOB**. Pour plus d’informations sur le tableau de bord Flink, consultez [Tableau de bord Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) dans le [guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/).

Pour d’autres exemples de requêtes SQL Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la documentation [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Créer un bloc-notes Studio avec Amazon MSK
<a name="example-notebook-msk"></a>

Ce didacticiel explique comment créer un bloc-notes Studio qui utilise un cluster Amazon MSK comme source.

**Topics**
+ [Configuration d'un cluster Amazon MSK](#example-notebook-msk-setup)
+ [Ajoutez une passerelle NAT à votre VPC](#example-notebook-msk-nat)
+ [Création d'une AWS Glue connexion et d'une table](#example-notebook-msk-glue)
+ [Créer un bloc-notes Studio avec Amazon MSK](#example-notebook-msk-create)
+ [Envoyer des données à votre cluster Amazon MSK](#example-notebook-msk-send)
+ [Tester votre bloc-notes Studio](#example-notebook-msk-test)

## Configuration d'un cluster Amazon MSK
<a name="example-notebook-msk-setup"></a>

Pour ce didacticiel, vous avez besoin d’un cluster Amazon MSK qui autorise l’accès en texte brut. Si vous n'avez pas encore configuré de cluster Amazon MSK, suivez le didacticiel [Getting Started Using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) pour créer un Amazon VPC, un cluster Amazon MSK, une rubrique et une instance client Amazon. EC2 

Lorsque vous suivez le didacticiel, procédez comme suit :
+ Dans l’[étape 3 : créer un cluster Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), à l’étape 4, modifiez la valeur `ClientBroker` de `TLS` à **PLAINTEXT**.

## Ajoutez une passerelle NAT à votre VPC
<a name="example-notebook-msk-nat"></a>

Si vous avez créé un cluster Amazon MSK en suivant le didacticiel [Mise en route avec Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), ou si votre VPC Amazon existant ne possède pas encore de passerelle NAT pour ses sous-réseaux privés, vous devez ajouter une passerelle NAT à votre VPC Amazon. Le schéma suivant illustre l’architecture. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/vpc_05.png)


Pour créer une passerelle NAT pour votre Amazon VPC, procédez comme suit :

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Dans la barre de navigation de gauche, choisissez **Passerelles NAT**.

1. Sur la page **Passerelles NAT**, choisissez **Créer une passerelle NAT**.

1. Sur la page **Créer une passerelle NAT**, renseignez les valeurs suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-msk.html)

   Choisissez **Créer une passerelle NAT**.

1. Dans le volet de navigation de gauche, choisissez **Tables de routage**.

1. Choisissez **Créer une table de routage**.

1. Sur la page **Créer une table de routage**, fournissez les informations suivantes :
   + **Balise de nom** : **ZeppelinRouteTable**
   + **VPC** **: Choisissez votre VPC (par exemple, VPC).AWS KafkaTutorial**

   Sélectionnez **Create** (Créer).

1. Dans la liste des tables de routage, choisissez **ZeppelinRouteTable**. Choisissez l’onglet **Routes**, puis **Modifier les routes**.

1. Sur la page **Modifier les routes**, choisissez **Ajouter une route**.

1. Dans le **** Pour **Destination**, saisissez **0.0.0.0/0**. Pour **Target**, choisissez **NAT Gateway**, **ZeppelinGateway**. Choisissez **Enregistrer les routes**. Choisissez **Close** (Fermer).

1. Sur la page Tables de routage, lorsque cette option est **ZeppelinRouteTable**sélectionnée, choisissez l'onglet **Associations de sous-réseaux**. Choisissez **Modifier les associations de sous-réseaux**.

1. Sur la page **Modifier les associations de sous-réseaux**, choisissez **AWS KafkaTutorialSubnet2** et **AWS KafkaTutorialSubnet3**. Choisissez **Save** (Enregistrer).

## Création d'une AWS Glue connexion et d'une table
<a name="example-notebook-msk-glue"></a>

Votre bloc-notes Studio utilise une base de données [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) pour les métadonnées relatives à votre source de données Amazon MSK. Dans cette section, vous créez une AWS Glue connexion qui décrit comment accéder à votre cluster Amazon MSK et un AWS Glue tableau qui décrit comment présenter les données de votre source de données à des clients tels que votre bloc-notes Studio. 

**Créer une connexion**

1. Connectez-vous à la AWS Glue console AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si vous n'avez pas encore de AWS Glue base de données, choisissez **Bases de données** dans la barre de navigation de gauche. Choisissez **Ajouter une base de données**. Dans la fenêtre **Ajouter une base de données**, saisissez **default** comme **nom de la base de données**. Sélectionnez **Create** (Créer).

1. Dans le menu de navigation de gauche, sélectionnez **Connexions**. Choisissez **Ajouter une connexion**.

1. Dans la fenêtre **Ajouter une connexion**, indiquez les valeurs suivantes :
   + Pour **Nom de connexion**, saisissez **ZeppelinConnection**.
   + Pour **Type de connexion**, choisissez **Kafka**.
   + Pour le **serveur bootstrap Kafka URLs**, fournissez la chaîne de broker bootstrap de votre cluster. Vous pouvez obtenir les agents d’amorçage depuis la console MSK ou en saisissant la commande CLI suivante :

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Décochez la case **Exiger une connexion SSL**.

   Choisissez **Suivant**.

1. Sur la page **VPC**, renseignez les valeurs suivantes :
   + **Pour le **VPC**, choisissez le nom de votre VPC (par exemple, VPC.) AWS KafkaTutorial**
   + Pour **Sous-réseau**, choisissez **AWS KafkaTutorialSubnet2.**
   + Pour **Groupes de sécurité**, sélectionnez tous les groupes disponibles.

   Choisissez **Suivant**.

1. Sur la page **Propriétés de la connexion**/**Accès à la connexion**, choisissez **Terminer**.

**Création d’une table**
**Note**  
Vous pouvez soit créer la table manuellement comme décrit dans les étapes suivantes, soit utiliser le code du connecteur de création de table pour le service géré pour Apache Flink dans votre bloc-notes dans Apache Zeppelin pour créer votre table via une instruction DDL. Vous pouvez ensuite vous enregistrer AWS Glue pour vous assurer que la table a été correctement créée.

1. Dans la barre de navigation de gauche, choisissez **Tables**. Sur la page **Tables**, choisissez **Ajouter des tables**, **Ajouter une table manuellement**.

1. Sur la page **Configurer les propriétés de votre table**, saisissez **stock** pour **Nom de la table**. Assurez-vous de sélectionner la base de données que vous avez créée précédemment. Choisissez **Suivant**.

1. Sur la page **Ajouter un magasin de données**, choisissez **Kafka**. Pour le **nom du sujet**, entrez le nom de votre sujet (par exemple **AWS KafkaTutorialTopic**). Pour **Connection**, choisissez **ZeppelinConnection**.

1. Sur la page **Classification**, choisissez **JSON**. Choisissez **Suivant**.

1. Sur la page **Définir un schéma**, choisissez Ajouter une colonne pour ajouter une colonne. Ajoutez des colonnes avec les propriétés suivantes :    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/example-notebook-msk.html)

   Choisissez **Suivant**.

1. Sur la page suivante, vérifiez vos paramètres, puis choisissez **Terminer**.

1. Choisissez la table que vous venez de créer dans la liste des tables.

1. Choisissez **Modifier le tableau** et ajoutez les propriétés suivantes :
   + clé :`managed-flink.proctime`, valeur : `proctime`
   + clé :`flink.properties.group.id`, valeur : `test-consumer-group`
   + clé :`flink.properties.auto.offset.reset`, valeur : `latest`
   + clé :`classification`, valeur : `json`

   Sans ces paires clé/valeur, le bloc-notes Flink rencontre une erreur. 

1. Choisissez **Appliquer**.

## Créer un bloc-notes Studio avec Amazon MSK
<a name="example-notebook-msk-create"></a>

Maintenant que vous avez créé les ressources utilisées par votre application, vous pouvez créer votre bloc-notes Studio. 

**Topics**
+ [Créez un bloc-notes Studio à l'aide du AWS Management Console](#example-notebook-create-msk-console)
+ [Créez un bloc-notes Studio à l'aide du AWS CLI](#example-notebook-msk-create-api)

**Note**  
Vous pouvez également créer un bloc-notes Studio à partir de la console Amazon MSK en choisissant un cluster existant, puis en choisissant **Traiter les données en temps réel**.

### Créez un bloc-notes Studio à l'aide du AWS Management Console
<a name="example-notebook-create-msk-console"></a>

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Studio**. Choisissez **Créer un bloc-notes Studio**.
**Note**  
Pour créer un bloc-notes Studio à partir des consoles Amazon MSK ou Kinesis Data Streams, sélectionnez votre cluster Amazon MSK ou votre flux de données Kinesis d’entrée, puis choisissez **Traiter les données en temps réel**.

1. Sur la page **Créer un bloc-notes Studio**, fournissez les informations suivantes :
   + Pour **Nom du bloc-notes Studio**, saisissez **MyNotebook**.
   + Pour **Base de données AWS Glue**, choisissez **Par défaut**.

   Choisissez **Créer un bloc-notes Studio**.

1. Sur la **MyNotebook**page, choisissez l'onglet **Configuration**. Dans la section **Mise en réseau**, choisissez **Modifier**.

1. Dans la MyNotebook page **Modifier le réseau pour**, choisissez la **configuration VPC basée sur le cluster Amazon MSK**. Choisissez votre cluster Amazon MSK pour **Cluster Amazon MSK**. Sélectionnez **Enregistrer les modifications**.

1. Sur la **MyNotebook**page, choisissez **Exécuter**. Attendez que **État** indique **En cours d’exécution**.

### Créez un bloc-notes Studio à l'aide du AWS CLI
<a name="example-notebook-msk-create-api"></a>

Pour créer votre bloc-notes Studio à l'aide du AWS CLI, procédez comme suit :

1. Assurez-vous de disposer des informations suivantes. Vous avez besoin de ces valeurs pour créer votre application.
   + Votre ID de compte.
   + ID de sous-réseau IDs et de groupe de sécurité pour l'Amazon VPC qui contient votre cluster Amazon MSK.

1. Créez un fichier nommé `create.json` avec le contenu suivant. Remplacez les valeurs des espaces réservés par vos informations.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Pour créer votre application, exécutez la commande suivante.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Lorsque la commande est terminée, vous devriez obtenir une sortie similaire à celle qui suit, montrant les détails de votre nouveau bloc-notes Studio :

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Pour lancer votre application, exécutez la commande suivante. Remplacez les exemples de valeur par l’identifiant de votre compte.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envoyer des données à votre cluster Amazon MSK
<a name="example-notebook-msk-send"></a>

Dans cette section, vous allez exécuter un script Python dans votre EC2 client Amazon pour envoyer des données à votre source de données Amazon MSK.

1. Connectez-vous à votre EC2 client Amazon.

1. Exécutez les commandes suivantes pour installer Python version 3, Pip et le package Kafka pour Python, puis confirmez les actions :

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configurez le AWS CLI sur votre machine cliente en saisissant la commande suivante :

   ```
   aws configure
   ```

   Fournissez les informations d’identification de votre compte, et **us-east-1** pour la `region`.

1. Créez un fichier nommé `stock.py` avec le contenu suivant. Remplacez la valeur de l'échantillon par la chaîne Bootstrap Brokers de votre cluster Amazon MSK et mettez à jour le nom du sujet si celui-ci n'est pas : **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Exécutez le script avec la commande suivante :

   ```
   $ python3 stock.py
   ```

1. Laissez le script s’exécuter pendant que vous complétez la section suivante.

## Tester votre bloc-notes Studio
<a name="example-notebook-msk-test"></a>

Dans cette section, vous utilisez votre bloc-notes Studio pour interroger les données de votre cluster Amazon MSK.

1. Ouvrez le service géré pour la console Apache Flink [ https://console.aws.amazon.com/managed-flink/chez vous ? region=us-east-1\$1/applications/tableau](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de bord.

1. Sur la page **Applications de service géré pour Apache Flink**, choisissez l’onglet **Bloc-notes Studio**. Sélectionnez **MyNotebook**.

1. Sur la **MyNotebook**page, choisissez **Ouvrir dans Apache Zeppelin**.

   L’interface Apache Zeppelin s’ouvre dans un nouvel onglet.

1. Sur la page **Bienvenue sur Zeppelin \$1**, choisissez **Nouvelle note Zeppelin**.

1. Sur la page **Note Zeppelin**, entrez la requête suivante dans une nouvelle note :

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Choisissez l’icône d’exécution.

   L’application affiche les données du cluster Amazon MSK.

Pour ouvrir le tableau de bord Apache Flink de votre application afin de visualiser les aspects opérationnels, choisissez **FLINK JOB**. Pour plus d’informations sur le tableau de bord Flink, consultez [Tableau de bord Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) dans le [guide du développeur du service géré pour Apache Flink](https://docs.aws.amazon.com/).

Pour d’autres exemples de requêtes SQL Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la documentation [Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Nettoyez votre application et les ressources dépendantes
<a name="example-notebook-cleanup"></a>

## Configurez votre bloc-notes Studio.
<a name="example-notebook-cleanup-app"></a>

1. Ouvrez la console du service géré pour Apache Flink.

1. Sélectionnez **MyNotebook**.

1. Choisissez **Actions**, puis **Supprimer**.

## Supprimer votre AWS Glue base de données et votre connexion
<a name="example-notebook-cleanup-glue"></a>

1. Ouvrez la AWS Glue console à l'adresse [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Dans la barre de navigation de gauche, choisissez **Bases de données**. Cochez la case à côté de **Défaut** pour le sélectionner. Choisissez **Action**, **Supprimer la base de données**. Confirmez votre sélection.

1. Dans le menu de navigation de gauche, sélectionnez **Connexions**. Cochez la case à côté **ZeppelinConnection**pour le sélectionner. Choisissez **Action**, puis **Supprimer la connexion**. Confirmez votre sélection.

## Pour supprimer la politique et le rôle IAM
<a name="example-notebook-msk-cleanup-iam"></a>

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Dans le menu de navigation de gauche, choisissez **Rôles**.

1. Utilisez la barre de recherche pour rechercher le **ZeppelinRole**rôle.

1. Choisissez le **ZeppelinRole**rôle. Choisissez **Supprimer le rôle**. Confirmez la suppression.

## Supprimer votre groupe de CloudWatch journaux
<a name="example-notebook-cleanup-cw"></a>

La console crée un groupe de CloudWatch journaux et un flux de journaux pour vous lorsque vous créez votre application à l'aide de la console. Vous n’avez pas de groupe ni flux de journaux si vous avez créé votre application à l’aide de l’interface AWS CLI.

1. Ouvrez la CloudWatch console à l'adresse [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Dans le menu de navigation de gauche, choisissez **Groupe de journaux**.

1. Choisissez le groupe**/AWS/KinesisAnalytics/MyNotebook**log.

1. Sélectionnez **Actions**, **Supprimer le ou les groupes de journaux**. Confirmez la suppression.

## Nettoyez les ressources de Kinesis Data Streams
<a name="example-notebook-cleanup-streams"></a>

Pour supprimer votre flux Kinesis, ouvrez la console Kinesis Data Streams, sélectionnez votre flux Kinesis, puis choisissez **Actions**, **Supprimer.**

## Nettoyage des ressources MSK
<a name="example-notebook-cleanup-msk"></a>

Suivez les étapes de cette section si vous avez créé un cluster Amazon MSK pour ce didacticiel. Cette section contient des instructions pour nettoyer votre instance client Amazon EC2, Amazon VPC et votre cluster Amazon MSK.

### Supprimer votre cluster Amazon MSK
<a name="example-notebook-msk-cleanup-msk"></a>

Suivez ces étapes si vous avez créé un cluster Amazon MSK pour ce didacticiel.

1. Vous voulez ouvrir la console Amazon MSK à la [https://console.aws.amazon.com/msk/maison ? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Sélectionnez **AWS KafkaTutorialCluster**. Sélectionnez **Delete (Supprimer)**. Saisissez **delete** dans la fenêtre qui apparaît et confirmez votre sélection.

### Résilier une instance client
<a name="example-notebook-msk-cleanup-client"></a>

Suivez ces étapes si vous avez créé une instance de client Amazon EC2 pour ce didacticiel.

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Dans la barre de navigation de gauche, choisissez **Instances**.

1. Cochez la case à côté **ZeppelinClient**pour le sélectionner.

1. Choisissez **État de l’instance**, **Résilier l’instance**.

### Supprimer votre Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Suivez ces étapes si vous avez créé un Amazon VPC pour ce didacticiel.

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Choisissez **Interfaces réseau** dans la barre de navigation de gauche.

1. Saisissez l’identifiant de VPC dans la barre de recherche, puis appuyez sur Entrée.

1. Cochez la case dans l’en-tête du tableau pour sélectionner toutes les interfaces réseau affichées.

1. Sélectionnez **Actions**, **Détacher**. Dans la fenêtre qui apparaît, choisissez **Activer** sous **Forcer le détachement**. Choisissez **Détacher** et attendez que toutes les interfaces réseau atteignent l’état **Disponible**.

1. Cochez la case dans l’en-tête du tableau pour sélectionner à nouveau toutes les interfaces réseau affichées.

1. Sélectionnez **Actions**, **Supprimer**. Confirmez l’action.

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Sélectionnez **AWS KafkaTutorialVPC**. Choisissez **Actions**, **Supprimer le VPC**. Saisissez **delete** et confirmez la suppression.

# Tutoriel : Déployer un bloc-notes Studio en tant que service géré pour une application Apache Flink à état durable
<a name="example-notebook-deploy"></a>

Le didacticiel suivant explique comment déployer un bloc-notes Studio en tant qu’application état durable de service géré pour Apache Flink.

**Topics**
+ [Exécuter les opérations prérequises](#example-notebook-durable-setup)
+ [Déployez une application durable à l'aide du AWS Management Console](#example-notebook-deploy-console)
+ [Déployez une application durable à l'aide du AWS CLI](#example-notebook-deploy-cli)

## Exécuter les opérations prérequises
<a name="example-notebook-durable-setup"></a>

Créez un nouveau bloc-notes Studio en suivant [Tutoriel : Création d'un bloc-notes Studio dans Managed Service pour Apache Flink](example-notebook.md), à l’aide de Kinesis Data Streams ou d’Amazon MSK. Nommez le bloc-notes Studio `ExampleTestDeploy`.

## Déployez une application durable à l'aide du AWS Management Console
<a name="example-notebook-deploy-console"></a>

1. Ajoutez un emplacement de compartiment S3 là où vous souhaitez que le code empaqueté soit stocké sous **Emplacement du code d’application (*facultatif***) dans la console. Cela permet de suivre les étapes de déploiement et d’exécution de votre application directement depuis le bloc-notes.

1. Ajoutez les autorisations requises au rôle d’application pour activer le rôle que vous utilisez pour lire et écrire dans un compartiment Amazon S3 et pour lancer une application de service géré pour Apache Flink :
   + Amazon S3 FullAccess
   + Amazon a géré- flinkFullAccess
   + Accès à vos sources, destinations et, VPCs le cas échéant. Pour de plus amples informations, veuillez consulter [Vérifiez les autorisations IAM pour les blocs-notes Studio](how-zeppelin-iam.md).

1. Utilisez l’exemple de code suivant :

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Avec le lancement de cette fonctionnalité, vous verrez une nouvelle liste déroulante dans le coin supérieur droit de chaque note de votre bloc-notes avec le nom du bloc-notes. Vous pouvez effectuer les opérations suivantes :
   + Consultez les paramètres du bloc-notes Studio dans la AWS Management Console.
   + Créez votre note Zeppelin et exportez-la sur Amazon S3. À ce stade, donnez un nom à votre application et choisissez **Générer et exporter**. Vous recevrez une notification lorsque l’exportation sera terminée.
   + Si nécessaire, vous pouvez consulter et exécuter des tests supplémentaires sur l’exécutable dans Amazon S3.
   + Une fois la compilation terminée, vous pourrez déployer votre code sous la forme d’une application de streaming Kinesis dotée d’un état durable et de l’autoscaling.
   + Utilisez le menu déroulant et choisissez **Déployer la note Zeppelin en tant qu’application de streaming Kinesis**. Vérifiez le nom de l'application et choisissez **Déployer via AWS la console**.
   + Cela vous mènera à la AWS Management Console page de création d'un service géré pour l'application Apache Flink. Notez que le nom de l’application, le parallélisme, l’emplacement du code, Glue DB par défaut, le VPC (le cas échéant) et les rôles IAM ont été préremplis. Vérifiez que les rôles IAM disposent des autorisations requises pour accéder à vos sources et destinations. Les instantanés sont activés par défaut pour la gestion des applications à état durable.
   + Choisissez **Créer une application**.
   + Vous pouvez choisir de **configurer** et de modifier tous les paramètres, puis choisir **Exécuter** pour démarrer votre application de streaming.

## Déployez une application durable à l'aide du AWS CLI
<a name="example-notebook-deploy-cli"></a>

Pour déployer une application à l'aide de AWS CLI, vous devez mettre à jour votre modèle de service AWS CLI afin d'utiliser le modèle de service fourni avec vos informations sur la version bêta 2. Pour obtenir des informations sur l’utilisation du modèle de service mis à jour, consultez [Remplir les conditions préalablesExécuter les opérations prérequises](example-notebook.md#example-notebook-setup).

L’exemple suivant permet de créer un nouveau bloc-notes Studio :

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

L’exemple suivant permet de lancer un bloc-notes Studio :

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Le code suivant renvoie l’URL de la page du bloc-notes Apache Zeppelin d’une application :

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Afficher des exemples de requêtes pour analyser des données dans un bloc-notes Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Création de tableaux avec Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Création de tables avec Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Interrogez une fenêtre qui clignote](#how-zeppelin-examples-tumbling)
+ [Interroger une fenêtre coulissante](#how-zeppelin-examples-sliding)
+ [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql)
+ [Utiliser le connecteur BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Utiliser Scala pour générer des exemples de données](#notebook-example-data-generator)
+ [Utilisez Scala interactif](#notebook-example-interactive-scala)
+ [Utiliser du Python interactif](#notebook-example-interactive-python)
+ [Utilisez une combinaison de Python, SQL et Scala interactifs](#notebook-example-interactive-pythonsqlscala)
+ [Utiliser un flux de données Kinesis entre comptes](#notebook-example-crossaccount-kds)

Pour obtenir des informations sur les paramètres de requête SQL d’Apache Flink, consultez [Flink on Zeppelin Notebooks for Interactive Data Analysis](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Pour afficher votre application dans le tableau de bord Apache Flink, choisissez **FLINK JOB** sur la page **Note Zeppelin** de votre application.

Pour plus d’informations sur les requêtes de fenêtre, consultez [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Pour d’autres exemples de requêtes SQL Apache Flink Streaming, consultez la section [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) de la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Création de tableaux avec Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Vous pouvez utiliser le connecteur Amazon MSK Flink avec le service géré pour Apache Flink Studio afin d’authentifier votre connexion à l’aide de l’authentification en texte brut, SSL ou IAM. Créez vos tables en utilisant les propriétés spécifiques selon vos besoins.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Vous pouvez les combiner avec d’autres propriétés sur le [connecteur SQL Apache Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Création de tables avec Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Dans l’exemple suivant, vous créez une table à l’aide de Kinesis :

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Pour plus d’informations sur les autres propriétés que vous pouvez utiliser, consultez [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Interrogez une fenêtre qui clignote
<a name="how-zeppelin-examples-tumbling"></a>

La requête SQL Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre bascule de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Interroger une fenêtre coulissante
<a name="how-zeppelin-examples-sliding"></a>

La requête SQL Apache Flink Streaming suivante sélectionne le prix le plus élevé pour chaque fenêtre défilante de cinq secondes dans la table `ZeppelinTopic` :

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Utiliser du SQL interactif
<a name="how-zeppelin-examples-interactive-sql"></a>

Cet exemple imprime la durée maximale de l’événement et le temps de traitement, ainsi que la somme des valeurs de la table des valeurs clés. Assurez-vous que vous disposez de l’exemple de script de génération de données de l’exécution [Utiliser Scala pour générer des exemples de données](#notebook-example-data-generator). Pour essayer d’autres requêtes SQL telles que le filtrage et les jointures dans votre bloc-notes Studio, consultez [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) dans la documentation Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Utiliser le connecteur BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Le connecteur BlackHole SQL ne vous oblige pas à créer un flux de données Kinesis ou un cluster Amazon MSK pour tester vos requêtes. Pour plus d'informations sur le connecteur BlackHole SQL, consultez la section [Connecteur BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) dans la documentation d'Apache Flink. Dans cet exemple, le catalogue par défaut est un catalogue en mémoire.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Utiliser Scala pour générer des exemples de données
<a name="notebook-example-data-generator"></a>

Cet exemple utilise Scala pour générer des exemples de données. Vous pouvez utiliser ces exemples de données pour tester différentes requêtes. Utilisez l’instruction create table pour créer la table des valeurs clés.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Utilisez Scala interactif
<a name="notebook-example-interactive-scala"></a>

Il s’agit de la traduction Scala de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Scala, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utiliser du Python interactif
<a name="notebook-example-interactive-python"></a>

Il s’agit de la traduction Python de [Utiliser du SQL interactif](#how-zeppelin-examples-interactive-sql). Pour d’autres exemples de Python, consultez [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) dans la documentation d’Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Utilisez une combinaison de Python, SQL et Scala interactifs
<a name="notebook-example-interactive-pythonsqlscala"></a>

Vous pouvez utiliser n’importe quelle combinaison de SQL, Python et Scala dans votre bloc-notes pour une analyse interactive. Dans un bloc-notes Studio que vous envisagez de déployer en tant qu’application à état durable, vous pouvez utiliser une combinaison de SQL et de Scala. Cet exemple montre les sections qui sont ignorées et celles qui sont déployées dans l’application à état durable.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Utiliser un flux de données Kinesis entre comptes
<a name="notebook-example-crossaccount-kds"></a>

Pour utiliser un flux de données Kinesis se trouvant dans un compte autre que le compte associé à votre bloc-notes Studio, créez un rôle d’exécution de service dans le compte sur lequel votre bloc-notes Studio est exécuté et une politique d’approbation des rôles dans le compte contenant le flux de données. Utilisez `aws.credentials.provider`, `aws.credentials.role.arn` et `aws.credentials.role.sessionName` dans le connecteur Kinesis dans votre instruction DDL de création de table pour créer une table par rapport au flux de données.

Utilisez le rôle d’exécution de service suivant pour le compte de bloc-notes Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Utilisez la politique `AmazonKinesisFullAccess` et la politique d’approbation des rôles suivante pour le compte de flux de données.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Utilisez le paragraphe suivant pour l’instruction de création de table.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```