Amazon MQ for RabbitMQ のベストプラクティス - Amazon MQ

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MQ for RabbitMQ のベストプラクティス

このセクションは、Amazon MQ での RabbitMQ ブローカーの使用時にパフォーマンスを最大限に引き出し、スループットコストを最小限に抑えるための推奨事項をすばやく見つけるために使用してください。

重要

現在、Amazon MQ はストリームや、RabbitMQ 3.9.x で導入された JSON での構造化ロギングの使用をサポートしていません。

重要

Amazon MQ for RabbitMQ では、ユーザー名「guest」はサポートされず、デフォルトのゲストアカウントは新しいブローカーの作成時に削除されます。ユーザーが作成した「guest」というアカウントも、Amazon MQ によって定期的に削除されます。

最高のスループットのために正しいブローカーインスタンスタイプを選択する

ブローカーインスタンスタイプのメッセージスループットは、アプリケーションのユースケースに依存します。t3.micro などの小さいブローカーインスタンスタイプは、アプリケーションのパフォーマンスをテストする場合にのみ使用することをお勧めします。大規模なインスタンスを本番環境で使用する前にこれらのマイクロインスタンスを使用すると、アプリケーションのパフォーマンスが向上し、開発コストを抑えることができます。m5.large 以上のインスタンスタイプでは、クラスターデプロイを使用して高可用性とメッセージの耐久性を確保できます。大きいブローカーインスタンスタイプは、本番稼働レベルのクライアントとキュー、高スループット、メモリ内のメッセージ、冗長メッセージを処理できます。正しいインスタンスタイプの選択の詳細については、「」を参照してくださいAmazon MQ for RabbitMQ のサイズ設定ガイドライン

複数のチャネルを使用する

接続チャーンを回避するには、1 つの接続で複数のチャネルを使用します。アプリケーションでは、チャネルに対する 1:1 の接続を避ける必要があります。プロセスごとに 1 つの接続を使用し、スレッドごとに 1 つのチャネルを使用することをお勧めします。チャネルのリークを防ぐために、チャネルを過剰に使用することは避けてください。

永続メッセージと持続キューを使用する

永続メッセージは、ブローカーがクラッシュまたは再起動するという状況におけるデータ損失の防止に役立ちます。永続メッセージは、到着するとすぐにディスクに書き込まれますが、レイジーキューとは異なり、ブローカーがより多くのメモリを必要とする場合を除き、永続メッセージはメモリとディスクの両方にキャッシュされます。より多くのメモリが必要な場合は、ディスクへのメッセージの保存を管理する RabbitMQ ブローカーメカニズム (一般に永続レイヤーと呼ばれます) によって、メモリからメッセージが削除されます。

メッセージの永続性を有効にするには、キューを durable として宣言し、メッセージ配信モードを persistent に設定できます。以下の例は、RabbitMQ Java クライアントライブラリを使用した持続キューの宣言を示しています。AMQP 0-9-1 を使用している場合は、配信モード「2」を設定することで、メッセージを永続としてマークできます。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

キューを持続キューとして設定したら、以下の例にあるように、MessagePropertiesPERSISTENT_TEXT_PLAIN に設定することによって永続メッセージをキューに送信できます。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

キューを短くしておく

クラスターデプロイでは、多数のメッセージを持つキューがリソースの過剰な使用につながる場合があります。ブローカーが過剰に使用されているときは、Amazon MQ for RabbitMQ ブローカーの再起動がパフォーマンスをさらに低下させる原因となる可能性があります。過剰に使用されているブローカーが再起動されると、REBOOT_IN_PROGRESS 状態のまま応答しなくなることがあります。

Amazon MQ はメンテナンスウィンドウ中、すべてのメンテナンス作業を一度に 1 ノードずつ実行して、ブローカーが動作可能な状態を維持することを確実にします。その結果、各ノードが操作を再開するときに、キューが同期する必要が生じる場合があります。同期中、ミラーにレプリケートする必要があるメッセージは、バッチで処理されるように、対応する Amazon Elastic Block Store (Amazon EBS) ボリュームからメモリにロードされます。メッセージをバッチで処理することにより、キューの同期が速くなります。

キューを短くし、メッセージを小さくしておくと、キューが正常に同期し、期待通りに操作を再開します。ただし、バッチ内のデータ量がノードのメモリ制限に近づいた場合は、ノードが高メモリアラームを発し、キューの同期を一時停止します。メモリ使用量は、CloudWatch で RabbitMemUsed および RabbitMqMemLimit のブローカーノードメトリクスを比較することで確認できます。同期は、メッセージが消費もしくは削除される、またはバッチ内のメッセージの数が減るまで完了できません。

クラスターデプロイのためにキューの同期化が一時停止される場合は、メッセージを消費または削除して、キュー内のメッセージの数を減らすことをお勧めします。キュー深度が減少し、キューの同期が完了すると、ブローカーのステータスが RUNNING に変更されます。一時停止されたキューの同期を解決するには、キューの同期のバッチサイズを小さくするポリシーを適用することも可能です。

