

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Kinesis Data Analytics for SQL 範例
<a name="examples"></a>

本節提供在 Amazon Kinesis Data Analytics 中建立及使用應用程式的範例。其中包含範例程式碼和逐步指示，可協助您建立 Kinesis Data Analytics 應用程式並測試結果。

 在探索這些範例之前，我們建議先檢閱 [Amazon Kinesis Data Analytics for SQL 應用程式：運作方式](how-it-works.md) 與 [Amazon Kinesis Data Analytics for SQL 應用程式入門](getting-started.md)：

**Topics**
+ [範例：轉換資料](examples-transforming.md)
+ [範例：視窗與彙總](examples-window.md)
+ [範例：聯結](examples-joins.md)
+ [範例：機器學習](examples-machine.md)
+ [範例：提醒與錯誤](examples-alerts.md)
+ [範例：解決方案加速器](examples_solution.md)

# 範例：轉換資料
<a name="examples-transforming"></a>

有時候，您的應用程式碼必須預先處理傳入的記錄，然後才能在 Amazon Kinesis Data Analytics 中執行分析。發生這種情況的原因有許多種，例如記錄不符合支援的記錄格式，導致應用程式內輸入串流中產生非標準化的資料欄。

本節提供了如何使用可用的字串函數來標準化數據，與如何從字串資料欄提取所需資訊等範例。本節還指出了您可能會覺得有用的日期時間函數。

## 使用 Lambda 預處理串流
<a name="examples-transforming-lambda"></a>

如需使用 預先處理串流的資訊 AWS Lambda，請參閱 [使用 Lambda 函數預處理資料](lambda-preprocessing.md)。

