

Nach reiflicher Überlegung haben wir uns entschieden, Amazon Kinesis Data Analytics für SQL-Anwendungen einzustellen:

1. Ab dem **1. September 2025** werden wir keine Bugfixes für Amazon Kinesis Data Analytics for SQL-Anwendungen bereitstellen, da wir aufgrund der bevorstehenden Einstellung nur eingeschränkten Support dafür haben werden.

2. Ab dem **15. Oktober 2025** können Sie keine neuen Kinesis Data Analytics for SQL-Anwendungen mehr erstellen.

3. Wir werden Ihre Anwendungen ab dem **27. Januar 2026** löschen. Sie können Ihre Amazon Kinesis Data Analytics for SQL-Anwendungen nicht starten oder betreiben. Ab diesem Zeitpunkt ist kein Support mehr für Amazon Kinesis Data Analytics for SQL verfügbar. Weitere Informationen finden Sie unter [Einstellung von Amazon Kinesis Data Analytics für SQL-Anwendungen](discontinuation.md).

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Beispiel für die Migration zu einem Managed Service für Apache Flink Studio
<a name="migrating-to-kda-studio-overview"></a>

Nach reiflicher Überlegung haben wir die Entscheidung getroffen, Amazon Kinesis Data Analytics für SQL-Anwendungen einzustellen. Um Ihnen bei der Planung und Migration weg von Amazon Kinesis Data Analytics for SQL-Anwendungen zu helfen, werden wir das Angebot über einen Zeitraum von 15 Monaten schrittweise einstellen. Dies sind wichtige Daten, die es zu beachten gilt: **1. September 2025,** **15. Oktober 2025** und **27. Januar** 2026.

1. Ab dem **1. September 2025** werden wir keine Bugfixes für Amazon Kinesis Data Analytics for SQL-Anwendungen bereitstellen, da wir aufgrund der bevorstehenden Einstellung nur eingeschränkten Support dafür haben werden.

1. Ab dem **15. Oktober 2025** können Sie keine neuen Amazon Kinesis Data Analytics for SQL-Anwendungen mehr erstellen. 

1. Wir werden Ihre Anwendungen ab dem **27. Januar 2026** löschen. Sie können Ihre Amazon Kinesis Data Analytics for SQL-Anwendungen nicht starten oder betreiben. Ab diesem Zeitpunkt ist kein Support mehr für Amazon Kinesis Data Analytics for SQL-Anwendungen verfügbar. Weitere Informationen hierzu finden Sie unter [Einstellung von Amazon Kinesis Data Analytics für SQL-Anwendungen](discontinuation.md).

Wir empfehlen Ihnen, [Amazon Managed Service für Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) zu verwenden. Es kombiniert Benutzerfreundlichkeit mit fortschrittlichen Analysefunktionen, sodass Sie Anwendungen zur Stream-Verarbeitung in wenigen Minuten erstellen können.

Dieser Abschnitt enthält Code- und Architekturbeispiele, die Ihnen helfen, Ihre Amazon Kinesis Data Analytics for SQL-Anwendungs-Workloads auf Managed Service for Apache Flink zu migrieren.

