Amazon Managed Service for Apache Flink(Amazon MSF)는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려져 있었습니다.
기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Managed Service for Apache Flink 애플리케이션 생성 및 실행
이 연습에서는 Kinesis 데이터 스트림을 소스 및 싱크로 사용하는 Managed Service for Apache Flink 애플리케이션을 생성합니다.
이 섹션은 다음 주제를 포함합니다.
종속 리소스 생성
이 연습을 위해 Managed Service for Apache Flink를 생성하기 전에 먼저 다음과 같은 종속 리소스를 생성해야 합니다.
-
애플리케이션 코드를 저장하고 애플리케이션 출력을 기록하기 위한 Amazon S3 버킷입니다.
참고
이 자습서에서는 애플리케이션을 us-east-1 리전에 배포한다고 가정합니다. 다른 리전을 사용하는 경우 모든 단계를 해당 리전에 맞게 조정해야 합니다.
Amazon S3 버킷 생성
콘솔을 사용하여 Amazon S3 버킷을 생성할 수 있습니다. 리소스 생성에 대한 지침은 다음 주제를 참조하세요.
-
Amazon Simple Storage Service 사용 설명서의 S3 버킷을 생성하려면 어떻게 해야 합니까? Amazon S3 버킷 이름에 로그인 명칭을 추가하여 전역적으로 고유한 명칭을 지정합니다.
참고
이 자습서에 사용하는 리전에서 버킷을 생성해야 합니다. 자습서의 리전 기본값은 us-east-1입니다.
기타 리소스
애플리케이션을 생성할 때 Managed Service for Apache Flink는 다음과 같은 Amazon CloudWatch 리소스를 생성합니다(아직 존재하지 않는 경우).
-
/AWS/KinesisAnalytics-java/<my-application>라는 로그 그룹. -
kinesis-analytics-log-stream라는 로그 스트림.
로컬 개발 환경 설정
개발 및 디버깅을 위해 선택한 IDE에서 Apache Flink 애플리케이션을 로컬 머신에서 직접 실행할 수 있습니다. Apache Flink 종속성은 Maven을 사용하는 일반 Java 종속성과 동일하게 처리됩니다.
참고
로컬 개발 머신에는 Java JDK 11, Maven, Git이 설치되어 있어야 합니다. Eclipse Java Neon
AWS 세션 인증
애플리케이션이 Kinesis 데이터 스트림을 사용하여 데이터를 게시합니다. 로컬에서 실행하는 경우 Kinesis 데이터 스트림에 쓸 수 있는 권한이 있는 유효한 AWS 인증된 세션이 있어야 합니다. 다음 단계에 따라 세션을 인증합니다.
-
유효한 자격 증명이 구성된 AWS CLI 및 명명된 프로파일이 없는 경우 섹션을 참조하세요AWS Command Line Interface (AWS CLI) 설정.
-
IDE에 통합할 플러그인이 있는 경우 AWS이를 사용하여 IDE에서 실행 중인 애플리케이션에 자격 증명을 전달할 수 있습니다. 자세한 내용은 IntelliJ IDEA용AWS 툴킷
과 애플리케이션을 컴파일하거나 Eclipse를 실행하기 위한AWS 툴킷을 참조하세요.
Apache Flink 스트리밍 Java 코드 다운로드 및 검사
이 예제의 애플리케이션 코드는 GitHub에서 사용할 수 있습니다.
Java 애플리케이션 코드를 다운로드하려면
-
다음 명령을 사용하여 원격 리포지토리를 복제합니다.
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
./java/GettingStartedTable디렉터리로 이동합니다.
애플리케이션 구성 요소 검토
애플리케이션은 com.amazonaws.services.msf.BasicTableJob 클래스에 모두 구현되어 있습니다. main() 메서드는 소스, 변환, 싱크를 정의합니다. 이 메서드의 마지막에 있는 실행문에 의해 실행이 시작됩니다.
참고
최적의 개발자 경험을 위해 이 애플리케이션은 Amazon Managed Service for Apache Flink와 IDE의 로컬 개발 환경 모두에서 코드 변경 없이 실행되도록 설계되었습니다.
-
Amazon Managed Service for Apache Flink에서 실행할 때와 IDE에서 실행할 때 모두 동작하도록 런타임 구성을 읽기 위해 애플리케이션은 IDE에서 로컬로 독립형으로 실행되고 있는지를 자동으로 감지합니다. 이 경우 애플리케이션은 다음과 같이 런타임 구성을 다르게 로드합니다.
-
애플리케이션이 IDE에서 독립 실행 모드로 실행되고 있음을 감지하면, 프로젝트의 resources 폴더에 포함된
application_properties.json파일을 생성합니다. 파일 내용은 다음과 같습니다. -
애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때는, 기본 동작으로 Amazon Managed Service for Apache Flink 애플리케이션에서 정의하게 될 런타임 속성에서 애플리케이션 구성을 로드합니다. Managed Service for Apache Flink 애플리케이션 생성 및 구성을(를) 참조하세요.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()메서드는 애플리케이션 데이터 흐름을 정의하고 실행합니다.-
기본 스트리밍 환경을 초기화합니다. 이 예제에서는 DataStream API에서 사용할
StreamExecutionEnvironment와 SQL 및 Table API에서 사용할StreamTableEnvironment를 모두 생성하는 방법을 보여줍니다. 두 환경 객체는 동일한 런타임 환경에 대한 별도의 참조이며, 서로 다른 API를 사용하기 위한 것입니다.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build()); -
애플리케이션 구성 파라미터를 로드합니다. 이렇게 하면 애플리케이션이 실행되는 위치에 따라 올바른 위치에서 구성 파라미터를 자동으로 로드합니다.
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
애플리케이션이 Flink가 체크포인트
를 완료할 때 Amazon S3 출력 파일에 결과를 쓰는 데 사용하는 FileSystem 싱크 커넥터 입니다. 대상에 파일을 쓰려면 체크포인트를 활성화해야 합니다. 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행 중인 경우 애플리케이션 구성이 체크포인트를 제어하며 기본적으로 활성화합니다. 반대로 로컬에서 실행할 때는 체크포인트가 기본적으로 비활성화됩니다. 애플리케이션은 로컬에서 실행되고 있음을 감지하고 5,000ms마다 체크포인팅하도록 구성합니다. if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); } -
이 애플리케이션은 실제 외부 소스에서 데이터를 수신하지 않습니다. DataGen 커넥터
를 통해 처리할 임의 데이터를 생성합니다. 이 커넥터는 DataStream API, SQL, Table API에서 사용할 수 있습니다. 애플리케이션은 API 간 통합을 보여주기 위해 더 큰 유연성을 제공하는 DataStram API 버전을 사용합니다. 각 레코드는 이 경우 StockPriceGeneratorFunction이라는 생성기 함수에 의해 생성되며 여기에서 사용자 지정 로직을 넣을 수 있습니다.DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class)); -
DataStream API에서는 레코드가 사용자 지정 클래스를 가질 수 있습니다. 클래스는 Flink가 이를 레코드로 사용할 수 있도록 특정 규칙을 따라야 합니다. 자세한 내용은 지원되는 데이터 형식
을 참조하세요. 이 예제에서 StockPrice클래스는 POJO입니다. -
그런 다음 소스가 실행 환경에 연결되어
StockPrice의DataStream을 생성합니다. 이 애플리케이션은 이벤트-시간 의미 체계를 사용하지 않으며 워터마크를 생성하지 않습니다. 애플리케이션 나머지 부분의 병렬 처리와는 관계없이 DataGenerator 소스를 병렬 처리 1로 실행합니다. DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1); -
데이터 처리 흐름에서 이어지는 동작은 Table API와 SQL을 사용하여 정의됩니다. 이를 위해 StockPrices의 DataStream을 테이블로 변환합니다. 테이블의 스키마는
StockPrice클래스로부터 자동으로 추론됩니다.Table stockPricesTable = tableEnv.fromDataStream(stockPrices); -
다음 코드 스니펫은 프로그래밍 방식의 Table API를 사용하여 뷰와 쿼리를 정의하는 방법을 보여줍니다.
Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable); -
결과를 JSON 파일로 Amazon S3 버킷에 쓰도록 싱크 테이블이 정의됩니다. 프로그래밍 방식으로 뷰를 정의하는 경우와의 차이를 설명하기 위해 Table API에서는 싱크 테이블을 SQL로 정의합니다.
tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")"); -
마지막 단계는 필터링된 주가 뷰를 싱크 테이블에 삽입하는
executeInsert()입니다. 이 메서드는 지금까지 정의한 데이터 흐름의 실행을 시작합니다.filteredStockPricesTable.executeInsert("s3_sink");
-
pom.xml 파일 사용
pom.xml 파일은 애플리케이션에서 필요한 모든 종속성을 정의하고, Flink에서 필요한 모든 종속성을 포함하는 fat-jar를 빌드하기 위해 Maven Shade 플러그인을 설정합니다.
-
일부 종속성은
provided범위를 가집니다. 이러한 종속성은 애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때 자동으로 제공됩니다. 이는 애플리케이션에 필요하거나 IDE에서 로컬로 애플리케이션을 실행하는 데 필요합니다. 자세한 내용은 (TableAPI로 업데이트) 로컬에서 애플리케이션 실행 섹션을 참조하세요. Amazon Managed Service for Apache Flink에서 사용할 런타임과 동일한 Flink 버전을 사용해야 합니다. TableAPI 및 SQL을 사용하려면provided범위를 가진flink-table-planner-loader및flink-table-runtime-dependencies를 모두 포함해야 합니다.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Apache Flink 종속성을 pom에 기본 범위로 추가해야 합니다. 예를 들어 DataGen 커넥터
, FileSystem SQL 커넥터 , JSON 형식 등이 있습니다. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
로컬에서 실행할 때 Amazon S3에 쓰기 위해 S3 Hadoop 파일 시스템도
provided범위로 포함됩니다.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
Maven Java Compiler 플러그인은 코드가 Apache Flink에서 현재 지원하는 JDK 버전인 Java 11로 컴파일되도록 보장합니다.
-
Maven Shade 플러그인은 fat-jar를 패키징하며 이때 런타임에서 제공하는 일부 라이브러리는 제외합니다. 또한
ServicesResourceTransformer및ManifestResourceTransformer라는 두 가지 트랜스포머를 지정합니다. 후자는 애플리케이션을 시작하기 위해main메서드를 포함하는 클래스를 구성합니다. 기본 클래스 이름을 변경하는 경우 이 트랜스포머도 업데이트해야 합니다. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
로컬에서 애플리케이션 실행
IDE에서 Flink 애플리케이션을 로컬로 실행하고 디버깅할 수 있습니다.
참고
계속하기 전에 입력 스트림과 출력 스트림 사용이 가능한지 확인합니다. 2개의 Amazon Kinesis 데이터 스트림 생성을(를) 참조하세요. 또한 두 스트림에 대해 읽기 및 쓰기 권한이 있는지 확인합니다. AWS 세션 인증을(를) 참조하세요.
로컬 개발 환경을 설정하려면 Java 11 JDK, Apache Maven 및 Java 개발용 IDE가 필요합니다. 필수 사전 조건을 충족하는지 확인합니다. 연습 완료를 위한 필수 조건 충족을(를) 참조하세요.
Java 프로젝트를 IDE로 가져오기
IDE에서 애플리케이션 작업을 시작하려면 이를 Java 프로젝트로 가져와야 합니다.
복제한 리포지토리에는 여러 예제가 포함되어 있습니다. 각 예제는 별도의 프로젝트입니다. 이 자습서에서는 ./jave/GettingStartedTable 하위 디렉터리의 내용을 IDE로 가져옵니다.
코드를 Maven을 사용하는 기존 Java 프로젝트로 삽입합니다.
참고
새 Java 프로젝트를 가져오는 정확한 프로세스는 사용 중인 IDE에 따라 다릅니다.
로컬 애플리케이션 구성 수정
로컬에서 실행할 때 애플리케이션은 ./src/main/resources 경로 아래 프로젝트의 리소스 폴더에 있는 application_properties.json 파일의 구성을 사용합니다. 이 자습서 애플리케이션의 경우 구성 파라미터는 버킷의 이름과 데이터가 기록될 경로입니다.
구성을 편집하고 이 자습서의 시작 부분에서 생성한 버킷과 일치하도록 Amazon S3 버킷의 이름을 수정합니다.
[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
참고
구성 속성 name에는 버킷 이름만 포함해야 하며, 예를 들어 my-bucket-name처럼 입력합니다. s3:// 같은 접두사나 후행 슬래시는 포함하지 않습니다.
경로를 수정하는 경우 앞뒤에 있는 슬래시는 생략합니다.
IDE 실행 구성 설정
IDE에서 com.amazonaws.services.msf.BasicTableJob 기본 클래스를 실행하여 Flink 애플리케이션을 Java 애플리케이션처럼 직접 실행하고 디버깅할 수 있습니다. 애플리케이션을 실행하기 전에 실행 구성을 설정해야 합니다. 설정은 사용 중인 IDE에 따라 다릅니다. 예를 들어 IntelliJ IDEA 설명서의 구성 요소 실행 및 디버깅
-
클래스 경로에
provided종속성을 추가합니다. 이는 로컬에서 실행할 때provided범위의 종속성이 애플리케이션에 전달되도록 하는 데 필요합니다. 이 설정이 없으면 애플리케이션은 즉시class not found오류를 표시합니다. -
자격 AWS 증명을 전달하여 Kinesis 스트림에 액세스합니다. 가장 빠른 방법은 AWS Toolkit for IntelliJ IDEA
를 사용하는 것입니다. 실행 구성에서이 IDE 플러그인을 사용하여 특정 AWS 프로필을 선택할 수 있습니다. AWS 인증은이 프로필을 사용하여 수행됩니다. 따라서 AWS 자격 증명을 직접 전달할 필요가 없습니다. -
IDE가 JDK 11을 사용하여 애플리케이션을 실행하는지 확인합니다.
IDE에서 애플리케이션 실행
BasicTableJob에 대한 실행 구성을 설정한 후에는 일반 Java 애플리케이션처럼 실행하거나 디버깅할 수 있습니다.
참고
Maven이 생성한 fat-jar는 명령줄에서 java -jar
...로 직접 실행할 수 없습니다. 이 jar에는 애플리케이션을 독립적으로 실행하는 데 필요한 Flink 코어 종속성이 포함되어 있지 않습니다.
애플리케이션이 성공적으로 시작되면 독립 실행형 미니클러스터 및 커넥터 초기화에 대한 일부 정보를 로그로 기록합니다. 그 이후에는 애플리케이션이 시작될 때 Flink가 일반적으로 출력하는 여러 INFO 로그와 일부 WARN 로그가 이어집니다.
21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...
초기화가 완료되면 애플리케이션은 더 이상 로그 항목을 출력하지 않습니다. 데이터가 흐르는 동안에는 로그가 출력되지 않습니다.
애플리케이션이 데이터를 올바르게 처리하고 있는지 확인하려면 다음 섹션에 설명된 대로 출력 버킷의 내용을 검사할 수 있습니다.
참고
데이터 흐름에 대한 로그를 출력하지 않는 것은 Flink 애플리케이션의 정상적인 동작입니다. 레코드마다 로그를 출력하면 디버깅에는 유용할 수 있지만 프로덕션 환경에서 실행할 때는 상당한 오버헤드를 초래할 수 있습니다.
애플리케이션이 S3 버킷에 데이터를 쓰는 동작 관찰
이 예제 애플리케이션은 내부적으로 임의 데이터를 생성해 구성한 대상 S3 버킷에 기록합니다. 기본 구성 경로를 수정하지 않은 경우 데이터는 output 경로에 작성되고 ./output/<yyyy-MM-dd>/<HH> 형식의 날짜와 시간 파티셔닝이 적용됩니다.
FileSystem 싱크 커넥터
if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
S3 버킷을 찾아보고 애플리케이션이 작성한 파일을 관찰하려면
-
https://console.aws.amazon.com/s3/
에서 S3 콘솔을 엽니다.
-
이전에 생성한 버킷을 선택합니다.
-
output경로로 이동한 다음 UTC 시간대의 현재 시각에 해당하는 날짜 및 시간 폴더로 이동합니다. -
5초마다 새 파일이 나타나는 것을 관찰할 수 있도록 주기적으로 새로 고칩니다.
-
파일 하나를 선택하고 다운로드하여 콘텐츠를 관찰합니다.
참고
파일에는 기본적으로 확장자가 없습니다. 콘텐츠는 JSON 형식으로 구성되어 있습니다. 파일은 어떤 텍스트 편집기로든 열어 그 내용을 확인할 수 있습니다.
로컬에서 실행 중인 애플리케이션 중지
IDE에서 실행 중인 애플리케이션을 중지합니다. IDE는 일반적으로 ‘중지’ 옵션을 제공합니다. 정확한 위치와 방법은 IDE에 따라 다릅니다.
애플리케이션 코드 컴파일 및 패키징
이 섹션에서는 Apache Maven을 사용하여 Java 코드를 컴파일하고 JAR 파일로 패키징합니다. Maven 명령줄 도구 또는 IDE를 사용하여 코드를 컴파일하고 패키징할 수 있습니다.
Maven 명령줄을 사용하여 컴파일 및 패키징하려면
Jave GettingStarted 프로젝트가 포함된 디렉터리로 이동하여 다음 명령을 실행합니다.
$ mvn package
IDE를 사용하여 컴파일 및 패키징하려면
IDE Maven 통합 기능을 사용하여 mvn package를 실행합니다.
두 경우 모두 JAR 파일 target/amazon-msf-java-table-app-1.0.jar이 생성됩니다.
참고
IDE에서 프로젝트 빌드를 실행해도 JAR 파일이 생성되지 않을 수 있습니다.
애플리케이션 코드 JAR 파일 업로드
이 섹션에서는 이전 섹션에서 생성한 JAR 파일을 자습서 시작 부분에서 생성한 Amazon S3 버킷에 업로드합니다. 아직 완료하지 않았으면 Amazon S3 버킷 생성을 완료합니다.
애플리케이션 코드를 업로드하려면
https://console.aws.amazon.com/s3/
에서 S3 콘솔을 엽니다. -
애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다.
-
업로드 필드를 선택합니다.
-
Add Files를 선택합니다.
-
이전 섹션
target/amazon-msf-java-table-app-1.0.jar에서 생성한 JAR 파일로 이동합니다. -
다른 설정을 변경하지 않고 업로드를 선택합니다.
주의
<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar에서 반드시 올바른 JAR 파일을 선택해야 합니다.대상 디렉터리에는 업로드할 필요가 없는 다른 JAR 파일도 포함되어 있습니다.
Managed Service for Apache Flink 애플리케이션 생성 및 구성
콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 구성할 수 있습니다. 이 자습서에서는 콘솔을 사용합니다.
참고
콘솔을 사용하여 애플리케이션을 생성하면 AWS Identity and Access Management (IAM) 및 Amazon CloudWatch Logs 리소스가 자동으로 생성됩니다. AWS CLI를 사용하여 애플리케이션을 생성할 때는 이러한 리소스를 별도로 만들어야 합니다.
애플리케이션 생성
에 로그인 AWS Management 콘솔하고 https://console.aws.amazon.com/flink Amazon MSF 콘솔을 엽니다.
-
미국 동부(버지니아 북부) us-east-1 리전이 올바르게 선택되었는지 확인합니다.
-
오른쪽 메뉴에서 Apache Flink 애플리케이션을 선택한 다음 스트리밍 애플리케이션 생성을 선택합니다. 또는 초기 페이지의 시작하기 섹션에서 스트리밍 애플리케이션 생성을 선택합니다.
-
스트리밍 애플리케이션 생성 페이지에서 다음을 완료합니다.
-
스트림 처리 애플리케이션을 설정하는 방법 선택에서 처음부터 생성을 선택합니다.
-
Apache Flink 구성의 애플리케이션 Flink 버전에서 Apache Flink 1.19를 선택합니다.
-
애플리케이션 구성 섹션에서 다음을 완료합니다.
-
애플리케이션 명칭에
MyApplication을 입력합니다. -
설명에
My Java Table API test app를 입력합니다. -
애플리케이션 리소스에 대한 액세스에서 필요한 정책이 포함된 IAM 역할 kinesis-analytics-MyApplication-us-east-1 생성 및 업데이트를 선택합니다.
-
-
애플리케이션 설정 템플릿에서 다음을 완료합니다.
-
템플릿에서 개발을 선택합니다.
-
-
-
스트리밍 애플리케이션 생성을 선택합니다.
참고
콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 만들 때 내 애플리케이션에 대한 IAM 역할 및 정책을 둘 수 있는 옵션이 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 애플리케이션 명칭과 리전을 사용하여 다음과 같이 지정됩니다.
-
정책:
kinesis-analytics-service-MyApplication-us-east-1 -
역할:
kinesisanalytics-MyApplication-us-east-1
IAM 정책 편집
IAM 정책을 편집하여 Amazon S3 버킷에 액세스할 수 있는 권한을 추가합니다.
IAM 정책을 편집하여 S3 버킷 권한을 추가하려면
https://console.aws.amazon.com/iam/
에서 IAM 콘솔을 여세요. -
정책을 선택하세요. 이전 섹션에서 콘솔이 생성한
kinesis-analytics-service-MyApplication-us-east-1정책을 선택합니다. -
편집을 선택한 후 JSON 탭을 선택합니다.
-
다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 ID(
012345678901)는 사용자 계정 ID로,<bucket-name>은 생성한 S3 버킷 이름으로 각각 변경합니다. -
다음과 변경 사항 저장을 차례로 선택합니다.
애플리케이션 구성
애플리케이션을 편집하여 애플리케이션 코드 아티팩트를 설정합니다.
애플리케이션을 구성하려면
-
MyApplication 페이지에서 구성을 선택합니다.
-
애플리케이션 코드 위치 섹션에서 구성을 선택합니다.
-
Amazon S3 버킷의 경우 애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다. 찾아보기를 선택하고 올바른 버킷을 선택한 다음 선택을 선택합니다. 버킷 이름을 클릭하지 마세요.
-
Amazon S3 객체 경로에는
amazon-msf-java-table-app-1.0.jar를 입력합니다.
-
-
액세스 권한에서 IAM 역할
kinesis-analytics-MyApplication-us-east-1생성/업데이트를 선택합니다. -
런타임 속성 섹션에서 다음 속성을 추가합니다.
-
새 항목 추가를 선택하고 다음 각 파라미터를 추가합니다.
그룹 ID Key(키) 값 bucketnameyour-bucket-namebucketpathoutput -
다른 설정은 수정하지 않습니다.
-
변경 사항 저장을 선택합니다.
참고
Amazon CloudWatch 로깅을 활성화하도록 선택하면 Managed Service for Apache Flink에서 로그 그룹 및 로그 스트림을 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.
-
로그 그룹:
/aws/kinesis-analytics/MyApplication -
로그 스트림:
kinesis-analytics-log-stream
애플리케이션을 실행합니다
이제 애플리케이션이 구성이 완료되었으며 실행할 준비가 되었습니다.
애플리케이션을 실행하려면
-
Amazon Managed Service for Apache Flink 콘솔 페이지로 돌아가 MyApplication을 선택합니다.
-
실행을 선택하여 애플리케이션을 시작합니다.
-
애플리케이션 복원 구성에서 최신 스냅샷으로 실행을 선택합니다.
-
실행을 선택합니다.
애플리케이션 세부 정보의 상태는 애플리케이션이 시작된 후
Ready에서Starting으로 그리고Running으로 전환됩니다.
애플리케이션이 Running 상태일 때 Flink 대시보드를 열 수 있습니다.
대시보드를 열고 작업을 보려면
-
Apache Flink 대시보드 열기를 선택합니다. 대시보드가 새 페이지에서 열립니다.
-
실행 중인 작업 목록에서 보이는 단일 작업을 선택합니다.
참고
런타임 속성을 잘못 설정하거나 IAM 정책을 잘못 편집한 경우 애플리케이션 상태는
Running으로 변경될 수 있지만, Flink 대시보드에는 작업이 계속 재시작되는 것으로 표시됩니다. 이는 애플리케이션이 잘못 구성되었거나 외부 리소스에 액세스할 권한이 부족할 때 흔히 발생하는 장애 시나리오입니다.이러한 상황이 발생하면 Flink 대시보드의 예외 탭을 확인하여 문제의 원인을 조사합니다.
실행 중인 애플리케이션의 지표 관찰
MyApplication 페이지의 Amazon CloudWatch 지표 섹션을 보면 실행 중인 애플리케이션의 일부 기본 지표를 확인할 수 있습니다.
지표를 보려면
-
새로 고침 버튼 옆의 드롭다운 목록에서 10 seconds를 선택합니다.
-
애플리케이션이 실행 중이고 정상이면 가동 시간 지표가 계속 증가하는 것을 확인할 수 있습니다.
-
fullrestarts 지표는 0이어야 합니다. 값이 증가하고 있다면 구성에 문제가 있을 수 있습니다. Flink 대시보드의 예외 탭을 검토하여 문제를 조사합니다.
-
실패한 체크포인트 수 지표는 정상 애플리케이션에서는 0이어야 합니다.
참고
이 대시보드는 고정된 지표 세트를 5분 단위로 표시합니다. CloudWatch 대시보드에서 어떤 지표든 사용하여 사용자 지정 애플리케이션 대시보드를 생성할 수 있습니다.
애플리케이션이 대상 버킷에 데이터를 쓰는 동작 관찰
이제 Amazon Managed Service for Apache Flink에서 실행 중인 애플리케이션이 Amazon S3에 파일을 쓰는 것을 관찰할 수 있습니다.
파일을 관찰하려면 애플리케이션을 로컬에서 실행할 때 파일을 작성하는 것을 확인하기 위해 사용한 것과 동일한 프로세스를 따릅니다. 애플리케이션이 S3 버킷에 데이터를 쓰는 동작 관찰을(를) 참조하세요.
애플리케이션이 Flink 체크포인트 시 새 파일을 씁니다. Amazon Managed Service for Apache Flink에서 실행할 때 체크포인트는 기본적으로 활성화되며 60초마다 실행됩니다. 애플리케이션은 약 1분마다 새 파일을 생성합니다.
애플리케이션 중지
애플리케이션을 중지하려면 MyApplication이라는 Managed Service for Apache Flink 애플리케이션의 콘솔 페이지로 이동합니다.
애플리케이션을 중지하려면
-
작업 드롭다운 목록에서 정지를 선택합니다.
-
애플리케이션 세부 정보의 상태는 애플리케이션이 완전히 중지되면
Running에서Stopping으로 그리고Ready로 전환됩니다.참고
Python 스크립트 또는 Kinesis Data Generator에서 입력 스트림으로 데이터를 전송하는 것도 잊지 말고 중단합니다.