

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan Apache Hudi dengan Apache Flink
<a name="tutorial-hudi-for-flink"></a>

Apache Hudi adalah kerangka kerja manajemen data sumber terbuka dengan operasi tingkat rekaman seperti menyisipkan, memperbarui, meningkatkan, dan menghapus yang dapat Anda gunakan untuk menyederhanakan manajemen data dan pengembangan pipa data. Ketika dikombinasikan dengan manajemen data yang efisien di Amazon S3, Hudi memungkinkan Anda menelan dan memperbarui data secara real time. Hudi mempertahankan metadata dari semua operasi yang Anda jalankan pada dataset, sehingga semua tindakan tetap atom dan konsisten. 

Apache Hudi tersedia di Amazon EMR di EKS dengan Apache Flink dengan Amazon EMR rilis 7.2.0 dan lebih tinggi. Lihat langkah-langkah berikut untuk mempelajari cara memulai dan mengirimkan pekerjaan Apache Hudi.

## Kirim pekerjaan Apache Hudi
<a name="tutorial-hudi-for-flink-submit-jobs"></a>

Lihat langkah-langkah berikut untuk mempelajari cara mengirimkan pekerjaan Apache Hudi.

1. Buat database AWS Glue bernama`default`.

   ```
   aws glue create-database --database-input "{\"Name\":\"default\"}"
   ```

1. Ikuti [Contoh SQL Operator Flink Kubernetes](https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example) untuk membangun file. `flink-sql-runner.jar`

1. Buat skrip Hudi SQL seperti berikut ini.

   ```
   CREATE CATALOG hudi_glue_catalog WITH (
   'type' = 'hudi',
   'mode' = 'hms',
   'table.external' = 'true',
   'default-database' = 'default',
   'hive.conf.dir' = '/glue/confs/hive/conf/',
   'catalog.path' = 's3://{{<hudi-example-bucket>}}/FLINK_HUDI/warehouse/'
   );
   
   USE CATALOG hudi_glue_catalog;
   CREATE DATABASE IF NOT EXISTS hudi_db;
   use hudi_db;
   
   CREATE TABLE IF NOT EXISTS hudi-flink-example-table(
       uuid VARCHAR(20),
       name VARCHAR(10),
       age INT,
       ts TIMESTAMP(3),
       `partition` VARCHAR(20)
   )
   PARTITIONED BY (`partition`)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://{{<hudi-example-bucket>}}/hudi-flink-example-table',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'glue',
     'hive_sync.table' = 'hudi-flink-example-table',
     'hive_sync.db' = 'hudi_db',
     'compaction.delta_commits' = '1',
     'hive_sync.partition_fields' = 'partition',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'table.type' = 'COPY_ON_WRITE'
   );
   
   EXECUTE STATEMENT SET
   BEGIN
   
   INSERT INTO hudi-flink-example-table VALUES
       ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
       ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
       ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
       ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
       ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
       ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
       ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
       ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
   
   END;
   ```

1. Unggah skrip Hudi SQL Anda dan `flink-sql-runner.jar` file ke lokasi S3.

1. Di file `FlinkDeployments` YAMAL Anda, setel `hudi.enabled` ke`true`.

   ```
   spec:
     flinkConfiguration:
       hudi.enabled: "true"
   ```

1. Buat file YAMAL untuk menjalankan konfigurasi Anda. File contoh ini diberi nama`hudi-write.yaml`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: hudi-write-example
   spec:
     flinkVersion: v1_18
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       hudi.enabled: "true"
     executionRoleArn: "{{<JobExecutionRole>}}"
     emrReleaseLabel: "emr-7.13.0-flink-latest"
     jobManager:
       highAvailabilityEnabled: false
       replicas: 1
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar
       args: ["/opt/flink/scripts/hudi-write.sql"]
       parallelism: 1
       upgradeMode: stateless
     podTemplate:
       spec:
         initContainers:
           - name: flink-sql-script-download
             args: 
               - s3
               - cp
               - s3://{{<s3_location>}}/hudi-write.sql
               - /flink-scripts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-scripts
                 name: flink-scripts
           - name: flink-sql-runner-download
             args: 
               - s3
               - cp
               - s3://{{<s3_location>}}/flink-sql-runner.jar
               - /flink-artifacts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-artifacts
                 name: flink-artifact
         containers:
           - name: flink-main-container
             volumeMounts:
               - mountPath: /opt/flink/scripts
                 name: flink-scripts
               - mountPath: /opt/flink/usrlib
                 name: flink-artifact
         volumes:
           - emptyDir: {}
             name: flink-scripts
           - emptyDir: {}
             name: flink-artifact
   ```

1. Kirim pekerjaan Flink Hudi ke operator [Flink Kubernetes](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator.html).

   ```
   kubectl apply -f hudi-write.yaml
   ```