

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Apache Flink에서 Apache Hudi 사용
<a name="tutorial-hudi-for-flink"></a>

Apache Hudi는 삽입, 업데이트, 업서트 및 삭제와 같은 레코드 수준 작업을 통해 데이터 관리 및 데이터 파이프라인 개발을 간소화하는 데 사용할 수 있는 오픈 소스 데이터 관리 프레임워크입니다. Amazon S3에서 효율적인 데이터 관리와 결합하여 Hudi는 데이터를 실시간으로 수집 및 업데이트할 수 있도록 합니다. Hudi는 데이터세트에서 실행하는 모든 작업의 메타데이터를 유지 관리하므로 모든 작업은 원자적이고 일관되게 유지됩니다.

Apache Hudi는 Amazon EMR 릴리스 7.2.0 이상의 Apache Flink와 함께 Amazon EMR on EKS에서 사용할 수 있습니다. Apache Hudi 작업을 시작하고 제출하는 방법을 알아보려면 다음 단계를 참조하세요.

## Apache Hudi 작업 제출
<a name="tutorial-hudi-for-flink-submit-jobs"></a>

Apache Hudi 작업을 제출하는 방법을 알아보려면 다음 단계를 참조하세요.

1.  AWS 이름이 인 Glue 데이터베이스를 생성합니다`default`.

   ```
   aws glue create-database --database-input "{\"Name\":\"default\"}"
   ```

1. [Flink Kubernetes Operator SQL Example](https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example)에 따라 `flink-sql-runner.jar` 파일을 빌드합니다.

1. 다음과 같은 Hudi SQL 스크립트를 생성합니다.

   ```
   CREATE CATALOG hudi_glue_catalog WITH (
   'type' = 'hudi',
   'mode' = 'hms',
   'table.external' = 'true',
   'default-database' = 'default',
   'hive.conf.dir' = '/glue/confs/hive/conf/',
   'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/'
   );
   
   USE CATALOG hudi_glue_catalog;
   CREATE DATABASE IF NOT EXISTS hudi_db;
   use hudi_db;
   
   CREATE TABLE IF NOT EXISTS hudi-flink-example-table(
       uuid VARCHAR(20),
       name VARCHAR(10),
       age INT,
       ts TIMESTAMP(3),
       `partition` VARCHAR(20)
   )
   PARTITIONED BY (`partition`)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'glue',
     'hive_sync.table' = 'hudi-flink-example-table',
     'hive_sync.db' = 'hudi_db',
     'compaction.delta_commits' = '1',
     'hive_sync.partition_fields' = 'partition',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'table.type' = 'COPY_ON_WRITE'
   );
   
   EXECUTE STATEMENT SET
   BEGIN
   
   INSERT INTO hudi-flink-example-table VALUES
       ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
       ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
       ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
       ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
       ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
       ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
       ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
       ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
   
   END;
   ```

1. Hudi SQL 스크립트 및 `flink-sql-runner.jar` 파일을 S3 위치에 업로드합니다.

1. `FlinkDeployments` YAML 파일에서 `hudi.enabled`를 `true`로 설정합니다.

   ```
   spec:
     flinkConfiguration:
       hudi.enabled: "true"
   ```

1. YAML 파일을 생성하여 구성을 실행합니다. 이 예제 파일 이름은 `hudi-write.yaml`입니다.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: hudi-write-example
   spec:
     flinkVersion: v1_18
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       hudi.enabled: "true"
     executionRoleArn: "<JobExecutionRole>"
     emrReleaseLabel: "emr-7.12.0-flink-latest"
     jobManager:
       highAvailabilityEnabled: false
       replicas: 1
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar
       args: ["/opt/flink/scripts/hudi-write.sql"]
       parallelism: 1
       upgradeMode: stateless
     podTemplate:
       spec:
         initContainers:
           - name: flink-sql-script-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/hudi-write.sql
               - /flink-scripts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-scripts
                 name: flink-scripts
           - name: flink-sql-runner-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/flink-sql-runner.jar
               - /flink-artifacts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-artifacts
                 name: flink-artifact
         containers:
           - name: flink-main-container
             volumeMounts:
               - mountPath: /opt/flink/scripts
                 name: flink-scripts
               - mountPath: /opt/flink/usrlib
                 name: flink-artifact
         volumes:
           - emptyDir: {}
             name: flink-scripts
           - emptyDir: {}
             name: flink-artifact
   ```

1. [Flink Kubernetes 연산자](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator.html)에 Flink Hudi 작업을 제출합니다.

   ```
   kubectl apply -f hudi-write.yaml
   ```