

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink: 작동 방식
<a name="how-it-works"></a>

Managed Service for Apache Flink는 Apache Flink 애플리케이션을 사용하여 스트리밍 데이터를 처리할 수 있는 완전 관리형 Amazon 서비스입니다. 먼저 Apache Flink 애플리케이션을 프로그래밍한 다음 Managed Service for Apache Flink 애플리케이션을 생성합니다.

## Apache Flink 애플리케이션 프로그래밍
<a name="how-it-works-programming"></a>

Apache Flink 애플리케이션은 Apache Flink 프레임워크를 사용하여 만든 Java 또는 Scala 애플리케이션입니다. 로컬에서 Apache Flink 애플리케이션을 작성하고 빌드합니다.

[애플리케이션은 주로 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) 또는 Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/)를 사용합니다. 다른 Apache Flink API도 사용할 수 있지만 스트리밍 애플리케이션을 빌드하는 데는 많이 사용되지 않습니다.

두 API의 특징은 다음과 같습니다.

### DataStream API
<a name="how-it-works-prog-datastream"></a>

Apache Flink DataStream API 프로그래밍 모델은 두 가지 구성 요소를 기반으로 합니다.
+ **데이터 스트림:** 연속적인 데이터 레코드 흐름을 구조적으로 표현한 것입니다.
+ **변환 연산자:** 하나 이상의 데이터 스트림을 입력으로 받아 하나 이상의 데이터 스트림을 출력으로 생성합니다.

DataStream API로 만든 애플리케이션은 다음을 수행합니다.
+ 데이터 소스(예: Kinesis 스트림 또는 Amazon MSK 항목)에서 데이터를 읽습니다.
+ 필터링, 집계 또는 보강과 같은 변환을 데이터에 적용합니다.
+ 변환된 데이터를 Data Sink에 씁니다.

DataStream API를 사용하는 애플리케이션은 Java 또는 Scala로 작성할 수 있으며 Kinesis 데이터 스트림, Amazon MSK 항목 또는 사용자 지정 소스에서 읽을 수 있습니다.

애플리케이션은 *커넥터*를 사용하여 데이터를 처리합니다. Apache Flink는 다음 유형의 커넥터를 사용합니다.
+ **소스**: 외부 데이터를 읽는 데 사용되는 커넥터입니다.
+ **싱크**: 외부 위치에 쓰는 데 사용되는 커넥터입니다.
+ **오퍼레이터**: 애플리케이션 내에서 데이터를 처리하는 데 사용되는 커넥터입니다.

일반적인 애플리케이션은 소스가 있는 하나 이상의 데이터 스트림, 하나 이상의 연산자가 있는 데이터 스트림, 하나 이상의 데이터 싱크로 구성됩니다.

DataStream API 사용에 대한 자세한 내용은 [DataStream API 구성 요소 검토](how-datastream.md) 섹션을 참조하세요.

### 표 API
<a name="how-it-works-prog-table"></a>

Apache Flink Table API 프로그래밍 모델은 다음 구성 요소를 기반으로 합니다.
+ **테이블 환경:** 하나 이상의 테이블을 만들고 호스팅하는 데 사용하는 기본 데이터에 대한 인터페이스입니다.
+ **테이블:** SQL 테이블 또는 뷰에 대한 액세스를 제공하는 객체입니다.
+ **테이블 소스:** Amazon MSK 항목과 같은 외부 소스에서 데이터를 읽는 데 사용됩니다.
+ **테이블 함수:** 데이터를 변환하는 데 사용되는 SQL 쿼리 또는 API 직접 호출입니다.
+ **테이블 싱크:** Amazon S3 버킷과 같은 외부 위치에 데이터를 쓰는 데 사용됩니다.

Table API로 생성한 애플리케이션은 다음을 수행합니다.
+ `Table Source`에 연결하여 `TableEnvironment`을(를) 생성합니다.
+ SQL 쿼리 또는 Table API 함수를 사용하여 `TableEnvironment`에 테이블을 생성합니다.
+ 테이블 API 또는 SQL을 사용하여 테이블에서 쿼리를 실행
+ 테이블 함수 또는 SQL 쿼리를 사용하여 쿼리 결과에 변환을 적용합니다.
+ 쿼리 또는 함수 결과를 `Table Sink`에 씁니다.

Table API를 사용하는 애플리케이션은 Java 또는 Scala로 작성할 수 있으며, API 직접 호출 또는 SQL 쿼리를 사용하여 데이터를 쿼리할 수 있습니다.

Table API 사용에 대한 자세한 내용은 [테이블 API 구성 요소 검토](how-table.md) 섹션을 참조하세요.

## Managed Service for Apache Flink 애플리케이션 생성
<a name="how-it-works-app"></a>

Managed Service for Apache Flink는 Apache Flink 애플리케이션을 호스팅하기 위한 환경을 생성하고 다음 설정을 제공하는 AWS 서비스입니다.
+ **[런타임 속성 사용](how-properties.md):** 애플리케이션에 제공할 수 있는 파라미터. 애플리케이션 코드를 다시 컴파일하지 않고도 이러한 파라미터를 변경할 수 있습니다.
+ **[내결함성 구현](how-fault.md)**: 애플리케이션이 인터럽트 및 재시작으로부터 복구되는 방법
+ **[Amazon Managed Service for Apache Flink의 로깅 및 모니터링](monitoring-overview.md)**: 애플리케이션에서 CloudWatch Logs에 이벤트를 로깅하는 방법 
+ **[애플리케이션 규모 조정 구현](how-scaling.md)**: 애플리케이션이 컴퓨팅 리소스를 프로비저닝하는 방법.

콘솔이나 AWS CLI를 사용하여 Managed Service for Apache Flink 애플리케이션을 생성합니다. Managed Service for Apache Flink 애플리케이션 생성을 시작하려면 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 을 참조하세요.

# Managed Service for Apache Flink 애플리케이션 생성
<a name="how-creating-apps"></a>

이 주제에는 Managed Service for Apache Flink 애플리케이션을 생성하는 방법에 관한 자세한 내용이 포함되어 있습니다.

