

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 範例：偵測串流上的資料異常 (RANDOM\$1CUT\$1FOREST 函數)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics 提供函數 (`RANDOM_CUT_FOREST`)，可根據數值欄中的值為每筆記錄指派異常分數。如需詳細資訊，請參閱* Amazon Managed Service for Apache Flink SQL 參考資料*中的 [`RANDOM_CUT_FOREST` 函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)。

在本練習中，撰寫應用程式碼，將異常分數指派給應用程式串流來源上的記錄。若要設定應用程式，請執行下列動作：

1. **設定串流來源**：設定 Kinesis 資料串流並寫入範例 `heartRate` 資料，如下所示：

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   此程序會提供 Python 指令碼供您填入串流。這些 `heartRate` 值是隨機產生的，99% 記錄的 `heartRate` 值介於 60 到 100 之間，而只有 1% 的 `heartRate` 值介於 150 和 200 之間。因此，`heartRate` 值介於 150 和 200 之間的記錄為異常。

1. **設定輸入**：使用主控台可建立 Kinesis Data Analytics 應用程式，並透過將串流來源對應至應用程式內串流 (`SOURCE_SQL_STREAM_001`) ，以設定應用程式輸入。當應用程式啟動時，Kinesis Data Analytics 會持續讀取串流來源，並將記錄插入應用程式內串流。

1. **指定應用程式碼**：此範例使用下列應用程式碼：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   程式碼會讀取 `SOURCE_SQL_STREAM_001` 中的資料列、指派異常分數，然後將產生的資料欄寫入另一個應用程式內串流 (`TEMP_STREAM`)。然後，應用程式碼會排序 `TEMP_STREAM` 中的記錄，並將結果儲存到另一個應用程式內串流 (`DESTINATION_SQL_STREAM`)。您可以使用幫浦將資料列插入應用程式內串流。如需詳細資訊，請參閱[應用程式內串流與幫浦](streams-pumps.md)。

1. **配置輸出**：設定將應用程式輸出，將 `DESTINATION_SQL_STREAM` 中的資料保存在外部目的地，即另一個 Kinesis 資料串流。檢閱指派給每筆記錄的異常分數，並判斷哪些分數表示應用程式外部的異常 (且需要收到提醒)。您可以使用 AWS Lambda 函數來處理這些異常分數並設定提醒。

本練習會使用美國東部 (維吉尼亞北部) (`us-east-1`) 來建立這些串流，以及您的應用程式。如果您使用任何其他區域，則須更新相應程式碼。

**Topics**
+ [步驟 1：準備](app-anomaly-prepare.md)
+ [步驟 2：建立應用程式](app-anom-score-create-app.md)
+ [步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md)
+ [步驟 4：驗證輸出](app-anomaly-verify-output.md)

**後續步驟**  
[步驟 1：準備](app-anomaly-prepare.md)

# 步驟 1：準備
<a name="app-anomaly-prepare"></a>

為本練習建立 Amazon Kinesis Data Analytics 應用程式之前，您必須建立兩個 Kinesis 資料串流。將其中一個串流設定為應用程式的串流來源，另一個串流設定為 Kinesis Data Analytics 保留應用程式輸出的目的地。

