

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 例: ストリームでデータの異常を検出する (RANDOM\$1CUT\$1FOREST 関数)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics では、数値列の値に基づいて異常スコアを各レコードに割り当てる関数 (`RANDOM_CUT_FOREST`) を提供しています。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[`RANDOM_CUT_FOREST` Function](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)」を参照してください。

この実習では、アプリケーションのストリーミングソースのレコードに異常スコアを割り当てるアプリケーションコードを作成します。アプリケーションをセットアップするには、以下を実行します。

1. **ストリーミングソースのセットアップ** – Kinesis データストリームをセットアップして、次のようにサンプル `heartRate` データを書き込みます。

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   この手順では、ストリームに入力するための Python スクリプトを提供しています。`heartRate` 値はランダムに生成されます。レコードの 99 パーセントは `heartRate` 値が 60 から 100 の間で、`heartRate` 値の 1 パーセントのみが 150 から 200 の間です。したがって、`heartRate` 値が 150 から 200 の間のレコードは異常です。

1. **入力の設定** – コンソールを使用して、Kinesis Data Analytics アプリケーションを作成し、ストリーミングソースをアプリケーション内ストリーム (`SOURCE_SQL_STREAM_001`) にマッピングすることでアプリケーション入力を設定します。アプリケーションが起動すると、Kinesis Data Analytics は継続的にストリーミングソースを読み取り、アプリケーション内ストリームにレコードを挿入します。

1. **アプリケーションコードの指定** – この例では、次のアプリケーションコードを使用します。

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   コードは `SOURCE_SQL_STREAM_001` の行を読み取り、異常スコアを割り当て、結果の行を別のアプリケーション内ストリーム (`TEMP_STREAM`) に書き込みます。次に、アプリケーションコードは`TEMP_STREAM` のレコードをソートして、結果を別のアプリケーション内ストリーム (`DESTINATION_SQL_STREAM`) に保存します。ポンプを使用して、アプリケーション内ストリームに行を挿入します。詳細については、「[アプリケーション内ストリームとポンプ](streams-pumps.md)」を参照してください。

1. **出力の設定** – `DESTINATION_SQL_STREAM` のデータを永続化して、別の Kinesis データストリームの外部宛先に書き込むようにアプリケーション出力を設定できます。各レコードに割り当てられた異常スコアを確認して、どのスコアが異常を示しているか (また、アラートが必要か) を調べるのは、アプリケーションの範囲外です。 AWS Lambda 関数を使用して、これらの異常スコアを処理し、アラートを設定できます。

この実習では、米国東部 (バージニア北部) (`us-east-1`) を使用して、これらのストリームとアプリケーションを作成します。他のリージョンも使用する場合は、それに応じてコードを更新する必要があります。

**Topics**
+ [ステップ 1: 準備](app-anomaly-prepare.md)
+ [ステップ 2: アプリケーションの作成](app-anom-score-create-app.md)
+ [ステップ 3: アプリケーション出力を設定する](app-anomaly-create-ka-app-config-destination.md)
+ [ステップ 4: 出力の確認](app-anomaly-verify-output.md)

**次のステップ**  
[ステップ 1: 準備](app-anomaly-prepare.md)