

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL :

1. À compter du **1er septembre 2025,** nous ne fournirons aucune correction de bogue pour les applications Amazon Kinesis Data Analytics for SQL, car leur support sera limité, compte tenu de l'arrêt prochain.

2. À compter du **15 octobre 2025,** vous ne pourrez plus créer de nouvelles applications Kinesis Data Analytics for SQL.

3. Nous supprimerons vos candidatures à compter **du 27 janvier 2026**. Vous ne serez pas en mesure de démarrer ou d'utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour de plus amples informations, veuillez consulter [Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL](discontinuation.md).

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemples de Kinesis Data Analytics pour SQL
<a name="examples"></a>

Cette section fournit des exemples de création et d’utilisation d’applications dans Amazon Kinesis Data Analytics. Ils incluent des exemples de code et step-by-step des instructions pour vous aider à créer des applications Kinesis Data Analytics et à tester vos résultats. 

 Avant d'explorer ces exemples, nous vous recommandons de consulter [Applications Amazon Kinesis Data Analytics pour SQL : fonctionnement](how-it-works.md) et [Mise en route avec les applications Amazon Kinesis Data Analytics pour SQL](getting-started.md).

**Topics**
+ [Exemples : Transformation de données](examples-transforming.md)
+ [Exemples : Fenêtres et Regroupement](examples-window.md)
+ [Exemples : Jointures](examples-joins.md)
+ [Exemples : Apprentissage automatique (Machine Learning)](examples-machine.md)
+ [Exemples : Alertes et erreurs](examples-alerts.md)
+ [Exemples : Accélérateurs de solution](examples_solution.md)

# Exemples : Transformation de données
<a name="examples-transforming"></a>

Parfois, votre code d’application doit prétraiter les enregistrements entrants avant d’exécuter des analyses dans Amazon Kinesis Data Analytics. Différentes raisons peuvent entraîner ce prétraitement, par exemple quand des enregistrements sont non conformes aux formats d'enregistrement pris en charge, ce qui peut se traduire par des colonnes non normalisées dans les flux d'entrée intégrés à l'application. 

Cette section fournit des exemples montrant comment utiliser les fonctions de chaîne disponibles pour normaliser des données, comment extraire les informations dont vous avez besoin de colonnes de chaîne, etc. La section fait également référence à des fonctions de date et d'heure pouvant vous être utiles. 

## Prétraitement des flux avec Lambda
<a name="examples-transforming-lambda"></a>

Pour plus d'informations sur le prétraitement des flux avec AWS Lambda, consultez[Prétraitement des données à l’aide d’une fonction Lambda](lambda-preprocessing.md).