**Topics**
+ [Managed Service for Apache Flink 애플리케이션 코드 빌드](#how-creating-apps-building)
+ [Managed Service for Apache Flink 애플리케이션 생성](#how-creating-apps-creating)
+ [고객 관리형 키 사용](#how-creating-apps-use-cmk)
+ [Managed Service for Apache Flink 애플리케이션 시작](#how-creating-apps-starting)
+ [Managed Service for Apache Flink 애플리케이션 확인](#how-creating-apps-verifying)
+ [Managed Service for Apache Flink 애플리케이션의 시스템 롤백 활성화](how-system-rollbacks.md)

## Managed Service for Apache Flink 애플리케이션 코드 빌드
<a name="how-creating-apps-building"></a>

이 섹션에서는 Managed Service for Apache Flink 애플리케이션의 애플리케이션 코드를 빌드하는 데 사용하는 구성 요소에 대해 설명합니다.

지원되는 최신 버전의 Apache Flink를 애플리케이션 코드에 사용하는 것이 좋습니다. Managed Service for Apache Flink 애플리케이션 업그레이드에 대한 자세한 내용을 알아보려면 [Apache Flink에 인플레이스 버전 업그레이드 사용](how-in-place-version-upgrades.md) 섹션을 참조하십시오.

[Apache Maven](https://maven.apache.org/)을 사용하여 애플리케이션 코드를 구축합니다. Apache Maven 프로젝트는 `pom.xml` 파일을 사용하여 해당 프로젝트에서 사용하는 구성 요소의 버전을 지정합니다.

**참고**  
Managed Service for Apache Flink는 최대 512MB 크기의 JAR 파일을 지원합니다. 이보다 큰 JAR 파일을 사용하면 애플리케이션이 시작되지 않습니다.

이제 애플리케이션은 모든 Scala 버전에서 Java API를 사용할 수 있습니다. 선택한 Scala 표준 라이브러리를 Scala 애플리케이션에 번들로 제공해야 합니다.

**Apache Beam**을 사용하는 Managed Service for Apache Flink 애플리케이션을 만드는 방법에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink 애플리케이션에서 Apache Beam을 사용합니다.](how-creating-apps-beam.md) 섹션을 참조하십시오.

### 애플리케이션의 Apache Flink 버전 지정
<a name="how-creating-apps-building-flink"></a>

Managed Service for Apache Flink 런타임 버전 1.1.0 이상을 사용하는 경우 애플리케이션을 컴파일할 때 애플리케이션에서 사용하는 Apache Flink 버전을 지정합니다. `-Dflink.version` 파라미터와 함께 Apache Flink 버전을 제공합니다. 예를 들어 Apache Flink 2.2.0을 사용하는 경우 다음을 제공합니다.

```
mvn package -Dflink.version=2.2.0
```

이전 버전의 Apache Flink를 사용하여 애플리케이션을 빌드하려면 [이전 버전](earlier.md) 섹션을 참조하세요.

## Managed Service for Apache Flink 애플리케이션 생성
<a name="how-creating-apps-creating"></a>

애플리케이션 코드를 구축한 후에는 다음을 수행하여 Managed Service for Apache Flink(Amazon MSF) 애플리케이션을 생성합니다.
+ **애플리케이션 코드 업로드**: Amazon S3 버킷에 애플리케이션 코드를 업로드합니다. 애플리케이션 코드의 S3 버킷 명칭과 객체 명칭은 애플리케이션을 생성할 때 지정합니다. 애플리케이션 코드를 업로드하는 방법을 보여주는 자습서는 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 자습서를 참조하세요.
+ **Managed Service for Apache Flink 애플리케이션 만들기**: 다음 방법 중 하나를 사용하여 Amazon MSF 애플리케이션을 생성합니다.
**참고**  
Amazon MSF는 기본적으로를 사용하여 애플리케이션을 암호화합니다 AWS 소유 키. AWS KMS 고객 관리형 키(CMKs)를 사용하여 새 애플리케이션을 생성하여 직접 키를 생성, 소유 및 관리할 수도 있습니다. CMK에 관한 자세한 내용은 [Amazon Managed Service for Apache Flink의 키 관리](key-management-flink.md) 섹션을 참조하세요.
  + ** AWS 콘솔을 사용하여 Amazon MSF 애플리케이션 생성:** AWS 콘솔을 사용하여 애플리케이션을 생성하고 구성할 수 있습니다.

    콘솔을 사용하여 애플리케이션을 생성하면 애플리케이션의 종속 리소스(예: CloudWatch Logs 스트림, IAM 역할, IAM 정책)가 자동으로 생성됩니다.

    콘솔을 사용하여 애플리케이션을 생성할 때는 **Managed Service for Apache Flink - 애플리케이션 생성** 페이지의 풀다운에서 애플리케이션을 선택하여 애플리케이션에서 사용하는 Apache Flink 버전을 지정합니다.

    콘솔을 사용하여 애플리케이션을 생성하는 방법에 대한 자습서는 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 자습서를 참조하세요.
  + ** AWS CLI를 사용하여 Amazon MSF 애플리케이션 생성:** AWS CLI를 사용하여 애플리케이션을 생성하고 구성할 수 있습니다.

    CLI를 사용하여 애플리케이션을 생성할 때는 애플리케이션의 종속 리소스(예: CloudWatch Logs 스트림, IAM 역할, IAM 정책)도 수동으로 생성해야 합니다.

    CLI를 사용하여 애플리케이션을 만들 때는 `CreateApplication` 작업의 `RuntimeEnvironment` 파라미터를 사용하여 애플리케이션에서 사용하는 Apache Flink 버전을 지정합니다.
**참고**  
기존 애플리케이션의 `RuntimeEnvironment`를 변경할 수 있습니다. 자세한 방법은 [Apache Flink에 인플레이스 버전 업그레이드 사용](how-in-place-version-upgrades.md)을 참조하세요.

## 고객 관리형 키 사용
<a name="how-creating-apps-use-cmk"></a>

Amazon MSF에서 고객 관리형 키(CMKs)는 사용자가 생성, 소유 및 관리하는 키 AWS Key Management Service ()로 애플리케이션의 데이터를 암호화할 수 있는 기능입니다AWS KMS. Amazon MSF 애플리케이션에서는 Flink [체크포인트](how-fault.md) 또는 [스냅샷](how-snapshots.md) 대상이 되는 모든 데이터가 사용자가 해당 애플리케이션용으로 정의한 CMK로 암호화된다는 의미입니다.

애플리케이션에서 CMK를 사용하려면 먼저 [새 애플리케이션을 생성](#how-creating-apps-creating)한 다음 CMK를 적용해야 합니다. CMK 사용에 관한 자세한 내용은 [Amazon Managed Service for Apache Flink의 키 관리](key-management-flink.md) 섹션을 참조하세요.

## Managed Service for Apache Flink 애플리케이션 시작
<a name="how-creating-apps-starting"></a>

애플리케이션 코드를 구축하고, S3에 업로드하고, Managed Service for Apache Flink 애플리케이션을 생성한 후 애플리케이션을 시작합니다. Managed Service for Apache Flink 애플리케이션을 시작하는 데 일반적으로 몇 분이 걸립니다.

애플리케이션을 시작하려면 다음 방법 중 하나를 사용합니다.
+ ** AWS 콘솔을 사용하여 Managed Service for Apache Flink 애플리케이션을 시작합니다.** AWS 콘솔의 애플리케이션 페이지에서 **실행**을 선택하여 애플리케이션을 실행할 수 있습니다.
+ ** AWS API를 사용하여 Managed Service for Apache Flink 애플리케이션을 시작합니다.** [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 작업을 사용하여 애플리케이션을 실행할 수 있습니다.

## Managed Service for Apache Flink 애플리케이션 확인
<a name="how-creating-apps-verifying"></a>

다음과 같은 방법으로 애플리케이션이 작동하는지 확인할 수 있습니다.
+ **CloudWatch Logs 사용:** CloudWatch Logs 및 CloudWatch Logs Insights를 사용하여 애플리케이션이 제대로 실행되고 있는지 확인할 수 있습니다. Managed Service for Apache Flink 애플리케이션과 함께 CloudWatch Logs를 사용하는 방법에 대한 자세한 내용을 알아보려면 [Amazon Managed Service for Apache Flink의 로깅 및 모니터링](monitoring-overview.md) 섹션을 참조하십시오.
+ **CloudWatch 지표 사용:** CloudWatch 지표를 사용하여 애플리케이션의 활동 또는 애플리케이션이 입력 또는 출력에 사용하는 리소스(예: Kinesis 스트림, Firehose 스트림 또는 Amazon S3 버킷)에서의 활동을 모니터링할 수 있습니다. CloudWatch 지표에 대한 자세한 내용을 알아보려면 Amazon CloudWatch 사용자 가이드의 [지표 작업](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html)을 참조하세요.
+ **출력 위치 모니터링:** 애플리케이션이 특정 위치(예: Amazon S3 버킷 또는 데이터베이스)에 출력을 기록하는 경우 해당 위치에서 기록된 데이터를 모니터링할 수 있습니다.

# Managed Service for Apache Flink 애플리케이션의 시스템 롤백 활성화
<a name="how-system-rollbacks"></a>

시스템 롤백 기능으로 Amazon Managed Service for Apache Flink에서 실행 중인 Apache Flink 애플리케이션의 가용성을 더욱 높일 수 있습니다. 이 구성을 선택하면 `UpdateApplication` 또는 `autoscaling`과 같은 작업에서 코드나 구성 버그가 발생할 경우 서비스가 애플리케이션을 이전에 실행되던 버전으로 자동으로 되돌릴 수 있습니다.

**참고**  
시스템 롤백 기능을 사용하려면 애플리케이션을 업데이트하여 옵트인해야 합니다. 기존 애플리케이션은 기본적으로 시스템 롤백을 자동으로 사용하지 않습니다.

## 작동 방식
<a name="how-rollback-works"></a>

업데이트나 스케일링 작업과 같은 애플리케이션 작업을 시작하면 Amazon Managed Service for Apache Flink는 먼저 해당 작업을 실행하려고 시도합니다. 코드 버그나 권한 부족 등으로 작업의 성공을 방해하는 문제가 감지되면 서비스는 자동으로 `RollbackApplication` 작업을 시작합니다.

롤백은 애플리케이션을 이전에 정상적으로 실행되던 버전과 그에 연관된 애플리케이션 상태로 복원하려고 시도합니다. 롤백이 성공하면 애플리케이션이 이전 버전을 사용하여 가동 중지 시간을 최소화하면서 데이터를 계속 처리합니다. 자동 롤백도 실패할 경우 Amazon Managed Service for Apache Flink는 애플리케이션을 `READY` 상태로 전환하여 오류를 수정하고 작업을 다시 시도하는 등 필요한 추가 조치를 수행할 수 있도록 합니다.

자동 시스템 롤백을 사용하려면 옵트인해야 합니다. 이 시점 이후에는 콘솔 또는 API를 사용해 애플리케이션의 모든 작업에 대해 이 기능을 활성화할 수 있습니다.

`UpdateApplication` 작업을 위한 다음 예제 요청은 애플리케이션에 대한 시스템 롤백을 활성화합니다.

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## 자동 시스템 롤백에 대한 일반적인 시나리오 검토
<a name="common-scenarios"></a>

다음 시나리오는 자동 시스템 롤백이 유용한 경우를 보여줍니다.
+ **애플리케이션 업데이트:** 기본 메서드를 통해 Flink 작업을 초기화할 때 버그가 있는 새 코드로 애플리케이션을 업데이트한 경우 자동 롤백 기능을 통해 이전에 정상적으로 실행되던 버전으로 복원할 수 있습니다. 시스템 롤백이 도움이 되는 업데이트 시나리오는 다음과 같습니다.
  + 애플리케이션이 [maxParallelism](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto)보다 높은 병렬 처리로 실행되도록 업데이트되는 경우.
  + VPC 애플리케이션에서 잘못된 서브넷으로 업데이트되어 Flink 작업 스타트업 시 실패가 발생하는 경우.
+ **Flink 버전 업그레이드:** 새로운 Apache Flink 버전으로 업그레이드했으나 업그레이드된 애플리케이션에서 스냅샷 호환성 문제가 발생하면 시스템 롤백을 통해 이전 Flink 버전으로 자동으로 되돌릴 수 있습니다.
+ **AutoScaling:** 애플리케이션 스케일 업 시 스냅샷과 Flink 작업 그래프 간 연산자 불일치로 인해 세이브포인트 복원이 실패하는 경우.

## 시스템 롤백을 위한 운영 API 사용
<a name="operation-apis"></a>

Amazon Managed Service for Apache Flink은 더 나은 가시성을 위해 장애 및 관련 시스템 롤백을 추적할 수 있는 두 가지 애플리케이션 운영 관련 API를 제공합니다.

`ListApplicationOperations`

이 API는 `UpdateApplication`, `Maintenance`, `RollbackApplication` 등을 포함한 애플리케이션에서 수행된 모든 작업을 최신 순으로 나열합니다. `ListApplicationOperations` 작업을 위한 다음 예제 요청은 애플리케이션의 처음 10개 작업을 나열합니다.

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

`ListApplicationOperations`를 위한 다음 예제 요청은 목록을 애플리케이션의 이전 업데이트로 필터링하는 데 도움이 됩니다.

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

이 API는 `ListApplicationOperations`에서 나열된 특정 작업에 대한 상세 정보를 제공하며 해당하는 경우 실패 이유도 포함합니다. `DescribeApplicationOperation` 작업을 위한 다음 예제 요청은 애플리케이션 작업에 관한 세부 정보를 나열합니다.

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

문제 해결 정보는 [시스템 롤백 모범 사례](troubleshooting-system-rollback.md)를 참조하세요.

# Managed Service for Apache Flink 애플리케이션 실행
<a name="how-running-apps"></a>

이 주제에는 Managed Service for Apache Flink 실행에 대한 자세한 내용이 포함되어 있습니다.

Managed Service for Apache Flink 애플리케이션을 실행하면 서비스가 Apache Flink 작업을 생성합니다. Apache Flink 작업은 Managed Service for Apache Flink 애플리케이션의 실행 라이프사이클입니다. 작업 실행 및 작업에 사용되는 리소스는 Job Manager에서 관리합니다. Job Manager는 애플리케이션 실행을 작업으로 구분합니다. 각 작업은 작업 관리자가 관리합니다. 애플리케이션 성능을 모니터링할 때 각 Task Manager 또는 Job Manager 전체의 성능을 검사할 수 있습니다.

Apache Flink 작업에 관한 자세한 내용은 Apache Flink 설명서의 [작업 및 스케줄링](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)을 참조하세요.

## 애플리케이션 및 작업 상태 식별
<a name="how-running-job-status"></a>

애플리케이션과 애플리케이션 작업 모두 현재 실행 상태입니다.
+ **애플리케이션 상태:** 애플리케이션에는 실행 단계를 설명하는 현재 상태가 있습니다. 애플리케이션 상태에는 다음이 포함됩니다.
  + **꾸준한 애플리케이션 상태:** 상태가 변경될 때까지 애플리케이션은 일반적으로 다음과 같은 상태를 유지합니다.
    + **준비:** 새 애플리케이션이나 중지된 애플리케이션은 실행할 때까지 준비 상태입니다.
    + **가동:** 성공적으로 시작된 애플리케이션은 가동 상태입니다.
  + **임시 애플리케이션 상태:** 이러한 상태의 애플리케이션은 일반적으로 다른 상태로 전환되는 중입니다. 애플리케이션이 일정 시간 동안 임시 상태에 있는 경우 `Force` 파라미터가 `true`로 설정된 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 작업을 사용하여 애플리케이션을 중지할 수 있습니다. 이러한 상태에는 다음이 포함됩니다.
    + [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 작업 후에 `STARTING:`이 발생합니다. 애플리케이션이 `READY` 상태에서 `RUNNING` 상태로 전환되고 있습니다.
    + [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 작업 후에 `STOPPING:`이 발생합니다. 애플리케이션이 `RUNNING` 상태에서 `READY` 상태로 전환되고 있습니다.
    + [DeleteApplication ](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html) 작업 이후에 `DELETING:`이 발생합니다. 애플리케이션을 삭제하는 중입니다.
    + [UpdateApplication ](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업 후에 `UPDATING:`이 발생합니다. 애플리케이션이 업데이트 중이며 `RUNNING` 또는 `READY` 상태로 다시 전환됩니다.
    + `AUTOSCALING:` 애플리케이션은 [ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)의 `AutoScalingEnabled` 속성을 `true`으로 설정하고, 서비스는 애플리케이션의 병렬성을 높이고 있습니다. 애플리케이션이 이 상태인 경우 사용할 수 있는 유효한 API 작업은 `Force` 파라미터가 `true`로 설정된 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 작업뿐입니다. 자동 조정에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink에서 자동 규모 조정 사용](how-scaling-auto.md) 섹션을 참조하세요.
    + `FORCE_STOPPING:``Force` 파라미터가 `true`로 설정된 상태에서 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 작업을 호출한 후에 이 발생합니다. 애플리케이션을 강제 중지하는 중입니다. 애플리케이션이 `STARTING`, `UPDATING`, `STOPPING` 또는 `AUTOSCALING` 상태에서 `READY` 상태로 전환됩니다.
    + [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) 작업이 호출된 후에 `ROLLING_BACK:`이 발생합니다. 애플리케이션이 이전 버전으로 회귀하는 중입니다. 애플리케이션이 `UPDATING` 또는 `AUTOSCALING` 상태에서 `RUNNING` 상태로 전환됩니다.
    + Managed Service for Apache Flink가 애플리케이션에 패치를 적용하는 동안 `MAINTENANCE:`이 발생합니다. 자세한 내용을 알아보려면 [Managed Service for Apache Flink 유지 관리 작업 관리](maintenance.md) 섹션을 참조하세요.

  콘솔을 사용하거나 [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html) 작업을 사용하여 애플리케이션 상태를 확인할 수 있습니다.
+ **작업 현황:** 애플리케이션이 `RUNNING` 상태에 있으면 작업에는 현재 실행 단계를 설명하는 상태가 있습니다. 작업은 `CREATED` 상태에서 시작하고, 시작되면 `RUNNING` 상태로 진행됩니다. 오류 상황이 발생하는 경우 애플리케이션은 다음 상태가 됩니다.
  + Apache Flink 1.11 이상을 사용하는 애플리케이션의 경우 애플리케이션은 `RESTARTING` 상태가 됩니다.
  + Apache Flink 1.8 및 이전 버전을 사용하는 애플리케이션의 경우 플리케이션은 `FAILING` 상태가 됩니다.

  그러면 작업이 다시 시작될 수 있는지 여부에 따라 애플리케이션이 `RESTARTING` 또는 `FAILED` 상태로 진행됩니다.

  애플리케이션의 CloudWatch 로그에서 상태 변경을 검사하여 작업 상태를 확인할 수 있습니다.

## 배치 워크로드 실행
<a name="batch-workloads"></a>

Managed Service for Apache Flink는 Apache Flink 배치 워크로드 실행을 지원합니다. 배치 작업에서 Apache Flink 작업이 **FINISHED** 상태가 되면 Managed Service for Apache Flink 애플리케이션 상태가 **준비**로 설정됩니다. Flink 작업 상태에 대한 자세한 내용을 알아보려면 [작업 및 예약](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/)을 참조하십시오.

# Managed Service for Apache Flink 애플리케이션 리소스 검토
<a name="how-resources"></a>

이 섹션에서는 애플리케이션에서 사용하는 시스템 리소스에 대해 설명합니다. Managed Service for Apache Flink가 리소스를 프로비저닝하고 사용하는 방법을 이해하면 성능이 뛰어나고 안정적인 Apache Flink용 관리형 서비스 애플리케이션을 설계, 작성 및 유지 관리하는 데 도움이 됩니다.

## Managed Service for Apache Flink 애플리케이션 리소스
<a name="how-resources-kda"></a>

Managed Service for Apache Flink는 Apache Flink 애플리케이션을 호스팅하기 위한 환경을 생성하는 AWS 서비스입니다. Managed Service for Apache Flink는 **Kinesis 처리 단위(KPU)**라는 단위를 사용하여 리소스를 제공합니다.

하나의 KPU는 다음과 같은 시스템 리소스를 나타냅니다.
+ 하나의 CPU 코어
+ 4GB 메모리. 이 중 1GB는 기본 메모리이고 3GB는 힙 메모리입니다.
+ 50GB의 여유 디스크 공간

KPU는 **작업**과 **하위 작업**이라는 별개의 실행 단위로 애플리케이션을 실행합니다. 하위 작업은 스레드와 같다고 생각할 수 있습니다.

애플리케이션에서 사용할 수 있는 KPU 수는 애플리케이션 `Parallelism` 설정을 애플리케이션 `ParallelismPerKPU` 설정으로 나눈 값과 같습니다.

애플리케이션 병렬성에 대한 자세한 내용을 알아보려면 [애플리케이션 규모 조정 구현](how-scaling.md) 섹션을 참조하세요.

## Apache Flink 애플리케이션 리소스
<a name="how-resources-flink"></a>

Apache Flink 환경에서는 **작업 슬롯**이라는 단위를 사용하여 애플리케이션에 리소스를 할당합니다. Managed Service for Apache Flink는 애플리케이션에 리소스를 할당할 때 하나 이상의 Apache Flink 작업 슬롯을 단일 KPU에 할당합니다. 단일 KPU에 할당된 슬롯 수는 애플리케이션의 `ParallelismPerKPU` 설정과 같습니다. 작업 슬롯에 관한 자세한 내용은 Apache Flink 설명서의 [작업 스케줄링](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)을 참조하세요.

### 연산자 병렬 처리
<a name="how-resources-flink-operatorparallelism"></a>

연산자가 사용할 수 있는 최대 하위 작업 수를 설정할 수 있습니다. 이 값을 **연산자 병렬성**이라고 합니다. 기본적으로 애플리케이션의 각 연산자의 병렬성은 애플리케이션의 병렬성과 동일합니다. 즉, 기본적으로 애플리케이션의 각 연산자는 필요한 경우 애플리케이션에서 사용 가능한 모든 하위 작업을 사용할 수 있습니다.

`setParallelism` 메서드를 사용하여 애플리케이션의 연산자 병렬성을 설정할 수 있습니다. 이 방법을 사용하면 각 연산자가 한 번에 사용할 수 있는 하위 작업의 수를 제어할 수 있습니다.

자세한 내용은 Apache Flink 설명서의 [연산자](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)를 참조하세요.

### 연산자 연결
<a name="how-resources-flink-operatorchaining"></a>

일반적으로 각 연산자는 별도의 하위 작업을 사용하여 실행하지만 여러 연산자가 항상 순서대로 실행되는 경우 런타임에서 이들 모두를 동일한 작업에 할당할 수 있습니다. 이 프로세스를 **연산자 연결**이라고 합니다.

순차 연산자 여러 개가 모두 같은 데이터에 대해 연산을 수행하는 경우 여러 연산자를 단일 작업으로 연결할 수 있습니다. 이를 실현하기 위해 필요한 몇 가지 기준은 다음과 같습니다.
+ 운영자는 일대일 단순 전달을 수행합니다.
+ 연산자는 모두 동일한 연산자 병렬성을 갖습니다.

애플리케이션이 연산자를 단일 하위 작업으로 연결하면 서비스가 네트워크 작업을 수행하고 연산자별로 하위 작업을 할당할 필요가 없으므로 시스템 리소스가 절약됩니다. 애플리케이션에서 연산자 연결을 사용하고 있는지 확인하려면 Managed Service for Apache Flink 콘솔의 작업 그래프를 살펴보세요. 애플리케이션의 각 꼭짓점은 하나 이상의 연산자를 나타냅니다. 그래프에는 체인으로 연결된 연산자가 단일 꼭지점으로 표시됩니다.

# Managed Service for Apache Flink의 초 단위 결제
<a name="how-pricing"></a>

Managed Service for Apache Flink는 이제 1초 단위로 요금이 청구됩니다. 애플리케이션당 최소 10분의 사용 요금이 적용됩니다. 초 단위 결제는 새로 시작되거나 이미 실행 중인 애플리케이션에 적용됩니다. 이 섹션에서는 Managed Service for Apache Flink가 사용량을 측정하고 요금을 청구하는 방식을 설명합니다. Managed Service for Apache Flink 요금에 관한 자세한 내용은 [Amazon Managed Service for Apache Flink 요금](https://aws.amazon.com/managed-service-apache-flink/pricing/)을 참조하세요.

## 작동 방식
<a name="how-resources-kda"></a>

Managed Service for Apache Flink는 지원되는 AWS 리전에서 1초 단위로 청구되는 **Kinesis 처리 단위(KPU)** 사용 시간과 사용량을 기준으로 요금을 부과합니다. 단일 KPU는 1vCPU 컴퓨팅과 4GB 메모리로 구성됩니다. 애플리케이션 실행에 사용된 KPU 수를 기준으로 시간당 요금이 청구됩니다.

예를 들어, 애플리케이션이 20분 10초 동안 실행된 경우 해당 시간 동안 사용한 리소스를 기준으로 요금이 청구됩니다. 반면, 애플리케이션이 5분 동안 실행된 경우 최소 10분 요금이 적용되며, 그 시간 동안 사용된 리소스를 기준으로 요금이 청구됩니다.

Managed Service for Apache Flink는 사용량을 시간 단위로 계산합니다. 예를 들어, 15분은 0.25시간으로 계산됩니다.

Apache Flink 애플리케이션의 경우, 오케스트레이션 용도로 애플리케이션당 추가 KPU 1개가 청구됩니다. 애플리케이션은 실행 중인 스토리지와 내구성 있는 백업에 대해서도 요금이 청구됩니다. Managed Service for Apache Flink에서 상태 저장 처리 기능을 위해 사용되는 실행 중인 애플리케이션 스토리지는 GB/월 단위로 요금이 청구됩니다. 내구성 있는 백업은 선택 사항이며 애플리케이션의 시점 복구 기능을 제공하고 GB/월 단위로 요금이 청구됩니다.

스트리밍 모드에서는 메모리와 컴퓨팅 요구가 변동됨에 따라 스트림 처리 애플리케이션에 필요한 KPU 수를 Managed Service for Apache Flink가 자동으로 규모 조정합니다. 애플리케이션에 필요한 KPU 수를 직접 프로비저닝하도록 선택할 수도 있습니다.

## AWS 리전 가용성
<a name="how-pricing-regions"></a>

**참고**  
현재 AWS GovCloud(미국 동부), AWS GovCloud(미국 서부), 중국(베이징), 중국(닝샤) 리전에는 초 단위 결제를 제공하지 않습니다.

초 단위 결제는 다음 AWS 리전에서 사용할 수 있습니다.
+ 미국 동부(버지니아 북부) - us-east-1
+ 미국 동부(오하이오) - us-east-2
+ 미국 서부(캘리포니아 북부) - us-west-1
+ 미국 서부(오리건) - us-west-2
+ 아프리카(케이프타운) – af-south-1
+ 아시아 태평양(홍콩) - ap-east-1
+ 아시아 태평양(하이데라바드) - ap-south-1
+ 아시아 태평양(자카르타) – ap-southeast-3
+ 아시아 태평양(멜버른) - ap-southeast-4
+ 아시아 태평양(뭄바이) - ap-south-1
+ 아시아 태평양(오사카) – ap-northeast-3
+ 아시아 태평양(서울) - ap-northeast-2
+ 아시아 태평양(싱가포르) - ap-southeast-1
+ 아시아 태평양(시드니) - ap-southeast-2
+ 아시아 태평양(도쿄) - ap-northeast-1
+ 캐나다(중부) - ca-central-1
+ 캐나다 서부(캘거리) - ca-west-1
+ 유럽(프랑크푸르트) eu-central-1
+ 유럽(아일랜드) - eu-west-1
+ 유럽(런던) - eu-west-2
+ 유럽(밀라노) – eu-south-1
+ 유럽(파리) - eu-west-3
+ 유럽(스페인) - eu-south-2
+ 유럽(스톡홀름) - eu-north-1
+ 유럽(취리히) - eu-central-2
+ 이스라엘 (텔아비브) - il-central-1
+ 중동(바레인) - me-south-1
+ 중동(UAE) - me-central-1
+ 남아메리카(상파울루) - sa-east-1

## 요금 예시
<a name="how-pricing-examples"></a>

Managed Service for Apache Flink 요금 페이지에서 요금 예시를 확인할 수 있습니다. 자세한 내용은 [Amazon Managed Service for Apache Flink 요금](https://aws.amazon.com/managed-service-apache-flink/pricing/)을 참조하세요. 각 항목에 대해 비용 사용 보고서 예시를 포함한 추가 예시는 다음과 같습니다.

### 장시간 실행되는 고부하 워크로드
<a name="pricing-example-1"></a>

대규모 비디오 스트리밍 서비스를 운영하고 있으며 사용자 상호 작용을 기반으로 실시간 동영상 추천 시스템을 구축하려고 합니다. 여러 Kinesis 데이터 스트림에서 사용자 상호 작용 이벤트를 지속적으로 수집하고 다운스트림 시스템으로 전달하기 전에 실시간으로 이벤트를 처리하기 위해 Managed Service for Apache Flink의 Apache Flink 애플리케이션을 사용합니다. 사용자 상호 작용 이벤트는 여러 연산자를 통해 변환됩니다. 여기에는 이벤트 유형별 파티셔닝, 추가 메타데이터를 활용한 데이터 보강, 타임스탬프 기반 데이터 정렬, 전달 전 5분 동안의 데이터 버퍼링 등이 포함됩니다. 애플리케이션에는 연산 집약적이면서 병렬 처리가 가능한 여러 변환 단계가 포함되어 있습니다. 이 워크로드를 처리하기 위해 Flink 애플리케이션은 20KPU 구성으로 실행됩니다. 또한 애플리케이션은 매일 1GB의 내구성 있는 애플리케이션 백업을 사용합니다. 월별 Managed Service for Apache Flink 요금은 다음과 같이 계산됩니다.

**월별 요금**

미국 동부(버지니아 북부) 리전의 요금은 KPU-시간당 0.11 USD입니다. Managed Service for Apache Flink는 KPU당 50GB의 실행 중 애플리케이션 스토리지를 할당하며, GB/월당 0.10 USD가 청구됩니다.
+ 월별 KPU 요금: 24시간 \$1 30일 \$1 (20KPU \$1 스트리밍 애플리케이션용 추가 KPU 1개) \$1 시간당 0.11 USD = 1,584.00 USD
+ 월별 실행 중 애플리케이션 스토리지 요금: 30일 \$1 20KPU \$1 50GB/KPU \$1 0.10 USD/GB-월 = 100.00 USD
+ 월별 내구성 있는 애플리케이션 스토리지 요금: 30일 \$1 1GB \$1 0.023/GB-월 = 0.03 USD
+ 총 요금: 1,584.00 USD \$1 100 USD \$1 0.03 USD = **1,684.03 USD**

**해당 월의 Billing and Cost Management 콘솔에서 확인되는 Managed Service for Apache Flink 비용 사용 보고서**

Kinesis Analytics
+ 1,684.03 USD - 미국 동부(버지니아 북부)
+ Amazon Kinesis Analytics CreateSnapshot
  + 내구성 있는 애플리케이션 백업의 GB/월당 0.023 USD
    + 1GB-월 - 0.03 USD
+ Amazon Kinesis Analytics StartApplication
  + 실행 중 애플리케이션 스토리지의 GB-월당 0.10 USD
    + 1,000GB-월 - 100 USD
  + Apache Flink 애플리케이션의 Kinesis 처리 단위-시간당 0.11 USD
    + 15,120KPU-시간 - 1,584 USD

### 하루에 약 15분 동안 실행되는 배치 워크로드
<a name="pricing-example-2"></a>

Managed Service for Apache Flink의 Apache Flink 애플리케이션을 사용하여 Amazon Simple Storage Service(Amazon S3)에 저장된 로그 데이터를 배치 모드로 변환합니다. 로그 데이터는 여러 연산자를 사용하여 변환됩니다. 여기에는 서로 다른 로그 이벤트에 스키마를 적용하고, 이벤트 유형별로 데이터를 파티셔닝하고, 타임스탬프 기준으로 정렬하는 작업이 포함됩니다. 애플리케이션에는 여러 변환 단계가 있지만, 계산 집약적이지는 않습니다. 이 애플리케이션은 30일 동안 매일 15분씩 초당 2,000개 레코드의 데이터를 수집합니다. 내구성 있는 애플리케이션 백업은 생성하지 않습니다. 월별 Managed Service for Apache Flink 요금은 다음과 같이 계산됩니다.

**월별 요금**

미국 동부(버지니아 북부) 리전의 요금은 KPU-시간당 0.11 USD입니다. Managed Service for Apache Flink는 KPU당 50GB의 실행 중 애플리케이션 스토리지를 할당하며, GB/월당 0.10 USD가 청구됩니다.
+ 배치 워크로드: 하루 15분 동안 Managed Service for Apache Flink 애플리케이션은 초당 2,000개의 레코드를 처리하며, 이 작업에는 2KPU가 필요합니다. 30일/월 \$1 15분/일 = 450분/월
+ 월별 KPU 요금: 450분/월 \$1 (2KPU \$1 스트리밍 애플리케이션용 추가 KPU 1개) \$1 시간당 0.11 USD = 2.48 USD
+ 월별 실행 중 애플리케이션 스토리지 요금: 450분/월 \$1 2KPU \$1 50GB/KPU \$1 0.10 USD/GB-월 = 0.11 USD
+ 총 요금: 2.48 USD \$1 0.11 = **2.59 USD**

**해당 월의 Billing and Cost Management 콘솔에서 확인되는 Managed Service for Apache Flink 비용 사용 보고서**

Kinesis Analytics
+ 2.59 USD - 미국 동부(버지니아 북부)
+ Amazon Kinesis Analytics StartApplication
  + 실행 중 애플리케이션 백업의 GB-월당 0.10 USD
    + 1.042GB-월 - 0.11 USD
  + Apache Flink 애플리케이션의 Kinesis 처리 단위-시간당 0.11 USD
    + 22.5KPU-시간 - 2.48 USD

### 같은 시간 안에 애플리케이션을 반복적으로 시작 및 중지하는 테스트 애플리케이션으로 인해 최소 요금이 여러 번 발생하는 경우
<a name="pricing-example-3"></a>

매일 수백만 건의 트랜잭션을 처리하는 대규모 전자상거래 플랫폼을 운영하고 있습니다. 실시간 부정 행위 탐지 시스템을 개발하려고 합니다. Managed Service for Apache Flink의 Apache Flink 애플리케이션을 사용하여 Kinesis Data Streams에서 트랜잭션 이벤트를 수집하고 다양한 변환 단계를 통해 실시간으로 이벤트를 처리합니다. 여기에는 슬라이딩 윈도우를 사용한 이벤트 집계, 이벤트 유형별 파티셔닝, 이벤트 유형별 특정 탐지 규칙 적용 등이 포함됩니다. 개발 중에는 애플리케이션의 동작을 테스트하고 디버그하기 위해 애플리케이션을 여러 번 시작하고 중지하게 됩니다. 때로는 애플리케이션이 몇 분만 실행되는 경우도 있습니다. 특정 한 시간 동안에는 4KPU로 구성된 애플리케이션을 테스트하며 애플리케이션에서 내구성 있는 애플리케이션 백업은 사용하지 않습니다.
+ 오전 10시 5분에 애플리케이션을 시작하며, 30분간 실행된 후 오전 10시 35분에 중지됩니다.
+ 오전 10시 40분에 애플리케이션을 다시 시작하며, 5분간 실행된 후 오전 10시 45분에 중지됩니다.
+ 오전 10시 50분에 애플리케이션을 다시 시작하며, 2분간 실행된 후 오전 10시 52분에 중지됩니다.

Managed Service for Apache Flink는 애플리케이션이 실행될 때마다 최소 10분의 사용량을 기준으로 요금을 부과합니다. 애플리케이션의 월별 Managed Service for Apache Flink 사용량은 다음과 같이 계산됩니다.
+ 애플리케이션 첫 번째 시작 및 중지: 사용량 30분
+ 애플리케이션 두 번째 시작 및 중지: 사용량 10분(애플리케이션이 5분 동안 실행되었으나 최소 요금 10분으로 반올림됨)
+ 애플리케이션 세 번째 시작 및 중지: 사용량 10분(애플리케이션이 2분 동안 실행되었으나 최소 요금 10분으로 반올림됨)

총 50분의 애플리케이션 사용량에 대해 요금이 청구됩니다. 해당 월에 애플리케이션이 실행된 다른 시간이 없으면, 월별 Managed Service for Apache Flink 요금은 다음과 같이 계산됩니다.

**월별 요금**

미국 동부(버지니아 북부) 리전의 요금은 KPU-시간당 0.11 USD입니다. Managed Service for Apache Flink는 KPU당 50GB의 실행 중 애플리케이션 스토리지를 할당하며, GB/월당 0.10 USD가 청구됩니다.
+ 월별 KPU 요금: 50분 \$1 (4KPU \$1 스트리밍 애플리케이션용 추가 KPU 1개) \$1 0.11 USD/시간 = 0.46 USD(1센트 단위로 반올림)
+ 월별 실행 중 애플리케이션 스토리지 요금: 50분 \$1 4KPU \$1 50GB/KPU \$1 0.10 USD/GB-월 = 0.03 USD(1센트 단위로 반올림)
+ 총 요금: 0.46 USD \$1 0.03 = **0.49 USD**

**해당 월의 Billing and Cost Management 콘솔에서 확인되는 Managed Service for Apache Flink 비용 사용 보고서**

Kinesis Analytics
+ 0.49 USD - 미국 동부(버지니아 북부)
+ Amazon Kinesis Analytics StartApplication
  + 실행 중 애플리케이션 스토리지의 GB-월당 0.10 USD
    + 0.232GB-월 - 0.03 USD
  + Apache Flink 애플리케이션의 Kinesis 처리 단위-시간당 0.11 USD
    + 4.167KPU-시간 - 0.46 USD

# DataStream API 구성 요소 검토
<a name="how-datastream"></a>

Apache Flink 애플리케이션은 [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/)를 사용하여 데이터 스트림의 데이터를 변환합니다.

이 섹션에서는 데이터를 이동, 변환, 추적하는 다양한 구성 요소를 설명합니다.
+ [Managed Service for Apache Flink에서 DataStream API를 사용한 데이터 이동 커넥터 사용](how-connectors.md): 이러한 구성 요소는 애플리케이션과 외부 데이터 소스 및 목적지 간에 데이터를 이동합니다.
+ [DataStream API를 사용한 Managed Service for Apache Flink의 연산자를 사용하여 데이터 변환](how-operators.md): 이러한 구성 요소는 응용 프로그램 내의 데이터 요소를 변환하거나 그룹화합니다.
+ [DataStream API를 사용하여 Managed Service for Apache Flink에서 이벤트 추적](how-time.md): 이 항목에서는 Managed Service for Apache Flink가 DataStream API를 사용할 때 이벤트를 추적하는 방법에 대해 설명합니다.

# Managed Service for Apache Flink에서 DataStream API를 사용한 데이터 이동 커넥터 사용
<a name="how-connectors"></a>

Amazon Managed Service for Apache Flink DataStream API에서 *커넥터*는 Managed Service for Apache Flink 애플리케이션에서 데이터를 내보내고 받는 소프트웨어 구성 요소입니다. 커넥터는 파일과 디렉터리에서 읽을 수 있는 유연한 통합 구성 요소입니다. 커넥터는 Amazon 서비스 및 제3자 시스템과 상호 작용하기 위한 완전한 모듈로 구성됩니다.

커넥터 유형은 다음을 포함합니다.
+ [스트리밍 데이터 소스 추가](how-sources.md): Kinesis 데이터 스트림, 파일 또는 기타 데이터 소스에서 애플리케이션에 데이터를 제공합니다.
+ [싱크를 사용하여 데이터 쓰기](how-sinks.md): 애플리케이션에서 Kinesis 데이터 스트림, Firehose 스트림 또는 기타 데이터 대상으로 데이터를 전송합니다.
+ [비동기식 I/O 사용](how-async.md): 스트림 이벤트를 강화하기 위해 데이터 소스(예: 데이터베이스)에 대한 비동기 액세스를 제공합니다.

## 사용 가능한 커넥터
<a name="how-connectors-list"></a>

Apache Flink 프레임워크에는 다양한 소스의 데이터에 액세스하기 위한 커넥터가 포함되어 있습니다. Apache Flink 프레임워크에서 사용할 수 있는 커넥터에 대한 자세한 내용을 알아보려면 [Apache Flink 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.15/)의 [커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)를 참조하세요.

**주의**  
Flink 1.6, 1.8, 1.11 또는 1.13에서 실행되는 애플리케이션을 중동(UAE), 아시아 태평양(하이데라바드), 이스라엘(텔아비브), 유럽(취리히), 아시아 태평양(멜버른) 또는 아시아 태평양(자카르타) 리전에서 실행하려는 경우 업데이트된 커넥터로 애플리케이션 아카이브를 재구축하거나 Flink 1.18로 업그레이드해야 할 수 있습니다.  
Apache Flink 커넥터는 각 커넥터별로 분리된 오픈 소스 리포지토리에 저장됩니다. 버전 1.18 이상으로 업그레이드하는 경우 종속 항목을 업데이트해야 합니다. Apache Flink AWS 커넥터의 리포지토리에 액세스하려면 [flink-connector-aws](https://github.com/apache/flink-connector-aws)를 참조하세요.  
이전 Kinesis 소스 `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer`는 지원이 중단되었으며 향후 Flink 릴리스에서 제거될 수 있습니다. 대신 [Kinesis 소스](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)를 사용합니다.  
`FlinkKinesisConsumer` 및 `KinesisStreamsSource` 사이에는 상태 호환성이 없습니다. 자세한 내용은 Apache Flink 설명서의 [Kinesis Streams 소스로 기존 작업 마이그레이션](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)을 참조하세요.  
 다음은 권장 가이드라인입니다.  


**커넥터 업그레이드**  

| Flink 버전 | 커넥터 사용 | 해결 방법 | 
| --- | --- | --- | 
| 1.19, 1.20 | Kinesis 소스 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 Kinesis Data Streams 소스 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)를 참조하세요.  | 
| 1.19, 1.20 | Kinesis 싱크 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 Kinesis Data Streams 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Kinesis Streams 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)를 참조하세요.  | 
| 1.19, 1.20 | DynamoDB 스트림 소스 |  Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 DynamoDB 스트림 소스 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon DynamoDB 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)를 참조하세요.  | 
| 1.19, 1.20 | DynamoDB 싱크 | Managed Service for Apache Flink 1.19 및 1.20으로 업그레이드할 때는 최신 DynamoDB 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon DynamoDB 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)를 참조하세요. | 
| 1.19, 1.20 | Amazon SQS 싱크 |  Managed Service for Apache Flink 버전 1.19 및 1.20으로 업그레이드할 때는 최신 Amazon SQS 싱크 커넥터를 사용하고 있는지 확인합니다. 5.0.0 이상 버전이어야 합니다. 자세한 내용은 [Amazon SQS 싱크](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)를 참조하세요.  | 
| 1.19, 1.20 | Amazon Managed Service for Prometheus 싱크 |  Managed Service for Apache Flink 버전 1.19 및 1.20으로 업그레이드할 때는 최신 Amazon Managed Service for Prometheus 싱크 커넥터를 사용하고 있는지 확인합니다. 1.0.0 이상 버전이어야 합니다. 자세한 내용은 [Prometheus 싱크](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)를 참조하세요.  | 

# Managed Service for Apache Flink에 스트리밍 데이터 소스 추가
<a name="how-sources"></a>

Apache Flink는 파일, 소켓, 컬렉션, 맞춤 소스에서 읽을 수 있는 커넥터를 제공합니다. 애플리케이션 코드에서 [Apache Flink 소스](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)를 사용하여 스트림으로부터 데이터를 수신합니다. 이 섹션에서는 Amazon 서비스에 사용할 수 있는 소스를 설명합니다.

## Kinesis 데이터 스트림 사용
<a name="input-streams"></a>

`KinesisStreamsSource`는 Amazon Kinesis 데이터 스트림에서 애플리케이션으로 스트리밍 데이터를 제공합니다.

### `KinesisStreamsSource` 생성
<a name="input-streams-create"></a>

다음 코드 예는 `KinesisStreamsSource`를 생성하는 방법을 보여 줍니다.

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

`KinesisStreamsSource` 사용에 관한 자세한 내용은 Apache Flink 설명서의 [Amazon Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) 및 [Github에서의 퍼블릭 KinesisConnectors 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)를 참조하세요.

### EFO 소비자를 사용하는 `KinesisStreamsSource` 생성
<a name="input-streams-efo"></a>

`KinesisStreamsSource`는 이제 [향상된 팬아웃(EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)을 지원합니다.

Kinesis 컨슈머가 EFO를 사용하는 경우 Kinesis Data Streams 서비스는 컨슈머가 스트림에서 읽는 다른 컨슈머와 스트림의 고정 대역폭을 공유하지 않고 자체 전용 대역폭을 제공합니다.

Kinesis 소비자와 함께 EFO를 사용하는 방법에 대한 자세한 내용은 [ FLIP-128: AWS Kinesis 소비자를 위한 향상된 팬아웃을 참조하세요](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers).

Kinesis 소비자에 다음 파라미터를 설정하여 EFO 소비자를 활성화합니다.
+ **READER\$1TYPE:** 애플리케이션이 **EFO** 소비자를 사용하여 Kinesis Data Stream 데이터에 액세스하도록 이 파라미터를 EFO로 설정하세요.
+ **EFO\$1CONSUMER\$1NAME: ** 이 파라미터를 이 스트림의 소비자 간에 고유한 문자열 값으로 설정합니다. 동일한 Kinesis Data Stream에서 컨슈머 명칭을 재사용하면 해당 명칭을 사용하던 이전 컨슈머가 종료됩니다.

EFO를 사용하도록 `KinesisStreamsSource`를 구성하려면 컨슈머에 다음 파라미터를 추가하세요.

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

EFO 소비자를 사용하는 Managed Service for Apache Flink 애플리케이션의 예제는 [Github에서의 퍼블릭 Kinesis 커넥터 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)를 참조하세요.

## Amazon MSK 사용
<a name="input-msk"></a>

`KafkaSource` 소스는 Amazon MSK 주제에서 스트리밍 데이터를 애플리케이션에 제공합니다.

### `KafkaSource` 생성
<a name="input-msk-create"></a>

다음 코드 예는 `KafkaSource`를 생성하는 방법을 보여 줍니다.

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

`KafkaSource` 사용에 대한 자세한 내용을 알아보려면 [MSK 복제](earlier.md#example-msk) 섹션을 참조하세요.

# Managed Service for Apache Flink에서 싱크를 사용하여 데이터 쓰기
<a name="how-sinks"></a>

애플리케이션 코드에서는 Kinesis Data Streams 및 DynamoDB 같은 AWS 서비스를 포함한 외부 시스템에 데이터를 쓰기 위해 어떤 [Apache Flink 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) 커넥터든 사용할 수 있습니다.

Apache Flink는 파일과 소켓용 싱크를 제공하며 사용자 지정 싱크를 구현할 수도 있습니다. 지원되는 여러 싱크 중 다음 싱크가 자주 사용됩니다.

## Kinesis 데이터 스트림 사용
<a name="sinks-streams"></a>

Apache Flink는 Apache Flink 설명서에서 [Kinesis Data Streams 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)에 대한 정보를 제공합니다.

입력 및 출력에 Kinesis 데이터 스트림을 사용하는 애플리케이션의 예는 [자습서: Managed Service for Apache Flink에서 DataStream API 사용 시작하기](getting-started.md) 섹션을 참조하세요.

## Apache Kafka 및 Amazon Managed Streaming for Apache Kafka(MSK) 사용
<a name="sinks-MSK"></a>

[Apache Flink Kafka 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)는 Apache Kafka와 Amazon MSK로 데이터를 게시하기 위한 광범위한 기능을 제공하며, 정확히 한 번 처리 보장도 지원합니다. Kafka에 데이터를 쓰는 방법은 Apache Flink 설명서의 [Kafka 커넥터 예제](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)를 참조하세요.

## Amazon S3 사용
<a name="sinks-s3"></a>

Amazon S3 버킷에 객체를 쓰는 데 Apache Flink `StreamingFileSink`를 사용할 수 있습니다.

S3에 객체를 쓰는 방법에 대한 예는 [예: Amazon S3 버킷에 쓰기](earlier.md#examples-s3) 섹션을 참조하세요.

## Firehose 사용
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`는 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 서비스를 사용하여 애플리케이션 출력을 저장하기 위한 안정적이고 확장 가능한 Apache Flink 싱크입니다. 이 섹션에서는 Maven 프로젝트를 설정하여 `FlinkKinesisFirehoseProducer`를 생성하고 사용하는 방법을 설명합니다.

**Topics**
+ [`FlinkKinesisFirehoseProducer` 생성](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 코드 예](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` 생성
<a name="sinks-firehose-create"></a>

다음 코드 예는 `FlinkKinesisFirehoseProducer`를 생성하는 방법을 보여 줍니다.

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 코드 예
<a name="sinks-firehose-sample"></a>

다음 코드 예제는 `FlinkKinesisFirehoseProducer`를 생성 및 구성하고 Apache Flink 데이터 스트림에서 Firehose 서비스로 데이터를 전송하는 방법을 보여줍니다.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose 싱크를 사용하는 방법에 대한 전체 자습서는 [예제: Firehose에 쓰기](earlier.md#get-started-exercise-fh) 섹션을 참조하세요.

# Managed Service for Apache Flink에서 비동기식 I/O 사용
<a name="how-async"></a>

비동기식 I/O 연산자는 데이터베이스와 같은 외부 데이터 소스를 사용하여 스트림 데이터를 강화합니다. Managed Service for Apache Flink는 스트림 이벤트를 비동기적으로 보강하여 요청을 일괄 처리하여 효율성을 높일 수 있도록 합니다.

자세한 내용을 알아보려면 Apache Flink 설명서의 [비동기식 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) 섹션을 참조하세요.

# DataStream API를 사용한 Managed Service for Apache Flink의 연산자를 사용하여 데이터 변환
<a name="how-operators"></a>

Managed Service for Apache Flink에서 들어오는 데이터를 변환하려면 Apache Flink *연산자*를 사용합니다. Apache Flink 연산자는 하나 이상의 데이터 스트림을 새 데이터 스트림으로 변환합니다. 새 데이터 스트림에는 원래 데이터 스트림에서 수정된 데이터가 포함됩니다. Apache Flink는 25개 이상의 사전 빌드된 스트림 처리 연산자를 제공합니다. 자세한 내용은 Apache Flink 설명서의 [연산자](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)를 참조하세요.

**Topics**
+ [변환 연산자 사용](#how-operators-transform)
+ [집계 연산자 사용](#how-operators-agg)

## 변환 연산자 사용
<a name="how-operators-transform"></a>

다음은 JSON 데이터 스트림의 필드 중 하나에 대한 간단한 텍스트 변환의 예입니다.

이 코드는 변환된 데이터 스트림을 만듭니다. 새 데이터 스트림에는 원래 스트림과 동일한 데이터가 포함되며 `TICKER` 필드 내용에 “` Company`” 문자열이 추가됩니다.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 집계 연산자 사용
<a name="how-operators-agg"></a>

다음은 집계 연산자의 예시입니다. 코드는 집계된 데이터 스트림을 만듭니다. 연산자는 5초짜리 텀블링 윈도우를 만들고 창에 있는 레코드에 대한 `PRICE` 값의 합계를 같은 `TICKER` 값으로 반환합니다.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

더 많은 코드 예제는 [Managed Service for Apache Flink 애플리케이션 생성 및 사용 예제](examples-collapsibles.md) 섹션을 참조하세요.

# DataStream API를 사용하여 Managed Service for Apache Flink에서 이벤트 추적
<a name="how-time"></a>

Managed Service for Apache Flink는 다음 타임스탬프를 사용하여 이벤트를 추적합니다.
+ **처리 시간:** 각 작업을 실행 중인 시스템의 시스템 시간을 나타냅니다.
+ **이벤트 시간:** 각 개별 이벤트가 생성 장치에서 발생한 시간을 나타냅니다.
+ **수집 시간:** 이벤트가 Managed Service for Apache Flink에 입력되는 시간을 나타냅니다.

`setStreamTimeCharacteristic`을 사용하여 스트리밍 환경에서 사용되는 시간을 설정합니다.

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

타임스탬프에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [워터마크 생성](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)을 참조하세요.

# 테이블 API 구성 요소 검토
<a name="how-table"></a>

Apache Flink 애플리케이션은 [Apache Flink 표 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/)를 사용하여 관계형 모델을 사용하여 스트림의 데이터와 상호 작용합니다. 표 API를 사용하여 표 소스를 사용하여 데이터에 액세스한 다음 표 함수를 사용하여 표 데이터를 변환하고 필터링합니다. API 함수 또는 SQL 명령을 사용하여 표 형식 데이터를 변환하고 필터링할 수 있습니다.

이 섹션은 다음 주제를 포함합니다:
+ [Table API 커넥터](how-table-connectors.md): 이러한 구성 요소는 애플리케이션과 외부 데이터 소스 및 목적지 간에 데이터를 이동합니다.
+ [Table API 시간 속성](how-table-timeattributes.md): 이 항목에서는 Apache Flink용 관리형 서비스가 표 API를 사용할 때 이벤트를 추적하는 방법에 대해 설명합니다.

# Table API 커넥터
<a name="how-table-connectors"></a>

Apache Flink 프로그래밍 모델에서 커넥터는 애플리케이션이 다른 AWS 서비스와 같은 외부 소스에서 데이터를 읽거나 쓰는 데 사용하는 구성 요소입니다.

Apache Flink 표 API를 사용하면 다음과 같은 유형의 커넥터를 사용할 수 있습니다.
+ [Table API 소스](#how-table-connectors-source): 표 API 소스 커넥터를 사용하면 API 호출 또는 SQL 쿼리를 사용하여 귀하의 `TableEnvironment` 내에 표를 생성할 수 있습니다.
+ [Table API 싱크](#how-table-connectors-sink): SQL 명령을 사용하여 Amazon MSK 주제 또는 Amazon S3 버킷과 같은 외부 소스에 표 데이터를 쓸 수 있습니다.

## Table API 소스
<a name="how-table-connectors-source"></a>

데이터 스트림에서 표 소스를 생성합니다. 다음 코드는 Amazon MSK 주제에서 표를 생성합니다.

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

표 소스에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [표 및 SQL 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)를 참조하세요.

## Table API 싱크
<a name="how-table-connectors-sink"></a>

표 데이터를 싱크에 쓰려면 SQL로 싱크를 만든 다음 `StreamTableEnvironment` 객체에서 SQL 기반 싱크를 실행합니다.

다음 코드 예는 Amazon S3 싱크에 표 데이터를 쓰는 방법을 보여줍니다.

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 `format` 파라미터를 사용하여 Apache Flink용 관리형 서비스가 싱크에 출력을 기록하는 데 사용하는 형식을 제어할 수 있습니다. 형식에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [지원되는 커넥터](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)를 참조하세요.

## 사용자 정의 소스 및 싱크
<a name="how-table-connectors-userdef"></a>

기존 Apache Kafka 커넥터를 사용하여 Amazon MSK 및 Amazon S3와 같은 다른 AWS 서비스와 데이터를 주고 받을 수 있습니다. 다른 데이터 소스 및 목적지과 상호 작용하기 위해 자체 소스 및 싱크를 정의할 수 있습니다. 자세한 내용을 알아보려면 Apache Flink 설명서의 [사용자 정의 소스 및 싱크](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)를 참조하세요.

# Table API 시간 속성
<a name="how-table-timeattributes"></a>

데이터 스트림의 각 레코드에는 레코드와 관련된 이벤트가 발생한 시기를 정의하는 여러 타임스탬프가 있습니다.
+ **이벤트 시간**: 레코드를 생성한 이벤트가 발생한 시기를 정의하는 사용자 정의 타임스탬프입니다.
+ **통합 시간**: 애플리케이션이 데이터 스트림에서 레코드를 검색한 시간입니다.
+ **처리 시간**: 애플리케이션에서 레코드를 처리한 시간입니다.

Apache Flink Table API가 레코드 시간을 기반으로 창을 생성할 때, `setStreamTimeCharacteristic` 메서드를 사용하여 이러한 타임스탬프 중 어떤 타임스탬프를 사용할지 정의합니다.

Table API에서의 타임스탬프를 사용에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [시간 속성](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) 및 [적시 스트림 처리](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)를 참조하세요.

# Managed Service for Apache Flink와 함께 Python 사용
<a name="how-python"></a>

**참고**  
애플 실리콘 칩이 장착된 새로운 Mac에서 Python Flink 애플리케이션을 개발하는 경우, PyFlink 1.15의 Python 종속성과 관련된 몇 가지 [알려진 문제](https://issues.apache.org/jira/browse/FLINK-26981)가 발생할 수 있습니다. 이 경우 Docker에서 Python 인터프리터를 실행하는 것이 좋습니다. 단계별 지침은 [Apple Silicon Mac에서의 PyFlink 1.15 개발](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon)을 참조하세요.

Apache Flink 버전 2.2에는 Python 버전 3.12를 사용하여 애플리케이션을 생성할 수 있는 지원이 포함되어 있으며, Python 버전 3.8에 대한 지원이 제거됩니다. 자세한 내용은 [Flink Python 설명서](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/)를 참조하세요. 다음을 통해 Python을 사용하여 Managed Service for Apache Flink 애플리케이션을 생성합니다.
+ `main` 메서드를 사용하여 Python 애플리케이션 코드를 텍스트 파일로 만듭니다.
+ 애플리케이션 코드 파일과 Python 또는 Java 종속성을 zip 파일로 번들링한 다음 Amazon S3 버킷에 업로드합니다.
+ Amazon S3 코드 위치, 애플리케이션 속성 및 애플리케이션 설정을 지정하여 Managed Service for Apache Flink 애플리케이션을 생성합니다.

상위 수준에서 보면, Python 표 API는 자바 표 API에 대한 래퍼입니다. Python Table API에 관한 자세한 내용은 Apache Flink 설명서의 [Table API 자습서](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/)를 참조하세요.

# Managed Service for Apache Flink Python 애플리케이션 프로그래밍
<a name="how-python-programming"></a>

Apache Flink Python 표 API를 사용하여 Python 애플리케이션용 Managed Service for Apache Flink를 코딩합니다. Apache Flink 엔진은 Python 표 API 명령문(Python가상 머신에서 실행)을 Java 표 API 명령문(Java 가상 머신에서 실행)으로 변환합니다.

Python 표 API를 사용하는 방법은 다음과 같습니다.
+ `StreamTableEnvironment`에 대한 참조를 생성합니다.
+ `StreamTableEnvironment` 참조에 대해 쿼리를 실행하여 소스 스트리밍 데이터에서 `table` 객체를 생성합니다.
+ `table` 객체에 대해 쿼리를 실행하여 출력 표를 생성합니다.
+ `StatementSet`을 사용하여 대상에 출력 표를 작성합니다.

Managed Service for Apache Flink에서 Python 표 API를 사용하여 시작하려면 [Python으로 Amazon Managed Service for Apache Flink 시작하기](gs-python.md) 섹션을 참조하십시오.

## 스트리밍 데이터 읽기 및 쓰기
<a name="how-python-programming-readwrite"></a>

스트리밍 데이터를 읽고 쓰려면 표 환경에서 SQL 쿼리를 실행합니다.

### 테이블 생성
<a name="how-python-programming-readwrite-createtable"></a>

다음 코드 예는 SQL 쿼리를 생성하는 사용자 정의 함수를 보여줍니다. SQL 쿼리는 Kinesis 스트림과 상호 작용하는 표를 생성합니다.

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### 스트리밍 데이터 읽기
<a name="how-python-programming-readwrite-read"></a>

다음 코드 예는 표 환경 참조에서 이전 `CreateTable`SQL 쿼리를 사용하여 데이터를 읽는 방법을 보여줍니다.

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### 스트리밍 데이터 쓰기
<a name="how-python-programming-readwrite-write"></a>

다음 코드 예는 `CreateTable` 예의 SQL 쿼리를 사용하여 출력 표 참조를 생성하는 방법과 `StatementSet`를 사용하여 표과 상호 작용하여 대상 Kinesis 스트림에 데이터를 쓰는 방법을 보여줍니다.

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## 런타임 속성 읽기
<a name="how-python-programming-properties"></a>

애플리케이션 코드를 변경하지 않고도 런타임 속성을 사용하여 애플리케이션을 구성할 수 있습니다.

Java 애플리케이션용 Managed Service for Apache Flink를 사용할 때와 같은 방식으로 애플리케이션의 애플리케이션 속성을 지정합니다. 런타임 속성은 다음과 같은 방법으로 지정할 수 있습니다.
+ [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 작업 사용하기
+ [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업 사용하기
+ 콘솔을 사용하여 애플리케이션 구성하기.

Managed Service for Apache Flink 런타임에서 생성하는 `application_properties.json`이라는 json 파일을 읽으면 코드에서 애플리케이션 속성을 검색할 수 있습니다.

다음 코드 예는 `application_properties.json` 파일에서 애플리케이션 속성을 읽는 방법을 보여줍니다.

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

다음 사용자 정의 함수 코드 예는 애플리케이션 속성 객체에서 속성 그룹을 읽는 방법을 보여줍니다: 검색합니다:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

다음 코드 예는 이전 예에서 반환하는 속성 그룹에서 INPUT\$1STREAM\$1KEY라는 속성을 읽는 방법을 보여 줍니다.

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## 애플리케이션의 코드 패키지 생성
<a name="how-python-programming-package"></a>

Python 애플리케이션을 만든 후에는 코드 파일과 종속성을 zip 파일로 번들로 묶습니다.

zip 파일에는 `main` 메서드가 있는 Python 스크립트가 포함되어야 하며 선택적으로 다음을 포함할 수 있습니다.
+ 추가 Python 코드 파일
+ JAR 파일의 사용자 정의 Java 코드
+ JAR 파일의 Java 라이브러리

**참고**  
애플리케이션 zip 파일에는 애플리케이션의 모든 종속성이 포함되어야 합니다. 다른 소스의 라이브러리는 애플리케이션에 참조할 수 없습니다.

# Managed Service for Apache Flink Python 애플리케이션 생성
<a name="how-python-creating"></a>

## 코드 파일 지정
<a name="how-python-creating-code"></a>

애플리케이션의 코드 패키지를 만들었으면 Amazon S3 버킷에 업로드합니다. 그런 다음 콘솔 또는 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 작업을 사용하여 애플리케이션을 생성합니다.

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 작업을 사용하여 애플리케이션을 만드는 경우 `kinesis.analytics.flink.run.options`라는 특수 애플리케이션 속성 그룹을 사용하여 zip 파일의 코드 파일과 아카이브를 지정합니다. 다음과 같은 유형의 파일을 정의할 수 있습니다.
+ **python**: Python 기본 메서드를 포함하는 텍스트 파일입니다.
+ **jarfile**: Java 사용자 정의 함수를 포함하는 Java JAR 파일입니다.
+ **pyFiles**: 애플리케이션에서 사용할 리소스가 포함된 Python 리소스 파일입니다.
+ **pyArchives**: 애플리케이션의 리소스 파일이 들어 있는 zip 파일입니다.

Apache Flink Python 코드 파일 유형에 관한 자세한 내용은 Apache Flink 설명서의 [명령줄 인터페이스](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/)를 참조하세요.

**참고**  
Managed Service for Apache Flink는 `pyModule`, `pyExecutable` 또는 `pyRequirements` 파일 유형을 지원하지 않습니다. 코드, 요건 및 종속성은 모두 zip 파일에 있어야 합니다. pip를 사용하여 설치할 종속성을 지정할 수 없습니다.

다음 예 json 스니펫은 애플리케이션의 zip 파일 내에서 파일 위치를 지정하는 방법을 보여줍니다.

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Managed Service for Apache Flink Python 애플리케이션 모니터링
<a name="how-python-monitoring"></a>

애플리케이션의 CloudWatch 로그를 사용하여 Managed Service for Apache Flink Python 애플리케이션을 모니터링할 수 있습니다.

Managed Service for Apache Flink는 Python 애플리케이션을 위해 다음 메시지를 기록합니다.
+ 애플리케이션의 `main` 메서드에서 `print()`를 사용하여 콘솔에 기록되는 메시지.
+ `logging` 패키지를 사용하여 사용자 정의 함수로 전송된 메시지. 다음 코드 예는 사용자 정의 함수에서 애플리케이션 로그에 쓰는 방법을 보여줍니다.

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ 애플리케이션에서 발생한 오류 메시지.

  애플리케이션에서 `main` 함수에서 예외가 발생하면 애플리케이션 로그에 해당 예외가 나타납니다.

  다음 예는 Python 코드에서 발생한 예외에 대한 로그 항목을 보여줍니다.

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**참고**  
성능 문제로 인해 애플리케이션 개발 중에는 맞춤 로그 메시지만 사용하는 것이 좋습니다.

## CloudWatch 인사이트를 통한 로그 쿼리
<a name="how-python-monitoring-insights"></a>

다음 CloudWatch 인사이트 쿼리는 애플리케이션의 기본 기능을 실행하는 동안 Python 진입점에서 생성된 로그를 검색합니다.

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Managed Service for Apache Flink에서 런타임 속성 사용
<a name="how-properties"></a>

애플리케이션 코드를 다시 컴파일하지 않고도 *런타임 속성*을 사용하여 애플리케이션을 구성할 수 있습니다.

**Topics**
+ [콘솔을 사용하여 런타임 속성 관리](#how-properties-console)
+ [CLI를 사용하여 런타임 속성 관리](#how-properties-cli)
+ [Managed Service for Apache Flink 애플리케이션의 런타임 속성 액세스](#how-properties-access)

## 콘솔을 사용하여 런타임 속성 관리
<a name="how-properties-console"></a>

 AWS Management Console을 사용하여 Managed Service for Apache Flink 애플리케이션에서 런타임 속성을 추가, 업데이트 또는 제거할 수 있습니다.

**참고**  
지원되는 이전 Apache Flink 버전을 사용 중이며 기존 애플리케이션을 Apache Flink 1.19.1로 업그레이드하려는 경우, 인플레이스 Apache Flink 버전 업그레이드를 사용하여 업그레이드할 수 있습니다. 인플레이스 버전 업그레이드를 사용하면 스냅샷, 로그, 지표, 태그, Flink 구성 등을 포함하여 Apache Flink 버전 전반에 걸쳐 단일 ARN을 기준으로 애플리케이션 추적성을 유지할 수 있습니다. 이 기능은 `RUNNING` 및 `READY` 상태에서 사용할 수 있습니다. 자세한 내용은 [Apache Flink에 인플레이스 버전 업그레이드 사용](how-in-place-version-upgrades.md) 단원을 참조하십시오.

**Managed Service for Apache Flink에 대한 런타임 속성 업데이트**

1. 에 로그인 AWS Management Console하고 https://console.aws.amazon.com/flink Amazon MSF 콘솔을 엽니다.

1. Managed Service for Apache Flink 애플리케이션을 선택하세요. **애플리케이션 세부 정보**를 선택합니다.

1. 애플리케이션 페이지에서 **구성**을 선택합니다.

1. **속성** 섹션을 확장합니다.

1. **속성** 섹션의 컨트롤을 사용하여 키-값 쌍으로 속성 그룹을 정의할 수 있습니다. 이러한 컨트롤을 사용하여 속성 그룹 및 런타임 속성을 추가, 업데이트 또는 제거할 수 있습니다.

1. **업데이트**를 선택합니다.

## CLI를 사용하여 런타임 속성 관리
<a name="how-properties-cli"></a>

[AWS CLI](https://docs.aws.amazon.com/cli)를 사용하여 런타임 속성을 추가, 업데이트 또는 제거할 수 있습니다.

이 섹션에는 애플리케이션의 런타임 속성을 구성하기 위한 API 작업에 대한 예제 요청이 포함되어 있습니다. JSON 파일을 사용하여 API 작업을 입력하는 방법에 대한 자세한 방법은 [Managed Service for Apache Flink API 예 코드](api-examples.md) 섹션을 참조하세요.

**참고**  
다음 예제의 샘플 계정 ID (*`012345678901`*)을(를) 계정 ID로 바꾸세요.

### 애플리케이션 생성 시 런타임 속성 추가
<a name="how-properties-create"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 작업에 대한 다음 예제 요청은 애플리케이션을 생성할 때 두 개의 런타임 속성 그룹(`ProducerConfigProperties` 및 `ConsumerConfigProperties`)을 추가합니다.

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### 기존 애플리케이션의 런타임 속성 추가 및 업데이트
<a name="how-properties-update"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업에 대한 다음 예제 요청은 기존 응용 프로그램의 런타임 속성을 추가하거나 업데이트합니다.

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**참고**  
속성 그룹에 해당하는 런타임 속성이 없는 키를 사용하는 경우 Managed Service for Apache Flink는 키-값 쌍을 새 속성으로 추가합니다. 속성 그룹의 기존 런타임 속성에 키를 사용하는 경우 Managed Service for Apache Flink에서 속성 값을 업데이트합니다.

### 런타임 속성 제거
<a name="how-properties-remove"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업에 대한 다음 예제 요청은 기존 애플리케이션에서 모든 런타임 속성과 속성 그룹을 제거합니다.

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**중요**  
속성 그룹에서 기존 속성 그룹 또는 기존 속성 키를 생략하면 해당 속성 그룹 또는 속성이 제거됩니다.

## Managed Service for Apache Flink 애플리케이션의 런타임 속성 액세스
<a name="how-properties-access"></a>

`Map<String, Properties>` 객체를 반환하는 정적 `KinesisAnalyticsRuntime.getApplicationProperties()` 메서드를 사용하여 Java 애플리케이션 코드에서 런타임 속성을 검색합니다.

다음은 애플리케이션의 런타임 속성을 검색하는 Java 코드 예제입니다.

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

다음과 같이 속성 그룹(`Java.Util.Properties`객체)을 검색합니다.

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

일반적으로 개별 속성을 검색할 필요 없이 `Properties` 객체를 전달하여 Apache Flink 소스 또는 싱크를 구성합니다. 다음 코드 예제는 런타임 속성에서 검색된 `Properties` 객체를 전달하여 Flink 소스를 만드는 방법을 보여줍니다.

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

코드 예제는 [Managed Service for Apache Flink 애플리케이션 생성 및 사용 예제](examples-collapsibles.md) 섹션을 참조하세요.

# Managed Service for Apache Flink에서 Apache Flink 커넥터 사용
<a name="how-flink-connectors"></a>

Apache Flink 커넥터는 Amazon Managed Service for Apache Flink 애플리케이션 내부 및 외부로 데이터를 이동하는 소프트웨어 구성 요소입니다. 커넥터는 파일과 디렉터리에서 읽을 수 있는 유연한 통합 구성 요소입니다. 커넥터는 Amazon 서비스 및 제3자 시스템과 상호 작용하기 위한 완전한 모듈로 구성됩니다.

커넥터 유형은 다음을 포함합니다.
+ **소스:** Kinesis 데이터 스트림, 파일, Apache Kafka 주제, 파일 또는 기타 데이터 소스에서 애플리케이션에 데이터를 제공합니다.
+ **싱크:** 애플리케이션에서 Kinesis 데이터 스트림, Firehose 스트림, Apache Kafka 주제 또는 기타 데이터 대상으로 데이터를 전송합니다.
+ **비동기 I/O:** 스트림을 강화하기 위해 데이터 소스(예: 데이터베이스)에 대한 비동기 액세스를 제공합니다.

Apache Flink 커넥터는 각 커넥터별 자체 소스 리포지토리에 저장됩니다. Apache Flink 커넥터의 버전 및 아티팩트는 사용 중인 Apache Flink 버전과 DataStream, Table 또는 SQL API 사용 여부에 따라 달라집니다.

Amazon Managed Service for Apache Flink는 40개 이상의 사전 구축된 Apache Flink 소스 및 싱크 커넥터를 지원합니다. 다음 표는 가장 인기 있는 커넥터와 연결된 버전 정보를 요약해 제공합니다. 비동기 싱크 프레임워크를 사용하여 사용자 지정 싱크를 빌드할 수도 있습니다. 자세한 내용은 Apache Flink 설명서의 [일반 비동기식 베이스 싱크](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/)를 참조하세요.

 Apache Flink AWS 커넥터의 리포지토리에 액세스하려면 [flink-connector-aws](https://github.com/apache/flink-connector-aws)를 참조하세요.

## Flink 2.2용 커넥터
<a name="connectors-flink-2-2"></a>

Flink 2.2로 업그레이드할 때는 커넥터 종속성을 Flink 2.x 런타임과 호환되는 버전으로 업데이트해야 합니다. Flink 커넥터는 Flink 런타임과 독립적으로 릴리스되며 일부 커넥터에는 아직 Flink 2.x 호환 릴리스가 없습니다. 다음 표에는이 쓰기를 기준으로 Amazon Managed Service for Apache Flink에서 일반적으로 사용되는 커넥터의 가용성이 요약되어 있습니다.


**Flink 2.2용 커넥터**  

| 커넥터 | Flink 2.0 이상 버전 | 참고 | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Flink 2.2에 권장 | 
| Kinesis Data Streams(소스) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Flink 2.2에 권장 | 
| Kinesis Data Streams(싱크) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Flink 2.2에 권장 | 
| FileSystem(S3, HDFS) | Flink와 번들로 제공 | Flink 배포에 내장 - 항상 사용 가능 | 
| JDBC | 2.x에 대해 아직 릴리스되지 않음 | Flink 2.x 호환 릴리스 사용 불가 | 
| OpenSearch | 2.x에 대해 아직 릴리스되지 않음 | Flink 2.x 호환 릴리스 사용 불가 | 
| Elasticsearch | 2.x에 대해 아직 릴리스되지 않음 | OpenSearch 커넥터로 마이그레이션 고려 | 
|  – Amazon Managed Service for Prometheus | 2.x에 대해 아직 릴리스되지 않음 | 작성 시 Flink 2.x 호환 릴리스 없음 | 

애플리케이션이 아직 Flink 2.2 릴리스가 없는 커넥터에 의존하는 경우 커넥터가 호환 버전을 릴리스할 때까지 기다리거나 대체 버전으로 바꿀 수 있는지 여부를 평가하는 두 가지 옵션이 있습니다(예: JDBC 카탈로그 또는 사용자 지정 싱크 사용).

**알려진 문제**
+ 커넥터 v5.0.0 및 v6.0.0에 도입된 EFO(향상된 팬아웃/SubscribeToShard) 경로와 `KinesisStreamsSource` 함께를 사용하는 애플리케이션은 Kinesis 스트림이 리샤딩될 때 실패할 수 있습니다. 이는 커뮤니티에서 알려진 문제입니다. 자세한 내용은 [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648)을 참조하세요.
+ `KinesisStreamsSource`와 함께 커넥터 v5.0.0 및 v6.0.0에 도입된 EFO(Enhanced Fan-Out/SubscribeToShard) 경로를 사용하는 애플리케이션은 Flink 애플리케이션에 역압이 가해지면 교착 상태가 발생하여 하나 이상의 TaskManager에서 데이터 처리가 완전히 중지될 `KinesisStreamsSink` 수 있습니다. TaskManagers 앱을 복구하려면 강제 중지 작업과 시작 앱 작업이 필요합니다. 커뮤니티에서 알려진 문제의 하위 사례인 [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071)입니다.

## 이전 Flink 버전용 커넥터
<a name="connectors-older-versions"></a>


**이전 Flink 버전용 커넥터**  

| 커넥터 | Flink 버전 1.15 | Flink 버전 1.18 | Flink 버전 1.19 | Flink 버전 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream - 소스 - DataStream 및 Table API | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4.3.0-1.18 | flink-connector-kinesis, 5.0.0-1.19 | flink-connector-kinesis, 5.0.0-1.20 | 
| Kinesis Data Stream - 싱크 - DataStream and Table API | flink-connector-aws-kinesis-streams, 1.15.4 | flink-connector-aws-kinesis-streams, 4.3.0-1.18 | flink-connector-aws-kinesis-streams, 5.0.0-1.19 | flink-connector-aws-kinesis-streams, 5.0.0-1.20 | 
| Kinesis Data Streams - 소스/싱크 - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4.3.0-1.18 | flink-sql-connector-kinesis, 5.0.0-1.19 | flink-sql-connector-kinesis-streams, 5.0.0-1.20 | 
| Kafka - DataStream 및 Table API | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3.2.0-1.18 | flink-connector-kafka, 3.3.0-1.19 | flink-connector-kafka, 3.3.0-1.20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1.15.4 | flink-sql-connector-kafka, 3.2.0-1.18 | flink-sql-connector-kafka, 3.3.0-1.19 | flink-sql-connector-kafka, 3.3.0-1.20 | 
| Firehose - DataStream 및 Table API | flink-connector-aws-kinesis-firehose, 1.15.4 | flink-connector-aws-firehose, 4.3.0-1.18 | flink-connector-aws-firehose, 5.0.0-1.19 | flink-connector-aws-firehose, 5.0.0-1.20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose, 1.15.4 | flink-sql-connector-aws-firehose, 4.3.0-1.18 | flink-sql-connector-aws-firehose, 5.0.0-1.19 | flink-sql-connector-aws-firehose, 5.0.0-1.20 | 
| DynamoDB - DataStream and Table API | flink-connector-dynamodb, 3.0.0-1.15 | flink-connector-dynamodb, 4.3.0-1.18 | flink-connector-dynamodb, 5.0.0-1.19 | flink-connector-dynamodb, 5.0.0-1.20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3.0.0-1.15 | flink-sql-connector-dynamodb, 4.3.0-1.18 | flink-sql-connector-dynamodb, 5.0.0-1.19 | flink-sql-connector-dynamodb, 5.0.0-1.20 | 
| OpenSearch - DataStream and Table API | - | flink-connector-opensearch, 1.2.0-1.18 | flink-connector-opensearch, 1.2.0-1.19 | flink-connector-opensearch, 1.2.0-1.19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-sql-connector-opensearch, 1.2.0-1.19 | flink-sql-connector-opensearch, 1.2.0-1.19 | 
| Amazon Managed Service for Prometheus DataStream | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-prometheus, 1.0.0-1.19 | flink-connector-prometheus, 1.0.0-1.20 | 
| Amazon SQS DataStream 및 Table API | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-sqs, 5.0.0-1.19 | flink-connector-sqs, 5.0.0-1.20 | 

Amazon Managed Service for Apache Flink의 커넥터에 대해 자세히 알아보려면 다음을 참조하세요.
+ [DataStream API 커넥터](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Table API 커넥터](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### 알려진 문제
<a name="connectors-known-issues"></a>

Apache Flink 1.15의 Apache Kafka 커넥터에 영향을 미치는 알려진 오픈 소스 Apache Flink 문제가 있습니다. 이 문제는 Apache Flink의 이후 버전에서 해결되었습니다.

자세한 내용은 [알려진 문제](flink-1-15-2.md#flink-1-15-known-issues) 단원을 참조하십시오.

# Managed Service for Apache Flink에서 내결함성 구현
<a name="how-fault"></a>

체크포인트는 Amazon Managed Service for Apache Flink.에서 내결함성을 구현하는 데 사용하는 방법입니다. *체크포인트*는 실행 중인 애플리케이션의 최신 백업으로, 예상치 못한 애플리케이션 중단이나 장애 조치로부터 즉시 복구하는 데 사용됩니다.

Apache Flink 애플리케이션의 체크포인트 설정에 관한 자세한 내용은 Apache Flink 설명서의 [체크포인트](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/)를 참조하세요.

*스냅샷*은 수동으로 생성하고 관리되는 애플리케이션 상태 백업입니다. 스냅샷으로 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 호출을 통해 애플리케이션을 이전 상태로 복원할 수 있습니다. 자세한 내용은 [스냅샷을 사용한 애플리케이션 백업 관리](how-snapshots.md) 단원을 참조하십시오.

애플리케이션에 체크포인트가 활성화되어 있는 경우 서비스는 예상치 못한 애플리케이션 재시작 시 애플리케이션 데이터의 백업을 생성하고 로드하여 내결함성을 제공합니다. 이러한 예기치 않은 애플리케이션 재시작은 예상치 못한 작업 재시작, 인스턴스 장애 등으로 인해 발생할 수 있습니다. 이렇게 하면 애플리케이션을 다시 시작하는 동안 장애 없이 실행하는 것과 동일한 의미 체계를 갖게 됩니다.

애플리케이션에 대해 스냅샷을 활성화하고 애플리케이션의 [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)을 사용하여 구성한 경우, 서비스는 애플리케이션 업데이트 중이나 서비스 관련 확장 또는 정비 중에 정확히 한 번 처리 의미 체계를 제공합니다.

## Managed Service for Apache Flink에서 체크포인트 구성
<a name="how-fault-configure"></a>

애플리케이션의 체크포인트 동작을 구성할 수 있습니다. 체크포인트 상태를 유지할지 여부, 상태를 체크포인트에 저장하는 빈도, 한 체크포인트 작업의 종료와 다른 체크포인트 작업의 시작 사이의 최소 간격을 정의할 수 있습니다.

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 또는 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) API 작업을 사용하여 다음 설정을 구성합니다.
+ `CheckpointingEnabled` — 애플리케이션에서 체크포인트가 활성화되었는지 여부를 나타냅니다.
+ `CheckpointInterval` — 체크포인트(지속성) 작업 사이의 시간(밀리초)을 포함합니다.
+ `ConfigurationType` — 기본 체크포인트 동작을 사용하려면 이 값을 `DEFAULT`으로 설정합니다. 다른 값을 구성하려면 이 값을 `CUSTOM`으로 설정하세요.
**참고**  
기본 체크포인트 동작은 다음과 같습니다.  
**CheckpointingEnabled:** true
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints:** 5000
**ConfigurationType**이 로 설정된 경우 `DEFAULT`를 사용하거나 애플리케이션 코드의 값을 설정 AWS Command Line Interface하여 다른 값으로 설정된 경우에도 이전 값이 사용됩니다.
**참고**  
Flink 1.15 이후 버전에서는 Managed Service for Apache Flink가 자동 스냅샷 생성, 즉 애플리케이션 업데이트, 크기 조정 또는 중지하는 동안 `stop-with-savepoint`를 사용합니다.
+ `MinPauseBetweenCheckpoints` — 한 체크포인트 작업 종료와 다른 체크포인트 작업 시작 사이의 최소 시간(밀리초) 이 값을 설정하면 체크포인트 작업이 `CheckpointInterval` 보다 오래 걸리는 경우에 애플리케이션이 연속으로 체크포인트를 수행하는 것을 방지합니다.

## 체크포인트 API 예제 검토
<a name="how-fault-examples"></a>

이 섹션에는 애플리케이션의 체크포인트 구성을 위한 API 작업에 대한 예 요청이 포함되어 있습니다. JSON 파일을 사용하여 API 작업을 입력하는 방법에 대한 자세한 방법은 [Managed Service for Apache Flink API 예 코드](api-examples.md) 섹션을 참조하세요.

### 새 애플리케이션의 체크포인트 수행 구성
<a name="how-fault-examples-create-config"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 작업을 위한 다음 예 요청은 애플리케이션을 생성할 때 체크포인트 수행을 구성합니다.

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### 새 애플리케이션의 체크포인트 수행 비활성화
<a name="how-fault-examples-create-disable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 작업을 위한 다음 예 요청은 애플리케이션을 생성할 때 체크포인트 수행을 비활성화합니다.

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### 기존 애플리케이션의 체크포인트 수행 구성
<a name="how-fault-examples-update-config"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업을 위한 다음 예 요청은 기존 애플리케이션의 체크포인트 수행을 구성합니다.

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### 기존 애플리케이션의 체크포인트 수행 비활성화
<a name="how-fault-examples-update-update-disable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업을 위한 다음 예 요청은 기존 애플리케이션에 대한 체크포인트 수행을 비활성화합니다.

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# 스냅샷을 사용한 애플리케이션 백업 관리
<a name="how-snapshots"></a>

*스냅샷*은 Apache Flink *Savepoint*를 구현한 Managed Service for Apache Flink입니다. 스냅샷은 사용자 또는 서비스에 의해 트리거, 생성, 관리되는 애플리케이션 상태 백업입니다. Apache Flink 저장점에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [저장점](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)을 참조하세요. 스냅샷을 사용하면 애플리케이션 상태의 특정 스냅샷에서 애플리케이션을 다시 시작할 수 있습니다.

**참고**  
애플리케이션이 올바른 상태 데이터로 제대로 다시 시작하려면 하루에 여러 번 스냅샷을 생성하는 것이 좋습니다. 스냅샷의 올바른 주기는 애플리케이션의 비즈니스 로직에 따라 다릅니다. 스냅샷을 자주 생성하면 최신 데이터를 복구할 수 있지만 비용이 증가하고 더 많은 시스템 리소스가 필요합니다.

Managed Service for Apache Flink에서는 다음 API 작업을 사용하여 스냅샷을 관리합니다.
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

스냅샷 수에 대한 애플리케이션별 한도는 [Managed Service for Apache Flink 및 Studio 노트북 할당량](limits.md) 섹션을 참조하세요. 애플리케이션이 스냅샷 한도에 도달한 경우 스냅샷을 수동으로 만들면 `LimitExceededException` 코드와 함께 실패합니다.

Managed Service for Apache Flink는 스냅샷을 절대 삭제하지 않습니다. [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 작업을 사용하여 스냅샷을 수동으로 삭제해야 합니다.

애플리케이션을 시작할 때 애플리케이션 상태의 저장된 스냅샷을 로드하려면 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 또는 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업의 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) 파라미터를 사용하세요.

**Topics**
+ [자동 스냅샷 생성 관리](#how-fault-snapshot-update)
+ [호환되지 않는 상태 데이터가 포함된 스냅샷에서 복원](#how-fault-snapshot-restore)
+ [스냅샷 API 예제 검토](#how-fault-snapshot-examples)

## 자동 스냅샷 생성 관리
<a name="how-fault-snapshot-update"></a>

`SnapshotsEnabled`가 애플리케이션의 [ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)에서 `true`로 설정된 경우 Managed Service for Apache Flink는 애플리케이션이 업데이트, 확장 또는 중지될 때 스냅샷을 자동으로 생성하고 사용하여 정확히 한 번 처리 의미 체계를 제공합니다.

**참고**  
`ApplicationSnapshotConfiguration::SnapshotsEnabled`를 `false`로 설정하면 애플리케이션 업데이트 중에 데이터가 손실될 수 있습니다.

**참고**  
Managed Service for Apache Flink는 스냅샷 생성 중에 중간 저장점을 트리거합니다. Flink 버전 1.15 이상에서는 중간 저장점이 더 이상 부작용을 일으키지 않습니다. [저장점 트리거](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)를 참조하세요.

자동으로 생성된 스냅샷의 품질은 다음과 같습니다.
+ 스냅샷은 서비스에 의해 관리되지만 [ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) 작업을 사용하여 스냅샷을 볼 수 있습니다. 자동으로 생성된 스냅샷은 스냅샷 한도 계산에 포함됩니다.
+ 애플리케이션이 스냅샷 한도를 초과하는 경우 수동으로 생성한 스냅샷은 실패하지만 Managed Service for Apache Flink 서비스는 애플리케이션이 업데이트, 확장 또는 중지될 때 여전히 스냅샷을 성공적으로 생성합니다. 수동으로 더 많은 스냅샷을 생성하기 전에 먼저 [DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 작업을 사용하여 스냅샷을 수동으로 삭제해야 합니다.

## 호환되지 않는 상태 데이터가 포함된 스냅샷에서 복원
<a name="how-fault-snapshot-restore"></a>

스냅샷에는 연산자에 대한 정보가 포함되어 있기 때문에 이전 애플리케이션 버전 이후 변경된 연산자의 스냅샷에서 상태 데이터를 복원하면 예상치 못한 결과가 발생할 수 있습니다. 현재 연산자에 해당하지 않는 스냅샷에서 상태 데이터를 복원하려고 시도하면 애플리케이션에 오류가 발생합니다. 오류가 발생한 애플리케이션은 `STOPPING` 또는 `UPDATING` 상태에서 멈춥니다.

애플리케이션이 호환되지 않는 상태 데이터가 포함된 스냅샷에서 복원할 수 있도록 하려면 [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)의 `AllowNonRestoredState` 파라미터가 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업을 사용하도록 `true`로 설정하세요.

더 이상 사용하지 않는 스냅샷에서 애플리케이션을 복원하면 다음과 같은 동작이 나타납니다.
+ **연산자 추가:** 새 연산자가 추가된 경우 저장점에는 새 연산자에 대한 상태 데이터가 없습니다. 오류가 발생하지 않으며 `AllowNonRestoredState`를 설정할 필요도 없습니다.
+ **연산자 삭제:** 기존 연산자가 삭제된 경우 저장점에는 누락된 연산자에 대한 상태 데이터가 있습니다. `AllowNonRestoredState`를 `true`로 설정하지 않으면 오류가 발생합니다.
+ **연산자 수정:** 파라미터 유형을 호환 가능한 유형으로 변경하는 등 호환되는 변경이 이루어진 경우 애플리케이션은 사용되지 않는 스냅샷에서 복원할 수 있습니다. 스냅샷에서 복원하는 방법에 관한 자세한 내용을 알아보려면 Apache Flink 설명서의 [저장점](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)을 참조하세요. Apache Flink 버전 1.8 이상을 사용하는 애플리케이션은 다른 스키마를 사용한 스냅샷에서 복원할 수 있습니다. Apache Flink 버전 1.6을 사용하는 애플리케이션은 복원할 수 없습니다. 2단계 커밋 싱크에서 사용자가 생성한 스냅샷(CreateApplicationSnapshot) 대신 시스템 스냅샷(SwS)을 사용하는 것이 좋습니다.

  Flink의 경우 Managed Service for Apache Flink는 스냅샷 생성 중에 중간 저장점을 트리거합니다. Flink 1.15 이상 버전에서는 중간 저장점이 더 이상 부작용을 일으키지 않습니다. [저장점 트리거](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)를 참조하세요.

기존 저장점 데이터와 호환되지 않는 애플리케이션을 재개해야 하는 경우 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 작업의 `ApplicationRestoreType` 파라미터를 `SKIP_RESTORE_FROM_SNAPSHOT`으로 설정하여 스냅샷에서 복원을 건너뛰는 것이 좋습니다.

Apache Flink가 병립될 수 없는 상태 데이터를 처리하는 방법에 대한 자세한 내용을 알아보려면 *Apache Flink 설명서*의 [상태 스키마 개선](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/)을 참조하세요.

## 스냅샷 API 예제 검토
<a name="how-fault-snapshot-examples"></a>

이 섹션에는 애플리케이션에서 스냅샷을 사용하기 위한 API 작업에 대한 예 요청이 포함되어 있습니다. JSON 파일을 사용하여 API 작업을 입력하는 방법에 대한 자세한 방법은 [Managed Service for Apache Flink API 예 코드](api-examples.md) 섹션을 참조하세요.

### 애플리케이션에 대한 스냅샷 활성화
<a name="how-fault-savepoint-examples-enable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업을 위한 다음 예 요청은 애플리케이션에 대한 스냅샷을 활성화합니다.

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### 스냅샷 생성
<a name="how-fault-savepoint-examples-create"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) 작업을 위한 다음 예 요청은 현재 애플리케이션 상태의 스냅샷을 생성합니다.

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### 애플리케이션에 대한 스냅샷 나열
<a name="how-fault-snapshot-examples-list"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) 작업을 위한 다음 예 요청은 현재 애플리케이션 상태의 처음 50개 스냅샷이 나열합니다.

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### 애플리케이션 스냅샷에 대한 세부 정보 나열
<a name="how-fault-snapshot-examples-describe"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) 작업을 위한 다음 예 요청은 애플리케이션 스냅샷에 대한 세부 정보를 나열합니다.

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### 스냅샷 삭제
<a name="how-fault-snapshot-examples-delete"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 작업을 위한 다음 예 요청은 이전에 저장한 스냅샷을 삭제합니다. [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) 또는 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 다음 중 하나를 사용하여 `SnapshotCreationTimestamp` 값을 가져올 수 있습니다.

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### 명명된 스냅샷을 사용하여 애플리케이션 재시작
<a name="how-fault-snapshot-examples-load-custom"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 작업을 위한 다음 예 요청은 특정 스냅샷에 저장된 상태를 사용하여 애플리케이션을 시작합니다.

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### 최신 스냅샷을 사용하여 애플리케이션 재시작
<a name="how-fault-snapshot-examples-load-recent"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)작업을 위한 다음 예 요청은 가장 최근의 스냅샷을 사용하여 애플리케이션을 시작합니다.

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### 스냅샷을 사용하지 않고 애플리케이션 재시작
<a name="how-fault-snapshot-examples-load-none"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)작업을 위한 다음 예 요청은 스냅샷이 있더라도 애플리케이션 상태를 로드하지 않고 애플리케이션을 시작합니다.

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Apache Flink에 인플레이스 버전 업그레이드 사용
<a name="how-in-place-version-upgrades"></a>

Apache Flink의 인플레이스 버전 업그레이드를 사용하면 Apache Flink 버전 전반에서 단일 ARN을 기준으로 애플리케이션 추적성을 유지할 수 있습니다. 여기에는 스냅샷, 로그, 지표, 태그, Flink 구성, 리소스 제한 증가, VPC 등이 포함됩니다.

Apache Flink의 인플레이스 버전 업그레이드를 수행하여 Amazon Managed Service for Apache Flink의 기존 애플리케이션을 새 Flink 버전으로 업그레이드할 수 있습니다. 이 작업을 수행하려면 AWS CLI, AWS CloudFormation, AWS SDK 또는를 사용할 수 있습니다 AWS Management Console.

**참고**  
Amazon Managed Service for Apache Flink Studio에서는 Apache Flink에 대해 인플레이스 버전 업그레이드를 사용할 수 없습니다.

**Topics**
+ [Apache Flink의 인플레이스 버전 업그레이드를 사용하여 애플리케이션 업그레이드](upgrading-applications.md)
+ [애플리케이션을 새로운 Apache Flink 버전으로 업그레이드](upgrading-application-new-version.md)
+ [애플리케이션 업그레이드 롤백](rollback.md)
+ [일반적인 애플리케이션 업그레이드 모범 사례 및 권장 사항](best-practices-recommendations.md)
+ [애플리케이션 업그레이드 시 주의 사항 및 알려진 문제](precautions.md)
+ [Flink 2.2로 업그레이드: 전체 가이드](flink-2-2-upgrade-guide.md)
+ [Flink 2.2 업그레이드를 위한 상태 호환성 가이드](state-compatibility.md)

# Apache Flink의 인플레이스 버전 업그레이드를 사용하여 애플리케이션 업그레이드
<a name="upgrading-applications"></a>

시작하기 전에 [인플레이스 버전 업그레이드](https://www.youtube.com/watch?v=f1qGGdaP2XI) 동영상 시청을 권장합니다.

Apache Flink에 대한 현재 위치 버전 업그레이드를 수행하려면 AWS CLI, , AWS CloudFormation AWS SDK 또는를 사용할 수 있습니다 AWS Management Console. 이 기능은 Managed Service for Apache Flink에서 `READY` 또는 `RUNNING` 상태로 사용 중인 기존 애플리케이션에 사용할 수 있습니다. UpdateApplication API를 사용하여 Flink 런타임을 변경하는 기능을 추가합니다.

## 업그레이드 전: Apache Flink 애플리케이션 업데이트
<a name="before-upgrading"></a>

Flink 애플리케이션을 작성할 때는 애플리케이션을 종속성과 함께 애플리케이션 JAR에 번들링한 후 해당 JAR을 Amazon S3 버킷에 업로드합니다. 그 후 Amazon Managed Service for Apache Flink는 사용자가 선택한 새로운 Flink 런타임에서 작업을 실행합니다. 업그레이드하려는 Flink 런타임과의 호환성을 확보하기 위해 애플리케이션을 업데이트해야 할 수도 있습니다. Flink 버전 간 불일치로 인해 버전 업그레이드가 실패할 수 있습니다. 대부분의 경우 소스(수신) 또는 대상(싱크, 송신) 커넥터와 Scala 종속성에서 이러한 문제가 발생합니다. Managed Service for Apache Flink에서 Flink 1.15 이상 버전은 Scala에 구애받지 않으며 사용하려는 Scala 버전을 JAR에 포함해야 합니다.

**애플리케이션을 업데이트하려면**

1. 상태를 가진 애플리케이션 업그레이드에 관한 Flink 커뮤니티의 조언을 읽습니다. [애플리케이션 및 Flink 버전 업그레이드](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)를 참조합니다.

1. 알려진 문제 및 제한 사항 목록을 검토합니다. [애플리케이션 업그레이드 시 주의 사항 및 알려진 문제](precautions.md)을(를) 참조하세요.

1. 종속성을 업데이트하고 애플리케이션을 로컬에서 테스트합니다. 일반적으로 이러한 종속성은 다음과 같습니다.

   1. Flink 런타임 및 API.

   1. 새로운 Flink 런타임에 권장되는 커넥터. 업데이트하려는 런타임의 [릴리스 버전](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html)에서 확인 가능합니다.

   1. Scala - Apache Flink는 Flink 1.15부터 Scala에 구애받지 않습니다. 따라서 사용하려는 Scala 종속성을 애플리케이션 JAR에 포함해야 합니다.

1. 새 애플리케이션 JAR을 zipfile 형태로 빌드하여 Amazon S3에 업로드합니다. 이전 JAR/zipfile과 다른 이름을 사용하는 것이 좋습니다. 이 정보는 롤백이 필요한 경우 사용합니다.

1. 상태 저장 애플리케이션을 실행 중인 경우 현재 애플리케이션의 스냅샷을 생성할 것을 강력히 권장합니다. 이를 통해 업그레이드 도중 또는 이후 문제 발생 시 상태를 유지한 채 롤백할 수 있습니다.

# 애플리케이션을 새로운 Apache Flink 버전으로 업그레이드
<a name="upgrading-application-new-version"></a>

[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업을 사용하여 Flink 애플리케이션을 업그레이드할 수 있습니다.

`UpdateApplication` API는 다음과 같은 여러 가지 방식으로 직접적으로 호출할 수 있습니다.
+  AWS Management Console의 기존 **구성** 워크플로를 사용합니다.
  +  AWS Management Console에서 앱 페이지로 이동합니다.
  + **구성**을 선택합니다.
  + 새 런타임과 복원 구성이라고도 하는 시작하려는 스냅샷을 선택합니다. 최신 스냅샷에서 앱을 시작하려면 최신 설정을 복원 구성으로 사용합니다. Amazon S3에 있는 새로 업그레이드된 애플리케이션 JAR/zip 파일을 지정합니다.
+ the AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) 작업을 사용합니다.
+  CloudFormation (CFN)을 사용합니다.
  + [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment) 필드를 업데이트합니다. 이전에는에서 애플리케이션을 CloudFormation 삭제하고 새 애플리케이션을 생성하여 스냅샷 및 기타 앱 기록이 손실되었습니다. 이제 RuntimeEnvironment CloudFormation 를 업데이트하고 애플리케이션을 삭제하지 않습니다.
+  AWS SDK를 사용합니다.
  + 원하는 프로그래밍 언어의 SDK 설명서를 참조하세요. [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 섹션을 참조하세요.

애플리케이션이 `RUNNING` 상태이거나 `READY` 상태로 중지된 경우 업그레이드를 수행할 수 있습니다. Amazon Managed Service for Apache Flink는 원래 런타임 버전과 대상 런타임 버전 간의 호환성을 확인하기 위해 검증을 수행합니다. 이 호환성 검사는 `RUNNING` 상태에서 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)을 수행할 때 실행되며 `READY` 상태에서 업그레이드한 경우에는 다음 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 시점에 실행됩니다.

## `RUNNING` 상태에서 애플리케이션 업그레이드
<a name="upgrading-running"></a>

다음 예제에서는를 사용하여 미국 동부(버지니아 북부)에서 라는 `RUNNING` 상태의 앱을 `UpgradeTest` Flink 1.18로 업그레이드 AWS CLI 하고 최신 스냅샷에서 업그레이드된 앱을 시작하는 방법을 보여줍니다.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ 서비스 스냅샷을 활성화한 상태에서 최신 스냅샷에서 애플리케이션을 계속 실행하려는 경우 Amazon Managed Service for Apache Flink는 현재 `RUNNING` 애플리케이션의 런타임이 선택한 대상 런타임과 호환되는지 확인합니다.
+ 대상 런타임을 계속할 스냅샷을 지정한 경우 Amazon Managed Service for Apache Flink는 대상 런타임이 해당 스냅샷과 호환되는지 확인합니다. 호환성 검사에 실패하면 업데이트 요청이 거부되며 애플리케이션은 `RUNNING` 상태에서 변경되지 않습니다.
+ 스냅샷을 사용하지 않고 애플리케이션을 시작하도록 선택한 경우 Amazon Managed Service for Apache Flink는 호환성 검사를 실행하지 않습니다.
+ 업그레이드된 애플리케이션이 실패하거나 전이적 `UPDATING` 상태에서 멈춘 경우 정상 상태로 복구하려면 [애플리케이션 업그레이드 롤백](rollback.md) 섹션의 지침을 따르세요.

**실행 중인 상태 애플리케이션의 프로세스 흐름**

![\[다음 다이어그램은 실행 중인 상태에서 애플리케이션을 업그레이드할 때 권장되는 워크플로를 나타냅니다. 애플리케이션이 상태 저장 상태이고 스냅샷을 활성화했다고 가정합니다. 이 워크플로에서는 업데이트 시 Amazon Managed Service for Apache Flink가 업데이트 전에 자동으로 생성한 최신 스냅샷에서 애플리케이션을 복원합니다.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/in-place-update-while-running.png)


## **READY** 상태에서 애플리케이션 업그레이드
<a name="upgrading-ready"></a>

다음 예제는 AWS CLI를 사용하여 `READY` 상태의 `UpgradeTest` 앱을 미국 동부(버지니아 북부) 리전에서 Flink 1.18로 업그레이드하는 방법을 보여줍니다. 애플리케이션이 실행되고 있지 않기 때문에 앱을 시작할 스냅샷이 지정되어 있지 않습니다. 애플리케이션 시작 요청을 보낼 때 스냅샷을 지정할 수 있습니다.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ `READY` 상태에서는 애플리케이션의 런타임을 어떤 Flink 버전으로도 업데이트할 수 있습니다. Amazon Managed Service for Apache Flink는 애플리케이션이 시작되기 전까지 어떤 검사도 실행하지 않습니다.
+  Amazon Managed Service for Apache Flink는 앱 시작 시 선택한 스냅샷에 대해서만 호환성 검사를 실행합니다. 이는 [Flink 호환성 표](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table)를 기반으로 수행되는 기본 호환성 검사입니다. 스냅샷이 생성된 Flink 버전과 대상 Flink 버전만 확인합니다. 선택한 스냅샷의 Flink 런타임이 앱의 새 런타임과 호환되지 않으면 시작 요청이 거부될 수 있습니다.

**준비 상태 애플리케이션의 프로세스 흐름**

![\[다음 다이어그램은 준비 상태에서 애플리케이션을 업그레이드할 때 권장되는 워크플로를 나타냅니다. 애플리케이션이 상태 저장 상태이고 스냅샷을 활성화했다고 가정합니다. 이 워크플로에서는 업데이트 시 Amazon Managed Service for Apache Flink가 애플리케이션이 중단될 때 자동으로 생성한 최신 스냅샷에서 애플리케이션을 복원합니다.\]](http://docs.aws.amazon.com/ko_kr/managed-flink/latest/java/images/in-place-update-while-ready.png)


# 애플리케이션 업그레이드 롤백
<a name="rollback"></a>

애플리케이션에 문제가 있거나 애플리케이션 코드에서 Flink 버전 간의 불일치를 발견하는 경우 AWS CLI, , AWS CloudFormation, AWS SDK 또는를 사용하여 롤백할 수 있습니다 AWS Management Console. 다음 예제는 다양한 장애 시나리오에서 롤백이 어떻게 이루어지는지 보여줍니다.

## 런타임 업그레이드에 성공했고 애플리케이션이 `RUNNING` 상태이지만 작업이 실패하여 계속 재시작되는 경우
<a name="succeeded-restarting"></a>

`TestApplication` 상태 저장 애플리케이션을 미국 동부(버지니아 북부) 리전에서 Flink 1.15에서 Flink 1.18로 업그레이드한다고 가정합니다. 그러나 업그레이드된 Flink 1.18 애플리케이션은 `RUNNING` 상태임에도 불구하고 시작되지 않거나 지속적으로 재시작될 수 있습니다. 이는 일반적으로 발생하는 장애 시나리오입니다. 추가 가동 중지 시간을 방지하려면 애플리케이션을 이전 실행 버전(Flink 1.15)으로 즉시 롤백하고 문제는 이후에 진단할 것을 권장합니다.

애플리케이션을 이전 실행 버전으로 롤백하려면 [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI 명령 또는 [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API 작업을 사용합니다. 이 API 작업은 최신 버전을 생성하게 된 변경 사항을 롤백합니다. 그런 다음 가장 마지막으로 성공한 스냅샷을 사용하여 애플리케이션을 다시 시작합니다.

업그레이드를 시도하기 전에 기존 앱으로 스냅샷을 생성할 것을 강력히 권장합니다. 이는 데이터 손실이나 데이터 재처리 상황을 방지하는 데 도움이 됩니다.

이 실패 시나리오에서는가 애플리케이션을 롤백 CloudFormation 하지 않습니다. CloudFormation이 애플리케이션을 업데이트하도록 강제하려면 CloudFormation 템플릿을 이전 런타임과 이전 코드를 가리키도록 수정해야 합니다. 그렇지 않으면 CloudFormation은 애플리케이션이 `RUNNING` 상태로 전환될 때 업데이트가 완료된 것으로 가정합니다.

## `UPDATING` 상태에서 멈춰 있는 애플리케이션 롤백
<a name="stuck-updating"></a>

업그레이드 시도 후 애플리케이션이 `UPDATING` 또는 `AUTOSCALING` 상태에서 멈춘 경우 Amazon Managed Service for Apache Flink는 [롤백 애플리케이션](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI 명령 또는 멈춘 `UPDATING` 또는 `AUTOSCALING` 상태 이전에 애플리케이션을 버전으로 롤백할 수 있는 [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API 작업을 제공합니다. 이 API는 애플리케이션이 `UPDATING` 또는 `AUTOSCALING` 전이 상태에서 멈추게 한 변경 사항을 롤백합니다.

# 일반적인 애플리케이션 업그레이드 모범 사례 및 권장 사항
<a name="best-practices-recommendations"></a>
+ 프로덕션 업그레이드를 시도하기 전에 비프로덕션 환경에서 상태 없이 새 작업 및 런타임을 먼저 테스트합니다.
+ 상태 저장 업그레이드도 먼저 비프로덕션 애플리케이션에서 테스트하는 것을 고려하세요.
+ 업그레이드된 애플리케이션을 시작하는 데 사용할 스냅샷과 새 작업 그래프의 상태가 서로 호환되는지 반드시 확인합니다.
  + 연산자 상태에 저장되는 유형이 동일하게 유지되어야 합니다. 유형이 변경되면 Apache Flink는 해당 연산자 상태를 복원할 수 없습니다.
  + `uid` 메서드를 통해 설정한 연산자 ID가 동일하게 유지되도록 해야 합니다. Apache Flink는 연산자에 고유한 ID를 지정할 것을 강력히 권장합니다. 자세한 내용을 Apache Flink 설명서의 [연산자 ID 할당](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids)을 참조하세요.

    연산자에 ID를 지정하지 않으면 Flink가 ID를 자동으로 생성합니다. 이 경우 프로그램 구조에 따라 달라질 수 있어 구조 변경 시 호환성 문제가 발생할 수 있습니다. Flink는 연산자 ID를 사용해 스냅샷의 상태를 연산자와 연결합니다. 연산자 ID가 변경되면 애플리케이션이 시작되지 않거나 스냅샷에 저장된 상태가 삭제되어 연산자가 상태 없이 실행될 수 있습니다.
  + 키 지정 상태를 저장하는 데 사용되는 키는 변경하지 않습니다.
  + 창 또는 조인과 같은 상태 저장 연산자의 입력 유형은 수정하지 않습니다. 이렇게 하면 연산자의 내부 상태 유형이 암시적으로 변경되어 상태 비호환성을 초래합니다.

# 애플리케이션 업그레이드 시 주의 사항 및 알려진 문제
<a name="precautions"></a>

## 브로커 재시작 후 Kafka 커밋이 체크포인트에서 반복적으로 실패
<a name="apache-kafka-connector"></a>

Flink 버전 1.15의 Apache Kafka 커넥터에는 Kafka Client 2.8.1의 심각한 오픈 소스 Kafka Client 버그로 인해 발생하는 알려진 오픈 소스 Apache Flink 문제가 있습니다. 자세한 내용은 [브로커 재시작 후 Kafka 커밋이 체크포인트에서 반복적으로 실패](https://issues.apache.org/jira/browse/FLINK-28060) 및 [commitOffsetAsync 예외 후 KafkaConsumer가 그룹 코디네이터에 대한 연결을 복원할 수 없음](https://issues.apache.org/jira/browse/KAFKA-13840)을 참조하세요.

이 문제를 방지하려면 Amazon Managed Service for Apache Flink에서 Apache Flink 1.18 이상을 사용하는 것을 권장합니다.

## 상태 호환성의 알려진 제한 사항
<a name="state-precautions"></a>
+ Table API를 사용하는 경우 Apache Flink는 버전 간 상태 호환성을 보장하지 않습니다. 자세한 내용은 Apache Flink 설명서의 [상태 저장 업그레이드 및 발전](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution)을 참조하세요.
+ Flink 1.6 상태는 Flink 1.18과 호환되지 않습니다. 따라서 상태를 유지한 채 1.6에서 1.18 이상으로 업그레이드하려 하면 API 요청이 거부됩니다. 대신 1.8, 1.11, 1.13, 1.15로 순차 업그레이드한 뒤 스냅샷을 생성하고, 이후 1.18 이상으로 업그레이드하는 방식으로 진행할 수 있습니다. 자세한 내용은 Apache Flink 설명서의 [애플리케이션 및 Flink 버전 업그레이드](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)를 참조하세요.

## Flink Kinesis 커넥터의 알려진 문제
<a name="kinesis-connector-precautions"></a>
+ Flink 1.11 이하 버전에서 향상된 팬아웃(EFO) 지원을 위해 `amazon-kinesis-connector-flink` 커넥터를 사용하는 경우, Flink 1.13 이상으로 상태 저장 업그레이드를 수행하려면 추가 단계가 필요합니다. 이는 커넥터의 패키지 이름 변경 때문입니다. 자세한 내용은 [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink)를 참조하세요.

  Flink 1.11 이하 버전의 `amazon-kinesis-connector-flink` 커넥터는 `software.amazon.kinesis` 패키징을 사용하지만, Flink 1.13 이상의 Kinesis 커넥터는 `org.apache.flink.streaming.connectors.kinesis`를 사용합니다. 마이그레이션 지원 도구로 [amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)를 사용합니다.
+ Flink 1.13 이하 버전에서 `FlinkKinesisProducer`를 사용 중이며 Flink 1.15 이상으로 업그레이드하려는 경우, 상태 저장 업그레이드를 위해서는 새로운 `KinesisStreamsSink` 대신 Flink 1.15 이상에서도 계속 `FlinkKinesisProducer`를 사용해야 합니다. 하지만 싱크에 이미 사용자 지정 `uid`가 설정되어 있는 경우 `FlinkKinesisProducer`가 상태를 유지하지 않기 때문에 `KinesisStreamsSink`로 전환할 수 있습니다. 사용자 지정 `uid`가 설정되어 있으므로 Flink는 이를 동일한 연산자로 취급합니다.

## Scala로 작성된 Flink 애플리케이션
<a name="scala-precautions"></a>
+ Flink 1.15부터 Apache Flink 런타임에 Scala가 포함되지 않습니다. Flink 1.15 이상으로 업그레이드할 때 사용하려는 Scala 버전과 기타 Scala 종속성을 코드 JAR/zip에 포함해야 합니다. 자세한 내용은 [Amazon Managed Service for Apache Flink 1.15.2 릴리스](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html)를 참조하세요.
+ 애플리케이션이 Scala를 사용하고 있고 Flink 1.11 이하 버전(Scala 2.11)에서 Flink 1.13(Scala 2.12)으로 업그레이드하는 경우, 코드가 Scala 2.12를 사용하도록 해야 합니다. 그렇지 않으면 Flink 1.13 애플리케이션이 Flink 1.13 런타임에서 Scala 2.11 클래스를 찾지 못할 수 있습니다.

## Flink 애플리케이션 다운로드 시 고려해야 할 사항
<a name="downgrading-precautions"></a>
+ Flink 애플리케이션을 다운그레이드할 수 있지만, 이전에 더 낮은 Flink 버전에서 실행한 적이 있는 경우로 제한됩니다. 상태 저장 업그레이드의 경우 Managed Service for Apache Flink는 다운그레이드를 위해 동일 버전 또는 이전 버전에서 생성된 스냅샷 사용을 요구합니다.
+ Flink 1.13 이상에서 Flink 1.11 이하로 런타임을 업데이트하고 앱이 HashMap 상태 백엔드를 사용하는 경우, 애플리케이션은 계속 실패합니다.

# Flink 2.2로 업그레이드: 전체 가이드
<a name="flink-2-2-upgrade-guide"></a>

이 가이드에서는 Amazon Managed Service for Apache Flink 애플리케이션을 Flink 1.x에서 Flink 2.2로 업그레이드하기 위한 step-by-step 지침을 제공합니다. 이는 신중한 계획 및 테스트가 필요한 주요 버전 업그레이드입니다.

**메이저 버전 업그레이드는 단방향입니다.**  
업그레이드 작업은 상태 보존을 통해 애플리케이션을 Flink 1.x에서 2.2로 이동할 수 있지만 2.2 상태에서 2.2에서 1.x로 되돌릴 수는 없습니다. 업그레이드 후 애플리케이션이 비정상 상태가 되면 롤백 API를 사용하여 최신 스냅샷에서 원래 1.x 상태로 1.x 버전으로 돌아갑니다.

## 사전 조건
<a name="upgrade-guide-prerequisites"></a>

업그레이드를 시작하기 전에:
+ 검토 [주요 변경 사항 및 사용 중단](flink-2-2.md#flink-2-2-breaking-changes)
+ 검토 [Flink 2.2 업그레이드를 위한 상태 호환성 가이드](state-compatibility.md)
+ 테스트를 위한 비프로덕션 환경이 있는지 확인합니다.
+ 현재 애플리케이션 구성 및 종속성 문서화

## 마이그레이션 경로 이해
<a name="upgrade-guide-migration-paths"></a>

업그레이드 환경은 애플리케이션의 Flink 2.2와의 호환성에 따라 달라집니다. 이러한 경로를 이해하면 적절하게 준비하고 현실적인 기대치를 설정하는 데 도움이 됩니다.

**경로 1: 호환 가능한 바이너리 및 애플리케이션 상태**

**예상되는 사항:**
+ 업그레이드 작업 호출
+ 애플리케이션 상태 전환으로 2.2로 마이그레이션 완료: `RUNNING` → `UPDATING` → `RUNNING`
+ 데이터 손실 또는 재처리 없이 모든 애플리케이션 상태 보존
+ 마이너 버전 마이그레이션과 동일한 경험

가장 적합: 호환되는 직렬화를 사용하는 상태 비저장 애플리케이션 또는 애플리케이션(Avro, 호환되는 Protobuf 스키마, 컬렉션POJOs)

**경로 2: 이진 비호환성**

**예상되는 사항:**
+ 업그레이드 작업 호출
+ 작업이 실패하고 작업 API 및 로그를 통해 이진 비호환성을 표시합니다.
+ 자동 롤백이 활성화된 경우: 애플리케이션이 개입 없이 몇 분 내에 자동으로 롤백됩니다.
+ 자동 롤백이 비활성화된 경우: 애플리케이션이 데이터 처리 없이 실행 상태로 유지되고 수동으로 이전 버전으로 롤백합니다.
+ 바이너리가 수정되면 경로 1과 유사한 환경에 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)를 사용합니다.

가장 적합: Flink 작업 시작 중에 감지된 제거된 APIs 사용하는 애플리케이션

**경로 3: 호환되지 않는 애플리케이션 상태**

**예상되는 사항:**
+ 업그레이드 작업 호출
+ 마이그레이션이 처음에 성공하는 것처럼 보임
+ 상태 복원이 실패하면 애플리케이션이 몇 초 내에 재시작 루프에 들어갑니다.
+ 연속 재시작을 보여주는 CloudWatch 지표를 통해 장애 감지
+ 롤백 작업 수동 호출
+ 롤백을 시작한 후 몇 분 이내에 프로덕션으로 돌아갑니다.
+ [상태 마이그레이션](state-compatibility.md#state-compat-migration) 애플리케이션 검토

가장 적합: 상태 직렬화 비호환성이 있는 애플리케이션(컬렉션POJOs, 특정 Kryo 직렬화 상태)

**참고**  
프로덕션 애플리케이션에 대해 동일한 단계를 수행하기 전에 프로덕션 애플리케이션의 복제본을 생성하고 복제본에서 다음 각 업그레이드 단계를 테스트하는 것이 좋습니다.

## 1단계: 준비
<a name="upgrade-guide-phase-1"></a>

**애플리케이션 코드 업데이트**

Flink 2.2와 호환되도록 애플리케이션 코드를 업데이트합니다.
+ `pom.xml` 또는에서 **Flink 종속성을 버전 2.2.0으로 업데이트** `build.gradle`
+ **커넥터 종속성을 Flink 2.2 호환 버전으로 업데이트**( 참조[커넥터 가용성](flink-2-2.md#flink-2-2-connectors))
+ **더 이상 사용되지 않는 API 사용 제거**:
  + DataSet API를 DataStream API 또는 테이블 API/SQL로 교체
  + 레거시 `SourceFunction`/`SinkFunction`를 FLIP-27 소스 및 FLIP-143 싱크 APIs로 대체
  + Scala API 사용량을 Java API로 바꾸기
+ **Java 17로 업데이트**

**업데이트된 애플리케이션 코드 업로드**
+ Flink 2.2 종속성을 사용하여 애플리케이션 JAR 빌드
+ 현재 JAR과 **다른 파일 이름으로** Amazon S3에 업로드(예: `my-app-flink-2.2.jar`)
+ 업그레이드 단계에서 사용할 S3 버킷 및 키 기록

## 2단계: 자동 롤백 활성화
<a name="upgrade-guide-phase-2"></a>

자동 롤백을 사용하면 업그레이드에 실패할 경우 Amazon Managed Service for Apache Flink가 자동으로 이전 버전으로 되돌릴 수 있습니다.

**자동 롤백 상태 확인**

*AWS Management Console:*

1. 애플리케이션으로 이동

1. **구성** 선택

1. **애플리케이션 설정**에서 **시스템 롤백**이 활성화되어 있는지 확인합니다.

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**자동 롤백 활성화(활성화되지 않은 경우)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## 3단계: 스냅샷 생성(선택 사항)
<a name="upgrade-guide-phase-3"></a>

애플리케이션에 자동 스냅샷이 활성화된 경우이 단계를 건너뛰고, 그렇지 않으면 업그레이드하기 전에 애플리케이션의 스냅샷을 생성하여 애플리케이션의 상태를 저장할 수 있습니다.

**실행 중인 애플리케이션에서 스냅샷 생성**

*AWS Management Console:*

1. 애플리케이션으로 이동

1. **스냅샷** 선택

1. **스냅샷 생성을** 선택합니다.

1. 스냅샷 이름 입력(예: `pre-flink-2.2-upgrade`)

1. **생성**을 선택합니다.

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**스냅샷 생성 확인**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

계속하기 `READY` 전에 `SnapshotStatus`가 될 때까지 기다립니다.

## 4단계: 애플리케이션 업그레이드
<a name="upgrade-guide-phase-4"></a>

[https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업을 사용하여 Flink 애플리케이션을 업그레이드할 수 있습니다.

`UpdateApplication` API는 다음과 같은 여러 가지 방식으로 직접적으로 호출할 수 있습니다.
+ ** AWS Management Console을 사용합니다.**
  +  AWS Management Console에서 앱 페이지로 이동합니다.
  + **구성**을 선택합니다.
  + 새 런타임과 복원 구성이라고도 하는 시작하려는 스냅샷을 선택합니다. 최신 스냅샷에서 앱을 시작하려면 최신 설정을 복원 구성으로 사용합니다. Amazon S3에 있는 새로 업그레이드된 애플리케이션 JAR/zip 파일을 지정합니다.
+ [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) 작업을 **사용합니다 AWS CLI**.
+ **를 사용합니다 CloudFormation.**
  + `RuntimeEnvironment` 필드를 업데이트합니다. 이전에는 CloudFormation 이 애플리케이션을 삭제한 뒤 새로 생성하여 스냅샷과 기타 앱 기록이 손실되었습니다. 이제 `RuntimeEnvironment`가 현재를 CloudFormation 업데이트하고 애플리케이션을 삭제하지 않습니다.
+ ** AWS SDK를 사용합니다.**
  + 원하는 프로그래밍 언어의 SDK 설명서를 참조하세요. [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)을(를) 참조하세요.

애플리케이션이 `RUNNING` 상태이거나 `READY` 상태로 중지된 경우 업그레이드를 수행할 수 있습니다. Amazon Managed Service for Apache Flink는 원래 런타임 버전과 대상 런타임 버전 간의 호환성을 검증합니다. 이 호환성 검사는 상태`UpdateApplication`일 때를 수행하거나 `RUNNING` 상태일 때 업그레이드할 `StartApplication` 때 실행됩니다`READY`.

**실행 중 상태에서 업그레이드**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**READY 상태에서 업그레이드**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## 5단계: 업그레이드 모니터링
<a name="upgrade-guide-phase-5"></a>

**호환성 검사**
+ 작업 API를 사용하여 업그레이드 상태를 확인합니다. 이진 비호환성 또는 작업 시작 문제가 있는 경우 로그와 함께 업그레이드 작업이 실패합니다.
+ 업그레이드 작업이 성공했지만 애플리케이션이 재시작 루프에 멈춘 경우 상태가 새 Flink 버전과 호환되지 않거나 업데이트된 코드에 문제가 있는 것입니다. 상태 비호환성 문제를 식별하는 [Flink 2.2 업그레이드를 위한 상태 호환성 가이드](state-compatibility.md) 방법을 검토합니다.

**애플리케이션 상태 모니터링**

*애플리케이션 상태:*
+ 애플리케이션 상태가 전환되어야 함: `RUNNING` → `UPDATING` → `RUNNING`
+ 애플리케이션의 런타임을 확인합니다. 2.2인 경우 업그레이드 작업이 성공한 것입니다.
+ 애플리케이션이에 `RUNNING` 있지만 여전히 이전 런타임에 있는 경우 자동 롤백이 시작되었습니다. 작업 API는 작업을 로 표시합니다`FAILED`. 로그를 확인하여 실패에 대한 예외를 찾습니다.

또한 CloudWatch에서 이러한 지표를 모니터링합니다.

*지표 다시 시작:*
+ `numRestarts`: 예기치 않은 재시작 모니터링 -가 0이고 `uptime` 또는 `numRestarts`가 증가하는 경우 업그레이드`runningTime`가 성공합니다.

*체크포인트 지표:*
+ `lastCheckpointDuration`: 업그레이드 전 값과 유사해야 합니다.
+ `numberOfFailedCheckpoints`: 0을 유지해야 합니다.

## 6단계: 애플리케이션 동작 검증
<a name="upgrade-guide-phase-6"></a>

애플리케이션이 Flink 2.2에서 실행된 후:

**기능 검증**
+ 소스에서 데이터를 읽고 있는지 확인
+ 싱크에 데이터가 기록되고 있는지 확인
+ 비즈니스 로직이 예상 결과를 생성하는지 확인
+ 출력과 업그레이드 전 기준 비교

**성능 검증**
+ 지연 시간 지표 모니터링(end-to-end 처리 시간)
+ 처리량 지표 모니터링(초당 레코드 수)
+ 체크포인트 기간 및 크기 모니터링
+ 메모리 및 CPU 사용률 모니터링

**24시간 이상 실행**

다음을 보장하기 위해 프로덕션 환경에서 최소 24시간 동안 애플리케이션을 실행하도록 허용합니다.
+ 메모리 누수 없음
+ 안정적인 체크포인트 동작
+ 예기치 않은 재시작 없음
+ 일관된 처리량

## 7단계: 롤백 절차
<a name="upgrade-guide-phase-7"></a>

업그레이드에 실패하거나 애플리케이션이 실행 중이지만 비정상인 경우 이전 버전으로 롤백합니다.

**자동 롤백**

자동 롤백이 활성화되어 있고 시작 중에 업그레이드가 실패하면 Amazon Managed Service for Apache Flink가 자동으로 이전 버전으로 돌아갑니다.

**수동 롤백**

애플리케이션이 실행 중이지만 비정상인 경우 `RollbackApplication` API를 사용합니다.

*AWS Management Console:*

1. 애플리케이션으로 이동

1. **작업** → **롤백**을 선택합니다.

1. 롤백 확인

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**롤백 중에 발생하는 일:**
+ 애플리케이션 중지
+ 런타임이 이전 Flink 버전으로 되돌림
+ 애플리케이션 코드가 이전 JAR로 되돌림
+ 업그레이드 **전에** 마지막으로 성공한 스냅샷에서 애플리케이션이 다시 시작됩니다.

**중요**  
Flink 1.x에서는 Flink 2.2 스냅샷을 복원할 수 없습니다.
롤백은 업그레이드 전에 생성된 스냅샷을 사용합니다.
업그레이드하기 전에 항상 스냅샷 생성(3단계)

## 다음 단계
<a name="upgrade-guide-next-steps"></a>

업그레이드 중 질문이나 문제는 섹션을 참조[Managed Service for Apache Flink 문제 해결](troubleshooting.md)하거나 AWS Support에 문의하세요.

# Flink 2.2 업그레이드를 위한 상태 호환성 가이드
<a name="state-compatibility"></a>

Flink 1.x에서 Flink 2.2로 업그레이드할 때 상태 호환성 문제로 인해 애플리케이션이 스냅샷에서 복원되지 않을 수 있습니다. 이 가이드는 잠재적 호환성 문제를 식별하고 마이그레이션 전략을 제공하는 데 도움이 됩니다.

## 상태 호환성 변경 사항 이해
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2에는 상태 호환성에 영향을 미치는 여러 직렬화 변경 사항이 도입되었습니다. 다음은 주요 항목입니다.
+ **Kryo 버전 업그레이드**: Apache Flink 2.2는 번들링된 Kryo 직렬 변환기를 버전 2에서 버전 5로 업그레이드합니다. Kryo v5는 Kryo v2와 다른 이진 인코딩 형식을 사용하므로 Flink 1.x 저장점에서 Kryo를 통해 직렬화된 연산자 상태는 Flink 2.2에서 복원할 수 없습니다.
+ **Java 컬렉션 직렬화**: Flink 1.x에서 POJOs 내의 Java 컬렉션(예: `HashMap``ArrayList`, 및 `HashSet`)은 Kryo를 사용하여 직렬화되었습니다. Flink 2.2에는 1.x의 Kryo 직렬화 상태와 호환되지 않는 컬렉션별 최적화 직렬 변환기가 도입되었습니다. 1.x에서 POJO 또는 Kryo 직렬 변환기와 함께 Java 컬렉션을 사용하는 애플리케이션은 Flink 2.2에서이 상태를 복원할 수 없습니다. 데이터 유형 및 직렬화에 대한 자세한 내용은 Flink [설명서를](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) 참조하세요.
+ **Kinesis 커넥터 호환성**: 5.0 미만의 Kinesis Data Streams(KDS) 커넥터 버전은 Flink 2.2 Kinesis 커넥터 버전 6.0과 호환되지 않는 상태를 유지합니다. 업그레이드하기 전에 커넥터 버전 5.0 이상으로 마이그레이션해야 합니다.

## 직렬화 호환성 참조
<a name="state-compat-reference"></a>

애플리케이션의 모든 상태 선언을 검토하고 직렬화 유형을 아래 표와 일치시킵니다. 상태 유형이 호환되지 않는 경우 업그레이드를 진행하기 전에 [상태 마이그레이션](#state-compat-migration) 섹션을 참조하세요.


**직렬화 호환성 참조**  

| 직렬화 유형 | 호환되나요? | 세부 정보 | 
| --- | --- | --- | 
| Avro(SpecificRecord, GenericRecord) | 예 | Kryo와 독립적인 자체 바이너리 형식을 사용합니다. Kryo 직렬 변환기로 등록된 Avro가 아닌 Flink의 기본 Avro 유형 정보를 사용하고 있는지 확인합니다. | 
| Protobuf | 예 | Kryo와 독립적인 자체 바이너리 인코딩을 사용합니다. 스키마 변경 사항이 이전 버전과 호환되는 진화 규칙을 따르는지 확인합니다. | 
| 컬렉션POJOs  | 예 | Flink의 POJO 직렬 변환기에서 처리 - 클래스가 퍼블릭 클래스, 퍼블릭 no-arg 생성자, 모든 퍼블릭 필드 또는 getter/setters를 통해 액세스할 수 있는 필드, Flink에서 직렬화할 수 있는 모든 필드 유형 자체 등 모든 POJO 기준을 충족하는 경우에만 처리됩니다. 이러한 문제를 위반하는 POJO는 자동으로 Kryo로 돌아가 호환되지 않습니다. | 
| 사용자 지정 TypeSerializers | 예 | serializer가 Kryo에 내부적으로 위임하지 않는 경우에만 호환됩니다. | 
| SQL 및 테이블 API 상태 | 예(주의 사항 포함) | Flink의 내부 직렬 변환기를 사용합니다. 그러나 Apache Flink는 Table API 애플리케이션의 메이저 버전 간 상태 호환성을 보장하지 않습니다. 비프로덕션 환경에서 테스트합니다. | 
| Java 컬렉션POJOs(HashMap, ArrayList, HashSet) | 아니요 | Flink 1.x에서는 POJOs 내의 컬렉션이 Kryo v2를 통해 직렬화되었습니다. Flink 2.2에는 바이너리 형식이 Kryo v2 형식과 호환되지 않는 전용 컬렉션 직렬화기가 도입되었습니다. | 
| Scala 사례 클래스 | 아니요 | Flink 1.x의 Kryo를 통해 직렬화됩니다. Kryo v2에서 v5로 업그레이드하면 바이너리 형식이 변경됩니다. | 
| Java 레코드 | 아니요 | 일반적으로 Flink 1.x에서 Kryo 직렬화로 돌아갑니다. 를 사용하여 테스트하여 확인합니다disableGenericTypes(). | 
| 타사 라이브러리 유형 | 아니요 | 등록된 사용자 지정 직렬 변환기가 없는 유형은 Kryo로 돌아갑니다. Kryo v2에서 v5로의 바이너리 형식 변경은 호환성을 깨뜨립니다. | 
| Kryo 대체를 사용하는 모든 유형 | 아니요 | Flink가 내장 또는 등록된 직렬 변환기가 있는 유형을 처리할 수 없는 경우 Kryo로 돌아갑니다. 1.x의 모든 Kryo 직렬화 상태는 2.2와 호환되지 않습니다. | 

## 진단 방법
<a name="state-compat-diagnostics"></a>

애플리케이션 로그를 보거나 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업 후 로그를 검사하여 상태 호환성 문제를 사전에 식별할 수 있습니다.

**애플리케이션에서 Kryo 대체 식별**

로그에서 다음 정규식 패턴을 사용하여 애플리케이션에서 Kryo 대체를 식별할 수 있습니다.

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

샘플 로그:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

UpdateApplication API를 사용하여 업그레이드에 실패하면 다음 예외에 따라 serializer 기반 상태 비호환성이 발생할 수 있습니다.

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException(POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## 업그레이드 전 체크리스트
<a name="state-compat-checklist"></a>
+ 애플리케이션의 모든 상태 선언 검토
+ 컬렉션POJOs 확인(`HashMap`, `ArrayList`, `HashSet`)
+ 각 상태 유형에 대한 직렬화 메서드 확인
+ 이 복제본에서 UpdateApplication API를 사용하여 Prod 복제본 애플리케이션 생성 및 상태 호환성 테스트
+ 상태가 호환되지 않는 경우에서 전략을 선택합니다. [상태 마이그레이션](#state-compat-migration) 
+ 프로덕션 Flink 애플리케이션 구성에서 자동 롤백 활성화

## 상태 마이그레이션
<a name="state-compat-migration"></a>

**재구축 완료 상태**

소스 데이터에서 상태를 다시 빌드할 수 있는 애플리케이션에 가장 적합합니다.

애플리케이션이 소스 데이터에서 상태를 다시 빌드할 수 있는 경우:

1. Flink 1.x 애플리케이션 중지

1. 업데이트된 코드를 사용하여 Flink 2.x로 업그레이드

1. 로 시작 `SKIP_RESTORE_FROM_SNAPSHOT`

1. 애플리케이션이 상태를 다시 빌드하도록 허용

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## 모범 사례
<a name="state-compat-best-practices"></a>

1. **복잡한 상태에는 항상 Avro 또는 Protobuf 사용** - 스키마 진화를 제공하고 Kryo에 독립적입니다.

1. **POJOs에서 컬렉션 방지** - 대신 Flink의 네이티브 사용 `ListState` `MapState` 

1. **로컬에서 상태 복원 테스트** - 프로덕션 업그레이드 전에 실제 스냅샷으로 테스트

1. **자주 스냅샷 생성 **- 특히 메이저 버전 업그레이드 전

1. **자동 롤백 활성화 -** 실패 시 자동으로 롤백하도록 MSF 애플리케이션을 구성합니다.

1. **상태 유형 문서화** - 모든 상태 유형 및 직렬화 방법에 대한 문서 유지 관리

1. **체크포인트 크기 모니터링** - 체크포인트 크기가 증가하면 직렬화 문제가 발생할 수 있습니다.

## 다음 단계
<a name="state-compat-next-steps"></a>

**업그레이드 계획: 섹션을 참조하세요**[Flink 2.2로 업그레이드: 전체 가이드](flink-2-2-upgrade-guide.md).

마이그레이션 중 질문이나 문제는 섹션을 참조[Managed Service for Apache Flink 문제 해결](troubleshooting.md)하거나 AWS Support에 문의하세요.

# Managed Service for Apache Flink의 애플리케이션 조정 구현
<a name="how-scaling"></a>

Amazon Managed Service for Apache Flink의 작업 병렬 실행 및 리소스 할당을 구성하여 규모 조정을 구현할 수 있습니다. Apache Flink가 작업의 병렬 인스턴스를 예약하는 방법에 관한 자세한 내용은 Apache Flink 설명서의 [병렬 실행](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/)을 참조하세요.

**Topics**
+ [애플리케이션 병렬 처리 및 ParallelismPerKPU 구성](#how-parallelism)
+ [Kinesis 처리 단위 할당](#how-scaling-kpus)
+ [애플리케이션의 병렬 처리 업데이트](#how-scaling-howto)
+ [Managed Service for Apache Flink에서 자동 규모 조정 사용](how-scaling-auto.md)
+ [maxParallelism 고려 사항](#how-scaling-auto-max-parallelism)

## 애플리케이션 병렬 처리 및 ParallelismPerKPU 구성
<a name="how-parallelism"></a>

다음 [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html) 속성을 사용하여 Managed Service for Apache Flink 애플리케이션 작업(예: 소스에서 읽기 또는 연산자 실행)에 대한 병렬 실행을 구성합니다.
+ `Parallelism` — 이 속성을 사용하여 기본 Apache Flink 애플리케이션 병렬성을 설정합니다. 모든 연산자, 소스 및 싱크는 애플리케이션 코드에서 재정의되지 않는 한 이 병렬성으로 실행됩니다. 기본값은 `1`이고, 기본 최대값은 `256`입니다.
+ `ParallelismPerKPU` — 이 속성을 사용하여 애플리케이션의 Kinesis 처리 단위(KPU)에 따라 예약할 수 있는 병렬 작업 수를 설정합니다. 기본값은 `1`이고 최대값은 `8`입니다. 블로킹 작업(예: I/O)이 있는 애플리케이션의 경우 `ParallelismPerKPU` 값이 높을수록 KPU 리소스가 완전히 활용됩니다.

**참고**  
`Parallelism`의 한도는 KPU 한도(기본 값 64)의 `ParallelismPerKPU`배와 같습니다. 한도 증가를 요청하여 KPU 한도를 늘릴 수 있습니다. 한도 증가를 요청하는 방법에 대한 지침은 [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html)의 “한도 증가를 요청하려면”을 참조하세요.

특정 연산자의 작업 병렬 처리 설정에 관한 자세한 내용은 Apache Flink 설명서의 [병렬 처리 설정: 연산자](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level)를 참조하세요.

## Kinesis 처리 단위 할당
<a name="how-scaling-kpus"></a>

Amazon Managed Service for Apache Flink는 용량을 KPU로 프로비저닝합니다. 단일 KPU는 1개의 vCPU 및 4GB의 메모리를 제공합니다. 할당된 모든 KPU에 대해 50GB의 실행 중인 애플리케이션 스토리지도 제공됩니다.

Managed Service for Apache Flink는 다음과 같이 `Parallelism` 및 `ParallelismPerKPU` 속성을 사용하여 애플리케이션을 실행하는 데 필요한 KPU를 계산합니다.

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Managed Service for Apache Flink는 처리량 또는 처리 활동의 급증에 대응하여 애플리케이션 리소스를 신속하게 제공합니다. 활동 급증이 지난 후 애플리케이션에서 리소스를 점진적으로 제거합니다. 리소스 자동 할당을 비활성화하려면 나중에 [애플리케이션의 병렬 처리 업데이트](#how-scaling-howto)에서 설명하는 대로 `AutoScalingEnabled` 값을 `false`로 설정합니다.

애플리케이션에 대한 KPU 기본 한도는 64입니다. 이 한도 증가를 요청하는 방법에 대한 지침은 [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html)의 “한도 증가를 요청하려면”을 참조하십시오.

**참고**  
오케스트레이션을 위해 추가 KPU가 부과됩니다. 자세한 내용을 알아보려면 [Managed Service for Apache Flink 요금](https://aws.amazon.com/kinesis/data-analytics/pricing/)을 참조하세요.

## 애플리케이션의 병렬 처리 업데이트
<a name="how-scaling-howto"></a>

이 섹션에는 애플리케이션의 병렬성을 설정하는 API 작업에 대한 샘플 요청이 포함되어 있습니다. API 작업과 함께 요청 블록을 사용하는 방법에 대한 추가 예와 지침은 [Managed Service for Apache Flink API 예 코드](api-examples.md) 섹션을 참조하십시오.

다음 예 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 작업 요청은 애플리케이션을 만들 때 병렬성을 설정합니다.

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업을 위한 다음 예 요청은 기존 애플리케이션에 대한 병렬성을 설정합니다.

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 작업을 위한 다음 예 요청은 기존 애플리케이션에 대한 병렬성을 비활성화합니다.

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Managed Service for Apache Flink에서 자동 규모 조정 사용
<a name="how-scaling-auto"></a>

Managed Service for Apache Flink는 대부분의 시나리오에서 소스의 데이터 처리량과 연산자의 복잡성을 수용할 수 있도록 애플리케이션의 병렬성을 탄력적으로 조정합니다. 자동 조정은 기본적으로 활성화되어 있습니다. Managed Service for Apache Flink는 애플리케이션의 리소스(CPU) 사용량을 모니터링하고 그에 따라 애플리케이션의 병렬성을 탄력적으로 늘리거나 줄입니다.
+ CloudWatch 지표 최대 `containerCPUUtilization`이 15분 동안 75% 이상이면 애플리케이션이 스케일 업(병렬성 증가)합니다. 즉, 75% 이상인 1분 기간의 연속 데이터 포인트가 15개 있을 때 `ScaleUp` 작업이 시작됩니다. `ScaleUp` 작업은 애플리케이션의 `CurrentParallelism`을 두 배로 증가시키며, `ParallelismPerKPU`는 변경되지 않습니다. 그 결과 할당된 KPU도 두 배가 됩니다.
+ CPU 사용량이 6시간 동안 10% 미만으로 유지되면 애플리케이션이 축소됩니다 (병렬성 감소). 즉, 10% 미만인 1분 기간의 연속 데이터 포인트가 360개 있을 때 `ScaleDown` 작업이 시작됩니다. `ScaleDown` 작업은 애플리케이션의 병렬 처리를 절반(올림 적용)으로 줄이며, `ParallelismPerKPU`는 변경되지 않고, 할당된 KPU 수도 절반(올림 적용)으로 줄어듭니다.

**참고**  
1분 이상의 최대 `containerCPUUtilization` 기간을 참조하여 조정 작업에 사용되는 데이터 포인트와의 상관 관계를 찾을 수 있지만, 작업이 시작된 정확한 순간을 반영할 필요는 없습니다.

Managed Service for Apache Flink는 애플리케이션 `CurrentParallelism` 값을 애플리케이션의 `Parallelism` 설정보다 낮추지 않습니다.

Managed Service for Apache Flink 서비스가 애플리케이션을 확장할 때는 `AUTOSCALING` 상태가 됩니다. [DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html) 또는 [ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html) 작업을 사용하여 현재 애플리케이션 상태를 확인할 수 있습니다. 서비스가 애플리케이션을 확장하는 동안 사용할 수 있는 유효한 API 작업은 `Force` 파라미터가 `true`로 설정된 [StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)뿐입니다.

`AutoScalingEnabled` 속성([https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)에 속함)을 사용하여 Auto Scaling 동작을 활성화하거나 비활성화할 수 있습니다. Managed Service for Apache Flink가 애플리케이션 `parallelism` 및 `parallelismPerKPU` 설정의 함수인 프로비저닝하는 KPUs에 대해서는 AWS 계정에 요금이 부과됩니다. 활동이 급증하면 Managed Service for Apache Flink 비용이 증가합니다.

요금에 대한 자세한 내용을 알아보려면 [Amazon Managed Service for Apache Flink 요금](https://aws.amazon.com/kinesis/data-analytics/pricing/)을 참조하세요.

애플리케이션 규모 조정에 대해 다음을 유의하세요.
+ 자동 조정은 기본적으로 활성화되어 있습니다.
+ 조정 기능은 Studio 노트북에는 적용되지 않습니다. 그러나 Studio Notebook을 내구성 상태의 애플리케이션으로 배포하는 경우 배포된 애플리케이션에 조정 작업이 적용됩니다.
+ 애플리케이션의 기본 한도는 64 KPU입니다. 자세한 내용은 [Managed Service for Apache Flink 및 Studio 노트북 할당량](limits.md) 단원을 참조하십시오.
+ 자동 크기 조정이 애플리케이션 병렬성을 업데이트하면 애플리케이션 다운타임이 발생합니다. 이러한 다운타임을 방지하려면 다음을 수행하십시오.
  + 자동 조정 사용 중지
  + `parallelism`[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업을 사용하여 애플리케이션의 및 `parallelismPerKPU`을 구성합니다. 애플리케이션의 병렬 처리 설정에 관한 자세한 내용은 [애플리케이션의 병렬 처리 업데이트](how-scaling.md#how-scaling-howto) 섹션을 참조하세요.
  + 애플리케이션의 리소스 사용량을 주기적으로 모니터링하여 애플리케이션의 워크로드에 대한 병렬성 설정이 올바른지 확인하십시오. 할당 리소스 사용량 모니터링에 대한 자세한 내용을 알아보려면 [Managed Service for Apache Flink의 지표 및 차원](metrics-dimensions.md) 섹션을 참조하십시오.

## 사용자 지정 자동 크기 조정 구현
<a name="how-scaling-custom-autoscaling"></a>

자동 크기 조정을 더 세밀하게 제어하거나 이외의 트리거 지표를 사용하려면 다음 예제를 사용할 `containerCPUUtilization`수 있습니다.
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  이 예제에서는 소스 또는 싱크로 사용되는 Amazon MSK 및 Amazon Kinesis Data Streams의 지표를 포함하여 Apache Flink 애플리케이션과 다른 CloudWatch 지표를 사용하여 Managed Service for Apache Flink 애플리케이션을 규모 조정하는 방법을 보여줍니다.

자세한 내용은 [Apache Flink에 대한 향상된 모니터링 및 자동 규모 조정](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/)을 참조하세요.

## 예약된 자동 크기 조정 구현
<a name="how-scaling-scheduled-autoscaling"></a>

워크로드가 시간 경과에 따라 예측 가능한 프로파일을 따르는 경우 Apache Flink 애플리케이션을 사전에 규모 조정하는 것이 더 적합할 수 있습니다. 이 방식은 지표 기반의 반응형 규모 조정과 달리 예약된 시간에 애플리케이션을 규모 조정합니다. 하루 중 고정된 시간에 스케일 업 및 스케일 다운을 설정하려면 다음 예제를 사용할 수 있습니다.
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## maxParallelism 고려 사항
<a name="how-scaling-auto-max-parallelism"></a>

Flink 작업이 규모 조정할 수 있는 최대 병렬 처리는 해당 작업의 모든 연산자 중 *최소* `maxParallelism` 값에 의해 제한됩니다. 예를 들어 소스와 싱크만 있으며 소스의 `maxParallelism`이 16이고 싱크가 8인 간단한 작업의 경우 애플리케이션은 병렬 처리 8을 초과하여 규모를 조정할 수 없습니다.

연산자의 기본 `maxParallelism`이 계산되는 방식과 기본값을 재정의하는 방식은 Apache Flink 설명서의 [최대 병렬 처리 설정](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism)을 참조하세요.

기본 규칙으로 연산자에 `maxParallelism`을 정의하지 않고 애플리케이션을 병렬 처리 128 이하로 시작하면 모든 연산자의 `maxParallelism`은 128이 됩니다.

**참고**  
작업의 최대 병렬 처리는 상태를 유지하면서 애플리케이션을 조정할 수 있는 병렬 처리의 상한입니다.  
기존 애플리케이션의 `maxParallelism`을 수정하는 경우 이전 `maxParallelism`으로 생성된 스냅샷에서 애플리케이션을 다시 시작할 수 없습니다. 스냅샷 없어야만 애플리케이션 재시작이 가능합니다.  
애플리케이션을 병렬 처리 128보다 크게 규모 조정하려면 애플리케이션에서 `maxParallelism`을 명시적으로 설정해야 합니다.
+ Autoscaling 로직은 작업의 최대 병렬 처리를 초과하는 병렬 처리로 Flink 작업의 규모를 조정하는 것을 방지합니다.
+ 사용자 지정 자동 크기 조정 또는 예약된 규모 조정을 사용하는 경우 작업의 최대 병렬 처리를 초과하지 않도록 구성해야 합니다.
+ 애플리케이션이 최대 병렬 처리를 초과하도록 수동으로 규모 조정하면 애플리케이션이 시작되지 않습니다.

# Managed Service for Apache Flink 애플리케이션에 태그 추가
<a name="how-tagging"></a>



이 섹션에서는 Managed Service for Apache Flink 애플리케이션에 키-값 메타데이터를 추가하는 방법을 설명합니다. 이러한 태그는 다음과 같은 용도로 사용할 수 있습니다:
+ 개별 Managed Service for Apache Flink 애플리케이션에 대한 청구 결정 자세한 내용을 알아보려면 *청구 및 비용 관리 가이드*의 [비용 할당 태그 사용](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)을 참조하세요.
+ 태그를 기반으로 애플리케이션 리소스에 대한 액세스 통제. 자세한 설명은 *AWS Identity and Access Management 사용자 가이드*의 [태그를 사용한 액세스 통제](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html)를 참조하십시오.
+ 사용자 정의 용도. 사용자 태그의 존재 유무를 기반으로 애플리케이션 기능을 정의할 수 있습니다.

태그 지정에 대한 다음 정보를 참조하십시오.
+ 애플리케이션 태그의 최대 수는 시스템 태그를 포함합니다. 사용자 정의 애플리케이션 태그의 최대 수는 50입니다.
+ 작업에 중복된 `Key` 값이 있는 태그 목록이 포함된 경우 서비스에서 `InvalidArgumentException`가 발생합니다.

**Topics**
+ [애플리케이션이 생성될 때 태그 추가](how-tagging-create.md)
+ [기존 애플리케이션에 대한 태그 추가 또는 업데이트](how-tagging-add.md)
+ [애플리케이션에 대한 태그 나열](how-tagging-list.md)
+ [애플리케이션에서 태그 제거](how-tagging-remove.md)

# 애플리케이션이 생성될 때 태그 추가
<a name="how-tagging-create"></a>

[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 작업의 `tags` 파라미터를 사용하여 애플리케이션을 만들 때 태그를 추가합니다.

다음 예 요청은 `CreateApplication` 요청에 대한 `Tags` 노드를 보여줍니다.

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# 기존 애플리케이션에 대한 태그 추가 또는 업데이트
<a name="how-tagging-add"></a>

[TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html) 작업을 사용하여 애플리케이션에 태그를 추가합니다. [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 작업을 사용해서는 애플리케이션에 태그를 추가할 수 없습니다.

기존 태그를 업데이트하려면, 기존 태그의 동일한 키로 태그를 추가합니다.

`TagResource` 작업을 위한 다음 예 요청은 새 태그를 추가하거나 기존 태그를 업데이트합니다.

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# 애플리케이션에 대한 태그 나열
<a name="how-tagging-list"></a>

기존 태그를 나열하려면 [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html) 작업을 사용합니다.

`ListTagsForResource` 작업을 위한 다음 예 요청은 애플리케이션에 대한 태그를 나열합니다.

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# 애플리케이션에서 태그 제거
<a name="how-tagging-remove"></a>

애플리케이션에서 태그를 제거하려면 [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html) 작업을 사용합니다.

`UntagResource` 작업을 위한 다음 예 요청은 애플리케이션에서 태그를 제거합니다.

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Managed Service for Apache Flink에서 CloudFormation 사용
<a name="lambda-cfn-flink"></a>

다음 연습에서는 동일한 스택에서 Lambda 함수를 CloudFormation 사용하여 로 생성된 Flink 애플리케이션을 시작하는 방법을 보여줍니다.

## 시작하기 전에
<a name="before-you-begin"></a>

이 연습을 시작하기 전에 [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html) CloudFormation 에서를 사용하여 Flink 애플리케이션을 생성하는 단계를 따르세요.

## Lambda 함수 작성
<a name="write-lambda-function"></a>

생성 또는 업데이트 후 Flink 애플리케이션을 시작하려면 kinesisanalyticsv2 [애플리케이션 시작](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html) API를 사용합니다. 호출은 Flink 애플리케이션 생성 후 CloudFormation 이벤트에 의해 트리거됩니다. 이 연습의 후반부에서 Lambda 함수를 트리거하도록 스택을 설정하는 방법에 대해 설명하겠지만, 먼저 Lambda 함수 선언과 해당 코드를 중점적으로 살펴보겠습니다. 이 예에서는 `Python3.8` 런타임을 사용합니다.

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

위 코드에서 Lambda는 수신 CloudFormation 이벤트를 처리하고, `Create` 및 이외의 모든 항목을 필터링하고`Update`, 애플리케이션 상태를 가져오고, 상태가 인 경우 시작합니다`READY`. 애플리케이션 상태를 가져오려면 다음과 같이 Lambda 역할을 생성해야 합니다.

## Lambda 역할 생성
<a name="create-lambda-role"></a>

Lambda가 애플리케이션과 성공적으로 “통신”하고 로그를 기록할 수 있는 역할을 생성합니다. 이 역할은 기본 관리형 정책을 사용하지만 필요한 경우 사용자 지정 정책을 사용하도록 범위를 좁힐 수 있습니다.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Lambda 리소스는 해당 스택에 의존하기 때문에 동일한 스택에서 Flink 애플리케이션을 생성한 후에 생성된다는 점에 유의하세요.

## Lambda 함수 호출
<a name="invoking-lambda-function"></a>

이제 Lambda 함수 호출만 남았습니다. 이 작업은 [사용자 지정 리소스](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html)를 사용하여 수행합니다.

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

다음은 Lambda를 사용하여 Flink 애플리케이션을 시작하는 데 필요한 모든 사항입니다. 이제 자체 스택을 만들거나 아래 전체 예를 사용하여 이러한 모든 단계가 실제로 어떻게 작동하는지 확인할 준비가 되었습니다.

## 확장 예제 검토
<a name="lambda-cfn-flink-full-example"></a>

다음 예제는 이전 단계에 약간의 확장을 더한 버전이며 [템플릿 파라미터](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html)를 통해 추가적인 `RunConfiguration` 조정이 이루어집니다. 시도해 볼 수 있는 작업 스택입니다. 함께 제공된 참고 사항을 반드시 읽어보세요.

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

다시, 애플리케이션 자체뿐만 아니라 Lambda의 역할도 조정하고 싶을 수 있습니다.

위의 스택을 생성하기 전에 파라미터를 지정하는 것을 잊지 마세요.

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

`YOUR_BUCKET_ARN` 및 `YOUR_JAR`을 특정 요건으로 바꾸세요. 이 [가이드](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html)에 따라 Amazon S3 버킷과 애플리케이션 jar를 생성할 수 있습니다.

이제 스택을 생성합니다(YOUR\$1REGION을 원하는 지역(예: us-east-1)으로 대체).

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

이제 [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation)으로 이동하여 진행 상황을 확인할 수 있습니다. 생성한 후에는 Flink 애플리케이션이 현재 `Starting` 상태인 것을 확인할 수 있습니다. `Running`을 시작하려면 몇 분이 걸릴 수 있습니다.

자세한 정보는 을(를) 참조하세요.
+ [AWS CloudFormation을 사용하여 AWS 서비스 속성을 검색하는 네 가지 방법(1/3부)](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/)
+ [연습: Amazon Machine Image ID 조회](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html)

# Managed Service for Apache Flink에서 Apache Flink 대시보드 사용
<a name="how-dashboard"></a>

애플리케이션의 Apache Flink 대시보드를 사용하여 Managed Service for Apache Flink 애플리케이션의 상태를 모니터링할 수 있습니다. 애플리케이션 대시보드에 다음 정보가 표시됩니다.
+ 사용 중인 리소스(작업 관리자 및 작업 슬롯 포함) 
+ 실행 중인 작업, 완료한 작업, 취소된 작업, 실패한 작업을 비롯한 작업에 대한 정보 

Apache Flink 작업 관리자, 작업 슬롯 및 작업에 대한 자세한 내용을 알아보려면 Apache Flink 웹 사이트의 [Apache Flink 아키텍처](https://flink.apache.org/what-is-flink/flink-architecture/)를 참조하세요.

Managed Service for Apache Flink 애플리케이션과 Apache Flink 대시보드를 사용하는 방법에 대해서는 다음을 참고하세요.
+ Managed Service for Apache Flink 애플리케이션용 Apache Flink 대시보드는 읽기 전용입니다. Apache Flink 대시보드를 사용하여 Managed Service for Apache Flink 애플리케이션을 변경할 수 없습니다.
+ Apache Flink 대시보드는 Microsoft Internet Explorer와 병립될 수 없습니다.

## 애플리케이션의 Apache Flink 대시보드에 액세스
<a name="how-dashboard-accessing"></a>

Managed Service for Apache Flink 콘솔을 통해 또는 CLI를 사용하여 보안 URL 엔드포인트를 요청하여 애플리케이션의 Apache Flink 대시보드에 액세스할 수 있습니다.

### Managed Service for Apache Flink 콘솔을 사용하여 애플리케이션의 Apache Flink 대시보드에 액세스
<a name="how-dashboard-accessing-console"></a>

콘솔에서 애플리케이션의 Apache Flink 대시보드에 액세스하려면 애플리케이션 페이지에서 **Apache Flink 대시보드**를 선택합니다.

**참고**  
Managed Service for Apache Flink 콘솔에서 대시보드를 열면 콘솔이 생성하는 URL이 12시간 동안 유효합니다.

### Managed Service for Apache Flink CLI를 사용하여 애플리케이션의 Apache Flink 대시보드에 액세스
<a name="how-dashboard-accessing-cli"></a>

Managed Service for Apache Flink CLI를 사용하여 애플리케이션 대시보드에 액세스하는 URL을 생성할 수 있습니다. 생성한 URL은 지정된 시간 동안 유효합니다.

**참고**  
생성된 URL에 3분 이내에 액세스하지 않으면 더 이상 유효하지 않습니다.

[CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html) 작업을 사용하여 대시보드 URL을 생성합니다. 작업에 대해 다음 파라미터를 지정할 수 있습니다.
+ 애플리케이션 명칭
+ URL이 유효한 시간(단위: 초)
+ `FLINK_DASHBOARD_URL`을 URL 유형으로 지정합니다.