

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Amazon MQ for RabbitMQ 最佳实践
<a name="best-practices-rabbitmq"></a>

遵循这些生产就绪指南，以在使用 Amazon MQ for RabbitMQ 代理时最大化代理性能并优化消息吞吐效率。

**重要**  
目前，Amazon MQ 不支持[流](https://www.rabbitmq.com/streams.html)或在 JSON 中使用结构化日志记录（在 RabbitMQ 3.9.x 中推出）。

**Topics**
+ [Amazon MQ for RabbitMQ 中代理设置和连接管理的最佳实践](best-practices-broker-setup.md)
+ [Amazon MQ for RabbitMQ 中消息持久性与可靠性的最佳实践](best-practices-message-reliability.md)
+ [Amazon MQ for RabbitMQ 中性能优化与效率的最佳实践](best-practices-performance.md)
+ [Amazon MQ for RabbitMQ 网络弹性与监控最佳实践](best-practices-network-resilience.md)

# Amazon MQ for RabbitMQ 中代理设置和连接管理的最佳实践
<a name="best-practices-broker-setup"></a>

 代理设置和连接管理是防止代理消息吞吐量、资源利用率及处理生产工作负载能力出现问题的第一步。当[创建和配置 Amazon MQ for RabbitMQ 代理](getting-started-rabbitmq.md#create-rabbitmq-broker)时，请完成以下最佳实践：选择适当的实例类型、有效管理连接以及配置消息预取，以最大化代理性能。

**重要**  
Amazon MQ for RabbitMQ 不支持用户名“guest”，并会在您创建新代理时删除默认的访客账户。Amazon MQ 还将定期删除任何由客户创建的名为“guest”的账户。

## 步骤 1：使用集群部署
<a name="use-cluster-deployments-for-high-availability"></a>

 对于生产工作负载，我们建议使用集群部署而非单实例代理，以确保高可用性和消息弹性。集群部署消除了单点故障并提供更好的容错能力。

 集群部署包含分布在三个可用区的三个 RabbitMQ 代理节点，可提供自动失效转移，并确保即使整个可用区不可用也能持续运行。Amazon MQ 会自动在所有节点间复制消息，以确保在节点故障或维护期间的可用性。

 集群部署对生产环境至关重要，并受 [Amazon MQ 服务级别协议](https://aws.amazon.com/amazon-mq/sla/)支持。

 更多信息，请参阅 [Amazon MQ for RabbitMQ 中的集群部署](rabbitmq-broker-architecture.md#rabbitmq-broker-architecture-cluster)。

## 步骤 2：选择正确的代理实例类型
<a name="choose-broker-instance-type"></a>

 代理实例类型的消息吞吐量取决于应用程序用例。`M7g.medium` 应仅用于测试应用程序性能。在生产环境中使用较大实例前先使用此较小实例可以提高应用程序性能。在实例类型 `m7g.large` 及更大实例上，您可以使用集群部署来实现高可用性和消息持久性。较大的代理实例类型可以处理生产级别的客户端和队列、高吞吐量、内存中的消息和冗余消息。

 有关选择正确实例类型的更多信息，请参阅 [Amazon MQ for RabbitMQ 中的规模调整指南](rabbitmq-sizing-guidelines.md)。

## 步骤 3：使用法定队列
<a name="use-quorum-queues"></a>

 对于 3.13 及以上版本的 RabbitMQ 代理，法定队列配合集群部署应作为生产环境中复制队列类型的默认选择。法定队列是一种现代复制队列类型，具有高可靠性、高吞吐量和稳定延迟的特点。

 法定队列使用 Raft 共识算法以提供更好的容错能力。当主节点不可用时，法定队列通过多数投票自动选举新主节点，确保消息传递以最小中断持续进行。由于每个节点位于不同的可用区，即使整个可用区暂时不可用，您的消息系统仍保持可用。

 要声明法定队列，请在创建队列时将头参数 `x-queue-type` 设置为 `quorum`。

 有关法定队列的更多信息（包括迁移策略和最佳实践），请参阅 [Amazon MQ for RabbitMQ 中的法定队列](quorum-queues.md)。

## 步骤 4：使用多个通道
<a name="use-multiple-channels"></a>

 为避免连接中断，请在单个连接上使用多个通道。应用程序应避免 1:1 的连接与通道比率。我们建议每个进程使用一个连接，每个线程使用一个通道。避免过度使用通道，以防通道泄漏。

# Amazon MQ for RabbitMQ 中消息持久性与可靠性的最佳实践
<a name="best-practices-message-reliability"></a>

 在将应用程序移至生产环境前，请完成以下防止消息丢失和资源过度利用的最佳实践。

## 步骤 1：使用持久化消息和持久化队列
<a name="use-persistent-messages-durable-queues"></a>

 持久化消息有助于在代理崩溃或重启时保护数据持久性。持久消息一到达就会立即写入磁盘。但是，与延迟队列不同的是，持久消息同时在内存和磁盘中缓存，除非代理需要更多内存。在需要更多内存的情况下，通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息，通常称为*持久性层*。

要启用消息持久性，可以将队列声明为 `durable` 并将消息传递模式设置为 `persistent`。以下示例演示了如何使用 [RabbitMQ Java 客户端库](https://www.rabbitmq.com/java-client.html)声明持续队列。在使用 AMQP 0-9-1 时，您可以通过设置“2”传送模式将消息标记为持久消息。

```
boolean durable = true;
channel.queueDeclare("my_queue", durable, false, false, null);
```

 将队列配置为持续队列后，您可以通过将 `MessageProperties` 设置为 `PERSISTENT_TEXT_PLAIN` 来将持久消息发送到您的队列，如以下示例所示。

```
import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "my_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
```

## 步骤 2：配置发布者确认和消费者投递确认
<a name="configure-confirmation-acknowledgement"></a>

 确认消息已发送到代理的过程称为*发布者确认*。发布者确认告知您的应用程序何时可靠地存储了消息。发布者确认还有助于控制存储到代理的消息速率。若无发布者确认，则无法确认消息是否成功处理，且代理可能会丢弃其无法处理的消息。

 同样，当客户端应用程序向代理发回消息的交付和使用确认时，称为*使用者交付确认*。在使用 RabbitMQ 代理时，这两种确认对于确保数据安全至关重要。

 使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时，可以通过配置 `basic.consume` 方法来启用确认。AMQP 0-9-1 客户端也可以通过发送 `confirm.select` 方法来配置发布者确认。

 通常，在通道中启用传递确认。例如，使用 RabbitMQ Java 客户端库时，可以使用 `Channel#basicAck` 来设置一个简单的 `basic.ack` 肯定确认，如以下示例所示。

```
// this example assumes an existing channel instance

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge a single delivery, the message will
             // be discarded
             channel.basicAck(deliveryTag, false);
         }
     });
```

**注意**  
 未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置[预提取](best-practices-performance.md#configure-prefetching)设置，限制使用者预提取的消息数量。

 您可以配置 `consumer_timeout`，以便在使用者不确认交付时进行检测。如果使用者没有在超时值内发送确认，通道将被关闭，您将收到 `PRECONDITION_FAILED`。要诊断错误，请使用 [UpdateConfiguration](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/configurations-configuration-id.html)API 增加`consumer_timeout`值。

## 步骤 3：保持队列简短
<a name="keep-queues-short"></a>

在集群部署中，包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时，重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启，过度利用的代理可能会在 `REBOOT_IN_PROGRESS` 状态下变得反应迟钝。

在[维护时段](amazon-mq-rabbitmq-editing-broker-preferences.md#rabbitmq-edit-current-configuration-console)，Amazon MQ 每次执行一个节点的所有维护工作，以确保代理保持正常运行。因此，在每个节点恢复正常运行时，队列可能需要同步。在同步过程中，需要复制到镜像的消息将从相应的 Amazon Elastic Block Store（Amazon EBS）卷加载到内存中，以进行批处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少，则队列会按预期成功同步并恢复正常运行。但是，如果批处理中的数据量接近节点的内存限制，节点会引发高内存警报，暂停队列同步。您可以通过比较中的`RabbitMemUsed`和`RabbitMqMemLimit`[代理节点指标来确认内存使用情况 CloudWatch](amazon-mq-accessing-metrics.md)。在消耗或删除消息或批处理中的消息数量减少之前，同步无法完成。

 如果集群部署暂停队列同步，我们建议使用或删除消息，以减少队列中的消息数量。一旦队列深度减少且队列同步完成，代理状态将更改为 `RUNNING`。要解决暂停的队列同步，您还可以应用策略来[减少队列同步批处理大小](rabbitmq-queue-sync.md)。

您还可以定义自动删除和 TTL 策略，以主动减少资源使用量，并尽量减少对消费者的侵 NACKs 害。在代理上重新排队消息会占用 CPU 密集型，因此大量消息可能会影响代理性能。 NACKs 

# Amazon MQ for RabbitMQ 中性能优化与效率的最佳实践
<a name="best-practices-performance"></a>

 您可通过最大化吞吐量、最小化延迟及确保高效资源利用来优化 Amazon MQ for RabbitMQ 代理性能。请完成以下最佳实践以优化应用程序性能。

## 步骤 1：保持消息大小低于 1 MB
<a name="keep-messages-under-one-mb"></a>

 我们建议将消息保持在 1 兆字节（MB）以下以获得最佳性能和可靠性。

 RabbitMQ 3.13 默认支持高达 128 MB 的消息大小，但大消息可能触发不可预测的内存警报，从而阻塞发布操作，并在跨节点复制消息时可能产生高内存压力。过大的消息还会影响代理重启和恢复过程，这会增加服务连续性的风险并可能导致性能下降。

 **使用认领检查模式存储和检索大有效载荷** 

 为管理大消息，您可通过将消息有效载荷存储在外部存储中，并仅通过 RabbitMQ 发送有效载荷引用标识符来实现认领检查模式。消费者使用有效载荷引用标识符来检索和处理大消息。

 下图演示了如何使用 Amazon MQ for RabbitMQ 和 Amazon S3 实现认领检查模式。

![\[Diagram showing data flow between Producer, Consumer, Amazon MQ broker, and AWS S3.\]](http://docs.aws.amazon.com/zh_cn/amazon-mq/latest/developer-guide/images/claim-check-pattern.png)


 以下示例使用 Amazon MQ、[AWS SDK for Java 2.x](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) 和 [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) 演示此模式：

1.  首先，定义一个用于存储 Amazon S3 引用标识符的 Message 类。

   ```
   class Message {
       // Other data fields of the message...
   
       public String s3Key;
       public String s3Bucket;
   }
   ```

1.  创建一个发布者方法，将有效载荷存储在 Amazon S3 中并通过 RabbitMQ 发送引用消息。

   ```
   public void publishPayload() {
       // Store the payload in S3.
       String payload = PAYLOAD;
       String prefix = S3_KEY_PREFIX;
       String s3Key = prefix + "/" + UUID.randomUUID();
       s3Client.putObject(PutObjectRequest.builder()
           .bucket(S3_BUCKET).key(s3Key).build(), 
           RequestBody.fromString(payload));
       
       // Send the reference through RabbitMQ.
       Message message = new Message();
       message.s3Key = s3Key;
       message.s3Bucket = S3_BUCKET;
       // Assign values to other fields in your message instance.
   
       publishMessage(message);
   }
   ```

1.  实现一个消费者方法，从 Amazon S3 检索有效载荷、处理有效载荷并删除 Amazon S3 对象。

   ```
   public void consumeMessage(Message message) {
       // Retrieve the payload from S3.
       String payload = s3Client.getObjectAsBytes(GetObjectRequest.builder()
           .bucket(message.s3Bucket).key(message.s3Key).build())
           .asUtf8String();
       
       // Process the complete message.
       processPayload(message, payload);
       
       // Delete the S3 object.
       s3Client.deleteObject(DeleteObjectRequest.builder()
           .bucket(message.s3Bucket).key(message.s3Key).build());
   }
   ```

## 步骤 2：使用 `basic.consume` 和长生命周期消费者
<a name="use-basic-consume"></a>

 使用 `basic.consume` 配合长生命周期消费者比使用 `basic.get` 轮询单个消息更高效。更多信息，请参阅[轮询单个消息](https://www.rabbitmq.com/docs/3.13/consumers#polling)。

## 步骤 3：配置预取
<a name="configure-prefetching"></a>

 您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预提取计数应用于使用者而不是通道，实现 AMQP 0-9-1 提供的通道预提取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下，RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。

 在为您的 RabbitMQ 使用者设置预提取计数时，需要考虑各种因素。首先，考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中，因此，较高的预提取值可能会对使用者的性能产生负面影响，在某些情况下，可能会导致使用者同时崩溃。同样，RabbitMQ 代理本身会将其发送的所有消息缓存在内存中，直到收到使用者确认。如果没有为使用者配置自动确认，并且使用者需要相对较长的时间来处理消息，则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。

考虑到上述因素，我们建议始终设置预提取值，以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息，您可以使用一系列预提取计数来测试您的代理和使用者，以确定与使用者处理消息所需的时间相比，网络开销在哪个点上变得微不足道。

**注意**  
如果您的客户端应用程序已配置为自动确认将消息传递给使用者，则设置预提取值将不起作用。
所有预提取消息都会从队列中删除。

以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 `10` 的预提取值。

```
ConnectionFactory factory = new ConnectionFactory();

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.basicQos(10, false);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("my_queue", false, consumer);
```

**注意**  
在 RabbitMQ Java 客户端库中，`global` 标志的默认值设置为 `false`，所以上面的例子可以简单地写成 `channel.basicQos(10)`。

## 步骤 4：将 Celery 5.5 或更高版本与法定队列结合使用
<a name="use-celery-with-quorum-queues"></a>

 [Python Celery](https://docs.celeryq.dev/en/stable/index.html)（一个分布式任务队列系统）在高任务负载时可能生成大量非关键消息。这种额外的代理活动可能触发 [Amazon MQ for RabbitMQ：高内存警报](troubleshooting-action-required-codes-rabbitmq-memory-alarm.md) 并导致代理不可用。为降低触发内存警报的风险，请执行以下操作：

**对于所有 Celery 版本**

1. 关闭 [https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_create_missing_queues](https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_create_missing_queues) 以减轻队列抖动。

1.  然后关闭 `worker_enable_remote_control` 以停止动态创建 `celery@...pidbox` 队列。这将减少代理上的队列抖动。

   ```
   worker_enable_remote_control = false
   ```

1.  要进一步减少非关键消息活动，请在启动 Celery 应用程序时[worker-send-task-events](https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-send-task-events)通过不包含`-E`或`--task-events`标记来关闭 Celery。

1.  使用以下参数启动 Celery 应用程序：

   ```
   celery -A app_name worker --without-heartbeat --without-gossip --without-mingle
   ```

**对于 Celery 5.5 及以上版本**

1.  升级到 [Celery 5.5 版本](https://docs.celeryq.dev/en/latest/changelog.html#version-5-5-0)（支持法定队列的最低版本）或更高版本。要检查您使用的 Celery 版本，请使用 `celery --version`。有关法定队列的更多信息，请参阅 [RabbitMQ on Amazon MQ 的仲裁队列](quorum-queues.md)。

1.  升级到 Celery 5.5 或更高版本后，将 `task_default_queue_type` 配置为 ["quorum"](https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-task_default_queue_type)。

1.  然后，您还必须在[代理传输选项](https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-broker_transport_options)中开启发布确认：

   ```
   broker_transport_options = {"confirm_publish": True}
   ```

# Amazon MQ for RabbitMQ 网络弹性与监控最佳实践
<a name="best-practices-network-resilience"></a>

 网络弹性和监控代理指标对于维护可靠的消息应用程序至关重要。完成以下最佳实践以实施自动恢复机制和资源监控策略。

## 步骤 1：自动从网络故障中恢复
<a name="automatically-recover-from-network-failures"></a>

我们建议始终启用自动网络恢复，以防止在客户端连接到 RabbitMQ 节点失败的情况下出现严重停机。自版本 `4.0.0` 起，RabbitMQ Java 客户端库默认支持自动网络恢复。

[如果在连接 I/O 循环中抛出未处理的异常、检测到套接字读取操作超时或服务器错过心跳，则会触发自动连接恢复。](https://www.rabbitmq.com/heartbeats.html)

如果客户端和 RabbitMQ 节点之间的初始连接失败，将不会触发自动恢复。我们建议您编写应用程序代码，以便通过重试连接来解决初始连接失败的问题。以下示例演示了如何使用 RabbitMQ Java 客户端库来重试初始网络故障。

```
ConnectionFactory factory = new ConnectionFactory();
// enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0.
factory.setAutomaticRecoveryEnabled(true);
// configure various connection settings

try {
  Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
  Thread.sleep(5000);
  // apply retry logic
}
```

**注意**  
如果应用程序使用 `Connection.Close` 方法关闭连接，则不会启用或触发自动网络恢复。

## 步骤 2：监控代理指标和警报
<a name="monitor-metrics-alarms"></a>

 我们建议您定期监控您的 Amazon MQ for RabbitMQ 代理的[CloudWatch 指标](amazon-mq-accessing-metrics.md)和警报，以便在潜在问题影响您的消息传递应用程序之前识别和解决这些问题。主动监控对于维护弹性消息应用程序和确保最佳性能至关重要。

 适用于 RabbitMQ 的 Amazon MQ 向其发布指标 CloudWatch ，这些指标提供了对代理性能、资源利用率和消息流的见解。要监控的关键指标包括内存使用率和磁盘使用率。您可以设置[CloudWatch 警报](https://docs.aws.amazon.com/Ihttps://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Alarm-On-Metrics.html)，以了解您的代理何时接近资源限制或出现性能下降的情况。

监控以下基本指标：

**`RabbitMQMemUsed` 和 `RabbitMQMemLimit`**  
监控内存使用率以防止可能阻塞消息发布的内存警报。

**`RabbitMQDiskFree` 和 `RabbitMQDiskFreeLimit`**  
监控磁盘使用率以避免可能导致代理故障的磁盘空间问题。

 对于集群部署，还要监控[节点特定指标](rabbitmq-logging-monitoring.md#security-logging-monitoring-cloudwatch-destination-metrics-rabbitmq)以识别节点特定问题。

**注意**  
有关如何防止高内存警报的更多信息，请参阅[处理和防止高内存警报](troubleshooting-action-required-codes-rabbitmq-memory-alarm.md#address-prevent-high-memory-alarm)。