Weitere Informationen finden Sie auch in diesem [AWS Blogbeitrag: Migration von Amazon Kinesis Data Analytics for SQL Applications zu Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replizieren von Kinesis Data Analytics for SQL-Abfragen in Managed Service für Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

Für die Migration Ihrer Workloads auf Managed Service für Apache Flink Studio oder Managed Service für Apache Flink finden Sie in diesem Abschnitt Abfrageübersetzungen, die Sie für allgemeine Anwendungsfälle verwenden können. 

Bevor Sie sich mit diesen Beispielen befassen, empfehlen wir Ihnen, zunächst den Artikel [Verwenden eines Studio-Notebooks mit einem Managed Service für Apache](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) Flink zu lesen. 

### Neuerstellung von Kinesis Data Analytics für SQL-Abfragen in Managed Service für Apache Flink Studio
<a name="examples-recreating-queries"></a>

Die folgenden Optionen bieten Übersetzungen gängiger SQL-basierter Kinesis Data Analytics Analytics-Anwendungsabfragen an Managed Service for Apache Flink Studio. 

#### Mehrschrittige Anwendung
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### DateTime Werte transformieren
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Einfache Warnungen
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### Gedrosselte Warnungen
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### Aggregieren von Teilergebnissen aus einer Abfrage
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### Umwandeln von Zeichenfolgewerten
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Ersetzen einer Teilzeichenfolge mit Regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Regex-Protokollanalyse
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### DateTime Werte transformieren
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Fenster und Aggregation
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### Rollierendes Fenster mit Rowtime
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### Die am häufigsten vorkommenden Werte werden abgerufen (TOP\_K\_ITEMS\_TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### Ungefähre Top-K-Artikel
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### Analysieren von Web-Protokollen (Funktion W3C\_LOG\_PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### Aufteilen von Zeichenfolgen auf mehrere Felder (Funktion VARIABLE\_COLUMN\_LOG\_PARSE)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### Joins
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### Fehler
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Migration von Random Cut Forest-Workloads
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Wenn Sie Workloads, die Random Cut Forest verwenden, von Kinesis Analytics für SQL zu Managed Service für Apache Flink verschieben möchten, zeigt dieser [AWS -Blogbeitrag](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/), wie Sie Managed Service für Apache Flink verwenden, um einen Online-RCF-Algorithmus zur Erkennung von Anomalien auszuführen.

## Kinesis Data Firehose als Quelle durch Kinesis Data Streams ersetzen
<a name="examples-firehose"></a>

[Eine vollständige Anleitung finden Sie unter Converting-KDAsql-/. KDAStudio](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)

In der folgenden Übung ändern Sie Ihren Datenfluss, um Amazon-Managed-Service für Apache Flink Studio zu verwenden. Dies bedeutet auch, von Amazon-Kinesis-Data-Firehose zu Amazon-Kinesis-Data-Streams zu wechseln.

Zunächst stellen wir eine typische KDA-SQL-Architektur vor, bevor wir zeigen, wie Sie diese mithilfe von Amazon-Managed-Service für Apache Flink Studio und Amazon-Kinesis-Data-Streams ersetzen können. [Alternativ können Sie die Vorlage auch hier starten: CloudFormation](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml)

### Amazon-Kinesis-Data-Analytics-SQL und Amazon-Kinesis-Data-Firehose
<a name="examples-firehose-legacy-setup"></a>

Hier ist der SQL-Architekturfluss von Amazon-Kinesis-Data-Analytics: 

![](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/legacy-sql.png)


Wir untersuchen zunächst die Einrichtung von Amazon-Kinesis-Data-Analytics-SQL und Amazon-Kinesis-Data-Firehose. Der Anwendungsfall ist ein Handelsmarkt, auf dem Handelsdaten, einschließlich Börsenticker und Preise, aus externen Quellen in Amazon-Kinesis-Systeme gestreamt werden. Amazon Kinesis Data Analytics for SQL verwendet den Input-Stream, um Fensterabfragen wie Tumbling Window auszuführen, um das Handelsvolumen und den`min`,`max`, und `average` Handelspreis über ein einminütiges Fenster für jeden Börsenticker zu ermitteln.  

Amazon-Kinesis-Data-Analytics-SQL ist so eingerichtet, dass es Daten aus der Amazon-Kinesis-Data-Firehose-API aufnimmt. Nach der Verarbeitung sendet Amazon-Kinesis-Data-Analytics-SQL die verarbeiteten Daten an eine andere Amazon-Kinesis-Data-Firehose, die dann die Ausgabe in einem Amazon-S3-Bucket speichert. 

In diesem Fall verwenden Sie den Amazon-Kinesis-Datengenerator. Mit dem Amazon-Kinesis-Datengenerator können Sie Testdaten an Ihre Amazon-Kinesis-Data-Streams oder Amazon-Kinesis-Data-Firehose-Bereitstellungsstreams senden. [Um zu beginnen, folgen Sie den Anweisungen hier.](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) Verwenden Sie [hier](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) die CloudFormation Vorlage anstelle der in der [Anleitung angegebenen Vorlage:](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

Sobald Sie die CloudFormation Vorlage ausgeführt haben, enthält der Ausgabebereich die Amazon Kinesis Data Generator-URL. Melden Sie sich mit der Cognito-Benutzer-ID und dem Passwort, die Sie [hier](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) eingerichtet haben, beim Portal an. Wählen Sie die Region und den Namen des Zielstreams aus. Wählen Sie für den aktuellen Status die Bereitstellungsstreams von Amazon-Kinesis-Data-Firehose. Wählen Sie für den neuen Status den Namen des Amazon-Kinesis-Data-Firehose Streams. Sie können je nach Ihren Anforderungen mehrere Vorlagen erstellen und die Vorlage mithilfe der Schaltfläche **Vorlage testen** ausprobieren, bevor Sie sie an den Ziel-Stream senden.

Im Folgenden finden Sie ein Beispiel für eine Nutzlast mit Amazon-Kinesis-Datengenerator. Der Datengenerator zielt darauf ab, die eingegebenen Amazon-Kinesis-Firehose-Streams kontinuierlich zu streamen. Der Amazon-Kinesis-SDK-Client kann auch Daten von anderen Produzenten senden. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

Der folgende JSON-Code wird verwendet, um eine zufällige Reihe von Handelszeiten und -daten, Börsentickerdaten und Aktienkursen zu generieren:

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

Sobald Sie **Daten senden** auswählen, beginnt der Generator mit dem Senden von Mock-Daten.

Externe Systeme streamen die Daten an Amazon-Kinesis-Data-Firehose. Mit Amazon-Kinesis-Data-Analytics for SQL-Anwendungen können Sie Streaming-Daten mithilfe von Standard-SQL analysieren. Der Service ermöglicht die Erstellung und Ausführung von SQL-Code für Streaming-Quellen zum Durchführen von Zeitreihenanalysen, Füllen von Echtzeit-Dashboards und Erstellen von Echtzeitmetriken. Amazon-Kinesis-Data-Analytics for SQL-Anwendungen könnte einen Ziel-Stream aus SQL-Abfragen im Eingabe-Stream erstellen und den Ziel-Stream an eine andere Amazon-Kinesis-Data-Firehose senden. Das Ziel Amazon-Kinesis-Data-Firehose könnte die Analysedaten als Endzustand an Amazon-S3 senden.

Der Legacy-Code von Amazon-Kinesis-Data-Analytics-SQL basiert auf einer Erweiterung des SQL-Standards. 

Sie verwenden die folgende Abfrage in Amazon-Kinesis-Data-Analytics-SQL. Sie erstellen zunächst einen Ziel-Stream für die Abfrageausgabe. Dann verwenden Sie `PUMP`, ein Amazon-Kinesis-Data-Analytics-Repository-Objekt (eine Erweiterung des SQL-Standards), das eine kontinuierlich ablaufende `INSERT INTO stream SELECT ... FROM`-Abfragefunktion bietet und so die kontinuierliche Eingabe der Ergebnisse einer Abfrage in einen benannten Stream ermöglicht. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

Das vorangegangene SQL verwendet zwei Zeitfenster: `tradeTimestamp` das Zeitfenster stammt aus der Payload des eingehenden Streams und `ROWTIME.tradeTimestamp` wird auch oder genannt`Event Time`. `client-side time` Häufig ist es wünschenswert, diese Zeit in Analysen zu verwenden, da dies die Zeit ist, zu der ein Ereignis aufgetreten ist. Zahlreiche Ereignisquellen, wie Smartphones und Web-Clients, besitzen jedoch keine zuverlässigen Uhren, was zu ungenauen Zeiten führen kann. Zusätzlich können Konnektivitätsprobleme dazu führen, dass Datensätze in einem Stream nicht in der gleichen Reihenfolge angezeigt werden, in der sie aufgetreten sind. 

In-App-Streams enthalten außerdem eine spezielle Spalte namens `ROWTIME`. In dieser wird ein Zeitstempel gespeichert, wenn Amazon-Kinesis-Data-Analytics eine Zeile in den ersten In-Application-Stream einfügt. `ROWTIME` spiegelt den Zeitstempel wider, zu dem Amazon-Kinesis-Data-Analytics nach dem Lesen aus der Streaming-Quelle einen Datensatz in den ersten In-Application-Stream eingefügt hat. Dieser `ROWTIME`-Wert wird anschließend in der gesamten Anwendung beibehalten. 

Das SQL bestimmt die Anzahl der Ticker als`volume`, `min``max`, und den `average` Preis über ein 60-Sekunden-Intervall. 

Die Verwendung dieser Zeiten in Abfragen mit Fenstern auf Zeitbasis hat Vor- und Nachteile. Wählen Sie eine oder mehrere dieser Zeiten und entwickeln Sie eine Strategie für den Umgang mit den relevanten Nachteilen, abhängig von Ihrem Anwendungsfall. 

Eine Zwei-Fenster-Strategie mit zwei zeitbasierten Fenster verwendet sowohl `ROWTIME` als auch eine der beiden anderen Zeiten, beispielsweise die Ereigniszeit.
+ Sie sollten `ROWTIME` als erstes Fenster verwenden, das die Häufigkeit steuert, mit der die Abfrage die Ergebnisse ausgibt, wie im folgenden Beispiel gezeigt. Sie wird nicht als logische Zeit verwendet. 
+ Sie sollten eine der beiden anderen Zeiten als logische Zeit verwenden, um sie mit Ihren Analysen zu verknüpfen. Diese Zeit stellt den Zeitpunkt dar, zu dem das Ereignis aufgetreten ist. Im folgenden Beispiel besteht das Ziel der Analyse darin, die Datensätze zu gruppieren und eine Zahl nach Ticker zurückzugeben. 

### Amazon-Managed-Service für Apache Flink 
<a name="examples-studio"></a>

In der aktualisierten Architektur ersetzen Sie Amazon-Kinesis-Data-Firehose durch Amazon-Kinesis-Data-Streams . Amazon-Kinesis-Data-Analytics for SQL-Anwendungen wird durch Amazon-Managed-Service für Apache Flink Studio ersetzt. Apache Flink-Code wird interaktiv in einem Apache Zeppelin-Notebook ausgeführt. Amazon-Managed-Service für Apache Flink Studio sendet die aggregierten Handelsdaten an einen Amazon-S3-Bucket, um sie zu speichern. Die Schritte werden im Folgenden dargestellt:

Dies ist der Architekturfluss von Amazon-Managed-Service für Apache Flink Studio:

![](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Erstellen Sie einen Kinesis Data Stream
<a name="examples-studio-create-data-stream"></a>

**So erstellen Sie einen Datenstrom mit der Konsole**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Erweitern Sie in der Navigationsleiste die Regionsauswahl und wählen Sie eine Region aus.

1. Klicken Sie auf **Create data stream (Daten-Stream erstellen)**.

1. Geben Sie auf der Seite **Kinesis-Stream erstellen** einen Namen für Ihren Datenstrom ein und wählen Sie dann den standardmäßigen Kapazitätsmodus **On-Demand**. 

   Im Modus **On-Demand** können Sie dann **Kinesis-Stream erstellen** wählen, um Ihren Datenstrom zu erstellen. 

   Auf der Seite **Kinesis streams (Kinesis-Streams)** wird für den Wert **Status** des Streams **Creating (Erstellen)** angezeigt, während der Stream erstellt wird. Sobald der Stream verwendet werden kann, ändert sich der Wert von **Status** in **Active (Aktiv)**.

1. Wählen Sie den Namen des Streams aus. Auf der Seite **Stream Details (Stream-Details)** wird eine Zusammenfassung der Stream-Konfiguration zusammen mit Überwachungsinformationen angezeigt.

1. **Ändern Sie im Amazon Kinesis Data Generator den Stream/delivery Stream in den neuen Amazon Kinesis Data Streams: TRADE\_SOURCE\_STREAM.**

   JSON und Nutzlast entsprechen denen, die Sie für Amazon-Kinesis-Data-Analytics-SQL verwendet haben. Verwenden Sie den Amazon-Kinesis-Datengenerator, um einige Beispiele für Handelsnutzdaten zu erstellen und verwenden Sie den **TRADE\_SOURCE\_STREAM**-Datenstrom als Ziel für diese Übung: 

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1.  AWS-Managementkonsole **Gehen Sie zu Managed Service for Apache Flink und wählen Sie dann Create application.**

1. Wählen Sie im Navigationsbereich links **Studio-Notebooks** aus und wählen Sie dann **Studio-Notebook erstellen**.

1. Geben Sie einen Namen für das Studio-Notebook ein.

1. Geben Sie unter **AWS -Glue-Datenbank** eine bestehende AWS Glue -Datenbank an, die die Metadaten für Ihre Quellen und Ziele definiert. Wenn Sie keine AWS Glue Datenbank haben, wählen Sie **Create** und gehen Sie wie folgt vor:

   1. Wählen Sie in der AWS Glue-Konsole im Menü auf der linken Seite unter **Datenkatalog** die Option **Datenbanken** aus.

   1. Wählen Sie **Datenbank erstellen** aus

   1. Geben Sie auf der Seite **Datenbank erstellen** einen Namen für die Datenbank ein. Wählen Sie im Abschnitt **Standort – optional** **Amazon-S3 durchsuchen** und dann den Amazon-S3-Bucket aus. Wenn noch keinen Amazon-S3-Bucket eingerichtet haben, können Sie diesen Schritt überspringen und später dazu zurückkehren.

   1. (Optional). Geben Sie eine Beschreibung für die Datenbank ein.

   1. Wählen Sie **Datenbank erstellen** aus.

1. Wählen Sie **Notebook erstellen** aus.

1. Sobald Ihr Notebook erstellt ist, wählen Sie **Ausführen**.

1. Sobald das Notebook erfolgreich gestartet wurde, starten Sie ein Zeppelin-Notebook, indem Sie **In Apache Zeppelin öffnen** wählen.

1. Wählen Sie auf der Seite Zeppelin-Notizbuch die Option Neue Notiz **erstellen** und geben Sie ihr einen Namen. *MarketDataFeed*

Der Flink-SQL-Code wird im Folgenden erklärt, aber zuerst einmal [sieht ein Zeppelin-Notebook-Bildschirm so aus](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Jedes Fenster im Notebook ist ein separater Codeblock und diese können einzeln ausgeführt werden.

##### Code bei Amazon-Managed-Service für Apache Flink
<a name="examples-studio-code"></a>

Amazon-Managed-Service für Apache Flink Studio verwendet Zeppelin Notebooks, um den Code auszuführen. In diesem Beispiel erfolgt die Zuordnung zum SSQL-Code, der auf Apache Flink 1.13 basiert. Der Code im Zeppelin-Notizbuch wird im Folgenden, Block für Block, angezeigt.  

Bevor Sie Code in Ihrem Zeppelin Notebook ausführen, müssen die Flink-Konfigurationsbefehle ausgeführt werden. Wenn Sie nach dem Ausführen von Code (ssql, Python oder Scala) eine Konfigurationseinstellung ändern müssen, müssen Sie Ihr Notebook beenden und neu starten. In diesem Beispiel müssen Sie Checkpointing einrichten. Checkpointing ist erforderlich, damit Sie in Amazon-S3 Daten in eine Datei streamen können. Dadurch können Daten, die zu Amazon-S3 gestreamt werden, in eine Datei geleitet werden. Die folgende Anweisung legt das Intervall auf 5000 Millisekunden fest.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` gibt an, dass es sich bei diesem Block um Konfigurationsanweisungen handelt. [Weitere Informationen zur Flink-Konfiguration einschließlich Checkpointing finden Sie unter Apache Flink Checkpointing.](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/)  

Die Eingabetabelle für die Quelle Amazon Kinesis Data Streams wird mit dem folgenden Flink-SSQL-Code erstellt. Beachten Sie, dass das `TRADE_TIME` Feld die vom Datengenerator date/time erstellten Daten speichert.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

Sie können den Eingabestream mit dieser Anweisung anzeigen:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Bevor Sie die aggregierten Daten an Amazon-S3 senden, können Sie sie direkt in Amazon-Managed-Service für Apache Flink Studio mit einer Auswahlabfrage im rollierenden Fenster anzeigen. Dadurch werden die Handelsdaten in einem Zeitfenster von einer Minute zusammengefasst. Beachten Sie, dass die %flink.ssql-Anweisung eine Bezeichnung (type=update) haben muss:

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Sie können dann in Amazon-S3 eine Tabelle für das Ziel erstellen. Sie müssen ein Wasserzeichen verwenden. Ein Wasserzeichen ist eine Fortschrittsmetrik, die einen Zeitpunkt angibt, zu dem Sie sicher sind, dass keine verzögerten Ereignisse mehr eintreten werden. Das Wasserzeichen wird benötigt, damit verspätete Ankünfte berücksichtigt werden. Das Intervall von `‘5’ Second` ermöglicht es Handelsaktionen mit 5-sekündiger Verspätung in den Amazon-Kinesis Data Stream einzutreten und trotzdem aufgenommen zu werden, wenn sie einen Zeitstempel haben, der innerhalb des Fensters liegt. Weitere Informationen finden Sie unter [Wasserzeichen generieren](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).    

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

Diese Anweisung fügt die Daten in die `TRADE_DESTINATION_S3` ein. `TUMPLE_ROWTIME` ist der Zeitstempel der inklusiven Obergrenze des rollierenden Fensters.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

Lassen Sie Ihre Anweisung 10 bis 20 Minuten lang laufen, um einige Daten in Amazon-S3 zu sammeln. Brechen Sie dann Ihre Anweisung ab. 

Dadurch wird die Datei in Amazon-S3 geschlossen, sodass sie angesehen werden kann. 

So sieht der Inhalt aus: 

![](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


Sie können die [CloudFormation -Vorlage](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) verwenden, um die Infrastruktur zu erstellen. 

CloudFormation erstellt die folgenden Ressourcen in Ihrem AWS Konto:
+  Amazon-Kinesis-Data-Streams 
+  Amazon-Managed-Service für Apache Flink
+  AWS Glue Datenbank
+  Amazon-S3-Bucket
+  IAM-Rollen und Richtlinien für den Zugriff auf geeignete Ressourcen durch Amazon-Managed-Service für Apache Flink Studio

Importieren Sie das Notizbuch und ändern Sie den Namen des Amazon S3 S3-Buckets durch den neuen Amazon S3 S3-Bucket, der von erstellt wurde CloudFormation. 

![](http://docs.aws.amazon.com/de_de/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Weitere Informationen
<a name="more"></a>

Im Folgenden finden Sie einige zusätzliche Ressourcen, mit denen Sie mehr über die Verwendung von Managed Service für Apache Flink Studio erfahren können: 
+ [Entwicklerhandbuch für Managed Service für Apache Flink Studio-Notebooks](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [Dokumentation für Apache Flink 1.13](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Workshop zu Managed Service für Apache Flink Studio](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Apache Flink Windowing](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Entwicklerhandbuch für Amazon-Kinesis-Data-Analytics – Aus einem Kinesis Data Analytics Stream in einen S3 Bucket schreiben](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 