本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适用于 RabbitMQ 的 Amazon MQ 中性能优化和效率的最佳实践
您可以通过最大限度地提高吞吐量、最大限度地减少延迟和确保高效的资源利用率来优化 Amazon MQ 的 RabbitMQ 代理性能。完成以下最佳实践以优化应用程序性能。
步骤 1:将邮件大小控制在 1 MB 以下
我们建议将消息保持在 1 MB 以下,以获得最佳性能和可靠性。
默认情况下,RabbitMQ 3.13 支持最大为 128 MB 的消息大小,但是大消息可能会触发不可预测的内存警报,从而阻止发布,并可能在跨节点复制消息时造成高内存压力。过大的消息还会影响代理的重启和恢复过程,从而增加服务连续性的风险并可能导致性能下降。
使用索赔检查模式存储和检索大型有效载荷
要管理大型消息,您可以通过将消息负载存储在外部存储器中并通过 RabbitMQ 仅发送有效负载参考标识符来实现声明检查模式。使用者使用有效负载参考标识符来检索和处理大消息。
下图演示了如何使用适用于 RabbitMQ 和 Amazon S3 的 Amazon MQ 来实现索赔检查模式。

以下示例使用亚马逊 MQ、适用于 Java 2.x 的AWS SDK 和 Amazon S3 演示了这种模式:
-
首先,定义一个用于保存 Amazon S3 参考标识符的消息类。
class Message { // Other data fields of the message... public String s3Key; public String s3Bucket; }
-
创建一个发布者方法,该方法将有效负载存储在 Amazon S3 中并通过 RabbitMQ 发送参考消息。
public void publishPayload() { // Store the payload in S3. String payload = PAYLOAD; String prefix = S3_KEY_PREFIX; String s3Key = prefix + "/" + UUID.randomUUID(); s3Client.putObject(PutObjectRequest.builder() .bucket(S3_BUCKET).key(s3Key).build(), RequestBody.fromString(payload)); // Send the reference through RabbitMQ. Message message = new Message(); message.s3Key = s3Key; message.s3Bucket = S3_BUCKET; // Assign values to other fields in your message instance. publishMessage(message); }
-
实现一种从 Amazon S3 检索有效负载、处理有效负载并删除 Amazon S3 对象的使用者方法。
public void consumeMessage(Message message) { // Retrieve the payload from S3. String payload = s3Client.getObjectAsBytes(GetObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()) .asUtf8String(); // Process the complete message. processPayload(message, payload); // Delete the S3 object. s3Client.deleteObject(DeleteObjectRequest.builder() .bucket(message.s3Bucket).key(message.s3Key).build()); }
第 2 步:使用basic.consume
和长寿消费者
basic.consume
与使用寿命较长的使用者一起使用比使用basic.get
轮询单个消息更有效。有关更多信息,请参阅轮询单个消息
步骤 3:配置预取
您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预提取计数应用于使用者而不是通道,实现 AMQP 0-9-1 提供的通道预提取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下,RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。
在为您的 RabbitMQ 使用者设置预提取计数时,需要考虑各种因素。首先,考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中,因此,较高的预提取值可能会对使用者的性能产生负面影响,在某些情况下,可能会导致使用者同时崩溃。同样,RabbitMQ 代理本身会将其发送的所有消息缓存在内存中,直到收到使用者确认。如果没有为使用者配置自动确认,并且使用者需要相对较长的时间来处理消息,则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。
考虑到上述因素,我们建议始终设置预提取值,以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息,您可以使用一系列预提取计数来测试您的代理和使用者,以确定与使用者处理消息所需的时间相比,网络开销在哪个点上变得微不足道。
注意
如果您的客户端应用程序已配置为自动确认将消息传递给使用者,则设置预提取值将不起作用。
所有预提取消息都会从队列中删除。
以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 10
的预提取值。
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意
在 RabbitMQ Java 客户端库中,global
标志的默认值设置为 false
,所以上面的例子可以简单地写成 channel.basicQos(10)
。
第 4 步:将 Celery 5.5 或更高版本用于法定人数队列
Python Celery
适用于所有 Celery 版本
-
关闭
task_create_missing_queues
以缓解队列流失。 -
然后,关闭
worker_enable_remote_control
以停止动态创建celery@...pidbox
队列。这将减少经纪商的队列流失。worker_enable_remote_control = false
-
要进一步减少非关键消息活动,请在启动 Celery 应用程序时worker-send-task-events
通过不包含 -E
或--task-events
标记来关闭 Celery。 -
使用以下参数启动 Celery 应用程序:
celery -A app_name worker --without-heartbeat --without-gossip --without-mingle
适用于 Celery 5.5 及以上版本
-
升级到 Celery 版本 5.5
(支持法定队列的最低版本)或更高版本。要查看您使用的是哪个版本的 Celery,请使用 celery --version
。有关法定人数队列的更多信息,请参阅。RabbitMQ on Amazon MQ 的仲裁队列 -
升级到 Celery 5.5 或更高版本后,配置
task_default_queue_type
为 “法定人数”。 -
然后,您还必须在 “代理传输选项” 中打开 “发布确认”
: broker_transport_options = {"confirm_publish": True}