

After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications:

1. From **September 1, 2025**, we won't provide any bug fixes for Amazon Kinesis Data Analytics for SQL applications because we will have limited support for it, given the upcoming discontinuation.

2. From **October 15, 2025**, you will not be able to create new Kinesis Data Analytics for SQL applications.

3. We will delete your applications starting **January 27, 2026**. You will not be able to start or operate your Amazon Kinesis Data Analytics for SQL applications. Support will no longer be available for Amazon Kinesis Data Analytics for SQL from that time. For more information, see [Amazon Kinesis Data Analytics for SQL Applications discontinuation](discontinuation.md).

# Examples: Transforming Data
<a name="examples-transforming"></a>

There are times when your application code must preprocess incoming records before performing any analytics in Amazon Kinesis Data Analytics. This can happen for various reasons, such as when records don't conform to the supported record formats, resulting in unnormalized columns in the in-application input streams. 

This section provides examples of how to use the available string functions to normalize data and to extract the information that you need from string columns. It also points to date-time functions that you might find useful. 

## Preprocessing Streams with Lambda
<a name="examples-transforming-lambda"></a>

For information about preprocessing streams with AWS Lambda, see [Preprocessing Data Using a Lambda Function](lambda-preprocessing.md).

**Topics**
+ [Preprocessing Streams with Lambda](#examples-transforming-lambda)
+ [Examples: Transforming String Values](examples-transforming-strings.md)
+ [Example: Transforming DateTime Values](app-string-datetime-manipulation.md)
+ [Example: Transforming Multiple Data Types](app-tworecordtypes.md)

# Examples: Transforming String Values
<a name="examples-transforming-strings"></a>

Amazon Kinesis Data Analytics supports formats such as JSON and CSV for records on a streaming source. For details, see [RecordFormat](API_RecordFormat.md). These records then map to rows in an in-application stream as per the input configuration. For details, see [Configuring Application Input](how-it-works-input.md). The input configuration specifies how record fields in the streaming source map to columns in an in-application stream. 

This mapping works when records on the streaming source follow the supported formats, which results in an in-application stream with normalized data. But what if data on your streaming source does not conform to supported standards? For example, your streaming source might contain clickstream data, IoT sensor readings, or application logs. 

Consider these examples:
+ Streaming source contains application logs – The application logs follow the standard Apache log format, and are written to the stream using JSON format. 

  ```
  {
     "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] \"GET /icons/apache_pb.gif HTTP/1.1\" 304 0"
  }
  ```

  For more information about the standard Apache log format, see [Log Files](https://httpd.apache.org/docs/2.4/logs.html) on the Apache website. 

   
+ Streaming source contains semi-structured data – The following example shows two records. The `Col_E_Unstructured` field value is a series of comma-separated values. There are five columns: the first four have string type values, and the last column contains comma-separated values.

  ```
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  ```
+ Records on your streaming source contain URLs, and you need a portion of the URL domain name for analytics.

  ```
  { "referrer" : "http://www.amazon.com"}
  { "referrer" : "http://www.stackoverflow.com" }
  ```

In such cases, the following two-step process generally works for creating in-application streams that contain normalized data:

1. Configure application input to map the unstructured field to a column of the `VARCHAR(N)` type in the in-application input stream that is created.

1. In your application code, use string functions to split this single column into multiple columns and then save the rows in another in-application stream. This in-application stream that your application code creates will have normalized data. You can then perform analytics on this in-application stream.
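
As a rough illustration of step 1, the following Python sketch builds an `InputSchema`-style dictionary for the semi-structured record shown earlier: each plain string field gets its own column, and `Col_E_Unstructured` is mapped, unsplit, to one wide `VARCHAR` column. The keys follow the `RecordFormat` and `RecordColumn` structures of the Kinesis Data Analytics API; the function name `build_input_schema` and the `VARCHAR(16)` and `VARCHAR(64)` sizes are assumptions chosen for illustration. In the console, **Discover schema** infers a mapping like this for you.

```python
import json


def build_input_schema(string_fields, unstructured_field, varchar_len=64):
    """Return an InputSchema-style dict for JSON records (hypothetical helper).

    Each name in string_fields maps to its own VARCHAR(16) column (assumed size).
    The unstructured field maps to a single wide VARCHAR column that application
    code splits into separate columns later (step 2).
    """
    columns = [
        {"Name": name, "Mapping": f"$.{name}", "SqlType": "VARCHAR(16)"}
        for name in string_fields
    ]
    columns.append(
        {
            "Name": unstructured_field,
            "Mapping": f"$.{unstructured_field}",
            "SqlType": f"VARCHAR({varchar_len})",
        }
    )
    return {
        "RecordFormat": {
            "RecordFormatType": "JSON",
            # "$" means each top-level JSON object becomes one row.
            "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
        },
        "RecordEncoding": "UTF-8",
        "RecordColumns": columns,
    }


schema = build_input_schema(["Col_A", "Col_B", "Col_C", "Col_D"], "Col_E_Unstructured")
print(json.dumps(schema, indent=2))
```

With boto3, a dictionary like this could be supplied as `Input["InputSchema"]` when calling the `add_application_input` method of the `kinesisanalytics` client.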

Amazon Kinesis Data Analytics provides the following string operations, standard SQL functions, and extensions to the SQL standard for working with string columns: 
+ **String operators** – Operators such as `LIKE` and `SIMILAR` are useful in comparing strings. For more information, see [String Operators](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-operators.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.
+ **SQL functions** – The following functions are useful when manipulating individual strings. For more information, see [String and Search Functions](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-and-search-functions.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.
  + `CHAR_LENGTH` – Provides the length of a string. 
  + `INITCAP` – Returns a converted version of the input string such that the first character of each space-delimited word is uppercase, and all other characters are lowercase. 
  + `LOWER/UPPER` – Converts a string to lowercase or uppercase. 
  + `OVERLAY` – Replaces a portion of the first string argument (the original string) with the second string argument (the replacement string).
  + `POSITION` – Searches for a string within another string. 
  + `REGEX_REPLACE` – Replaces a substring with an alternative substring.
  + `SUBSTRING` – Extracts a portion of a source string starting at a specific position. 
  + `TRIM` – Removes instances of the specified character from the beginning or end of the source string. 
+ **SQL extensions** – These are useful for working with unstructured strings such as logs and URIs. For more information, see [Log Parsing Functions](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-pattern-matching-functions.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.
  + `FAST_REGEX_LOG_PARSER` – Works similarly to the regex parser, but it takes several shortcuts to ensure faster results. For example, the fast regex parser stops at the first match it finds (known as *lazy semantics*).
  + `FIXED_COLUMN_LOG_PARSE` – Parses fixed-width fields and automatically converts them to the given SQL types.
  + `REGEX_LOG_PARSE` – Parses a string based on default Java regular expression patterns.
  + `SYS_LOG_PARSE` – Parses entries commonly found in UNIX/Linux system logs.
  + `VARIABLE_COLUMN_LOG_PARSE` – Splits an input string into fields separated by a delimiter character or a delimiter string.
  + `W3C_LOG_PARSE` – Can be used for quickly formatting Apache logs.
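
The SQL reference is the authority on how each function behaves. As an informal aid, the following Python sketch approximates three of the functions listed above (`INITCAP`, `OVERLAY`, and `TRIM`) so that you can experiment with their semantics locally. These helpers are hypothetical and are not part of Kinesis Data Analytics; they follow the descriptions above and standard SQL conventions (1-based positions), and they may differ from the service in edge cases such as `NULL` handling.

```python
from typing import Optional


def initcap(s: str) -> str:
    """Approximates INITCAP: the first character of each space-delimited word
    is uppercased and all other characters are lowercased."""
    return " ".join(word[:1].upper() + word[1:].lower() for word in s.split(" "))


def overlay(original: str, replacement: str, start: int, length: Optional[int] = None) -> str:
    """Approximates OVERLAY(original PLACING replacement FROM start [FOR length]).

    start is 1-based; when length is omitted it defaults to len(replacement),
    as in standard SQL.
    """
    if length is None:
        length = len(replacement)
    i = start - 1
    return original[:i] + replacement + original[i + length:]


def trim(s: str, char: str = " ", side: str = "BOTH") -> str:
    """Approximates TRIM(side char FROM s); side is LEADING, TRAILING, or BOTH.
    char is assumed to be a single character."""
    if side == "LEADING":
        return s.lstrip(char)
    if side == "TRAILING":
        return s.rstrip(char)
    return s.strip(char)


print(initcap("hello KINESIS analytics"))              # Hello Kinesis Analytics
print(overlay("http://www.amazon.com", "https", 1, 4))  # https://www.amazon.com
print(trim("xxamazonxx", "x"))                          # amazon
```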

For examples using these functions, see the following topics:

**Topics**
+ [Example: Extracting a Portion of a String (SUBSTRING Function)](examples-transforming-strings-substring.md)
+ [Example: Replacing a Substring using Regex (REGEX_REPLACE Function)](examples-transforming-strings-regexreplace.md)
+ [Example: Parsing Log Strings Based on Regular Expressions (REGEX_LOG_PARSE Function)](examples-transforming-strings-regexlogparse.md)
+ [Example: Parsing Web Logs (W3C_LOG_PARSE Function)](examples-transforming-strings-w3clogparse.md)
+ [Example: Split Strings into Multiple Fields (VARIABLE_COLUMN_LOG_PARSE Function)](examples-transforming-strings-variablecolumnlogparse.md)

# Example: Extracting a Portion of a String (SUBSTRING Function)
<a name="examples-transforming-strings-substring"></a>

This example uses the `SUBSTRING` function to transform a string in Amazon Kinesis Data Analytics. The `SUBSTRING` function extracts a portion of a source string starting at a specific position. For more information, see [SUBSTRING](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-substring.html) in the *Amazon Managed Service for Apache Flink SQL Reference*. 

In this example, you write the following records to an Amazon Kinesis data stream. 

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



You then create a Kinesis Data Analytics application on the console, using the Kinesis data stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with one column (`REFERRER`), as shown.

![\[Console screenshot showing the in-application schema with a list of URLs in the referrer column.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/referrer-10.png)


Then you use application code with the `SUBSTRING` function to parse the URL string and retrieve the company name, and insert the resulting data into another in-application stream, as shown following: 



![\[Console screenshot showing real-time analytics tab with resulting data in the in-application stream.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/referrer-20.png)


**Topics**
+ [Step 1: Create a Kinesis Data Stream](#examples-transforming-strings-substring-1)
+ [Step 2: Create the Kinesis Data Analytics Application](#examples-transforming-strings-substring-2)

## Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-strings-substring-1"></a>

Create an Amazon Kinesis data stream and populate the log records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Run the following Python code to populate sample log records. This simple code continuously writes the same log record to the stream.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```
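
The generator above runs until you stop it. If you only need a fixed number of test records, a bounded variant such as the following sketch can be easier to work with. The function name `generate_bounded` and its `count` and `delay_seconds` parameters are hypothetical; the short pause between writes is there to help a one-shard stream stay within its per-shard write limits. The client is passed in rather than created inside the function, so any object with a compatible `put_record` method works.

```python
import json
import time


def generate_bounded(stream_name, kinesis_client, record, count=100, delay_seconds=0.1):
    """Write the same JSON record to the stream `count` times; return the responses.

    kinesis_client is expected to be a boto3 Kinesis client, or any object whose
    put_record method accepts the same keyword arguments.
    """
    payload = json.dumps(record)
    responses = []
    for _ in range(count):
        responses.append(
            kinesis_client.put_record(
                StreamName=stream_name, Data=payload, PartitionKey="partitionkey"
            )
        )
        if delay_seconds > 0:
            time.sleep(delay_seconds)  # pause between writes to limit the write rate
    return responses
```

For example: `generate_bounded(STREAM_NAME, boto3.client("kinesis"), {"REFERRER": "http://www.amazon.com"}, count=100)`.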

## Step 2: Create the Kinesis Data Analytics Application
<a name="examples-transforming-strings-substring-2"></a>

Next, create a Kinesis Data Analytics application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, type an application name, and choose **Create application**.

1. On the application details page, choose **Connect streaming data**.

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section. 

   1. Choose the option to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has only one column.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code, and verify the results as follows:

   1. Copy the following application code and paste it into the editor.

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               SUBSTRING("REFERRER", 12, (POSITION('.com' IN "REFERRER") - POSITION('www.' IN "REFERRER") - 4)) 
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data. 
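
Because the pump hard-codes start position 12, it assumes that every referrer begins with the 11-character prefix `http://www.`. The following Python sketch mirrors the `SUBSTRING` and `POSITION` arithmetic so that you can check that assumption against your own data. It follows standard SQL conventions (1-based positions, with `POSITION` returning 0 when the search string is absent); the helper names are hypothetical.

```python
def position(search: str, source: str) -> int:
    """Like SQL POSITION(search IN source): 1-based index, or 0 if not found."""
    return source.find(search) + 1


def substring(source: str, start: int, length: int) -> str:
    """Like SQL SUBSTRING(source, start, length) with a 1-based start position."""
    return source[start - 1 : start - 1 + length]


def company_name(referrer: str) -> str:
    """Mirrors the pump expression: fixed start 12, length between 'www.' and '.com'."""
    length = position(".com", referrer) - position("www.", referrer) - 4
    return substring(referrer, 12, length)


for url in ("http://www.amazon.com", "http://www.stackoverflow.com", "https://www.amazon.com"):
    print(f"{url} -> {company_name(url)}")
```

The first two URLs yield `amazon` and `stackoverflow`, but `https://www.amazon.com` yields `.amazo` because its prefix is one character longer. If your data mixes schemes, you might compute the start position as `POSITION('www.' IN "REFERRER") + 4` instead of using a constant.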

# Example: Replacing a Substring using Regex (REGEX_REPLACE Function)
<a name="examples-transforming-strings-regexreplace"></a>

This example uses the `REGEX_REPLACE` function to transform a string in Amazon Kinesis Data Analytics. `REGEX_REPLACE` replaces a substring with an alternative substring. For more information, see [REGEX_REPLACE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-replace.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.

In this example, you write the following records to an Amazon Kinesis data stream: 

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



You then create a Kinesis Data Analytics application on the console, with the Kinesis data stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with one column (REFERRER), as shown.

![\[Console screenshot showing in-application schema with list of URLs in the referrer column.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/referrer-10.png)


Then, you use the application code with the `REGEX_REPLACE` function to convert the URL to use `https://` instead of `http://`. You insert the resulting data into another in-application stream, as shown following: 



![\[Console screenshot showing resulting data table with ROWTIME, ingest_time, and referrer columns.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/ex_regex_replace.png)


**Topics**
+ [Step 1: Create a Kinesis Data Stream](#examples-transforming-strings-regexreplace-1)
+ [Step 2: Create the Kinesis Data Analytics Application](#examples-transforming-strings-regexreplace-2)

## Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-strings-regexreplace-1"></a>

Create an Amazon Kinesis data stream and populate the log records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Run the following Python code to populate the sample log records. This simple code continuously writes the same log record to the stream.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Step 2: Create the Kinesis Data Analytics Application
<a name="examples-transforming-strings-regexreplace-2"></a>

Next, create a Kinesis Data Analytics application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, type an application name, and choose **Create application**.

1. On the application details page, choose **Connect streaming data**. 

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section. 

   1. Choose the option to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has only one column.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code and verify the results as follows:

   1. Copy the following application code, and paste it into the editor:

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data. 
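
To experiment with the arguments of `REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)` outside the service, the following Python sketch approximates the function with the standard `re` module. It assumes that the fourth argument is a 1-based start position and that an occurrence of 0 replaces every match while a positive value replaces only that match; consult the REGEX_REPLACE reference for the authoritative rules. The service's patterns follow Java regular-expression rules, so syntax that differs between Java and Python (for example, `$1` versus `\1` backreferences in the replacement) might behave differently here.

```python
import re


def regex_replace(original: str, pattern: str, replacement: str,
                  start_position: int = 1, occurrence: int = 0) -> str:
    """Approximates REGEX_REPLACE(original, pattern, replacement, start_position, occurrence).

    Matching starts at the 1-based start_position. occurrence=0 replaces every
    match; occurrence=n replaces only the n-th match (assumed semantics).
    """
    head = original[: start_position - 1]  # text before start_position is left untouched
    tail = original[start_position - 1 :]
    if occurrence == 0:
        return head + re.sub(pattern, replacement, tail)
    matches = list(re.finditer(pattern, tail))
    if occurrence > len(matches):
        return original  # assumption: fewer matches than requested leaves the string unchanged
    m = matches[occurrence - 1]
    return head + tail[: m.start()] + m.expand(replacement) + tail[m.end() :]


print(regex_replace("http://www.amazon.com", "http://", "https://", 1, 0))
```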

# Example: Parsing Log Strings Based on Regular Expressions (REGEX_LOG_PARSE Function)
<a name="examples-transforming-strings-regexlogparse"></a>

This example uses the `REGEX_LOG_PARSE` function to transform a string in Amazon Kinesis Data Analytics. `REGEX_LOG_PARSE` parses a string based on default Java regular expression patterns. For more information, see [REGEX_LOG_PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-log-parse.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.

In this example, you write the following records to an Amazon Kinesis stream: 

```
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
```



You then create a Kinesis Data Analytics application on the console, with the Kinesis data stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with one column (LOGENTRY), as shown following.

![\[Console screenshot showing in-application schema with LOGENTRY column.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/ex_regex_log_parse_0.png)


Then, you use the application code with the `REGEX_LOG_PARSE` function to parse the log string to retrieve the data elements. You insert the resulting data into another in-application stream, as shown in the following screenshot: 



![\[Console screenshot showing the resulting data table with ROWTIME, LOGENTRY, MATCH1, and MATCH2 columns.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/ex_regex_log_parse_1.png)


**Topics**
+ [Step 1: Create a Kinesis Data Stream](#examples-transforming-strings-regexlogparse-1)
+ [Step 2: Create the Kinesis Data Analytics Application](#examples-transforming-strings-regexlogparse-2)

## Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-strings-regexlogparse-1"></a>

Create an Amazon Kinesis data stream and populate the log records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Run the following Python code to populate sample log records. This simple code continuously writes the same log record to the stream.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
           '"GET /index.php HTTP/1.1" 200 125 "-" '
           '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Step 2: Create the Kinesis Data Analytics Application
<a name="examples-transforming-strings-regexlogparse-2"></a>

Next, create a Kinesis Data Analytics application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, and specify an application name.

1. On the application details page, choose **Connect streaming data**. 

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section. 

   1. Choose the option to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has only one column.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code, and verify the results as follows:

   1. Copy the following application code and paste it into the editor.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24));
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
          FROM 
               (SELECT STREAM LOGENTRY,
                   REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
                   FROM SOURCE_SQL_STREAM_001) AS T;
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data.
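
The groups that `REGEX_LOG_PARSE` returns as `COLUMN1`, `COLUMN2`, and so on depend on how the pattern's greedy `.+` quantifiers backtrack, which isn't obvious from the pattern alone. The following Python sketch applies the same pattern to the sample log entry with the `re` module. It assumes that the service matches from the start of the string with ordinary backtracking semantics, which Java and Python share for this pattern; treat the output as a preview rather than the service's guaranteed result.

```python
import re

# The same record that the Step 1 generator writes.
LOGENTRY = (
    '203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] '
    '"GET /index.php HTTP/1.1" 200 125 "-" '
    '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
)

# The pattern from the pump; a raw string keeps the backslashes intact.
PATTERN = r"(\w.+) (\d.+) (\w.+) (\w.+)"

match = re.match(PATTERN, LOGENTRY)
if match:
    for i, group in enumerate(match.groups(), start=1):
        print(f"COLUMN{i} ({len(group)} chars): {group}")
```

With this pattern, the first group runs through the HTTP status code and the second group starts at the byte count, so the groups don't correspond one-to-one with log fields. Both groups that the pump selects are also longer than the `VARCHAR(24)` columns declared for `DESTINATION_SQL_STREAM`, so you may want to widen those columns or tighten the pattern.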

# Example: Parsing Web Logs (W3C_LOG_PARSE Function)
<a name="examples-transforming-strings-w3clogparse"></a>

This example uses the `W3C_LOG_PARSE` function to transform a string in Amazon Kinesis Data Analytics. You can use `W3C_LOG_PARSE` to format Apache logs quickly. For more information, see [W3C_LOG_PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-w3c-log-parse.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.

In this example, you write log records to an Amazon Kinesis data stream. Example logs are shown following:

```
{"log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] \"GET /icons/apache_pba.gif HTTP/1.1\" 304 0"}
{"log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] \"GET /icons/apache_pbb.gif HTTP/1.1\" 304 0"}
{"log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] \"GET /icons/apache_pbc.gif HTTP/1.1\" 304 0"}
...
```



You then create a Kinesis Data Analytics application on the console, with the Kinesis data stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with one column (log), as shown following:

![\[Console screenshot showing formatted stream sample tab with the in-application schema containing the log column.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/log-10.png)


Then, you use the application code with the `W3C_LOG_PARSE` function to parse the log, and create another in-application stream with various log fields in separate columns, as shown following:

![\[Console screenshot showing real-time analytics tab with in-application stream.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/log-20.png)


**Topics**
+ [Step 1: Create a Kinesis Data Stream](#examples-transforming-strings-w3clogparse-1)
+ [Step 2: Create the Kinesis Data Analytics Application](#examples-transforming-strings-w3clogparse-2)

## Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-strings-w3clogparse-1"></a>

Create an Amazon Kinesis data stream, and populate the log records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Run the following Python code to populate the sample log records. This simple code continuously writes the same log record to the stream.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
           '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## Step 2: Create the Kinesis Data Analytics Application
<a name="examples-transforming-strings-w3clogparse-2"></a>

Create a Kinesis Data Analytics application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, type an application name, and choose **Create application**.

1. On the application details page, choose **Connect streaming data**.

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section. 

   1. Choose the option to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has only one column.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code, and verify the results as follows:

   1. Copy the following application code and paste it into the editor.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
      column1 VARCHAR(16),
      column2 VARCHAR(16),
      column3 VARCHAR(16),
      column4 VARCHAR(16),
      column5 VARCHAR(16),
      column6 VARCHAR(16),
      column7 VARCHAR(16));
      
      CREATE OR REPLACE PUMP "myPUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
              SELECT STREAM
                  l.r.COLUMN1,
                  l.r.COLUMN2,
                  l.r.COLUMN3,
                  l.r.COLUMN4,
                  l.r.COLUMN5,
                  l.r.COLUMN6,
                  l.r.COLUMN7
              FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON')
                    FROM "SOURCE_SQL_STREAM_001") AS l(r);
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data.
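
To preview how a Common Log Format line splits into the seven columns that the `'COMMON'` format produces, you can parse the sample record with a regular expression in Python. This is an approximation written for this example, not the implementation of `W3C_LOG_PARSE`; in particular, the sketch drops the square brackets around the timestamp, which the service may or may not do. The names `COMMON_LOG` and `parse_common` are hypothetical.

```python
import re

# Common Log Format: host ident authuser [date] "request" status bytes
COMMON_LOG = re.compile(r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+)$')

LINE = '192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0'


def parse_common(line: str):
    """Return the seven Common Log Format fields as a tuple, or None if the line doesn't match."""
    m = COMMON_LOG.match(line)
    return m.groups() if m else None


fields = parse_common(LINE)
for i, value in enumerate(fields or (), start=1):
    print(f"COLUMN{i} ({len(value)} chars): {value}")
```

In this sample, the timestamp (26 characters) and the request line (33 characters) are longer than the `VARCHAR(16)` columns declared in the application code, so you may want to widen those columns.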

# Example: Split Strings into Multiple Fields (VARIABLE_COLUMN_LOG_PARSE Function)
<a name="examples-transforming-strings-variablecolumnlogparse"></a>

This example uses the `VARIABLE_COLUMN_LOG_PARSE` function to manipulate strings in Kinesis Data Analytics. `VARIABLE_COLUMN_LOG_PARSE` splits an input string into fields separated by a delimiter character or a delimiter string. For more information, see [VARIABLE_COLUMN_LOG_PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-variable-column-log-parse.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.

In this example, you write semi-structured records to an Amazon Kinesis data stream. The example records are as follows:

```
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_E_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_E_Unstructured" : "value,value,value,value"}
```



You then create a Kinesis Data Analytics application on the console, using the Kinesis stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with four columns, as shown following:

![\[Console screenshot showing in-application schema with 4 columns.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/unstructured-10.png)


Then, you use the application code with the `VARIABLE_COLUMN_LOG_PARSE` function to parse the comma-separated values, and insert normalized rows in another in-application stream, as shown following:



![\[Console screenshot showing real-time analytics tab with in-application stream.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/unstructured-20.png)


**Topics**
+ [Step 1: Create a Kinesis Data Stream](#examples-transforming-strings-variablecolumnlogparse-1)
+ [Step 2: Create the Kinesis Data Analytics Application](#examples-transforming-strings-variablecolumnlogparse-2)

## Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-strings-variablecolumnlogparse-1"></a>

Create an Amazon Kinesis data stream and populate the log records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.

1. Run the following Python code to populate the sample log records. This simple code continuously writes the same log record to the stream.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

   

## Step 2: Create the Kinesis Data Analytics Application
<a name="examples-transforming-strings-variablecolumnlogparse-2"></a>

Create a Kinesis Data Analytics application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, type an application name, and choose **Create application**.

1. On the application details page, choose **Connect streaming data**. 

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section.

   1. Choose the option to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records used to infer the schema for the in-application stream. The inferred schema has four columns.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write application code, and verify the results:

   1. Copy the following application code and paste it into the editor:

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
                  "column_A" VARCHAR(16),
                  "column_B" VARCHAR(16),
                  "column_C" VARCHAR(16),
                  "COL_1" VARCHAR(16),             
                  "COL_2" VARCHAR(16),            
                  "COL_3" VARCHAR(16));
      
      CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM  t."Col_A", t."Col_B", t."Col_C",
                        t.r."COL_1", t.r."COL_2", t.r."COL_3"
         FROM (SELECT STREAM 
                 "Col_A", "Col_B", "Col_C",
                 VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
                                           'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)',
                                           ',') AS r 
               FROM "SOURCE_SQL_STREAM_001") as t;
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data.

# Example: Transforming DateTime Values
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics supports converting columns to time stamps. For example, you might want to use your own time stamp as part of a `GROUP BY` clause as another time-based window, in addition to the `ROWTIME` column. Kinesis Data Analytics provides operations and SQL functions for working with date and time fields. 
+ **Date and time operators** – You can perform arithmetic operations on dates, times, and interval data types. For more information, see [Date, Timestamp, and Interval Operators](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html) in the *Amazon Managed Service for Apache Flink SQL Reference*.

   
+ **SQL Functions** – These include the following. For more information, see [Date and Time Functions](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html) in the *Amazon Managed Service for Apache Flink SQL Reference*. 
  + `EXTRACT()` – Extracts one field from a date, time, time stamp, or interval expression.
  + `CURRENT_TIME` – Returns the time when the query executes (UTC).
  + `CURRENT_DATE` – Returns the date when the query executes (UTC).
  + `CURRENT_TIMESTAMP` – Returns the time stamp when the query executes (UTC).
  + `LOCALTIME` – Returns the current time when the query executes as defined by the environment on which Kinesis Data Analytics is running (UTC).
  + `LOCALTIMESTAMP` – Returns the current time stamp as defined by the environment on which Kinesis Data Analytics is running (UTC).

     
+ **SQL Extensions** – These include the following. For more information, see [Date and Time Functions](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html) and [Datetime Conversion Functions](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html) in the *Amazon Managed Service for Apache Flink SQL Reference*. 
  + `CURRENT_ROW_TIMESTAMP` – Returns a new time stamp for each row in the stream. 
  + `TSDIFF` – Returns the difference of two time stamps in milliseconds.
  + `CHAR_TO_DATE` – Converts a string to a date.
  + `CHAR_TO_TIME` – Converts a string to time.
  + `CHAR_TO_TIMESTAMP` – Converts a string to a time stamp.
  + `DATE_TO_CHAR` – Converts a date to a string.
  + `TIME_TO_CHAR` – Converts a time to a string.
  + `TIMESTAMP_TO_CHAR` – Converts a time stamp to a string.

The preceding conversion functions use a format string to convert the columns, and the format is flexible. For example, you can specify the format `yyyy-MM-dd hh:mm:ss` to convert the input string `2009-09-16 03:15:24` into a time stamp. For more information, see [Char To Timestamp(Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html) in the *Amazon Managed Service for Apache Flink SQL Reference*. 
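
This sketch assumes the format strings follow Java `SimpleDateFormat` pattern letters. Under that convention, `hh` is the 12-hour field and `HH` is the 24-hour field. The Python code shows how the example pattern lines up with `strptime` directives. The helper names `java_to_strptime` and `char_to_timestamp` are hypothetical. They cover only the pattern letters used in this section.

```python
import re
from datetime import datetime

# Assumed mapping from Java-style pattern letters to Python strptime
# directives. Only the letters used in this section are covered.
_JAVA_TO_PYTHON = {
    "yyyy": "%Y",  # four-digit year
    "MM": "%m",    # month, 01-12
    "dd": "%d",    # day of month
    "HH": "%H",    # hour of day, 00-23
    "hh": "%I",    # hour in AM/PM, 01-12
    "mm": "%M",    # minute
    "ss": "%S",    # second
}
# Longest tokens first, so "yyyy" is matched before any two-letter token.
_TOKEN = re.compile("|".join(sorted(_JAVA_TO_PYTHON, key=len, reverse=True)))


def java_to_strptime(pattern: str) -> str:
    """Translate a Java-style date pattern into a strptime format string."""
    return _TOKEN.sub(lambda m: _JAVA_TO_PYTHON[m.group(0)], pattern)


def char_to_timestamp(pattern: str, text: str) -> datetime:
    """Hypothetical local stand-in for CHAR_TO_TIMESTAMP(pattern, text)."""
    return datetime.strptime(text, java_to_strptime(pattern))


if __name__ == "__main__":
    fmt = "yyyy-MM-dd hh:mm:ss"
    print(java_to_strptime(fmt))
    print(char_to_timestamp(fmt, "2009-09-16 03:15:24"))
```

The 12-hour `hh` field works for `03:15:24`. For afternoon values written on a 24-hour clock, use `HH`.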

## Example: Transforming Dates
<a name="examples-transforming-dates"></a>

In this example, you write the following records to an Amazon Kinesis data stream. 

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



You then create a Kinesis Data Analytics application on the console, with the Kinesis stream as the streaming source. The discovery process reads sample records on the streaming source and infers an in-application schema with two columns (`EVENT_TIME` and `TICKER`), as shown following:

![\[Console screenshot showing the in-application schema with event time and ticker columns.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


Then, you use the application code with SQL functions to convert the `EVENT_TIME` time stamp field in various ways. You then insert the resulting data into another in-application stream, as shown in the following screenshot: 



![\[Console screenshot showing the resulting data in an in-application stream.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### Step 1: Create a Kinesis Data Stream
<a name="examples-transforming-dates-1"></a>

Create an Amazon Kinesis data stream and populate it with event time and ticker records as follows:

1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Choose **Data Streams** in the navigation pane.

1. Choose **Create Kinesis stream**, and create a stream with one shard.

1. Run the following Python code to populate the stream with sample data. This simple code continuously writes a record with a random ticker symbol and the current time stamp to the stream.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### Step 2: Create the Amazon Kinesis Data Analytics Application
<a name="examples-transforming-dates-2"></a>

Create an application as follows:

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**, type an application name, and choose **Create application**.

1. On the application details page, choose **Connect streaming data** to connect to the source.

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in the preceding section. 

   1. Choose to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records that are used to infer the schema for the in-application stream created. The inferred schema has two columns.

   1. Choose **Edit Schema**. Change the **Column type** of the **EVENT_TIME** column to `TIMESTAMP`.

   1. Choose **Save schema and update stream samples**. After the console saves the schema, choose **Exit**.

   1. Choose **Save and continue**.

   

1. On the application details page, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code and verify the results as follows:

   1. Copy the following application code and paste it into the editor.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd HH:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001";
      ```

   1. Choose **Save and run SQL**. On the **Real-time analytics** tab, you can see all the in-application streams that the application created and verify the data. 
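
The following Python sketch applies roughly the same conversions as the pump to one sample record from this example. You can use it to reason about the expected values before checking the console. It rests on several assumptions. It treats `EVENT_TIME` as UTC. It treats `UNIX_TIMESTAMP` as returning whole milliseconds since the epoch. It formats the hour on a 24-hour clock. The function name `transform` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def transform(record: dict) -> dict:
    # Parse the ISO 8601 string and, as an assumption, treat it as UTC.
    event_time = datetime.fromisoformat(record["EVENT_TIME"]).replace(
        tzinfo=timezone.utc
    )
    return {
        "TICKER": record["TICKER"],
        "EVENT_TIME": event_time,
        # EVENT_TIME - INTERVAL '5' MINUTE
        "FIVE_MINUTES_BEFORE": event_time - timedelta(minutes=5),
        # Assumed UNIX_TIMESTAMP semantics: whole milliseconds since the epoch.
        "EVENT_UNIX_TIMESTAMP": (event_time - EPOCH) // timedelta(milliseconds=1),
        # 24-hour clock, comparable to the pattern 'yyyy-MM-dd HH:mm:ss'.
        "EVENT_TIMESTAMP_AS_CHAR": event_time.strftime("%Y-%m-%d %H:%M:%S"),
        # EXTRACT(SECOND FROM EVENT_TIME), as a whole number of seconds.
        "EVENT_SECOND": event_time.second,
    }


if __name__ == "__main__":
    sample = {"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
    for key, value in transform(sample).items():
        print(key, value)
```

The producer script uses `datetime.datetime.now()`, which returns local time. If your machine is not set to UTC, the values in the stream are shifted accordingly.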

# Example: Transforming Multiple Data Types
<a name="app-tworecordtypes"></a>

A common requirement in extract, transform, and load (ETL) applications is to process multiple record types on a streaming source. You can create Kinesis Data Analytics applications to process these kinds of streaming sources. The process is as follows:

1. First, you map the streaming source to an in-application input stream, similar to all other Kinesis Data Analytics applications.

1. Then, in your application code, you write SQL statements to retrieve rows of specific types from the in-application input stream. You then insert them into separate in-application streams. (You can create additional in-application streams in your application code.)

In this exercise, you have a streaming source that receives records of two types (`Order` and `Trade`). These are stock orders and corresponding trades. For each order, there can be zero or more trades. Example records of each type are shown following:

**Order record**

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

**Trade record**

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

When you create an application using the AWS Management Console, the console displays the following inferred schema for the in-application input stream created. By default, the console names this in-application stream `SOURCE_SQL_STREAM_001`.

![\[Console screenshot showing the formatted in-application stream sample.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/two-record-types-10.png)


When you save the configuration, Amazon Kinesis Data Analytics continuously reads data from the streaming source and inserts rows in the in-application stream. You can now perform analytics on data in the in-application stream. 

In the application code in this example, you first create two additional in-application streams, `Order_Stream` and `Trade_Stream`. You then filter the rows from the `SOURCE_SQL_STREAM_001` stream based on the record type and use pumps to insert them into the newly created streams. For information about this coding pattern, see [Application Code](how-it-works-app-code.md).
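
As a local analogy, the following Python sketch applies the same filter-and-rename logic as the two pumps to a list of JSON strings. It is not Kinesis Data Analytics code, and `route` is a hypothetical helper name. Each record goes to at most one output list, and records of any other type are ignored.

```python
import json
from typing import Dict, Iterable, List, Tuple


def route(raw_records: Iterable[str]) -> Tuple[List[Dict], List[Dict]]:
    """Split JSON records into order rows and trade rows by RecordType."""
    orders: List[Dict] = []
    trades: List[Dict] = []
    for raw in raw_records:
        rec = json.loads(raw)
        if rec.get("RecordType") == "Order":
            # Same columns as Order_Stream.
            orders.append({
                "order_id": rec["Oid"],
                "order_type": rec["Otype"],
                "ticker": rec["Oticker"],
                "order_price": float(rec["Oprice"]),
                "record_type": rec["RecordType"],
            })
        elif rec.get("RecordType") == "Trade":
            # Same columns as Trade_Stream.
            trades.append({
                "trade_id": rec["Tid"],
                "order_id": rec["Toid"],
                "trade_price": float(rec["Tprice"]),
                "ticker": rec["Tticker"],
                "record_type": rec["RecordType"],
            })
    return orders, trades


if __name__ == "__main__":
    orders, trades = route([
        '{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}',
        '{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}',
    ])
    print(orders)
    print(trades)
```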

1. Filter order and trade rows into separate in-application streams:

   1. Filter the order records in the `SOURCE_SQL_STREAM_001`, and save the orders in the `Order_Stream`.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype", "Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      ```

   1. Filter the trade records in the `SOURCE_SQL_STREAM_001`, and save the trades in the `Trade_Stream`.

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      ```

1. Now you can perform additional analytics on these streams. In this example, you count the number of trades per ticker in a one-minute [tumbling window](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/tumbling-window-concepts.html) and save the results to yet another stream, `DESTINATION_SQL_STREAM`. 

   ```
   -- Perform analytics on Trade_Stream.
   -- To see results in the console, write them to DESTINATION_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   You see the result, as shown following:  
![\[Console screenshot showing the results on the SQL results tab.\]](http://docs.aws.amazon.com/kinesisanalytics/latest/dev/images/two-record-types-20.png)
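
The following Python sketch reproduces the idea behind `GROUP BY ticker, FLOOR(ROWTIME TO MINUTE)` for a finite batch of trades. Flooring each timestamp to the minute places every trade in exactly one non-overlapping one-minute window, which is what makes the window tumbling. The function name is hypothetical. The sketch ignores when a streaming engine actually emits each window's result.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, Tuple


def count_trades_per_minute(trades: Iterable[Tuple[datetime, str]]) -> Counter:
    """Count trades per (window_start, ticker) from (rowtime, ticker) pairs."""
    counts: Counter = Counter()
    for rowtime, ticker in trades:
        # FLOOR(rowtime TO MINUTE): drop seconds and microseconds.
        window_start = rowtime.replace(second=0, microsecond=0)
        counts[(window_start, ticker)] += 1
    return counts


if __name__ == "__main__":
    base = datetime(2018, 5, 9, 12, 50)
    sample = [
        (base.replace(second=1), "AAAA"),
        (base.replace(second=59), "AAAA"),
        (base.replace(minute=51), "AAAA"),
        (base.replace(second=30), "BBBB"),
    ]
    for (window_start, ticker), count in sorted(count_trades_per_minute(sample).items()):
        print(window_start, ticker, count)
```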

**Topics**
+ [Step 1: Prepare the Data](tworecordtypes-prepare.md)
+ [Step 2: Create the Application](tworecordtypes-create-app.md)

**Next Step**  
[Step 1: Prepare the Data](tworecordtypes-prepare.md)

# Step 1: Prepare the Data
<a name="tworecordtypes-prepare"></a>

In this section, you create a Kinesis data stream, and then populate order and trade records on the stream. This is your streaming source for the application that you create in the next step.

**Topics**
+ [Step 1.1: Create a Streaming Source](#tworecordtypes-prepare-create-stream)
+ [Step 1.2: Populate the Streaming Source](#tworecordtypes-prepare-populate-stream)

## Step 1.1: Create a Streaming Source
<a name="tworecordtypes-prepare-create-stream"></a>

You can create a Kinesis data stream using the console or the AWS CLI. The example assumes `OrdersAndTradesStream` as the stream name. 
+ **Using the console** – Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis). Choose **Data Streams**, and then create a stream with one shard. For more information, see [Create a Stream](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) in the *Amazon Kinesis Data Streams Developer Guide*.
+ **Using the AWS CLI** – Use the following Kinesis `create-stream` AWS CLI command to create the stream:

  ```
  $ aws kinesis create-stream \
  --stream-name OrdersAndTradesStream \
  --shard-count 1 \
  --region us-east-1 \
  --profile adminuser
  ```
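
The scripts in this section already use the AWS SDK for Python (Boto3), so you might prefer to create the stream from Python as well. The following sketch makes the equivalent `create_stream` call. It then uses the Boto3 `stream_exists` waiter, which polls until the stream is active. The function name is illustrative. The profile and Region mirror the CLI example and are assumptions about your setup.

```python
def create_stream(kinesis_client, stream_name: str = "OrdersAndTradesStream",
                  shard_count: int = 1) -> None:
    """Create a Kinesis data stream and wait until it is ACTIVE.

    kinesis_client is expected to be a Boto3 Kinesis client. It is passed
    in so the function can be exercised without AWS credentials.
    """
    kinesis_client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    # The new stream starts in CREATING status; this waiter polls
    # DescribeStream until the status is ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)


if __name__ == "__main__":
    import boto3

    session = boto3.Session(profile_name="adminuser")
    create_stream(session.client("kinesis", region_name="us-east-1"))
```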

## Step 1.2: Populate the Streaming Source
<a name="tworecordtypes-prepare-populate-stream"></a>

Run the following Python script to populate sample records on the `OrdersAndTradesStream`. If you created the stream with a different name, update the Python code appropriately. 

1. Install Python and `pip`.

   For information about installing Python, see the [Python](https://www.python.org/) website. 

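   You can install dependencies using pip. The script requires the AWS SDK for Python (Boto3), which you can install with `pip install boto3`. For information about installing pip, see [Installation](https://pip.pypa.io/en/stable/installing/) on the pip website.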

1. Run the following Python code. The `put_record` calls in the code write the JSON records to the stream.

   ```
    
   import json
   import random
   import boto3
   
   STREAM_NAME = "OrdersAndTradesStream"
   PARTITION_KEY = "partition_key"
   
   
   def get_order(order_id, ticker):
       return {
           "RecordType": "Order",
           "Oid": order_id,
           "Oticker": ticker,
           "Oprice": random.randint(500, 10000),
           "Otype": "Sell",
       }
   
   
   def get_trade(order_id, trade_id, ticker):
       return {
           "RecordType": "Trade",
           "Tid": trade_id,
           "Toid": order_id,
           "Tticker": ticker,
           "Tprice": random.randint(0, 3000),
       }
   
   
   def generate(stream_name, kinesis_client):
       order_id = 1
       while True:
           ticker = random.choice(["AAAA", "BBBB", "CCCC"])
           order = get_order(order_id, ticker)
           print(order)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
           )
           for trade_id in range(1, random.randint(0, 6)):
               trade = get_trade(order_id, trade_id, ticker)
               print(trade)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(trade),
                   PartitionKey=PARTITION_KEY,
               )
           order_id += 1
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```
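
You can factor the generation logic into a pure function to inspect the data shape locally before writing to the stream. This sketch keeps the record shapes and value ranges from the script. It returns a finite list instead of looping forever. It also takes a seeded `random.Random`, so the output is repeatable. The name `generate_records` is hypothetical. The sketch also makes the trade count explicit: because `range(1, n)` yields `n - 1` values, each order gets between zero and five trades.

```python
import random
from typing import Dict, List


def generate_records(num_orders: int, rng: random.Random) -> List[Dict]:
    """Build order and trade records like the script above, without AWS calls."""
    records: List[Dict] = []
    for order_id in range(1, num_orders + 1):
        ticker = rng.choice(["AAAA", "BBBB", "CCCC"])
        records.append({
            "RecordType": "Order",
            "Oid": order_id,
            "Oticker": ticker,
            "Oprice": rng.randint(500, 10000),
            "Otype": "Sell",
        })
        # randint(0, 6) returns 0-6, so range(1, n) yields 0-5 trade IDs.
        for trade_id in range(1, rng.randint(0, 6)):
            records.append({
                "RecordType": "Trade",
                "Tid": trade_id,
                "Toid": order_id,
                "Tticker": ticker,
                "Tprice": rng.randint(0, 3000),
            })
    return records


if __name__ == "__main__":
    for record in generate_records(3, random.Random(42)):
        print(record)
```

To send these records to the stream, you could pass each one to `put_record` as the original script does.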



**Next Step**  
[Step 2: Create the Application](tworecordtypes-create-app.md)

# Step 2: Create the Application
<a name="tworecordtypes-create-app"></a>

In this section, you create a Kinesis Data Analytics application. You then update the application by adding an input configuration that maps the streaming source you created in the preceding section to an in-application input stream. 

1. Open the Managed Service for Apache Flink console at [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics).

1. Choose **Create application**. This example uses the application name **ProcessMultipleRecordTypes**.

1. On the application details page, choose **Connect streaming data** to connect to the source. 

1. On the **Connect to source** page, do the following:

   1. Choose the stream that you created in [Step 1: Prepare the Data](tworecordtypes-prepare.md). 

   1. Choose to create an IAM role.

   1. Choose **Discover schema**. Wait for the console to show the inferred schema and the sample records that are used to infer the schema for the in-application stream.

   1. Choose **Save and continue**.

1. On the application hub, choose **Go to SQL editor**. To start the application, choose **Yes, start application** in the dialog box that appears.

1. In the SQL editor, write the application code and verify the results:

   1. Copy the following application code and paste it into the editor.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. Choose **Save and run SQL**. Choose the **Real-time analytics** tab to see all of the in-application streams that the application created and verify the data. 

   

**Next Step**  
You can configure application output to persist results to an external destination, such as another Kinesis stream or a Firehose data delivery stream. 
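
You can also configure output programmatically instead of using the console. The following Python sketch uses the Boto3 `kinesisanalytics` client, which is the API for SQL applications, to send `DESTINATION_SQL_STREAM` to a Kinesis data stream. The output stream ARN and IAM role ARN are hypothetical placeholders. The role is assumed to allow Kinesis Data Analytics to write to that stream. The function name `add_stream_output` is illustrative.

```python
from typing import Any


def add_stream_output(
    client: Any,
    application_name: str,
    stream_arn: str,
    role_arn: str,
    in_app_stream: str = "DESTINATION_SQL_STREAM",
) -> None:
    """Send an in-application stream to a Kinesis data stream.

    client is expected to be boto3.client("kinesisanalytics"). It is passed
    in so the function can be exercised without AWS credentials.
    """
    # AddApplicationOutput requires the application's current version ID.
    detail = client.describe_application(ApplicationName=application_name)[
        "ApplicationDetail"
    ]
    client.add_application_output(
        ApplicationName=application_name,
        CurrentApplicationVersionId=detail["ApplicationVersionId"],
        Output={
            "Name": in_app_stream,
            "KinesisStreamsOutput": {"ResourceARN": stream_arn, "RoleARN": role_arn},
            "DestinationSchema": {"RecordFormatType": "JSON"},
        },
    )


if __name__ == "__main__":
    import boto3

    # Hypothetical ARNs: replace them with your own output stream and IAM role.
    add_stream_output(
        boto3.client("kinesisanalytics"),
        "ProcessMultipleRecordTypes",
        "arn:aws:kinesis:us-east-1:111122223333:stream/ExampleOutputStream",
        "arn:aws:iam::111122223333:role/ExampleKinesisAnalyticsRole",
    )
```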