

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications:

1. From **September 1, 2025**, we won't provide bug fixes for Amazon Kinesis Data Analytics for SQL applications, because support for the service will be limited ahead of the discontinuation.

2. From **October 15, 2025**, you will not be able to create new Kinesis Data Analytics for SQL applications.

3. Starting **January 27, 2026**, we will delete your applications. You will not be able to start or operate your Amazon Kinesis Data Analytics for SQL applications, and support will no longer be available from that time. For more information, see [Amazon Kinesis Data Analytics for SQL Applications discontinuation](discontinuation.md).

# Example: Detecting Data Anomalies and Getting an Explanation (RANDOM\_CUT\_FOREST\_WITH\_EXPLANATION Function)

Amazon Kinesis Data Analytics provides the `RANDOM_CUT_FOREST_WITH_EXPLANATION` function, which assigns an anomaly score to each record based on values in the numeric columns. The function also provides an explanation of the anomaly. For more information, see [RANDOM\_CUT\_FOREST\_WITH\_EXPLANATION](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html) in the *Amazon Managed Service for Apache Flink SQL Reference*. 

In this exercise, you write application code to obtain anomaly scores for records in your application's streaming source. You also obtain an explanation for each anomaly.

**Topics**
+ [Step 1: Prepare the Data](app-anomaly-with-ex-prepare.md)
+ [Step 2: Create an Analytics Application](app-anom-with-exp-create-app.md)
+ [Step 3: Examine the Results](examine-results-with-exp.md)

**First Step**  
[Step 1: Prepare the Data](app-anomaly-with-ex-prepare.md)

# Step 1: Prepare the Data


Before you create an Amazon Kinesis Data Analytics application for this [example](app-anomaly-detection-with-explanation.md), you create a Kinesis data stream to use as the streaming source for your application. You also run Python code to write simulated blood pressure data to the stream. 

**Topics**
+ [Step 1.1: Create a Kinesis Data Stream](#app-anomaly-create-two-streams)
+ [Step 1.2: Write Sample Records to the Input Stream](#app-anomaly-write-sample-records-inputstream)

## Step 1.1: Create a Kinesis Data Stream


In this section, you create a Kinesis data stream named `ExampleInputStream`. You can create this data stream using the AWS Management Console or the AWS CLI.
+ To use the console:

  1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

  1. Choose **Data Streams** in the navigation pane. Then choose **Create Kinesis stream**.

  1. For the name, type **ExampleInputStream**. For the number of shards, type **1**.
+ Alternatively, to use the AWS CLI to create the data stream, run the following command:

  ```
  $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
  ```

## Step 1.2: Write Sample Records to the Input Stream


In this step, you run Python code to continuously generate sample records and write them to the data stream that you created. 

1. Install Python and pip.

   For information about installing Python, see [Python](https://www.python.org/). 

   You can install dependencies using pip. For information about installing pip, see [Installation](https://pip.pypa.io/en/stable/installing/) in the pip documentation.

1. Run the following Python code. The code writes to the AWS Region that is configured for your AWS credentials; change that configuration if you want to use a different Region for this example. The `put_record` call in the code writes the JSON records to the stream.

   ```
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class PressureType(Enum):
       low = "LOW"
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_blood_pressure(pressure_type):
       pressure = {"BloodPressureLevel": pressure_type.value}
       if pressure_type == PressureType.low:
           pressure["Systolic"] = random.randint(50, 80)
           pressure["Diastolic"] = random.randint(30, 50)
       elif pressure_type == PressureType.normal:
           pressure["Systolic"] = random.randint(90, 120)
           pressure["Diastolic"] = random.randint(60, 80)
       elif pressure_type == PressureType.high:
           pressure["Systolic"] = random.randint(130, 200)
           pressure["Diastolic"] = random.randint(90, 150)
       else:
           raise TypeError
       return pressure
   
   
   def generate(stream_name, kinesis_client):
       while True:
           rnd = random.random()
           pressure_type = (
               PressureType.low
               if rnd < 0.005
               else PressureType.high
               if rnd > 0.995
               else PressureType.normal
           )
           blood_pressure = get_blood_pressure(pressure_type)
           print(blood_pressure)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(blood_pressure),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```
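
   The thresholds in `generate` make anomalous readings rare by design: values of `random.random()` below 0.005 map to low pressure, values above 0.995 map to high pressure, and everything else is normal, so roughly 99 percent of records are normal. The following is a minimal sketch that checks this split offline, with no AWS connection required; the helper name `pick_pressure_type` is illustrative and not part of the example code above.

   ```python
   import random


   def pick_pressure_type(rnd):
       # Mirrors the thresholds used in generate() above.
       if rnd < 0.005:
           return "LOW"
       if rnd > 0.995:
           return "HIGH"
       return "NORMAL"


   random.seed(42)  # fixed seed so the run is reproducible
   counts = {"LOW": 0, "NORMAL": 0, "HIGH": 0}
   for _ in range(100_000):
       counts[pick_pressure_type(random.random())] += 1

   print(counts)  # roughly 500 LOW, 99,000 NORMAL, 500 HIGH
   ```

   Because anomalies are this rare, the stream gives the random cut forest algorithm a clear baseline of normal readings to learn from.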

**Next Step**  
[Step 2: Create an Analytics Application](app-anom-with-exp-create-app.md)

# Step 2: Create an Analytics Application


In this section, you create an Amazon Kinesis Data Analytics application and configure it to use the Kinesis data stream that you created as the streaming source in [Step 1: Prepare the Data](app-anomaly-with-ex-prepare.md). You then run application code that uses the `RANDOM_CUT_FOREST_WITH_EXPLANATION` function.

**To create an application**

1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Analytics** in the navigation pane, and then choose **Create application**.

1. Provide an application name and description (optional), and choose **Create application**.

1. Choose **Connect streaming data**, and then choose **ExampleInputStream** from the list. 

1. Choose **Discover schema**, and make sure that `Systolic` and `Diastolic` appear as `INTEGER` columns. If they have another type, choose **Edit schema**, and assign the type `INTEGER` to both of them. 

1. Under **Real time analytics**, choose **Go to SQL editor**. When prompted, choose to run your application. 

1. Paste the following code into the SQL editor, and then choose **Save and run SQL**. The numeric arguments to the function are, in order, the number of trees, the subsample size, the time decay, and the shingle size, followed by a flag that controls whether directionality information is included; see the SQL reference for details.

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
       "Systolic"                  INTEGER,
       "Diastolic"                 INTEGER,
       "BloodPressureLevel"        varchar(20),
       "ANOMALY_SCORE"             DOUBLE,
       "ANOMALY_EXPLANATION"       varchar(512));

   --Creates another stream for application output.
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "Systolic"                  INTEGER,
       "Diastolic"                 INTEGER,
       "BloodPressureLevel"        varchar(20),
       "ANOMALY_SCORE"             DOUBLE,
       "ANOMALY_EXPLANATION"       varchar(512));

   -- Compute an anomaly score with an explanation for each record in the input stream
   -- using RANDOM_CUT_FOREST_WITH_EXPLANATION.
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION 
         FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true));

   -- Sort records by descending anomaly score, and insert them into the output stream.
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

**Next Step**  
[Step 3: Examine the Results](examine-results-with-exp.md)

# Step 3: Examine the Results


When you run the SQL code for this [example](app-anomaly-detection-with-explanation.md), you first see rows with an anomaly score equal to zero. This happens during the initial learning phase. Then you get results similar to the following:

```
ROWTIME SYSTOLIC DIASTOLIC BLOODPRESSURELEVEL ANOMALY_SCORE ANOMALY_EXPLANATION
27:49.0	101      66        NORMAL             0.711460417   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0922","ATTRIBUTION_SCORE":"0.3792"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"0.0210","ATTRIBUTION_SCORE":"0.3323"}}
27:50.0	144      123       HIGH               3.855851061   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}
27:50.0	113      69        NORMAL             0.740069409   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0549","ATTRIBUTION_SCORE":"0.3750"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0394","ATTRIBUTION_SCORE":"0.3650"}}
27:50.0	105      64        NORMAL             0.739644157   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0245","ATTRIBUTION_SCORE":"0.3667"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0524","ATTRIBUTION_SCORE":"0.3729"}}
27:50.0	100      65        NORMAL             0.736993425   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0203","ATTRIBUTION_SCORE":"0.3516"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0454","ATTRIBUTION_SCORE":"0.3854"}}
27:50.0	108      69        NORMAL             0.733767202   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0974","ATTRIBUTION_SCORE":"0.3961"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0189","ATTRIBUTION_SCORE":"0.3377"}}
```
+ The algorithm in the `RANDOM_CUT_FOREST_WITH_EXPLANATION` function sees that the `Systolic` and `Diastolic` columns are numeric, and uses them as input.
+ The `BloodPressureLevel` column contains text data, so the algorithm doesn't take it into account. The column is simply a visual aid to help you quickly spot the normal, high, and low blood pressure levels in this example.
+ In the `ANOMALY_SCORE` column, records with higher scores are more anomalous. The second record in this sample set of results is the most anomalous, with an anomaly score of 3.855851061.
+ To understand how much each of the numeric columns contributes to the anomaly score, consult the JSON field named `ATTRIBUTION_SCORE` in the `ANOMALY_EXPLANATION` column. In the second row of this set of sample results, the `Systolic` and `Diastolic` columns contribute to the anomaly in the ratio 1.7447:2.1111. In other words, about 45 percent of the anomaly score is attributable to the systolic value, and the remaining 55 percent to the diastolic value.
+ To determine the direction in which the point represented by the second row in this sample is anomalous, consult the JSON field named `DIRECTION`. Both the diastolic and systolic values are marked as `HIGH` in this case. To determine the confidence with which these directions are correct, consult the JSON field named `STRENGTH`. In this example, the algorithm is more confident that the diastolic value is high. Indeed, the normal value for the diastolic reading is usually 60–80, and 123 is much higher than expected. 
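
The attribution percentages described above come from normalizing the `ATTRIBUTION_SCORE` values across the numeric columns. The following is a short sketch of that arithmetic, using the `ANOMALY_EXPLANATION` JSON from the second row of the sample results:

```python
import json

# ANOMALY_EXPLANATION from the second row of the sample results above.
explanation = json.loads(
    '{"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},'
    '"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}'
)

# Each column's share of the anomaly score is its attribution score
# divided by the sum of all attribution scores.
total = sum(float(v["ATTRIBUTION_SCORE"]) for v in explanation.values())
shares = {
    column: float(details["ATTRIBUTION_SCORE"]) / total
    for column, details in explanation.items()
}

for column, share in shares.items():
    print(f"{column}: {share:.0%} of the anomaly score")
# Systolic accounts for about 45 percent, Diastolic for about 55 percent.
```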