

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan AWS Glue dengan Flink
<a name="glue-for-flink"></a>

Amazon EMR di EKS dengan Apache Flink merilis 6.15.0 dan dukungan yang lebih tinggi menggunakan Katalog Data AWS Glue sebagai penyimpanan metadata untuk streaming dan alur kerja SQL batch.

Anda harus terlebih dahulu membuat database AWS Glue bernama `default` yang berfungsi sebagai Katalog SQL Flink Anda. Katalog Flink ini menyimpan metadata seperti database, tabel, parisi, tampilan, fungsi, dan informasi lain yang diperlukan untuk mengakses data di sistem eksternal lainnya.

```
aws glue create-database \
    --database-input "{\"Name\":\"default\"}"
```

Untuk mengaktifkan dukungan AWS Glue, gunakan `FlinkDeployment` spesifikasi. Contoh spesifikasi ini menggunakan skrip Python untuk dengan cepat mengeluarkan beberapa pernyataan SQL Flink untuk berinteraksi dengan katalog Glue. AWS 

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    aws.glue.enabled: "true"
  executionRoleArn: {{job-execution-role-arn}};
  emrReleaseLabel: "emr-6.15.0-flink-latest"
  jobManager:
    highAvailabilityEnabled: false
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: s3://<{{S3_bucket_with_your_script}}/{{pyflink-glue-script.py}}
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-py", "/opt/flink/usrlib/{{pyflink-glue-script.py}}"] 
    parallelism: 1
    upgradeMode: stateless
```

Berikut ini adalah contoh dari apa PyFlink script Anda mungkin terlihat seperti.

```
import logging
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def glue_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql("""
          CREATE CATALOG glue_catalog WITH (
          'type' = 'hive',
          'default-database' = 'default',
          'hive-conf-dir' = '/glue/confs/hive/conf',
          'hadoop-conf-dir' = '/glue/confs/hadoop/conf'
          )
                      """)
    t_env.execute_sql("""
          USE CATALOG glue_catalog;
                      """)
    t_env.execute_sql("""
          DROP DATABASE IF EXISTS eks_flink_db CASCADE;
                      """)
    t_env.execute_sql("""
          CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://{{S3-bucket-to-store-metadata}}/flink/flink-glue-for-hive/warehouse/');
                      """)
    t_env.execute_sql("""
          USE eks_flink_db;
                  """)
    t_env.execute_sql("""
          CREATE TABLE IF NOT EXISTS eksglueorders (
            order_number BIGINT,
            price        DECIMAL(32,2),
            buyer        RO {{first_name STRING, last_name STRING}},
            order_time   TIMESTAMP(3)
          ) WITH (
            'connector' = 'datagen'
          );
                      """)
    t_env.execute_sql("""
          CREATE TABLE IF NOT EXISTS eksdestglueorders (
            order_number BIGINT,
            price        DECIMAL(32,2),
            buyer        ROW {{first_name STRING, last_name STRING}},
            order_time   TIMESTAMP(3)
          ) WITH (
            'connector' = 'filesystem',
            'path' = 's3://{{S3-bucket-to-store-metadata}}/flink/flink-glue-for-hive/warehouse/eksdestglueorders',
            'format' = 'json'
          );
                  """)
    t_env.execute_sql("""
          CREATE TABLE IF NOT EXISTS print_table (
            order_number BIGINT,
            price        DECIMAL(32,2),
            buyer        ROW {{first_name STRING, last_name STRING}},
            order_time   TIMESTAMP(3)
          ) WITH (
            'connector' = 'print'
          );
                """)
    t_env.execute_sql("""
          EXECUTE STATEMENT SET
          BEGIN
          INSERT INTO eksdestglueorders SELECT * FROM  eksglueorders LIMIT 10;
          INSERT INTO print_table SELECT * FROM eksdestglueorders;
          END;
            """)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    glue_demo()
```