

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Praktik terbaik untuk klien (Valkey dan Redis OSS)
<a name="BestPractices.Clients.redis"></a>

Pelajari praktik terbaik untuk skenario umum dan ikuti contoh kode dari beberapa pustaka klien Valkey dan Redis OSS open source paling populer (redis-py,, dan Lettuce) PHPRedis, serta praktik terbaik untuk berinteraksi dengan ElastiCache sumber daya dengan pustaka klien Memcached open-source yang umum digunakan.

**Topics**
+ [Sejumlah besar koneksi (Valkey dan Redis OSS)](BestPractices.Clients.Redis.Connections.md)
+ [Penemuan klien cluster dan backoff eksponensial (Valkey dan Redis OSS)](BestPractices.Clients.Redis.Discovery.md)
+ [Konfigurasikan batas waktu sisi klien (Valkey dan Redis OSS)](BestPractices.Clients.Redis.ClientTimeout.md)
+ [Konfigurasikan batas waktu idle sisi server (Valkey dan Redis OSS)](BestPractices.Clients.Redis.ServerTimeout.md)
+ [Skrip Lua](BestPractices.Clients.Redis.LuaScripts.md)
+ [Menyimpan item komposit besar (Valkey dan Redis OSS)](BestPractices.Clients.Redis.LargeItems.md)
+ [Konfigurasi klien selada (Valkey dan Redis OSS)](BestPractices.Clients-lettuce.md)
+ [Mengkonfigurasi protokol pilihan untuk cluster tumpukan ganda (Valkey dan Redis OSS)](#network-type-configuring-dual-stack-redis)

# Sejumlah besar koneksi (Valkey dan Redis OSS)
<a name="BestPractices.Clients.Redis.Connections"></a>

Cache tanpa server dan individual ElastiCache untuk node Redis OSS mendukung hingga 65.000 koneksi klien bersamaan. Namun, untuk mengoptimalkan performa, kami menyarankan agar aplikasi klien tidak terus-menerus beroperasi pada tingkat koneksi tersebut. Valkey dan Redis OSS masing-masing memiliki proses single-threaded berdasarkan loop peristiwa di mana permintaan klien yang masuk ditangani secara berurutan. Artinya, waktu respons klien tertentu menjadi lebih lama seiring jumlah klien yang terhubung meningkat.

Anda dapat mengambil serangkaian tindakan berikut untuk menghindari hambatan koneksi pada server Valkey atau Redis OSS:
+ Lakukan operasi baca dari replika baca. Ini dapat dilakukan dengan menggunakan endpoint ElastiCache pembaca dalam mode cluster dinonaktifkan atau dengan menggunakan replika untuk membaca dalam mode cluster diaktifkan, termasuk cache tanpa server.
+ Distribusikan lalu lintas penulisan ke beberapa simpul primer. Anda dapat melakukannya dengan dua cara: Anda dapat menggunakan cluster Valkey atau Redis OSS multi-sharded dengan klien yang mampu mode cluster. Anda juga dapat menulis ke beberapa simpul primer dalam mode klaster dinonaktifkan dengan sharding sisi klien. Hal ini dilakukan secara otomatis dalam cache nirserver.
+ Gunakan pool koneksi jika tersedia di pustaka klien Anda.

Secara umum, membuat koneksi TCP adalah operasi komputasi yang mahal dibandingkan dengan perintah Valkey atau Redis OSS yang khas. Misalnya, menangani SET/GET permintaan adalah urutan besarnya lebih cepat saat menggunakan kembali koneksi yang ada. Menggunakan pool koneksi klien dengan ukuran terbatas akan mengurangi overhead manajemen koneksi. Hal ini juga membatasi jumlah koneksi masuk konkuren dari aplikasi klien.

Contoh kode berikut PHPRedis menunjukkan bahwa koneksi baru dibuat untuk setiap permintaan pengguna baru:

```
$redis = new Redis();
if ($redis->connect($HOST, $PORT) != TRUE) {
	//ERROR: connection failed
	return;
}
$redis->set($key, $value);
unset($redis);
$redis = NULL;
```

Kami membandingkan kode ini dalam satu loop pada instance Amazon Elastic Compute Cloud EC2 (Amazon) yang terhubung ke Graviton2 (m6g.2xlarge) untuk node Redis OSS. ElastiCache Kita menempatkan klien dan server di Zona Ketersediaan yang sama. Latensi rata-rata seluruh operasi adalah 2,82 milidetik.

Saat kami memperbarui kode serta menggunakan koneksi persisten dan pool koneksi, latensi rata-rata seluruh operasi adalah 0,21 milidetik:

```
$redis = new Redis();
if ($redis->pconnect($HOST, $PORT) != TRUE) {
	// ERROR: connection failed
	return;
}
$redis->set($key, $value);
unset($redis);
$redis = NULL;
```

Konfigurasi redis.ini yang diperlukan:
+ `redis.pconnect.pooling_enabled=1`
+ `redis.pconnect.connection_limit=10`

Kode berikut adalah contoh [pool koneksi Redis-Py](https://redis.readthedocs.io/en/stable/):

```
conn = Redis(connection_pool=redis.BlockingConnectionPool(host=HOST, max_connections=10))
conn.set(key, value)
```

Kode berikut adalah contoh [pool koneksi Lettuce](https://lettuce.io/core/release/reference/#_connection_pooling):

```
RedisClient client = RedisClient.create(RedisURI.create(HOST, PORT));
GenericObjectPool<StatefulRedisConnection> pool = ConnectionPoolSupport.createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig());
pool.setMaxTotal(10); // Configure max connections to 10
try (StatefulRedisConnection connection = pool.borrowObject()) {
	RedisCommands syncCommands = connection.sync();
	syncCommands.set(key, value);
}
```

# Penemuan klien cluster dan backoff eksponensial (Valkey dan Redis OSS)
<a name="BestPractices.Clients.Redis.Discovery"></a>

Saat menghubungkan ke cluster ElastiCache Valkey atau Redis OSS dalam mode cluster diaktifkan, pustaka klien yang sesuai harus sadar cluster. Klien harus mendapatkan peta slot hash ke simpul yang sesuai di klaster untuk mengirim permintaan ke simpul yang tepat dan menghindari overhead performa penanganan pengalihan klaster. Akibatnya, klien harus menemukan daftar lengkap slot dan simpul yang dipetakan dalam dua situasi berbeda:
+ Klien diinisialisasi dan harus mengisi konfigurasi slot awal
+ Pengalihan MOVED diterima dari server, seperti dalam situasi failover ketika semua slot yang dilayani oleh simpul primer sebelumnya diambil alih oleh replika, atau melakukan resharding ketika slot dipindahkan dari simpul primer sumber ke simpul primer target

Penemuan klien biasanya dilakukan dengan mengeluarkan perintah CLUSTER SLOT atau CLUSTER NODE ke server Valkey atau Redis OSS. Sebaiknya gunakan metode CLUSTER SLOT karena metode ini mengembalikan set rentang slot dan simpul primer dan replika terkait ke klien. Metode ini tidak memerlukan parsing tambahan dari klien dan lebih efisien.

Tergantung topologi klaster, ukuran respons untuk perintah CLUSTER SLOT dapat bervariasi berdasarkan ukuran klaster. Klaster yang lebih besar dengan lebih banyak simpul menghasilkan respons yang lebih besar. Oleh karena itu, penting untuk memastikan bahwa jumlah klien yang melakukan penemuan topologi klaster tidak bertambah tanpa batas. Misalnya, ketika aplikasi klien diaktifkan atau kehilangan koneksi dari server dan harus melakukan penemuan klaster, satu kesalahan umumnya adalah bahwa aplikasi klien memicu beberapa permintaan koneksi ulang dan penemuan tanpa menambahkan backoff eksponensial saat mencoba lagi. Ini dapat membuat server Valkey atau Redis OSS tidak responsif untuk jangka waktu yang lama, dengan pemanfaatan CPU 100%. Pemadaman akan lebih lama jika setiap perintah CLUSTER SLOT harus memproses sejumlah besar simpul di bus klaster. Kami telah mengamati beberapa pemadaman klien di masa lalu karena perilaku ini di sejumlah bahasa yang berbeda termasuk Python redis-py-cluster () dan Java (Lettuce dan Redisson).

Dalam cache nirserver, banyak masalah secara otomatis dikurangi karena topologi klaster yang dinyatakan bersifat statis dan terdiri dari dua entri: titik akhir tulis dan titik akhir baca. Penemuan klaster juga secara otomatis tersebar di beberapa simpul saat menggunakan titik akhir cache. Namun, rekomendasi berikut masih berguna.

Untuk mengurangi dampak yang disebabkan oleh masuknya permintaan koneksi dan penemuan secara tiba-tiba, kami merekomendasikan hal berikut:
+ Menerapkan pool koneksi klien dengan ukuran terbatas untuk membatasi jumlah koneksi masuk konkuren dari aplikasi klien.
+ Ketika klien terputus dari server karena waktu habis, coba lagi dengan backoff eksponensial dengan jitter. Hal ini membantu menghindari banyak klien membebani server pada waktu yang sama.
+ Gunakan panduan dalam [Menemukan titik akhir koneksi di ElastiCache](Endpoints.md) guna menemukan titik akhir klaster untuk melakukan penemuan klaster. Dengan begitu, Anda menyebarkan beban penemuan ke semua simpul di klaster (hingga 90), bukan memanfaatkan hanya beberapa simpul seed hardcode di klaster.

Berikut ini adalah beberapa contoh kode untuk logika percobaan ulang backoff eksponensial di redis-py,, dan Lettuce. PHPRedis

**Contoh logika backoff 1: redis-py**

redis-py memiliki mekanisme percobaan ulang bawaan yang mencoba ulang satu kali segera setelah kegagalan. Mekanisme ini dapat diaktifkan melalui `retry_on_timeout` argumen yang diberikan saat membuat objek [Redis OSS](https://redis.readthedocs.io/en/stable/examples/connection_examples.html#redis.Redis). Di sini kita mendemonstrasikan mekanisme percobaan ulang kustom dengan backoff eksponensial dan jitter. Kami telah mengirimkan permintaan tarik untuk mengimplementasikan backoff eksponensial secara native di [redis-py (\$11494)](https://github.com/andymccurdy/redis-py/pull/1494). Di masa depan, hal tersebut mungkin tidak perlu diimplementasikan secara manual.

```
def run_with_backoff(function, retries=5):
base_backoff = 0.1 # base 100ms backoff
max_backoff = 10 # sleep for maximum 10 seconds
tries = 0
while True:
try:
  return function()
except (ConnectionError, TimeoutError):
  if tries >= retries:
	raise
  backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))
  print(f"sleeping for {backoff:.2f}s")
  sleep(backoff)
  tries += 1
```

Anda kemudian dapat menggunakan kode berikut untuk menetapkan nilai:

```
client = redis.Redis(connection_pool=redis.BlockingConnectionPool(host=HOST, max_connections=10))
res = run_with_backoff(lambda: client.set("key", "value"))
print(res)
```

Bergantung pada beban kerja Anda, Anda sebaiknya mengubah nilai backoff dasar dari 1 detik menjadi puluhan atau ratusan milidetik untuk beban kerja yang sensitif terhadap latensi.

**Contoh logika backoff 2: PHPRedis**

PHPRedis memiliki mekanisme coba ulang bawaan yang mencoba ulang maksimum (tidak dapat dikonfigurasi) sebanyak 10 kali. Ada penundaan yang dapat dikonfigurasi di antara percobaan (dengan jitter dari percobaan ulang kedua dan seterusnya). Untuk informasi selengkapnya, lihat [kode sampel](https://github.com/phpredis/phpredis/blob/b0b9dd78ef7c15af936144c1b17df1a9273d72ab/library.c#L335-L368) berikut. [Kami telah mengirimkan permintaan tarik untuk menerapkan backoff eksponensial secara native di [PHPredis (\$11986)](https://github.com/phpredis/phpredis/pull/1986) yang telah digabungkan dan didokumentasikan.](https://github.com/phpredis/phpredis/blob/develop/README.md#retry-and-backoff) Bagi mereka yang berada di rilis terbaru PHPRedis, tidak perlu mengimplementasikan secara manual tetapi kami telah menyertakan referensi di sini untuk mereka yang ada di versi sebelumnya. Untuk saat ini, berikut ini adalah contoh kode yang mengonfigurasi penundaan mekanisme percobaan ulang:

```
$timeout = 0.1; // 100 millisecond connection timeout
$retry_interval = 100; // 100 millisecond retry interval
$client = new Redis();
if($client->pconnect($HOST, $PORT, $timeout, NULL, $retry_interval) != TRUE) {
	return; // ERROR: connection failed
}
$client->set($key, $value);
```

**Contoh logika backoff 3: Lettuce**

Lettuce memiliki mekanisme percobaan ulang bawaan berdasarkan strategi backoff eksponensial yang dijelaskan dalam postingan [Backoff Eksponensial dan Jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/). Berikut ini adalah kutipan kode yang menunjukkan pendekatan jitter penuh:

```
public static void main(String[] args)
{
	ClientResources resources = null;
	RedisClient client = null;

	try {
		resources = DefaultClientResources.builder()
				.reconnectDelay(Delay.fullJitter(
			Duration.ofMillis(100),     // minimum 100 millisecond delay
			Duration.ofSeconds(5),      // maximum 5 second delay
			100, TimeUnit.MILLISECONDS) // 100 millisecond base
		).build();

		client = RedisClient.create(resources, RedisURI.create(HOST, PORT));
		client.setOptions(ClientOptions.builder()
	.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofMillis(100)).build()) // 100 millisecond connection timeout
	.timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(5)).build()) // 5 second command timeout
	.build());

	    // use the connection pool from above example
	} finally {
		if (connection != null) {
			connection.close();
		}

		if (client != null){
			client.shutdown();
		}

		if (resources != null){
			resources.shutdown();
		}

	}
}
```

# Konfigurasikan batas waktu sisi klien (Valkey dan Redis OSS)
<a name="BestPractices.Clients.Redis.ClientTimeout"></a>

**Mengkonfigurasi batas waktu sisi klien**

Konfigurasikan waktu habis sisi klien dengan tepat agar server memiliki cukup waktu untuk memproses permintaan dan menghasilkan respons. Hal ini juga membantunya melakukan gagal cepat (fail fast) jika koneksi ke server tidak dapat dibuat. Perintah Valkey atau Redis OSS tertentu bisa lebih mahal secara komputasi daripada yang lain. Misalnya, skrip Lua atau MULTI/EXEC transaksi yang berisi beberapa perintah yang harus dijalankan secara atom. Secara umum, waktu habis sisi klien yang lebih tinggi disarankan untuk menghindari waktu habis klien sebelum respons diterima dari server, termasuk yang berikut:
+ Menjalankan perintah di beberapa kunci
+ Menjalankan MULTI/EXEC transaksi atau skrip Lua yang terdiri dari beberapa perintah Valkey atau Redis OSS individual
+ Membaca nilai besar
+ Melakukan operasi pemblokiran seperti BLPOP

Dalam kasus operasi pemblokiran seperti BLPOP, praktik terbaiknya adalah dengan mengatur waktu habis perintah ke jumlah yang lebih rendah dari waktu habis soket.

Berikut ini adalah contoh kode untuk menerapkan batas waktu sisi klien di redis-py,, dan Lettuce. PHPRedis

**Contoh konfigurasi waktu habis 1: redis-py**

Berikut ini adalah contoh kode dengan redis-py:

```
# connect to Redis server with a 100 millisecond timeout
# give every Redis command a 2 second timeout
client = redis.Redis(connection_pool=redis.BlockingConnectionPool(host=HOST, max_connections=10,socket_connect_timeout=0.1,socket_timeout=2))

res = client.set("key", "value") # will timeout after 2 seconds
print(res)                       # if there is a connection error

res = client.blpop("list", timeout=1) # will timeout after 1 second
                                      # less than the 2 second socket timeout
print(res)
```

**Contoh konfigurasi batas waktu 2: PHPRedis**

Berikut ini adalah contoh kode dengan PHPRedis:

```
// connect to Redis server with a 100ms timeout
// give every Redis command a 2s timeout
$client = new Redis();
$timeout = 0.1; // 100 millisecond connection timeout
$retry_interval = 100; // 100 millisecond retry interval
$client = new Redis();
if($client->pconnect($HOST, $PORT, 0.1, NULL, 100, $read_timeout=2) != TRUE){
	return; // ERROR: connection failed
}
$client->set($key, $value);

$res = $client->set("key", "value"); // will timeout after 2 seconds
print "$res\n";                      // if there is a connection error

$res = $client->blpop("list", 1); // will timeout after 1 second
print "$res\n";                   // less than the 2 second socket timeout
```

**Contoh konfigurasi waktu habis 3: Lettuce**

Berikut ini adalah contoh kode dengan Lettuce:

```
// connect to Redis server and give every command a 2 second timeout
public static void main(String[] args)
{
	RedisClient client = null;
	StatefulRedisConnection<String, String> connection = null;
	try {
		client = RedisClient.create(RedisURI.create(HOST, PORT));
		client.setOptions(ClientOptions.builder()
	.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofMillis(100)).build()) // 100 millisecond connection timeout
	.timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(2)).build()) // 2 second command timeout 
	.build());

		// use the connection pool from above example

		commands.set("key", "value"); // will timeout after 2 seconds
		commands.blpop(1, "list"); // BLPOP with 1 second timeout
	} finally {
		if (connection != null) {
			connection.close();
		}

		if (client != null){
			client.shutdown();
		}
	}
}
```

# Konfigurasikan batas waktu idle sisi server (Valkey dan Redis OSS)
<a name="BestPractices.Clients.Redis.ServerTimeout"></a>

Kami telah mengamati kasus ketika aplikasi pelanggan memiliki klien idle terhubung dalam jumlah yang tinggi, tetapi tidak secara aktif mengirim perintah. Dalam skenario tersebut, Anda dapat menghabiskan total 65.000 koneksi dengan jumlah klien idle yang tinggi. Untuk menghindari skenario tersebut, konfigurasikan pengaturan waktu habis dengan sesuai di server melalui [Parameter Valkey dan Redis OSS](ParameterGroups.Engine.md#ParameterGroups.Redis). Hal ini memastikan bahwa server secara aktif memutus klien idle untuk menghindari peningkatan jumlah koneksi. Pengaturan ini tidak tersedia di cache nirserver.

# Skrip Lua
<a name="BestPractices.Clients.Redis.LuaScripts"></a>

Valkey dan Redis OSS mendukung lebih dari 200 perintah, termasuk yang menjalankan skrip Lua. Namun, ketika datang ke skrip Lua, ada beberapa jebakan yang dapat mempengaruhi memori dan ketersediaan Valkey atau Redis OSS.

**Skrip Lua yang tidak diparameterisasi**

Setiap skrip Lua di-cache di server Valkey atau Redis OSS sebelum berjalan. Skrip Lua yang tidak diparameterisasi adalah unik, yang dapat menyebabkan server Valkey atau Redis OSS menyimpan sejumlah besar skrip Lua dan menghabiskan lebih banyak memori. Untuk memitigasi hal ini, pastikan bahwa semua skrip Lua diparameterisasi dan secara teratur melakukan SCRIPT FLUSH untuk membersihkan skrip Lua yang di-cache jika diperlukan.

Ketahuilah juga bahwa kunci harus disediakan. Jika nilai untuk parameter KEY tidak disediakan, skrip akan gagal. Misalnya, ini tidak akan berhasil: 

```
serverless-test-lst4hg.serverless.use1.cache.amazonaws.com:6379> eval 'return "Hello World"' 0
(error) ERR Lua scripts without any input keys are not supported.
```

Ini akan berhasil:

```
serverless-test-lst4hg.serverless.use1.cache.amazonaws.com:6379> eval 'return redis.call("get", KEYS[1])' 1 mykey-2
"myvalue-2"
```

Contoh berikut menunjukkan cara menggunakan skrip yang diparameterisasi. Pertama, kita memiliki contoh pendekatan tanpa parameterisasi yang menghasilkan tiga skrip Lua yang di-cache yang berbeda dan tidak disarankan:

```
eval "return redis.call('set','key1','1')" 0
eval "return redis.call('set','key2','2')" 0
eval "return redis.call('set','key3','3')" 0
```

Sebagai gantinya, gunakan pola berikut untuk membuat skrip tunggal yang dapat menerima parameter yang diteruskan:

```
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 key1 1 
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 key2 2 
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 key3 3
```

**Skrip Lua yang berjalan lama**

Skrip Lua dapat menjalankan beberapa perintah secara atom, sehingga bisa memakan waktu lebih lama untuk diselesaikan daripada perintah Valkey atau Redis OSS biasa. Jika skrip Lua hanya menjalankan operasi hanya-baca, Anda dapat menghentikannya di tengah proses. Namun, segera setelah skrip Lua melakukan operasi penulisan, prosesnya tidak dapat dihentikan dan harus dijalankan hingga selesai. Skrip Lua yang berjalan lama yang bermutasi dapat menyebabkan server Valkey atau Redis OSS tidak responsif untuk waktu yang lama. Untuk memitigasi masalah ini, hindari skrip Lua yang berjalan lama dan uji skrip di lingkungan praproduksi.

**Skrip Lua dengan penulisan stealth**

Ada beberapa cara skrip Lua dapat terus menulis data baru ke Valkey atau Redis OSS bahkan ketika Valkey atau Redis OSS selesai: `maxmemory`
+ Script dimulai ketika server Valkey atau Redis OSS di bawah ini`maxmemory`, dan berisi beberapa operasi tulis di dalamnya
+ Perintah tulis pertama skrip tidak menghabiskan memori (seperti DEL), yang diikuti oleh lebih banyak operasi tulis yang menghabiskan memori
+ Anda dapat mengurangi masalah ini dengan mengonfigurasi kebijakan penggusuran yang tepat di server Valkey atau Redis OSS selain. `noeviction` Hal ini memungkinkan Redis OSS untuk mengusir item dan membebaskan memori di antara skrip Lua.

# Menyimpan item komposit besar (Valkey dan Redis OSS)
<a name="BestPractices.Clients.Redis.LargeItems"></a>

Dalam beberapa skenario, aplikasi dapat menyimpan item komposit besar di Valkey atau Redis OSS (seperti dataset hash multi-GB). Ini bukan praktik yang direkomendasikan karena sering menyebabkan masalah kinerja di Valkey atau Redis OSS. Misalnya, klien dapat melakukan perintah HGETALL untuk mengambil seluruh koleksi hash multi GB. Hal ini dapat menghasilkan tekanan memori yang signifikan ke server Valkey atau Redis OSS buffering item besar dalam buffer output klien. Selain itu, untuk migrasi slot dalam mode cluster, ElastiCache tidak memigrasikan slot yang berisi item dengan ukuran serial yang lebih besar dari 256 MB.

Untuk mengatasi masalah item besar, kami memiliki rekomendasi berikut:
+ Pecah item komposit besar menjadi beberapa item yang lebih kecil. Misalnya, pisahkan koleksi hash besar menjadi bidang nilai kunci individual dengan skema nama kunci yang mencerminkan koleksi dengan tepat, seperti menggunakan awalan umum dalam nama kunci untuk mengidentifikasi kumpulan item. Jika Anda harus mengakses beberapa bidang dalam koleksi yang sama secara atomik, Anda dapat menggunakan perintah MGET untuk mengambil beberapa nilai kunci dalam perintah yang sama.
+ Jika Anda mengevaluasi semua opsi dan masih tidak dapat memecah set data koleksi besar, coba gunakan perintah yang beroperasi di subset data dalam koleksi, bukan seluruh koleksi. Hindari kasus penggunaan yang mengharuskan Anda mengambil seluruh koleksi multi-GB secara atomik dalam perintah yang sama. Salah satu contohnya adalah menggunakan perintah HGET atau HMGET, bukan HGETALL di koleksi hash.

# Konfigurasi klien selada (Valkey dan Redis OSS)
<a name="BestPractices.Clients-lettuce"></a>

Bagian ini menjelaskan opsi konfigurasi Java dan Lettuce yang direkomendasikan, dan bagaimana penerapannya pada ElastiCache cluster.

Rekomendasi di bagian ini diuji dengan Lettuce versi 6.2.2.

**Topics**
+ [Contoh: Konfigurasi selada untuk mode cluster, TLS diaktifkan](BestPractices.Clients-lettuce-cme.md)
+ [Contoh: Konfigurasi selada untuk mode cluster dinonaktifkan, TLS diaktifkan](BestPractices.Clients-lettuce-cmd.md)

**TTL cache DNS Java**

Mesin virtual Java (JVM) menyimpan cache pencarian nama DNS. Ketika JVM menyelesaikan nama host ke alamat IP, itu menyimpan alamat IP untuk jangka waktu tertentu, yang dikenal sebagai (TTL). *time-to-live*

Pilihan nilai TTL merupakan kompromi antara latensi dan responsivitas terhadap perubahan. Dengan lebih pendek TTLs, penyelesai DNS melihat pembaruan di DNS cluster lebih cepat. Hal ini dapat membuat aplikasi Anda lebih cepat dalam merespons penggantian atau alur kerja lain yang dialami klaster Anda. Namun, jika TTL terlalu rendah, hal ini meningkatkan volume kueri, yang dapat meningkatkan latensi aplikasi Anda. Meskipun tidak ada nilai TTL yang benar, sebaiknya pertimbangkan lama waktu yang sesuai bagi Anda untuk menunggu perubahan berlaku saat menetapkan nilai TTL Anda.

Karena ElastiCache node menggunakan entri nama DNS yang mungkin berubah, kami sarankan Anda mengonfigurasi JVM Anda dengan TTL rendah 5 hingga 10 detik. Hal ini dapat memastikan bahwa ketika alamat IP simpul berubah, aplikasi Anda dapat menerima dan menggunakan alamat IP baru sumber daya dengan mengueri ulang DNS.

Pada beberapa konfigurasi Java, TTL default JVM diatur untuk tidak pernah menyegarkan entri DNS hingga JVM dimulai ulang.

Untuk detail tentang cara mengatur TTL JVM Anda, lihat [How to set the JVM TTL](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html#how-to-set-the-jvm-ttl).

**Versi Lettuce**

Sebaiknya gunakan Lettuce versi 6.2.2 atau versi yang lebih baru.

**Titik akhir**

Saat Anda menggunakan klaster dengan mode klaster diaktifkan, atur `redisUri` ke titik akhir konfigurasi klaster. Pencarian DNS untuk URI ini menampilkan daftar semua simpul yang tersedia di klaster, dan diresolusi secara acak ke salah satu simpul selama inisialisasi klaster. Untuk detail selengkapnya tentang cara kerja penyegaran topologi, lihat *dynamicRefreshResources*nanti dalam topik ini.

**SocketOption**

Aktifkan [KeepAlive](https://lettuce.io/core/release/api/io/lettuce/core/SocketOptions.KeepAliveOptions.html). Mengaktifkan opsi ini akan mengurangi kebutuhan untuk menangani koneksi yang gagal selama runtime perintah.

Pastikan Anda menetapkan [Waktu habis koneksi](https://lettuce.io/core/release/api/io/lettuce/core/SocketOptions.Builder.html#connectTimeout-java.time.Duration-) berdasarkan persyaratan aplikasi dan beban kerja Anda. Untuk informasi selengkapnya, lihat bagian Waktu habis dalam topik ini.

**ClusterClientOption: Mode Cluster Opsi klien yang diaktifkan**

Aktifkan [AutoReconnect](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterClientOptions.Builder.html#autoReconnect-boolean-)saat koneksi terputus.

Atur [CommandTimeout](https://lettuce.io/core/release/api/io/lettuPrce/core/RedisURI.html#getTimeout--). Untuk detail selengkapnya, lihat bagian Waktu habis dalam topik ini.

Atur [nodeFilter](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterClientOptions.Builder.html#nodeFilter-java.util.function.Predicate-) untuk memfilter simpul yang gagal dari topologi. Lettuce menyimpan semua simpul yang ditemukan di output 'simpul klaster' (termasuk simpul dengan status PFAIL/FAIL) di 'partisi' klien (juga dikenal sebagai serpihan). Selama proses pembuatan topologi klaster, Lettuce mencoba terhubung ke semua simpul partisi. Perilaku Lettuce yang menambahkan simpul yang gagal ini dapat menyebabkan kesalahan koneksi (atau peringatan) ketika simpul diganti karena alasan apa pun. 

Misalnya, setelah failover selesai dan klaster memulai proses pemulihan, sementara clusterTopology sedang diperbarui, peta simpul bus klaster memiliki periode waktu singkat ketika simpul yang nonaktif dicantumkan sebagai simpul FAIL, sebelum sepenuhnya dihapus dari topologi. Selama periode ini, klien Selada menganggapnya sebagai simpul yang sehat dan terus terhubung dengannya. Hal ini menyebabkan kegagalan setelah percobaan kembali habis. 

Misalnya:

```
final ClusterClientOptions clusterClientOptions = 
    ClusterClientOptions.builder()
    ... // other options
    .nodeFilter(it -> 
        ! (it.is(RedisClusterNode.NodeFlag.FAIL) 
        || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) 
        || it.is(RedisClusterNode.NodeFlag.HANDSHAKE)
        || it.is(RedisClusterNode.NodeFlag.NOADDR)))
    .validateClusterNodeMembership(false)
    .build();
redisClusterClient.setOptions(clusterClientOptions);
```

**catatan**  
Pemfilteran node paling baik digunakan dengan DynamicRefreshSources set ke true. Sebaliknya, jika tampilan topologi diambil dari satu simpul seed yang bermasalah, yang melihat simpul primer dari serpihan tertentu sebagai simpul yang mengalami kegagalan, pemfilteran tersebut akan memfilter simpul primer ini, sehingga akan mengakibatkan slot tidak tercakup. Memiliki beberapa node benih (kapan DynamicRefreshSources benar) mengurangi kemungkinan masalah ini, karena setidaknya beberapa node benih harus memiliki tampilan topologi yang diperbarui setelah failover dengan primer yang baru dipromosikan.

**ClusterTopologyRefreshOptions: Opsi untuk mengontrol penyegaran topologi cluster dari klien Cluster Mode Enabled**

**catatan**  
Klaster dengan mode klaster dinonaktifkan tidak mendukung perintah penemuan klaster dan tidak kompatibel dengan semua fungsionalitas penemuan topologi dinamis klien.  
Mode cluster dinonaktifkan dengan ElastiCache tidak kompatibel dengan Lettuce. `MasterSlaveTopologyRefresh` Sebagai gantinya, untuk mode klaster dinonaktifkan, Anda dapat mengonfigurasi `StaticMasterReplicaTopologyProvider` dan menyediakan titik akhir baca dan tulis klaster.  
Untuk informasi selengkapnya tentang menghubungkan ke klaster dengan mode klaster dinonaktifkan, lihat [Menemukan Titik Akhir Cluster Valkey atau Redis OSS (Mode Cluster Dinonaktifkan) (Konsol)](Endpoints.md#Endpoints.Find.Redis).  
Jika Anda ingin menggunakan fungsionalitas penemuan topologi dinamis Lettuce, Anda dapat membuat klaster dengan mode klaster diaktifkan dengan konfigurasi serpihan yang sama dengan klaster yang ada. Namun, untuk klaster dengan mode klaster diaktifkan, sebaiknya konfigurasi setidaknya 3 serpihan dengan minimal 1 replika untuk mendukung failover cepat.

Aktifkan [enablePeriodicRefresh](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.Builder.html#enablePeriodicRefresh-java.time.Duration-). Opsi ini memungkinkan pembaruan topologi klaster berkala sehingga klien memperbarui topologi klaster dalam interval refreshPeriod (default: 60 detik). Ketika opsi ini dinonaktifkan, klien memperbarui topologi klaster hanya ketika kesalahan terjadi ketika klien tersebut mencoba menjalankan perintah terhadap klaster. 

Dengan mengaktifkan opsi ini, Anda dapat mengurangi latensi yang terkait dengan penyegaran topologi klaster dengan menambahkan pekerjaan ini ke tugas latar belakang. Saat penyegaran topologi dilakukan dalam pekerjaan latar belakang, prosesnya bisa agak lambat untuk klaster yang memiliki banyak simpul. Hal ini karena semua simpul sedang dikueri untuk mendapatkan tampilan klaster yang paling terbaru. Jika Anda menjalankan klaster dalam jumlah besar, Anda sebaiknya menambah periodenya.

Aktifkan [enableAllAdaptiveRefreshTriggers](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.Builder.html#enableAllAdaptiveRefreshTriggers--). Hal ini memungkinkan penyegaran topologi adaptif yang menggunakan semua [pemicu](https://lettuce.io/core/6.1.6.RELEASE/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.RefreshTrigger.html): MOVED\$1REDIRECT, ASK\$1REDIRECT, PERSISTENT\$1RECONNECTS, UNCOVERED\$1SLOT, UNKNOWN\$1NODE. Pemicu penyegaran adaptif memulai pembaruan tampilan topologi berdasarkan peristiwa yang terjadi selama operasi klaster Valkey atau Redis OSS. Mengaktifkan opsi ini akan membuat penyegaran topologi langsung dilakukan ketika salah satu pemicu sebelumnya terjadi. Penyegaran yang dipicu secara adaptif dibatasi lajunya menggunakan waktu habis karena peristiwa dapat terjadi dalam skala besar (waktu habis default di antara pembaruan: 30).

Aktifkan [closeStaleConnections](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.Builder.html#closeStaleConnections-boolean-). Hal ini memungkinkan penutupan koneksi yang sudah tidak berlaku saat menyegarkan topologi klaster. Itu hanya berlaku jika [ClusterTopologyRefreshOptions. isPeriodicRefreshEnabled ()](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.html#isPeriodicRefreshEnabled--) benar. Ketika diaktifkan, klien dapat menutup koneksi yang tidak berlaku dan membuat yang baru di latar belakang. Hal ini mengurangi kebutuhan untuk menangani koneksi yang gagal selama runtime perintah.

Aktifkan [dynamicRefreshResources](https://lettuce.io/core/release/api/io/lettuce/core/cluster/ClusterTopologyRefreshOptions.Builder.html#dynamicRefreshSources-boolean-). Kami merekomendasikan dynamicRefreshResources untuk mengaktifkan cluster kecil, dan menonaktifkannya untuk cluster besar. dynamicRefreshResourcesmemungkinkan menemukan node cluster dari node benih yang disediakan (misalnya, titik akhir konfigurasi cluster). Ia menggunakan semua simpul yang ditemukan sebagai sumber untuk menyegarkan topologi klaster. 

Menggunakan dynamic refresh akan mengueri semua simpul yang ditemukan untuk topologi klaster dan mencoba memilih tampilan klaster yang paling akurat. Jika diatur ke false, hanya simpul seed awal yang digunakan sebagai sumber untuk penemuan topologi, dan jumlah klien diperoleh hanya untuk simpul seed awal. Saat dinonaktifkan, jika titik akhir konfigurasi klaster diresolusi ke simpul yang gagal, percobaan untuk menyegarkan tampilan klaster akan gagal dan menyebabkan pengecualian. Skenario ini dapat terjadi karena dibutuhkan beberapa waktu hingga entri simpul yang gagal dihapus dari titik akhir konfigurasi klaster. Oleh karena itu, titik akhir konfigurasi masih dapat diresolusi secara acak ke simpul yang gagal untuk waktu yang singkat. 

Namun, ketika diaktifkan, kita menggunakan semua simpul klaster yang diterima dari tampilan klaster untuk mengueri tampilannya saat ini. Karena kita memfilter simpul yang gagal dari tampilan tersebut, penyegaran topologi akan berhasil. Namun, kapan dynamicRefreshSources benar, Lettuce menanyakan semua node untuk mendapatkan tampilan cluster, dan kemudian membandingkan hasilnya. Jadi, klaster dengan banyak simpul bisa menghabiskan banyak daya komputasi. Sebaiknya nonaktifkan fitur ini untuk klaster dengan banyak simpul. 

```
final ClusterTopologyRefreshOptions topologyOptions = 
    ClusterTopologyRefreshOptions.builder()
    .enableAllAdaptiveRefreshTriggers()
    .enablePeriodicRefresh()
    .dynamicRefreshSources(true)
    .build();
```

**ClientResources**

Konfigurasikan [DnsResolver](https://lettuce.io/core/release/api/io/lettuce/core/resource/DefaultClientResources.Builder.html#dnsResolver-io.lettuce.core.resource.DnsResolver-)dengan [DirContextDnsResolver](https://lettuce.io/core/release/api/io/lettuce/core/resource/DirContextDnsResolver.html). DNS resolver didasarkan pada com.sun.jndi.dns Java. DnsContextFactory.

Konfigurasikan [reconnectDelay](https://lettuce.io/core/release/api/io/lettuce/core/resource/DefaultClientResources.Builder.html#reconnectDelay-io.lettuce.core.resource.Delay-) dengan backoff eksponensial dan jitter penuh. Lettuce memiliki mekanisme percobaan ulang bawaan berdasarkan strategi backoff eksponensial. Untuk detailnya, lihat [Exponential Backoff dan Jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter) di Blog Arsitektur. AWS Untuk informasi lebih lanjut tentang pentingnya memiliki strategi backoff coba lagi, lihat bagian logika backoff dari [posting blog Praktik Terbaik di Blog](https://aws.amazon.com/blogs/database/best-practices-redis-clients-and-amazon-elasticache-for-redis/) Database. AWS 

```
ClientResources clientResources = DefaultClientResources.builder()
   .dnsResolver(new DirContextDnsResolver())
    .reconnectDelay(
        Delay.fullJitter(
            Duration.ofMillis(100),     // minimum 100 millisecond delay
            Duration.ofSeconds(10),      // maximum 10 second delay
            100, TimeUnit.MILLISECONDS)) // 100 millisecond base
    .build();
```

**Batas Waktu**

Gunakan nilai waktu habis koneksi yang lebih rendah daripada waktu habis perintah Anda. Lettuce menggunakan pembuatan lazy connection. Jadi, jika waktu habis koneksi lebih tinggi dari waktu habis perintah, Anda dapat mengalami periode kegagalan persisten setelah penyegaran topologi jika Lettuce mencoba terhubung ke simpul yang tidak berkondisi baik dan waktu habis perintah selalu terlampaui. 

Gunakan waktu habis perintah dinamis untuk perintah yang berbeda-beda. Sebaiknya tetapkan waktu habis perintah berdasarkan durasi perintah yang diharapkan. Misalnya, gunakan waktu habis yang lebih lama untuk perintah yang mengulangi beberapa kunci, seperti skrip FLUSHDB, FLUSHALL, KEYS, SMEMBERS, atau Lua. Gunakan waktu habis yang lebih pendek untuk perintah kunci tunggal, seperti SET, GET, dan HSET.

**catatan**  
Waktu habis yang dikonfigurasi dalam contoh berikut adalah untuk pengujian yang menjalankan perintah SET/GET dengan kunci dan nilai hingga 20 byte. Waktu pemrosesan bisa lebih lama jika perintahnya kompleks atau kunci dan nilainya lebih besar. Anda harus mengatur waktu habis berdasarkan kasus penggunaan aplikasi Anda. 

```
private static final Duration META_COMMAND_TIMEOUT = Duration.ofMillis(1000);
private static final Duration DEFAULT_COMMAND_TIMEOUT = Duration.ofMillis(250);
// Socket connect timeout should be lower than command timeout for Lettuce
private static final Duration CONNECT_TIMEOUT = Duration.ofMillis(100);
    
SocketOptions socketOptions = SocketOptions.builder()
    .connectTimeout(CONNECT_TIMEOUT)
    .build();
 

class DynamicClusterTimeout extends TimeoutSource {
     private static final Set<ProtocolKeyword> META_COMMAND_TYPES = ImmutableSet.<ProtocolKeyword>builder()
          .add(CommandType.FLUSHDB)
          .add(CommandType.FLUSHALL)
          .add(CommandType.CLUSTER)
          .add(CommandType.INFO)
          .add(CommandType.KEYS)
          .build();

    private final Duration defaultCommandTimeout;
    private final Duration metaCommandTimeout;

    DynamicClusterTimeout(Duration defaultTimeout, Duration metaTimeout)
    {
        defaultCommandTimeout = defaultTimeout;
        metaCommandTimeout = metaTimeout;
    }

    @Override
    public long getTimeout(RedisCommand<?, ?, ?> command) {
        if (META_COMMAND_TYPES.contains(command.getType())) {
            return metaCommandTimeout.toMillis();
        }
        return defaultCommandTimeout.toMillis();
    }
}

// Use a dynamic timeout for commands, to avoid timeouts during
// cluster management and slow operations.
TimeoutOptions timeoutOptions = TimeoutOptions.builder()
.timeoutSource(
    new DynamicClusterTimeout(DEFAULT_COMMAND_TIMEOUT, META_COMMAND_TIMEOUT))
.build();
```

# Contoh: Konfigurasi selada untuk mode cluster, TLS diaktifkan
<a name="BestPractices.Clients-lettuce-cme"></a>

**catatan**  
Timeout dalam contoh berikut adalah untuk tes yang menjalankan SET/GET perintah dengan kunci dan nilai hingga 20 byte. Waktu pemrosesan bisa lebih lama jika perintahnya kompleks atau kunci dan nilainya lebih besar. Anda harus mengatur waktu habis berdasarkan kasus penggunaan aplikasi Anda. 

```
// Set DNS cache TTL
public void setJVMProperties() {
    java.security.Security.setProperty("networkaddress.cache.ttl", "10");
}

private static final Duration META_COMMAND_TIMEOUT = Duration.ofMillis(1000);
private static final Duration DEFAULT_COMMAND_TIMEOUT = Duration.ofMillis(250);
// Socket connect timeout should be lower than command timeout for Lettuce
private static final Duration CONNECT_TIMEOUT = Duration.ofMillis(100);

// Create RedisURI from the cluster configuration endpoint
clusterConfigurationEndpoint = <cluster-configuration-endpoint> // TODO: add your cluster configuration endpoint
final RedisURI redisUriCluster =
    RedisURI.Builder.redis(clusterConfigurationEndpoint)
        .withPort(6379)
        .withSsl(true)
        .build();

// Configure the client's resources                
ClientResources clientResources = DefaultClientResources.builder()
    .reconnectDelay(
        Delay.fullJitter(
            Duration.ofMillis(100),     // minimum 100 millisecond delay
            Duration.ofSeconds(10),      // maximum 10 second delay
            100, TimeUnit.MILLISECONDS)) // 100 millisecond base
    .dnsResolver(new DirContextDnsResolver())
    .build(); 

// Create a cluster client instance with the URI and resources
RedisClusterClient redisClusterClient = 
    RedisClusterClient.create(clientResources, redisUriCluster);

// Use a dynamic timeout for commands, to avoid timeouts during
// cluster management and slow operations.
class DynamicClusterTimeout extends TimeoutSource {
     private static final Set<ProtocolKeyword> META_COMMAND_TYPES = ImmutableSet.<ProtocolKeyword>builder()
          .add(CommandType.FLUSHDB)
          .add(CommandType.FLUSHALL)
          .add(CommandType.CLUSTER)
          .add(CommandType.INFO)
          .add(CommandType.KEYS)
          .build();

    private final Duration metaCommandTimeout;
    private final Duration defaultCommandTimeout;

    DynamicClusterTimeout(Duration defaultTimeout, Duration metaTimeout)
    {
        defaultCommandTimeout = defaultTimeout;
        metaCommandTimeout = metaTimeout;
    }

    @Override
    public long getTimeout(RedisCommand<?, ?, ?> command) {
        if (META_COMMAND_TYPES.contains(command.getType())) {
            return metaCommandTimeout.toMillis();
        }
        return defaultCommandTimeout.toMillis();
    }
}

TimeoutOptions timeoutOptions = TimeoutOptions.builder()
    .timeoutSource(new DynamicClusterTimeout(DEFAULT_COMMAND_TIMEOUT, META_COMMAND_TIMEOUT))
     .build();

// Configure the topology refreshment options
final ClusterTopologyRefreshOptions topologyOptions = 
    ClusterTopologyRefreshOptions.builder()
    .enableAllAdaptiveRefreshTriggers()
    .enablePeriodicRefresh()
    .dynamicRefreshSources(true)
    .build();

// Configure the socket options
final SocketOptions socketOptions = 
    SocketOptions.builder()
    .connectTimeout(CONNECT_TIMEOUT) 
    .keepAlive(true)
    .build();

// Configure the client's options
final ClusterClientOptions clusterClientOptions = 
    ClusterClientOptions.builder()
    .topologyRefreshOptions(topologyOptions)
    .socketOptions(socketOptions)
    .autoReconnect(true)
    .timeoutOptions(timeoutOptions) 
    .nodeFilter(it -> 
        ! (it.is(RedisClusterNode.NodeFlag.FAIL) 
        || it.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) 
        || it.is(RedisClusterNode.NodeFlag.NOADDR))) 
    .validateClusterNodeMembership(false)
    .build();
    
redisClusterClient.setOptions(clusterClientOptions);

// Get a connection
final StatefulRedisClusterConnection<String, String> connection = 
    redisClusterClient.connect();

// Get cluster sync/async commands   
RedisAdvancedClusterCommands<String, String> sync = connection.sync();
RedisAdvancedClusterAsyncCommands<String, String> async = connection.async();
```

# Contoh: Konfigurasi selada untuk mode cluster dinonaktifkan, TLS diaktifkan
<a name="BestPractices.Clients-lettuce-cmd"></a>

**catatan**  
Timeout dalam contoh berikut adalah untuk tes yang menjalankan SET/GET perintah dengan kunci dan nilai hingga 20 byte. Waktu pemrosesan bisa lebih lama jika perintahnya kompleks atau kunci dan nilainya lebih besar. Anda harus mengatur waktu habis berdasarkan kasus penggunaan aplikasi Anda. 

```
// Set DNS cache TTL
public void setJVMProperties() {
    java.security.Security.setProperty("networkaddress.cache.ttl", "10");
}

private static final Duration META_COMMAND_TIMEOUT = Duration.ofMillis(1000);
private static final Duration DEFAULT_COMMAND_TIMEOUT = Duration.ofMillis(250);
// Socket connect timeout should be lower than command timeout for Lettuce
private static final Duration CONNECT_TIMEOUT = Duration.ofMillis(100);

// Create RedisURI from the primary/reader endpoint
clusterEndpoint = <primary/reader-endpoint> // TODO: add your node endpoint
RedisURI redisUriStandalone =
    RedisURI.Builder.redis(clusterEndpoint).withPort(6379).withSsl(true).withDatabase(0).build();

ClientResources clientResources =
    DefaultClientResources.builder()
        .dnsResolver(new DirContextDnsResolver())
        .reconnectDelay(
            Delay.fullJitter(
                Duration.ofMillis(100), // minimum 100 millisecond delay
                Duration.ofSeconds(10), // maximum 10 second delay
                100,
                TimeUnit.MILLISECONDS)) // 100 millisecond base
        .build();

// Use a dynamic timeout for commands, to avoid timeouts during
// slow operations.
class DynamicTimeout extends TimeoutSource {
     private static final Set<ProtocolKeyword> META_COMMAND_TYPES = ImmutableSet.<ProtocolKeyword>builder()
          .add(CommandType.FLUSHDB)
          .add(CommandType.FLUSHALL)
          .add(CommandType.INFO)
          .add(CommandType.KEYS)
          .build();

    private final Duration metaCommandTimeout;
    private final Duration defaultCommandTimeout;

    DynamicTimeout(Duration defaultTimeout, Duration metaTimeout)
    {
        defaultCommandTimeout = defaultTimeout;
        metaCommandTimeout = metaTimeout;
    }

    @Override
    public long getTimeout(RedisCommand<?, ?, ?> command) {
        if (META_COMMAND_TYPES.contains(command.getType())) {
            return metaCommandTimeout.toMillis();
        }
        return defaultCommandTimeout.toMillis();
    }
}

TimeoutOptions timeoutOptions = TimeoutOptions.builder()
    .timeoutSource(new DynamicTimeout(DEFAULT_COMMAND_TIMEOUT, META_COMMAND_TIMEOUT))
     .build();                      
                                    
final SocketOptions socketOptions =
    SocketOptions.builder().connectTimeout(CONNECT_TIMEOUT).keepAlive(true).build();

ClientOptions clientOptions =
    ClientOptions.builder().timeoutOptions(timeoutOptions).socketOptions(socketOptions).build();

RedisClient redisClient = RedisClient.create(clientResources, redisUriStandalone);
redisClient.setOptions(clientOptions);
```

## Mengkonfigurasi protokol pilihan untuk cluster tumpukan ganda (Valkey dan Redis OSS)
<a name="network-type-configuring-dual-stack-redis"></a>

Untuk mode cluster diaktifkan Valkey atau Redis OSS cluster, Anda dapat mengontrol klien protokol yang akan digunakan untuk terhubung ke node di cluster dengan parameter IP Discovery. Parameter IP Discovery dapat diatur ke salah satu IPv4 atau IPv6. 

Untuk cluster Valkey atau Redis OSS, parameter penemuan IP menetapkan protokol IP yang digunakan dalam [slot cluster (), pecahan cluster ()](https://valkey.io/commands/cluster-slots/)[, dan node [cluster](https://valkey.io/commands/cluster-nodes/) ()](https://valkey.io/commands/cluster-shards/) output. Perintah tersebut digunakan oleh klien untuk menemukan topologi klaster. Klien menggunakan perintah IPs in theses untuk terhubung ke node lain di cluster. 

Perubahan pada Penemuan IP tidak akan mengakibatkan waktu henti untuk klien yang terhubung. Namun, perubahan ini akan memakan waktu untuk disebarkan. Untuk menentukan kapan perubahan telah sepenuhnya disebarkan untuk Cluster Valkey atau Redis OSS, pantau output dari. `cluster slots` Setelah semua node dikembalikan oleh laporan perintah slot cluster IPs dengan protokol baru, perubahan telah selesai menyebar. 

Contoh dengan Redis-Py:

```
cluster = RedisCluster(host="xxxx", port=6379)
target_type = IPv6Address # Or IPv4Address if changing to IPv4

nodes = set()
while len(nodes) == 0 or not all((type(ip_address(host)) is target_type) for host in nodes):
    nodes = set()

   # This refreshes the cluster topology and will discovery any node updates.
   # Under the hood it calls cluster slots
    cluster.nodes_manager.initialize()
    for node in cluster.get_nodes():
        nodes.add(node.host)
    self.logger.info(nodes)

    time.sleep(1)
```

Contoh dengan Lettuce:

```
RedisClusterClient clusterClient = RedisClusterClient.create(RedisURI.create("xxxx", 6379));

Class targetProtocolType = Inet6Address.class; // Or Inet4Address.class if you're switching to IPv4

Set<String> nodes;
    
do {
   // Check for any changes in the cluster topology.
   // Under the hood this calls cluster slots
    clusterClient.refreshPartitions();
    Set<String> nodes = new HashSet<>();

    for (RedisClusterNode node : clusterClient.getPartitions().getPartitions()) {
        nodes.add(node.getUri().getHost());
    }

    Thread.sleep(1000);
} while (!nodes.stream().allMatch(node -> {
            try {
                return finalTargetProtocolType.isInstance(InetAddress.getByName(node));
            } catch (UnknownHostException ignored) {}
            return false;
}));
```