Amazon Managed Service for Apache Flink(Amazon MSF)는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려져 있었습니다.
Managed Service for Apache Flink Python 애플리케이션 생성 및 실행
이 섹션에서는 Kinesis 스트림을 소스 및 싱크로 사용하여 Managed Service for Apache Flink Python 애플리케이션을 생성합니다.
이 섹션은 다음 주제를 포함합니다.
종속 리소스 생성
이 연습을 위해 Managed Service for Apache Flink를 생성하기 전에 먼저 다음과 같은 종속 리소스를 생성해야 합니다.
-
입력 및 출력을 위한 Kinesis 스트림 2개.
-
애플리케이션 코드를 저장할 Amazon S3 버킷입니다.
참고
이 자습서에서는 애플리케이션을 us-east-1 리전에 배포한다고 가정합니다. 다른 리전을 사용하는 경우 모든 단계를 해당 리전에 맞게 조정해야 합니다.
2개의 Kinesis 스트림 생성
이 연습용 Managed Service for Apache Flink 애플리케이션을 생성하기 전에 애플리케이션을 배포하는 데 사용할 리전(이 예제에서는 us-east-1)과 동일한 리전에서 Kinesis 데이터 스트림 두 개(ExampleInputStream 및 ExampleOutputStream)를 생성합니다. 이 애플리케이션은 애플리케이션 소스 및 대상 스트림에 대해 이러한 스트림을 사용합니다.
Amazon Kinesis 콘솔 또는 다음 AWS CLI 명령을 사용하여 이러한 스트림을 만들 수 있습니다. 콘솔 지침은 Amazon Kinesis Data Streams 개발자 가이드의 데이터 스트림 생성 및 업데이트를 참조하세요.
데이터 스트림 (AWS CLI)을 생성하려면
-
첫 번째 스트림(
ExampleInputStream)을 생성하려면 다음 Amazon Kinesiscreate-streamAWS CLI 명령을 사용합니다.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 -
애플리케이션에서 출력을 쓰는 데 사용하는 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을
ExampleOutputStream으로 변경합니다.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
Amazon S3 버킷 생성
콘솔을 사용하여 Amazon S3 버킷을 생성할 수 있습니다. 리소스 생성에 대한 지침은 다음 주제를 참조하세요.
-
Amazon Simple Storage Service 사용 설명서의 S3 버킷을 생성하려면 어떻게 해야 합니까? Amazon S3 버킷에는 전역적으로 고유한 명칭을 지정해야 하며, 예를 들어 로그인 명칭을 추가하는 방식이 있습니다.
참고
이 자습서에서 사용하는 리전(us-east-1)에 S3 버킷을 생성해야 합니다.
기타 리소스
애플리케이션을 생성할 때 Managed Service for Apache Flink는 다음과 같은 Amazon CloudWatch 리소스를 생성합니다(아직 존재하지 않는 경우).
-
/AWS/KinesisAnalytics-java/<my-application>라는 로그 그룹. -
kinesis-analytics-log-stream라는 로그 스트림.
로컬 개발 환경 설정
개발 및 디버깅을 위해 Python Flink 애플리케이션을 로컬 머신에서 실행할 수 있습니다. 애플리케이션은 명령줄에서 python
main.py를 사용해 실행하거나 원하는 Python IDE에서 실행할 수 있습니다.
참고
로컬 개발 머신에는 Python 3.10 또는 3.11, Java 11, Apache Maven, Git이 설치되어 있어야 합니다. PyCharm
PyFlink 라이브러리 설치
애플리케이션을 개발하고 로컬에서 실행하려면 Flink Python 라이브러리를 설치해야 합니다.
-
VirtualEnv, Conda 또는 유사한 Python 도구를 사용하여 독립 실행형 Python 환경을 생성합니다.
-
해당 환경에 PyFlink 라이브러리를 설치합니다. Amazon Managed Service for Apache Flink에서 사용할 Apache Flink 런타임 버전과 동일한 버전을 사용해야 합니다. 현재 권장 런타임은 1.19.1입니다.
$ pip install apache-flink==1.19.1 -
애플리케이션을 실행할 때 해당 환경이 활성 상태인지 확인합니다. IDE에서 애플리케이션을 실행하는 경우 IDE가 해당 환경을 런타임으로 사용하고 있는지 확인합니다. 프로세스는 사용 중인 IDE에 따라 다릅니다.
참고
PyFlink 라이브러리만 설치하면 됩니다. 로컬 머신에 Apache Flink 클러스터를 설치할 필요는 없습니다.
AWS 세션 인증
애플리케이션이 Kinesis 데이터 스트림을 사용하여 데이터를 게시합니다. 로컬에서 실행할 때는 Kinesis 데이터 스트림에 쓸 수 있는 권한이 포함된 유효한 AWS 인증 세션이 있어야 합니다. 다음 단계에 따라 세션을 인증합니다.
-
AWS CLI와 유효한 자격 증명이 구성된 명명된 프로필이 없는 경우 AWS Command Line Interface (AWS CLI) 설정 섹션을 참조하세요.
-
AWS CLI가 올바르게 구성되었고 사용자가 Kinesis 데이터 스트림에 기록할 권한이 있는지 확인하기 위해 다음 테스트 레코드를 게시합니다,
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
IDE가 AWS와 통합되는 플러그인을 지원하는 경우, 이를 사용하여 IDE에서 실행 중인 애플리케이션에 자격 증명을 전달할 수 있습니다. 자세한 내용은 PyCharm용 AWS 툴킷
, Visual Studio Code용 AWS 툴킷 , IntelliJ IDEA용 AWS 툴킷 을 참조하세요.
Apache Flink 스트리밍 Python 코드 다운로드 및 검사
이 예제에 대한 Python 애플리케이션 코드는 GitHub에서 받을 수 있습니다. 애플리케이션 코드를 다운로드하려면 다음을 수행하세요.
-
다음 명령을 사용하여 원격 리포지토리를 복제합니다.
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
./python/GettingStarted디렉터리로 이동합니다.
애플리케이션 구성 요소 검토
애플리케이션 코드는 main.py에 있습니다. 애플리케이션의 흐름은 Python 코드에 임베딩된 SQL로 정의합니다.
참고
최적화된 개발자 경험을 위해 이 애플리케이션은 Amazon Managed Service for Apache Flink와 사용자의 머신 개발 환경 모두에서 코드 변경 없이 실행되도록 설계되었습니다. 애플리케이션은 로컬에서 실행 중인지 감지하기 위해 환경 변수 IS_LOCAL =
true를 사용합니다. 환경 변수 IS_LOCAL = true는 쉘에서 설정하거나 IDE의 실행 구성에서 설정해야 합니다.
-
애플리케이션은 실행 환경을 설정하고 런타임 구성을 읽습니다. Amazon Managed Service for Apache Flink와 로컬 환경 모두에서 동작하도록 애플리케이션은
IS_LOCAL변수를 확인합니다.-
애플리케이션이 Amazon Managed Service for Apache Flink에서 실행될 때의 기본 동작은 다음과 같습니다.
-
애플리케이션과 함께 패키징된 종속성을 로드합니다. 자세한 내용은 (링크) 섹션을 참조하세요.
-
Amazon Managed Service for Apache Flink 애플리케이션에서 정의한 런타임 속성에서 구성을 로드합니다. 자세한 내용은 (링크) 섹션을 참조하세요.
-
-
로컬에서 애플리케이션을 실행할 때 애플리케이션이
IS_LOCAL = true를 감지하는 경우:-
프로젝트에서 외부 종속성을 로드합니다.
-
프로젝트에 포함된
application_properties.json파일에서 구성을 로드합니다.... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
애플리케이션은 Kinesis 커넥터
를 사용하여 CREATE TABLE문으로 소스 테이블을 정의합니다. 이 표는 입력 Kinesis 스트림에서 데이터를 읽습니다. 애플리케이션은 스트림 이름, 리전, 초기 위치는 런타임 구성에서 가져옵니다.table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """) -
애플리케이션은 이 예제에서 Kinesis 커넥터
를 사용하여 싱크 테이블도 정의합니다. 이 표는 출력 Kinesis 스트림으로 데이터를 전송합니다. table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""") -
마지막으로 애플리케이션은 소스 테이블에서 싱크 테이블로
INSERT INTO...를 실행하는 SQL을 실행합니다. 더 복잡한 애플리케이션에서는 싱크로 쓰기 전에 데이터를 변환하는 추가 단계가 있을 수 있습니다.table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""") -
로컬에서 애플리케이션을 실행하려면
main()함수 끝에 다른 단계를 추가해야 합니다.if is_local: table_result.wait()이 문이 없으면 로컬에서 실행할 때 애플리케이션이 즉시 종료됩니다. Amazon Managed Service for Apache Flink에서 애플리케이션을 실행할 때는 이 문을 실행해서는 안 됩니다.
JAR 종속성 관리
PyFlink 애플리케이션은 일반적으로 하나 이상의 커넥터가 필요합니다. 이 자습서의 애플리케이션은 Kinesis 커넥터
이 예제에서는 Apache Maven을 사용하여 종속성을 가져오고 Managed Service for Apache Flink에서 실행할 수 있도록 애플리케이션을 패키징하는 방법을 보여줍니다.
참고
종속성을 가져오고 패키징하는 방법에는 다른 방식도 있습니다. 이 예제는 하나 이상의 커넥터와 함께 올바르게 작동하는 방법을 보여줍니다. 또한 개발을 위해 로컬에서 그리고 Managed Service for Apache Flink에서 코드 변경 없이 애플리케이션을 실행할 수 있게 합니다.
pom.xml 파일 사용
Apache Maven은 pom.xml 파일을 사용하여 종속성 및 애플리케이션 패키징을 제어합니다.
모든 JAR 종속성은 pom.xml 파일의 <dependencies>...</dependencies> 블록에 지정됩니다.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
사용할 커넥터의 올바른 아티팩트와 버전을 찾으려면 Managed Service for Apache Flink에서 Apache Flink 커넥터 사용 섹션을 참조하세요. 사용 중인 Apache Flink 버전을 참조해야 합니다. 이 예제에서는 Kinesis 커넥터를 사용합니다. Apache Flink 1.19의 경우 커넥터 버전은 4.3.0-1.19입니다.
참고
Apache Flink 1.19를 사용하는 경우 이 버전에 대해 특별히 릴리스된 커넥터 버전이 없습니다. 1.18용으로 릴리스된 커넥터를 사용합니다.
종속성 다운로드 및 패키징
Maven을 사용하여 pom.xml 파일에 정의된 종속성을 다운로드하고 Python Flink 애플리케이션용으로 패키징합니다.
-
python/GettingStarted라는 Python 시작하기 프로젝트가 포함된 디렉터리로 이동합니다. -
다음 명령을 실행합니다.
$ mvn package
Maven은 ./target/pyflink-dependencies.jar라는 새 파일을 생성합니다. 로컬로 사용자의 머신에서 개발 시 Python 애플리케이션은 이 파일을 찾습니다.
참고
이 명령을 실행하지 않으면 애플리케이션을 실행할 때 다음 오류가 발생합니다. 식별자 "kinesis에 대한 팩토리를 찾을 수 없습니다.
샘플 레코드를 입력 스트림에 쓰기
이 섹션에서는 애플리케이션이 처리할 수 있도록 스트림에 샘플 레코드를 전송합니다. 샘플 데이터를 생성하는 방법은 Python 스크립트를 사용하거나 Kinesis Data Generator
Python 스크립트를 사용하여 샘플 데이터 생성
Python 스크립트를 사용하여 스트림으로 샘플 레코드를 전송할 수 있습니다.
참고
이 Python 스크립트를 실행하려면 Python 3.x를 사용해야 하며 AWS SDK for Python (Boto)
Kinesis 입력 스트림으로 테스트 데이터 전송을 시작하려면:
-
데이터 생성기
stock.pyPython 스크립트를 Data generator GitHub repository에서 다운로드합니다. -
stock.py스크립트를 실행합니다.$ python stock.py
자습서의 나머지 단계를 완료하는 동안 스크립트를 계속 실행한 상태로 유지합니다. 이제 Apache Flink 애플리케이션을 실행할 수 있습니다.
Kinesis Data Generator를 사용하여 샘플 데이터 생성
Python 스크립트를 사용하는 대신, 호스팅 버전
Kinesis Data Generator를 설정하고 실행하려면:
-
Kinesis Data Generator 설명서
의 지침에 따라 도구에 대한 액세스를 설정합니다. 사용자와 암호를 설정하는 CloudFormation 템플릿을 실행합니다. -
CloudFormation 템플릿에서 생성된 URL을 통해 Kinesis Data Generator에 액세스합니다. CloudFormation 템플릿이 완료되면 출력 탭에서 URL을 확인할 수 있습니다.
-
데이터 생성기를 구성합니다.
-
리전: 이 자습서에서 사용 중인 리전(us-east-1)을 선택합니다.
-
스트림/전송 스트림: 애플리케이션이 사용할 입력 스트림을 선택합니다. 예:
ExampleInputStream -
초당 레코드 수: 100
-
레코드 템플릿: 다음 템플릿을 복사하여 붙여 넣습니다.
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
템플릿 테스트: 템플릿 테스트를 선택하고 생성된 레코드가 다음과 유사한지 확인합니다.
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
데이터 생성기 시작: 데이터 전송 선택을 선택합니다.
Kinesis Data Generator가 이제 ExampleInputStream으로 데이터를 전송하고 있습니다.
로컬에서 애플리케이션 실행
명령줄에서 python main.py로 실행하거나 IDE에서 실행하여 로컬에서 애플리케이션을 테스트할 수 있습니다.
애플리케이션을 로컬에서 실행하려면 이전 섹션에 설명된 대로 올바른 버전의 PyFlink 라이브러리가 설치되어 있어야 합니다. 자세한 내용은 (링크) 섹션을 참조하세요.
참고
계속하기 전에 입력 스트림과 출력 스트림 사용이 가능한지 확인합니다. 2개의 Amazon Kinesis Data Streams 생성을(를) 참조하세요. 또한 두 스트림에 대해 읽기 및 쓰기 권한이 있는지 확인합니다. AWS 세션 인증을(를) 참조하세요.
Python 프로젝트를 IDE로 가져오기
IDE에서 애플리케이션 작업을 시작하려면 Python 프로젝트로 가져와야 합니다.
복제한 리포지토리에는 여러 예제가 포함되어 있습니다. 각 예제는 별도의 프로젝트입니다. 이 자습서에서는 ./python/GettingStarted 하위 디렉터리의 내용을 IDE로 가져옵니다.
기존 Python 프로젝트로 코드를 가져옵니다.
참고
새 Python 프로젝트를 가져오는 정확한 프로세스는 사용 중인 IDE에 따라 다릅니다.
로컬 애플리케이션 구성 확인
로컬에서 실행할 때 애플리케이션은 ./src/main/resources 경로 아래 프로젝트의 리소스 폴더에 있는 application_properties.json 파일의 구성을 사용합니다. Kinesis 스트림 이름이나 리전을 변경하려면 이 파일을 편집할 수 있습니다.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Python 애플리케이션을 로컬에서 실행
애플리케이션은 일반적인 Python 스크립트처럼 명령줄에서 실행하거나 IDE에서 실행해 로컬에서 구동할 수 있습니다.
명령줄에서 애플리케이션을 실행하려면
-
Python Flink 라이브러리를 설치한 Conda 또는 VirtualEnv와 같은 독립 실행형 Python 환경이 현재 활성화되어 있는지 확인합니다.
-
mvn package를 최소 한 번 이상 실행했는지 확인합니다. -
IS_LOCAL = true환경 변수를 설정합니다.$ export IS_LOCAL=true -
애플리케이션을 일반 Python 스크립트로 실행합니다.
$python main.py
IDE에서 애플리케이션을 실행하려면
-
IDE에서
main.py스크립트를 다음 구성으로 실행하도록 설정합니다.-
PyFlink 라이브러리를 설치한 Conda 또는 VirtualEnv와 같은 독립 실행형 Python 환경을 사용합니다.
-
입력 및 출력 Kinesis 데이터 스트림에 액세스하기 위해 AWS 자격 증명을 사용합니다.
-
IS_LOCAL = true을 설정합니다.
-
-
실행 구성을 설정하는 정확한 프로세스는 사용 중인 IDE에 따라 다릅니다.
-
IDE 설정이 완료되면 Python 스크립트를 실행하고 애플리케이션이 실행되는 동안 IDE에서 제공하는 도구를 사용합니다.
로컬에서 애플리케이션 로그 검사
로컬에서 실행 중인 경우 애플리케이션은 시작 시 출력되는 몇 줄을 제외하고 콘솔에 로그를 표시하지 않습니다. PyFlink는 로그를 Python Flink 라이브러리가 설치된 디렉터리의 파일에 기록합니다. 애플리케이션은 시작 시 로그 위치를 출력합니다. 다음 명령을 실행하여 로그 위치를 확인할 수도 있습니다.
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
로깅 디렉터리의 파일을 나열합니다. 일반적으로 하나의
.log파일이 있습니다. -
애플리케이션이 실행되는 동안 다음 명령으로 파일을 실시간으로 확인합니다.
tail -f <log-path>/<log-file>.log
Kinesis 스트림의 입력 및 출력 데이터 관찰
Python(샘플 데이터를 생성하는 Python) 또는 Kinesis Data Generator(링크)을 사용하여 입력 스트림으로 전송된 레코드를 Amazon Kinesis 콘솔의 데이터 뷰어에서 관찰할 수 있습니다.
레코드를 관찰하려면:
로컬에서 실행 중인 애플리케이션 중지
IDE에서 실행 중인 애플리케이션을 중지합니다. IDE는 일반적으로 ‘중지’ 옵션을 제공합니다. 정확한 위치와 방법은 IDE에 따라 다릅니다.
애플리케이션 코드 패키징
이 섹션에서는 Apache Maven을 사용하여 애플리케이션 코드와 필요한 모든 종속성을 .zip 파일로 패키징합니다.
Maven 패키지 명령을 다시 실행합니다.
$ mvn package
이 명령은 target/managed-flink-pyflink-getting-started-1.0.0.zip 파일을 생성합니다.
애플리케이션 패키지를 Amazon S3 버킷에 업로드합니다.
이 섹션에서는 이전 섹션에서 생성한 .zip 파일을 자습서 시작 부분에서 생성한 Amazon Simple Storage Service(Amazon S3) 버킷에 업로드합니다. 이 단계를 아직 완료하지 않았으면 (링크) 섹션을 참조하세요.
애플리케이션 코드 JAR 파일을 업로드하려면
https://console.aws.amazon.com/s3/
에서 S3 콘솔을 엽니다. -
애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다.
-
업로드를 선택합니다.
-
[Add Files]를 선택합니다.
-
이전 단계
target/managed-flink-pyflink-getting-started-1.0.0.zip에서 생성한 .zip 파일로 이동합니다. -
다른 설정을 변경하지 않고 업로드를 선택합니다.
Managed Service for Apache Flink 애플리케이션 생성 및 구성
콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성하고 구성할 수 있습니다. 이 자습서에서는 콘솔을 사용합니다.
애플리케이션 생성
AWS Management 콘솔에 로그인한 후 https://console.aws.amazon.com/flink에서 Amazon MSF 콘솔을 엽니다.
-
미국 동부(버지니아 북부) us-east-1 리전이 올바르게 선택되었는지 확인합니다.
-
오른쪽의 메뉴를 열고 Apache Flink 애플리케이션을 선택한 다음 스트리밍 애플리케이션 생성을 선택합니다. 또는 초기 페이지의 시작하기 섹션에서 스트리밍 애플리케이션 생성을 선택합니다.
-
스트리밍 애플리케이션 생성 페이지에서:
-
스트림 처리 애플리케이션을 설정하는 방법 선택에서 처음부터 생성을 선택합니다.
-
Apache Flink 구성의 애플리케이션 Flink 버전에서 Apache Flink 1.19를 선택합니다.
-
애플리케이션 구성의 경우:
-
애플리케이션 명칭에
MyApplication을 입력합니다. -
설명에
My Python test app를 입력합니다. -
애플리케이션 리소스에 대한 액세스에서 필요한 정책이 포함된 IAM 역할 kinesis-analytics-MyApplication-us-east-1 생성 및 업데이트를 선택합니다.
-
-
애플리케이션 설정 템플릿의 경우:
-
템플릿에서 개발을 선택합니다.
-
-
스트리밍 애플리케이션 생성을 선택합니다.
-
참고
콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 만들 때 내 애플리케이션에 대한 IAM 역할 및 정책을 둘 수 있는 옵션이 있습니다. 귀하의 애플리케이션은 이 역할 및 정책을 사용하여 종속 리소스에 액세스합니다. 이러한 IAM 리소스의 이름은 애플리케이션 명칭과 리전을 사용하여 다음과 같이 지정됩니다.
-
정책:
kinesis-analytics-service-MyApplication-us-west-2 -
역할:
kinesisanalytics-MyApplication-us-west-2
Amazon Managed Service for Apache Flink는 이전에 Kinesis Data Analytics로 알려졌습니다. 자동으로 생성되는 리소스의 이름은 이전 버전과의 호환성을 위해 kinesis-analytics 접두사가 붙습니다.
IAM 정책 편집
IAM 정책을 편집하여 Amazon S3 버킷에 액세스할 수 있는 권한을 추가합니다.
IAM 정책을 편집하여 S3 버킷 권한을 추가하려면
https://console.aws.amazon.com/iam/
에서 IAM 콘솔을 여세요. -
정책을 선택하세요. 이전 섹션에서 콘솔이 생성한
kinesis-analytics-service-MyApplication-us-east-1정책을 선택합니다. -
편집을 선택한 후 JSON 탭을 선택합니다.
-
다음 정책 예제의 강조 표시된 부분을 정책에 추가하세요. 샘플 계정 ID(
012345678901)를 내 계정 ID로 바꿉니다. -
다음과 변경 사항 저장을 차례로 선택합니다.
애플리케이션 구성
애플리케이션 구성을 편집하여 애플리케이션 코드 아티팩트를 설정합니다.
애플리케이션을 구성하려면
-
MyApplication 페이지에서 구성을 선택합니다.
-
애플리케이션 코드 위치 섹션에서 다음을 수행합니다.
-
Amazon S3 버킷의 경우 애플리케이션 코드를 위해 이전에 생성한 버킷을 선택합니다. 찾아보기를 선택하고 올바른 버킷을 선택한 다음 선택을 선택합니다. 버킷 이름을 선택하지 않습니다.
-
Amazon S3 객체 경로에는
managed-flink-pyflink-getting-started-1.0.0.zip를 입력합니다.
-
-
액세스 권한에서 IAM 역할
kinesis-analytics-MyApplication-us-east-1생성 및 업데이트를 선택합니다. -
런타임 속성으로 이동한 뒤 다른 모든 설정은 기본값으로 유지합니다.
-
새 항목 추가를 선택하고 다음 각 파라미터를 추가합니다.
그룹 ID 키 값 InputStream0stream.nameExampleInputStreamInputStream0flink.stream.initposLATESTInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1kinesis.analytics.flink.run.optionspythonmain.pykinesis.analytics.flink.run.optionsjarfilelib/pyflink-dependencies.jar -
다른 섹션을 어떠한 것도 수정하지 말고 변경 사항 저장을 선택합니다.
참고
Amazon CloudWatch 로깅을 활성화하도록 선택하면 Managed Service for Apache Flink에서 로그 그룹 및 로그 스트림을 생성합니다. 이러한 리소스의 이름은 다음과 같습니다.
-
로그 그룹:
/aws/kinesis-analytics/MyApplication -
로그 스트림:
kinesis-analytics-log-stream
애플리케이션을 실행합니다
이제 애플리케이션이 구성이 완료되었으며 실행할 준비가 되었습니다.
애플리케이션을 실행하려면
-
Amazon Managed Service for Apache Flink 콘솔에서 내 애플리케이션을 선택하고 실행을 선택합니다.
-
다음 페이지의 애플리케이션 복원 구성 페이지에서 최신 스냅샷으로 실행을 선택한 다음 실행을 선택합니다.
애플리케이션 세부 정보의 상태는 애플리케이션이 시작되면
Ready에서Starting으로 그리고Running으로 전환됩니다.
애플리케이션이 Running 상태일 때 Flink 대시보드를 열 수 있습니다.
대시보드 열기
-
Apache Flink 대시보드 열기를 선택합니다. 대시보드가 새 페이지에서 열립니다.
-
실행 중 작업 목록에서 보이는 단일 작업을 선택합니다.
참고
런타임 속성을 잘못 설정하거나 IAM 정책을 잘못 편집한 경우 애플리케이션 상태는
Running으로 변경될 수 있지만, Flink 대시보드에는 작업이 계속 재시작되는 것으로 표시됩니다. 이는 애플리케이션이 잘못 구성되었거나 외부 리소스에 액세스할 권한이 부족할 때 흔히 발생하는 장애 시나리오입니다.이러한 상황이 발생하면 Flink 대시보드의 예외 탭을 확인하여 문제의 원인을 확인합니다.
실행 중인 애플리케이션의 지표 관찰
MyApplication 페이지의 Amazon CloudWatch 지표 섹션을 보면 실행 중인 애플리케이션의 일부 기본 지표를 확인할 수 있습니다.
지표를 보려면
-
새로 고침 버튼 옆의 드롭다운 목록에서 10 seconds를 선택합니다.
-
애플리케이션이 실행 중이고 정상이면 가동 시간 지표가 계속 증가하는 것을 확인할 수 있습니다.
-
fullrestarts 지표는 0이어야 합니다. 값이 증가하고 있다면 구성에 문제가 있을 수 있습니다. 문제를 조사하려면 Flink 대시보드의 예외 탭을 검토합니다.
-
실패한 체크포인트 수 지표는 정상 애플리케이션에서는 0이어야 합니다.
참고
이 대시보드는 고정된 지표 세트를 5분 단위로 표시합니다. CloudWatch 대시보드에서 어떤 지표든 사용하여 사용자 지정 애플리케이션 대시보드를 생성할 수 있습니다.
Kinesis 스트림의 출력 데이터 관찰
Python 스크립트 또는 Kinesis Data Generator를 사용하여 입력 스트림에 데이터를 계속 게시하고 있는지 확인합니다.
이제 앞서 수행한 것처럼 https://console.aws.amazon.com/kinesis/
출력을 보려면
https://console.aws.amazon.com/kinesis
에서 Kinesis 콘솔을 엽니다. -
이 자습서를 실행하는 데 사용하는 리전과 동일한 리전이 선택되어 있는지 확인합니다. 기본값은 us-east-1 미국 동부(버지니아 북부)입니다. 필요시 리전을 변경합니다.
-
데이터 스트림을 선택합니다.
-
관찰하려는 스트림을 선택합니다. 본 자습서에서는
ExampleOutputStream를 사용합니다. -
데이터 뷰어 탭을 선택합니다.
-
샤드를 선택하고 시작 위치를 최신으로 유지한 다음 레코드 가져오기를 선택합니다. ‘이 요청에 대한 레코드를 찾을 수 없음’ 오류가 표시될 수 있습니다. 그렇다면 레코드 가져오기 재시도를 선택합니다. 스트림에 게시된 최신 레코드가 표시됩니다.
-
데이터 열의 값을 선택하여 레코드 내용을 JSON 형식으로 확인합니다.
애플리케이션 중지
애플리케이션을 중지하려면 MyApplication이라는 Managed Service for Apache Flink 애플리케이션의 콘솔 페이지로 이동합니다.
애플리케이션을 중지하려면
-
작업 드롭다운 목록에서 정지를 선택합니다.
-
애플리케이션 세부 정보의 상태는 애플리케이션이 완전히 중지되면
Running에서Stopping으로 그리고Ready로 전환됩니다.참고
Python 스크립트 또는 Kinesis Data Generator에서 입력 스트림으로 데이터를 전송하는 것도 잊지 말고 중단합니다.