

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Membuat dan menjalankan Managed Service untuk aplikasi Apache Flink
<a name="gs-table-create"></a>

Dalam latihan ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink dengan aliran data Kinesis sebagai sumber dan sink.

**Topics**
+ [Buat sumber daya yang bergantung](#gs-table-resources)
+ [Siapkan lingkungan pengembangan lokal Anda](#gs-table-2)
+ [Unduh dan periksa kode Java streaming Apache Flink](#gs-table-5)
+ [Jalankan aplikasi Anda secara lokal](#gs-table-run-locally)
+ [Amati data penulisan aplikasi ke bucket S3](#gs-table-input-output)
+ [Menghentikan aplikasi Anda berjalan secara lokal](#gs-table-stop)
+ [Kompilasi dan paket kode aplikasi Anda](#gs-table-5.5)
+ [Unggah file JAR kode aplikasi](#gs-table-6)
+ [Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink](#gs-table-7)

## Buat sumber daya yang bergantung
<a name="gs-table-resources"></a>

Sebelum Anda membuat Layanan Terkelola untuk Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut: 
+ Bucket Amazon S3 untuk menyimpan kode aplikasi dan menulis output aplikasi.
**catatan**  
Tutorial ini mengasumsikan bahwa Anda menerapkan aplikasi Anda di Wilayah us-east-1. Jika Anda menggunakan Wilayah lain, Anda harus menyesuaikan semua langkah yang sesuai.

### Buat bucket Amazon S3.
<a name="gs-table-resources-s3"></a>

Anda dapat membuat bucket Amazon S3 menggunakan konsol. Untuk petunjuk pembuatan sumber daya ini, lihat topik berikut:
+ [Bagaimana Cara Membuat Bucket S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) di *Panduan Pengguna Layanan Penyimpanan Sederhana Amazon*. Berikan bucket Amazon S3 nama unik secara global dengan menambahkan nama login Anda.
**catatan**  
Pastikan Anda membuat bucket di Region yang Anda gunakan untuk tutorial ini. Default untuk tutorial ini adalah us-east-1.

### Sumber daya lainnya
<a name="gs-table-resources-cw"></a>

Saat Anda membuat aplikasi, Managed Service for Apache Flink akan membuat CloudWatch resource Amazon berikut jika belum ada:
+ Grup log yang disebut `/AWS/KinesisAnalytics-java/<my-application>`.
+ Aliran log yang disebut `kinesis-analytics-log-stream`.

## Siapkan lingkungan pengembangan lokal Anda
<a name="gs-table-2"></a>

Untuk pengembangan dan debugging, Anda dapat menjalankan aplikasi Apache Flink di mesin Anda, langsung dari IDE pilihan Anda. Setiap dependensi Apache Flink ditangani sebagai dependensi Java normal menggunakan Maven.

**catatan**  
Pada mesin pengembangan Anda, Anda harus menginstal Java JDK 11, Maven, dan Git. [Kami menyarankan Anda menggunakan lingkungan pengembangan seperti [Eclipse Java Neon atau](https://www.eclipse.org/downloads/packages/release/neon/3) IntelliJ IDEA.](https://www.jetbrains.com/idea/) Untuk memverifikasi bahwa Anda memenuhi semua prasyarat, lihat. [Memenuhi prasyarat untuk menyelesaikan latihan](getting-started.md#setting-up-prerequisites) Anda **tidak** perlu menginstal cluster Apache Flink di mesin Anda. 

### Otentikasi sesi Anda AWS
<a name="get-started-exercise-table-authenticate"></a>

Aplikasi ini menggunakan aliran data Kinesis untuk mempublikasikan data. Saat berjalan secara lokal, Anda harus memiliki sesi AWS otentikasi yang valid dengan izin untuk menulis ke aliran data Kinesis. Gunakan langkah-langkah berikut untuk mengautentikasi sesi Anda:

1. Jika Anda tidak memiliki AWS CLI dan profil bernama dengan kredensi valid yang dikonfigurasi, lihat[Mengatur AWS Command Line Interface (AWS CLI)](setup-awscli.md).

1. Jika IDE Anda memiliki plugin untuk diintegrasikan AWS, Anda dapat menggunakannya untuk meneruskan kredensil ke aplikasi yang berjalan di IDE. Untuk informasi selengkapnya, lihat [AWS Toolkit untuk IntelliJ](https://aws.amazon.com/intellij/) IDEA [AWS dan Toolkit untuk mengkompilasi](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html) aplikasi atau menjalankan Eclipse.

## Unduh dan periksa kode Java streaming Apache Flink
<a name="gs-table-5"></a>

Kode aplikasi untuk contoh ini tersedia dari GitHub. 

**Untuk mengunduh kode aplikasi Java**

1. Kloning repositori jarak jauh menggunakan perintah berikut:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Buka direktori `./java/GettingStartedTable` tersebut.

### Tinjau komponen aplikasi
<a name="get-started-exercise-table-components"></a>

Aplikasi ini sepenuhnya diimplementasikan di `com.amazonaws.services.msf.BasicTableJob` kelas. `main()`Metode ini mendefinisikan sumber, transformasi, dan sink. Eksekusi diprakarsai oleh pernyataan eksekusi di akhir metode ini.

**catatan**  
Untuk pengalaman pengembang yang optimal, aplikasi ini dirancang untuk berjalan tanpa perubahan kode apa pun baik di Amazon Managed Service untuk Apache Flink maupun secara lokal, untuk pengembangan di IDE Anda.
+ Untuk membaca konfigurasi runtime sehingga akan berfungsi saat berjalan di Amazon Managed Service untuk Apache Flink dan di IDE Anda, aplikasi secara otomatis mendeteksi apakah itu berjalan mandiri secara lokal di IDE. Dalam hal ini, aplikasi memuat konfigurasi runtime secara berbeda:

  1. Saat aplikasi mendeteksi bahwa aplikasi berjalan dalam mode mandiri di IDE Anda, bentuk `application_properties.json` file yang disertakan dalam folder **sumber daya** proyek. Isi file berikut.

  1. Saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink, perilaku default memuat konfigurasi aplikasi dari properti runtime yang akan Anda tentukan di Amazon Managed Service untuk aplikasi Apache Flink. Lihat [Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink](get-started-exercise.md#get-started-exercise-7).

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()`Metode ini mendefinisikan aliran data aplikasi dan menjalankannya. 
  + Menginisialisasi lingkungan streaming default. Dalam contoh ini, kami menunjukkan cara membuat kedua `StreamExecutionEnvironment` untuk digunakan dengan DataStream API, dan `StreamTableEnvironment` untuk menggunakan dengan SQL dan Table API. Dua objek lingkungan adalah dua referensi terpisah ke lingkungan runtime yang sama, untuk menggunakan API yang berbeda.

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + Muat parameter konfigurasi aplikasi. Ini akan secara otomatis memuatnya dari tempat yang benar, tergantung di mana aplikasi berjalan:

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + [Konektor FileSystem wastafel](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) [yang digunakan aplikasi untuk menulis hasil ke file output Amazon S3 saat Flink menyelesaikan pos pemeriksaan.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing) Anda harus mengaktifkan pos pemeriksaan untuk menulis file ke tujuan. Saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink, konfigurasi aplikasi mengontrol pos pemeriksaan dan mengaktifkannya secara default. Sebaliknya, saat berjalan secara lokal, pos pemeriksaan dinonaktifkan secara default. Aplikasi mendeteksi bahwa itu berjalan secara lokal dan mengonfigurasi pos pemeriksaan setiap 5.000 ms. 

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + Aplikasi ini tidak menerima data dari sumber eksternal yang sebenarnya. Ini menghasilkan data acak untuk diproses melalui [DataGen konektor](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/). Konektor ini tersedia untuk DataStream API, SQL, dan Table API. Untuk mendemonstrasikan integrasi antar API, aplikasi menggunakan versi DataStram API karena memberikan lebih banyak fleksibilitas. Setiap catatan dihasilkan oleh fungsi generator yang disebut `StockPriceGeneratorFunction` dalam kasus ini, di mana Anda dapat menempatkan logika khusus. 

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + Di DataStream API, catatan dapat memiliki kelas khusus. Kelas harus mengikuti aturan tertentu sehingga Flink dapat menggunakannya sebagai catatan. Untuk informasi selengkapnya, lihat [Tipe Data yang Didukung](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types). Dalam contoh ini, `StockPrice` kelasnya adalah [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). 
  + Sumber kemudian dilampirkan ke lingkungan eksekusi, menghasilkan a `DataStream` dari`StockPrice`. Aplikasi ini tidak menggunakan [semantik event-time](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time) dan tidak menghasilkan watermark. Jalankan DataGenerator sumber dengan paralelisme 1, terlepas dari paralelisme aplikasi lainnya.

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + Apa yang berikut dalam aliran pemrosesan data didefinisikan menggunakan Tabel API dan SQL. Untuk melakukannya, kami mengubah DataStream dari StockPrices menjadi tabel. Skema tabel secara otomatis disimpulkan dari kelas. `StockPrice` 

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + Cuplikan kode berikut menunjukkan cara mendefinisikan tampilan dan kueri menggunakan API Tabel terprogram:

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + Tabel wastafel didefinisikan untuk menulis hasil ke bucket Amazon S3 sebagai file JSON. Untuk mengilustrasikan perbedaan dengan mendefinisikan tampilan secara terprogram, dengan Table API tabel sink didefinisikan menggunakan SQL.

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + Langkah terakhir dari ini adalah memasukkan tampilan harga saham `executeInsert()` yang disaring ke dalam tabel wastafel. Metode ini memulai eksekusi aliran data yang telah kami definisikan sejauh ini. 

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### Gunakan file pom.xml
<a name="get-started-exercise-5-2"></a>

File pom.xml mendefinisikan semua dependensi yang diperlukan oleh aplikasi dan menyiapkan plugin Maven Shade untuk membangun toples lemak yang berisi semua dependensi yang diperlukan oleh Flink. 
+ Beberapa dependensi memiliki `provided` ruang lingkup. Dependensi ini secara otomatis tersedia saat aplikasi berjalan di Amazon Managed Service untuk Apache Flink. Mereka diperlukan untuk aplikasi atau aplikasi secara lokal di IDE Anda. Untuk informasi selengkapnya, lihat (perbarui ke TableAPI)[Jalankan aplikasi Anda secara lokal](get-started-exercise.md#get-started-exercise-5-run). Pastikan Anda menggunakan versi Flink yang sama dengan runtime yang akan Anda gunakan di Amazon Managed Service untuk Apache Flink. Untuk menggunakan TableAPI dan SQL, Anda harus menyertakan `flink-table-planner-loader` dan`flink-table-runtime-dependencies`, keduanya dengan `provided` cakupan. 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Anda harus menambahkan dependensi Apache Flink tambahan ke pom dengan cakupan default. Misalnya, [DataGen konektor, konektor FileSystem ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/) [SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/), dan format [JSON](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/). 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ Untuk menulis ke Amazon S3 saat berjalan secara lokal, Sistem File Hadoop S3 juga disertakan dengan cakupan. `provided`

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Plugin Maven Java Compiler memastikan bahwa kode dikompilasi terhadap Java 11, versi JDK yang saat ini didukung oleh Apache Flink. 
+ Plugin Maven Shade mengemas toples lemak, tidak termasuk beberapa pustaka yang disediakan oleh runtime. Ini juga menentukan dua transformer: dan. `ServicesResourceTransformer` `ManifestResourceTransformer` Yang terakhir mengkonfigurasi kelas yang berisi `main` metode untuk memulai aplikasi. Jika Anda mengganti nama kelas utama, jangan lupa perbarui transformator ini.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## Jalankan aplikasi Anda secara lokal
<a name="gs-table-run-locally"></a>

Anda dapat menjalankan dan men-debug aplikasi Flink Anda secara lokal di IDE Anda.

**catatan**  
Sebelum Anda melanjutkan, verifikasi bahwa aliran input dan output tersedia. Lihat [Buat dua aliran data Amazon Kinesis](get-started-exercise.md#get-started-exercise-1). Juga, verifikasi bahwa Anda memiliki izin untuk membaca dan menulis dari kedua aliran. Lihat [Otentikasi sesi Anda AWS](get-started-exercise.md#get-started-exercise-2-5).   
Menyiapkan lingkungan pengembangan lokal membutuhkan Java 11 JDK, Apache Maven, dan IDE untuk pengembangan Java. Pastikan Anda memenuhi prasyarat yang diperlukan. Lihat [Memenuhi prasyarat untuk menyelesaikan latihan](getting-started.md#setting-up-prerequisites).

### Impor proyek Java ke IDE Anda
<a name="gs-table-import"></a>

Untuk mulai mengerjakan aplikasi di IDE Anda, Anda harus mengimpornya sebagai proyek Java. 

Repositori yang Anda kloning berisi beberapa contoh. Setiap contoh adalah proyek terpisah. Untuk tutorial ini, impor konten dalam `./jave/GettingStartedTable` subdirektori ke IDE Anda. 

Masukkan kode sebagai proyek Java yang ada menggunakan Maven.

**catatan**  
Proses yang tepat untuk mengimpor proyek Java baru bervariasi tergantung pada IDE yang Anda gunakan.

### Ubah konfigurasi aplikasi lokal
<a name="gs-table-modify-2"></a>

Saat berjalan secara lokal, aplikasi menggunakan konfigurasi dalam `application_properties.json` file di folder sumber daya proyek di bawah`./src/main/resources`. Untuk aplikasi tutorial ini, parameter konfigurasi adalah nama bucket dan jalur di mana data akan ditulis.

Edit konfigurasi dan ubah nama bucket Amazon S3 agar sesuai dengan bucket yang Anda buat di awal tutorial ini.

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**catatan**  
Properti konfigurasi `name` harus berisi hanya nama bucket, misalnya`my-bucket-name`. Jangan sertakan awalan apa pun seperti `s3://` atau garis miring.  
Jika Anda memodifikasi jalur, hilangkan garis miring di depan atau belakang.

### Siapkan konfigurasi run IDE Anda
<a name="gs-table-setupIDE"></a>

Anda dapat menjalankan dan men-debug aplikasi Flink dari IDE Anda secara langsung dengan menjalankan kelas utama`com.amazonaws.services.msf.BasicTableJob`, karena Anda akan menjalankan aplikasi Java apa pun. Sebelum menjalankan aplikasi, Anda harus mengatur konfigurasi Run. Pengaturan tergantung pada IDE yang Anda gunakan. Misalnya, lihat [Run/debug konfigurasi dalam dokumentasi](https://www.jetbrains.com/help/idea/run-debug-configuration.html) IntelliJ IDEA. Secara khusus, Anda harus mengatur yang berikut:

1. **Tambahkan `provided` dependensi ke classpath.** Ini diperlukan untuk memastikan bahwa dependensi dengan `provided` cakupan diteruskan ke aplikasi saat berjalan secara lokal. Tanpa pengaturan ini, aplikasi segera menampilkan `class not found` kesalahan. 

1. **Lulus AWS kredensi untuk mengakses aliran Kinesis** ke aplikasi. Cara tercepat adalah dengan menggunakan [AWS Toolkit untuk IntelliJ IDEA](https://aws.amazon.com/intellij/). Menggunakan plugin IDE ini dalam konfigurasi Run, Anda dapat memilih AWS profil tertentu. AWS otentikasi terjadi menggunakan profil ini. Anda tidak perlu memberikan AWS kredensil secara langsung. 

1. Verifikasi bahwa IDE menjalankan aplikasi menggunakan **JDK 11**.

### Jalankan aplikasi di IDE Anda
<a name="gs-table-runIDE"></a>

Setelah Anda mengatur konfigurasi Run untuk`BasicTableJob`, Anda dapat menjalankan atau men-debug seperti aplikasi Java biasa. 

**catatan**  
Anda tidak dapat menjalankan toples lemak yang dihasilkan oleh Maven langsung dengan `java -jar ...` dari baris perintah. Toples ini tidak berisi dependensi inti Flink yang diperlukan untuk menjalankan aplikasi mandiri.

Ketika aplikasi dimulai dengan sukses, ia mencatat beberapa informasi tentang minicluster mandiri dan inisialisasi konektor. Ini diikuti oleh sejumlah INFO dan beberapa log WARN yang biasanya dipancarkan Flink saat aplikasi dimulai.

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

Setelah inisialisasi selesai, aplikasi tidak memancarkan entri log lebih lanjut. **Saat data mengalir, tidak ada log yang dipancarkan.**

Untuk memverifikasi apakah aplikasi memproses data dengan benar, Anda dapat memeriksa konten bucket keluaran, seperti yang dijelaskan di bagian berikut.

**catatan**  
 Tidak memancarkan log tentang data yang mengalir adalah perilaku normal untuk aplikasi Flink. Memancarkan log pada setiap catatan mungkin nyaman untuk debugging, tetapi dapat menambahkan overhead yang cukup besar saat berjalan dalam produksi. 

## Amati data penulisan aplikasi ke bucket S3
<a name="gs-table-input-output"></a>

Aplikasi contoh ini menghasilkan data acak secara internal dan menulis data ini ke bucket S3 tujuan yang Anda konfigurasikan. Kecuali Anda memodifikasi jalur konfigurasi default, data akan ditulis ke `output` jalur diikuti oleh partisi data dan jam, dalam format. `./output/<yyyy-MM-dd>/<HH>`

[Konektor FileSystem wastafel](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) membuat file baru di pos pemeriksaan Flink. Saat berjalan secara lokal, aplikasi menjalankan pos pemeriksaan setiap 5 detik (5.000 milidetik), seperti yang ditentukan dalam kode. 

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**Untuk menelusuri bucket S3 dan mengamati file yang ditulis oleh aplikasi**

1. 

   1. Buka konsol Amazon S3 di. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Pilih ember yang Anda buat sebelumnya. 

1. Arahkan ke `output` jalur, lalu ke folder tanggal dan jam yang sesuai dengan waktu saat ini di zona waktu UTC. 

1. Segarkan secara berkala untuk mengamati file baru yang muncul setiap 5 detik.

1. Pilih dan unduh satu file untuk mengamati konten.
**catatan**  
Secara default, file tidak memiliki ekstensi. Konten diformat sebagai JSON. Anda dapat membuka file dengan editor teks apa pun untuk memeriksa konten.

## Menghentikan aplikasi Anda berjalan secara lokal
<a name="gs-table-stop"></a>

Hentikan aplikasi yang berjalan di IDE Anda. IDE biasanya menyediakan opsi “berhenti”. Lokasi dan metode yang tepat tergantung pada IDE. 

## Kompilasi dan paket kode aplikasi Anda
<a name="gs-table-5.5"></a>

Di bagian ini, Anda menggunakan Apache Maven untuk mengkompilasi kode Java dan mengemasnya ke dalam file JAR. Anda dapat mengkompilasi dan mengemas kode Anda menggunakan alat baris perintah Maven atau IDE Anda.

**Untuk mengkompilasi dan paket menggunakan baris perintah Maven**

Pindah ke direktori yang berisi GettingStarted proyek Jave dan jalankan perintah berikut:

```
$ mvn package
```

**Untuk mengkompilasi dan paket menggunakan IDE Anda**

Jalankan `mvn package` dari integrasi IDE Maven Anda.

Dalam kedua kasus, file JAR `target/amazon-msf-java-table-app-1.0.jar` dibuat.

**catatan**  
Menjalankan *proyek build* dari IDE Anda mungkin tidak membuat file JAR.

## Unggah file JAR kode aplikasi
<a name="gs-table-6"></a>

Di bagian ini, Anda mengunggah file JAR yang Anda buat di bagian sebelumnya ke bucket Amazon S3 yang Anda buat di awal tutorial ini. Jika Anda sudah melakukannya, selesaikan[Buat bucket Amazon S3.](#gs-table-resources-s3).

**Untuk mengunggah kode aplikasi**

1. Buka konsol Amazon S3 di. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Pilih bucket yang sebelumnya Anda buat untuk kode aplikasi.

1. Pilih bidang **Unggah**. 

1. Pilih **Tambahkan file**.

1. Arahkan ke file JAR yang dihasilkan di bagian sebelumnya:`target/amazon-msf-java-table-app-1.0.jar`.

1. Pilih **Unggah** tanpa mengubah pengaturan lainnya.
**Awas**  
 Pastikan Anda memilih file JAR yang benar di`<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar`.   
Direktori target juga berisi file JAR lain yang tidak perlu Anda unggah. 

## Buat dan konfigurasikan Layanan Terkelola untuk aplikasi Apache Flink
<a name="gs-table-7"></a>

Anda dapat membuat dan mengkonfigurasi Layanan Terkelola untuk aplikasi Apache Flink menggunakan konsol atau aplikasi. AWS CLI Untuk tutorial ini, Anda akan menggunakan konsol.

**catatan**  
Saat Anda membuat aplikasi menggunakan konsol, sumber daya AWS Identity and Access Management (IAM) dan Amazon CloudWatch Logs dibuat untuk Anda. Saat Anda membuat aplikasi menggunakan AWS CLI, Anda harus membuat sumber daya ini secara terpisah.

### Buat aplikasi
<a name="gs-table-7-console-create"></a>

1. Masuk ke Konsol Manajemen AWS, dan buka konsol Amazon MSF di https://console.aws.amazon.com/flink.

1. Verifikasi bahwa Wilayah yang benar dipilih: US East (Virginia N.) us-east-1.

1. Di menu kanan, pilih **Apache Flink Applications** dan kemudian pilih **Create Streaming** Application. Atau, pilih **Buat aplikasi streaming** di bagian **Memulai** di halaman awal. 

1. Pada halaman **Buat aplikasi streaming**, lengkapi yang berikut ini:
   + Untuk **Pilih metode untuk mengatur aplikasi pemrosesan aliran**, pilih **Buat dari awal**.
   + Untuk **konfigurasi Apache Flink, versi Application Flink,** pilih **Apache** Flink 1.19.
   + Di bagian **Konfigurasi aplikasi**, lengkapi yang berikut ini:
     + Untuk **Application name** (Nama aplikasi), masukkan **MyApplication**.
     + Untuk **Description** (Deskripsi), masukkan **My Java Table API test app**.
     + Untuk **Akses ke sumber daya aplikasi**, pilih **Buat/perbarui peran IAM kinesis-analytics-** dengan kebijakan yang diperlukan. MyApplication-us-east-1 
   + Di **Template untuk pengaturan aplikasi**, lengkapi yang berikut ini:
     + Untuk **Template**, pilih **Develoment**.

1. Pilih **Buat aplikasi streaming**.

**catatan**  
Saat membuat aplikasi Managed Service for Apache Flink menggunakan konsol, Anda memiliki opsi untuk membuat peran dan kebijakan IAM untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. Sumber daya IAM ini diberi nama menggunakan nama aplikasi dan Wilayah sebagai berikut:  
Kebijakan: `kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
Peran: `kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### Edit kebijakan IAM
<a name="gs-table-7-console-iam"></a>

Edit kebijakan IAM untuk menambahkan izin agar dapat mengakses bucket Amazon S3.

**Untuk mengedit kebijakan IAM agar dapat menambahkan izin bucket S3**

1. Buka konsol IAM di [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Pilih **Policies** (Kebijakan). Pilih kebijakan **`kinesis-analytics-service-MyApplication-us-east-1`** yang dibuat konsol untuk Anda di bagian sebelumnya. 

1. Pilih **Edit** dan kemudian pilih tab **JSON**.

1. Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti contoh ID akun ({{012345678901}}) dengan ID akun Anda dan {{<bucket-name>}} dengan nama bucket S3 yang Anda buat.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  Pilih **Berikutnya** dan kemudian pilih **Simpan perubahan**.

### Konfigurasikan aplikasi
<a name="gs-table-7-console-configure"></a>

Edit aplikasi untuk mengatur artefak kode aplikasi.

**Untuk mengonfigurasi aplikasi**

1. Pada **MyApplication**halaman, pilih **Konfigurasi**.

1. **Di bagian **Lokasi kode aplikasi**, pilih Konfigurasi.**
   + Untuk bucket **Amazon S3, pilih bucket** yang sebelumnya Anda buat untuk kode aplikasi. Pilih **Browse** dan pilih bucket yang benar, lalu pilih **Pilih**. Jangan klik nama bucket. 
   + Untuk **Jalur ke objek Amazon S3**, masukkan **amazon-msf-java-table-app-1.0.jar**.

1. Untuk **Access permissions** (Izin akses), pilih **Create / update IAM role `kinesis-analytics-MyApplication-us-east-1`** (Buat/perbarui IAM role ).

1. Di bagian **properti Runtime**, tambahkan properti berikut. 

1. Pilih **Tambahkan item baru** dan tambahkan masing-masing parameter berikut:    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/id_id/managed-flink/latest/java/gs-table-create.html)

1. Jangan memodifikasi pengaturan lainnya.

1. Pilih **Simpan perubahan**.

**catatan**  
Saat Anda memilih untuk mengaktifkan CloudWatch pencatatan Amazon, Layanan Terkelola untuk Apache Flink membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:   
Grup log: `/aws/kinesis-analytics/MyApplication`
Aliran log: `kinesis-analytics-log-stream`

### Jalankan aplikasi
<a name="gs-table-7-console-run"></a>

Aplikasi sekarang dikonfigurasi dan siap dijalankan.

**Untuk menjalankan aplikasi**

1. Kembali ke halaman konsol di Amazon Managed Service untuk Apache Flink dan pilih. **MyApplication**

1. Pilih **Jalankan** untuk memulai aplikasi.

1. Pada **konfigurasi Pemulihan aplikasi**, pilih **Jalankan dengan snapshot terbaru**.

1. Pilih **Jalankan**.

1. **Status** dalam **Aplikasi merinci** transisi dari `Ready` ke `Starting` dan kemudian ke `Running` setelah aplikasi dimulai. 

Saat aplikasi dalam `Running` status, Anda dapat membuka dasbor Flink. 

**Untuk membuka dasbor dan melihat pekerjaan**

1. Pilih **Open Apache Flink** dashbard. Dasbor terbuka di halaman baru.

1. Dalam daftar **Running Jobs**, pilih satu pekerjaan yang dapat Anda lihat.
**catatan**  
Jika Anda menyetel properti runtime atau mengedit kebijakan IAM secara tidak benar, status aplikasi mungkin berubah menjadi`Running`, tetapi dasbor Flink menunjukkan pekerjaan yang terus dimulai ulang. Ini adalah skenario kegagalan umum ketika aplikasi salah konfigurasi atau tidak memiliki izin untuk mengakses sumber daya eksternal.   
Ketika ini terjadi, periksa tab **Pengecualian** di dasbor Flink untuk menyelidiki penyebab masalah.

### Amati metrik aplikasi yang sedang berjalan
<a name="gs-observe-metrics"></a>

Pada **MyApplication**halaman, di bagian ** CloudWatch metrik Amazon**, Anda dapat melihat beberapa metrik dasar dari aplikasi yang sedang berjalan. 

**Untuk melihat metrik**

1. Di sebelah tombol **Refresh**, pilih **10 detik** dari daftar dropdown.

1. Saat aplikasi berjalan dan sehat, Anda dapat melihat metrik **uptime** terus meningkat.

1. Metrik **fullrestart** harus nol. Jika meningkat, konfigurasi mungkin memiliki masalah. Tinjau tab **Pengecualian** di dasbor Flink untuk menyelidiki masalah ini.

1. **Jumlah metrik pos pemeriksaan yang gagal** harus nol dalam aplikasi yang sehat. 
**catatan**  
Dasbor ini menampilkan satu set metrik tetap dengan perincian 5 menit. Anda dapat membuat dasbor aplikasi khusus dengan metrik apa pun di CloudWatch dasbor.

### Amati data penulisan aplikasi ke bucket tujuan
<a name="gs-observe-output"></a>

Anda sekarang dapat mengamati aplikasi yang berjalan di Amazon Managed Service untuk Apache Flink menulis file ke Amazon S3.

Untuk mengamati file, ikuti proses yang sama yang Anda gunakan untuk memeriksa file yang sedang ditulis ketika aplikasi berjalan secara lokal. Lihat [Amati data penulisan aplikasi ke bucket S3](#gs-table-input-output). 

Ingat bahwa aplikasi menulis file baru di pos pemeriksaan Flink. Saat berjalan di Amazon Managed Service untuk Apache Flink, pos pemeriksaan diaktifkan secara default dan dijalankan setiap 60 detik. Aplikasi ini membuat file baru kira-kira setiap 1 menit.

### Hentikan aplikasi
<a name="gs-table-7-console-stop"></a>

Untuk menghentikan aplikasi, buka halaman konsol dari aplikasi Managed Service for Apache Flink bernama. `MyApplication`

**Untuk menghentikan aplikasi**

1. **Dari daftar dropdown **Action**, pilih Stop.**

1. **Status** dalam **Aplikasi merinci** transisi dari `Running` ke`Stopping`, dan kemudian ke `Ready` saat aplikasi benar-benar dihentikan. 
**catatan**  
Jangan lupa juga berhenti mengirim data ke input stream dari script Python atau Kinesis Data Generator.