本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适用于 RabbitMQ 的 Amazon MQ 中消息持久性和可靠性的最佳实践
在将应用程序移至生产环境之前,请完成以下最佳实践,以防止消息丢失和资源过度使用。
步骤 1:使用永久消息和持久队列
持久消息有助于防止在代理崩溃或重启的情况下丢失数据。持久消息一到达就会立即写入磁盘。但是,与延迟队列不同的是,持久消息同时在内存和磁盘中缓存,除非代理需要更多内存。在需要更多内存的情况下,通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息,通常称为持久性层。
要启用消息持久性,可以将队列声明为 durable
并将消息传递模式设置为 persistent
。以下示例演示了如何使用 RabbitMQ Java 客户端库
boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);
将队列配置为持续队列后,您可以通过将 MessageProperties
设置为 PERSISTENT_TEXT_PLAIN
来将持久消息发送到您的队列,如以下示例所示。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
步骤 2:配置发布者确认和消费者送达确认
确认消息已发送到代理的过程称为发布者确认。发布者确认告知您的应用程序何时可靠地存储了消息。发布者确认还有助于控制存储到代理的消息速率。如果没有发布商的确认,则无法确认消息已成功处理,并且您的代理可能会丢弃它无法处理的消息。
同样,当客户端应用程序向代理发回消息的交付和使用确认时,称为使用者交付确认。在使用 RabbitMQ 代理时,这两种确认对于确保数据安全至关重要。
使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时,可以通过配置 basic.consume
方法来启用确认。AMQP 0-9-1 客户端也可以通过发送 confirm.select
方法来配置发布者确认。
通常,在通道中启用传递确认。例如,使用 RabbitMQ Java 客户端库时,可以使用 Channel#basicAck
来设置一个简单的 basic.ack
肯定确认,如以下示例所示。
// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意
未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置预提取设置,限制使用者预提取的消息数量。
您可以配置 consumer_timeout
,以便在使用者不确认交付时进行检测。如果使用者没有在超时值内发送确认,通道将被关闭,您将收到 PRECONDITION_FAILED
。要诊断错误,请使用 UpdateConfigurationAPI 增加consumer_timeout
值。
第 3 步:缩短排队时间
在集群部署中,包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时,重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启,过度利用的代理可能会在 REBOOT_IN_PROGRESS
状态下变得反应迟钝。
在维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store(Amazon EBS)卷加载到内存中,以进行批处理。批处理消息可以让队列更快地同步。
如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较中的RabbitMemUsed
和RabbitMqMemLimit
代理节点指标来确认内存使用情况 CloudWatch。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。
如果集群部署暂停队列同步,我们建议使用或删除消息,以减少队列中的消息数量。一旦队列深度减少且队列同步完成,代理状态将更改为 RUNNING
。要解决暂停的队列同步,您还可以应用策略来减少队列同步批处理大小。
您还可以定义自动删除和 TTL 策略,以主动减少资源使用量,并尽量减少对消费者的侵 NACKs 害。在代理上重新排队消息会占用 CPU 密集型,因此大量消息可能会影响代理性能。 NACKs