<>RabbitMQ保证消息的一致性

<>一、采用confirm消息确认机制及return返回机制 确保消息发送成功

<>二、将队列以及消息设置持久化 保证rabbitmq突然宕机消息仍然存在

<>三、手动确认接收消息方式 消息处理失败拒收重回队列

<>1. yml配置
spring: rabbitmq: host: 10.134.22.232 port: 5672 username: guest password:
guest ##消息发送确认回调 publisher-confirms: true #采用confirm以及return机制 发送返回监听回调
publisher-confirm-type: correlated publisher-returns: true listener: type:
simple simple: #手动接收消息方式 acknowledge-mode: manual
<>2. RabbitMQ配置类
@Configuration @Slf4j @AllArgsConstructor public class RabbitmqConfig { private
final ConnectionFactory connectionFactory; private final RabbitLogsMapper
rabbitLogsMapper; @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate
rabbitTemplate= new RabbitTemplate(connectionFactory); //confirm确认
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { String msgId
= correlationData.getId(); if (ack) { //发送成功 log.info("消息成功发送 , msgId: {}," ,
msgId); //状态更新 消息发送成功 BiddingRabbitLogs biddingRabbitLogs = new
BiddingRabbitLogs(); biddingRabbitLogs.setStatus(SendStatus.SEND_SUCCESS.
getValue()); rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(
BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(
BiddingRabbitLogs::getStatus,"4")); } else { //发送失败 log.error("消息发送失败, {},
cause: {}, msgId: {}", correlationData, cause, msgId); //状态更新 消息发送失败
BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs(); biddingRabbitLogs
.setStatus(SendStatus.SEND_FAILD.getValue()); rabbitLogsMapper.update(
biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(
BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4")); } });
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message,
replyCode,replyText,exchange,routingKey) -> { //触发回调 只有交换机找不到队列时才会触发 log.error(
"消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {},
message: {}", exchange, routingKey, replyCode, replyText, message); //状态更新
消息发送失败 String msgId = (String) message.getMessageProperties().getHeaders().get(
"spring_returned_message_correlation"); BiddingRabbitLogs biddingRabbitLogs =
new BiddingRabbitLogs(); biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.
getValue()); rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(
BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(
BiddingRabbitLogs::getStatus,"4")); }); return rabbitTemplate; } @Bean public
RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate){ RabbitAdmin rabbitAdmin
= new RabbitAdmin(rabbitTemplate); rabbitAdmin.setAutoStartup(true); return
rabbitAdmin; } }
说明:

* confirm机制只是确保了消息是否成功发送到交换机
* Return机制确保了消息是否从交换机发送到指定的队列
- - ConfirmCallback则根据状态判断发送成功还是失败 进行更新日志表记录状态
* ReturnCallback则根据收到消息就是未找到队列发送失败,未收到消息就是发送成功 进行更新日志表记录状态
<>3. 声明的队列一定要将队列持久化
public String createQueue(String queueName) { BiddingQueueConfig
biddingQueueConfig= queueMapper.selectOne(Wrappers.lambdaQuery(
BiddingQueueConfig.class).eq(BiddingQueueConfig::getQueue, queueName)); if (
biddingQueueConfig== null) { biddingQueueConfig = new BiddingQueueConfig();
biddingQueueConfig.setCreatetime(new Date()); biddingQueueConfig.setQueue(
queueName); biddingQueueConfig.setStatus("1"); int insert = queueMapper.insert(
biddingQueueConfig); //将队列持久化 rabbitAdmin.declareQueue(new Queue(queueName,true)
); return queueName + "队列创建成功"; } return queueName + "队列创建失败"; }
<>4. 发送消息 将发送的消息设置为持久化

发送消息前首先将发送的数据插入数据库,状态变为发送中

<>5. 消费者监听队列

* 如果根据消息id查询日志表为空的话那么是没有发送消息,消息自动接收,发送成功消息后日志表会有数据
* 判断是否重复消费 根据状态是否成功消费以及失败重试次数判断
* 处理业务逻辑,如果成功消息接收 状态更新
* 如果处理业务逻辑失败报错则会拒收,消息重回队列重新处理此条消息,当处理次数超过3次处理失败则消息改为接收 // 启动自动创建队列
@RabbitListener(queuesToDeclare = { @Queue("queue_work_dontask") })
@RabbitHandler @SneakyThrows public void receiveDonTask(String data, Message
message, Channel channel){ //消息id String msgId = (String) message.
getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//根据消息id查询BiddingRabbitLogs日志表 BiddingRabbitLogs biddingRabbitLogs =
remoteLogsService.get(msgId, SecurityConstants.FROM_IN).getData(); if (
biddingRabbitLogs== null) { log.error("消息ID查询 biddingRabbitLogs:null"); channel.
basicAck(message.getMessageProperties().getDeliveryTag(),false); return; }
//状态:1.消息发送中 2.消息发送成功 3.消息发送失败 4.消费成功 5.消费失败 if (SendStatus.CONSUME_SUCCESS.
getValue().equals(biddingRabbitLogs.getStatus()) || SendStatus.SEND_FAILD.
getValue() == String.valueOf(biddingRabbitLogs.getTryTimes())) { //重复消费 log.info
("消息ID:{},重复消费",msgId); channel.basicAck(message.getMessageProperties().
getDeliveryTag(),false); return; } try { //处理业务逻辑 Map map = JSON.parseObject(
data, Map.class); String dataString = (String) map.get("data"); String username
= (String) map.get("username"); Integer tenantId = (Integer) map.get("tenantId")
; ApproveParam approveParam = JSON.parseObject(dataString, ApproveParam.class);
R<String> stringR = doneTask(approveParam,username,tenantId); //处理成功 更新状态
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
biddingRabbitLogs.setStatus(SendStatus.CONSUME_SUCCESS.getValue());
biddingRabbitLogs.setSuccesstime(new Date()); remoteLogsService.updateById(
biddingRabbitLogs,SecurityConstants.FROM_IN); log.info("消费成功,消息ID:{}",msgId); }
catch (Exception e) { e.printStackTrace(); if (biddingRabbitLogs.getTryTimes()
>= Integer.parseInt(SendStatus.TRY_TIMES.getValue())) { //多次消费不成功 自动接收 channel.
basicAck(message.getMessageProperties().getDeliveryTag(),false); log.error(
"多次消费失败,消息ID:{}",msgId); } else { //消费失败 拒收 重回队列 channel.basicNack(message.
getMessageProperties().getDeliveryTag(),false,true); log.error("消费失败,消息ID:{}",
msgId); } biddingRabbitLogs.setStatus(SendStatus.CONSUME_FAILD.getValue());
biddingRabbitLogs.setTryTimes(biddingRabbitLogs.getTryTimes()+1);
remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN); } }

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信