Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Amazon MQ untuk praktik terbaik RabbitMQ
Gunakan ini sebagai referensi untuk menemukan rekomendasi dengan cepat guna memaksimalkan performa dan meminimalkan biaya throughput saat bekerja dengan broker RabbitMQ di Amazon MQ.
penting
Saat ini, Amazon MQ tidak mendukung aliran
penting
Amazon MQ untuk RabbitMQ tidak mendukung nama pengguna “tamu”, dan akan menghapus akun tamu default saat Anda membuat broker baru. Amazon MQ juga akan secara berkala menghapus akun yang dibuat pelanggan yang disebut “tamu”.
Topik
Pilih jenis instans broker yang tepat untuk throughput terbaik
Throughput pesan dari jenis instans broker tergantung pada kasus penggunaan aplikasi Anda. Jenis instans broker yang lebih kecil seperti t3.micro
seharusnya hanya digunakan untuk menguji kinerja aplikasi. Menggunakan instans mikro ini sebelum menggunakan instans yang lebih besar dalam produksi dapat meningkatkan kinerja aplikasi dan membantu Anda menekan biaya pengembangan. Pada jenis instans m5.large
dan di atasnya, Anda dapat menggunakan penerapan klaster untuk ketersediaan tinggi dan daya tahan pesan. Jenis instans broker yang lebih besar dapat menangani tingkat produksi klien dan antrian, throughput tinggi, pesan dalam memori, dan pesan yang berlebihan. Untuk info lebih lanjut tentang memilih jenis instance yang benar, lihatAmazon MQ untuk pedoman ukuran RabbitMQ.
Gunakan beberapa saluran
Untuk menghindari churn koneksi, gunakan beberapa saluran melalui satu koneksi. Aplikasi harus menghindari rasio koneksi 1:1 ke saluran. Kami merekomendasikan menggunakan satu koneksi per proses, dan kemudian satu saluran per utas. Hindari penggunaan saluran yang berlebihan untuk mencegah kebocoran saluran.
Gunakan pesan persisten dan antrian yang tahan lama
Pesan persisten dapat membantu mencegah kehilangan data ketika broker lumpuh atau memulai ulang. Pesan persisten ditulis ke disk segera setelah pesan tiba. Tidak seperti antrean malas, pesan persisten di-cache dalam memori dan disk, kecuali lebih banyak memori diperlukan oleh broker. Dalam kasus ketika lebih banyak memori diperlukan, pesan dihapus dari memori oleh mekanisme broker RabbitMQ yang mengelola penyimpanan pesan ke disk, sering disebut sebagai lapisan persisten.
Untuk mengaktifkan persistensi pesan, Anda dapat menyatakan antrean sebagai durable
dan mengatur mode pengiriman pesan ke persistent
. Contoh berikut mendemonstrasikan penggunaan pustaka klien RabbitMQ Java
boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);
Setelah mengonfigurasi antrean sebagai tahan lama, Anda dapat mengirim pesan persisten ke antrean dengan mengatur MessageProperties
ke PERSISTENT_TEXT_PLAIN
seperti yang ditampilkan dalam contoh berikut.
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Jaga antrian pendek
Dalam penerapan cluster, antrian dengan sejumlah besar pesan dapat menyebabkan pemanfaatan sumber daya yang berlebihan. Ketika broker dimanfaatkan secara berlebihan, me-reboot Amazon MQ untuk broker RabbitMQ dapat menyebabkan penurunan kinerja lebih lanjut. Jika reboot, broker yang terlalu banyak digunakan mungkin menjadi tidak responsif di negara bagian. REBOOT_IN_PROGRESS
Selama jendela pemeliharaan, Amazon MQ melakukan semua pekerjaan pemeliharaan, satu simpul pada satu waktu, untuk memastikan bahwa broker tetap operasional. Akibatnya, antrian mungkin perlu disinkronkan karena setiap simpul melanjutkan operasi. Selama sinkronisasi, pesan yang perlu direplikasi ke cermin dimuat ke dalam memori dari volume Amazon Elastic Block Store (Amazon EBS) yang sesuai untuk diproses dalam batch. Memproses pesan dalam batch memungkinkan antrean menyinkronkan lebih cepat.
Jika antrean dibuat tetap pendek dan pesan berukuran kecil, antrean berhasil disinkronkan dan melanjutkan operasi seperti yang diharapkan. Namun, jika jumlah data dalam batch mendekati batas memori simpul, simpul memicu alarm memori tinggi, menjeda sinkronisasi antrean. Anda dapat mengonfirmasi penggunaan memori dengan membandingkan metrik node RabbitMemUsed dan RabbitMqMemLimit broker di CloudWatch. Sinkronisasi tidak dapat diselesaikan hingga pesan dikonsumsi atau dihapus, atau jumlah pesan dalam batch berkurang.
Jika sinkronisasi antrian dijeda untuk penerapan klaster, sebaiknya gunakan atau hapus pesan untuk menurunkan jumlah pesan dalam antrian. Setelah kedalaman antrian berkurang dan sinkronisasi antrian selesai, status broker akan berubah menjadi. RUNNING
Untuk menyelesaikan sinkronisasi antrian yang dijeda, Anda juga dapat menerapkan kebijakan untuk mengurangi ukuran batch sinkronisasi antrian.
Anda juga dapat menentukan kebijakan penghapusan otomatis dan TTL untuk secara proaktif mengurangi penggunaan sumber daya, serta menjaga NACKs dari konsumen seminimal mungkin. Requeueing pesan pada broker adalah CPU intensif sehingga jumlah yang tinggi NACKs dapat mempengaruhi kinerja broker.
Konfigurasikan konfirmasi penerbit dan pengakuan pengiriman konsumen
Proses konfirmasi pesan telah dikirim ke broker dikenal sebagai konfirmasi penerbit. Publisher mengonfirmasi membiarkan aplikasi Anda tahu kapan pesan telah disimpan dengan andal. Konfirmasi penerbit juga dapat membantu mengontrol tingkat pesan yang disimpan ke broker. Tanpa konfirmasi penerbit, tidak ada konfirmasi bahwa pesan diproses dengan sukses, dan broker Anda dapat menjatuhkan pesan yang tidak dapat diproses.
Demikian pula, ketika aplikasi klien mengirimkan konfirmasi pengiriman dan konsumsi pesan kembali ke broker, itu dikenal sebagai pengakuan pengiriman konsumen. Konfirmasi dan pengakuan sangat penting untuk memastikan keamanan data saat bekerja dengan broker RabbitMQ.
Pengakuan pengiriman konsumen biasanya dikonfigurasi pada aplikasi klien. Saat bekerja dengan AMQP 0-9-1, pengakuan dapat diaktifkan dengan mengonfigurasi metode. basic.consume
Klien AMQP 0-9-1 juga dapat mengonfigurasi konfirmasi penerbit dengan mengirimkan metode. confirm.select
Biasanya, pengakuan pengiriman diaktifkan di saluran. Misalnya, ketika bekerja dengan pustaka klien RabbitMQ Java, Anda dapat menggunakan Channel#basicAck
untuk menyiapkan yang pengakuan positif basic.ack
sederhana seperti yang ditampilkan dalam contoh berikut.
// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
catatan
Pesan yang tidak diakui harus di-cache dalam memori. Anda dapat membatasi jumlah pesan yang diambil sebelumnya oleh konsumen dengan mengonfigurasi pengaturan pra-pengambilan untuk aplikasi klien.
Anda dapat mengonfigurasi consumer_timeout
untuk mendeteksi ketika konsumen tidak mengakui pengiriman. Jika konsumen tidak mengirimkan pengakuan dalam nilai batas waktu, saluran akan ditutup, dan Anda akan menerima a. PRECONDITION_FAILED
Untuk mendiagnosis kesalahan, gunakan UpdateConfigurationAPI untuk meningkatkan consumer_timeout
nilai.
Konfigurasikan pra-pengambilan
Anda dapat menggunakan nilai pra-pengambilan RabbitMQ untuk mengoptimalkan cara konsumen mengonsumsi pesan. RabbitMQ mengimplementasikan mekanisme pra-pengambilan saluran yang disediakan oleh AMQP 0-9-1 dengan menerapkan jumlah pra-pengambilan untuk konsumen yang bertentangan dengan saluran. Nilai pra-pengambilan digunakan untuk menentukan jumlah pesan yang dikirim ke konsumen pada waktu tertentu. Secara default, RabbitMQ menetapkan ukuran buffer yang tidak terbatas untuk aplikasi klien.
Ada berbagai faktor yang perlu dipertimbangkan saat menetapkan jumlah pra-pengambilan untuk konsumen RabbitMQ. Pertama, pertimbangkan lingkungan dan konfigurasi konsumen Anda. Karena konsumen perlu menyimpan semua pesan dalam memori saat pesan sedang diproses, nilai pra-pengambilan yang tinggi dapat memiliki dampak negatif pada performa konsumen, dan di beberapa kasus, membuat konsumen berpotensi merusak semuanya. Demikian pula, broker RabbitMQ sendiri menyimpan semua pesan yang dikirimkannya dalam cache dalam memori sampai menerima pengakuan konsumen. Nilai pra-pengambilan yang tinggi dapat menyebabkan server RabbitMQ Anda kehabisan memori dengan cepat jika pengakuan otomatis tidak dikonfigurasi untuk konsumen, dan jika konsumen mengambil waktu yang relatif lama untuk memproses pesan.
Dengan pertimbangan di atas, kami rekomendasikan Anda untuk selalu menetapkan nilai pra-pengambilan agar terhindar dari situasi ketika broker RabbitMQ atau konsumen kehabisan memori karena sejumlah besar pesan yang belum diproses, atau tidak diakui. Jika perlu mengoptimalkan broker untuk memproses pesan dalam volume besar, Anda dapat menguji broker dan konsumen menggunakan berbagai jumlah pra-pengambilan untuk menentukan nilai titik ketika overhead jaringan menjadi sangat tidak signifikan dibandingkan dengan waktu yang dibutuhkan konsumen untuk memproses pesan.
catatan
Jika aplikasi klien Anda telah dikonfigurasi untuk secara otomatis mengakui pengiriman pesan ke konsumen, menetapkan nilai pra-pengambilan tidak akan berpengaruh.
Semua pesan pra-pengambilan dihapus dari antrean.
Contoh berikut mendemonstrasikan cara menentukan nilai pra-pengambilan 10
untuk konsumen tunggal menggunakan pustaka klien RabbitMQ Java.
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
catatan
Dalam pustaka klien RabbitMQ Java, nilai default untuk bendera global
diatur ke false
, sehingga contoh di atas dapat ditulis hanya sebagai channel.basicQos(10)
.
Gunakan Seledri 5.5 atau lebih baru dengan antrian kuorum
Python Celery
Untuk semua versi Seledri
-
Matikan
task_create_missing_queues
untuk mengurangi churn antrian. -
Kemudian, matikan
worker_enable_remote_control
untuk menghentikan pembuatancelery@...pidbox
antrian dinamis. Ini akan mengurangi churn antrian pada broker.worker_enable_remote_control = false
-
Untuk lebih mengurangi aktivitas pesan non-kritis, matikan Seledri worker-send-task-events
dengan tidak menyertakan -E
atau--task-events
menandai saat memulai aplikasi Seledri Anda. -
Mulai aplikasi Seledri Anda menggunakan parameter berikut:
celery -A app_name worker --without-heartbeat --without-gossip --without-mingle
Untuk Seledri versi 5.5 dan di atas
-
Tingkatkan ke Celery versi 5.5
, versi minimum yang mendukung antrian kuorum, atau versi yang lebih baru. Untuk memeriksa versi Seledri apa yang Anda gunakan, gunakan celery --version
. Untuk informasi lebih lanjut tentang antrian kuorum, lihat. Antrian kuorum untuk RabbitMQ di Amazon MQ -
Setelah memutakhirkan ke Celery 5.5 atau yang lebih baru, konfigurasikan
task_default_queue_type
ke “kuorum”. -
Kemudian, Anda juga harus mengaktifkan Publish Confirmation in Broker Transport Options
: broker_transport_options = {"confirm_publish": True}
Secara otomatis pulih dari kegagalan jaringan
Kami merekomendasikan untuk selalu mengaktifkan pemulihan jaringan otomatis guna mencegah waktu henti yang signifikan ketika koneksi klien ke node RabbitMQ gagal. Pustaka klien RabbitMQ Java mendukung pemulihan jaringan otomatis secara default, dimulai dari versi 4.0.0
.
Dalam kasus ketika koneksi awal antara klien dan node RabbitMQ gagal, pemulihan otomatis tidak akan dipicu. Kami merekomendasikan Anda menulis kode aplikasi untuk memperhitungkan kegagalan koneksi awal dengan mencoba ulang koneksi. Contoh berikut mendemonstrasikan percobaan ulang kegagalan jaringan awal menggunakan pustaka klien RabbitMQ Java.
ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
catatan
Jika aplikasi menutup koneksi menggunakan metode Connection.Close
, pemulihan jaringan otomatis tidak akan diaktifkan atau dipicu.
Simpan ukuran pesan di bawah 1 MB
Sebaiknya simpan pesan di bawah 1 Megabyte (MB) untuk kinerja dan keandalan yang optimal.
RabbitMQ 3.13 mendukung ukuran pesan hingga 128 MB secara default, tetapi pesan besar dapat memicu alarm memori yang tidak terduga yang memblokir penerbitan dan berpotensi menciptakan tekanan memori tinggi saat mereplikasi pesan di seluruh node. Pesan yang terlalu besar juga dapat memengaruhi proses restart dan pemulihan broker, yang meningkatkan risiko terhadap kontinuitas layanan dan dapat menyebabkan penurunan kinerja.
Simpan dan ambil muatan besar menggunakan pola pemeriksaan klaim
Untuk mengelola pesan besar, Anda dapat menerapkan pola pemeriksaan klaim dengan menyimpan muatan pesan di penyimpanan eksternal dan hanya mengirim pengidentifikasi referensi payload melalui RabbitMQ. Konsumen menggunakan pengidentifikasi referensi payload untuk mengambil dan memproses pesan besar.
Diagram berikut menunjukkan cara menggunakan Amazon MQ untuk RabbitMQ dan Amazon S3 untuk menerapkan pola Pemeriksaan Klaim:

