

Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL :

1. À compter du **1er septembre 2025,** nous ne fournirons aucune correction de bogue pour les applications Amazon Kinesis Data Analytics for SQL, car leur support sera limité, compte tenu de l'arrêt prochain.

2. À compter du **15 octobre 2025,** vous ne pourrez plus créer de nouvelles applications Kinesis Data Analytics for SQL.

3. Nous supprimerons vos candidatures à compter **du 27 janvier 2026**. Vous ne serez pas en mesure de démarrer ou d'utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour de plus amples informations, veuillez consulter [Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL](discontinuation.md).

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exemple : Détection d'anomalies de données sur un flux (fonction RANDOM\$1CUT\$1FOREST)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics fournit une fonction (`RANDOM_CUT_FOREST`) qui peut attribuer un score d’anomalie à chaque enregistrement en fonction de valeurs dans les colonnes numériques. Pour plus d’informations, consultez la section [Fonction `RANDOM_CUT_FOREST`](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html) dans le manuel *Référence SQL du service géré Amazon pour Apache Flink*. 

Dans cet exercice, vous allez écrire du code d'application pour attribuer un score d'anomalie à des enregistrements sur la source de diffusion de votre application. Pour configurer l'application, procédez comme suit :

1. **Configurer une source de streaming** : vous configurez un flux de données Kinesis et écrivez des échantillons de données `heartRate`, comme illustré ci-après :

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   La procédure fournit un script Python qui vous permet de remplir le flux. Les valeurs `heartRate` sont générées de façon aléatoire, avec 99 % des enregistrements ayant des valeurs `heartRate` comprises entre 60 et 100, et seulement 1 % ayant des valeurs `heartRate` entre 150 et 200. Les enregistrements avec des valeurs `heartRate` entre 150 et 200 sont donc des anomalies. 

1. **Configurer l’entrée** : à l’aide de la console, vous créez une application Kinesis Data Analytics et configurez l’entrée de l’application en mappant la source de streaming à un flux intégré à l’application (`SOURCE_SQL_STREAM_001`). Lorsque l’application démarre, Kinesis Data Analytics lit en continu la source de streaming et insère des enregistrements dans le flux intégré à l’application.

1. **Spécifiez le code d'application** – L'exemple utilise le code d'application suivant :

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   Le code lit les lignes dans le flux `SOURCE_SQL_STREAM_001`, attribue un score d'anomalie et écrit les lignes résultantes dans une autre flux intégré à l'application (`TEMP_STREAM`). Le code d'application trie ensuite les enregistrements dans le flux `TEMP_STREAM` et enregistre les résultats dans un autre flux intégré à l'application (`DESTINATION_SQL_STREAM`). Vous utilisez des pompes pour insérer des lignes dans les flux intégrés à l'application. Pour de plus amples informations, veuillez consulter [Flux et pompes intégrés à l'application](streams-pumps.md).

1. **Configurer la sortie** : vous configurez la sortie de l’application pour conserver les données du flux `DESTINATION_SQL_STREAM` dans une destination externe qui est un autre flux de données Kinesis. Les opérations visant à examiner les scores d'anomalie qui sont attribués à chaque enregistrement et à déterminer quel score indique une anomalie (et que vous avez besoin d'être alerté) sont externes à l'application. Vous pouvez utiliser une AWS Lambda fonction pour traiter ces scores d'anomalies et configurer les alertes. 

L’exercice utilise la région USA Est (Virginie du Nord) (`us-east-1`) pour créer ces flux et votre application. Si vous utilisez une autre région, vous devez mettre à jour le code en conséquence.

**Topics**
+ [

# Étape 1 : Préparation
](app-anomaly-prepare.md)
+ [

# Étape 2 : Créer une application
](app-anom-score-create-app.md)
+ [

# Étape 3 : Configuration de la sortie de l'application
](app-anomaly-create-ka-app-config-destination.md)
+ [

# Étape 4 : Vérification de la sortie
](app-anomaly-verify-output.md)