また、自動削除ポリシーと TTL ポリシーを定義すると、リソースの使用量をプロアクティブに削減するとともに、コンシューマーからの NACK を最小限に抑えることができます。ブローカーへのメッセージの再キュー処理は CPU 負荷が高いため、大量の NACK が発生するとブローカーのパフォーマンスに影響する可能性があります。

パブリッシャーの確認とコンシューマーの配信承認の設定

ブローカーにメッセージが送信されたことを確認するプロセスは、パブリッシャーの確認と呼ばれます。パブリッシャーは、メッセージが確実に格納されたときにアプリケーションに通知します。パブリッシャーの確認は、ブローカーに格納されるメッセージの割合を制御するためにも役立ちます。パブリッシャーが確認しないと、メッセージが正常に処理されたことは確認されず、ブローカーは処理できないメッセージを削除する可能性があります。

同様に、クライアントアプリケーションはメッセージの配信と消費の確認をブローカーに返送します。これはコンシューマーの配信承認と呼ばれます。RabbitMQ ブローカーの使用時にデータの安全性を確保するには、確認と承認の両方が不可欠です。

コンシューマーの配信承認は、通常クライアントアプリケーションで設定されています。AMQP 0-9-1 を使用している場合は、basic.consume メソッドを設定することで承認を有効化できます。AMQP 0-9-1 クライアントでは、confirm.select メソッドを送信してパブリッシャーの確認を設定することもできます。

通常、配信承認はチャネルで有効化されます。例えば、RabbitMQ Java クライアントライブラリの使用時には、以下の例にあるように、Channel#basicAck を使用してシンプルな basic.ack 肯定承認をセットアップできます。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注記

未承認メッセージは、メモリにキャッシュする必要があります。コンシューマーがプリフェッチするメッセージの数は、クライアントアプリケーションのプリフェッチを設定することによって制限できます。

consumer_timeout を設定すると、コンシューマーから配信承認が届かない状況を検出できます。コンシューマーがタイムアウト値の時間内に承認を送信しない場合、チャネルは閉じられ、PRECONDITION_FAILED が発生します。エラーを診断するには、UpdateConfiguration API を使用して consumer_timeout 値を大きくします。

プリフェッチを設定する

RabbitMQ のプリフェッチ値を使用して、コンシューマーがメッセージを消費する方法を最適化できます。RabbitMQ は、プリフェッチ数をチャネルではなくコンシューマーに適用することによって、AMQP 0-9-1 が提供するチャネルプリフェッチメカニズムを実装します。プリフェッチ値は、特定の時間にコンシューマに送信されるメッセージの数を指定するために使用されます。デフォルトで、RabbitMQ はクライアントアプリケーションに無制限のバッファサイズを設定します。

RabbitMQ コンシューマーにプリフェッチ数を設定するときに考慮する要因にはさまざまなものがあります。まず、コンシューマーの環境と設定を考慮します。コンシューマーは、メッセージが処理されるときにそれらすべてをメモリに保持する必要があるため、高いプリフェッチ値はコンシューマーのパフォーマンスに悪影響を及ぼし、場合によってはコンシューマー全体がクラッシュする原因になることもあります。同様に、RabbitMQ ブローカー自体も、コンシューマー承認を受け取るまで、送信するすべてのメッセージをメモリにキャッシュしておきます。コンシューマに自動承認が設定されておらず、コンシューマによるメッセージの処理に比較的長い時間がかかる場合、高いプリフェッチ値は RabbitMQ サーバーのメモリがすぐになくなる原因になる可能性があります。

上記の考慮事項を踏まえて、大量の未処理または未承認のメッセージが原因で RabbitMQ ブローカー、またはそのコンシューマーでメモリ不足が発生する状況を防ぐため、常にプリフェッチ値を設定することが推奨されます。大量のメッセージを処理するためにブローカーを最適化する必要がある場合は、さまざまなプリフェッチ数を使用してブローカーとコンシューマーをテストし、コンシューマーがメッセージを処理するためにかかる時間と比較して、ネットワークオーバーヘッドがおおむね軽微なものになる値を判断します。

注記
  • コンシューマーへのメッセージの配信を自動承認するようにクライアントアプリケーションが設定されている場合、プリフェッチ値を設定しても効果はありません。

  • プリフェッチされたメッセージはすべて、キューから削除されます。

以下の例は、RabbitMQ Java クライアントライブラリを使用した単一のコンシューマーへのプリフェッチ値 10 の設定を示しています。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注記

RabbitMQ Java クライアントライブラリでは、global フラグのデフォルト値が false に設定されているので、上記の例は単純に channel.basicQos(10) として記述できます。

クォーラムキューで Celery 5.5 以降を使用する

分散タスクキューシステムである Python Celery は、タスク負荷が高い場合に、重要ではないメッセージを多数生成できます。この追加のブローカーアクティビティにより、RabbitMQ メモリアラームがトリガーされ、ブローカーが使用できなくなる可能性があります。メモリアラームがトリガーされる可能性を減らすには、以下を実行します。

