

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink의 Studio 노트북 예제 및 자습서
<a name="how-zeppelin-examples"></a>

**Topics**
+ [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md)
+ [자습서: 지속 가능한 상태를 사용하는 Managed Service for Apache Flink 애플리케이션으로 Studio 노트북 배포](example-notebook-deploy.md)
+ [Studio 노트북에서 데이터를 분석하기 위한 예제 쿼리 보기](how-zeppelin-sql-examples.md)

# 자습서: Managed Service for Apache Flink에서 Studio 노트북 생성
<a name="example-notebook"></a>

다음 자습서는 Kinesis 데이터 스트림 또는 Amazon MSK 클러스터에서 데이터를 읽는 Studio 노트북을 생성하는 방법을 보여줍니다.

**Topics**
+ [사전 조건 완료](#example-notebook-setup)
+ [AWS Glue 데이터베이스 생성](#example-notebook-glue)
+ [다음 단계: Kinesis Data Streams 또는 Amazon MSK를 사용한 Studio 노트북 생성](#examples-notebook-nextsteps)
+ [Kinesis Data Streams로 Studio 노트북 생성](example-notebook-streams.md)
+ [Amazon MSK로 Studio 노트북 생성](example-notebook-msk.md)
+ [애플리케이션 및 종속 리소스 정리](example-notebook-cleanup.md)

## 사전 조건 완료
<a name="example-notebook-setup"></a>

 AWS CLI 가 버전 2 이상인지 확인합니다. 최신 버전을 설치하려면 버전 2 설치, 업데이트 및 제거를 AWS CLI참조하세요. [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) 

## AWS Glue 데이터베이스 생성
<a name="example-notebook-glue"></a>

Studio 노트북은 Amazon MSK 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다.

**AWS Glue 데이터베이스 생성**

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. **데이터베이스 추가(Add database)**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

## 다음 단계: Kinesis Data Streams 또는 Amazon MSK를 사용한 Studio 노트북 생성
<a name="examples-notebook-nextsteps"></a>

이 자습서에서는 Kinesis Data Streams 또는 Amazon MSK를 사용하는 Studio 노트북을 만들 수 있습니다.
+ [Kinesis Data Streams로 Studio 노트북 생성](example-notebook-streams.md): Kinesis Data Streams 를 사용하면 Kinesis 데이터 스트림을 소스로 사용하는 애플리케이션을 빠르게 생성할 수 있습니다. Kinesis 데이터 스트림을 종속 리소스로 생성하기만 하면 됩니다.
+ [Amazon MSK로 Studio 노트북 생성](example-notebook-msk.md): Amazon MSK를 사용하면 Amazon MSK 클러스터를 소스로 사용하는 애플리케이션을 생성할 수 있습니다. Amazon VPC, Amazon EC2 클라이언트 인스턴스, Amazon MSK 클러스터를 종속 리소스로 생성해야 합니다.

# Kinesis Data Streams로 Studio 노트북 생성
<a name="example-notebook-streams"></a>

이 자습서에서는 Kinesis 데이터 스트림을 소스로 사용하는 Studio 노트북을 생성하는 방법을 설명합니다.

**Topics**
+ [사전 조건 완료](#example-notebook-streams-setup)
+ [AWS Glue 테이블 생성](#example-notebook-streams-glue)
+ [Kinesis Data Streams로 Studio 노트북 생성](#example-notebook-streams-create)
+ [Kinesis 데이터 스트림으로 데이터 전송](#example-notebook-streams-send)
+ [Studio 노트북을 테스트](#example-notebook-streams-test)

## 사전 조건 완료
<a name="example-notebook-streams-setup"></a>

Studio 노트북을 생성하기 전에 Kinesis 데이터 스트림(`ExampleInputStream`)을 생성하세요. 애플리케이션은 이 스트림을 애플리케이션 소스로 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 만들 수 있습니다. 콘솔 지침은 *Amazon Kinesis Data Streams 개발자 가이드*의 [데이터 스트림 생성 및 업데이트](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)를 참조하세요. 스트림의 이름을 **ExampleInputStream**(으)로 지정하고 **열린 샤드 수**를 **1**(으)로 설정합니다.

를 사용하여 스트림(`ExampleInputStream`)을 생성하려면 다음 Amazon Kinesis `create-stream` AWS CLI 명령을 AWS CLI사용합니다.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## AWS Glue 테이블 생성
<a name="example-notebook-streams-glue"></a>

Studio 노트북은 Kinesis Data Streams 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다.

**참고**  
먼저 데이터베이스를 수동으로 생성하거나 노트북을 만들 때 Managed Service for Apache Flink가 자동으로 데이터베이스를 생성하도록 할 수 있습니다. 마찬가지로 이 섹션에 설명된 대로 테이블을 수동으로 생성하거나 Apache Zeppelin 내에서 노트북의 Managed Service for Apache Flink용 테이블 커넥터 코드 생성을 사용하여 DDL 문을 통해 테이블을 생성할 수 있습니다. 그런 다음 테이블 AWS Glue 이 올바르게 생성되었는지 확인할 수 있습니다.

**표 생성**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 아직 AWS Glue 데이터베이스가 없는 경우 왼쪽 탐색 모음에서 **데이터베이스를** 선택합니다. **데이터베이스 추가**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

1. 왼쪽 탐색 모음에서 **표**를 선택합니다. **표** 페이지에서 **표 추가**, **표 수동 추가**를 선택합니다.

1. **표 속성 설정** 페이지에서 **표 명칭**에 **stock**을 입력합니다. 이전에 만든 데이터베이스를 선택했는지 확인하세요. **다음**을 선택합니다.

1. **데이터 스토어 추가** 페이지에서 **Kinesis**를 선택합니다. **스트림 이름**에는 **ExampleInputStream**을(를) 입력합니다. **Kinesis 소스 URL**의 경우 **https://kinesis.us-east-1.amazonaws.com** 입력을 선택합니다. **Kinesis 소스 URL**을 복사하여 붙여넣는 경우 선행 또는 후행 공백을 모두 삭제해야 합니다. **Next**(다음)를 선택합니다.

1. **분류** 페이지에서 **JSON**을 선택합니다. **Next**(다음)를 선택합니다.

1. **스키마 정의** 페이지에서 열 추가를 선택하여 열을 추가합니다. 다음 속성을 가진 열을 추가합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-streams.html)

   **Next**(다음)를 선택합니다.

1. 다음 페이지에서 설정을 확인하고 **마침**을 선택합니다.

1. 테이블 목록에서 새로 생성한 테이블을 선택합니다.

1. **테이블 편집**을 선택하고 `managed-flink.proctime` 키와 `proctime` 값이 있는 속성을 추가합니다.

1. **적용**을 선택합니다.

## Kinesis Data Streams로 Studio 노트북 생성
<a name="example-notebook-streams-create"></a>

애플리케이션에서 사용하는 리소스를 생성했으니 이제 Studio 노트북을 생성합니다.

**Topics**
+ [를 사용하여 Studio 노트북 생성 AWS Management Console](#example-notebook-create-streams-console)
+ [를 사용하여 Studio 노트북 생성 AWS CLI](#example-notebook-msk-create-api)

### 를 사용하여 Studio 노트북 생성 AWS Management Console
<a name="example-notebook-create-streams-console"></a>

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio** 탭을 선택합니다. **Studio 노트북 생성**을 선택합니다.
**참고**  
입력한 Amazon MSK 클러스터 또는 Kinesis 데이터 스트림을 선택하고 **실시간 데이터 처리**를 선택하여 Amazon MSK 또는 Kinesis Data Streams 콘솔에서 Studio 노트북을 생성할 수도 있습니다.

1. **Studio 노트북 생성** 페이지에서 다음 정보를 입력합니다.
   + 노트북 이름을 **MyNotebook**(으)로 입력합니다.
   + **AWS Glue 데이터베이스**의 **기본값**을 선택합니다.

   **Studio 노트북 생성**을 선택합니다.

1. **MyNotebook** 페이지에서 **실행**을 선택합니다. **상태**가 **실행 중**으로 표시될 때까지 기다리세요. 노트북이 실행 중일 때는 요금이 부과됩니다.

### 를 사용하여 Studio 노트북 생성 AWS CLI
<a name="example-notebook-msk-create-api"></a>

를 사용하여 Studio 노트북을 생성하려면 다음을 AWS CLI수행합니다.

1. 계정 ID를 확인합니다. 애플리케이션을 생성하려면 이 값을 사용합니다.

1. `arn:aws:iam::AccountID:role/ZeppelinRole` 역할을 생성하고 콘솔에서 자동 생성된 역할에 다음 권한을 추가합니다.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. 다음 콘텐츠를 가진 `create.json`이라는 파일을 생성합니다: 자리 표시자 값을 정보로 바꿉니다.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 애플리케이션을 생성하려면 다음 명령을 실행합니다.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 명령이 완료되면 새 Studio 노트북의 세부 정보를 보여주는 출력이 표시됩니다. 다음은 출력의 예제입니다.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 애플리케이션을 시작하려면 다음 명령을 실행합니다. 샘플 값을 계정 ID로 바꿉니다.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kinesis 데이터 스트림으로 데이터 전송
<a name="example-notebook-streams-send"></a>

Kinesis 데이터 스트림으로 테스트 데이터를 보내려면 다음을 수행하세요.

1. [Kinesis 데이터 제너레이터(KDG)](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)를 엽니다.

1. **CloudFormation으로 Cognito 사용자 만들기**를 선택합니다.

1.  CloudFormation 콘솔이 Kinesis Data Generator 템플릿과 함께 열립니다. **Next**(다음)를 선택합니다.

1. **스택 세부 정보 지정** 페이지에서 Cognito 사용자의 사용자 이름 및 암호를 입력합니다. **Next**(다음)를 선택합니다.

1. **스택 옵션 구성** 페이지에서 **다음**을 선택합니다.

1. **Kinesis-Data-Generator-Cognito-User 검토** 페이지에서 ** AWS CloudFormation이 IAM 리소스를 생성할 수 있음을 승인합니다.** 확인란을 선택합니다. **스택 생성**을 선택합니다.

1.  CloudFormation 스택 생성이 완료될 때까지 기다립니다. 스택이 완료되면 CloudFormation 콘솔에서 **Kinesis-Data-Generator-Cognito-User** 스택을 열고 **출력** 탭을 선택합니다. **KinesisDataGeneratorUrl** 출력 값에 대해 나열된 URL을 엽니다.

1. **Amazon Kinesis Data Generator** 페이지에서 4단계에서 생성한 보안 인증을 사용하여 로그인합니다.

1. 다음 페이지에서 다음 값을 입력합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-streams.html)

   **레코드 템플릿**의 경우 다음 코드를 붙여넣습니다.

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. **데이터 보내기**를 선택합니다.

1. 생성기가 데이터를 Kinesis 데이터 스트림으로 전송합니다.

   다음 섹션을 완료하는 동안 생성기를 계속 실행하세요.

## Studio 노트북을 테스트
<a name="example-notebook-streams-test"></a>

이 섹션에서는 Studio 노트북을 사용하여 Kinesis 데이터 스트림의 데이터를 쿼리합니다.

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio 노트북** 탭을 선택합니다. **MyNotebook**을 선택합니다.

1. **MyNotebook** 페이지에서 **Apache Zeppelin에서 열기**를 선택합니다.

   Apache Zeppelin 인터페이스가 새 탭에서 열립니다.

1. **제플린에 오신 것을 환영합니다\$1** 페이지에서 **Zeppelin Note**를 선택하세요.

1. **Zeppelin 노트** 페이지에서 새로운 노트에 다음 쿼리를 입력합니다.

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   실행 아이콘을 선택합니다.

   잠시 후 노트에 Kinesis 데이터 스트림의 데이터가 표시됩니다.

애플리케이션에서 작동 측면을 볼 수 있도록 Apache Flink 대시보드를 열려면 **FLINK JOB**을 선택합니다. Flink 대시보드에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink 개발자 가이드](https://docs.aws.amazon.com/)의 [Apache Flink 대시보드](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)를 참조하세요.

Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

# Amazon MSK로 Studio 노트북 생성
<a name="example-notebook-msk"></a>

이 자습서에서는 소스로 Amazon MSK 클러스터를 사용하는 Studio 노트북을 생성하는 방법을 설명합니다.

**Topics**
+ [Amazon MSK 클러스터 설정](#example-notebook-msk-setup)
+ [VPC에 NAT 게이트웨이 추가](#example-notebook-msk-nat)
+ [AWS Glue 연결 및 테이블 생성](#example-notebook-msk-glue)
+ [Amazon MSK로 Studio 노트북 생성](#example-notebook-msk-create)
+ [Amazon MSK 클러스터로 데이터 전송](#example-notebook-msk-send)
+ [Studio 노트북을 테스트](#example-notebook-msk-test)

## Amazon MSK 클러스터 설정
<a name="example-notebook-msk-setup"></a>

이 자습서에서는 일반 텍스트 액세스를 허용하는 Amazon MSK 클러스터가 필요합니다. Amazon MSK 클러스터를 아직 설정하지 않은 경우, [Amazon MSK 사용 시작하기](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 자습서를 따라 Amazon VPC, Amazon MSK 클러스터, 주제 및 Amazon EC2 클라이언트 인스턴스를 생성하세요.

자습서에 따라 다음을 수행하십시오:
+ [3단계: Amazon MSK 클러스터 생성](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)의 4단계에서 `ClientBroker` 값을 `TLS`에서 **PLAINTEXT**로 변경합니다.

## VPC에 NAT 게이트웨이 추가
<a name="example-notebook-msk-nat"></a>

[Amazon MSK 사용 시작하기](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) 자습서에 따라 Amazon MSK 클러스터를 생성했거나 기존 Amazon VPC에 프라이빗 서브넷용 NAT 게이트웨이가 아직 없는 경우, Amazon VPC에 NAT 게이트웨이를 추가해야 합니다. 다음 다이어그램은 아키텍처입니다.

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/vpc_05.png)


Amazon VPC용 NAT 게이트웨이를 생성하려면 다음을 수행합니다.

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)에서 Amazon VPC 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **NAT 게이트웨이**를 선택합니다.

1. **NAT 게이트웨이** 페이지에서 **NAT 게이트웨이 생성**을 선택합니다.

1. **NAT 게이트웨이 생성** 페이지에서 다음 값을 입력합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-msk.html)

   **NAT 게이트웨이 생성**을 선택합니다.

1. 왼쪽 탐색 모음에서 **경로 표**를 선택합니다.

1. [**Create Route Table**]을 선택합니다.

1. **경로 표 생성** 페이지에서 다음 정보를 제공합니다.
   + **Name tag:** **ZeppelinRouteTable**
   + **VPC**: VPC(예: **AWS KafkaTutorialVPC**)를 선택합니다.

   **생성(Create)**을 선택합니다.

1. 경로 표 목록에서 **ZeppelinRouteTable**을 선택합니다. **경로** 탭에서 **경로 편집**을 선택합니다.

1. **경로 편집** 페이지에서 **경로 추가**를 선택합니다.

1. ****에서 **대상 주소**에 **0.0.0.0/0**을 입력합니다. **타겟**에서 **NAT 게이트웨이**, **ZeppelinGateway**를 선택합니다. **경로 저장**을 선택합니다. **닫기**를 선택하세요.

1. 경로 표 페이지에서 **ZeppelinRouteTable**을 선택한 상태에서 **서브넷 연결** 탭을 선택합니다. **서브넷 연결 편집**을 선택합니다.

1. **서브넷 연결 편집** 페이지에서 **AWS KafkaTutorialSubnet2**와 **AWS KafkaTutorialSubnet3**을 선택합니다. **저장(Save)**을 선택합니다.

## AWS Glue 연결 및 테이블 생성
<a name="example-notebook-msk-glue"></a>

Studio 노트북은 Amazon MSK 데이터 소스에 대한 메타데이터용 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) 데이터베이스를 사용합니다. 이 섹션에서는 Amazon MSK 클러스터에 액세스하는 방법을 설명하는 AWS Glue 연결과 Studio 노트북과 같은 클라이언트에 데이터 소스의 데이터를 제공하는 방법을 설명하는 AWS Glue 테이블을 생성합니다.

**연결 생성**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 아직 AWS Glue 데이터베이스가 없는 경우 왼쪽 탐색 모음에서 **데이터베이스를** 선택합니다. **데이터베이스 추가**를 선택합니다. **데이터베이스 추가** 창에서 **데이터베이스 이름**을 **default**(으)로 입력합니다. **생성(Create)**을 선택합니다.

1. 왼쪽 탐색 모음에서 **연결**을 선택합니다. **연결 추가**를 선택합니다.

1. **연결 추가** 창에서 다음 값을 입력합니다.
   + **연결 명칭**에 **ZeppelinConnection**을 입력합니다.
   + **연결 유형**에서 **Kafka**를 선택합니다.
   + **Kafka 부트스트랩 서버 URL**에 클러스터의 부트스트랩 브로커 문자열을 제공하세요. MSK 콘솔에서 또는 다음 CLI 명령을 입력하여 부트스트랩 브로커를 가져올 수 있습니다.

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + **SSL 연결 필요** 확인란의 선택을 취소하세요.

   **Next**(다음)를 선택합니다.

1. **VPC** 페이지에서 다음 값을 입력합니다.
   + **VPC**에서 VPC의 이름(예: ** AWS KafkaTutorialVPC**)을 선택합니다.
   + **서브넷**에서 **AWS KafkaTutorialSubnet2**를 선택합니다.
   + **보안 그룹**의 경우 사용 가능한 모든 그룹을 선택합니다.

   **Next**(다음)를 선택합니다.

1. **연결 속성** 및 **연결 액세스** 페이지에서 **마침**을 선택합니다.

**표 생성**
**참고**  
다음 단계에 설명된 대로 표을 수동으로 생성하거나 Apache Zeppelin 내의 노트북에서 Managed Service for Apache Flink용 표 커넥터 코드 생성을 사용하여 DDL 문을 통해 표을 생성할 수 있습니다. 그런 다음 테이블 AWS Glue 이 올바르게 생성되었는지 확인할 수 있습니다.

1. 왼쪽 탐색 모음에서 **표**를 선택합니다. **표** 페이지에서 **표 추가**, **표 수동 추가**를 선택합니다.

1. **표 속성 설정** 페이지에서 **표 명칭**에 **stock**을 입력합니다. 이전에 만든 데이터베이스를 선택했는지 확인하세요. **Next**(다음)를 선택합니다.

1. **데이터 스토어 추가** 페이지에서 **Kafka**를 선택합니다. **주제 명칭**에는 주제 명칭(예: **AWS KafkaTutorialTopic**)을 입력합니다. **연결**에서 **Zeppel In Connection**을 선택합니다.

1. **분류** 페이지에서 **JSON**을 선택합니다. **Next**(다음)를 선택합니다.

1. **스키마 정의** 페이지에서 열 추가를 선택하여 열을 추가합니다. 다음 속성을 가진 열을 추가합니다.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/example-notebook-msk.html)

   **Next**(다음)를 선택합니다.

1. 다음 페이지에서 설정을 확인하고 **마침**을 선택합니다.

1. 테이블 목록에서 새로 생성한 테이블을 선택합니다.

1. **테이블 편집**을 선택하고 다음 속성을 추가합니다.
   + 키: `managed-flink.proctime`, 값: `proctime`
   + 키: `flink.properties.group.id`, 값: `test-consumer-group`
   + 키: `flink.properties.auto.offset.reset`, 값: `latest`
   + 키: `classification`, 값: `json`

   이러한 키/값 페어가 없으면 Flink 노트북에 오류가 발생합니다.

1. **적용**을 선택합니다.

## Amazon MSK로 Studio 노트북 생성
<a name="example-notebook-msk-create"></a>

애플리케이션에서 사용하는 리소스를 생성했으니 이제 Studio 노트북을 생성합니다.

**Topics**
+ [를 사용하여 Studio 노트북 생성 AWS Management Console](#example-notebook-create-msk-console)
+ [를 사용하여 Studio 노트북 생성 AWS CLI](#example-notebook-msk-create-api)

**참고**  
Amazon MSK 콘솔에서 기존 클러스터를 선택한 다음 **실시간 데이터 처리**를 선택하여 Studio 노트북을 생성할 수도 있습니다.

### 를 사용하여 Studio 노트북 생성 AWS Management Console
<a name="example-notebook-create-msk-console"></a>

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio** 탭을 선택합니다. **Studio 노트북 생성**을 선택합니다.
**참고**  
Amazon MSK 또는 Kinesis Data Streams 콘솔에서 Studio 노트북을 생성하려면 입력 Amazon MSK 클러스터 또는 Kinesis 데이터 스트림을 선택한 다음 **실시간 데이터 처리**를 선택합니다.

1. **Studio 노트북 생성** 페이지에서 다음 정보를 입력합니다.
   + **Studio 노트북 명칭** **MyNotebook**을 입력합니다.
   + **AWS Glue 데이터베이스**의 **기본값**을 선택합니다.

   **Studio 노트북 생성**을 선택합니다.

1. **MyNotebook** 페이지에서 **구성** 탭을 선택합니다. **네트워킹** 섹션에서 **편집**을 선택합니다.

1. **MyNotebook의 네트워킹 편집** 페이지에서 **Amazon MSK 클러스터를 기반으로 하는 VPC 구성**을 선택합니다. **Amazon MSK 클러스터**용 Amazon MSK 클러스터를 선택하세요. **Save changes**(변경 사항 저장)를 선택합니다.

1. **MyNotebook** 페이지에서 **실행**을 선택합니다. **상태**가 **실행 중**으로 표시될 때까지 기다리세요.

### 를 사용하여 Studio 노트북 생성 AWS CLI
<a name="example-notebook-msk-create-api"></a>

를 사용하여 Studio 노트북을 생성하려면 다음을 AWS CLI수행합니다.

1. 다음 정보가 있는지 확인합니다. 애플리케이션을 생성하려면 이러한 값이 필요합니다.
   + 계정 ID
   + Amazon MSK 클러스터를 포함하는 Amazon VPC의 서브넷 ID 및 보안 그룹 ID

1. 다음 콘텐츠를 가진 `create.json`이라는 파일을 생성합니다: 자리 표시자 값을 정보로 바꿉니다.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. 애플리케이션을 생성하려면 다음 명령을 실행합니다.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. 명령이 완료되면 새 Studio 노트북의 세부 정보를 보여 주는 다음과 유사한 출력이 나타나야 합니다.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. 애플리케이션을 시작하려면 다음 명령을 실행합니다. 샘플 값을 계정 ID로 바꿉니다.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Amazon MSK 클러스터로 데이터 전송
<a name="example-notebook-msk-send"></a>

이 섹션에서는 Amazon EC2 클라이언트에서 Python 스크립트를 실행하여 Amazon MSK 데이터 소스로 데이터를 전송합니다.

1. Amazon EC2 클라이언트에 연결합니다.

1. 다음 명령을 실행하여 Python 버전 3, Pip 및 Python용 Kafka 패키지를 설치하고 작업을 확인합니다.

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 다음 명령을 입력하여 클라이언트 시스템에서를 구성합니다 AWS CLI .

   ```
   aws configure
   ```

   `region`에 계정 자격 증명 및 **us-east-1**을 제공하세요.

1. 다음 콘텐츠를 가진 `stock.py`이라는 파일을 생성합니다: 샘플 값을 Amazon MSK 클러스터의 부트스트랩 브로커 문자열로 바꾸고, 주제가 **AWS KafkaTutorialTopic**이 아닌 경우 주제 명칭을 업데이트하세요.

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 다음 명령으로 스크립트를 실행합니다:

   ```
   $ python3 stock.py
   ```

1. 다음 섹션을 완료하는 동안 스크립트는 실행 상태로 두세요.

## Studio 노트북을 테스트
<a name="example-notebook-msk-test"></a>

이 섹션에서는 Studio 노트북을 사용하여 Amazon MSK 클러스터에서 데이터를 쿼리합니다.

1. [https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **Managed Service for Apache Flink 애플리케이션** 페이지에서 **Studio 노트북** 탭을 선택합니다. **MyNotebook**을 선택합니다.

1. **MyNotebook** 페이지에서 **Apache Zeppelin에서 열기**를 선택합니다.

   Apache Zeppelin 인터페이스가 새 탭에서 열립니다.

1. **제플린에 오신 것을 환영합니다\$1** 페이지에서 **Zeppelin 새로운 노트**를 선택하세요.

1. **Zeppelin 노트** 페이지에서 새로운 노트에 다음 쿼리를 입력합니다.

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   실행 아이콘을 선택합니다.

   애플리케이션은 Amazon MSK 클러스터의 데이터를 표시합니다.

애플리케이션에서 작동 측면을 볼 수 있도록 Apache Flink 대시보드를 열려면 **FLINK JOB**을 선택합니다. Flink 대시보드에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink 개발자 가이드](https://docs.aws.amazon.com/)의 [Apache Flink 대시보드](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)를 참조하세요.

Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

# 애플리케이션 및 종속 리소스 정리
<a name="example-notebook-cleanup"></a>

## Studio 노트북 삭제
<a name="example-notebook-cleanup-app"></a>

1. Managed Service for Apache Flink 콘솔을 엽니다.

1. **MyNotebook**을 선택합니다.

1. **작업**을 선택한 다음 **삭제**를 선택합니다.

## AWS Glue 데이터베이스 및 연결 삭제
<a name="example-notebook-cleanup-glue"></a>

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) AWS Glue 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **데이터베이스**를 선택합니다. **기본값** 옆의 체크박스를 선택하여 선택합니다. **작업**, **데이터베이스 삭제**를 선택합니다. 선택 항목을 확인합니다.

1. 왼쪽 탐색 모음에서 **연결**을 선택합니다. **ZeppelInConnection** 옆의 체크박스를 선택하여 선택하세요. **작업**, **연결 삭제**를 선택합니다. 선택 항목을 확인합니다.

## IAM 역할 및 정책 삭제
<a name="example-notebook-msk-cleanup-iam"></a>

1. IAM 콘솔([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/))을 엽니다.

1. 왼쪽 탐색 모음에서 **역할**을 선택합니다.

1. 검색창을 사용하여 **ZeppelInRole** 역할을 검색합니다.

1. **ZeppelInRole** 역할을 선택하세요. **역할 삭제**를 선택합니다. 삭제를 확인합니다.

## CloudWatch 로그 그룹 삭제
<a name="example-notebook-cleanup-cw"></a>

콘솔을 사용하여 애플리케이션을 생성하면 콘솔에서 CloudWatch Logs 그룹과 로그 스트림을 자동으로 생성합니다. AWS CLI를 사용하여 애플리케이션을 생성한 경우에는 로그 그룹과 스트림이 없습니다.

1. [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)에서 CloudWatch 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **로그 그룹**을 선택합니다.

1. **/AWS/KinesisAnalytics/MyNotebook** 로그 그룹을 선택합니다.

1. **작업**, **로그 그룹 삭제**를 선택합니다. 삭제를 확인합니다.

## Kinesis Data Streams 리소스 정리
<a name="example-notebook-cleanup-streams"></a>

Kinesis 스트림을 삭제하려면 Kinesis 데이터 스트림 콘솔을 열고 Kinesis 스트림을 선택한 다음 **작업**, **삭제**를 선택합니다.

## MSK 리소스 정리
<a name="example-notebook-cleanup-msk"></a>

이 자습서에서 Amazon MSK 클러스터를 생성한 경우 이번 섹션의 단계를 따르세요. 이 섹션에는 Amazon EC2 클라이언트 인스턴스, Amazon VPC 및 Amazon MSK 클러스터를 정리하는 방법이 나와 있습니다.

### Amazon MSK 클러스터 삭제
<a name="example-notebook-msk-cleanup-msk"></a>

이 자습서에서 Amazon MSK 클러스터를 생성한 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/)에서 Amazon MSK 콘솔을 엽니다.

1. **AWS KafkaTutorialCluster**를 선택합니다. **삭제**를 선택합니다. 나타나는 창에 **delete**을(를) 입력하고 선택을 확인합니다.

### 클라이언트 인스턴스 종료
<a name="example-notebook-msk-cleanup-client"></a>

이 자습서에서 Amazon EC2 클라이언트 인스턴스를 생성한 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)에서 Amazon EC2 콘솔을 엽니다.

1. 왼쪽 탐색 모음에서 **인스턴스**를 선택합니다.

1. **ZeppelinClient** 옆의 체크박스를 선택하여 선택하세요.

1. **인스턴스 상태**, **인스턴스 종료**를 선택합니다.

### Amazon VPC 삭제
<a name="example-notebook-msk-cleanup-vpc"></a>

이 자습서에서 Amazon VPC를 만든 경우 다음 단계를 따르세요.

1. [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/)에서 Amazon EC2 콘솔을 엽니다.

1. 왼쪽 탐색 표시줄에서 **네트워크 인터페이스**를 선택합니다.

1. 검색창에 VPC ID를 입력하고 Enter 키를 눌러 검색합니다.

1. 테이블 헤더의 체크박스를 선택하여 표시된 모든 네트워크 인터페이스를 선택합니다.

1. **작업**, **분리**를 선택합니다. 표시되는 창에서 **강제 분리** 아래에서 **활성화**를 선택합니다. **분리**를 선택하고 모든 네트워크 인터페이스가 **사용 가능** 상태에 도달할 때까지 기다립니다.

1. 테이블 헤더의 체크박스를 선택하여 표시된 모든 네트워크 인터페이스를 다시 선택합니다.

1. **작업**, **삭제**를 선택합니다. 작업을 확인합니다.

1. [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)에서 Amazon VPC 콘솔을 엽니다.

1. **AWS KafkaTutorialVPC**를 선택합니다. **작업**, **VPC 삭제**를 선택합니다. **delete**을(를) 입력하고 삭제를 확인합니다.

# 자습서: 지속 가능한 상태를 사용하는 Managed Service for Apache Flink 애플리케이션으로 Studio 노트북 배포
<a name="example-notebook-deploy"></a>

다음 자습서에서는 지속 가능한 상태의 Apache Flink 애플리케이션용 관리 서비스로 Studio 노트북을 배포하는 방법을 보여줍니다.

**Topics**
+ [사전 조건 완료](#example-notebook-durable-setup)
+ [를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS Management Console](#example-notebook-deploy-console)
+ [를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS CLI](#example-notebook-deploy-cli)

## 사전 조건 완료
<a name="example-notebook-durable-setup"></a>

Kinesis Data Streams 또는 Amazon MSK를 사용하여 [자습서: Managed Service for Apache Flink에서 Studio 노트북 생성](example-notebook.md)에 따라 새 Studio 노트북을 생성하세요. Studio 노트북의 이름을 `ExampleTestDeploy`(으)로 지정하세요.

## 를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS Management Console
<a name="example-notebook-deploy-console"></a>

1. 콘솔의 **애플리케이션 코드 위치(*선택 사항*)**에 패키지 코드를 저장할 S3 버킷 위치를 추가합니다. 이렇게 하면 단계를 통해 노트북에서 직접 애플리케이션을 배포하고 실행할 수 있습니다.

1. Amazon S3 버킷을 읽고 쓰는 데 사용하는 역할을 활성화하고 Managed Service for Apache Flink 애플리케이션을 시작하는 데 필요한 권한을 애플리케이션 역할에 추가합니다.
   + AmazonS3FullAccess
   + Amazonmanaged-flinkFullAccess
   + 해당하는 경우 소스, 대상 및 VPC에 액세스할 수 있습니다. 자세한 설명은 [Studio 노트북의 IAM 권한 검토](how-zeppelin-iam.md) 섹션을 참조하세요.

1. 다음 예제 코드를 사용하세요.

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. 이번 기능 출시와 함께 노트북의 각 노트 오른쪽 상단 모서리에 노트북 이름과 함께 새로운 드롭다운이 표시됩니다. 다음을 수행할 수 있습니다.
   +  AWS Management Console에서 Studio 노트북 설정을 볼 수 있습니다.
   + Zeppelin Note를 빌드하고 Amazon S3로 내보내세요. 이때 애플리케이션 이름을 입력하고 **빌드 및 내보내기**를 선택합니다. 내보내기가 완료되면 알림을 받게 됩니다.
   + 필요한 경우 Amazon S3의 실행 파일에서 추가 테스트를 보고 실행할 수 있습니다.
   + 빌드가 완료되면 지속 가능한 상태 및 자동 크기 조정 기능을 갖춘 Kinesis 스트리밍 애플리케이션으로 코드를 배포할 수 있습니다.
   + 드롭다운을 사용하여 **Zeppelin Note를 Kinesis 스트리밍 애플리케이션으로 배포**를 선택합니다. 애플리케이션 이름을 검토하고 ** AWS 콘솔을 통해 배포**를 선택합니다.
   + 그러면 Managed Service for Apache Flink 애플리케이션을 생성하는 AWS Management Console 페이지로 이동합니다. 참고로 애플리케이션 이름, 병렬 처리, 코드 위치, 기본 Glue DB, VPC (해당하는 경우) 및 IAM 역할이 미리 입력되어 있습니다. IAM 역할에 소스 및 대상에 필요한 권한이 있는지 확인하세요. 안정적인 애플리케이션 상태 관리를 위해 스냅샷은 기본적으로 활성화됩니다.
   + **애플리케이션 생성**을 선택합니다.
   + 설정 **구성** 및 수정을 선택한 다음 **Run**을 선택하여 스트리밍 애플리케이션을 시작할 수 있습니다.

## 를 사용하여 내구성 있는 상태의 애플리케이션 배포 AWS CLI
<a name="example-notebook-deploy-cli"></a>

를 사용하여 애플리케이션을 배포하려면 Beta 2 정보와 함께 제공된 서비스 모델을 AWS CLI 사용하도록를 업데이트 AWS CLI해야 합니다. 업데이트된 서비스 모델을 사용하는 방법에 대한 자세한 내용은 [사전 조건 완료사전 조건 완료](example-notebook.md#example-notebook-setup) 섹션을 참조하세요.

다음 예제 코드에서는 새 Studio 노트북을 생성합니다.

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

다음 코드 예제에서는 Studio 노트북을 시작합니다.

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

다음 코드는 애플리케이션의 Apache Zeppelin 노트북 페이지의 URL을 반환합니다.

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Studio 노트북에서 데이터를 분석하기 위한 예제 쿼리 보기
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Amazon MSK/Apache Kafka로 테이블 생성](#how-zeppelin-examples-creating-tables)
+ [Kinesis를 사용하여 테이블 생성](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [텀블링 윈도우 쿼리](#how-zeppelin-examples-tumbling)
+ [슬라이딩 창 쿼리](#how-zeppelin-examples-sliding)
+ [대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)
+ [BlackHole SQL 커넥터 사용](#how-zeppelin-examples-blackhole-connector-sql)
+ [Scala를 사용하여 샘플 데이터 생성](#notebook-example-data-generator)
+ [대화형 Scala 사용](#notebook-example-interactive-scala)
+ [대화형 Python 사용](#notebook-example-interactive-python)
+ [대화형 Python, SQL, Scala를 조합하여 사용](#notebook-example-interactive-pythonsqlscala)
+ [계정 간 Kinesis 데이터 스트림 사용](#notebook-example-crossaccount-kds)

Apache Flink SQL 쿼리 설정에 대한 자세한 내용은 [대화형 데이터 분석을 위한 Zeppelin 노트북의 Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)를 참조하세요.

Apache Flink 대시보드에서 애플리케이션을 보려면 애플리케이션의 **Zeppelin Note** 페이지에서 **FLINK JOB**을 선택하세요.

윈도우 쿼리에 대한 자세한 내용은 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [윈도우](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)를 참조하세요.

Apache Flink Streaming SQL 쿼리의 더 많은 예는 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

## Amazon MSK/Apache Kafka로 테이블 생성
<a name="how-zeppelin-examples-creating-tables"></a>

Managed Service for Apache Flink Studio와 Amazon MSK Flink 커넥터를 사용하여 일반 텍스트, SSL 또는 IAM 인증과의 연결을 인증할 수 있습니다. 요구 사항에 따라 특정 속성을 사용하여 테이블을 생성하세요.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

[Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)에서 이러한 속성을 다른 속성과 결합할 수 있습니다.

## Kinesis를 사용하여 테이블 생성
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

다음 예제에서는 Kinesis를 사용하여 테이블을 생성합니다.

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

사용할 수 있는 다른 속성에 대한 자세한 내용은 [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)를 참조하세요.

## 텀블링 윈도우 쿼리
<a name="how-zeppelin-examples-tumbling"></a>

다음 Flink Streaming SQL 쿼리는 `ZeppelinTopic` 테이블에서 각 5초의 텀블링 윈도우 내에서 가장 높은 가격을 선택합니다.

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## 슬라이딩 창 쿼리
<a name="how-zeppelin-examples-sliding"></a>

다음 Apache Flink Streaming SQL 쿼리는 `ZeppelinTopic` 표에서 각 5초 슬라이딩 윈도우에서 가장 높은 가격을 선택합니다.

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## 대화형 SQL 사용
<a name="how-zeppelin-examples-interactive-sql"></a>

이 예제는 최대 이벤트 시간 및 처리 시간과 키 값 테이블의 값의 합계를 인쇄합니다. [Scala를 사용하여 샘플 데이터 생성](#notebook-example-data-generator) 샘플 데이터 생성 스크립트가 실행 중인지 확인하세요. Studio 노트북에서 필터링 및 조인과 같은 다른 SQL 쿼리를 시도하려면 Apache Flink 설명서: Apache Flink 설명서의 [쿼리](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)를 참조하세요.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## BlackHole SQL 커넥터 사용
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL 커넥터를 사용하면 쿼리를 테스트하기 위해 Kinesis 데이터 스트림 또는 Amazon MSK 클러스터를 생성할 필요가 없습니다. BlackHole SQL 커넥터에 대한 자세한 내용은 Apache Flink 설명서의 [BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) 참조하세요. 이 예제에서 기본 카탈로그는 인메모리 카탈로그입니다.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Scala를 사용하여 샘플 데이터 생성
<a name="notebook-example-data-generator"></a>

이 예제에서는 Scala를 사용하여 샘플 데이터를 생성합니다. 이 샘플 데이터를 사용하여 다양한 쿼리를 테스트할 수 있습니다. 테이블 생성 명령문을 사용하여 키 값 테이블을 생성합니다.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## 대화형 Scala 사용
<a name="notebook-example-interactive-scala"></a>

[대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)의 스칼라 번역본입니다. 더 많은 스칼라 예제를 보려면 Apache Flink 설명서의 [테이블 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)를 참조하세요.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 대화형 Python 사용
<a name="notebook-example-interactive-python"></a>

[대화형 SQL 사용](#how-zeppelin-examples-interactive-sql)의 Python 번역은 다음과 같습니다 더 많은 Python 예제를 보려면 Apache Flink 문서의 [테이블 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)를 참조하세요.

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## 대화형 Python, SQL, Scala를 조합하여 사용
<a name="notebook-example-interactive-pythonsqlscala"></a>

대화형 분석을 위해 노트북에서 SQL, Python 및 Scala의 모든 조합을 사용할 수 있습니다. 지속 가능한 상태의 애플리케이션으로 배포할 Studio 노트북에서는 SQL과 Scala를 함께 사용할 수 있습니다. 이 예제에서는 무시되는 섹션과 지속 가능한 상태의 애플리케이션에 배포되는 섹션을 보여줍니다.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## 계정 간 Kinesis 데이터 스트림 사용
<a name="notebook-example-crossaccount-kds"></a>

Studio 노트북이 있는 계정이 아닌 다른 계정에 있는 Kinesis 데이터 스트림을 사용하려면 Studio 노트북이 실행되는 계정에서 서비스 실행 역할을 생성하고 데이터 스트림이 있는 계정에서 역할 신뢰 정책을 생성하세요. 테이블 생성 DDL 명령문의 Kinesis 커넥터에서 `aws.credentials.provider`, `aws.credentials.role.arn`, 및 `aws.credentials.role.sessionName`을(를) 사용하여 데이터 스트림에 대한 테이블을 생성합니다.

Studio 노트북 계정에 다음 서비스 실행 역할을 사용하세요.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

데이터 스트림 계정에 `AmazonKinesisFullAccess` 정책과 다음 역할 신뢰 정책을 사용하세요.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

테이블 생성 명령문에는 다음 단락을 사용하세요.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```