**Topics**
+ [使用 Lambda 預處理串流](#examples-transforming-lambda)
+ [範例：轉換字串值](examples-transforming-strings.md)
+ [範例：轉換 DateTime 值](app-string-datetime-manipulation.md)
+ [範例：轉換多個資料類型](app-tworecordtypes.md)

# 範例：轉換字串值
<a name="examples-transforming-strings"></a>

Amazon Kinesis Data Analytics 支援 JSON 和 CSV 等格式，用於串流來源上的記錄。如需詳細資訊，請參閱[RecordFormat](API_RecordFormat.md)。然後，這些記錄會根據輸入組態映射到應用程式內串流的列。如需詳細資訊，請參閱[設定應用程式輸入](how-it-works-input.md)。輸入組態會指定串流來源中的記錄欄位的映射方式，將其對應至應用程式內串流中的資料欄。

當串流來源上的記錄遵循支援的格式時，此映射就會運作，產出內含標準化資料的應用程式內串流。但如果串流來源上的資料不符合支援的標準，該怎麼辦？ 舉例來說，如果您的串流來源包含點擊流資料、IoT 感應器和應用程式日誌等資料，該怎麼辦？ 

請考量以下範例：
+ 串流來源包含應用程式日誌：應用程式日誌會遵循標準 Apache 日誌格式，並使用 JSON 格式寫入串流。

  ```
  {
     "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0"
  }
  ```

  如需有關標準 Apache 日誌格式的詳細資訊，請參閱 Apache 網站上的[日誌檔案](https://httpd.apache.org/docs/2.4/logs.html)。

   
+ 串流來源包含半結構化資料：下列範例顯示兩筆記錄。`Col_E_Unstructured` 欄位值是一系列逗號分隔值。共有五個資料欄：前四個資料行具有字串類型值，最後一欄包含逗號分隔值。

  ```
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  ```
+ 串流來源上的記錄包含 URL，而您需要部分 URL 網域名稱以進行分析。

  ```
  { "referrer" : "http://www.amazon.com"}
  { "referrer" : "http://www.stackoverflow.com" }
  ```

在這種情況下，下列兩步驟程序通常適用於建立內含標準化資料的應用程式內串流：

1. 設定應用程式輸入，將非結構化欄位映射至所建立的應用程式內輸入串流中 `VARCHAR(N)` 類型的資料欄。

1. 在應用程式碼中，使用字串函數將此單一欄分割成多個資料欄，然後將資料列儲存在另一個應用程式內串流中。您的應用程式碼建立的應用程式內串流將有標準化資料。接著您可以分析應用程式內串流。

Amazon Kinesis Data Analytics 提供下列字串操作、標準 SQL 函數，以及 SQL 標準的擴充功能，以便使用字串欄：
+ **字串運算子**：運算子 (例如 `LIKE` 和 `SIMILAR` ) 在比較字串時很有用。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[字串運算子](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-operators.html)。
+ **SQL 函數**：下列函數在操作個別字串時非常有用。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的[字串與搜尋函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-and-search-functions.html)。
  + `CHAR_LENGTH` - 提供字串的長度。
  + `INITCAP` - 傳回輸入字串的轉換版本，使得每個空格分隔文字的第一個字元都是大寫字母，而其他所有字元都是小寫字母。
  + `LOWER/UPPER` - 將字串轉換為小寫或大寫。
  + `OVERLAY` - 以第二個字串引數 (取代字串) 取代第一個字串引數 (原始字串) 的一部分。
  + `POSITION` - 搜尋另一個字串中的字串。
  + `REGEX_REPLACE` - 以替代子字串取代子字串。
  + `SUBSTRING` - 從特定位置開始擷取來源字串的一部分。
  + `TRIM` - 從來源字串的開頭或結尾移除指定字元的執行個體。
+ **SQL 擴充功能**:這些對於使用非結構化字串 (例如日誌和 URI) 非常有用。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的[日誌剖析函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-pattern-matching-functions.html)。
  + `FAST_REGEX_LOG_PARSER` - 工作方式類似於 regex 剖析器，但需要幾個快捷方式來確保更快的結果。舉例來說，快速 regex 剖析器找到第一個匹配 (稱為*懶惰語義*) 後就會停止。
  + `FIXED_COLUMN_LOG_PARSE` - 剖析固定寬度欄位，並自動將其轉換為指定的 SQL 類型。
  + `REGEX_LOG_PARSE` - 根據預設 Java 規則表達式模式剖析字串。
  + `SYS_LOG_PARSE` - 剖析 UNIX/Linux 系統日誌中常見的項目。
  + `VARIABLE_COLUMN_LOG_PARSE` - 將輸入字串分割為以分隔符號字元或字串分隔的欄位。
  + `W3C_LOG_PARSE` - 可用於快速格式化 Apache 日誌。

如需使用這些函式的範例，請參閱主題：

**Topics**
+ [範例：擷取字串的一部分 (子字串函數)](examples-transforming-strings-substring.md)
+ [範例：使用 Regex (REGEX\$1REPLACE 函數) 替換子字串](examples-transforming-strings-regexreplace.md)
+ [範例：根據規則表達式剖析日誌字串 (REGEX\$1LOG\$1PARSE 函數)](examples-transforming-strings-regexlogparse.md)
+ [範例：剖析網頁日誌 (W3C\$1LOG\$1PARSE 函式)](examples-transforming-strings-w3clogparse.md)
+ [範例：將字串拆分為多個字段 (VARIABLE\$1COLUMN\$1LOG\$1PARSE 函數)](examples-transforming-strings-variablecolumnlogparse.md)

# 範例：擷取字串的一部分 (子字串函數)
<a name="examples-transforming-strings-substring"></a>

此範例使用 `SUBSTRING` 函數來轉換 Amazon Kinesis Data Analytics 中的串流。`SUBSTRING` 函數從特定位置擷取來源字串的一部分。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[子字串](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-substring.html)。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流。

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



接著，您可以在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 資料串流當作串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有一個資料欄 (`REFERRER`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含推薦網站欄中的 URL 清單。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/referrer-10.png)


然後，使用應用程式碼搭配 `SUBSTRING` 函數來剖析 URL 字串，以擷取公司名稱。接著將產生的資料插入另一個應用程式內串流，如下所示：



![\[主控台螢幕擷取畫面，顯示即時分析標籤，與應用程式內串流生成的資料。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/referrer-20.png)


**Topics**
+ [步驟 1：建立 Kinesis 資料串流](#examples-transforming-strings-substring-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-transforming-strings-substring-2)

## 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-strings-substring-1"></a>

建立 Amazon Kinesis 資料串流，並填入日誌紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 執行下列 Python 程式碼，填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-transforming-strings-substring-2"></a>

接下來，建立 Kinesis Data Analytics 應用程式，如下所示：

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色 選項。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述只有一個資料欄。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) 
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. 選擇 **儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：使用 Regex (REGEX\$1REPLACE 函數) 替換子字串
<a name="examples-transforming-strings-regexreplace"></a>

此範例使用 `REGEX_REPLACE` 函數來轉換 Amazon Kinesis Data Analytics 中的字串。 `REGEX_REPLACE` 用替代子字串替換一個子字串。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[REGEX\$1REPLACE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-replace.html)。

在此範例中，將下列記錄寫入 Amazon Kinesis data stream：

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



接著，您可以在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有一個資料欄 (REFERRER) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含推薦網站欄中的 URL 清單。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/referrer-10.png)


然後，將應用程式碼與 `REGEX_REPLACE` 函數搭配使用，以轉換 URL 使用 `https://` 而不是 `http://`。接著將產生的資料插入另一個應用程式內串流，如下所示：



![\[主控台螢幕截圖，顯示產生的資料表，其中帶有列時間、ingest_time 和推薦網站欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_regex_replace.png)


**Topics**
+ [步驟 1：建立 Kinesis 資料串流](#examples-transforming-strings-regexreplace-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-transforming-strings-regexreplace-2)

## 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-strings-regexreplace-1"></a>

建立 Amazon Kinesis 資料串流，並填入日誌紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 執行下列 Python 程式碼，以填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-transforming-strings-regexreplace-2"></a>

接下來，建立 Kinesis Data Analytics 應用程式，如下所示：

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色 選項。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述只有一個資料欄。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中：

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. 選擇**儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：根據規則表達式剖析日誌字串 (REGEX\$1LOG\$1PARSE 函數)
<a name="examples-transforming-strings-regexlogparse"></a>

此範例使用 `REGEX_LOG_PARSE` 函數來轉換 Amazon Kinesis Data Analytics 中的字串。 `REGEX_LOG_PARSE` 根據預設 Java 規則表達式模式剖析字串。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[REGEX\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-log-parse.html)。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流：

```
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
```



接著，在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有一個資料欄 (LOGENTRY) 的應用程式內結構描述，如下所示。

![\[主控台螢幕擷取畫面，顯示帶 LOGENTRY 資料欄的應用程式內結構描述\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_regex_log_parse_0.png)


然後，使用應用程式碼搭配 `REGEX_LOG_PARSE` 函數來剖析日誌字串，以擷取資料元素。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[主控台螢幕擷取畫面顯示產生的資料表，其中包含 ROWTIME、LOGENTRY、MATCH1 和 MATCH2 資料欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_regex_log_parse_1.png)


**Topics**
+ [步驟 1：建立 Kinesis 資料串流](#examples-transforming-strings-regexlogparse-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-transforming-strings-regexlogparse-2)

## 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-strings-regexlogparse-1"></a>

建立 Amazon Kinesis 資料串流，並填入日誌紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 執行下列 Python 程式碼，填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
           '"GET /index.php HTTP/1.1" 200 125 "-" '
           '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-transforming-strings-regexlogparse-2"></a>

接下來，建立 Kinesis Data Analytics 應用程式，如下所示：

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，然後指定應用程式名稱。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色 選項。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述只有一個資料欄。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24));
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
          FROM 
               (SELECT STREAM LOGENTRY,
                   REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
                   FROM SOURCE_SQL_STREAM_001) AS T;
      ```

   1. 選擇 **儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：剖析網頁日誌 (W3C\$1LOG\$1PARSE 函式)
<a name="examples-transforming-strings-w3clogparse"></a>

此範例使用 `W3C_LOG_PARSE` 函數來轉換 Amazon Kinesis Data Analytics 中的串流。您可以使用 `W3C_LOG_PARSE` 來快速格式化 Apache 日誌。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[W3C\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-w3c-log-parse.html)。

在此範例中，將日誌記錄寫入 Amazon Kinesis 資料串流。範例日誌如下所示：

```
{"Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pba.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] "GET /icons/apache_pbb.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] "GET /icons/apache_pbc.gif HTTP/1.1" 304 0"}
...
```



接著，在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並以一個資料欄 (日誌) 推斷應用程式內結構描述，如下所示：

![\[主控台螢幕擷取畫面顯示格式化的串流範例標籤，其中包含帶有日誌欄的應用程式內結構描述。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/log-10.png)


然後，您可以將應用程式碼與 `W3C_LOG_PARSE` 函數搭配使用來剖析日誌，並建立另一個應用程式內串流，其中包含不同資料欄的各種日誌欄位，如下所示：

![\[主控台螢幕擷取畫面，顯示帶有應用程式內串流的即時分析標籤。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/log-20.png)


**Topics**
+ [步驟 1：建立 Kinesis 資料串流](#examples-transforming-strings-w3clogparse-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-transforming-strings-w3clogparse-2)

## 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-strings-w3clogparse-1"></a>

建立 Amazon Kinesis 資料串流，並依照下列方式填入日誌記錄：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 執行下列 Python 程式碼，以填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
           '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-transforming-strings-w3clogparse-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色 選項。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述只有一個資料欄。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
      column1 VARCHAR(16),
      column2 VARCHAR(16),
      column3 VARCHAR(16),
      column4 VARCHAR(16),
      column5 VARCHAR(16),
      column6 VARCHAR(16),
      column7 VARCHAR(16));
      
      CREATE OR REPLACE PUMP "myPUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
              SELECT STREAM
                  l.r.COLUMN1,
                  l.r.COLUMN2,
                  l.r.COLUMN3,
                  l.r.COLUMN4,
                  l.r.COLUMN5,
                  l.r.COLUMN6,
                  l.r.COLUMN7
              FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON')
                    FROM "SOURCE_SQL_STREAM_001") AS l(r);
      ```

   1. 選擇 **儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：將字串拆分為多個字段 (VARIABLE\$1COLUMN\$1LOG\$1PARSE 函數)
<a name="examples-transforming-strings-variablecolumnlogparse"></a>

此範例使用 `VARIABLE_COLUMN_LOG_PARSE` 函數來操作 Kinesis Data Analytics 中的字串。`VARIABLE_COLUMN_LOG_PARSE` 將輸入字串分割，變成以分隔符號字元或字串分開的欄位。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[VARIABLE\$1COLUMN\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-variable-column-log-parse.html)。

在此範例中，將半結構化記錄寫入 Amazon Kinesis 資料串流。範例記錄如下：

```
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
```



接著，您可以在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 資料串流當作串流來源。探索程序會讀取串流來源上的範例記錄，並以四個資料欄推斷應用程式內結構描述，如下所示：

![\[主控台螢幕擷取畫面，顯示帶 4 個資料欄的應用程式內結構描述\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/unstructured-10.png)


然後，您可以將應用程式碼與 `VARIABLE_COLUMN_LOG_PARSE` 函數搭配使用來剖析逗號分隔的值，並將正規化的列插入另一個應用程式內串流，如下所示：



![\[主控台螢幕擷取畫面，顯示帶有應用程式內串流的即時分析標籤。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/unstructured-20.png)


**Topics**
+ [步驟 1：建立 Kinesis 資料串流](#examples-transforming-strings-variablecolumnlogparse-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-transforming-strings-variablecolumnlogparse-2)

## 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-strings-variablecolumnlogparse-1"></a>

建立 Amazon Kinesis 資料串流，並填入日誌紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 執行下列 Python 程式碼，以填入範例日誌記錄。這個簡單的代碼會持續寫入相同的日誌記錄到串流。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

   

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-transforming-strings-variablecolumnlogparse-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色 選項。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。請注意，推斷的結構描述只有一個資料欄。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，撰寫應用程式碼，然後驗證結果：

   1. 請複製以下應用程式碼，然後貼到編輯器中：

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
                  "column_A" VARCHAR(16),
                  "column_B" VARCHAR(16),
                  "column_C" VARCHAR(16),
                  "COL_1" VARCHAR(16),             
                  "COL_2" VARCHAR(16),            
                  "COL_3" VARCHAR(16));
      
      CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM  t."Col_A", t."Col_B", t."Col_C",
                        t.r."COL_1", t.r."COL_2", t.r."COL_3"
         FROM (SELECT STREAM 
                 "Col_A", "Col_B", "Col_C",
                 VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
                                           'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)',
                                           ',') AS r 
               FROM "SOURCE_SQL_STREAM_001") as t;
      ```

   1. 選擇 **儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：轉換 DateTime 值
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics 支援將資料欄轉換為時間戳記。舉例來說，除了 `ROWTIME` 欄之外，您可能想要使用自己的時間戳記做為 `GROUP BY` 子句的一部分，將其當作另一個以時間為基礎的視窗。Kinesis Data Analytics 提供操作和 SQL 函數，用於處理日期和時間欄位。
+ **日期和時間運算子**：您可以對日期、時間和間隔資料類型執行算術運算。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[日期、時間和間隔運算子](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html)。

   
+ **SQL 函數**：包括以下項目。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的[日期和時間函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)。
  + `EXTRACT()` - 從日期、時間、時間戳記或間隔表達式中擷取一個欄位。
  + `CURRENT_TIME` - 傳回查詢執行的時間 (UTC)。
  + `CURRENT_DATE` - 傳回查詢執行的日期 (UTC)。
  + `CURRENT_TIMESTAMP` - 傳回查詢執行的時間戳記 (UTC)。
  + `LOCALTIME` - 傳回由 Kinesis Data Analytics 運行環境定義，查詢執行時的目前時間 (UTC)。
  + `LOCALTIMESTAMP` - 傳回由 Kinesis Data Analytics 運行環境定義的目前時間戳記 (UTC)。

     
+ **SQL 擴充功能**：包括下列項目。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[日期和時間函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)，以及[日期時間轉換參考資料](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html)。
  + `CURRENT_ROW_TIMESTAMP` - 傳回串流中每一列的新時間戳記。
  + `TSDIFF` - 傳回兩個時間戳記的差值 (以毫秒為單位)。
  + `CHAR_TO_DATE` - 將字串轉換為日期。
  + `CHAR_TO_TIME` - 將字串轉換為時間。
  + `CHAR_TO_TIMESTAMP` - 將字串轉換為時間戳記。
  + `DATE_TO_CHAR` - 將日期轉換為字串。
  + `TIME_TO_CHAR` - 將時間轉換為字串。
  + `TIMESTAMP_TO_CHAR` - 將時間戳記轉換為字串。

先前大多數 SQL 函數使用格式來轉換資料欄。此格式具靈活度。舉例來說，您可以指定格式 `yyyy-MM-dd hh:mm:ss` 將輸入字串 `2009-09-16 03:15:24` 轉換為時間戳記。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的[文字至時間戳記 (Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html)。

## 範例：轉換日期
<a name="examples-transforming-dates"></a>

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流。

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



接著，您可以在主控台上建立 Kinesis Data Analytics 應用程式，並將 Kinesis 串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有兩個資料欄 (`EVENT_TIME` 和 `TICKER`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含事件時間和股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


然後，將應用程式碼與 SQL 函數搭配使用，以各種方式轉換 `EVENT_TIME` 時間戳記欄位。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### 步驟 1：建立 Kinesis 資料串流
<a name="examples-transforming-dates-1"></a>

建立 Amazon Kinesis 資料串流，並將事件時間和股票記錄填入其中，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。

1. 執行下列 Python 程式碼，以使用範例資料填入串流。這個簡單的程式碼，會持續將帶有隨機股票代號和當前時間戳記的記錄寫入串流。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### 步驟 2：建立 Amazon Kinesis Data Analytics 應用程式
<a name="examples-transforming-dates-2"></a>

建立應用程式，如下所示：

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在上一節建立的串流。

   1. 選擇建立 IAM 角色。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述，以及用來推斷建立的應用程式內串流結構描述之範例記錄。推斷的結構描述有兩個資料欄。

   1. 選擇**編輯結構描述**。將 **EVENT\$1TIME** 欄的**欄類型**變更為 `TIMESTAMP`。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

   

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001"
      ```

   1. 選擇 **儲存並執行 SQL**。在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：轉換多個資料類型
<a name="app-tworecordtypes"></a>

 擷取、轉換和載入 (ETL) 應用程式的常見需求，是在串流來源上處理多個記錄類型。您可以建立 Kinesis Data Analytics 應用程式來處理這些類型的串流來源。程序如下：

1. 首先，將串流來源對應到應用程式內輸入串流，類似於所有其他 Kinesis Data Analytics 應用程式。

1. 然後，在應用程式碼中撰寫 SQL 陳述式，從應用程式內輸入串流擷取特定類型的資料列。接著將它們插入個別的應用程式內串流。(您可以在應用程式碼中建立其他應用程式內串流。)

在本練習中，您有一個接收兩種類型 (`Order` 和 `Trade`) 記錄的串流來源。這些是股票訂單和相應的交易。針對每個訂單，可能有零個或多個交易。每種類型的範例記錄如下所示：

**訂單記錄**

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

**交易記錄**

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

當您使用 建立應用程式時 AWS 管理主控台，主控台會針對建立的應用程式內輸入串流顯示下列推斷結構描述。根據預設，主控台會命名此應用程式內串流 `SOURCE_SQL_STREAM_001`。

![\[顯示格式化的應用程式內串流範例之主控台螢幕擷取畫面\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/two-record-types-10.png)


儲存組態時，Amazon Kinesis Data Analytics 會持續從串流來源讀取資料，並在應用程式內串流中插入資料列。您現在可以對應用程式內串流中的資料執行分析。

在此範例的應用程式碼中，先建立兩個額外的應用程式內串流，`Order_Stream` 和 `Trade_Stream`。然後，您可以根據記錄類型從 `SOURCE_SQL_STREAM_001` 串流中篩選列，並使用幫浦將它們插入新建立的串流中。如需此編碼模式的相關資訊，請參閱 [應用程式碼](how-it-works-app-code.md)。

1. 將訂單和交易列過濾到個別的應用程式內串流：

   1. 篩選 `SOURCE_SQL_STREAM_001` 中的訂單記錄，並將訂單儲存在 `Order_Stream` 中。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM oid, otype,oticker, oprice, recordtype 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Order';
      ```

   1. 篩選 `SOURCE_SQL_STREAM_001` 中的交易記錄，並將訂單儲存在 `Trade_Stream` 中。

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM tid, toid, tprice, tticker, recordtype
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Trade';
      ```

1. 現在，您可以對這些串流執行其他分析。在此範例，於一分鐘的[翻轉視窗](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/tumbling-window-concepts.html)中計算股票代碼的交易數量，並將結果保存到另一個 `DESTINATION_SQL_STREAM` 串流中。

   ```
   --do some analytics on the Trade_Stream and Order_Stream. 
   -- To see results in console you must write to OPUT_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   您會看到下列結果。  
![\[顯示 SQL 結果標籤上結果的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/two-record-types-20.png)

**Topics**
+ [步驟 1：準備資料](tworecordtypes-prepare.md)
+ [步驟 2：建立應用程式](tworecordtypes-create-app.md)

**後續步驟**  
[步驟 1：準備資料](tworecordtypes-prepare.md)

# 步驟 1：準備資料
<a name="tworecordtypes-prepare"></a>

在本節中，建立 Kinesis 資料串流，然後在串流中填入訂單和交易記錄。這是您在下一個步驟中建立之應用程式的串流來源。

**Topics**
+ [步驟 1.1：建立串流來源](#tworecordtypes-prepare-create-stream)
+ [步驟 1.2：填入串流來源](#tworecordtypes-prepare-populate-stream)

## 步驟 1.1：建立串流來源
<a name="tworecordtypes-prepare-create-stream"></a>

您可以使用主控台或 AWS CLI建立 Kinesis 資料串流。此範例假設 `OrdersAndTradesStream` 為串流名稱。
+ **使用 主控台** – 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。選擇**資料串流**，然後建立具有一個碎片的串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。
+ **使用 AWS CLI** – 使用下列 Kinesis `create-stream` AWS CLI 命令來建立串流：

  ```
  $ aws kinesis create-stream \
  --stream-name OrdersAndTradesStream \
  --shard-count 1 \
  --region us-east-1 \
  --profile adminuser
  ```

## 步驟 1.2：填入串流來源
<a name="tworecordtypes-prepare-populate-stream"></a>

執行下列 Python 指令碼，將範例記錄填入 `OrdersAndTradesStream`。如果您使用不同的名稱建立串流，請更新相應的 Python 程式碼。

1. 安裝 Python 與 `pip`。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 網站的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。程式碼中的 `put-record` 命令會將 JSON 記錄寫入串流。

   ```
    
   import json
   import random
   import boto3
   
   STREAM_NAME = "OrdersAndTradesStream"
   PARTITION_KEY = "partition_key"
   
   
   def get_order(order_id, ticker):
       return {
           "RecordType": "Order",
           "Oid": order_id,
           "Oticker": ticker,
           "Oprice": random.randint(500, 10000),
           "Otype": "Sell",
       }
   
   
   def get_trade(order_id, trade_id, ticker):
       return {
           "RecordType": "Trade",
           "Tid": trade_id,
           "Toid": order_id,
           "Tticker": ticker,
           "Tprice": random.randint(0, 3000),
       }
   
   
   def generate(stream_name, kinesis_client):
       order_id = 1
       while True:
           ticker = random.choice(["AAAA", "BBBB", "CCCC"])
           order = get_order(order_id, ticker)
           print(order)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
           )
           for trade_id in range(1, random.randint(0, 6)):
               trade = get_trade(order_id, trade_id, ticker)
               print(trade)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(trade),
                   PartitionKey=PARTITION_KEY,
               )
           order_id += 1
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**後續步驟**  
 [步驟 2：建立應用程式](tworecordtypes-create-app.md)

# 步驟 2：建立應用程式
<a name="tworecordtypes-create-app"></a>

在本節建立 Amazon Kinesis Data Analytics 應用程式。然後新增輸入組態，將上一節建立的串流來源對應至應用程式內輸入串流，藉此更新應用程式。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式** 。此範例使用應用程式名稱 **ProcessMultipleRecordTypes**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   1. 選擇您在 [步驟 1：準備資料](tworecordtypes-prepare.md) 中建立的串流。

   1. 選擇建立 IAM 角色。

   1. 等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。

   1. 選擇**儲存並繼續**。

1. 在應用程式中樞，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. 選擇 **儲存並執行 SQL**。選擇**即時分析**標籤，查看應用程式建立的所有應用程式內串流，並驗證資料。

   

**後續步驟**  
您可以設定應用程式輸出，將結果保留到外部目的地，例如另一個 Kinesis 串流或 Firehose 資料交付串流。

# 範例：視窗與彙總
<a name="examples-window"></a>

本節提供使用視窗語彙總查詢的 Amazon Kinesis Data Analytics 應用程式範例。(如需詳細資訊，請參閱 [窗口化查詢](windowed-sql.md)。) 每個範例皆包含逐步指示與範例程式碼，協助您建立 Kinesis Data Analytics 應用程式並測試結果。

**Topics**
+ [範例：交錯視窗](examples-window-stagger.md)
+ [範例：使用列時間的輪轉窗口](examples-window-tumbling-rowtime.md)
+ [範例：使用事件時間戳記的輪轉窗口](examples-window-tumbling-event.md)
+ [範例：擷取最常發生的值 (TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [範例：從查詢彙總部分結果](examples-window-partialresults.md)

# 範例：交錯視窗
<a name="examples-window-stagger"></a>

當視窗化查詢為每個不同的分割區索引鍵處理個別視窗時，從具有相符索引鍵的資料到達時開始，即為*交錯視窗*。如需詳細資訊，請參閱 [交錯窗口](stagger-window-concepts.md)。這個 Amazon Kinesis Data Analytics 範例使用 EVENT\$1TIME 和 TICKER 欄來建立交錯視窗。來源串流包含六個記錄的群組，其中有相同的 EVENT\$1TIME 和 TICKER 值，這些資料會在一分鐘內到達，但不一定具有相同的分鐘值 (例如 `18:41:xx`)。

在此範例中，將下列記錄於指定時間寫入 Amazon Kinesis 資料串流。指令碼不會將時間寫入串流，但應用程式擷取記錄的時間會寫入 `ROWTIME` 欄位：

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有兩個資料欄 (`EVENT_TIME` 和 `TICKER`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含價格和股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


您可以將應用程式碼與 `COUNT` 函數搭配使用，以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_stagger.png)


在下列程序中，您會建立 Kinesis Data Analytics 應用程式，根據 EVENT\$1TIME 和 TICKER，在交錯視窗彙總輸入串流中的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-stagger-window-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-stagger-window-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-stagger-window-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會在一分鐘的時間內，連續將具有相同隨機 `EVENT_TIME` 和股票代號的六筆記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-stagger-window-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。

   1. 選擇**編輯結構描述**。將 **EVENT\$1TIME** 欄的**欄類型**變更為 `TIMESTAMP`。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：使用列時間的輪轉窗口
<a name="examples-window-tumbling-rowtime"></a>

當窗口查詢以非重疊的方式處理每個窗口時，即稱作*輪轉窗口*。如需詳細資訊，請參閱[輪轉窗口（使用 GROUP BY 彙總）](tumbling-window-concepts.md)。此 Amazon Kinesis Data Analytics 範例使用 `ROWTIME` 資料欄來建立輪轉窗口。該 `ROWTIME` 欄示應用程式讀取記錄的時間。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流。

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有兩個資料欄 (`TICKER` 和 `PRICE`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含價格和股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


您可以將應用程式碼與 `MIN` 和 `MAX` 函數搭配使用，以建立資料的窗口化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，以根據 ROWTIME 在輪轉窗口彙總輸入串流中的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-tumbling-window-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-tumbling-window-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-tumbling-window-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-tumbling-window-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：使用事件時間戳記的輪轉窗口
<a name="examples-window-tumbling-event"></a>

當窗口查詢以非重疊的方式處理每個窗口時，即稱作*輪轉窗口*。如需詳細資訊，請參閱[輪轉窗口（使用 GROUP BY 彙總）](tumbling-window-concepts.md)。這個 Amazon Kinesis Data Analytics 範例示範使用事件時間戳記 (即使用者建立，包含在串流資料中的時間戳記) 的輪轉窗口。範例中使用這種方法，而非僅使用 ROWTIME，即 Kinesis Data Analytics 在應用程式收到記錄時所建立的時間戳記。如果您想要根據事件發生的時間 (而非應用程式收到事件的時間) 建立彙總，可在串流資料中使用事件時間戳記。在此範例中，`ROWTIME` 值會每分鐘觸發彙總，而記錄會依照 `ROWTIME` 和包含的事件時間兩者進行彙總。

在此範例中，將下列記錄寫入 Amazon Kinesis 串流：此 `EVENT_TIME` 值設定為過去 5 秒，以模擬處理和傳輸延遲，這些延遲可能會造成事件發生與記錄擷取至 Kinesis Data Analytics 的時間差。

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有三個資料欄 (`EVENT_TIME`、`TICKER` 和`PRICE`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含事件時間、股票代號以及價格欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


您可以將應用程式碼與 `MIN` 和 `MAX` 函數搭配使用，以建立資料的窗口化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，該應用程式會根據事件時間在輪轉窗口中彙總輸入串流的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-window-tumbling-event-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-window-tumbling-event-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-window-tumbling-event-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-window-tumbling-event-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有三個資料欄。

   1. 選擇**編輯結構描述**。將 **EVENT\$1TIME** 欄的**欄類型**變更為 `TIMESTAMP`。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：擷取最常發生的值 (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

這個 Amazon Kinesis Data Analytics 範例示範如何使用 `TOP_K_ITEMS_TUMBLING` 函數擷取輪轉窗口中最常出現的值。如需詳細資訊，請參閱《Amazon Managed Service for Apache Flink SQL 參考資料》**中的 [`TOP_K_ITEMS_TUMBLING` 函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html)。

此 `TOP_K_ITEMS_TUMBLING` 功能在彙總超過數萬或數十萬個金鑰，且想要減少資源使用量時非常有用。該函數產生與 `GROUP BY` 和 `ORDER BY` 子句匯總相同的結果。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流：

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並以一個資料欄 (`TICKER`) 推斷應用程式內結構描述，如下所示：

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


您可以將應用程式碼與 `TOP_K_VALUES_TUMBLING` 函數搭配使用，以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_topk.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，以擷取輸入串流中最常出現的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-window-topkitems-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-window-topkitems-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-window-topkitems-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼保持執行，以便在稍後的步驟中產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-window-topkitems-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有一個資料欄。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後在編輯器中貼上。

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：從查詢彙總部分結果
<a name="examples-window-partialresults"></a>

如 Amazon Kinesis 資料串流中有事件時間與擷取時間不完全相符的記錄，輪轉窗口中一些結果抵達，但不一定在窗口中發生。在這種情況下，輪轉窗口只會包含您想要的部分結果集。您可以使用以下幾種方法來修正此問題：
+ 僅使用輪轉窗口，並用 upserts 透過資料庫或資料倉儲彙總部分後處理結果。這種方法在處理應用程式時非常有效。它會無限期地處理彙總運算子 (`sum`、`min`、`max` 等等) 的延遲資料。這種方法的缺點是，您必須在資料庫層開發和維護其他應用程式邏輯。
+ 使用輪轉和滑動窗口，可提早產生部分結果，但在滑動窗口期間也會繼續產生完整的結果。這種方法使用複寫，而非 upsert 處理延遲資料，這樣就不需要在資料庫層添加額外的應用程式邏輯。此方法的缺點是它使用了更多的 Kinesis 處理單元 (KPU)，且依舊產生兩個結果，這可能不適用於某些使用案例。

如需輪轉與滑動窗口的詳細資訊，請參閱 [窗口化查詢](windowed-sql.md)。

在下列程序中，輪轉窗口彙總會產生兩個部分結果 (傳送至 `CALC_COUNT_SQL_STREAM` 應用程式內串流)，必須結合才能產生最終結果。接著，應用程式會產生第二個彙總 (傳送至 `DESTINATION_SQL_STREAM` 應用程式內串流)，結合兩個部分結果。

**若要建立使用事件時間彙總部分結果的應用程式**

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中，選擇**資料分析**。建立 Kinesis Data Analytics 應用程式，如 [Amazon Kinesis Data Analytics for SQL 應用程式入門](getting-started.md) 教學課程所述。

1. 在 SQL 編輯器中，以下列項目取代應用程式碼：

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   應用程式碼中的 `SELECT` 陳述式會篩選 `SOURCE_SQL_STREAM_001` 中的資料欄，找出變更大於 1% 的股票價格，並使用幫浦將這些資料列插入另一個應用程式內串流 `CHANGE_STREAM`。

1. 選擇 **儲存並執行 SQL**。

第一個幫浦會將串流輸出至 `CALC_COUNT_SQL_STREAM` ，類似下列內容。請注意，結果集不完整：

![\[主控台螢幕擷取畫面顯示部分結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_partial_0.png)


然後，第二個幫浦輸出一個包含完整結果集的串流至 `DESTINATION_SQL_STREAM`：

![\[主控台螢幕擷取畫面顯示完整結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_partial_1.png)


# 範例：聯結
<a name="examples-joins"></a>

本節提供使用聯結查詢的 Kinesis Data Analytics 應用程式範例。每個範例皆包含逐步指示，協助您建立 Kinesis Data Analytics 應用程式並測試結果。

**Topics**
+ [範例：將參考資料新增至 Kinesis Data Analytics 應用程式](app-add-reference-data.md)

# 範例：將參考資料新增至 Kinesis Data Analytics 應用程式
<a name="app-add-reference-data"></a>

在本練習中，將參考資料新增至現有的 Kinesis Data Analytics 應用程式。如需參考資料的相關資訊，請參閱下列主題：
+ [Amazon Kinesis Data Analytics for SQL 應用程式：運作方式](how-it-works.md)
+ [設定應用程式輸入](how-it-works-input.md)

在本練習中，將參考資料新增至您在 Kinesis Data Analytics [入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中建立的應用程式。參考資料提供每個股票代號的公司名稱，例如：

```
Ticker, Company
AMZN,Amazon
ASD, SomeCompanyA
MMB, SomeCompanyB
WAS,  SomeCompanyC
```

首先，完成[入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中的步驟，以建立入門應用程式。接著請依照這些步驟設定參考資料，並將其新增至您的應用程式：

1. **準備資料**
   + 將先前的參考資料做為物件存放在 Amazon Simple Storage Service (Amazon S3)。
   + 建立 Kinesis Data Analytics 可擔任的 IAM 角色，以代您讀取 Amazon S3 物件。

1. **將參考資料來源新增到應用程式。**

   Kinesis Data Analytics 會讀取 Amazon S3 物件，並建立應用程式內參考資料表，讓您在應用程式碼中查詢。

1. **測試代碼。**

   在應用程式碼中，撰寫聯結查詢，將應用程式內串流與應用程式內參考資料表聯結，以取得每個股票代號的公司名稱。

**Topics**
+ [步驟 1：準備](#add-refdata-prepare)
+ [步驟 2：將參考資料來源新增至應用程式組態](#add-refdata-create-iamrole)
+ [步驟 3：測試：查詢應用程式內參考資料表](#add-refdata-test)

## 步驟 1：準備
<a name="add-refdata-prepare"></a>

在本節中，將範例參考資料做為物件存放在 Amazon S3 儲存貯體中。同時建立 Kinesis Data Analytics 可擔任的 IAM 角色，以代您讀取物件。

### 將參考資料存放為 Amazon S3 物件
<a name="prepare-create-s3object"></a>

在本節中，將範例參考資料做為 Amazon S3 物件儲存。

1. 開啟文字編輯器，加入以下資料，並將檔案儲存為 `TickerReference.csv`。

   ```
   Ticker, Company
   AMZN,Amazon
   ASD, SomeCompanyA
   MMB, SomeCompanyB
   WAS,  SomeCompanyC
   ```

   

1. 將 `TickerReference.csv` 檔案上傳至 S3 儲存貯體。如需指示說明，請參閱 *Amazon Simple Storage Service 使用者指南*中的[上傳物件至 Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UploadingObjectsintoAmazonS3.html)。

### 建立 IAM 角色
<a name="prepare-create-iamrole"></a>

接下來，建立 Kinesis Data Analytics 可擔任的 IAM 角色，並讀取 Amazon S3 物件。

1. 在 AWS Identity and Access Management (IAM) 中，建立名為 的 IAM 角色**KinesisAnalytics-ReadS3Object**。若要建立角色，請按照 *IAM 使用者指南*中[建立 Amazon 服務的角色 (AWS 管理主控台)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html#roles-creatingrole-service-console) 所述指示操作。

   在 IAM 主控台，指定下列項目：
   + 在**選取角色類型**中，選擇 **AWS Lambda**。建立角色之後，您將變更信任政策，以允許 Kinesis Data Analytics （而非 AWS Lambda) 擔任該角色。
   + 請勿在**附加政策**頁面附加任何政策。

1. 更新 IAM 角色政策：

   

   1. 在 IAM 主控台中，選擇您剛建立的角色。

   1. 在**信任關係**標籤上，更新信任政策以授與 Kinesis Data Analytics 權限來擔任該角色。信任政策如下所示：

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": {
              "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
          }
        ]
      }
      ```

------

      

   1. 在**許可**標籤上，附加一個名為 **AmazonS3ReadOnlyAccess** 的 Amazon 受管政策。此舉會授予該角色讀取 Amazon S3 物件的許可。此政策如下所示。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:Get*",
              "s3:List*"
            ],
            "Resource": "*"
          }
        ]
      }
      ```

------

## 步驟 2：將參考資料來源新增至應用程式組態
<a name="add-refdata-create-iamrole"></a>

在此步驟中，將參考資料來源新增至您的應用程式組態。首先，您需要下列資訊：
+ 您的 S3 儲存貯體名稱和物件金鑰名稱
+ IAM 角色 Amazon Resource Name (ARN)。

1. 在應用程式的主頁面中，選擇**連接參考資料**。

1. 在**連接參考資料來源**頁面中，選擇包含參考資料物件的 Amazon S3 儲存貯體，然後輸入物件的金鑰名稱。

1. 輸入 **CompanyName** 作為**應用程式內參考表名稱**。

1. 在**存取所選資源**區段中，選擇**從 Kinesis Analytics 可以擔任的 IAM 角色中選擇**，然後選擇您在上一節建立的 **KinesisAnalytics-ReadS3Object** IAM 角色。

1. 選擇**探索結構描述**。主控台檢測到參考資料中的兩欄。

1. 選擇**儲存與關閉**。

## 步驟 3：測試：查詢應用程式內參考資料表
<a name="add-refdata-test"></a>

您現在可查詢應用程式內參考資料表 `CompanyName`。您可以使用參考資訊，將股票價格資料與參考資料表聯結在一起，以豐富您的應用程式。結果會顯示公司名稱。

1. 以下列代碼取代您的應用程式碼。查詢會將應用程式內輸入串流與應用程式內參考資料表聯結。應用程式碼會將結果寫入另一個應用程式內串流 `DESTINATION_SQL_STREAM`。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM ticker_symbol, "c"."Company", sector, change, price
      FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c"
      ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
   ```

1. 確認應用程式輸出是否顯示在 **SQLResults** 標籤中。請確定某些資料欄顯示公司名稱 (範例參考資料並未包含所有公司名稱)。

# 範例：機器學習
<a name="examples-machine"></a>

本節提供使用機器學習查詢的 Amazon Kinesis Data Analytics 應用程式範例。機器學習查詢會對資料執行複雜的分析，依賴串流中資料的歷史記錄來尋找不尋常的模式。每個範例皆包含逐步指示，協助您建立 Kinesis Data Analytics 應用程式並測試結果。

**Topics**
+ [範例：偵測串流上的資料異常 (RANDOM\$1CUT\$1FOREST 函數)](app-anomaly-detection.md)
+ [範例：偵測資料異常並取得說明 (RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 函數)](app-anomaly-detection-with-explanation.md)
+ [範例：偵測串流上的熱點 (熱點功能)](app-hotspots-detection.md)

# 範例：偵測串流上的資料異常 (RANDOM\$1CUT\$1FOREST 函數)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics 提供函數 (`RANDOM_CUT_FOREST`)，可根據數值欄中的值為每筆記錄指派異常分數。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的 [`RANDOM_CUT_FOREST` 函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)。

在本練習中，撰寫應用程式碼，將異常分數指派給應用程式串流來源上的記錄。若要設定應用程式，請執行下列動作：

1. **設定串流來源**：設定 Kinesis 資料串流並寫入範例 `heartRate` 資料，如下所示：

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   此程序會提供 Python 指令碼供您填入串流。這些 `heartRate` 值是隨機產生的，99% 記錄的 `heartRate` 值介於 60 到 100 之間，而只有 1% 的 `heartRate` 值介於 150 和 200 之間。因此，`heartRate` 值介於 150 和 200 之間的記錄為異常。

1. **設定輸入**：使用主控台可建立 Kinesis Data Analytics 應用程式，並透過將串流來源對應至應用程式內串流 (`SOURCE_SQL_STREAM_001`) ，以設定應用程式輸入。當應用程式啟動時，Kinesis Data Analytics 會持續讀取串流來源，並將記錄插入應用程式內串流。

1. **指定應用程式碼**：此範例使用下列應用程式碼：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   程式碼會讀取 `SOURCE_SQL_STREAM_001` 中的資料列、指派異常分數，然後將產生的資料欄寫入另一個應用程式內串流 (`TEMP_STREAM`)。然後，應用程式碼會排序 `TEMP_STREAM` 中的記錄，並將結果儲存到另一個應用程式內串流 (`DESTINATION_SQL_STREAM`)。您可以使用幫浦將資料列插入應用程式內串流。如需詳細資訊，請參閱[應用程式內串流與幫浦](streams-pumps.md)。

1. **配置輸出**：設定將應用程式輸出，將 `DESTINATION_SQL_STREAM` 中的資料保存在外部目的地，即另一個 Kinesis 資料串流。檢閱指派給每筆記錄的異常分數，並判斷哪些分數表示應用程式外部的異常 (且需要收到提醒)。您可以使用 AWS Lambda 函數來處理這些異常分數並設定提醒。

本練習會使用美國東部 (維吉尼亞北部) (`us-east-1`) 來建立這些串流，以及您的應用程式。如果您使用任何其他區域，則須更新相應程式碼。

**Topics**
+ [步驟 1：準備](app-anomaly-prepare.md)
+ [步驟 2：建立應用程式](app-anom-score-create-app.md)
+ [步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md)
+ [步驟 4：驗證輸出](app-anomaly-verify-output.md)

**後續步驟**  
[步驟 1：準備](app-anomaly-prepare.md)

# 步驟 1：準備
<a name="app-anomaly-prepare"></a>

為本練習建立 Amazon Kinesis Data Analytics 應用程式之前，您必須建立兩個 Kinesis 資料串流。將其中一個串流設定為應用程式的串流來源，另一個串流設定為 Kinesis Data Analytics 保留應用程式輸出的目的地。

**Topics**
+ [步驟 1.1：建立輸入和輸出資料串流](#app-anomaly-create-two-streams)
+ [步驟 1.2：將範例記錄寫入輸入串流。](#app-anomaly-write-sample-records-inputstream)

## 步驟 1.1：建立輸入和輸出資料串流
<a name="app-anomaly-create-two-streams"></a>

在本節中，您會建立兩個 Kinesis 串流：`ExampleInputStream` 和 `ExampleOutputStream`。您可以使用 AWS 管理主控台 或 AWS CLI建立這些串流。
+ 

**使用主控台**

  1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

  1. 選擇 **建立資料串流**。建立具有 `ExampleInputStream` 碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

  1. 重複上一個步驟，以名為 `ExampleOutputStream` 的碎片建立串流。
+ 

**使用 AWS CLI**

  1. 使用下列 Kinesis `create-stream` AWS CLI 命令來建立第一個串流 (`ExampleInputStream`)。

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 運行相同的命令，將串流名稱更改為 `ExampleOutputStream`。這個命令會建立應用程式用來寫入輸出的第二個串流。

## 步驟 1.2：將範例記錄寫入輸入串流。
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步驟中，執行 Python 程式碼以持續產生範例記錄，並將這些記錄寫入 `ExampleInputStream` 串流。

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. 安裝 Python 與 `pip`。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 網站的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。程式碼中的 `put-record` 命令會將 JSON 記錄寫入串流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**後續步驟**  
[步驟 2：建立應用程式](app-anom-score-create-app.md)

# 步驟 2：建立應用程式
<a name="app-anom-score-create-app"></a>

在本節建立 Amazon Kinesis Data Analytics 應用程式，如下所示：
+ 設定應用程式輸入，以使用您在 [步驟 1：準備](app-anomaly-prepare.md) 中建立的 Kinesis 資料串流作為串流來源。
+ 使用主控台上的**異常偵測**範本。

**建立應用程式**

1. 按照 Kinesis Data Analytics **入門**練習中的步驟 1、2 和 3 進行操作 (請參閱 [步驟 3.1：建立應用程式](get-started-create-app.md))。
   + 在來源設定中，執行下列動作：
     + 指定您在前一節建立的串流來源。
     + 主控台推斷結構描述後，請編輯結構描述，並將 `heartRate` 欄類型設定為 `INTEGER`。

       大部分的心率值都是正常的，而且探索程序很可能會將 `TINYINT` 類型指派給此資料欄。但是，一小部分的值顯示出高心率。如果這些高數值不符合 `TINYINT` 類型，Kinesis Data Analytics 會將這些資料列傳送至錯誤串流。將資料類型更新為 `INTEGER`，以便容納所有產生的心率資料。
   + 使用主控台上的**異常偵測**範本。然後，您可以更新範本程式碼，以提供適當的資料欄名稱。

1. 提供資料欄名稱以更新應用程式碼。生成的應用程式碼如下所示 (將此代碼貼到 SQL 編輯器中)：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. 在 Kinesis Data Analytics 主控台上執行 SQL 程式碼並檢閱結果：  
![\[主控台螢幕擷取畫面會顯示即時分析標籤，並在應用程式內串流中顯示生成的資料。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**後續步驟**  
[步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md)

# 步驟 3：設定應用程式輸出
<a name="app-anomaly-create-ka-app-config-destination"></a>

完成 [步驟 2：建立應用程式](app-anom-score-create-app.md) 後，將得到應用程式碼，該程式碼會從串流來源讀取心率資料，並為每個來源指派異常分數。

您現在可以將應用程式結果從應用程式內串流傳送到外部目的地，即另一個 Kinesis 資料串流 (`OutputStreamTestingAnomalyScores`)。您可以分析異常分數並確定哪些心率是異常的。然後，您可以進一步擴充此應用程式以產生提醒。

請依照下列步驟設定應用程式輸出：



1. 開啟 Amazon Kinesis Data Analytics 主控台。在 SQL 編輯器中，選擇應用程式儀表板中的**目的地**或**新增目的地**。

1. 在**連線至目的地**頁面，選擇您在前一節建立的 `OutputStreamTestingAnomalyScores` 串流。

   現在您有一個外部目的地，可讓 Amazon Kinesis Data Analytics 將應用程式寫入的任何紀錄保留在應用程式內 `DESTINATION_SQL_STREAM` 串流中。

1. 您可以選擇性地設定 `OutputStreamTestingAnomalyScores` AWS Lambda 來監控串流並傳送提醒給您。如需說明，請參閱[使用 Lambda 函數預處理資料](lambda-preprocessing.md)。如果未設定提醒，您可以檢閱 Kinesis Data Analytics 寫入外部目的地 (Kinesis 資料串流) 的記錄 `OutputStreamTestingAnomalyScores`，如 [步驟 4：驗證輸出](app-anomaly-verify-output.md) 中所述。

**後續步驟**  
[步驟 4：驗證輸出](app-anomaly-verify-output.md)

# 步驟 4：驗證輸出
<a name="app-anomaly-verify-output"></a>

在 [步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md) 設定應用程式輸出後，請使用下列 AWS CLI 命令來讀取應用程式寫入目的地串流的記錄：

1. 運行 `get-shard-iterator` 命令以獲取指向輸出串流資料的指標。

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   您將得到具有碎片迭代器值的回應，如下列範例回應所示：

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   複製碎片迭代器值。

1. 執行 AWS CLI `get-records` 命令。

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   此命令會傳回記錄頁面，以及您可以在後續 `get-records` 命令中使用的另一個碎片迭代器來擷取下一組記錄。

# 範例：偵測資料異常並取得說明 (RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 函數)
<a name="app-anomaly-detection-with-explanation"></a>

Amazon Kinesis Data Analytics 提供 `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函數，可根據數值欄中的值為每筆記錄指派異常分數。該函數還提供了異常的解釋。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的 [RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html)。

在本練習中，撰寫應用程式碼，以取得應用程式串流來源上的紀錄異常分數。您還可以獲得每個異常的解釋。

**Topics**
+ [步驟 1：準備資料](app-anomaly-with-ex-prepare.md)
+ [步驟 2：建立 Analytics 應用程式](app-anom-with-exp-create-app.md)
+ [步驟 3：檢查結果](examine-results-with-exp.md)

**首要步驟**  
[步驟 1：準備資料](app-anomaly-with-ex-prepare.md)

# 步驟 1：準備資料
<a name="app-anomaly-with-ex-prepare"></a>

在為此[範例](app-anomaly-detection-with-explanation.md)建立 Amazon Kinesis Data Analytics 應用程式之前，您必須先建立 Kinesis 資料串流作為應用程式的串流來源。您也可以執行 Python 程式碼，將模擬的血壓資料寫入串流。

**Topics**
+ [步驟 1.1：建立 Kinesis 資料串流](#app-anomaly-create-two-streams)
+ [步驟 1.2：將範例記錄寫入輸入串流](#app-anomaly-write-sample-records-inputstream)

## 步驟 1.1：建立 Kinesis 資料串流
<a name="app-anomaly-create-two-streams"></a>

在本節中，建立名為 `ExampleInputStream` 的 Kinesis 資料串流。您可以使用 AWS 管理主控台 或 建立此資料串流 AWS CLI。
+ 使用主控台：

  1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

  1. 在導覽窗格中選擇 **資料串流**。選擇 **建立 Kinesis 串流**。

  1. 在角色名稱輸入 **ExampleInputStream**。在碎片數鍵入 **1**。
+ 或者，若要使用 AWS CLI 建立資料串流，請執行下列命令：

  ```
  $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
  ```

## 步驟 1.2：將範例記錄寫入輸入串流
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步驟中，執行 Python 程式碼以持續產生範例記錄，並將其寫入您建立的資料串流。

1. 安裝 Python 與 Pip。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 文件中的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。您可以將區域變更為要在本範例中使用的區域。程式碼中的 `put-record` 命令會將 JSON 記錄寫入串流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class PressureType(Enum):
       low = "LOW"
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_blood_pressure(pressure_type):
       pressure = {"BloodPressureLevel": pressure_type.value}
       if pressure_type == PressureType.low:
           pressure["Systolic"] = random.randint(50, 80)
           pressure["Diastolic"] = random.randint(30, 50)
       elif pressure_type == PressureType.normal:
           pressure["Systolic"] = random.randint(90, 120)
           pressure["Diastolic"] = random.randint(60, 80)
       elif pressure_type == PressureType.high:
           pressure["Systolic"] = random.randint(130, 200)
           pressure["Diastolic"] = random.randint(90, 150)
       else:
           raise TypeError
       return pressure
   
   
   def generate(stream_name, kinesis_client):
       while True:
           rnd = random.random()
           pressure_type = (
               PressureType.low
               if rnd < 0.005
               else PressureType.high
               if rnd > 0.995
               else PressureType.normal
           )
           blood_pressure = get_blood_pressure(pressure_type)
           print(blood_pressure)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(blood_pressure),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

**後續步驟**  
[步驟 2：建立 Analytics 應用程式](app-anom-with-exp-create-app.md)

# 步驟 2：建立 Analytics 應用程式
<a name="app-anom-with-exp-create-app"></a>

在本節中，建立 Amazon Kinesis Data Analytics 應用程式，並將您在 [步驟 1：準備資料](app-anomaly-with-ex-prepare.md) 中建立的 Kinesis 資料串流設定為串流來源 。然後執行使用該 `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函數的應用程式碼。

**建立應用程式**

1. 在以下網址開啟 Kinesis 主控台：[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)。

1. 在導覽窗格中，選擇 **Data Analytics**，然後選擇**建立應用程式**。

1. 提供應用程式名稱和說明 (選用)，然後選擇**建立應用程式**。

1. 選擇**連接串流資料**，然後從清單中選擇 **ExampleInputStream**。

1. 選擇**探索結構描述**，並確定 `Systolic` 與 `Diastolic` 顯示為 `INTEGER` 資料欄。如果它們具備其他類型，請選擇**編輯結構描述**，然後將類型 `INTEGER` 指定給它們。

1. 在**即時分析**下方，選擇**至 SQL 編輯器**。出現提示時，選擇執行您的應用程式。

1. 將下列程式碼貼到 SQL 編輯器中，然後選擇**儲存並執行 SQL**。

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   -- Compute an anomaly score with explanation for each record in the input stream
   -- using RANDOM_CUT_FOREST_WITH_EXPLANATION
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION 
         FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

**後續步驟**  
[步驟 3：檢查結果](examine-results-with-exp.md)

# 步驟 3：檢查結果
<a name="examine-results-with-exp"></a>

執行此[範例](app-anomaly-detection-with-explanation.md)的 SQL 程式碼時，您會先看到異常分數等於零的資料列。這發生在初始學習階段。您會得到類似以下的結果：

```
ROWTIME SYSTOLIC DIASTOLIC BLOODPRESSURELEVEL ANOMALY_SCORE ANOMALY_EXPLANATION
27:49.0	101      66        NORMAL             0.711460417   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0922","ATTRIBUTION_SCORE":"0.3792"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"0.0210","ATTRIBUTION_SCORE":"0.3323"}}
27:50.0	144      123       HIGH               3.855851061   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}
27:50.0	113      69        NORMAL             0.740069409   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0549","ATTRIBUTION_SCORE":"0.3750"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0394","ATTRIBUTION_SCORE":"0.3650"}}
27:50.0	105      64        NORMAL             0.739644157   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0245","ATTRIBUTION_SCORE":"0.3667"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0524","ATTRIBUTION_SCORE":"0.3729"}}
27:50.0	100      65        NORMAL             0.736993425   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0203","ATTRIBUTION_SCORE":"0.3516"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0454","ATTRIBUTION_SCORE":"0.3854"}}
27:50.0	108      69        NORMAL             0.733767202   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0974","ATTRIBUTION_SCORE":"0.3961"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0189","ATTRIBUTION_SCORE":"0.3377"}}
```
+ `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函數中的演算法會確保 `Systolic` 和 `Diastolic` 資料欄為數值，並將當作它們輸入。
+ 該 `BloodPressureLevel` 資料欄具有文本資料，因此演算法不會考慮。此資歷欄僅為視覺輔助，以幫助您快速發現範例中的正常、高血壓和低血壓水準。
+ 在 `ANOMALY_SCORE` 欄中，分數較高的記錄更為異常。此樣本結果集內的第二個記錄最異常，異常分數為 3.855851061。
+ 若要瞭解演算法考量的每個數值欄對異常分數的貢獻程度，請參閱 `ANOMALY_SCORE` 欄中名為 `ATTRIBUTION_SCORE` 的 JSON 欄位。在這組樣本結果第二列的情況下，`Systolic` 和 `Diastolic` 欄貢獻給異常的比例為 1. 7447:2.1111。換句話說，異常分數 45％ 的解釋歸因於收縮值，其餘歸因於舒張值。
+ 若要判斷此範例第二列所表示的點異常之方向，請參閱名為 `DIRECTION` 的 JSON 欄位。在此案例中，舒張值和收縮值都標記為 `HIGH`。若要判斷這些方向正確的信賴度，請參閱名為 `STRENGTH` 的 JSON 欄位。在此範例中，演算法更有信心舒張值高。事實上，舒張讀數的正常值通常為 60—80，123 比預期高得多。

# 範例：偵測串流上的熱點 (熱點功能)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics 提供的 `HOTSPOTS` 功能可以找出並傳回資料中相對密集區域的相關資訊。如需詳細資訊，請參閱 *Amazon Managed Service for Apache Flink SQL 參考資料*中的[熱點](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)。

在本練習中撰寫應用程式碼，以便在應用程式的串流來源上尋找熱點。若要設定應用程式，請執行下列動作：

1. **設定串流來源**：設定 Kinesis 串流並寫入範例座標資料，如下所示：

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   此範例提供 Python 指令碼供您填入串流。`x` 和 `y` 值是隨機產生的，有些記錄會叢集在某些位置。

   如果指令碼刻意生成數值做為熱點的一部分，則會提供 `is_hot` 欄位做為指標。這可協助您評估熱點偵測功能是否正常運作。

1. **建立應用程式**：接著使用 AWS 管理主控台建立 Kinesis Data Analytics 應用程式。將串流來源對應至應用程式內串流 (`SOURCE_SQL_STREAM_001`)，以設定應用程式輸入。當應用程式啟動時，Kinesis Data Analytics 會持續讀取串流來源，並將記錄插入應用程式內串流。

   在本練習中，針對應用程式使用下列程式碼：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   此程式碼會讀取 `SOURCE_SQL_STREAM_001` 中的資料列、分析找出顯著熱點，然後將產生的資料寫入另一個應用程式內串流 (`DESTINATION_SQL_STREAM`)。您可以使用幫浦將資料列插入應用程式內串流。如需詳細資訊，請參閱[應用程式內串流與幫浦](streams-pumps.md)。

1. **設定輸出**：設定應用程式輸出，將資料從應用程式傳送至外部目的地，即另一個 Kinesis 資料串流。查看熱點分數，並判斷哪些分數表示出現了熱點（且需要收到提醒）。您可以使用 AWS Lambda 函數進一步處理熱點資訊並設定提醒。

1. **驗證輸出**：此範例包含一個 JavaScript 應用程式，可從輸出串流讀取資料並以圖形方式顯示，以便您即時檢視應用程式產生的熱點。



本練習會使用美國西部 (奧勒岡) (`us-west-2`) 來建立這些串流及應用程式。如果您使用任何其他區域，請更新相應程式碼。

**Topics**
+ [步驟 1：建立輸入和輸出串流](app-hotspots-prepare.md)
+ [步驟 2：建立 Amazon Kinesis Data Analytics 應用程式](app-hotspot-create-app.md)
+ [步驟 3：設定應用程式輸出](app-hotspots-create-ka-app-config-destination.md)
+ [步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md)

# 步驟 1：建立輸入和輸出串流
<a name="app-hotspots-prepare"></a>

為[熱點範例](app-hotspots-detection.md)建立 Amazon Kinesis Data Analytics 應用程式之前，您必須建立兩個 Kinesis 資料串流。將其中一個串流設定為應用程式的串流來源，另一個串流設定為 Kinesis Data Analytics 保留應用程式輸出的目的地。

**Topics**
+ [步驟 1.1：建立 Kinesis 資料串流](#app-hotspots-create-two-streams)
+ [步驟 1.2：將範例記錄寫入輸入串流](#app-hotspots-write-sample-records-inputstream)

## 步驟 1.1：建立 Kinesis 資料串流
<a name="app-hotspots-create-two-streams"></a>

在本節中，建立兩個 Kinesis 資料串流：`ExampleInputStream` 和 `ExampleOutputStream`。

使用主控台或 AWS CLI建立資料串流。
+ 如要使用主控台建立資料串流：

  1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

  1. 在導覽窗格中選擇**資料串流**。

  1. 選擇**建立 Kinesis 串流**，然後建立內含一個名為 `ExampleInputStream` 的碎片之串流。

  1. 重複上一個步驟，以名為 `ExampleOutputStream` 的碎片建立串流。
+ 使用 AWS CLI建立資料串流：
  + 使用下列 Kinesis `create-stream` AWS CLI 命令建立串流 (`ExampleInputStream` 和 `ExampleOutputStream`)。若要建立應用程式用來寫入輸出的第二個串流，請執行相同的命令，並將串流名稱變更為 `ExampleOutputStream`。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 步驟 1.2：將範例記錄寫入輸入串流
<a name="app-hotspots-write-sample-records-inputstream"></a>

在此步驟中，執行 Python 程式碼以持續產生範例記錄，並將這些記錄寫入 `ExampleInputStream` 串流。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. 安裝 Python 與 `pip`。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 網站的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。該程式碼會執行下列作業：
   + 在 (X, Y) 平面中的某個位置生成潛在熱點。
   + 為每個熱點產生一組 1,000 個點。在這些點中，20% 會叢集在熱點周圍。其餘部分會在整個空間內隨機產生。
   + `put-record` 命令會將 JSON 記錄寫入串流。
**重要**  
請勿將此檔案上傳到 Web 伺服器，因為它包含您的 AWS 憑證。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**後續步驟**  
[步驟 2：建立 Amazon Kinesis Data Analytics 應用程式](app-hotspot-create-app.md)

# 步驟 2：建立 Amazon Kinesis Data Analytics 應用程式
<a name="app-hotspot-create-app"></a>

在[熱點範例](app-hotspots-detection.md)的本節中，建立 Amazon Kinesis Data Analytics 應用程式，如下所示：
+ 設定應用程式輸入，以使用您在[步驟 1](app-hotspots-prepare.md) 中建立的 Kinesis 資料串流作為串流來源。
+ 使用 AWS 管理主控台中提供的應用程式碼。

**建立應用程式**

1. 按照[入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中的步驟 1、2 和 3 建立 Kinesis Data Analytics 應用程式 (請參閱 [步驟 3.1：建立應用程式](get-started-create-app.md))。

   在來源設定中，執行下列動作：
   + 指定您在 [步驟 1：建立輸入和輸出串流](app-hotspots-prepare.md) 中建立的串流來源。
   + 主控台推斷結構描述後，請編輯結構描述。請確定 `x` 和 `y` 欄類型已設定為 `DOUBLE`，且 `IS_HOT` 欄類型設定為 `VARCHAR`。

1. 使用以下應用程式碼 (您可將此代碼貼到 SQL 編輯器中)：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. 執行 SQL 程式碼並檢閱結果。  
![\[顯示列時間，熱點和 hotspot_heat 的 SQL 代碼結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**後續步驟**  
[步驟 3：設定應用程式輸出](app-hotspots-create-ka-app-config-destination.md)

# 步驟 3：設定應用程式輸出
<a name="app-hotspots-create-ka-app-config-destination"></a>

在[熱點範例](app-hotspots-detection.md)中，可用 Amazon Kinesis Data Analytics 應用程式碼探索串流來源的重要熱點，並為每個熱點指派熱度分數。

您現在可以將應用程式結果從應用程式內串流傳送到外部目的地，即另一個 Kinesis 資料串流 (`ExampleOutputStream`)。然後，您可以分析熱點分數，並確定熱點熱度的適當閾值。您可以進一步擴充此應用程式以產生提醒。

**如要設定應用程式輸出**

1. 在 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesisanalytics) 上開啟 Kinesis Data Analytics 主控台。

1. 在 SQL 編輯器中，選擇應用程式儀表板中的**目的地**或**新增目的地**。

1. 在**新增目的地**頁面，選擇**從串流中選取**。然後，選擇您在上一節建立的 `ExampleOutputStream` 串流。

   現在您有一個外部目的地，可讓 Amazon Kinesis Data Analytics 將應用程式寫入的任何紀錄保留在應用程式內 `DESTINATION_SQL_STREAM` 串流中。

1. 您可以選擇設定 `ExampleOutputStream` AWS Lambda 來監控串流並傳送提醒給您。如需詳細資訊，請參閱[使用 Lambda 函數作為輸出](how-it-works-output-lambda.md)。您也可以檢閱 Kinesis Data Analytics 寫入外部目的地 (Kinesis 串流 `ExampleOutputStream`) 的記錄，如 [步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md) 中所述。

**後續步驟**  
[步驟 4：驗證應用程式輸出](app-hotspots-verify-output.md)

# 步驟 4：驗證應用程式輸出
<a name="app-hotspots-verify-output"></a>

在[熱點範例](app-hotspots-detection.md)的這節中，設定 Web 應用程式，以可擴展向量圖形 (SVG) 控制顯示熱點資訊。

1. 使用下列內容建立名為 `index.html` 的檔案：

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 在同一目錄中，建立名為 `hotspots_viewer.js` 的檔案，內含下列內容：在提供的變數中提供您的 、憑證和輸出串流名稱。

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 在執行第一部分的 Python 程式碼之後，在網頁瀏覽器中開啟 `index.html`。熱點資訊會顯示在頁面上，如下所示。

     
![\[顯示熱點資訊的可擴展向量圖形圖。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)

# 範例：提醒與錯誤
<a name="examples-alerts"></a>

本節提供使用提醒與錯誤的 Kinesis Data Analytics 應用程式範例。每個範例皆包含逐步指示與程式碼，可協助您建立 Kinesis Data Analytics 應用程式並測試結果。

**Topics**
+ [範例：建立簡單提醒](app-simple-alerts.md)
+ [範例：建立限流提醒](app-throttled-alerts.md)
+ [範例：探索應用程式內錯誤串流](app-explore-error-stream.md)

# 範例：建立簡單提醒
<a name="app-simple-alerts"></a>

在此 Kinesis Data Analytics 應用程式中，查詢會在透過示範串流建立的應用程式內串流上持續執行。如需詳細資訊，請參閱[持續查詢](continuous-queries-concepts.md)。

如果有任何資料列顯示大於 1% 的股票價格變更，則這些資料欄會插入另一個應用程式內串流。在練習中，您可以規劃應用程式輸出讓結果持續留在外部目的地。然後，您可以進一步調查結果。例如，您可以使用 AWS Lambda 函數來處理記錄並傳送提醒給您。

**建立簡單提醒應用程式**

1. 依照 Kinesis Data Analytics[入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中所述建立分析應用程式。

1. 在 Kinesis Data Analytics 的 SQL 編輯器中，以下列項目取代應用程式碼：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
   ```

   應用程式碼中的 `SELECT` 陳述式會篩選 `SOURCE_SQL_STREAM_001` 中的資料欄，找出大於 1% 的股票價格變動。然後，它會使用幫浦將這些資料欄插入另一個應用程式內串流 `DESTINATION_SQL_STREAM`。如需使用幫浦將資料欄插入應用程式串流之編碼模式的相關詳細資訊，請參閱 [應用程式碼](how-it-works-app-code.md)。

1. 選擇 **儲存並執行 SQL**。

1. 新增目的地。如要新增，請在 SQL 編輯器中選擇**目的地**標籤，或在應用程式詳細資訊頁面上選擇**新增目的地**。

   1. 在 SQL 編輯器中，選擇**目的地**標籤，然後選擇**連線至目的地**。

      在**連線至目的地**頁面上，選擇**新增**。

   1. 選擇**至 Kinesis 串流**。

   1. 在 Amazon Kinesis Data Streams 主控台上，建立具有一個碎片的新 Kinesis 串流 (例如 `gs-destination`)。等待直到流狀態為**作用中**。

   1. 返回 Kinesis Data Analytics 主控台。在**連線至目的地**頁面，選擇您建立的串流。

      如果串流未出現，請重新整理頁面。

   1. 選擇**儲存並繼續**。

   現在您有一個外部目的地，即 Kinesis 資料串流，Kinesis Data Analytics 會將您的應用程式輸出保留在 `DESTINATION_SQL_STREAM` 應用程式內串流中。

1. 設定 AWS Lambda 以監控您建立的 Kinesis 串流，並叫用 Lambda 函數。

   如需說明，請參閱[使用 Lambda 函數預處理資料](lambda-preprocessing.md)。

# 範例：建立限流提醒
<a name="app-throttled-alerts"></a>

在此 Kinesis Data Analytics 應用程式中，查詢會在透過示範串流建立的應用程式內串流上持續執行。如需詳細資訊，請參閱[持續查詢](continuous-queries-concepts.md)。如果有任何資料欄顯示大於 1% 的股票價格變更，則這些資料列會插入另一個應用程式內串流。該應用程序會限流提醒，以便在股票價格變化時立即發送提醒。但是，每個股票代碼每分鐘不會發送超過一個提醒到應用程式內串流。

**建立限流提醒應用程式**

1. 依照 Kinesis Data Analytics[入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中所述建立 Kinesis Data Analytics 應用程式。

1. 在 Kinesis Data Analytics 的 SQL 編輯器中，以下列項目取代應用程式碼：

   ```
   CREATE OR REPLACE STREAM "CHANGE_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "change_pump" AS 
      INSERT INTO "CHANGE_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
         
   -- ** Trigger Count and Limit **
   -- Counts "triggers" or those values that evaluated true against the previous where clause
   -- Then provides its own limit on the number of triggers per hour per ticker symbol to what
   -- is specified in the WHERE clause
   
   CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM (
      ticker_symbol VARCHAR(4), 
      change REAL,
      trigger_count INTEGER);
   
   CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAM
   SELECT STREAM ticker_symbol, change, trigger_count
   FROM (
       SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_count
       FROM "CHANGE_STREAM"
       --window to perform aggregations over last minute to keep track of triggers
       WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING)
   )
   WHERE trigger_count >= 1;
   ```

   應用程式碼中的 `SELECT` 陳述式會篩選 `SOURCE_SQL_STREAM_001` 中的資料欄，找出變更大於 1% 的股票價格，並使用幫浦將這些資料列插入另一個應用程式內串流 `CHANGE_STREAM`。

   接著，應用程式會建立第二個名為 `TRIGGER_COUNT_STREAM` 的串流，來進行限流提醒。第二個查詢會從視窗中選取記錄，該視窗在每次接受記錄時向前跳轉，因此每個股票代號每分鐘只有一筆記錄會寫入串流。

1. 選擇**儲存並執行 SQL**。

輸出至 `TRIGGER_COUNT_STREAM` 的串流類似於下列範例：

![\[主控台螢幕截圖顯示包含股票代號，百分比變化和觸發計數欄的輸出串流。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex-throttle-alerts.png)


# 範例：探索應用程式內錯誤串流
<a name="app-explore-error-stream"></a>

Amazon Kinesis Data Analytics 會為您建立的每個應用程式提供應用程式內錯誤串流。應用程式無法處理的任何資料欄都會傳送至此錯誤串流。您可以考慮將錯誤串流資料保存到外部目的地，以便進行調查。

在主控台上執行以下練習。在這些範例中，編輯探索過程推斷的結構描述，然後驗證傳送至錯誤資料串流的資料欄，以便在輸入組態中引入錯誤。

**Topics**
+ [介紹剖析錯誤](#intro-error-parse-error)
+ [引入除以零錯誤](#intro-error-divide-zero)

## 介紹剖析錯誤
<a name="intro-error-parse-error"></a>

在本練習中，您會引入剖析錯誤。

1. 依照 Kinesis Data Analytics [入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中所述建立 Kinesis Data Analytics 應用程式。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**。

1. 如果按照入門練習進行操作，您的帳戶中即會有一個示範串流 (`kinesis-analytics-demo-stream`)。在**連接到來源**頁面，選擇此示範串流。

1. Kinesis Data Analytics 會從示範串流取得範例，為其建立的應用程式內輸入串流推斷結構描述。主控台會在**格式化串流範例**標籤中顯示推斷的結構描述和範例資料。

1. 接下來，編輯結構描述並修改資料欄類型，以引入剖析錯誤。選擇**編輯結構描述**。

1. 將 `TICKER_SYMBOL` 資料欄類型從 `VARCHAR(4)` 變更為 `INTEGER`。

   既然建立的應用程式內結構描述之資料欄類型無效，Kinesis Data Analytics 就無法將資料引入應用程式內串流。相反地，它會將資料欄傳送至錯誤資料流。

1. 選擇**儲存結構描述**。

1. 選擇**重新整理結構描述範例**。

   請注意，**格式化串流**範例中沒有資料欄。但是，**錯誤串流**標籤顯示帶有錯誤訊息的資料。**錯誤串流**標籤會顯示傳送至應用程式內錯誤串流的資料。

   因為您已變更資料欄類型，Kinesis Data Analytics 無法將資料引入應用程式內的輸入串流。相反地，它會將資料欄傳送至錯誤資料流。

## 引入除以零錯誤
<a name="intro-error-divide-zero"></a>

在本練習中，更新應用程式碼以引入執行期錯誤 (除以零)。請注意，Amazon Kinesis Data Analytics 會將產生的資料列傳送到應用程式內錯誤串流，而不是傳送到應該寫入結果的 `DESTINATION_SQL_STREAM` 應用程式內串流。



1. 依照 Kinesis Data Analytics [入門](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)練習中所述建立 Kinesis Data Analytics 應用程式。

   在**即時分析**標籤上驗證結果，如下所示：

   酸

1. 更新應用程式碼中的 `SELECT` 陳述式，以引入除以零；例如：

   ```
   SELECT STREAM ticker_symbol, sector, change, (price / 0) as ProblemColumn
   FROM "SOURCE_SQL_STREAM_001"
   WHERE sector SIMILAR TO '%TECH%';
   ```

   

1. 執行應用程式。

   由於發生除以零的執行期錯誤，Kinesis Data Analytics 會將資料欄傳送到應用程式內錯誤串流，而不是寫入 `DESTINATION_SQL_STREAM`。在**即時分析**標籤上，選擇錯誤串流，然後您就可以在應用程式內錯誤串流中看到這些資料欄。

# 範例：解決方案加速器
<a name="examples_solution"></a>

[AWS 解決方案網站](https://aws.amazon.com/solutions/)有可用的 AWS CloudFormation 範本，可用來快速建立完整的串流資料解決方案。

可使用以下範本：

## AWS 帳戶 活動的即時洞見
<a name="examples_solution_activity"></a>

此解決方案可即時記錄和視覺化 AWS 帳戶（您的） 資源存取和用量指標。如需詳細資訊，請參閱[AWS 帳戶 活動的即時洞](https://docs.aws.amazon.com/solutions/latest/real-time-insights-account-activity/welcome.html)見。

## 使用 Kinesis Data Analytics 進行即時 AWS IoT 裝置監控
<a name="examples_solution_iot"></a>

此解決方案可即時收集、處理、分析和視覺化 IoT 裝置連線和活動資料。如需詳細資訊，請參閱[使用 Kinesis Data Analytics 進行即時 AWS IoT 裝置監控](https://docs.aws.amazon.com/solutions/latest/real-time-iot-device-monitoring-with-kinesis/welcome.html)。

## 使用 Kinesis Data Analytics 來進行即時 Web 分析
<a name="examples_solution_web"></a>

該解決方案實時收集，處理，分析和視覺化網站點擊流資料。如需詳細資訊，請參閱[使用 Kinesis Data Analytics 進行即時 Web 分析](https://docs.aws.amazon.com/solutions/latest/real-time-web-analytics-with-kinesis/welcome.html)。

## Amazon Connected 車輛解決方案
<a name="examples_solution_vehicle"></a>

該解決方案實時收集，處理，分析和視覺化來自車輛的 IoT 資料。如需詳細資訊，請參閱 [Amazon Connected 車輛解決方案](https://docs.aws.amazon.com//solutions/latest/connected-vehicle-solution/welcome.html)。