

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink 애플리케이션 생성 및 실행
<a name="get-started-exercise"></a>

이 연습에서는 데이터 스트림을 소스 및 싱크로 사용하여 Managed Service for Apache Flink 애플리케이션을 만듭니다.

**Topics**
+ [2개의 Amazon Kinesis 데이터 스트림 생성](#get-started-exercise-1)
+ [샘플 레코드를 입력 스트림에 쓰기](#get-started-exercise-2)
+ [Apache Flink 스트리밍 Java 코드 다운로드 및 검사](#get-started-exercise-5)
+ [애플리케이션 코드 컴파일](#get-started-exercise-5.5)
+ [Apache Flink 스트리밍 Java 코드 업로드](#get-started-exercise-6)
+ [Managed Service for Apache Flink 애플리케이션 생성 및 실행](#get-started-exercise-7)

## 2개의 Amazon Kinesis 데이터 스트림 생성
<a name="get-started-exercise-1"></a>

이 연습을 위해 Amazon Managed Service for Apache Flink를 생성하기 전에 두 개의 Kinesis 데이터 스트림(`ExampleInputStream` 및 `ExampleOutputStream`)을 생성하세요. 이 애플리케이션은 애플리케이션 소스 및 대상 스트림에 대해 이러한 스트림을 사용합니다.

Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 생성할 수 있습니다. 콘솔 지침은 [데이터 스트림 만들기 및 업데이트](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)를 참조하십시오.

**데이터 스트림 (AWS CLI)을 생성하려면**

1. 첫 번째 스트림(`ExampleInputStream`)을 생성하려면 다음 Amazon Kinesis `create-stream` AWS CLI 명령을 사용합니다.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. 애플리케이션에서 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 `ExampleOutputStream`으로 변경합니다.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## 샘플 레코드를 입력 스트림에 쓰기
<a name="get-started-exercise-2"></a>

이 섹션에서는 Python 스크립트를 사용하여 애플리케이션에서 처리할 샘플 레코드를 스트림에 쓰기 합니다.

**참고**  
이 섹션에서는 [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/)이 필요합니다.

1. 다음 콘텐츠를 가진 `stock.py`이라는 파일을 생성합니다:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. 이 자습서의 뒷부분에서 `stock.py` 스크립트를 실행하여 애플리케이션으로 데이터를 전송합니다.

   ```
   $ python stock.py
   ```

## Apache Flink 스트리밍 Java 코드 다운로드 및 검사
<a name="get-started-exercise-5"></a>

이 예제에 대한 Java 애플리케이션 코드는 GitHub에서 다운로드할 수 있습니다. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.

1. 다음 명령을 사용하여 원격 리포지토리를 복제합니다:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. `GettingStarted` 디렉터리로 이동합니다.

애플리케이션 코드는 `CustomSinkStreamingJob.java` 및 `CloudWatchLogSink.java` 파일에 있습니다. 애플리케이션 코드에 대해 다음을 유의하십시오:
+ 애플리케이션은 Kinesis 소스를 사용하여 소스 스트림에서 읽습니다. 다음 조각은 Kinesis 싱크를 생성합니다.

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## 애플리케이션 코드 컴파일
<a name="get-started-exercise-5.5"></a>

이 섹션에서는 Apache Maven 컴파일러를 사용하여 애플리케이션용 Java 코드를 생성합니다. Apache Maven 및 Java Development Kit(JDK) 설치에 대한 자세한 내용을 알아보려면 [연습 완료를 위한 필수 조건](tutorial-stock-data.md#setting-up-prerequisites) 섹션을 참조하십시오.

Java 애플리케이션을 사용하려면 다음 구성 요소가 필요합니다.
+ [Project Object Model(pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) 파일. 이 파일에는 Amazon Managed Service for Apache Flink 라이브러리를 포함하여 애플리케이션의 구성 및 종속성에 대한 정보가 들어 있습니다.
+ 애플리케이션의 로직을 포함하는 `main` 메서드

**참고**  
**다음 애플리케이션에 대해 Kinesis 커넥터를 사용하려면 커넥터의 소스 코드를 다운로드하고 [Apache Flink 설명서](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html)에 설명된 대로 빌드해야 합니다.**

**애플리케이션 코드를 생성 및 컴파일하려면**

1. 개발 환경에서 Java/Maven 애플리케이션을 생성합니다. 애플리케이션 생성에 대한 자세한 내용은 해당 개발 환경 설명서를 참조하십시오.
   + [ 첫 번째 Java 프로젝트 생성(Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ 첫 번째 Java 애플리케이션 생성, 실행 및 패키징(IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. `StreamingJob.java`라는 파일에 다음 코드를 사용하십시오.

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   앞의 코드 예제에 대해서는 다음 사항에 유의하십시오.
   + 이 파일에는 애플리케이션의 기능을 정의하는 `main` 메서드가 들어 있습니다.
   + 애플리케이션은 `StreamExecutionEnvironment` 객체를 사용하여 외부 리소스에 액세스하기 위한 소스 및 싱크 커넥터를 생성합니다.
   + 애플리케이션은 정적 속성을 사용하여 소스 및 싱크 커넥터를 만듭니다. 동적 애플리케이션 속성을 사용하려면 `createSourceFromApplicationProperties` 및 `createSinkFromApplicationProperties` 메서드를 사용하여 커넥터를 생성합니다. 이 메서드는 애플리케이션의 속성을 읽어 커넥터를 구성합니다.

1. 애플리케이션 코드를 사용하려면 이를 컴파일하고 JAR 파일로 패키징합니다. 다음 두 가지 방법 중 하나로 코드를 컴파일하고 패키징할 수 있습니다:
   + 명령줄 Maven 도구 사용. `pom.xml` 파일이 있는 디렉터리에서 다음 명령을 실행하여 JAR 파일을 생성합니다:

     ```
     mvn package
     ```
   + 귀하의 개발 환경 사용. 자세한 내용을 알아보려면 해당 개발 환경 설명서를 참조하십시오.

   패키지를 JAR 파일로 업로드하거나 패키지를 압축하여 ZIP 파일로 업로드할 수 있습니다. 를 사용하여 애플리케이션을 생성하는 경우 코드 콘텐츠 유형(JAR 또는 ZIP)을 AWS CLI지정합니다.

1. 컴파일하는 동안 오류가 발생하면 `JAVA_HOME` 환경 변수가 올바르게 설정되어 있는지 확인하십시오.

애플리케이션이 성공적으로 컴파일되면 다음 파일이 생성됩니다:

`target/java-getting-started-1.0.jar`

## Apache Flink 스트리밍 Java 코드 업로드
<a name="get-started-exercise-6"></a>

이 섹션에서는 Amazon Simple Storage Service(Amazon S3) 버킷을 만들고 애플리케이션 코드를 업로드합니다.

**애플리케이션 코드 업로드하기**

1. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)에서 Amazon S3 콘솔을 엽니다.

1. **버킷 만들기**를 선택합니다.

1. **버킷 명칭** 필드에 **ka-app-code-*<username>***을 입력합니다. 버킷 명칭에 사용자 이름 등의 접미사를 추가하여 전역적으로 고유하게 만듭니다. **다음**을 선택합니다.

1. **옵션 구성** 단계에서 설정을 기본값 그대로 두고 **다음**을 선택합니다.

1. **권한 설정** 단계에서 설정을 기본값 그대로 두고 **다음**을 선택합니다.

1. **버킷 생성**을 선택합니다.

1. Amazon S3 콘솔에서 **ka-app-code-*<username>*** 버킷을 선택하고 **업로드**를 선택합니다.

1. **파일 선택** 단계에서 **파일 추가**를 선택합니다. 이전 단계에서 생성한 `java-getting-started-1.0.jar` 파일로 이동합니다. **다음**을 선택합니다.

1. **권한 설정** 단계에서 설정을 기본값 그대로 유지합니다. **다음**을 선택합니다.

1. **속성 설정** 단계에서 설정을 기본값 그대로 유지합니다. **업로드**를 선택합니다.

이제 애플리케이션 코드가 애플리케이션에서 액세스할 수 있는 Amazon S3 버킷에 저장됩니다.

## Managed Service for Apache Flink 애플리케이션 생성 및 실행
<a name="get-started-exercise-7"></a>

콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 실행할 수 있습니다.

**참고**  
콘솔을 사용하여 애플리케이션을 생성하면 AWS Identity and Access Management (IAM) 및 Amazon CloudWatch Logs 리소스가 자동으로 생성됩니다. 를 사용하여 애플리케이션을 생성할 때 이러한 리소스를 별도로 AWS CLI생성합니다.

**Topics**
+ [애플리케이션 생성 및 실행(콘솔)](#get-started-exercise-7-console)
+ [애플리케이션 생성 및 실행(AWS CLI)](#get-started-exercise-7-cli)

### 애플리케이션 생성 및 실행(콘솔)
<a name="get-started-exercise-7-console"></a>

콘솔을 사용하여 애플리케이션을 생성, 구성, 업데이트 및 실행하려면 다음 단계를 수행하세요.

#### 애플리케이션 생성
<a name="get-started-exercise-7-console-create"></a>

1. [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. Amazon Kinesis 대시보드에서 **분석 애플리케이션 생성**을 선택합니다.

1. **Kinesis Analytics - 애플리케이션 생성** 페이지에서 다음과 같이 애플리케이션 세부 정보를 제공합니다.
   + **애플리케이션 명칭**에 **MyApplication**을 입력합니다.
   + **설명**에 **My java test app**를 입력합니다.
   + **런타임**에서 **Apache Flink 1.6**을 선택합니다.

1. **액세스 권한**에서 **IAM 역할 `kinesis-analytics-MyApplication-us-west-2` 생성/업데이트**를 선택합니다.

1. **애플리케이션 생성**을 선택합니다.

**참고**  
콘솔을 사용하여 Amazon Managed Service for Apache Flink 애플리케이션을 생성할 때 내 애플리케이션에 대한 IAM 역할 및 정책을 생성하는 옵션이 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 애플리케이션 명칭과 리전을 사용하여 다음과 같이 지정됩니다.  
정책: `kinesis-analytics-service-MyApplication-us-west-2`
역할: `kinesis-analytics-MyApplication-us-west-2`

#### IAM 정책 편집
<a name="get-started-exercise-7-console-iam"></a>

IAM 정책을 편집하여 Kinesis Data Streams에 액세스할 수 있는 권한을 추가합니다.

1. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)에서 IAM 콘솔을 여세요.

1. **정책**을 선택하세요. 이전 섹션에서 콘솔이 생성한 **`kinesis-analytics-service-MyApplication-us-west-2`** 정책을 선택합니다.

1. **요약** 페이지에서 **정책 편집**을 선택합니다. **JSON** 탭을 선택합니다.

1. 다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 ID(*012345678901*)를 내 계정 ID로 바꿉니다.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### 애플리케이션 구성
<a name="get-started-exercise-7-console-configure"></a>

1. **MyApplication** 페이지에서 **구성**을 선택합니다.

1. **애플리케이션 구성** 페이지에서 **코드 위치**를 입력합니다.
   + **Amazon S3 버킷**의 경우 **ka-app-code-*<username>***를 입력합니다.
   + **Amazon S3 객체 경로**에는 **java-getting-started-1.0.jar**를 입력합니다.

1. **애플리케이션 리소스에 대한 액세스** 아래에서 **액세스 권한**의 경우 **IAM 역할 `kinesis-analytics-MyApplication-us-west-2` 생성/업데이트**를 선택합니다.

1. **속성**에서 **그룹 ID**에 **ProducerConfigProperties**를 입력합니다.

1. 다음 애플리케이션 속성 및 값을 입력합니다:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ko_kr/streams/latest/dev/get-started-exercise.html)

1. **모니터링**에서 **지표 수준 모니터링**이 **애플리케이션**으로 설정되어 있는지 확인합니다.

1. **CloudWatch 로깅**에서 **활성화** 확인란을 선택합니다.

1. **업데이트**를 선택합니다.

**참고**  
CloudWatch 로깅을 활성화하도록 선택하면 Managed Service for Apache Flink에서 로그 그룹 및 로그 스트림을 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.  
로그 그룹: `/aws/kinesis-analytics/MyApplication`
로그 스트림: `kinesis-analytics-log-stream`

#### 애플리케이션을 실행합니다
<a name="get-started-exercise-7-console-run"></a>

1. **MyApplication** 페이지에서 **실행**을 선택합니다. 작업을 확인합니다.

1. 애플리케이션이 실행 중이면 페이지를 새로 고칩니다. 콘솔에 **애플리케이션 그래프**가 표시됩니다.

#### 애플리케이션 중지
<a name="get-started-exercise-7-console-stop"></a>

**MyApplication** 페이지에서 **중지**를 선택합니다. 작업을 확인합니다.

#### 애플리케이션 업데이트
<a name="get-started-exercise-7-console-update"></a>

콘솔을 사용하여 애플리케이션 속성, 모니터링 설정, 애플리케이션 JAR의 위치 또는 파일 명칭과 같은 애플리케이션 설정을 업데이트할 수 있습니다. 애플리케이션 코드를 업데이트해야 하는 경우 Amazon S3 버킷에서 애플리케이션 JAR을 다시 로드할 수도 있습니다.

**MyApplication** 페이지에서 **구성**을 선택합니다. 애플리케이션 설정을 업데이트하고 **업데이트**를 선택합니다.

### 애플리케이션 생성 및 실행(AWS CLI)
<a name="get-started-exercise-7-cli"></a>

이 섹션에서는 AWS CLI 를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 실행합니다. Managed Service for Apache Flink는 `kinesisanalyticsv2` AWS CLI 명령을 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 상호 작용합니다.

#### 권한 정책 생성
<a name="get-started-exercise-7-cli-policy"></a>

먼저 소스 스트림에서 두 개의 명령문, 즉 `read` 작업에 대한 권한을 부여하는 명령문과 싱크 스트림에서 `write` 작업에 대한 권한을 부여하는 명령문이 있는 권한 정책을 만듭니다. 그런 다음 정책을 IAM 역할(다음 섹션에서 생성)에 연결합니다. 따라서 Managed Service for Apache Flink가 역할을 맡을 때 서비스는 소스 스트림에서 읽고 싱크 스트림에 쓸 수 있는 권한이 있습니다.

다음 코드를 사용하여 `KAReadSourceStreamWriteSinkStream` 권한 정책을 생성합니다. `username`을 애플리케이션 코드를 저장하기 위해 Amazon S3 버킷을 만들 때 사용한 사용자 이름으로 바꿉니다. Amazon 리소스 이름(ARN)의 계정 ID(`012345678901`)를 사용자의 계정 ID로 바꿉니다.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

권한 정책을 생성하는 단계별 지침은 *IAM 사용자 가이드*의 [IAM 자습서: 첫 번째 고객 관리형 정책 만들기 및 연결](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy)을 참조하세요.

**참고**  
다른 AWS 서비스에 액세스하려면를 사용할 수 있습니다 AWS SDK for Java. Managed Service for Apache Flink는 SDK에 필요한 자격 증명을 애플리케이션과 연결된 서비스 실행 IAM 역할의 자격 증명으로 자동 설정합니다. 추가 단계는 필요 없습니다.

#### IAM 역할 생성
<a name="get-started-exercise-7-cli-role"></a>

이 섹션에서는 Managed Service for Apache Flink가 소스 스트림을 읽고 싱크 스트림에 쓰기 위해 맡을 수 있는 IAM 역할을 생성합니다.

Managed Service for Apache Flink는 권한 없이 스트림에 액세스할 수 없습니다. IAM 역할을 통해 이러한 권한을 부여합니다. 각 IAM 역할에는 두 가지 정책이 연결됩니다. 신뢰 정책은 Managed Service for Apache Flink가 역할을 취할 수 있는 권한을 부여하고, 권한 정책은 역할을 취한 후 Managed Service for Apache Flink에서 수행할 수 있는 작업을 결정합니다.

이전 섹션에서 생성한 권한 정책을 이 역할에 연결합니다.

**IAM 역할을 생성하려면**

1. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)에서 IAM 콘솔을 엽니다.

1. 탐색 창에서 **역할**, **역할 생성**을 선택합니다.

1. **신뢰할 수 있는 유형의 자격 증명 선택**에서 **AWS 서비스**를 선택합니다. **이 역할을 사용할 서비스 선택**에서 **Kinesis**를 선택합니다. **사용 사례 선택**에서 **Kinesis Analytics**를 선택합니다.

   **다음: 권한**을 선택합니다.

1. **권한 정책 연결** 페이지에서 **다음: 검토**를 선택합니다. 역할을 생성한 후에 권한 정책을 연결합니다.

1. **역할 생성** 페이지에서 **역할 이름**으로 **KA-stream-rw-role**을 입력합니다. **역할 생성**을 선택합니다.

   `KA-stream-rw-role`이라는 새 IAM 역할이 생성되었습니다. 그런 다음 역할의 신뢰 정책 및 권한 정책을 업데이트합니다.

1. 역할에 권한 정책을 연결합니다.
**참고**  
이 연습에서는 Managed Service for Apache Flink가 이 역할을 취하여 Kinesis 데이터 스트림(소스)에서 데이터를 읽고 출력을 다른 Kinesis 데이터 스트림에 씁니다. 따라서 이전 단계 [권한 정책 생성](#get-started-exercise-7-cli-policy)에서 생성한 정책을 연결합니다.

   1. **요약** 페이지에서 **권한** 탭을 선택합니다.

   1. **정책 연결**을 선택합니다.

   1. 검색 상자에 **KAReadSourceStreamWriteSinkStream**(이전 섹션에서 생성한 정책)을 입력합니다.

   1. **KAReadInputStreamWriteOutputStream** 정책을 선택하고 **정책 연결**을 선택합니다.

이제 애플리케이션이 리소스에 액세스하는 데 사용하는 서비스 실행 역할이 생성되었습니다. 새 역할의 ARN을 기록합니다.

역할 생성에 대한 단계별 지침은 *IAM 사용 설명서*의 [IAM 역할 생성(콘솔)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console)을 참조하세요.

#### Managed Service for Apache Flink 애플리케이션 생성
<a name="get-started-exercise-7-cli-create"></a>

1. 다음 JSON 코드를 `create_request.json`이라는 파일에 저장합니다. 샘플 역할 ARN을 이전에 생성한 역할을 위한 ARN으로 바꿉니다. 버킷 ARN 접미사(`username`)를 이전 섹션에서 선택한 접미사로 바꿉니다. 서비스 실행 역할의 샘플 계정 ID(`012345678901`)를 해당 계정 ID로 바꿉니다.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. 위의 요청과 함께 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) 작업을 실행하여 애플리케이션을 생성합니다.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

애플리케이션이 생성되었습니다. 다음 단계에서는 애플리케이션을 시작합니다.

#### 애플리케이션 시작
<a name="get-started-exercise-7-cli-start"></a>

이 섹션에서는 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) 작업을 사용하여 애플리케이션을 시작합니다.

**애플리케이션을 시작하려면**

1. 다음 JSON 코드를 `start_request.json`이라는 파일에 저장합니다.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. 위의 요청과 함께 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) 작업을 실행하여 애플리케이션을 시작합니다.

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

애플리케이션이 실행됩니다. Amazon CloudWatch 콘솔에서 Managed Service for Apache Flink 지표를 확인하여 애플리케이션이 작동하는지 확인할 수 있습니다.

#### 애플리케이션 중지
<a name="get-started-exercise-7-cli-stop"></a>

이 섹션에서는 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) 작업을 사용하여 애플리케이션을 중지합니다.

**애플리케이션을 중지하려면**

1. 다음 JSON 코드를 `stop_request.json`이라는 파일에 저장합니다.

   ```
   {"ApplicationName": "test"
   }
   ```

1. 다음 요청과 함께 [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) 작업을 실행하여 애플리케이션을 중지합니다.

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

애플리케이션이 중지됩니다.