**Étape suivante**  
[Étape 1 : Préparation](app-anomaly-prepare.md)

# Étape 1 : Préparation
<a name="app-anomaly-prepare"></a>

Avant de créer une application d’analyse de données Amazon Kinesis Data Analytics pour cet exercice, vous devez créer deux flux de données Kinesis. Configurez l’un des flux en tant que source de streaming pour votre application et l’autre flux en tant que destination où Kinesis Data Analytics conserve la sortie de votre application. 

**Topics**
+ [

## Étape 1.1 : Création des flux de données d'entrée et de sortie
](#app-anomaly-create-two-streams)
+ [

## Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée
](#app-anomaly-write-sample-records-inputstream)

## Étape 1.1 : Création des flux de données d'entrée et de sortie
<a name="app-anomaly-create-two-streams"></a>

Dans cette section, vous créez deux flux Kinesis : `ExampleInputStream` et `ExampleOutputStream`. Vous pouvez créer ces flux avec la AWS Management Console ou l' AWS CLI.
+ 

**Pour utiliser la console**

  1. [Connectez-vous à la console Kinesis AWS Management Console et ouvrez-la à https://console.aws.amazon.com l'adresse /kinesis.](https://console.aws.amazon.com/kinesis)

  1. Choisissez **Create data stream (Créer un flux de données)**. Créez un flux avec une partition nommée `ExampleInputStream`. Pour de plus amples informations, consultez [Créer un flux](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) dans le *Guide du développeur Amazon Kinesis Data Streams*.

  1. Répétez l'étape précédente, en créant un flux avec une seule partition nommée `ExampleOutputStream`.
+ 

**Pour utiliser le AWS CLI**

  1. Utilisez la `create-stream` AWS CLI commande Kinesis suivante pour créer le premier flux ()`ExampleInputStream`.

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. Exécutez la même commande en remplaçant le nom du flux par `ExampleOutputStream`. Cette commande crée le deuxième flux que l'application utilisera pour écrire la sortie.

## Étape 1.2 : Ecriture d'exemples d'enregistrements dans le flux d'entrée
<a name="app-anomaly-write-sample-records-inputstream"></a>

Dans cette étape, vous exécutez du code Python pour générer en continu des exemples d'enregistrements et les écrire dans le flux `ExampleInputStream`.

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. Installez Python et `pip`.

   Pour plus d'informations sur l'installation de Python, consultez le site web [Python](https://www.python.org/). 

   Vous pouvez installer des dépendances à l'aide de pip. Pour plus d'informations sur l'installation de pip, consultez [Installation](https://pip.pypa.io/en/stable/installing/) sur le site web de pip.

1. Exécutez le code Python suivant. La commande `put-record` dans le code écrit les enregistrements JSON dans le flux.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**Étape suivante**  
[Étape 2 : Créer une application](app-anom-score-create-app.md)

# Étape 2 : Créer une application
<a name="app-anom-score-create-app"></a>

Dans cette section, vous allez créer une application Amazon Kinesis Data Analytics comme suit :
+ Configurez l’entrée de l’application pour utiliser le flux de données Kinesis que vous avez créé dans [Étape 1 : Préparation](app-anomaly-prepare.md) comme source de streaming.
+ Utilisez le modèle **Anomaly Detection (Détection des anomalies)** dans la console. 

**Pour créer une application**

1. Suivez les étapes 1, 2 et 3 de l’exercice Kinesis Data Analytics **Mise en route** (voir [Étape 3.1 : Créer une application](get-started-create-app.md)). 
   + Dans la configuration de la source, procédez de la façon suivante :
     + Spécifiez la source de diffusion que vous avez créée dans la section précédente. 
     + Une fois que la console a déduit le schéma, modifiez celui-ci et définissez le type de colonne `heartRate` sur `INTEGER`. 

       La plupart des valeurs de taux de cœur sont normales et le processus de découverte va très probablement attribuer le type `TINYINT` à cette colonne. Mais un faible pourcentage de valeurs présente un taux de cœur élevé. Si ces valeurs élevées ne correspondent au type `TINYINT`, Kinesis Data Analytics envoie ces lignes vers un flux d’erreurs. Mettez à jour le type de données en `INTEGER` pour qu'il puisse recevoir toutes les données de taux de cœur générées.
   + Utilisez le modèle **Anomaly Detection (Détection des anomalies)** dans la console. Mettez à jour ensuite le code du modèle pour fournir le nom de colonne approprié.

1. Mettez à jour le code d'application en fournissant des noms de colonne. Le code d'application résultant apparaît ci-dessous (collez ce code dans l'éditeur SQL) :

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. Exécutez le code SQL et vérifiez les résultats dans la console Kinesis Data Analytics :  
![\[Capture d'écran de la console montrant l'onglet d'analyse en temps réel avec les données résultantes dans le flux intégré à l'application.\]](http://docs.aws.amazon.com/fr_fr/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**Étape suivante**  
[Étape 3 : Configuration de la sortie de l'application](app-anomaly-create-ka-app-config-destination.md)

# Étape 3 : Configuration de la sortie de l'application
<a name="app-anomaly-create-ka-app-config-destination"></a>

Une fois que [Étape 2 : Créer une application](app-anom-score-create-app.md) est terminé, le code d'application lit des données de taux de cœur à partir d'une source de diffusion et attribue un score d'anomalie à chacune d'entre elles. 

Vous pouvez maintenant envoyer les résultats de l’application depuis le flux intégré à l’application vers une destination externe, qui est un autre flux de données Kinesis (`OutputStreamTestingAnomalyScores`). Vous pouvez ensuite analyser les scores d'anomalie et déterminer quel taux de cœur est anormal. Vous pouvez ensuite étendre encore cette application pour générer des alertes. 

Suivez les étapes ci-dessous pour configurer la sortie de l'application :



1. Ouvrez la console Amazon Kinesis Data Analytics. Dans l'éditeur SQL, choisissez **Destination** ou **Add a destination** dans le tableau de bord d'application. 

1. Sur la page **Connect to destination (Se connecter à une destination)**, choisissez le flux `OutputStreamTestingAnomalyScores` que vous avez créé dans la section précédente.

   Maintenant, vous disposez d’une destination externe où Amazon Kinesis Data Analytics conserve tous les enregistrements que votre application écrit dans le flux intégré à l’application `DESTINATION_SQL_STREAM`. 

1. Vous pouvez éventuellement configurer AWS Lambda pour surveiller le `OutputStreamTestingAnomalyScores` flux et vous envoyer des alertes. Pour obtenir des instructions, veuillez consulter [Prétraitement des données à l’aide d’une fonction Lambda](lambda-preprocessing.md). Si vous ne définissez pas d’alertes, vous pouvez vérifier les enregistrements écrits par Kinesis Data Analytics dans la destination externe, ce qui représente le flux de données Kinesis `OutputStreamTestingAnomalyScores`, comme décrit dans [Étape 4 : Vérification de la sortie](app-anomaly-verify-output.md).

**Étape suivante**  
[Étape 4 : Vérification de la sortie](app-anomaly-verify-output.md)

# Étape 4 : Vérification de la sortie
<a name="app-anomaly-verify-output"></a>

Après avoir configuré la sortie de l'application dans [Étape 3 : Configuration de la sortie de l'application](app-anomaly-create-ka-app-config-destination.md), utilisez les commandes de l' AWS CLI suivantes pour lire les enregistrements dans le flux de destination qui est écrit par l'application :

1. Exécutez la commande `get-shard-iterator` pour obtenir un pointeur vers les données du flux de sortie.

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   Vous obtenez une réponse comportant une valeur d'itérateur de partition, comme illustré dans l'exemple de réponse suivant :

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   Copiez la valeur d'itérateur de partition. 

1. Exécutez la commande AWS CLI `get-records`.

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   La commande renvoie une page d'enregistrements et un autre itérateur de partition que vous pouvez utiliser dans la commande `get-records` suivante pour extraire l'ensemble d'enregistrements suivant.