Contoh berikut menunjukkan pola ini menggunakan Amazon MQ, SDK for Java AWS 2.x, dan Amazon S3:
-
Pertama, tentukan kelas Pesan yang akan menampung pengenal referensi Amazon S3.
class Message { // Other data fields of the message... public String s3Key; public String s3Bucket; }
-
Buat metode penerbit yang menyimpan muatan di Amazon S3 dan mengirim pesan referensi melalui RabbitMQ.
public void publishPayload() { // Store the payload in S3. String payload = PAYLOAD; String prefix = S3_KEY_PREFIX; String s3Key = prefix + "/" + UUID.randomUUID(); s3Client.putObject(PutObjectRequest.builder() .bucket(S3_BUCKET).key(s3Key).build(), RequestBody.fromString(payload)); // Send the reference through RabbitMQ. Message message = new Message(); message.s3Key = s3Key; message.s3Bucket = S3_BUCKET; // Assign values to other fields in your message instance. publishMessage(message); }
-
Menerapkan metode konsumen yang mengambil payload dari Amazon S3, memproses payload, dan menghapus objek Amazon S3.
public void consumeMessage(Message message) { // Retrieve the payload from S3. String payload = s3Client.getObjectAsBytes(GetObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()) .asUtf8String(); // Process the complete message. processPayload(message, payload); // Delete the S3 object. s3Client.deleteObject(DeleteObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()); }
Gunakan basic.consume
dan konsumen berumur panjang
Menggunakan basic.consume
dengan konsumen berumur panjang lebih efisien daripada polling untuk menggunakan pesan individu. basic.get
Untuk informasi selengkapnya, lihat Polling untuk pesan individual