**Topics**
+ [步驟 1.1：建立輸入和輸出資料串流](#app-anomaly-create-two-streams)
+ [步驟 1.2：將範例記錄寫入輸入串流。](#app-anomaly-write-sample-records-inputstream)

## 步驟 1.1：建立輸入和輸出資料串流
<a name="app-anomaly-create-two-streams"></a>

在本節中，您會建立兩個 Kinesis 串流：`ExampleInputStream` 和 `ExampleOutputStream`。您可以使用 AWS 管理主控台 或 AWS CLI建立這些串流。
+ 

**使用主控台**

  1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

  1. 選擇 **建立資料串流**。建立具有 `ExampleInputStream` 碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

  1. 重複上一個步驟，以名為 `ExampleOutputStream` 的碎片建立串流。
+ 

**使用 AWS CLI**

  1. 使用下列 Kinesis `create-stream` AWS CLI 命令來建立第一個串流 (`ExampleInputStream`)。

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 運行相同的命令，將串流名稱更改為 `ExampleOutputStream`。這個命令會建立應用程式用來寫入輸出的第二個串流。

## 步驟 1.2：將範例記錄寫入輸入串流。
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步驟中，執行 Python 程式碼以持續產生範例記錄，並將這些記錄寫入 `ExampleInputStream` 串流。

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. 安裝 Python 與 `pip`。

   如需安裝 Python 的相關資訊，請參閱 [Python](https://www.python.org/) 網站。

   您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊，請參閱 Pip 網站的[安裝](https://pip.pypa.io/en/stable/installing/)。

1. 執行以下 Python 程式碼。程式碼中的 `put-record` 命令會將 JSON 記錄寫入串流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**後續步驟**  
[步驟 2：建立應用程式](app-anom-score-create-app.md)

# 步驟 2：建立應用程式
<a name="app-anom-score-create-app"></a>

在本節建立 Amazon Kinesis Data Analytics 應用程式，如下所示：
+ 設定應用程式輸入，以使用您在 [步驟 1：準備](app-anomaly-prepare.md) 中建立的 Kinesis 資料串流作為串流來源。
+ 使用主控台上的**異常偵測**範本。

**建立應用程式**

1. 按照 Kinesis Data Analytics **入門**練習中的步驟 1、2 和 3 進行操作 (請參閱 [步驟 3.1：建立應用程式](get-started-create-app.md))。
   + 在來源設定中，執行下列動作：
     + 指定您在前一節建立的串流來源。
     + 主控台推斷結構描述後，請編輯結構描述，並將 `heartRate` 欄類型設定為 `INTEGER`。

       大部分的心率值都是正常的，而且探索程序很可能會將 `TINYINT` 類型指派給此資料欄。但是，一小部分的值顯示出高心率。如果這些高數值不符合 `TINYINT` 類型，Kinesis Data Analytics 會將這些資料列傳送至錯誤串流。將資料類型更新為 `INTEGER`，以便容納所有產生的心率資料。
   + 使用主控台上的**異常偵測**範本。然後，您可以更新範本程式碼，以提供適當的資料欄名稱。

1. 提供資料欄名稱以更新應用程式碼。生成的應用程式碼如下所示 (將此代碼貼到 SQL 編輯器中)：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. 在 Kinesis Data Analytics 主控台上執行 SQL 程式碼並檢閱結果：  
![\[主控台螢幕擷取畫面會顯示即時分析標籤，並在應用程式內串流中顯示生成的資料。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**後續步驟**  
[步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md)

# 步驟 3：設定應用程式輸出
<a name="app-anomaly-create-ka-app-config-destination"></a>

完成 [步驟 2：建立應用程式](app-anom-score-create-app.md) 後，將得到應用程式碼，該程式碼會從串流來源讀取心率資料，並為每個來源指派異常分數。

您現在可以將應用程式結果從應用程式內串流傳送到外部目的地，即另一個 Kinesis 資料串流 (`OutputStreamTestingAnomalyScores`)。您可以分析異常分數並確定哪些心率是異常的。然後，您可以進一步擴充此應用程式以產生提醒。

請依照下列步驟設定應用程式輸出：



1. 開啟 Amazon Kinesis Data Analytics 主控台。在 SQL 編輯器中，選擇應用程式儀表板中的**目的地**或**新增目的地**。

1. 在**連線至目的地**頁面，選擇您在前一節建立的 `OutputStreamTestingAnomalyScores` 串流。

   現在您有一個外部目的地，可讓 Amazon Kinesis Data Analytics 將應用程式寫入的任何紀錄保留在應用程式內 `DESTINATION_SQL_STREAM` 串流中。

1. 您可以選擇性地設定 `OutputStreamTestingAnomalyScores` AWS Lambda 來監控串流並傳送提醒給您。如需說明，請參閱[使用 Lambda 函數預處理資料](lambda-preprocessing.md)。如果未設定提醒，您可以檢閱 Kinesis Data Analytics 寫入外部目的地 (Kinesis 資料串流) 的記錄 `OutputStreamTestingAnomalyScores`，如 [步驟 4：驗證輸出](app-anomaly-verify-output.md) 中所述。

**後續步驟**  
[步驟 4：驗證輸出](app-anomaly-verify-output.md)

# 步驟 4：驗證輸出
<a name="app-anomaly-verify-output"></a>

在 [步驟 3：設定應用程式輸出](app-anomaly-create-ka-app-config-destination.md) 設定應用程式輸出後，請使用下列 AWS CLI 命令來讀取應用程式寫入目的地串流的記錄：

1. 運行 `get-shard-iterator` 命令以獲取指向輸出串流資料的指標。

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   您將得到具有碎片迭代器值的回應，如下列範例回應所示：

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   複製碎片迭代器值。

1. 執行 AWS CLI `get-records` 命令。

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   此命令會傳回記錄頁面，以及您可以在後續 `get-records` 命令中使用的另一個碎片迭代器來擷取下一組記錄。