**Topics**
+ [Prétraitement des flux avec Lambda](#examples-transforming-lambda)
+ [Exemples : Transformation de valeurs de chaîne](examples-transforming-strings.md)
+ [Exemple : transformation DateTime des valeurs](app-string-datetime-manipulation.md)
+ [Exemple : Transformation de plusieurs types de données](app-tworecordtypes.md)

# Exemples : Transformation de valeurs de chaîne
<a name="examples-transforming-strings"></a>

Amazon Kinesis Data Analytics prend en charge des formats tels que JSON et CSV pour des enregistrements d’une source de streaming. Pour en savoir plus, consultez [RecordFormat](API_RecordFormat.md). Ces enregistrements sont alors mappés à un flux intégré à l'application, selon la configuration d'entrée. Pour en savoir plus, consultez [Configuration de l'entrée de l'application](how-it-works-input.md). La configuration d'entrée spécifie la façon dont des champs d'enregistrement de la source de diffusion sont mappés à des colonnes d'un flux intégré à l'application. 

Ce mappage fonctionne lorsque des enregistrements de la source de diffusion suivent les formats pris en charge, ce qui se traduit par un flux intégré à l'application avec des données normalisées. Mais, que ce passe-t-il si les données de votre source de diffusion ne sont pas conforme aux normes prises en charge ? Par exemple, que ce passe-t-il si votre source de diffusion contient des données telles que des données de flux de clics, des capteurs IoT et des journaux d'application ? 

Prenez en compte les exemples suivants :
+ La source de diffusion contient des journaux d'application : Les journaux d'application suivent le format de journal Apache standard et sont écrits dans le flux à l'aide du format JSON. 

  ```
  {
     "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0"
  }
  ```

  Pour plus d'informations sur le format de journal Apache standard, consultez [Fichiers journaux](https://httpd.apache.org/docs/2.4/logs.html) sur le site Web d'Apache. 

   
+ La source de streaming contient des données semi-structurées : L’exemple suivant montre deux enregistrements. La valeur du champ `Col_E_Unstructured` est une série de valeurs séparées par des virgules. Il existe cinq colonnes. Les quatre premières ont des valeurs de type chaîne et la dernière colonne contient des valeurs séparées par des virgules.

  ```
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  ```
+ Les enregistrements de votre source de streaming contiennent URLs, et vous avez besoin d'une partie du nom de domaine URL pour les analyses.

  ```
  { "referrer" : "http://www.amazon.com"}
  { "referrer" : "http://www.stackoverflow.com" }
  ```

Dans de tels cas, le processus en deux étapes suivant fonctionne généralement pour la création de flux intégrés à l'application contenant des données normalisées :

1. Configurez l'entrée d'application pour mapper le champ non structuré à une colonne de type `VARCHAR(N)` dans le flux d'entrée intégré à l'application qui est créé.

1. Dans votre code d'application, utilisez des fonctions de chaîne pour scinder cette colonne unique en plusieurs colonnes, puis enregistrez les lignes dans un autre flux intégré à l'application. Ce flux intégré à l'application créé par votre code d'application contiendra des données normalisées. Vous pouvez alors exécuter des analyses sur ce flux intégré à l'application.

Amazon Kinesis Data Analytics fournit les opérations de chaîne, les fonctions SQL standard et les extensions de la norme SQL suivante pour gérer les colonnes de chaîne : 
+ **Opérateurs de chaînes** : Les opérateurs tels que `LIKE` et `SIMILAR` sont utiles pour la comparaison de chaînes. Pour plus d’informations, consultez la section [Opérateurs de chaîne](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-operators.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.
+ **Fonctions SQL** : Les fonctions suivantes sont utiles lors de la manipulation de chaînes individuelles. Pour plus d’informations, consultez la section [Fonctions de chaîne et de recherche](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-and-search-functions.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.
  + `CHAR_LENGTH` – Fournit la longueur d'une chaîne. 
  + `INITCAP`– Renvoie une version convertie de la chaîne en entrée de telle sorte que le premier caractère de chaque mot délimité par un espace soit en majuscules et que tous les autres caractères soient en minuscules. 
  + `LOWER/UPPER` – Convertit une chaîne en minuscules ou en majuscules. 
  + `OVERLAY` – Remplace une partie du premier argument de chaîne (chaîne d'origine) par le deuxième argument de chaîne (chaîne de remplacement).
  + `POSITION` : Recherche une chaîne dans une autre chaîne. 
  + `REGEX_REPLACE` – Remplace une sous-chaîne par une autre sous-chaîne.
  + `SUBSTRING` : Extrait une partie d'une chaîne source à partir d'une position spécifique. 
  + `TRIM` – Supprime les instances du caractère spécifié depuis le début ou la fin de la chaîne source. 
+ **Extensions SQL** : elles sont utiles pour travailler avec des chaînes non structurées telles que les journaux et URIs. Pour plus d’informations, consultez la section [Fonctions d’analyse de journal](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-pattern-matching-functions.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.
  + `FAST_REGEX_LOG_PARSER` : Fonctionne comme l’analyseur d’expressions régulières, mais accepte plusieurs raccourcis pour fournir des résultats plus rapides. Par exemple, l'analyseur d'expression régulière rapide s'arrête à la première correspondance trouvée (processus appelé *sémantique paresseuse*).
  + `FIXED_COLUMN_LOG_PARSE` – Analyse des champs à largeur fixe et les convertit automatiquement aux types SQL donnés.
  + `REGEX_LOG_PARSE` – Analyse une chaîne selon des modèles d'expression régulière Java par défaut.
  + `SYS_LOG_PARSE`— Analyse les entrées couramment présentes dans les journaux UNIX/Linux du système.
  + `VARIABLE_COLUMN_LOG_PARSE` – Scinde une chaîne d'entrée en champs séparés par un caractère délimiteur ou une chaîne de délimiteur.
  + `W3C_LOG_PARSE` – Peut être utilisé pour formater rapidement des journaux Apache.

Pour obtenir des exemples utilisant ces fonctions, consultez les rubriques suivantes :

**Topics**
+ [Exemple : Extraction d'une partie d'une chaîne (fonction SUBSTRING)](examples-transforming-strings-substring.md)
+ [Exemple : Remplacement d'une sous-chaîne à l'aide de Regex (fonction REGEX\$1REPLACE)](examples-transforming-strings-regexreplace.md)
+ [Exemple : Analyse de chaînes de journal en fonction d'expressions régulières (fonction REGEX\$1LOG\$1PARSE)](examples-transforming-strings-regexlogparse.md)
+ [Exemple : Analyse de journaux web (fonction W3C\$1LOG\$1PARSE)](examples-transforming-strings-w3clogparse.md)
+ [Exemple : Fractionnement de chaînes en plusieurs champs (fonction VARIABLE\$1COLUMN\$1LOG\$1PARSE)](examples-transforming-strings-variablecolumnlogparse.md)

# Exemple : Extraction d'une partie d'une chaîne (fonction SUBSTRING)
<a name="examples-transforming-strings-substring"></a>

Cet exemple utilise la fonction `SUBSTRING` pour transformer une chaîne dans Amazon Kinesis Data Analytics. La fonction `SUBSTRING` extrait une partie d'une chaîne source à partir d'une position spécifique. Pour plus d’informations, consultez la section [SUBSTRING](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-substring.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis. 

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, à l’aide du flux de données Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec une colonne (`REFERRER`), comme illustré.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec une liste des éléments URLs dans la colonne de référence.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/referrer-10.png)


Vous pouvez ensuite utiliser le code de l'application avec la fonction `SUBSTRING` pour analyser la chaîne d'URL afin de récupérer le nom de la société. Vous devez ensuite insérer les données obtenues dans un autre flux de l'application, comme indiqué ci-dessous : 



![\[Capture d'écran de la console montrant l'onglet d'analyse en temps réel avec les données résultantes dans le flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/referrer-20.png)


**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-transforming-strings-substring-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-transforming-strings-substring-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-strings-substring-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-transforming-strings-substring-2"></a>

Ensuite, créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**.

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez l'option de création d'un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit ne comporte qu'une seule colonne.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) 
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : Remplacement d'une sous-chaîne à l'aide de Regex (fonction REGEX\$1REPLACE)
<a name="examples-transforming-strings-regexreplace"></a>

Cet exemple utilise la fonction `REGEX_REPLACE` pour transformer une chaîne dans Amazon Kinesis Data Analytics. `REGEX_REPLACE` remplace une sous-chaîne par une autre sous-chaîne. Pour plus d’informations, consultez la section [REGEX\$1REPLACE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-replace.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis : 

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, avec le flux de données Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec une colonne (REFERRER), comme illustré.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec la liste des URLs dans la colonne de référence.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/referrer-10.png)


Vous utilisez ensuite le code d'application avec la fonction `REGEX_REPLACE` pour convertir l'URL afin d'utiliser `https://` au lieu de `http://`. Vous insérez les données résultantes dans un autre flux de l'application, comme indiqué ci-dessous : 



![\[Capture d'écran de la console montrant la table de données résultante avec les colonnes ROWTIME, ingest_time et referrer.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_regex_replace.png)


**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-transforming-strings-regexreplace-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-transforming-strings-regexreplace-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-strings-regexreplace-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-transforming-strings-regexreplace-2"></a>

Ensuite, créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez l'option de création d'un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit ne comporte qu'une seule colonne.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur :

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : Analyse de chaînes de journal en fonction d'expressions régulières (fonction REGEX\$1LOG\$1PARSE)
<a name="examples-transforming-strings-regexlogparse"></a>

Cet exemple utilise la fonction `REGEX_LOG_PARSE` pour transformer une chaîne dans Amazon Kinesis Data Analytics. `REGEX_LOG_PARSE` analyse une chaîne selon des modèles d’expression régulière Java par défaut. Pour plus d’informations, consultez la section [REGEX\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-log-parse.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis : 

```
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, avec le flux de données Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec une colonne (LOGENTRY), comme illustré ci-après :

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec la colonne LOGENTRY.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_regex_log_parse_0.png)


Vous pouvez ensuite utiliser le code de l'application avec la fonction `REGEX_LOG_PARSE` pour analyser la chaîne de journal afin de récupérer les éléments de données. Vous insérez les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant le tableau de données obtenu avec ROWTIME, LOGENTRY et MATCH1 des colonnes. MATCH2\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_regex_log_parse_1.png)


**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-transforming-strings-regexlogparse-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-transforming-strings-regexlogparse-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-strings-regexlogparse-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
           '"GET /index.php HTTP/1.1" 200 125 "-" '
           '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-transforming-strings-regexlogparse-2"></a>

Ensuite, créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application** et spécifiez un nom d'application.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez l'option de création d'un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit ne comporte qu'une seule colonne.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24));
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
          FROM 
               (SELECT STREAM LOGENTRY,
                   REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
                   FROM SOURCE_SQL_STREAM_001) AS T;
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données.

# Exemple : Analyse de journaux web (fonction W3C\$1LOG\$1PARSE)
<a name="examples-transforming-strings-w3clogparse"></a>

Cet exemple utilise la fonction `W3C_LOG_PARSE` pour transformer une chaîne dans Amazon Kinesis Data Analytics. Vous pouvez utiliser `W3C_LOG_PARSE` pour formater rapidement des journaux Apache. Pour plus d’informations, consultez la section [W3C\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-w3c-log-parse.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.

Dans cet exemple, vous écrivez des enregistrements de journaux dans un flux de données Amazon Kinesis. Les exemples de journaux affichés sont les suivants :

```
{"Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pba.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] "GET /icons/apache_pbb.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] "GET /icons/apache_pbc.gif HTTP/1.1" 304 0"}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, avec le flux de données Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec une colonne (log), comme illustré ci-après :

![\[Capture d'écran de la console montrant l'onglet d'exemple de flux formaté avec le schéma intégré à l'application contenant la colonne log.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/log-10.png)


Ensuite, vous utilisez le code d'application avec la fonction `W3C_LOG_PARSE` pour analyser le journal et créer un autre flux intégré à l'application avec différents champs de journal dans des colonnes distinctes, comme illustré ci-après :

![\[Capture d'écran de la console montrant l'onglet d'analyse en temps réel avec le flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/log-20.png)


**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-transforming-strings-w3clogparse-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-transforming-strings-w3clogparse-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-strings-w3clogparse-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
           '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-transforming-strings-w3clogparse-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**.

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez l'option de création d'un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit ne comporte qu'une seule colonne.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
      column1 VARCHAR(16),
      column2 VARCHAR(16),
      column3 VARCHAR(16),
      column4 VARCHAR(16),
      column5 VARCHAR(16),
      column6 VARCHAR(16),
      column7 VARCHAR(16));
      
      CREATE OR REPLACE PUMP "myPUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
              SELECT STREAM
                  l.r.COLUMN1,
                  l.r.COLUMN2,
                  l.r.COLUMN3,
                  l.r.COLUMN4,
                  l.r.COLUMN5,
                  l.r.COLUMN6,
                  l.r.COLUMN7
              FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON')
                    FROM "SOURCE_SQL_STREAM_001") AS l(r);
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données.

# Exemple : Fractionnement de chaînes en plusieurs champs (fonction VARIABLE\$1COLUMN\$1LOG\$1PARSE)
<a name="examples-transforming-strings-variablecolumnlogparse"></a>

Cet exemple utilise la fonction `VARIABLE_COLUMN_LOG_PARSE` pour manipuler des chaînes dans Kinesis Data Analytics. `VARIABLE_COLUMN_LOG_PARSE` fractionne une chaîne d’entrée en champs séparés par un caractère délimiteur ou une chaîne de délimiteur. Pour plus d’informations, consultez la section [VARIABLE\$1COLUMN\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-variable-column-log-parse.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.

Dans cet exemple, vous écrivez des enregistrements semi-structurés dans un flux de données Amazon Kinesis. Les exemples d'enregistrements sont les suivants :

```
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, à l’aide du flux Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec quatre colonnes, comme illustré ci-après :

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec 4 colonnes.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/unstructured-10.png)


Vous utilisez ensuite le code d'application avec la fonction `VARIABLE_COLUMN_LOG_PARSE` pour analyser les valeurs séparées par des virgules et insérer des lignes normalisées dans un autre flux intégré à l'application, comme illustré ci-après :



![\[Capture d'écran de la console montrant l'onglet d'analyse en temps réel avec le flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/unstructured-20.png)


**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-transforming-strings-variablecolumnlogparse-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-transforming-strings-variablecolumnlogparse-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-strings-variablecolumnlogparse-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

   

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-transforming-strings-variablecolumnlogparse-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente.

   1. Choisissez l'option de création d'un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Notez que le schéma déduit ne comporte qu'une seule colonne.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code de l'application, puis vérifiez les résultats :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur:

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
                  "column_A" VARCHAR(16),
                  "column_B" VARCHAR(16),
                  "column_C" VARCHAR(16),
                  "COL_1" VARCHAR(16),             
                  "COL_2" VARCHAR(16),            
                  "COL_3" VARCHAR(16));
      
      CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM  t."Col_A", t."Col_B", t."Col_C",
                        t.r."COL_1", t.r."COL_2", t.r."COL_3"
         FROM (SELECT STREAM 
                 "Col_A", "Col_B", "Col_C",
                 VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
                                           'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)',
                                           ',') AS r 
               FROM "SOURCE_SQL_STREAM_001") as t;
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données.

# Exemple : transformation DateTime des valeurs
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics prend en charge la conversion de colonnes en horodatages. Par exemple, vous pouvez utiliser votre propre horodatage dans le cadre d’une clause `GROUP BY` en tant qu’autre fenêtre temporelle, en plus de la colonne `ROWTIME`. Kinesis Data Analytics fournit des opérations et des fonctions SQL pour gérer les champs de date et d’heure. 
+ **Opérateurs de date et d’heure** : Vous pouvez effectuer des opérations arithmétiques sur les dates, les heures et les types de données d’intervalle. Pour plus d’informations, consultez la section [Opérateurs de date, d’horodatage et d’intervalle](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*.

   
+ **Fonctions SQL** : Ces fonctions incluent les fonctions suivantes. Pour plus d’informations, consultez la section [Fonctions de date et d’heure](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 
  + `EXTRACT()` : Extrait un champ d'une expression de date, d'heure, d'horodatage ou d'intervalle.
  + `CURRENT_TIME` : Renvoie l’heure à laquelle la requête s’exécute (UTC).
  + `CURRENT_DATE` : Renvoie la date à laquelle la requête s’exécute (UTC).
  + `CURRENT_TIMESTAMP` : Renvoie l'horodatage du moment où lorsque la requête s'exécute (UTC).
  + `LOCALTIME` : Renvoie l’heure en cours à laquelle la requête s’exécute telle que définie par l’environnement d’exécution de Kinesis Data Analytics (UTC).
  + `LOCALTIMESTAMP` : Renvoie l’horodatage actuel, tel que défini par l’environnement d’exécution de Kinesis Data Analytics (UTC).

     
+ **Extensions SQL** : Ces extensions incluent les extensions suivantes. Pour plus d’informations, consultez les sections [Fonctions de date et d’heure](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html) et [Fonctions de conversion de date et d’heure](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 
  + `CURRENT_ROW_TIMESTAMP` : Renvoie un nouvel horodatage pour chaque ligne du flux. 
  + `TSDIFF` : Renvoie la différence entre deux horodatages en millisecondes.
  + `CHAR_TO_DATE` : Convertit une chaîne en date.
  + `CHAR_TO_TIME` : Convertit une chaîne en heure.
  + `CHAR_TO_TIMESTAMP` : Convertit une chaîne en horodatage.
  + `DATE_TO_CHAR` : Convertit une date en chaîne.
  + `TIME_TO_CHAR` : Convertit une heure en chaîne.
  + `TIMESTAMP_TO_CHAR` : Convertit un horodatage en chaîne.

La plupart des fonctions SQL précédentes utilisent un format pour convertir les colonnes. Le format est flexible. Par exemple, vous pouvez spécifier le format `yyyy-MM-dd hh:mm:ss` pour convertir une chaîne d'entrée `2009-09-16 03:15:24` en horodatage. Pour plus d’informations, consultez la section [Char vers Timestamp(Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

## Exemples : Transformation de dates
<a name="examples-transforming-dates"></a>

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis. 

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans la console, avec le flux Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec une deux colonnes (`EVENT_TIME` et `TICKER`), comme illustré.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec les colonnes d'heure de l'événement et de symbole boursier.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


Vous utilisez ensuite le code d'application avec des fonctions SQL pour convertir le champ d'horodatage `EVENT_TIME` de différentes manières. Puis vous insérez les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### Étape 1 : Création d’un flux de données Kinesis
<a name="examples-transforming-dates-1"></a>

Créez un flux de données Amazon Kinesis et remplissez-le avec des enregistrements d’heure d’événement et de symbole boursier comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition.

1. Exécutez le code Python suivant pour remplir le flux avec des exemples de données. Ce code simple écrit un enregistrement en continu avec un symbole boursier aléatoire et l'horodatage actuel dans le flux.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### Étape 2 : Création d’une application Amazon Kinesis Data Analytics
<a name="examples-transforming-dates-2"></a>

Créez une application comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source.

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez de créer un rôle IAM.

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte deux colonnes.

   1. Choisissez **Edit schema (Modifier le schéma)**. Remplacez le **Column type (Type de colonne)** de la colonne **EVENT\$1TIME** par `TIMESTAMP`.

   1. Choisissez **Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux)**. Une fois que la console a enregistré le schéma, choisissez **Exit (Quitter)**.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001"
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : Transformation de plusieurs types de données
<a name="app-tworecordtypes"></a>

 Une exigence commune des applications d'extraction, de transformation et de chargement (ETL) est de pouvoir traiter plusieurs types d'enregistrement sur une source de diffusion. Vous pouvez créer des applications Kinesis Data Analytics pour traiter ces types de sources de streaming. Procédez comme suit :

1. D’abord, vous mappez la source de streaming à un flux d’entrée intégré à l’application, similaire à toutes les autres applications Kinesis Data Analytics.

1. Ensuite, dans votre code d'application, vous écrivez des instructions SQL pour récupérer des lignes de types spécifiques depuis le flux d'entrée intégré à l'application. Puis vous les insérez dans des flux intégrés à l'application distincts. (Vous pouvez créer des flux intégrés à l'application supplémentaires dans votre code d'application.)

Dans cet exercice, vous disposez d'une source de diffusion qui reçoit des enregistrements de deux types (`Order` et `Trade`). Il s'agit d'ordres de bourse et des transactions correspondantes. Pour chaque ordre, il peut y avoir zéro ou plusieurs transactions. Des exemples d'enregistrements de chaque type sont illustrés ci-après :

**Enregistrement d'ordre**

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

**Enregistrement de transaction**

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

Lorsque vous créez une application à l'aide de AWS Management Console, la console affiche le schéma déduit suivant pour le flux d'entrée intégré à l'application créé. Par défaut, la console nomme ce flux intégré à l'application `SOURCE_SQL_STREAM_001`.

![\[Capture d'écran de la console montrant l'exemple de flux intégré à l'application formaté.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/two-record-types-10.png)


Lorsque vous enregistrez la configuration, Amazon Kinesis Data Analytics lit en continu les données de la source de streaming et insère des lignes dans le flux intégré à l’application. Vous pouvez maintenant exécuter des analyses sur les données du flux intégré à l'application. 

Dans le code d'application de cet exemple, vous créez d'abord deux autres flux intégrés à l'application : `Order_Stream` et `Trade_Stream`. Vous pouvez alors filtrer les lignes du flux `SOURCE_SQL_STREAM_001` en fonction du type d'enregistrement et les insérer dans les flux nouvellement créées à l'aide de pompes. Pour plus d'informations sur ce modèle de codage, consultez [Code d'application](how-it-works-app-code.md).

1. Filtrer les lignes d'ordre et de transaction dans des flux intégrés à l'application distincts:

   1. Filtrez les enregistrements d'ordre dans le flux `SOURCE_SQL_STREAM_001`, puis enregistrez les ordres dans le flux `Order_Stream`.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM oid, otype,oticker, oprice, recordtype 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Order';
      ```

   1. Filtrez les enregistrements de transaction dans le flux `SOURCE_SQL_STREAM_001`, puis enregistrez les ordres dans le flux `Trade_Stream`.

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM tid, toid, tprice, tticker, recordtype
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Trade';
      ```

1. Vous pouvez maintenant exécuter des analyses supplémentaires sur ces flux. Dans cet exemple, vous comptez le nombre de transactions par symbole boursier dans une [fenêtre bascule](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/tumbling-window-concepts.html) d'une minute et enregistrez les résultats dans un autre flux, `DESTINATION_SQL_STREAM`. 

   ```
   --do some analytics on the Trade_Stream and Order_Stream. 
   -- To see results in console you must write to OPUT_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   Vous voyez le résultat, comme illustré ci-après :  
![\[Capture d'écran de la console affichant les résultats dans l'onglet de résultats SQL.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/two-record-types-20.png)

**Topics**
+ [Étape 1 : Préparation des données](tworecordtypes-prepare.md)
+ [Étape 2 : Création de l'application](tworecordtypes-create-app.md)

**Étape suivante**  
[Étape 1 : Préparation des données](tworecordtypes-prepare.md)

# Étape 1 : Préparation des données
<a name="tworecordtypes-prepare"></a>

Dans cette section, vous allez créer un flux de données Kinesis, puis remplir des enregistrements d’ordre et de transaction dans le flux. Il s'agit de votre source de diffusion pour l'application que vous créerez dans l'étape suivante.

**Topics**
+ [Étape 1.1 : Création d'une source de diffusion](#tworecordtypes-prepare-create-stream)
+ [Étape 1.2 : Remplissage de la source de diffusion](#tworecordtypes-prepare-populate-stream)

## Étape 1.1 : Création d'une source de diffusion
<a name="tworecordtypes-prepare-create-stream"></a>

Vous pouvez créer un flux de données Kinesis à l’aide de la console ou de l’ AWS CLI. L'exemple utilise `OrdersAndTradesStream` comme nom de flux. 
+ **Utilisation de la console** [: connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis) Choisissez **Data Streams**, puis créez un flux de données avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.
+ **À l'aide** de AWS CLI— Utilisez la `create-stream` AWS CLI commande Kinesis suivante pour créer le flux :

  ```
  $ aws kinesis create-stream \
  --stream-name OrdersAndTradesStream \
  --shard-count 1 \
  --region us-east-1 \
  --profile adminuser
  ```

## Étape 1.2 : Remplissage de la source de diffusion
<a name="tworecordtypes-prepare-populate-stream"></a>

Exécutez le script Python suivant pour remplir des exemples d'enregistrements dans `OrdersAndTradesStream`. Si vous avez créé le flux avec un autre nom, mettez à jour le code Python en conséquence. 

1. Installez Python et `pip`.

   Pour plus d'informations sur l'installation de Python, consultez le site web [Python](https://www.python.org/). 

   Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez [Installation](https://pip.pypa.io/en/stable/installing/) sur le site web de pip.

1. Exécutez le code Python suivant. La commande `put-record` dans le code écrit les enregistrements JSON dans le flux.

   ```
    
   import json
   import random
   import boto3
   
   STREAM_NAME = "OrdersAndTradesStream"
   PARTITION_KEY = "partition_key"
   
   
   def get_order(order_id, ticker):
       return {
           "RecordType": "Order",
           "Oid": order_id,
           "Oticker": ticker,
           "Oprice": random.randint(500, 10000),
           "Otype": "Sell",
       }
   
   
   def get_trade(order_id, trade_id, ticker):
       return {
           "RecordType": "Trade",
           "Tid": trade_id,
           "Toid": order_id,
           "Tticker": ticker,
           "Tprice": random.randint(0, 3000),
       }
   
   
   def generate(stream_name, kinesis_client):
       order_id = 1
       while True:
           ticker = random.choice(["AAAA", "BBBB", "CCCC"])
           order = get_order(order_id, ticker)
           print(order)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
           )
           for trade_id in range(1, random.randint(0, 6)):
               trade = get_trade(order_id, trade_id, ticker)
               print(trade)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(trade),
                   PartitionKey=PARTITION_KEY,
               )
           order_id += 1
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**Étape suivante**  
 [Étape 2 : Création de l'application](tworecordtypes-create-app.md)

# Étape 2 : Création de l'application
<a name="tworecordtypes-create-app"></a>

Dans cette section, vous allez créer une application Kinesis Data Analytics. Vous mettez ensuite à jour l'application en ajoutant une configuration d'entrée qui mappe la source de diffusion que vous avez créée dans la section précédente à un flux d'entrée intégré à l'application. 

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Créer une application**. Cet exemple utilise le nom de l'application **ProcessMultipleRecordTypes**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   1. Choisissez le flux que vous avez créé dans [Étape 1 : Préparation des données](tworecordtypes-prepare.md). 

   1. Choisissez de créer un rôle IAM.

   1. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

1. Dans le hub d'applications, choisissez **Go to SQL editor**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code de l'application et vérifiez les résultats :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. Choisissez l'onglet **Real-time analytics (Analyse en temps réel)** pour voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

   

**Étape suivante**  
Vous pouvez configurer la sortie de l'application pour conserver les résultats vers une destination externe, telle qu'un autre flux Kinesis ou un flux de diffusion de données Firehose. 

# Exemples : Fenêtres et Regroupement
<a name="examples-window"></a>

Cette section fournit des exemples d’applications Amazon Kinesis Data Analytics utilisant des requêtes à fenêtres et des requêtes de regroupement. (Pour plus d’informations, consultez [Requêtes à fenêtres](windowed-sql.md).) Chaque exemple fournit des step-by-step instructions et un exemple de code pour configurer l'application Kinesis Data Analytics. 

**Topics**
+ [Exemple : Stagger Window](examples-window-stagger.md)
+ [Exemple : fenêtre bascule utilisant ROWTIME](examples-window-tumbling-rowtime.md)
+ [Exemple : fenêtre bascule utilisant un horodatage d'événement](examples-window-tumbling-event.md)
+ [Exemple : extraction des valeurs les plus fréquentes (TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [Exemple : regroupement d'une partie des résultats à partir d'une requête](examples-window-partialresults.md)

# Exemple : Stagger Window
<a name="examples-window-stagger"></a>

Lorsqu'une requête de fenêtre traite des fenêtres distinctes pour chaque clé de partition unique, à compter du moment où les données avec la clé correspondante arrivent, la fenêtre est appelée *fenêtre stagger*. Pour en savoir plus, consultez [Stagger Windows](stagger-window-concepts.md). Cet exemple Amazon Kinesis Data Analytics exemple utilise les colonnes EVENT\$1TIME et TICKER pour créer des fenêtres stagger. Le flux source contient des groupes de six enregistrements avec des valeurs EVENT\$1TIME et TICKER identiques qui se produisent au sein d'une période d'une minute, mais pas nécessairement avec la même valeur de minute (par exemple, `18:41:xx`).

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Kinesis aux heures suivantes. Le script n'écrit pas les temps dans le flux, mais le temps auquel l'enregistrement est intégré par l'application est écrit dans le champ `ROWTIME` :

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



Vous créez ensuite une application Kinesis Data Analytics dans AWS Management Console le, avec le flux de données Kinesis comme source de diffusion. Le processus de découverte lit les exemples d'enregistrements sur la source de streaming et en déduit un schéma intégré à l'application avec deux colonnes (`EVENT_TIME` et `TICKER`), comme illustré ci-dessous.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec les colonnes de prix et de symbole boursier.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


Vous utilisez le code de l'application à l'aide de la fonction `COUNT` pour créer un regroupement des données avec fenêtres. Vous insérez ensuite les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_stagger.png)


Dans la procédure suivante, vous créez une application Kinesis Data Analytics qui regroupe les valeurs dans le flux d’entrée dans une fenêtre stagger basée sur EVENT\$1TIME et TICKER.

**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-stagger-window-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-stagger-window-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-stagger-window-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Pour écrire des enregistrements sur un flux de données Kinesis dans un environnement de production, nous vous recommandons d'utiliser [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) ou les [API de flux de données Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html). Pour plus de simplicité, cet exemple utilise le script Python ci-dessous pour générer des enregistrements. Exécutez le code pour remplir les exemples d'enregistrements du symbole boursier. Ce code simple écrit en continu un groupe de six enregistrements avec les mêmes valeurs `EVENT_TIME` et symbole boursier aléatoires dans le flux, durant une minute. Laissez le script s'exécuter pour pouvoir générer le schéma d'application lors d'une étape ultérieure.

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-stagger-window-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte deux colonnes.

   1. Choisissez **Edit schema (Modifier le schéma)**. Remplacez le **Column type (Type de colonne)** de la colonne **EVENT\$1TIME** par `TIMESTAMP`.

   1. Choisissez **Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux)**. Une fois que la console a enregistré le schéma, choisissez **Exit (Quitter)**.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. 

      Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : fenêtre bascule utilisant ROWTIME
<a name="examples-window-tumbling-rowtime"></a>

Lorsqu'une requête à fenêtres traite chaque fenêtre sans chevauchement, la fenêtre est appelée *fenêtre bascule*. Pour en savoir plus, consultez [Fenêtres bascules (regroupements à l'aide de GROUP BY)](tumbling-window-concepts.md). Cet exemple Amazon Kinesis Data Analytics utilise la colonne `ROWTIME` pour créer des fenêtres bascules. La colonne `ROWTIME` représente le moment où l'enregistrement a été lu par l'application.

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Kinesis. 

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans AWS Management Console le, avec le flux de données Kinesis comme source de diffusion. Le processus de découverte lit les exemples d'enregistrements sur la source de streaming et en déduit un schéma intégré à l'application avec deux colonnes (`TICKER` et `PRICE`), comme illustré ci-dessous.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec les colonnes de prix et de symbole boursier.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


Vous utilisez le code de l'application à l'aide du `MIN` et des fonctions `MAX` pour créer un regroupement des données avec fenêtres. Vous insérez ensuite les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


Dans la procédure suivante, vous créez une application Kinesis Data Analytics qui regroupe les valeurs dans le flux d’entrée dans une fenêtre bascule basée sur ROWTIME.

**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-tumbling-window-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-tumbling-window-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-tumbling-window-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Pour écrire des enregistrements sur un flux de données Kinesis dans un environnement de production, nous vous recommandons d'utiliser [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) ou les [API de flux de données Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html). Pour plus de simplicité, cet exemple utilise le script Python ci-dessous pour générer des enregistrements. Exécutez le code pour remplir les exemples d'enregistrements du symbole boursier. Ce code simple écrit de façon continue un enregistrement du symbole boursier aléatoire dans le flux. Laissez le script s'exécuter pour pouvoir générer le schéma d'application lors d'une étape ultérieure.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-tumbling-window-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Créer une application**, saisissez un nom d'application, puis sélectionnez **Créer une application**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte deux colonnes.

   1. Choisissez **Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux)**. Une fois que la console a enregistré le schéma, choisissez **Exit (Quitter)**.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. 

      Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : fenêtre bascule utilisant un horodatage d'événement
<a name="examples-window-tumbling-event"></a>

Lorsqu'une requête à fenêtres traite chaque fenêtre sans chevauchement, la fenêtre est appelée *fenêtre bascule*. Pour en savoir plus, consultez [Fenêtres bascules (regroupements à l'aide de GROUP BY)](tumbling-window-concepts.md). Cet exemple Amazon Kinesis Data Analytics illustre une fenêtre bascule qui utilise un horodatage d’événement, c’est-à-dire un horodatage créé par l’utilisateur et qui est compris dans les données de streaming. Il utilise cette approche plutôt que ROWTIME, qui est un horodatage que Kinesis Data Analytics crée lorsque l’application reçoit l’enregistrement. Vous devrez utiliser un horodatage d'événement dans les données de streaming si vous souhaitez créer un regroupement basé sur le moment où un événement s'est produit, plutôt que sur le moment où il a été reçu par l'application. Dans cet exemple, la valeur `ROWTIME` déclenche le regroupement chaque minute, et les enregistrements sont regroupés à la fois par `ROWTIME` et par heure d'événement. 

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis. La valeur `EVENT_TIME` est fixée à 5 secondes dans le passé afin de simuler un délai de transmission et de traitement susceptible de créer un retard entre le moment où l’événement s’est produit et le moment où l’enregistrement est transféré vers Kinesis Data Analytics.

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans AWS Management Console le, avec le flux de données Kinesis comme source de diffusion. Le processus de découverte lit les exemples d'enregistrements sur la source de streaming et en déduit un schéma intégré à l'application avec trois colonnes (`EVENT_TIME`, `TICKER`, et `PRICE`) comme illustré ci-dessous.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec les colonnes de l'heure de l'événement, de symbole boursier et de prix.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


Vous utilisez le code de l'application à l'aide du `MIN` et des fonctions `MAX` pour créer un regroupement des données avec fenêtres. Vous insérez ensuite les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


Dans la procédure suivante, vous créez une application Kinesis Data Analytics qui regroupe les valeurs dans le flux d’entrée dans une fenêtre bascule basée sur une heure d’événement.

**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-window-tumbling-event-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-window-tumbling-event-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-window-tumbling-event-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Pour écrire des enregistrements sur un flux de données Kinesis dans un environnement de production, nous vous recommandons d'utiliser [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) ou les [API de flux de données Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html). Pour plus de simplicité, cet exemple utilise le script Python ci-dessous pour générer des enregistrements. Exécutez le code pour remplir les exemples d'enregistrements du symbole boursier. Ce code simple écrit de façon continue un enregistrement du symbole boursier aléatoire dans le flux. Laissez le script s'exécuter pour pouvoir générer le schéma d'application lors d'une étape ultérieure.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-window-tumbling-event-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Créer une application**, saisissez un nom d'application, puis sélectionnez **Créer une application**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte trois colonnes.

   1. Choisissez **Edit schema (Modifier le schéma)**. Remplacez le **Column type (Type de colonne)** de la colonne **EVENT\$1TIME** par `TIMESTAMP`.

   1. Choisissez **Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux)**. Une fois que la console a enregistré le schéma, choisissez **Exit (Quitter)**.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. 

      Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : extraction des valeurs les plus fréquentes (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

Cet exemple Amazon Kinesis Data Analytics montre comment utiliser la fonction `TOP_K_ITEMS_TUMBLING` pour extraire les valeurs les plus fréquentes dans une fenêtre bascule. Pour plus d’informations, consultez la section [Fonction `TOP_K_ITEMS_TUMBLING`](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

La fonction `TOP_K_ITEMS_TUMBLING` est utile pour regrouper des dizaines ou des centaines de milliers de clés et si vous souhaitez réduire l'utilisation de vos ressources. La fonction produit le même résultat qu'un regroupement avec les clauses `GROUP BY` et `ORDER BY` .

Dans cet exemple, vous écrivez les enregistrements suivants dans un flux de données Amazon Kinesis : 

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



Vous créez ensuite une application Kinesis Data Analytics dans AWS Management Console le, avec le flux de données Kinesis comme source de diffusion. Le processus de découverte lit les exemples d'enregistrements de la source de streaming et en déduit un schéma intégré à l'application avec une colonne (`TICKER`), comme illustré ci-dessous.

![\[Capture d'écran de la console montrant le schéma intégré à l'application avec une colonne de symbole boursier.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


Vous utilisez le code de l'application à l'aide de la fonction `TOP_K_VALUES_TUMBLING` pour créer un regroupement des données avec fenêtres. Vous insérez ensuite les données obtenues dans un autre flux intégré à l'application, comme indiqué dans la capture d'écran suivante : 



![\[Capture d'écran de la console montrant les données obtenues dans un flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_topk.png)


Dans la procédure suivante, vous créez une application Kinesis Data Analytics qui extrait les valeurs les plus fréquentes dans le flux d’entrée.

**Topics**
+ [Étape 1 : Création d’un flux de données Kinesis](#examples-window-topkitems-1)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](#examples-window-topkitems-2)

## Étape 1 : Création d’un flux de données Kinesis
<a name="examples-window-topkitems-1"></a>

Créez un flux de données Amazon Kinesis et remplissez les enregistrements comme suit :

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une seule partition. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

1. Pour écrire des enregistrements sur un flux de données Kinesis dans un environnement de production, nous vous recommandons d'utiliser [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) ou les [API de flux de données Kinesis](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html). Pour plus de simplicité, cet exemple utilise le script Python ci-dessous pour générer des enregistrements. Exécutez le code pour remplir les exemples d'enregistrements du symbole boursier. Ce code simple écrit de façon continue un enregistrement du symbole boursier aléatoire dans le flux. Laissez le script s'exécuter pour pouvoir générer le schéma d'application lors d'une étape ultérieure.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Étape 2 : Création d’une application Kinesis Data Analytics
<a name="examples-window-topkitems-2"></a>

Créez une application Kinesis Data Analytics comme suit :

1. Ouvrez le service géré pour la console Apache Flink à l'adresse [ https://console.aws.amazon.com/kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Choisissez **Create application (Créer une application)**, saisissez un nom d'application, puis sélectionnez **Create application (Créer une application)**.

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)** pour vous connecter à la source. 

1. Sur la page **Connect to source (Se connecter à la source)**, procédez comme suit :

   

   1. Choisissez le flux que vous avez créé dans la section précédente. 

   1. Choisissez **Discover schema (Découvrir le schéma)**. Attendez que la console affiche le schéma déduit et les exemples d'enregistrements qui sont utilisés pour déduire le schéma pour le flux intégré à l'application créé. Le schéma déduit comporte une colonne.

   1. Choisissez **Save schema and update stream samples (Enregistrer le schéma et mettre à jour les exemples de flux)**. Une fois que la console a enregistré le schéma, choisissez **Exit (Quitter)**.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

1. Sur la page de détails de l'application, choisissez **Go to SQL editor (Accéder à l'éditeur SQL)**. Pour lancer l'application, choisissez **Yes, start application (Oui, démarrer l'application)** dans la boîte de dialogue qui s'affiche.

1. Dans l'éditeur SQL, écrivez le code d'application et vérifiez les résultats comme suit :

   1. Copiez le code d'application suivant et collez-le dans l'éditeur:

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**. 

      Dans l'onglet **Real-time analytics (Analyse en temps réel)**, vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données. 

# Exemple : regroupement d'une partie des résultats à partir d'une requête
<a name="examples-window-partialresults"></a>

Si un flux de données Amazon Kinesis contient des enregistrements qui disposent d’une heure d’événement ne correspondant pas exactement à l’heure de l’intégration, une sélection de résultats dans une fenêtre bascule contiendra, dans la fenêtre, les enregistrements qui sont arrivés mais qui ne se sont pas nécessairement produits. Dans ce cas, la fenêtre bascule contient uniquement une partie des résultats que vous souhaitez. Vous pouvez utiliser plusieurs approches pour résoudre ce problème :
+ Utilisez uniquement une fenêtre bascule et regroupez une partie des résultats dans le traitement ultérieur via une base de données ou un entrepôt de données utilisant des opérations de « mises à jour/insertions ». Cette approche est efficace dans le traitement d'une application. Elle gère les données tardives indéfiniment pour les opérateurs d'agrégation (`sum`, `min`, `max`, etc.). L'inconvénient de cette approche est que vous devez développer et maintenir une logique d'application supplémentaire dans la couche de base de données.
+ Utilisez une fenêtre bascule et défilante qui produit une partie des résultats tôt, mais continue également à produire des résultats complets au cours de la période de la fenêtre défilante. Cette approche gère les données tardives grâce à un remplacement plutôt qu'une « mise à jour/insertion » afin qu'aucune autre logique d'application n'ait besoin d'être ajoutée dans la couche de base de données. L'inconvénient de cette approche est qu'elle utilise davantage d'unités de traitement Kinesis KPUs () et qu'elle produit toujours deux résultats, ce qui peut ne pas fonctionner dans certains cas d'utilisation.

Pour plus d'informations sur les fenêtres bascules et défilantes, consultez [Requêtes à fenêtres](windowed-sql.md).

Dans la procédure suivante, le regroupement de la fenêtre bascule génère deux résultats partiels (envoyés au `CALC_COUNT_SQL_STREAM` dans le flux intégré à l'application) qui doivent être combinés pour produire un résultat final. L'application génère ensuite un second regroupement (envoyé au `DESTINATION_SQL_STREAM` dans le flux intégré à l'application) qui combine les deux résultats partiels.

**Pour créer une application qui regroupe une partie des résultats à l'aide d'une heure d'évènement**

1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Analytics (Analyse des données)** dans le volet de navigation. Créez une application Kinesis Data Analytics comme décrit dans le didacticiel [Mise en route avec les applications Amazon Kinesis Data Analytics pour SQL](getting-started.md).

1. Dans l'éditeur SQL, remplacez le code d'application par les éléments suivants : 

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   L'instruction `SELECT` du code d'application filtre les lignes de `SOURCE_SQL_STREAM_001` pour obtenir les changements de cours d'action supérieurs à 1 % et insère ces lignes dans un autre flux intégré à l'application, `CHANGE_STREAM`, à l'aide d'une pompe. 

1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**.

La première pompe génère un flux vers `CALC_COUNT_SQL_STREAM` similaire à ce qui suit. Notez que l'ensemble des résultats est incomplet : 

![\[Capture d'écran de la console montrant une partie des résultats.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_partial_0.png)


La deuxième pompe génère ensuite un flux vers `DESTINATION_SQL_STREAM` contenant l'ensemble des résultats : 

![\[Capture d'écran de la console montrant l'ensemble des résultats.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex_partial_1.png)


# Exemples : Jointures
<a name="examples-joins"></a>

Cette section fournit des exemples d’applications Kinesis Data Analytics qui utilisent des requêtes de jointure. Chaque exemple fournit des step-by-step instructions pour configurer et tester votre application Kinesis Data Analytics. 

**Topics**
+ [Exemple : ajout de données de référence à une application Kinesis Data Analytics](app-add-reference-data.md)

# Exemple : ajout de données de référence à une application Kinesis Data Analytics
<a name="app-add-reference-data"></a>

Dans cet exercice, vous allez ajouter des données de référence à une application Kinesis Data Analytics existante. Pour plus d'informations sur les données de référence, consultez le rubrique suivante :
+ [Applications Amazon Kinesis Data Analytics pour SQL : fonctionnement](how-it-works.md)
+ [Configuration de l'entrée de l'application](how-it-works-input.md)

Dans cet exercice, vous allez ajouter des données de référence à l’application que vous avez créée dans l’exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) de Kinesis Data Analytics. Les données de référence fournissent le nom de la société pour chaque symbole boursier, par exemple :

```
Ticker, Company
AMZN,Amazon
ASD, SomeCompanyA
MMB, SomeCompanyB
WAS,  SomeCompanyC
```

Suivez d'abord les étapes de l'exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) pour créer une application de démarrage. Puis suivez ces étapes pour configurer et ajouter des données de référence à votre application :

1. **Préparation des données**
   + Stockez les données de référence précédentes sous forme d’objet dans Amazon Simple Storage Service (Amazon S3).
   + Créez un rôle IAM pouvant être assumé par Kinesis Data Analytics pour lire l’objet Amazon S3 en votre nom.

1. **Ajoutez la source des données de référence à votre application. **

   Kinesis Data Analytics lit l’objet Amazon S3 et crée une table de référence intégrée à l’application que vous pouvez interroger dans votre code d’application.

1. **Testez le code.**

   Dans votre code d'application, vous allez écrire une requête de jointure pour joindre le flux intégré à l'application à la table de référence intégrée à l'application afin d'obtenir le nom de la société pour chaque symbole boursier.

**Topics**
+ [Étape 1 : Préparation](#add-refdata-prepare)
+ [Étape 2 : Ajout de la source de données de référence à la configuration d’application](#add-refdata-create-iamrole)
+ [Étape 3 : Test : Interrogation de la table de référence intégrée à l'application](#add-refdata-test)

## Étape 1 : Préparation
<a name="add-refdata-prepare"></a>

Dans cette section, vous stockez des exemples de données de référence en tant qu’objet dans un compartiment Amazon S3. Vous créez également un rôle IAM pouvant être assumé par Kinesis Data Analytics pour lire l’objet en votre nom.

### Stockage des données de référence en tant qu’objet Amazon S3
<a name="prepare-create-s3object"></a>

Dans cette étape, vous stockez les exemples de données de référence en tant qu’objet Amazon S3.

1. Ouvrez un éditeur de texte, ajoutez les données suivantes et enregistrez le fichier sous le nom `TickerReference.csv`. 

   ```
   Ticker, Company
   AMZN,Amazon
   ASD, SomeCompanyA
   MMB, SomeCompanyB
   WAS,  SomeCompanyC
   ```

   

1. Chargez le fichier `TickerReference.csv` dans votre compartiment S3. Pour en savoir plus, consultez [Téléchargement d’objets dans Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UploadingObjectsintoAmazonS3.html) dans le *Guide de l’utilisateur Amazon Simple Storage Service*.

### Créer un rôle IAM
<a name="prepare-create-iamrole"></a>

Créez ensuite un rôle IAM que Kinesis Data Analytics peut assumer et lisez l’objet Amazon S3.

1. Dans Gestion des identités et des accès AWS (IAM), créez un rôle IAM nommé. **KinesisAnalytics-ReadS3Object** Pour créer le rôle, suivez les instructions de [Création d’un rôle pour déléguer des autorisations à un service Amazon (AWS Management Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html#roles-creatingrole-service-console) dans le *Guide de l’utilisateur IAM*.

   Dans la console IAM, spécifiez les valeurs suivantes :
   + Dans **Sélectionner le type de rôle**, choisissez **AWS Lambda**. Après avoir créé le rôle, vous allez modifier la politique de confiance pour autoriser Kinesis Data Analytics à ( AWS Lambda ne pas) assumer le rôle.
   + N'attachez pas de stratégie sur la page **Attach Policy**.

1. Mettez à jour les stratégies de rôle IAM :

   

   1. Dans la console IAM, choisissez le rôle que vous avez créé.

   1. Dans l’onglet **Relations d’approbation**, mettez à jour la stratégie d’approbation pour accorder à Kinesis Data Analytics des autorisations pour assumer le rôle. La stratégie d'approbation est présentée ci-après :

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": {
              "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
          }
        ]
      }
      ```

------

      

   1. **Dans l'onglet **Autorisations**, joignez une politique gérée par Amazon appelée AmazonS3. ReadOnlyAccess** Vous accordez ainsi au rôle les autorisations pour lire un objet Amazon S3. Cette stratégie est présentée ci-après :

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:Get*",
              "s3:List*"
            ],
            "Resource": "*"
          }
        ]
      }
      ```

------

## Étape 2 : Ajout de la source de données de référence à la configuration d’application
<a name="add-refdata-create-iamrole"></a>

Dans cette étape, vous ajoutez une source de données de référence à la configuration de votre application. Pour commencer, vous avez besoin des informations suivantes : 
+ Le nom de votre compartiment S3 et le nom de la clé d’objet
+ L’Amazon Resource Name (ARN) du rôle IAM

1. Dans la page principale de l’application, choisissez **Connect reference data (Connecter des données de référence)**.

1. Sur la page **Connecter une source de données de référence**, choisissez le compartiment Amazon S3 contenant vos objets de données de référence, puis saisissez le nom de clé de l’objet.

1. Saisissez **CompanyName** comme **nom du tableau de référence intégré à l'application**.

1. Dans la section **Accès aux ressources choisies**, choisissez **Choisissez parmi les rôles IAM que Kinesis Analytics peut** assumer, puis choisissez le rôle IAM -Reads3Object que vous avez créé dans **KinesisAnalyticsla** section précédente.

1. Choisissez **Discover schema (Découvrir le schéma)**. La console détecte deux colonnes dans les données de référence.

1. Choisissez **Enregistrer et fermer**.

## Étape 3 : Test : Interrogation de la table de référence intégrée à l'application
<a name="add-refdata-test"></a>

Vous pouvez maintenant interroger la table de référence intégrée à l'application, `CompanyName`. Vous pouvez utiliser les informations de référence pour enrichir votre application en joignant les données de prix des actions à la table de référence. Le résultat indique le nom de l'entreprise.

1. Remplacez le code de votre application par ce qui suit. La requête joint le flux d'entrée intégré à l'application à la table de référence intégrée à l'application. Le code d'application écrit les résultats dans un autre flux intégré à l'application, `DESTINATION_SQL_STREAM`. 

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM ticker_symbol, "c"."Company", sector, change, price
      FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c"
      ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
   ```

1. Vérifiez que le résultat de l'application apparaît dans l'**SQLResults**onglet. Assurez-vous que certaines des lignes affichent des noms de société (vos exemples de données de référence n'ont pas tous les noms de société).

# Exemples : Apprentissage automatique (Machine Learning)
<a name="examples-machine"></a>

Cette section fournit des exemples d’applications Amazon Kinesis Data Analytics qui utilisent des requêtes d’apprentissage automatique (Machine Learning). Les requêtes d'apprentissage automatique (Machine Learning) effectuent des analyses complexes sur les données en s'appuyant sur l'historique des données du flux pour rechercher des modèles inhabituels. Les exemples fournissent des step-by-step instructions pour configurer et tester votre application Kinesis Data Analytics. 

**Topics**
+ [Exemple : Détection d'anomalies de données sur un flux (fonction RANDOM\$1CUT\$1FOREST)](app-anomaly-detection.md)
+ [Exemple : détection d'anomalies de données et message d'explication (fonction RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION)](app-anomaly-detection-with-explanation.md)
+ [Exemple : Détection des points chauds sur un flux (fonction HOTSPOTS)](app-hotspots-detection.md)

# Exemple : Détection d'anomalies de données sur un flux (fonction RANDOM\$1CUT\$1FOREST)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics fournit une fonction (`RANDOM_CUT_FOREST`) qui peut attribuer un score d’anomalie à chaque enregistrement en fonction de valeurs dans les colonnes numériques. Pour plus d’informations, consultez la section [Fonction `RANDOM_CUT_FOREST`](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

Dans cet exercice, vous allez écrire du code d'application pour attribuer un score d'anomalie à des enregistrements sur la source de diffusion de votre application. Pour configurer l'application, procédez comme suit :

1. **Configurer une source de streaming** : vous configurez un flux de données Kinesis et écrivez des échantillons de données `heartRate`, comme illustré ci-après :

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   La procédure fournit un script Python qui vous permet de remplir le flux. Les valeurs `heartRate` sont générées de façon aléatoire, avec 99 % des enregistrements ayant des valeurs `heartRate` comprises entre 60 et 100, et seulement 1 % ayant des valeurs `heartRate` entre 150 et 200. Les enregistrements avec des valeurs `heartRate` entre 150 et 200 sont donc des anomalies. 

1. **Configurer l’entrée** : à l’aide de la console, vous créez une application Kinesis Data Analytics et configurez l’entrée de l’application en mappant la source de streaming à un flux intégré à l’application (`SOURCE_SQL_STREAM_001`). Lorsque l’application démarre, Kinesis Data Analytics lit en continu la source de streaming et insère des enregistrements dans le flux intégré à l’application.

1. **Spécifiez le code d'application** – L'exemple utilise le code d'application suivant :

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   Le code lit les lignes dans le flux `SOURCE_SQL_STREAM_001`, attribue un score d'anomalie et écrit les lignes résultantes dans une autre flux intégré à l'application (`TEMP_STREAM`). Le code d'application trie ensuite les enregistrements dans le flux `TEMP_STREAM` et enregistre les résultats dans un autre flux intégré à l'application (`DESTINATION_SQL_STREAM`). Vous utilisez des pompes pour insérer des lignes dans les flux intégrés à l'application. Pour de plus amples informations, veuillez consulter [Flux et pompes intégrés à l'application](streams-pumps.md).

1. **Configurer la sortie** : vous configurez la sortie de l’application pour conserver les données du flux `DESTINATION_SQL_STREAM` dans une destination externe qui est un autre flux de données Kinesis. Les opérations visant à examiner les scores d'anomalie qui sont attribués à chaque enregistrement et à déterminer quel score indique une anomalie (et que vous avez besoin d'être alerté) sont externes à l'application. Vous pouvez utiliser une AWS Lambda fonction pour traiter ces scores d'anomalies et configurer les alertes. 

L’exercice utilise la région USA Est (Virginie du Nord) (`us-east-1`) pour créer ces flux et votre application. Si vous utilisez une autre région, vous devez mettre à jour le code en conséquence.

**Topics**
+ [Étape 1 : Préparation](app-anomaly-prepare.md)
+ [Étape 2 : Créer une application](app-anom-score-create-app.md)
+ [Étape 3 : Configuration de la sortie de l'application](app-anomaly-create-ka-app-config-destination.md)
+ [Étape 4 : Vérification de la sortie](app-anomaly-verify-output.md)

**Étape suivante**  
[Étape 1 : Préparation](app-anomaly-prepare.md)

# Étape 1 : Préparation
<a name="app-anomaly-prepare"></a>

Avant de créer une application d’analyse de données Amazon Kinesis Data Analytics pour cet exercice, vous devez créer deux flux de données Kinesis. Configurez l’un des flux en tant que source de streaming pour votre application et l’autre flux en tant que destination où Kinesis Data Analytics conserve la sortie de votre application. 

**Topics**
+ [Étape 1.1 : Création des flux de données d'entrée et de sortie](#app-anomaly-create-two-streams)
+ [Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée](#app-anomaly-write-sample-records-inputstream)

## Étape 1.1 : Création des flux de données d'entrée et de sortie
<a name="app-anomaly-create-two-streams"></a>

Dans cette section, vous créez deux flux Kinesis : `ExampleInputStream` et `ExampleOutputStream`. Vous pouvez créer ces flux avec la AWS Management Console ou l' AWS CLI.
+ 

**Pour utiliser la console**

  1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

  1. Choisissez **Create data stream (Créer un flux de données)**. Créez un flux avec une partition nommée `ExampleInputStream`. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

  1. Répétez l'étape précédente, en créant un flux avec une seule partition nommée `ExampleOutputStream`.
+ 

**Pour utiliser le AWS CLI**

  1. Utilisez la `create-stream` AWS CLI commande Kinesis suivante pour créer le premier flux ()`ExampleInputStream`.

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. Exécutez la même commande en remplaçant le nom du flux par `ExampleOutputStream`. Cette commande crée le deuxième flux que l'application utilisera pour écrire la sortie.

## Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée
<a name="app-anomaly-write-sample-records-inputstream"></a>

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux `ExampleInputStream`.

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. Installez Python et `pip`.

   Pour plus d'informations sur l'installation de Python, consultez le site web [Python](https://www.python.org/). 

   Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez [Installation](https://pip.pypa.io/en/stable/installing/) sur le site web de pip.

1. Exécutez le code Python suivant. La commande `put-record` dans le code écrit les enregistrements JSON dans le flux.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**Étape suivante**  
[Étape 2 : Créer une application](app-anom-score-create-app.md)

# Étape 2 : Créer une application
<a name="app-anom-score-create-app"></a>

Dans cette section, vous allez créer une application Amazon Kinesis Data Analytics comme suit :
+ Configurez l’entrée de l’application pour utiliser le flux de données Kinesis que vous avez créé dans [Étape 1 : Préparation](app-anomaly-prepare.md) comme source de streaming.
+ Utilisez le modèle **Anomaly Detection (Détection des anomalies)** dans la console. 

**Pour créer une application**

1. Suivez les étapes 1, 2 et 3 de l’exercice Kinesis Data Analytics **Mise en route** (voir [Étape 3.1 : Créer une application](get-started-create-app.md)). 
   + Dans la configuration de la source, procédez de la façon suivante :
     + Spécifiez la source de diffusion que vous avez créée dans la section précédente. 
     + Une fois que la console a déduit le schéma, modifiez celui-ci et définissez le type de colonne `heartRate` sur `INTEGER`. 

       La plupart des valeurs de taux de cœur sont normales et le processus de découverte va très probablement attribuer le type `TINYINT` à cette colonne. Mais un faible pourcentage de valeurs présente un taux de cœur élevé. Si ces valeurs élevées ne correspondent au type `TINYINT`, Kinesis Data Analytics envoie ces lignes vers un flux d’erreurs. Mettez à jour le type de données en `INTEGER` pour qu'il puisse recevoir toutes les données de taux de cœur générées.
   + Utilisez le modèle **Anomaly Detection (Détection des anomalies)** dans la console. Mettez à jour ensuite le code du modèle pour fournir le nom de colonne approprié.

1. Mettez à jour le code d'application en fournissant des noms de colonne. Le code d'application résultant apparaît ci-dessous (collez ce code dans l'éditeur SQL) :

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. Exécutez le code SQL et vérifiez les résultats dans la console Kinesis Data Analytics :  
![\[Capture d'écran de la console montrant l'onglet d'analyse en temps réel avec les données résultantes dans le flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**Étape suivante**  
[Étape 3 : Configuration de la sortie de l'application](app-anomaly-create-ka-app-config-destination.md)

# Étape 3 : Configuration de la sortie de l'application
<a name="app-anomaly-create-ka-app-config-destination"></a>

Une fois que [Étape 2 : Créer une application](app-anom-score-create-app.md) est terminé, le code d'application lit des données de taux de cœur à partir d'une source de diffusion et attribue un score d'anomalie à chacune d'entre elles. 

Vous pouvez maintenant envoyer les résultats de l’application depuis le flux intégré à l’application vers une destination externe, qui est un autre flux de données Kinesis (`OutputStreamTestingAnomalyScores`). Vous pouvez ensuite analyser les scores d'anomalie et déterminer quel taux de cœur est anormal. Vous pouvez ensuite étendre encore cette application pour générer des alertes. 

Suivez les étapes ci-dessous pour configurer la sortie de l'application :



1. Ouvrez la console Amazon Kinesis Data Analytics. Dans l'éditeur SQL, choisissez **Destination** ou **Add a destination** dans le tableau de bord d'application. 

1. Sur la page **Connect to destination (Se connecter à une destination)**, choisissez le flux `OutputStreamTestingAnomalyScores` que vous avez créé dans la section précédente.

   Maintenant, vous disposez d’une destination externe où Amazon Kinesis Data Analytics conserve tous les enregistrements que votre application écrit dans le flux intégré à l’application `DESTINATION_SQL_STREAM`. 

1. Vous pouvez éventuellement configurer AWS Lambda pour surveiller le `OutputStreamTestingAnomalyScores` flux et vous envoyer des alertes. Pour obtenir des instructions, veuillez consulter [Prétraitement des données à l’aide d’une fonction Lambda](lambda-preprocessing.md). Si vous ne définissez pas d’alertes, vous pouvez vérifier les enregistrements écrits par Kinesis Data Analytics dans la destination externe, ce qui représente le flux de données Kinesis `OutputStreamTestingAnomalyScores`, comme décrit dans [Étape 4 : Vérification de la sortie](app-anomaly-verify-output.md).

**Étape suivante**  
[Étape 4 : Vérification de la sortie](app-anomaly-verify-output.md)

# Étape 4 : Vérification de la sortie
<a name="app-anomaly-verify-output"></a>

Après avoir configuré la sortie de l'application dans [Étape 3 : Configuration de la sortie de l'application](app-anomaly-create-ka-app-config-destination.md), utilisez les commandes de l' AWS CLI suivantes pour lire les enregistrements dans le flux de destination qui est écrit par l'application :

1. Exécutez la commande `get-shard-iterator` pour obtenir un pointeur vers les données du flux de sortie.

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   Vous obtenez une réponse comportant une valeur d'itérateur de partition, comme illustré dans l'exemple de réponse suivant :

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   Copiez la valeur d'itérateur de partition. 

1. Exécutez la commande AWS CLI `get-records`.

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   La commande renvoie une page d'enregistrements et un autre itérateur de partition que vous pouvez utiliser dans la commande `get-records` suivante pour extraire l'ensemble d'enregistrements suivant.

# Exemple : détection d'anomalies de données et message d'explication (fonction RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION)
<a name="app-anomaly-detection-with-explanation"></a>

Amazon Kinesis Data Analytics fournit la fonction `RANDOM_CUT_FOREST_WITH_EXPLANATION`, qui attribue un score d’anomalie à chaque enregistrement en fonction de valeurs dans les colonnes numériques. La fonction fournit également une explication de l'anomalie. Pour plus d’informations, consultez [RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

Dans cet exercice, vous allez écrire du code d'application pour obtenir des scores d'anomalie pour des enregistrements dans la source de diffusion de votre application. Vous allez également obtenir une explication pour chaque anomalie.

**Topics**
+ [Étape 1 : Préparation des données](app-anomaly-with-ex-prepare.md)
+ [Étape 2 : Création d'une application d'analyse](app-anom-with-exp-create-app.md)
+ [Étape 3 : Évaluation des résultats](examine-results-with-exp.md)

**Première étape**  
[Étape 1 : Préparation des données](app-anomaly-with-ex-prepare.md)

# Étape 1 : Préparation des données
<a name="app-anomaly-with-ex-prepare"></a>

Avant de créer une application Amazon Kinesis Data Analytics pour cet [exemple](app-anomaly-detection-with-explanation.md), vous devez créer un flux de données Kinesis que vous allez utiliser comme source de streaming pour votre application. Vous pouvez également exécuter un code Python pour écrire des données simulées de pression artérielle dans le flux. 

**Topics**
+ [Étape 1.1 : Création d’un flux de données Kinesis](#app-anomaly-create-two-streams)
+ [Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée](#app-anomaly-write-sample-records-inputstream)

## Étape 1.1 : Création d’un flux de données Kinesis
<a name="app-anomaly-create-two-streams"></a>

Dans cette section, vous allez créer un flux de données Kinesis nommé `ExampleInputStream`. Vous pouvez créer ce flux de données à l'aide du AWS Management Console ou du AWS CLI.
+ Pour utiliser la console :

  1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

  1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation. Ensuite, choisissez **Create Kinesis stream (Créer un flux Kinesis)**.

  1. Pour le nom, saisissez **ExampleInputStream**. Pour le nombre de partitions, saisissez **1**.
+ Vous pouvez également utiliser le AWS CLI pour créer le flux de données, exécutez la commande suivante :

  ```
  $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
  ```

## Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée
<a name="app-anomaly-write-sample-records-inputstream"></a>

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux de données que vous avez créé. 

1. Installez Python et pip.

   Pour plus d'informations sur l'installation de Python, consultez le site [Python](https://www.python.org/). 

   Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez la section [Installation](https://pip.pypa.io/en/stable/installing/) dans la documentation de pip.

1. Exécutez le code Python suivant. Vous pouvez modifier la région en choisissant celle que vous souhaitez utiliser pour cet exemple. La commande `put-record` dans le code écrit les enregistrements JSON dans le flux.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class PressureType(Enum):
       low = "LOW"
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_blood_pressure(pressure_type):
       pressure = {"BloodPressureLevel": pressure_type.value}
       if pressure_type == PressureType.low:
           pressure["Systolic"] = random.randint(50, 80)
           pressure["Diastolic"] = random.randint(30, 50)
       elif pressure_type == PressureType.normal:
           pressure["Systolic"] = random.randint(90, 120)
           pressure["Diastolic"] = random.randint(60, 80)
       elif pressure_type == PressureType.high:
           pressure["Systolic"] = random.randint(130, 200)
           pressure["Diastolic"] = random.randint(90, 150)
       else:
           raise TypeError
       return pressure
   
   
   def generate(stream_name, kinesis_client):
       while True:
           rnd = random.random()
           pressure_type = (
               PressureType.low
               if rnd < 0.005
               else PressureType.high
               if rnd > 0.995
               else PressureType.normal
           )
           blood_pressure = get_blood_pressure(pressure_type)
           print(blood_pressure)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(blood_pressure),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

**Étape suivante**  
[Étape 2 : Création d'une application d'analyse](app-anom-with-exp-create-app.md)

# Étape 2 : Création d'une application d'analyse
<a name="app-anom-with-exp-create-app"></a>

Dans cette section, vous allez créer une application Amazon Kinesis Data Analytics et la configurer pour utiliser le flux de données Kinesis que vous avez créé en tant que source de streaming dans [Étape 1 : Préparation des données](app-anomaly-with-ex-prepare.md). Vous pouvez ensuite exécuter le code de l'application qui utilise la fonction `RANDOM_CUT_FOREST_WITH_EXPLANATION`.

**Pour créer une application**

1. [Ouvrez la console Kinesis à l'adresse /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Choisissez **Data Analytics (Analyse des données)** dans le volet de navigation, puis **Create application (Créer une application)**.

1. Attribuez un nom et une description (facultatif) à votre application, puis sélectionnez **Create application**

1. Choisissez **Connect streaming data**, puis sélectionnez-en un **ExampleInputStream**dans la liste. 

1. Sélectionnez **Discover schema** et vérifiez que `Systolic` et `Diastolic` apparaissent sous la forme de colonnes `INTEGER`. S'ils sont associés à un autre type, sélectionnez **Edit schema** et attribuez-leur le type `INTEGER`. 

1. Sous **Real time analytics**, sélectionnez **Go to SQL editor**. À l'invite, choisissez d'exécuter votre application. 

1. Collez le code suivant dans l'éditeur SQL, puis choisissez **Save and run SQL**.

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   -- Compute an anomaly score with explanation for each record in the input stream
   -- using RANDOM_CUT_FOREST_WITH_EXPLANATION
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION 
         FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

**Étape suivante**  
[Étape 3 : Évaluation des résultats](examine-results-with-exp.md)

# Étape 3 : Évaluation des résultats
<a name="examine-results-with-exp"></a>

Lorsque vous exécutez le code SQL pour cet [exemple](app-anomaly-detection-with-explanation.md), vous devez d'abord voir des lignes avec un score d'anomalie égal à zéro. Cela se produit au cours de la phase d'apprentissage initiale. Vous obtenez ensuite des résultats similaires à ce qui suit:

```
ROWTIME SYSTOLIC DIASTOLIC BLOODPRESSURELEVEL ANOMALY_SCORE ANOMALY_EXPLANATION
27:49.0	101      66        NORMAL             0.711460417   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0922","ATTRIBUTION_SCORE":"0.3792"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"0.0210","ATTRIBUTION_SCORE":"0.3323"}}
27:50.0	144      123       HIGH               3.855851061   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}
27:50.0	113      69        NORMAL             0.740069409   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0549","ATTRIBUTION_SCORE":"0.3750"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0394","ATTRIBUTION_SCORE":"0.3650"}}
27:50.0	105      64        NORMAL             0.739644157   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0245","ATTRIBUTION_SCORE":"0.3667"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0524","ATTRIBUTION_SCORE":"0.3729"}}
27:50.0	100      65        NORMAL             0.736993425   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0203","ATTRIBUTION_SCORE":"0.3516"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0454","ATTRIBUTION_SCORE":"0.3854"}}
27:50.0	108      69        NORMAL             0.733767202   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0974","ATTRIBUTION_SCORE":"0.3961"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0189","ATTRIBUTION_SCORE":"0.3377"}}
```
+ L'algorithme de la fonction `RANDOM_CUT_FOREST_WITH_EXPLANATION` voit que les colonnes `Systolic` et `Diastolic` sont au format numérique et les utilise en tant qu'entrées.
+ La colonne `BloodPressureLevel` contient des données texte et n'est donc pas prise en compte par l'algorithme. Cette colonne est une simple aide visuelle qui, dans cet exemple, vous permet d'identifier rapidement les différents niveaux de pression artérielle (normal, élevé et faible).
+ Dans la colonne `ANOMALY_SCORE`, les enregistrements associés à des scores plus élevés sont anormaux. Avec une score d'anomalie de 3,855851061, le deuxième enregistrement dans cet exemple d'ensemble de résultats est le plus anormal.
+ Pour comprendre dans quelle mesure chaque colonne numérique prise en compte par l'algorithme contribue au score d'anomalie, consultez le champ JSON nommé `ATTRIBUTION_SCORE` dans la colonne `ANOMALY_SCORE`. Dans le cas de la deuxième ligne de cet exemple d'ensemble de résultats, les colonnes `Systolic` et `Diastolic` contribuent au score d'anomalie selon une proportion de 1.7447:2 .1111. En d'autres termes, 45 % de l'explication du score d'anomalie est imputable à la valeur systolique, le reste étant attribué à la valeur diastolique.
+ Pour déterminer la direction dans laquelle le point représenté par la seconde ligne de cet exemple est anormal, consultez le champ JSON nommé `DIRECTION`. Les valeurs systolique et diastolique sont marquées en tant que `HIGH` dans ce cas. Pour déterminer le degré de confiance quant à l'exactitude de ces directions, consultez le champ JSON nommé `STRENGTH`. Dans cet exemple, l'algorithme a d'autant plus confiance que la valeur diastolique est élevée. En effet, la valeur normale du relevé diastolique est généralement comprise dans la plage 60 à 80, ce qui signifie qu’une valeur de 123 est bien plus élevée que prévu. 

# Exemple : Détection des points chauds sur un flux (fonction HOTSPOTS)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics fournit la fonction `HOTSPOTS` qui peut rechercher et renvoyer des informations sur les régions relativement denses de vos données. Pour plus d’informations, consultez [HOTSPOTS](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

Dans cet exercice, vous allez écrire du code d'application pour localiser les points chauds sur la source de streaming de votre application. Pour configurer l'application, exécutez les étapes suivantes :

1. **Configurer une source de streaming** : vous configurez un flux Kinesis et écrivez des exemples de données de coordonnées, comme illustré ci-après :

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   L'exemple fournit un script Python qui vous permet de remplir le flux. Les valeurs `x` et `y` sont générées de façon aléatoire, avec des enregistrements regroupés autour de certains emplacements.

   Le champ `is_hot` est fourni en tant qu'indicateur si le script a généré intentionnellement la valeur dans le cadre d'un point chaud. Cela peut vous aider à évaluer si la fonction de détection des points chauds fonctionne correctement.

1. **Créer l’application** : avec AWS Management Console, vous pouvez alors créer une application Kinesis Data Analytics. Configurez l'entrée d'application en mappant la source de streaming sur un flux intégré à l'application (`SOURCE_SQL_STREAM_001`). Lorsque l’application démarre, Kinesis Data Analytics lit en continu la source de streaming et insère des enregistrements dans le flux intégré à l’application.

   Dans cet exercice, vous utilisez le code suivant pour l'application :

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   Le code lit les lignes dans le flux `SOURCE_SQL_STREAM_001`, l'analyse pour rechercher les points chauds significatifs et écrit les données résultantes dans une autre flux intégré à l'application (`DESTINATION_SQL_STREAM`). Vous utilisez des pompes pour insérer des lignes dans les flux intégrés à l'application. Pour de plus amples informations, veuillez consulter [Flux et pompes intégrés à l'application](streams-pumps.md).

1. **Configurer la sortie** : vous configurez la sortie de l’application pour envoyer des données depuis l’application vers une destination externe qui est un autre flux de données Kinesis. Vérifiez les scores de point chaud et identifiez ceux qui indiquent qu'un point chaud a eu lieu (et que vous devez être alerté). Vous pouvez utiliser une AWS Lambda fonction pour traiter plus en détail les informations du hotspot et configurer les alertes. 

1. **Vérifier le résultat** — L'exemple inclut une JavaScript application qui lit les données du flux de sortie et les affiche graphiquement, afin que vous puissiez visualiser les points chauds générés par l'application en temps réel. 



L’exercice utilise la région USA Ouest (Oregon) (`us-west-2`) pour créer ces flux et votre application. Si vous utilisez une autre région, mettez à jour le code en conséquence.

**Topics**
+ [Étape 1 : Créer les flux d'entrée et de sortie](app-hotspots-prepare.md)
+ [Étape 2 : Création d’une application Kinesis Data Analytics](app-hotspot-create-app.md)
+ [Étape 3 : Configurer la sortie de l'application](app-hotspots-create-ka-app-config-destination.md)
+ [Étape 4 : Vérifier la sortie de l'application](app-hotspots-verify-output.md)

# Étape 1 : Créer les flux d'entrée et de sortie
<a name="app-hotspots-prepare"></a>

Avant de créer une application Amazon Kinesis Data Analytics pour l’[exemple pour les points chauds](app-hotspots-detection.md), vous créez deux flux de données Kinesis. Configurez l’un des flux en tant que source de streaming pour votre application et l’autre flux en tant que destination où Kinesis Data Analytics conserve la sortie de votre application. 

**Topics**
+ [Étape 1.1 : Création des flux de données Kinesis](#app-hotspots-create-two-streams)
+ [Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée](#app-hotspots-write-sample-records-inputstream)

## Étape 1.1 : Création des flux de données Kinesis
<a name="app-hotspots-create-two-streams"></a>

Dans cette section, vous créez deux flux de données Kinesis : `ExampleInputStream` et `ExampleOutputStream`. 

Créez ces flux de données à l'aide de la console ou de l' AWS CLI.
+ Pour créer les flux de données à l'aide de la console :

  1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

  1. Choisissez **Data Streams (Flux de données)** dans le volet de navigation.

  1. Choisissez **Create Kinesis stream (Créer un flux Kinesis)**, puis créez un flux avec une partition nommée `ExampleInputStream`.

  1. Répétez l'étape précédente, en créant un flux avec une seule partition nommée `ExampleOutputStream`.
+ Pour créer des flux de données à l'aide de l' AWS CLI :
  + Créez des flux (`ExampleInputStream`et`ExampleOutputStream`) à l'aide de la commande Kinesis `create-stream` AWS CLI suivante. Pour créer le deuxième flux que l'application utilisera pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par `ExampleOutputStream`.

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée
<a name="app-hotspots-write-sample-records-inputstream"></a>

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux `ExampleInputStream`.

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. Installez Python et `pip`.

   Pour plus d'informations sur l'installation de Python, consultez le site web [Python](https://www.python.org/). 

   Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez [Installation](https://pip.pypa.io/en/stable/installing/) sur le site web de pip.

1. Exécutez le code Python suivant. Ce code effectue les opérations suivantes :
   + Génère un point chaud potentiel quelque part dans le plan (X, Y).
   + Génère un ensemble de 1 000 points pour chaque point chaud. Parmi ces points, 20 % sont regroupés autour du point chaud. Les autres sont générés de façon aléatoire dans l'ensemble de l'espace.
   + La commande `put-record` écrit les enregistrements JSON dans le flux.
**Important**  
Ne chargez pas ce fichier sur un serveur web, car il contient vos informations d’identification AWS .

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**Étape suivante**  
[Étape 2 : Création d’une application Kinesis Data Analytics](app-hotspot-create-app.md)

# Étape 2 : Création d’une application Kinesis Data Analytics
<a name="app-hotspot-create-app"></a>

Dans cette section de l’[exemple pour les points chauds](app-hotspots-detection.md), vous créez une application Kinesis Data Analytics en procédant comme suit :
+ Configurez l’entrée de l’application pour utiliser le flux de données Kinesis que vous avez créé comme source de streaming à l’[étape 1](app-hotspots-prepare.md).
+ Utilisez le code d'application fourni dans l' AWS Management Console.

**Pour créer une application**

1. Créez une application Kinesis Data Analytics en suivant les étapes 1, 2 et 3 de l’exercice [Mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) (voir [Étape 3.1 : Créer une application](get-started-create-app.md)).

   Dans la configuration de la source, procédez de la façon suivante :
   + Spécifiez la source de streaming que vous avez créée dans [Étape 1 : Créer les flux d'entrée et de sortie](app-hotspots-prepare.md).
   + Une fois que la console a déduit le schéma, modifiez celui-ci. Assurez-vous que les types de colonne `x` et `y``DOUBLE` sont définis sur , et que le type de colonne `IS_HOT` est défini sur `VARCHAR`. 

1. Utilisez le code d'application suivant (vous pouvez coller ce code dans l'éditeur SQL) :

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. Exécutez le code SQL et vérifiez les résultats.  
![\[Résultats de code SQL affichant rowtime, hotspot et hotspot_heat.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**Étape suivante**  
[Étape 3 : Configurer la sortie de l'application](app-hotspots-create-ka-app-config-destination.md)

# Étape 3 : Configurer la sortie de l'application
<a name="app-hotspots-create-ka-app-config-destination"></a>

À ce stade de l’[exemple pour les points chauds](app-hotspots-detection.md), vous avez un code d’application Amazon Kinesis Data Analytics qui identifie des points chauds significatifs d’une source de streaming et qui attribue un score de chaleur à chacun. 

Vous pouvez maintenant envoyer les résultats de l’application depuis le flux intégré à l’application vers une destination externe, qui est un autre flux de données Kinesis (`ExampleOutputStream`). Vous pouvez ensuite analyser les scores de points chauds et déterminer un seuil approprié pour la chaleur des points chauds. Vous pouvez étendre encore cette application pour générer des alertes. 

**Pour configurer la sortie de l'application**

1. [Ouvrez la console Kinesis Data Analytics à l'adresse https://console.aws.amazon.com /kinesisanalytics.](https://console.aws.amazon.com/kinesisanalytics)

1. Dans l'éditeur SQL, choisissez **Destination** ou **Add a destination** dans le tableau de bord d'application. 

1. Sur la page **Add a destination (Ajouter une destination)**, choisissez **Select from your streams (Choisissez parmi vos flux)**. Choisissez ensuite le flux `ExampleOutputStream` que vous avez créé dans la section précédente.

   Maintenant, vous disposez d’une destination externe où Amazon Kinesis Data Analytics conserve tous les enregistrements que votre application écrit dans le flux intégré à l’application `DESTINATION_SQL_STREAM`. 

1. Vous pouvez éventuellement configurer AWS Lambda pour surveiller le `ExampleOutputStream` flux et vous envoyer des alertes. Pour de plus amples informations, veuillez consulter [Utilisation d'une fonction Lambda en tant que sortie](how-it-works-output-lambda.md). Vous pouvez également vérifier les enregistrements écrits par Kinesis Data Analytics dans la destination externe, qui est le flux Kinesis `ExampleOutputStream`, comme décrit dans [Étape 4 : Vérifier la sortie de l'application](app-hotspots-verify-output.md).

**Étape suivante**  
[Étape 4 : Vérifier la sortie de l'application](app-hotspots-verify-output.md)

# Étape 4 : Vérifier la sortie de l'application
<a name="app-hotspots-verify-output"></a>

Dans cette section de l'[exemple pour les points chauds](app-hotspots-detection.md), vous configurez une application web qui affiche des informations sur les points chauds dans un contrôle SVG (Scalable Vector Graphics).

1. Créez un fichier nommé `index.html` avec le contenu suivant :

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. Créez un fichier dans le même répertoire, nommé `hotspots_viewer.js` avec le contenu suivant. Fournissez votre , les informations d’identification et le nom du flux de sortie dans les variables fournies.

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. Avec le code Python de l'exécution de la première section, ouvrez `index.html` dans un navigateur web. Les informations sur le point chaud s'affichent sur la page, comme illustré ci-après.

     
![\[Image vectorielle affichant les informations relatives aux points chauds.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)

# Exemples : Alertes et erreurs
<a name="examples-alerts"></a>

Cette section fournit des exemples d’applications Kinesis Data Analytics qui utilisent des alertes et des erreurs. Chaque exemple fournit des step-by-step instructions et du code pour vous aider à configurer et à tester votre application Kinesis Data Analytics. 

**Topics**
+ [Exemple : Création d'alertes simples](app-simple-alerts.md)
+ [Exemple : Création d'alertes limitées](app-throttled-alerts.md)
+ [Exemple : Exploration du flux d'erreurs intégré à l'application](app-explore-error-stream.md)

# Exemple : Création d'alertes simples
<a name="app-simple-alerts"></a>

Dans l’application Kinesis Data Analytics, la requête est exécutée en continu sur le flux intégré à l’application qui est créé sur le flux de démonstration. Pour de plus amples informations, veuillez consulter [Requêtes continues](continuous-queries-concepts.md). 

Si des lignes montrent qu'un changement de cours d'une action est supérieur à 1 %, ces lignes sont insérées dans un autre flux intégré à l'application. Dans l'exercice, vous pouvez configurer la sortie de l'application pour conserver les résultats dans une destination externe. Vous pouvez alors étudier les résultats plus en détail. Par exemple, vous pouvez utiliser une AWS Lambda fonction pour traiter des enregistrements et vous envoyer des alertes. 

**Pour créer une application d'alertes simples**

1. Créez une application d’analyse comme décrit dans l’exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) de Kinesis Data Analytics.

1. Dans l’éditeur SQL dans Kinesis Data Analytics, remplacez le code d’application par les éléments suivants : 

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
   ```

   L' instruction `SELECT` du code d'application filtre les lignes de `SOURCE_SQL_STREAM_001` pour obtenir les changements de cours d'action supérieurs à 1 %. Elle insère ensuite ces lignes dans un autre flux intégré à l'application, `DESTINATION_SQL_STREAM`, à l'aide d'une pompe. Pour plus d'informations sur le modèle de codage expliquant l'utilisation de pompes pour insérer des lignes dans des flux intégrés à l'application, consultez [Code d'application](how-it-works-app-code.md).

1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**.

1. Ajoutez une destination. Pour ce faire, vous pouvez choisir l'onglet **Destination** dans l'éditeur SQL ou **Add a destination (Ajouter une destination)** dans la page de détails de l'application.

   1. Dans l'éditeur SQL, choisissez l'onglet **Destination**, puis **Connect to a destination (Se connecter à une destination)**. 

      Sur la page **Connect to destination (Se connecter à une destination)**, choisissez **Create New (Créer Nouveau)**. 

   1. Choisissez **Go to Kinesis Streams**. 

   1. Dans la console Amazon Kinesis Data Streams, créez un nouveau flux Kinesis (par exemple, `gs-destination`) avec une partition. Attendez que l'état du flux soit **ACTIVE**.

   1. Revenez à la console Kinesis Data Analytics. Sur la page **Connect to destination (Se connecter à une destination)**, choisissez le flux que vous avez créé. 

      Si le flux n'apparaît pas, actualisez la page.

   1. Choisissez **Save and continue (Enregistrer et continuer)**.

   Maintenant, vous disposez d’une destination externe, un flux de données Kinesis, où Kinesis Data Analytics conserve la sortie de votre application dans le flux intégré à l’application `DESTINATION_SQL_STREAM`.

1. Configurez AWS Lambda pour surveiller le flux Kinesis que vous avez créé et appeler une fonction Lambda. 

   Pour obtenir des instructions, veuillez consulter [Prétraitement des données à l’aide d’une fonction Lambda](lambda-preprocessing.md).

# Exemple : Création d'alertes limitées
<a name="app-throttled-alerts"></a>

Dans l’application Kinesis Data Analytics, la requête est exécutée en continu sur le flux intégré à l’application créé sur le flux de démonstration. Pour de plus amples informations, veuillez consulter [Requêtes continues](continuous-queries-concepts.md). Si des lignes montrent que le changement de cours d'une action est supérieur à 1 %, ces lignes sont insérées dans un autre flux intégré à l'application. L'application limite les alertes de sorte qu'une alerte est envoyée immédiatement lorsque le cours d'une action change. Toutefois, pas plus d'une alerte par minute par symbole boursier est envoyée au flux intégré à l'application.

**Pour créer une application d'alertes limitées**

1. Créez une application Kinesis Data Analytics comme décrit dans l’exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) de Kinesis Data Analytics.

1. Dans l’éditeur SQL dans Kinesis Data Analytics, remplacez le code d’application par les éléments suivants : 

   ```
   CREATE OR REPLACE STREAM "CHANGE_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "change_pump" AS 
      INSERT INTO "CHANGE_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
         
   -- ** Trigger Count and Limit **
   -- Counts "triggers" or those values that evaluated true against the previous where clause
   -- Then provides its own limit on the number of triggers per hour per ticker symbol to what
   -- is specified in the WHERE clause
   
   CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM (
      ticker_symbol VARCHAR(4), 
      change REAL,
      trigger_count INTEGER);
   
   CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAM
   SELECT STREAM ticker_symbol, change, trigger_count
   FROM (
       SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_count
       FROM "CHANGE_STREAM"
       --window to perform aggregations over last minute to keep track of triggers
       WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING)
   )
   WHERE trigger_count >= 1;
   ```

   L'instruction `SELECT` du code d'application filtre les lignes de `SOURCE_SQL_STREAM_001` pour obtenir les changements de cours d'action supérieurs à 1 % et insère ces lignes dans un autre flux intégré à l'application, `CHANGE_STREAM`, à l'aide d'une pompe. 

   L'application crée alors un second flux nommé `TRIGGER_COUNT_STREAM` pour les alertes limitées. Une deuxième requête sélectionne des enregistrements dans une fenêtre qui avance à chaque fois qu'un enregistrement y est admis, de telle sorte qu'un seul enregistrement par symbole boursier par minute est écrit dans le flux.

1. Choisissez **Save and run SQL (Enregistrer et exécuter SQL)**.

L'exemple génère dans `TRIGGER_COUNT_STREAM` un flux similaire à ce qui suit :

![\[Capture d'écran de la console montrant le flux de sortie contenant des colonnes de symbole boursier, de pourcentage de changement et de nombre de déclencheurs.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/ex-throttle-alerts.png)


# Exemple : Exploration du flux d'erreurs intégré à l'application
<a name="app-explore-error-stream"></a>

Amazon Kinesis Data Analytics fournit un flux d’erreurs intégré à l’application pour chaque application que vous créez. Les lignes que votre application ne peut pas traiter sont envoyées à ce flux d'erreurs. Vous pouvez envisager de conserver les données du flux d'erreurs dans une destination externe pour pouvoir les examiner. 

Vous effectuez les exercices suivants sur la console. Dans ces exemples, vous introduisez des erreurs dans la configuration d'entrée en modifiant le schéma qui est déduit par le processus de découverte, puis vous vérifiez les lignes envoyées au flux d'erreurs.

**Topics**
+ [Introduction d'une erreur d'analyse](#intro-error-parse-error)
+ [Introduction d'une erreur de division par zéro](#intro-error-divide-zero)

## Introduction d'une erreur d'analyse
<a name="intro-error-parse-error"></a>

Dans cet exercice, vous introduisez une erreur d'analyse.

1. Créez une application Kinesis Data Analytics comme décrit dans l’exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) de Kinesis Data Analytics. 

1. Sur la page de détails de l'application, choisissez **Connect streaming data (Connecter des données de diffusion)**.

1. Si vous avez suivi l'exercice de mise en route, vous disposez d'un flux de démonstration (`kinesis-analytics-demo-stream`) dans votre compte. Sur la page **Connect to source (Se connecter à la source)**, choisissez ce flux de démonstration.

1. Kinesis Data Analytics prélève un échantillon dans le flux de démonstration pour déduire un schéma pour le flux d’entrée intégré à l’application qu’il crée. La console affiche le schéma déduit et les échantillons de données dans l'onglet **Formatted stream sample**.

1. Ensuite, éditez le schéma et modifiez le type de colonne pour introduire l'erreur d'analyse. Choisissez **Edit schema (Modifier le schéma)**.

1. Remplacez le type de colonne `TICKER_SYMBOL` `VARCHAR(4)` par `INTEGER`. 

   Le type de colonne du schéma intégré à l’application qui a été créé n’est pas valide, ce qui signifie que Kinesis Data Analytics ne peut pas introduire de données dans le flux intégré à l’application. Au lieu de cela, le service envoie les lignes dans le flux d'erreurs.

1. Choisissez **Save schema**.

1. Choisissez **Refresh schema samples**.

   Notez qu'il n'y a pas de lignes dans l'exemple **Formatted stream**. Toutefois, l'onglet **Error stream** affiche des données avec un message d'erreur. L'onglet **Error stream** montre les données envoyées aux flux d'erreurs intégré à l'application. 

   Comme vous avez modifié le type de données de colonne, Kinesis Data Analytics n’a pas pu importer les données dans le flux d’entrée intégré à l’application. Au lieu de cela, le service envoie les données dans le flux d'erreurs.

## Introduction d'une erreur de division par zéro
<a name="intro-error-divide-zero"></a>

Dans le cadre de cet exercice, vous allez mettre à jour le code de l'application pour introduire une erreur d'exécution (division par zéro). Notez qu’Amazon Kinesis Data Analytics envoie les lignes obtenues sur le flux d’erreurs intégré à l’application, et non sur le flux intégré à l’application `DESTINATION_SQL_STREAM` dans lequel les résultats doivent logiquement être écrits.



1. Créez une application Kinesis Data Analytics comme décrit dans l’exercice de [mise en route](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) de Kinesis Data Analytics.

   Vérifiez les résultats dans l'onglet **Real-time analytics** en procédant comme suit :

   -

1. Mettez à jour l'instruction `SELECT` dans le code d'application pour introduire une division par zéro, par exemple : 

   ```
   SELECT STREAM ticker_symbol, sector, change, (price / 0) as ProblemColumn
   FROM "SOURCE_SQL_STREAM_001"
   WHERE sector SIMILAR TO '%TECH%';
   ```

   

1. Exécutez l'application.

   En raison de l’erreur d’exécution de division par zéro, au lieu d’écrire les résultats dans le flux `DESTINATION_SQL_STREAM`, Kinesis Data Analytics envoie les lignes dans le flux d’erreurs intégré à l’application. Dans l'onglet **Real-time analytics (Analyse en temps réel)**, choisissez le flux d'erreurs. Vous pouvez alors voir les lignes du flux d'erreurs intégré à l'application. 

# Exemples : Accélérateurs de solution
<a name="examples_solution"></a>

Le [site AWS des solutions](https://aws.amazon.com/solutions/) propose AWS CloudFormation des modèles que vous pouvez utiliser pour créer rapidement des solutions complètes de diffusion de données. 

Les modèles suivants sont disponibles :

## Informations en temps réel sur Compte AWS l'activité
<a name="examples_solution_activity"></a>

Cette solution enregistre et visualise les indicateurs d'accès et d'utilisation des ressources pour vous Compte AWS en temps réel. Pour plus d'informations, consultez la section Informations [en temps réel sur Compte AWS l'activité](https://docs.aws.amazon.com/solutions/latest/real-time-insights-account-activity/welcome.html).

## Surveillance des AWS IoT appareils en temps réel avec Kinesis Data Analytics
<a name="examples_solution_iot"></a>

Cette solution collecte, traite, analyse et affiche les données de connectivité et d'activité d'appareil IoT en temps réel. Pour plus d'informations, consultez la section [Surveillance des AWS IoT appareils en temps réel avec Kinesis Data Analytics](https://docs.aws.amazon.com/solutions/latest/real-time-iot-device-monitoring-with-kinesis/welcome.html).

## Analyse web en temps réel avec Kinesis Data Analytics
<a name="examples_solution_web"></a>

Cette solution collecte, traite, analyse et affiche les données de flux de clics sur les sites web en temps réel. Pour plus d’informations, consultez [Analyse web en temps réel avec Kinesis Data Analytics](https://docs.aws.amazon.com/solutions/latest/real-time-web-analytics-with-kinesis/welcome.html).

## Solution Amazon pour véhicules connectés
<a name="examples_solution_vehicle"></a>

Cette solution collecte, traite, analyse et affiche les données IoT de véhicules en temps réel. Pour plus d’informations, consultez [Solution Amazon pour véhicules connectés](https://docs.aws.amazon.com//solutions/latest/connected-vehicle-solution/welcome.html).