本文最后更新于:2024年4月16日 下午
消息分发策略
默认的消息分发策略是轮询,参照上面的消息应答可以看出会有一定问题——消息处理慢的 AckConsumer1
和消息处理快的 AckConsumer2
分配了同等数量的消息,导致2早就结束空闲了,而1还有好几条消息没开始处理!因此看似公平的轮询分发其实是并不公平的
因此需要引入公平分发策略(Fair Dispatch)——在消费者中引入设置了 prefetchCount=1
参数的 basicQos
方法,它告诉了rabbitmq不要一次向该消费者传递过多消息
尚硅谷和评论区的两份笔记,都将这个叫做不公平分发,但是我在官方tutorial上看到的介绍时Fair Dispath
通过fair dispatch
和手动应答来控制消费者每次处理的消息数
1 2 3 4 5 6
| int prefetchCount = 1; channel.basicQos(prefetchCount);
boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
|
参数prefecthCount
表示该消费者能积压在信道上的预处理消息数最大值:
- 消费者通过
basicQos
方法设置了预取值后,对应的信道上最多只能积压prefetchCount
条消息
- 达到对应数量后,RabbitMQ就不会向该信道传递消息
- 如果所有队列都达到积压上限,消息就会积压在队列中撑满队列,这个时候就只能添加新的消费者或者改变存储策略
- 通常增加预取值可以提高向消费者传递消息的速度,但是无限制的自动应答或者过大值会导致消费者节点内存消耗过大,因此合理的预取值需要反复试验,通常100-300之间
消费端限流
消费者宕机过程中MQ上囤积大量消息,重启消费者服务后消息瞬间涌入,造成消息消费服务压力剧增,因此大流量下消息消费端需要进行限流设置
在非自动确认消息的前提下,如果一定数量的消息(基于Consumer和Channel设置QOS的值)没有被确认,将不进行消费新的消息
1、生产者使用线程池模拟大量消息的发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class LimitProducer {
public static final String QUEUE_NAME = "slowQueue"; public static final String EXCHANGE_NAME = "fastExchange";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "limit");
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); for (int i = 0; i < 2000; i++) { String msg = Thread.currentThread().getName()+"_"+i; try { channel.basicPublish(EXCHANGE_NAME, "limit", null, msg.getBytes()); } catch (IOException e) { e.printStackTrace(); } } threadPool.shutdown(); while (!threadPool.isTerminated()) {
} System.out.println("所有消息发送完成"); } }
|
2、消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class LimitConsumer {
public static final String QUEUE_NAME = "slowQueue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("收到的消息为:" + new String(delivery.getBody())); }; CancelCallback cancelCallback = cancel -> { System.out.println("取消消费"); };
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
|
可以看到设置了Qos
后消费10条消息就结束了,由于将应答注释了所以不会继续消费
放开应答的注释,可以看到持续消费,每次Unacked
的都是10条,速率也是每秒一条
发布确认
上面是通过持久化来保障在服务器崩溃时,消息不会丢失。但是生产者发布消息后,消息是否能正确到达Broker服务器呢?默认情况下消息发出后是不会有返回信息的,所以需要引入发布确认机制
消息被投递到匹配的队列后,Broker会返回一个确认信息给生产者,这个操作叫做消息确认发布,它有两种方式:
- 通过事务机制实现:设置 channel 为 transaction 模式,这是 AMQP协议层面提供的解决方案
- 通过发送方确认实现:设置 channel 为 confirm 模式,这是 RabbitMQ 提供的解决方案
事务机制
RabbitMQ 客户端中与事务机制有关的方法有三个
channel.txSelect
:用于将当前 channel 设置成 transaction 模式
channel.txCommit
:用于提交事务
channel.txRollback
:用于回滚事务
channel.txSelect
将当前信道开启为事务模式后,生产者就可以发布消息给Broker服务器了,如果channel.txCommit
提交成功了,则消息一定到达了broker了,如果在 channel.txCommit
执行之前 broker 异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过channel.txRollback
回滚事务。如图是正常提交事务的,对于使用回滚需要通过try/catch
捕获发生的异常,Publish后也不是正常的Commit,而是异常的Rollback
事务确实能够解决 producer 与 broker 之间消息确认的问题,只有消息成功被 broker 接收,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作,同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能。RabbitMQ提供了改进方案,即发送方确认(Confirm确认)
confirm模式
发布确认逻辑
生产者将信道设置成 confirm 模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中的delivery-tag
包含了确认消息的序列号,此外broker也可以设置basic.ack
的 multiple参数,表示到这个序号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认默认是没有开启的,通过在信道上设置开启
1 2
| channel.confirmSelect();
|
单个确认发布
单个发布确认是一种简单的发布确认方式:生产者发布一个消息之后只有等收到确认才会发送下一个,它是一种同步确认发布的方式。waitForConfirmsOrDie(long)
这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种方式最大的缺点是发布速度很慢,效率低,每秒只能不超过数百条数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void singleConfirm() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect();
long start = System.currentTimeMillis(); for (int i = 0; i < MSG_COUNT; i++) { String msg = "消息_" + i; System.out.println("生产了:" + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(msg + "已发送到队列中"); } } long end = System.currentTimeMillis(); System.out.println("发布了 " + MSG_COUNT + " 个单独确认消息,耗时:" + (end - start) + " ms"); }
|
可以看到每次都是waitForConfirms
之后才发送下一条:
批量确认发布
批量确认模式也是一种同步确认发布的方式,先发布一批消息再一起确认,这样可以提高吞吐量,不过如果如果出现问题不能定位到具体的消息上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public static void batchConfirm() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect(); int batchSize = 100; int size4Confirm = 0;
long start = System.currentTimeMillis(); for (int i = 0; i < MSG_COUNT; i++) { String msg = "消息_" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); size4Confirm++; if (size4Confirm == batchSize) { channel.waitForConfirms(); size4Confirm = 0; System.out.println("批量确认,最新的消息是:" + msg); } } if (size4Confirm > 0) { System.out.println("处理剩余的未确认消息"); channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布了 " + MSG_COUNT + " 个批量确认消息,耗时:" + (end - start) + " ms"); }
|
可以看到每一百个消息进行一次确认,当前的100个确认之前不会发送下一批,所以每批消息的确认序号都是固定增加的:
异步确认发布
因为异步非阻塞的特性,异步确认的可靠性和效率都很高,它是通过回调函数来实现消息的可靠传递的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public static void asyConfirm1() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect();
ConfirmCallback successCall = (deliveryTag, multiple) -> { System.out.println("确认的消息:" + deliveryTag); };
ConfirmCallback failedCall = (deliveryTag, multiple) -> { System.out.println("未能确认的消息:" + deliveryTag); };
channel.addConfirmListener(successCall, failedCall);
long start = System.currentTimeMillis(); for (int i = 0; i < MSG_COUNT; i++) { String msg = "消息_" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布了 " + MSG_COUNT + " 个异步确认消息,耗时:" + (end - start) + " ms"); }
|
可以看到消息的确认是异步进行的,并没有特定的串行顺序:
如何处理未被确认的消息?
最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
- 通过一个并发队列,将所有要发送的消息加到队列中
- 在发布确认的回调函数中删除已确认的消息,剩下的就是未确认的消息了
- 在失败的回调函数中处理未确认的消息,比如重发或者打印出来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public static void asyConfirm2() throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.confirmSelect();
ConcurrentSkipListMap<Long, Object> infoMap = new ConcurrentSkipListMap<>();
ConfirmCallback successCall = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, Object> confirmed = infoMap.headMap(deliveryTag); confirmed.clear(); } else { infoMap.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); };
ConfirmCallback failedCall = (deliveryTag, multiple) -> { String info = String.valueOf(infoMap.get(deliveryTag)); System.out.println("未能确认的消息:" + deliveryTag + " ,消息是:" + info); };
channel.addConfirmListener(successCall, failedCall);
long start = System.currentTimeMillis(); for (int i = 0; i < MSG_COUNT; i++) { String msg = "消息_" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); infoMap.putIfAbsent(channel.getNextPublishSeqNo(), msg); } long end = System.currentTimeMillis(); System.out.println("发布了 " + MSG_COUNT + " 个异步确认消息,耗时:" + (end - start) + " ms"); }
|