すべての Celery バージョンの場合

  1. をオフにtask_create_missing_queuesしてキューのチャーンを軽減します。

  2. 次に、 をオフにworker_enable_remote_controlして、celery@...pidboxキューの動的作成を停止します。これにより、ブローカーのキューチャーンが減少します。

    worker_enable_remote_control = false
  3. 重要でないメッセージアクティビティをさらに減らすには、Celery アプリケーションを起動するときに -Eまたは --task-events フラグを付けずに、Celery worker-send-task-events をオフにします。

  4. 次のパラメータを使用して Celery アプリケーションを起動します。

    celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

Celery バージョン 5.5 以降の場合

  1. Celery バージョン 5.5、クォーラムキューをサポートする最小バージョン、またはそれ以降のバージョンにアップグレードします。使用している Celery のバージョンを確認するには、 を使用しますcelery --version。クォーラムキューの詳細については、「」を参照してくださいAmazon MQ での RabbitMQ のクォーラムキュー

  2. Celery 5.5 以降にアップグレードした後、 task_default_queue_type「クォーラム」に設定します。

  3. 次に、ブローカートランスポートオプションで発行確認を有効にする必要もあります。

    broker_transport_options = {"confirm_publish": True}

ネットワーク障害から自動的に回復する

RabbitMQ ノードへのクライアント接続が失敗した場合の大幅なダウンタイムを防ぐため、自動ネットワークリカバリを常に有効にしておくことをお勧めします。バージョン 4.0.0 以降の RabbitMQ Java クライアントライブラリは、自動ネットワークリカバリをデフォルトでサポートします。

自動接続リカバリは、接続の I/O ループで未処理の例外がスローされた場合、ソケット読み取り操作のタイムアウトが検出された場合、またはサーバーがハートビートを受信しない場合にトリガーされます。

クライアントと RabbitMQ ノード間の初期接続が失敗した場合、自動リカバリはトリガーされません。アプリケーションコードは、接続の再試行によって、初期接続障害を考慮するように記述することをお勧めします。以下の例は、RabbitMQ Java クライアントライブラリを使用した初期ネットワーク障害の再試行を示しています。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注記

アプリケーションが Connection.Close メソッド使用して接続を閉じる場合、自動ネットワークリカバリは有効化またはトリガーされません。

メッセージサイズを 1 MB 未満に維持する

最適なパフォーマンスと信頼性を得るには、メッセージを 1 MB (1 MB) 未満にしておくことをお勧めします。

RabbitMQ 3.13 はデフォルトで最大 128 MB のメッセージサイズをサポートしますが、大きなメッセージは、発行をブロックし、ノード間でメッセージをレプリケートしながら高いメモリ負荷を発生させる可能性のある予測不可能なメモリアラームをトリガーする可能性があります。メッセージのサイズが大きすぎると、ブローカーの再起動プロセスや復旧プロセスにも影響し、サービス継続性のリスクが高まり、パフォーマンスが低下する可能性があります。

クレームチェックパターンを使用して大きなペイロードを保存および取得する

大きなメッセージを管理するには、メッセージペイロードを外部ストレージに保存し、RabbitMQ を介してペイロード参照識別子のみを送信することで、クレームチェックパターンを実装できます。コンシューマーはペイロード参照識別子を使用して、大きなメッセージを取得して処理します。

次の図は、Amazon MQ for RabbitMQ と Amazon S3 を使用してクレームチェックパターンを実装する方法を示しています。

Diagram showing data flow between Producer, Consumer, Amazon MQ broker, and AWS S3.

次の例は、Amazon MQ、 AWS SDK for Java 2.xAmazon S3 を使用したこのパターンを示しています。

  1. まず、Amazon S3 参照識別子を保持する Message クラスを定義します。

    class Message { // Other data fields of the message... public String s3Key; public String s3Bucket; }
  2. Amazon S3 にペイロードを保存し、RabbitMQ を介してリファレンスメッセージを送信するパブリッシャーメソッドを作成します。

    public void publishPayload() { // Store the payload in S3. String payload = PAYLOAD; String prefix = S3_KEY_PREFIX; String s3Key = prefix + "/" + UUID.randomUUID(); s3Client.putObject(PutObjectRequest.builder() .bucket(S3_BUCKET).key(s3Key).build(), RequestBody.fromString(payload)); // Send the reference through RabbitMQ. Message message = new Message(); message.s3Key = s3Key; message.s3Bucket = S3_BUCKET; // Assign values to other fields in your message instance. publishMessage(message); }
  3. Amazon S3 からペイロードを取得し、ペイロードを処理し、Amazon S3 オブジェクトを削除するコンシューマーメソッドを実装します。

    public void consumeMessage(Message message) { // Retrieve the payload from S3. String payload = s3Client.getObjectAsBytes(GetObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()) .asUtf8String(); // Process the complete message. processPayload(message, payload); // Delete the S3 object. s3Client.deleteObject(DeleteObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()); }

basic.consume と存続期間の長いコンシューマーを使用する

存続期間の長いコンシューマーbasic.consumeで を使用すると、 を使用して個々のメッセージをポーリングするよりも効率的ですbasic.get。詳細については、「個々のメッセージのポーリング」を参照してください。