RabbitMQ实现消息投递可靠性
rabbitmq的消息可靠性投递提现在两个方面,分别是生产者端和消费者端的可靠性控制
1.生产者端
生产端可靠性一般通过confirm消息确认和Return消息机制
1.1 confirm
当生产者发送消息后,消息到达broker后就会进行confim回调,在回到中根据投递标签(Tag)进行消息的唯一确定。根据ack结果分为两种
- true 标识消息正常投递,被broker接受
- false 消息为正常投递 (可能因为内存、磁盘等原因导致)
1.2 Return
当消息未找到exchange或routingkey不正确消息最终路由错误,这两种情况都会导致消息不可达,最终执行return回调 需要开启 spring.rabbitmq.template.mandatory=true
2.消费者端
消费端的ack是控制消息是否从broker进行正常消费,可以进行三种确认操作
其中basicReject
、 basicNack
的区别参考:https://blog.csdn.net/fly_leopard/article/details/102821776
注意:要设置 关闭自动ack模式 改为手动MANUAL
一、生产端可靠性解决方案
消息落库,对消息状态进行打标
实现本地消息表,对消息的状态进行标记,更改,定期抓取非正常状态的消息进行重新投递或补偿

- 1.业务数据和消息数据同时写入数据库,此时消息状态为0标识投递中
1 2 3 4 5
| public static final String ORDER_SENDING = "0"; public static final String ORDER_SEND_SUCCESS = "1";
public static final String ORDER_SEND_FAILURE = "2";
|
- 2.上一步确保成功后,生产端发送消息到broker
- 3.broker通过confirm机制,回调confirm方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); String messageId = correlationData.getId(); if(ack){ brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date()); } else { System.err.println("异常处理..."); } } };
|
其中brokerMessageLogMapper.changeBrokerMessageLogStatus
就是更改消息状态为投递成功
1 2 3 4 5 6
| <update id="changeBrokerMessageLogStatus" > update broker_message_log bml set bml.status = #{status,jdbcType=VARCHAR}, bml.update_time = #{updateTime, jdbcType=TIMESTAMP} where bml.message_id = #{messageId,jdbcType=VARCHAR} </update>
|
- 4.上一步如果成功,更改消息状态为1 代表消息投递成功,如果失败可以进行重试
- 5.通过定时任务抓取消息状态为0的消息,并且发送时间至少为5分钟以前的(防止新消息发送中导致误判)消息,进行重新发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Scheduled(initialDelay = 3000, fixedDelay = 10000) public void reSend(){ System.err.println("---------------定时任务开始---------------"); List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage(); list.forEach(messageLog -> { if(messageLog.getTryCount() >= 3){ brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constants.ORDER_SEND_FAILURE, new Date()); } else { brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(), new Date()); Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class); try { rabbitOrderSender.sendOrder(reSendOrder); } catch (Exception e) { e.printStackTrace(); System.err.println("-----------异常处理-----------"); } } }); }
|
1 2 3 4 5 6 7 8
| brokerMessageLogMapper.update4ReSend()
<update id="update4ReSend" > update broker_message_log bml set bml.try_count = bml.try_count + 1, bml.update_time = #{updateTime, jdbcType=TIMESTAMP} where bml.message_id = #{messageId,jdbcType=VARCHAR} </update>
|
- 判断重试次数大于上限时比如3次 更改消息状态为2 标识消息投递失败(这个一般就是机器或程序不可抗因素,需要人工补偿了)
1
| brokerMessageLogMapper.changeBrokerMessageLogStatus()方法就是将消息状态改为投递失败,最终进行补偿
|
另外可以利用return机制进行不可达消息的追踪,具体规则根据业务而定
二、消费端可靠性解决方案
- 由于业务异常,可以进行日志记录,然后进行补偿
- 由于服务器宕机等严重问题,那么就需要手动ack保证消费成功(一般都是手动ack)
- 可以根据业务设置消息是否重回队列
1 2 3 4 5 6 7
| @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); }
|