

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Pertahankan praktik terbaik untuk Layanan Terkelola untuk aplikasi Apache Flink
<a name="best-practices"></a>

Bagian ini berisi informasi dan rekomendasi untuk mengembangkan Layanan Terkelola yang stabil dan berkinerja untuk aplikasi Apache Flink.

**Topics**
+ [Minimalkan ukuran Uber JAR](#minimize-uber-JAR)
+ [Toleransi kesalahan: titik pemeriksaan dan titik simpan](#how-dev-bp-checkpoint)
+ [Versi konektor yang tidak didukung](#how-dev-bp-connectors)
+ [Performa dan paralelisme](#how-dev-bp-performance)
+ [Pengaturan paralelisme per operator](#how-dev-bp-parallelism)
+ [Pencatatan log](#how-dev-bp-logging)
+ [Pengkodean](#how-dev-bp-code)
+ [Mengelola kredensyal](#how-dev-bp-managing-credentials)
+ [Membaca dari sumber dengan sedikit pecahan/partisi](#troubleshooting-few-shards-partitions)
+ [Interval refresh notebook Studio](#notebook-refresh-rate)
+ [Performa optimum notebook Studio](#notebook-refresh-rate)
+ [Bagaimana strategi watermark dan pecahan idle memengaruhi jendela waktu](#notebook-watermarking)
+ [Tetapkan UUID untuk semua operator](#best-practices-setting-operator-ids)
+ [Tambahkan ServiceResourceTransformer ke plugin Maven shade](#best-practices-service-resource-transformer)

## Minimalkan ukuran Uber JAR
<a name="minimize-uber-JAR"></a>

Java/Scala application must be packaged in an uber (super/fat) JAR dan sertakan semua dependensi tambahan yang diperlukan yang belum disediakan oleh runtime. Namun, ukuran Uber JAR mempengaruhi waktu mulai dan restart aplikasi dan dapat menyebabkan JAR melebihi batas 512 MB.

Untuk mengoptimalkan waktu penerapan, Uber JAR Anda **tidak** boleh menyertakan yang berikut:
+ **Setiap dependensi yang disediakan oleh runtime seperti yang** diilustrasikan dalam contoh berikut. Mereka harus memiliki `provided` ruang lingkup dalam file POM atau `compileOnly` dalam konfigurasi Gradle Anda.
+ **Setiap dependensi yang digunakan untuk pengujian saja**, misalnya JUnit atau Mockito. Mereka harus memiliki `test` ruang lingkup dalam file POM atau `testImplementation` dalam konfigurasi Gradle Anda. 
+ **Dependensi apa pun yang sebenarnya tidak digunakan oleh aplikasi Anda**.
+ **Setiap data statis atau metadata yang diperlukan oleh aplikasi Anda.** Data statis harus dimuat oleh aplikasi saat runtime, misalnya dari datastore atau dari Amazon S3.
+ Lihat [file contoh POM](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/java/GettingStarted/pom.xml) ini untuk detail tentang pengaturan konfigurasi sebelumnya. 

**Dependensi yang disediakan**

Managed Service for Apache Flink runtime menyediakan sejumlah dependensi. Dependensi ini tidak boleh dimasukkan dalam JAR gemuk dan harus memiliki `provided` ruang lingkup dalam file POM atau secara eksplisit dikecualikan dalam konfigurasi. `maven-shade-plugin` Salah satu dependensi ini yang termasuk dalam JAR gemuk diabaikan saat runtime, tetapi meningkatkan ukuran JAR yang menambahkan overhead selama penerapan.

Dependensi disediakan oleh runtime, dalam runtime versi 1.18, 1.19, dan 1.20:
+ `org.apache.flink:flink-core`
+ `org.apache.flink:flink-java`
+ `org.apache.flink:flink-streaming-java`
+ `org.apache.flink:flink-scala_2.12`
+ `org.apache.flink:flink-table-runtime`
+ `org.apache.flink:flink-table-planner-loader`
+ `org.apache.flink:flink-json`
+ `org.apache.flink:flink-connector-base`
+ `org.apache.flink:flink-connector-files`
+ `org.apache.flink:flink-clients`
+ `org.apache.flink:flink-runtime-web`
+ `org.apache.flink:flink-metrics-code`
+ `org.apache.flink:flink-table-api-java`
+ `org.apache.flink:flink-table-api-bridge-base`
+ `org.apache.flink:flink-table-api-java-bridge`
+ `org.apache.logging.log4j:log4j-slf4j-impl`
+ `org.apache.logging.log4j:log4j-api`
+ `org.apache.logging.log4j:log4j-core`
+ `org.apache.logging.log4j:log4j-1.2-api`

Selain itu, runtime menyediakan library yang digunakan untuk mengambil properti runtime aplikasi di Managed Service for Apache Flink. `com.amazonaws:aws-kinesisanalytics-runtime:1.2.0`

Semua dependensi yang disediakan oleh runtime harus menggunakan rekomendasi berikut untuk **tidak** memasukkannya ke dalam Uber JAR: 
+ Di Maven (`pom.xml`) dan SBT (`build.sbt`), gunakan ruang lingkup. `provided`
+ Di Gradle (`build.gradle`), gunakan `compileOnly` konfigurasi.

Ketergantungan apa pun yang disediakan secara tidak sengaja disertakan dalam Uber JAR akan diabaikan saat runtime karena pemuatan kelas induk-pertama Apache Flink. Untuk informasi lebih lanjut, lihat [parent-first-patterns](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default)di dokumentasi Apache Flink.

**Konektor**

Sebagian besar konektor, kecuali FileSystem konektor, yang tidak termasuk dalam runtime harus disertakan dalam file POM dengan cakupan default ()`compile`.

**Rekomendasi lainnya**

Sebagai aturan, Apache Flink uber JAR yang disediakan untuk Managed Service for Apache Flink harus berisi kode minimum yang diperlukan untuk menjalankan aplikasi. Menyertakan dependensi yang menyertakan kelas sumber, kumpulan data pengujian, atau status bootstrap tidak boleh disertakan dalam toples ini. Jika sumber daya statis perlu ditarik saat runtime, pisahkan masalah ini menjadi sumber daya seperti Amazon S3. Contohnya termasuk bootstraps status atau model inferensi.

Luangkan waktu untuk mempertimbangkan pohon ketergantungan mendalam Anda dan hapus dependensi non-runtime. 

Meskipun Managed Service untuk Apache Flink mendukung ukuran jar 512MB, ini harus dilihat sebagai pengecualian aturan. Apache Flink saat ini mendukung ukuran jar \$1 104MB melalui konfigurasi defaultnya, dan itu harus menjadi ukuran target maksimum dari toples yang dibutuhkan.

## Toleransi kesalahan: titik pemeriksaan dan titik simpan
<a name="how-dev-bp-checkpoint"></a>

Gunakan pos pemeriksaan dan savepoint untuk menerapkan toleransi kesalahan dalam Layanan Terkelola untuk aplikasi Apache Flink Anda. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:
+ Kami menyarankan agar Anda tetap mengaktifkan checkpointing untuk aplikasi Anda. Checkpointing memberikan toleransi kesalahan untuk aplikasi Anda selama pemeliharaan terjadwal, dan juga untuk kegagalan tak terduga karena masalah layanan, kegagalan ketergantungan aplikasi, dan masalah lainnya. Untuk informasi tentang pemeliharaan terjadwal, lihat [Mengelola tugas pemeliharaan untuk Managed Service untuk Apache Flink](maintenance.md).
+ Set [ApplicationSnapshotConfiguration:: SnapshotsEnabled](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) ke `false` selama pengembangan aplikasi atau pemecahan masalah. Snapshot dibuat selama setiap aplikasi berhenti, yang dapat menyebabkan masalah jika aplikasi dalam keadaan tidak sehat atau tidak berkinerja. Atur `SnapshotsEnabled` ke `true` setelah aplikasi dalam produksi dan stabil.
**catatan**  
Kami menyarankan Anda mengatur aplikasi Anda untuk membuat snapshot beberapa kali sehari untuk memulai ulang dengan benar dengan data status yang benar. Frekuensi yang benar untuk snapshot Anda bergantung pada logika bisnis aplikasi Anda. Mengambil snapshot yang sering memungkinkan Anda memulihkan data yang lebih baru, tetapi meningkatkan biaya dan membutuhkan lebih banyak sumber daya sistem.

  Untuk informasi tentang pemantauan waktu henti aplikasi, lihat [Metrik dan dimensi dalam Layanan Terkelola untuk Apache Flink](metrics-dimensions.md).

Untuk informasi selengkapnya tentang penerapan toleransi kegagalan, lihat [Menerapkan toleransi kesalahan](how-fault.md).

## Versi konektor yang tidak didukung
<a name="how-dev-bp-connectors"></a>

Dari Apache Flink versi 1.15 atau yang lebih baru, Managed Service for Apache Flink secara otomatis mencegah aplikasi memulai atau memperbarui jika mereka menggunakan versi konektor Kinesis yang tidak didukung yang dibundel ke dalam aplikasi. JARs Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.15 atau yang lebih baru, pastikan Anda menggunakan konektor Kinesis terbaru. Ini adalah versi apa pun yang sama dengan atau lebih baru dari versi 1.15.2. Semua versi lain tidak didukung oleh Managed Service untuk Apache Flink karena mereka dapat menyebabkan masalah konsistensi atau kegagalan dengan fitur **Stop with Savepoint**, mencegah operasi berhenti/pembaruan bersih. Untuk mempelajari lebih lanjut tentang kompatibilitas konektor di Amazon Managed Service untuk versi Apache Flink, lihat konektor [Apache](https://docs.aws.amazon.com/managed-flink/latest/java/how-flink-connectors.html) Flink.

## Performa dan paralelisme
<a name="how-dev-bp-performance"></a>

Aplikasi Anda dapat diskalakan untuk memenuhi tingkat throughput apa pun dengan menyetel paralelisme aplikasi Anda, dan menghindari perangkap performa. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:
+ Verifikasi bahwa semua sumber aplikasi dan sink Anda ditetapkan dengan cukup dan tidak dibatasi. Jika sumber dan wastafel adalah AWS layanan lain, pantau layanan tersebut menggunakan [CloudWatch](https://docs.aws.amazon.com/cloudwatch/?id=docs_gateway).
+ Untuk aplikasi dengan paralelisme yang sangat tinggi, periksa apakah tingkat paralelisme yang tinggi diterapkan pada semua operator dalam aplikasi. Secara default, Apache Flink menerapkan paralelisme aplikasi yang sama untuk semua operator dalam grafik aplikasi. Ini dapat menyebabkan masalah penyediaan pada sumber atau sink, atau pun hambatan dalam pemrosesan data operator. Anda dapat mengubah paralelisme setiap operator dalam kode dengan [setParallelism](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/parallel.html).
+ Pahami arti pengaturan paralelisme untuk operatori dalam aplikasi Anda. Jika Anda mengubah paralelisme untuk operator, Anda mungkin tidak dapat memulihkan aplikasi dari snapshot yang dibuat ketika operator memiliki paralelisme yang tidak kompatibel dengan pengaturan saat ini. Untuk informasi selengkapnya tentang pengaturan paralelisme operator, lihat [ Mengatur paralelisme maksimum untuk operator secara eksplisit](https://nightlies.apache.org/flink/flink-docs-release-1.15/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly).

Untuk informasi selengkapnya tentang penerapan penskalaan, lihat [Menerapkan penskalaan aplikasi](how-scaling.md).

## Pengaturan paralelisme per operator
<a name="how-dev-bp-parallelism"></a>

Secara default, semua operator memiliki paralelisme yang ditetapkan pada tingkat aplikasi. Anda dapat mengganti paralelisme dari satu operator menggunakan API. DataStream `.setParallelism(x)` Anda dapat mengatur paralelisme operator ke paralelisme apa pun yang sama atau lebih rendah dari paralelisme aplikasi. 

Jika memungkinkan, tentukan paralelisme operator sebagai fungsi dari paralelisme aplikasi. Dengan cara ini, paralelisme operator akan bervariasi dengan paralelisme aplikasi. Jika Anda menggunakan penskalaan otomatis, misalnya, semua operator akan memvariasikan paralelisme mereka dalam proporsi yang sama:

```
int appParallelism = env.getParallelism();
...
...ops.setParalleism(appParallelism/2);
```

Dalam beberapa kasus, Anda mungkin ingin mengatur paralelisme operator ke konstanta. Misalnya, mengatur paralelisme sumber Aliran Kinesis ke jumlah pecahan. Dalam kasus ini, pertimbangkan untuk meneruskan paralelisme operator sebagai parameter konfigurasi aplikasi untuk mengubahnya tanpa mengubah kode, misalnya untuk mengubah aliran sumber. 

## Pencatatan log
<a name="how-dev-bp-logging"></a>

Anda dapat memantau kinerja dan kondisi kesalahan aplikasi Anda menggunakan CloudWatch Log. Ingat hal berikut saat mengonfigurasi pencatatan untuk aplikasi Anda: 
+ Aktifkan CloudWatch pencatatan untuk aplikasi sehingga masalah runtime apa pun dapat di-debug.
+ Jangan buat entri log untuk setiap catatan yang diproses dalam aplikasi. Hal ini menyebabkan kemacetan parah selama pemrosesan dan dapat menyebabkan tekanan balik dalam pemrosesan data.
+ Buat CloudWatch alarm untuk memberi tahu Anda ketika aplikasi Anda tidak berjalan dengan benar. Untuk informasi selengkapnya, lihat [Gunakan CloudWatch Alarm dengan Amazon Managed Service untuk Apache Flink](monitoring-metrics-alarms.md)

Untuk informasi selengkapnya tentang penerapan pencatatan, lihat [](monitoring-overview.md).

## Pengkodean
<a name="how-dev-bp-code"></a>

Anda dapat membuat aplikasi Anda berfungsi dan stabil menggunakan praktik pemrograman yang direkomendasikan. Ingat hal berikut saat menulis kode aplikasi:
+ Jangan gunakan `system.exit()` dalam kode aplikasi Anda, baik dalam metode `main` aplikasi Anda atau dalam fungsi yang ditetapkan pengguna. Jika Anda ingin menonaktifkan aplikasi Anda dari dalam kode, lempar pengecualian yang berasal dari `Exception` atau `RuntimeException`, yang berisi pesan tentang apa yang salah dengan aplikasi. 

  Catat hal berikut tentang bagaimana layanan menangani pengecualian ini:
  + Jika pengecualian dilemparkan dari metode `main` aplikasi Anda, layanan akan membungkusnya dalam `ProgramInvocationException` saat transisi aplikasi ke status `RUNNING`, dan manajer tugas akan gagal mengirimkan tugas.
  + Jika pengecualian dilemparkan dari fungsi yang ditetapkan pengguna, manajer tugas akan gagal tugas dan memulai ulang, serta detail pengecualian akan ditulis ke log pengecualian.
+ Pertimbangkan bayangan file JAR aplikasi Anda dan dependensi yang disertakan. Bayangan direkomendasikan ketika ada potensi konflik dalam nama paket antara aplikasi Anda dan runtime Apache Flink. Jika terjadi konflik, log aplikasi Anda mungkin berisi pengecualian tipe `java.util.concurrent.ExecutionException`. Untuk informasi selengkapnya tentang bayangan file JAR aplikasi Anda, lihat [Plugin Apache Maven Shade](https://maven.apache.org/plugins/maven-shade-plugin/).

## Mengelola kredensyal
<a name="how-dev-bp-managing-credentials"></a>

Anda tidak boleh memanggang kredensi jangka panjang apa pun ke dalam aplikasi produksi (atau lainnya). Kredensi jangka panjang kemungkinan diperiksa ke dalam sistem kontrol versi dan dapat dengan mudah hilang. Sebagai gantinya, Anda dapat mengaitkan peran ke aplikasi Managed Service for Apache Flink dan memberikan izin ke peran tersebut. Aplikasi Flink yang sedang berjalan kemudian dapat memilih kredensyal sementara dengan izin masing-masing dari lingkungan. [Dalam hal otentikasi diperlukan untuk layanan yang tidak terintegrasi secara native dengan IAM, misalnya, database yang memerlukan nama pengguna dan kata sandi untuk otentikasi, Anda harus mempertimbangkan untuk menyimpan rahasia di Secrets Manager.AWS](https://aws.amazon.com/secrets-manager/)

Banyak layanan AWS asli mendukung otentikasi:
+ [Kinesis Data ProcessTaxiStream Streams — .java](hhttps://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/ProcessTaxiStream.java#L90)
+ Amazon MSK — [https://github.com/aws/aws-msk-iam-authusing-the-amazon-msk/\$1](https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication) - library-for-iam-authentication
+ [Amazon Elasticsearch Service — .java AmazonElasticsearchSink](https://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/operators/AmazonElasticsearchSink.java)
+ Amazon S3 - bekerja di luar kotak pada Layanan Terkelola untuk Apache Flink

## Membaca dari sumber dengan sedikit pecahan/partisi
<a name="troubleshooting-few-shards-partitions"></a>

Saat membaca dari Apache Kafka atau Aliran Data Kinesis, mungkin ada ketidakcocokan antara paralelisme aliran (jumlah partisi untuk Kafka dan jumlah pecahan untuk Kinesis) dan paralelisme aplikasi. Dengan desain yang naif, paralelisme aplikasi tidak dapat berskala melampaui paralelisme aliran: Setiap subtugas operator sumber hanya dapat membaca dari 1 atau lebih piringan/partisi. Itu berarti untuk aliran dengan hanya 2 pecahan dan aplikasi dengan paralelisme 8, bahwa hanya dua subtugas yang benar-benar memakan dari aliran dan 6 subtugas tetap menganggur. Ini secara substansional dapat membatasi throughput aplikasi, khususnya jika deserialisasi mahal dan dilakukan oleh sumber (yang merupakan default).

Untuk mengurangi efek ini, Anda dapat menskalakan aliran. Tapi itu mungkin tidak selalu diinginkan atau mungkin. Atau, Anda dapat merestrukturisasi sumber sehingga tidak melakukan serialisasi apa pun dan hanya meneruskan. `byte[]` Anda kemudian dapat [menyeimbangkan kembali](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/) data untuk mendistribusikannya secara merata di semua tugas dan kemudian deserialisasi data di sana. Dengan cara ini, Anda dapat memanfaatkan semua subtugas untuk deserialisasi dan operasi yang berpotensi mahal ini tidak lagi terikat oleh shards/partitions jumlah aliran.

## Interval refresh notebook Studio
<a name="notebook-refresh-rate"></a>

Jika Anda mengubah interval refresh hasil paragraf, atur ke nilai yang setidaknya 1000 milidetik.

## Performa optimum notebook Studio
<a name="notebook-refresh-rate"></a>

Kami menguji dengan pernyataan berikut dan mendapatkan kinerja optimal ketika `events-per-second` dikalikan dengan di bawah `number-of-keys` 25.000.000. Ini adalah untuk `events-per-second` di bawah 150.000.

```
SELECT key, sum(value) FROM key-values GROUP BY key
```

## Bagaimana strategi watermark dan pecahan idle memengaruhi jendela waktu
<a name="notebook-watermarking"></a>

Saat membaca peristiwa dari Apache Kafka dan Kinesis Data Streams, sumber dapat mengatur waktu acara berdasarkan atribut aliran. Dalam kasus Kinesis, waktu acara sama dengan perkiraan waktu kedatangan peristiwa. Tetapi pengaturan waktu acara di sumber untuk acara tidak cukup bagi aplikasi Flink untuk menggunakan waktu acara. Sumber juga harus menghasilkan tanda air yang menyebarkan informasi tentang waktu acara dari sumber ke semua operator lain. [Dokumentasi Flink](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/) memiliki gambaran yang baik tentang cara kerja proses itu.

Secara default, stempel waktu peristiwa yang dibaca dari Kinesis diatur ke perkiraan waktu kedatangan yang ditentukan oleh Kinesis. Prasyarat tambahan untuk waktu acara untuk bekerja dalam aplikasi adalah strategi watermark.

```
WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(...));
```

Strategi watermark kemudian diterapkan ke a `DataStream` with the `assignTimestampsAndWatermarks` method. Ada beberapa strategi bawaan yang berguna:
+ `forMonotonousTimestamps()`hanya akan menggunakan waktu acara (perkiraan waktu kedatangan) dan secara berkala memancarkan nilai maksimum sebagai tanda air (untuk setiap subtugas tertentu)
+ `forBoundedOutOfOrderness(Duration.ofSeconds(...))`mirip dengan strategi sebelumnya, tetapi akan menggunakan waktu acara - durasi untuk pembuatan tanda air.

Dari [dokumentasi Flink](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/):

*Setiap subtugas paralel dari fungsi sumber biasanya menghasilkan tanda airnya secara independen. Tanda air ini menentukan waktu acara pada sumber paralel tertentu.*

*Saat tanda air mengalir melalui program streaming, mereka memajukan waktu acara di operator tempat mereka tiba. Setiap kali operator memajukan waktu acaranya, ia menghasilkan tanda air baru di hilir untuk operator penggantinya.*

*Beberapa operator menggunakan beberapa aliran input; serikat pekerja, misalnya, atau operator yang mengikuti fungsi KeyBy (...) atau partisi (...). Waktu kejadian operator saat ini adalah minimum waktu acara aliran inputnya. Karena aliran inputnya memperbarui waktu acara mereka, begitu juga operator.*

Itu berarti, jika subtugas sumber mengkonsumsi dari pecahan siaga, operator hilir tidak menerima tanda air baru dari subtugas itu dan karenanya memproses stall untuk semua operator hilir yang menggunakan jendela waktu. Untuk menghindari hal ini, pelanggan dapat menambahkan `withIdleness` opsi ke strategi tanda air. Dengan opsi itu, operator mengecualikan tanda air dari subtugas upstream idle saat menghitung waktu acara operator. Oleh karena itu, subtugas idle tidak lagi memblokir kemajuan waktu acara di operator hilir.

Bergantung pada pemecah pecahan yang Anda gunakan, beberapa pekerja mungkin tidak diberi pecahan Kinesis apa pun. Dalam hal ini, pekerja ini akan memanifestasikan perilaku sumber menganggur bahkan jika semua pecahan Kinesis terus mengirimkan data peristiwa. Anda dapat mengurangi risiko ini dengan menggunakan `uniformShardAssigner` dengan operator sumber. Ini memastikan bahwa semua subtugas sumber memiliki pecahan untuk diproses selama jumlah pekerja kurang atau sama dengan jumlah pecahan aktif. 

Namun, opsi kemalasan dengan strategi tanda air bawaan tidak akan memajukan waktu acara jika tidak ada subtugas yang membaca acara apa pun, yaitu tidak ada acara dalam aliran. Ini menjadi sangat terlihat untuk kasus uji di mana serangkaian peristiwa terbatas dibaca dari aliran. Karena waktu acara tidak berlanjut setelah acara terakhir dibaca, jendela terakhir (berisi acara terakhir) tidak akan ditutup.

### Ringkasan
<a name="notebook-watermarking-summary"></a>
+ `withIdleness`Pengaturan tidak akan menghasilkan tanda air baru jika pecahan tidak digunakan. Ini akan mengecualikan tanda air terakhir yang dikirim oleh subtugas idle dari perhitungan tanda air min di operator hilir.
+ Dengan strategi watermark bawaan, jendela terbuka terakhir tidak akan ditutup (kecuali peristiwa baru yang memajukan tanda air akan dikirim, tetapi itu menciptakan jendela baru yang kemudian tetap terbuka).
+ Bahkan ketika waktu diatur oleh aliran Kinesis, peristiwa kedatangan terlambat masih dapat terjadi jika satu pecahan dikonsumsi lebih cepat daripada yang lain (misalnya selama inisialisasi aplikasi atau saat menggunakan `TRIM_HORIZON` di mana semua pecahan yang ada dikonsumsi secara paralel mengabaikan hubungannya). parent/child 
+ `withIdleness`Pengaturan strategi tanda air tampaknya mengganggu pengaturan khusus sumber Kinesis untuk pecahan siaga. `(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS`

### Contoh
<a name="notebook-watermarking-example"></a>

Aplikasi berikut membaca dari aliran dan membuat jendela sesi berdasarkan waktu acara.

```
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig);

WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(15));
    
env.addSource(consumer)
    .assignTimestampsAndWatermarks(s)
    .map(new MapFunction<String, Long>() {
        @Override
        public Long map(String s) throws Exception {
            return Long.parseLong(s);
        }
    })
    .keyBy(l -> 0l)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() {
        @Override
        public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception {
            long count = StreamSupport.stream(iterable.spliterator(), false).count();
            long timestamp = context.currentWatermark();

            System.out.print("XXXXXXXXXXXXXX Window with " + count + " events");
            System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp));


            for (Long l : iterable) {
                System.out.println(l);
            }
        }
    });
```

Dalam contoh berikut, 8 peristiwa ditulis ke aliran pecahan 16 (2 yang pertama dan peristiwa terakhir kebetulan mendarat di pecahan yang sama).

```
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ==
$ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg==
$ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028721934184977530127978070210"
}
{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028795678659974022576354623682"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275134360684221592378842022114"
}
Wed Mar 23 11:19:57 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA==
$ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ==
$ date

{
    "ShardId": "shardId-000000000010",
    "SequenceNumber": "49627894338570054070103749783042116732419934393936642210"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275659034489934342334017700066"
}
Wed Mar 23 11:20:10 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng==
$ date

{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49627894338369347363316974173886988345467035365375213586"
}
Wed Mar 23 11:20:22 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw==
$ date

{
    "ShardId": "shardId-000000000008",
    "SequenceNumber": "49627894338525452579706688535878947299195189349725503618"
}
Wed Mar 23 11:20:34 CET 2022

$ sleep 60
$ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811029600823255837371928900796610"
}
Wed Mar 23 11:21:27 CET 2022
```

Masukan ini akan menghasilkan jendela sesi 5: event 1,2,3; event 4,5; event 6; event 7; event 8. Namun, program ini hanya menghasilkan 4 jendela pertama.

```
11:59:21,529 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,531 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:23,209 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,244 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z
11:59:23,377 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,405 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,581 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,586 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:24,790 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z
event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z
event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z
11:59:24,907 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z
event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z
event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z
event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z
XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z
3
1
2
XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
4
5
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
6
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z
7
```

Outputnya hanya menampilkan 4 jendela (tidak ada jendela terakhir yang berisi acara 8). Ini karena waktu acara dan strategi tanda air. Jendela terakhir tidak dapat ditutup karena strategi watermark pra-bangun waktu tidak pernah maju melampaui waktu peristiwa terakhir yang telah dibaca dari aliran. Tetapi agar jendela ditutup, waktu perlu maju lebih dari 10 detik setelah acara terakhir. Dalam hal ini, tanda air terakhir adalah 2022-03-23T 10:21:27.170 Z, tetapi untuk menutup jendela sesi, tanda air 10 detik dan 1 ms kemudian diperlukan.

Jika `withIdleness` opsi dihapus dari strategi tanda air, tidak ada jendela sesi yang akan ditutup, karena “tanda air global” dari operator jendela tidak dapat maju.

Ketika aplikasi Flink dimulai (atau jika ada kemiringan data), beberapa pecahan mungkin dikonsumsi lebih cepat daripada yang lain. Hal ini dapat menyebabkan beberapa tanda air dipancarkan terlalu dini dari subtugas (subtugas mungkin memancarkan tanda air berdasarkan konten satu pecahan tanpa dikonsumsi dari pecahan lain yang dilangganannya). Cara untuk mengurangi adalah strategi watermarking yang berbeda yang menambahkan buffer keamanan `(forBoundedOutOfOrderness(Duration.ofSeconds(30))` atau secara eksplisit memungkinkan acara kedatangan terlambat. `(allowedLateness(Time.minutes(5))`

## Tetapkan UUID untuk semua operator
<a name="best-practices-setting-operator-ids"></a>

Ketika Layanan Terkelola untuk Apache Flink memulai pekerjaan Flink untuk aplikasi dengan snapshot, pekerjaan Flink dapat gagal dimulai karena masalah tertentu. Salah satunya adalah *ketidakcocokan ID operator*. Flink mengharapkan operator eksplisit dan konsisten IDs untuk operator grafik pekerjaan Flink. Jika tidak disetel secara eksplisit, Flink menghasilkan ID untuk operator. Ini karena Flink menggunakan operator ini IDs untuk mengidentifikasi operator secara unik dalam grafik pekerjaan dan menggunakannya untuk menyimpan status setiap operator di savepoint.

Masalah *ketidakcocokan ID operator* terjadi ketika Flink tidak menemukan pemetaan 1:1 antara operator grafik pekerjaan dan operator IDs yang IDs ditentukan dalam savepoint. Ini terjadi ketika operator konsisten eksplisit tidak IDs disetel dan Flink menghasilkan operator IDs yang mungkin tidak konsisten dengan setiap pembuatan grafik pekerjaan. Kemungkinan aplikasi mengalami masalah ini tinggi selama pemeliharaan berjalan. Untuk menghindari hal ini, kami menyarankan pelanggan mengatur UUID untuk semua operator dalam kode Flink. Untuk informasi selengkapnya, lihat topik *Menetapkan UUID untuk semua operator* di bawah Kesiapan [produksi](https://docs.aws.amazon.com/managed-flink/latest/java/production-readiness.html).

## Tambahkan ServiceResourceTransformer ke plugin Maven shade
<a name="best-practices-service-resource-transformer"></a>

Flink menggunakan Java [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) untuk memuat komponen seperti konektor dan format. Beberapa dependensi Flink menggunakan SPI [dapat menyebabkan bentrokan di uber-jar dan perilaku aplikasi yang tidak terduga](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources). Kami menyarankan Anda menambahkan plugin bayangan Maven, yang didefinisikan dalam pom.xml. [ServiceResourceTransformer](https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html)

```
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
        </plugin>
```