<>事务消息源码学习

<>TransactionMQProducer发送事务消息

事务消息发送时,需要打上相应的标记,与普通消息进行区分
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED,
"true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
给broker发送消息后,根据返回状态,进行相应处理

*
SEND_OK:LocalTransactionExecuter或者TransactionListener执行本地事务

并返回本地事务完成状态,包括UNKNOW、ROLLBACK、COMMIT

*
FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE:

都是消息发送失败状态, 标记本地事务状态为ROLLBACK_MESSAGE

之后通过endTransaction将相应本地事务执行状态信息回传给broker.注意发送消息的方式为one way

<>broker端处理事务消息

TransactionMessageBridge,负责主要的事务消息存储逻辑。

事务消息的topic均设置为TransactionalMessageUtil.buildHalfTopic(),即
RMQ_SYS_TRANS_HALF_TOPIC.queueId设置为0
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner)); } private
MessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInner msgInner) {
//隐藏真实的topic和queueId MessageAccessor.putProperty(msgInner, MessageConst.
PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(
TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.
setPropertiesString(MessageDecoder.messageProperties2String(msgInner.
getProperties())); return msgInner; }
EndTransactionProcessor的processRequest方法,处理producer端回传的事务状态

如果事务状态是commit,将消息还原成原来的topic和queueId,存储到commitLog
中,并且删除预处理消息(prepare),其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC
的主题中,代表这些消息已经被处理(提交或回滚)

如果事务状态是rollbackMessage,删除掉prepare消息,同样也是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC
的主题中,代表这些消息已经被处理

<>half消息队列和op消息队列

half消息消费队列: prepare消息消费队列,事务消息首先进入此消息消费队列

op消息消费队列:事务消息处理完成后,进入op消息消费队列,op消息消费队列主要用来记录事务消息完成状态

<>定时任务回查

如果第一次producer返回的事务消息为UNKNOW,则需要进行事务回查

事务回查,broker端主要逻辑在TransactionalMessageService的check方法

prepare消息,会存储在RMQ_SYS_TRANS_HALF_TOPIC消息队列中

prepare消息,被处理后,会存储在RMQ_SYS_TRANS_OP_HALF_TOPIC消息队列中

所以通过回查prepare消息队列,可以对一些失败的事务消息,进行重试。

为了充分利用commitLog顺序写的特性,

回查时,只要发送了回查消息,pepare
消息消费队列消费进度会往前推动,同时往prepare消息队列写入一条新的消息,如果回查失败,新增的消息可以再次发送回查消息。如果回查成功,可以根据op消息队列中的消息,判断重复,避免重复发送回查消息。

producer端事务回查处理逻辑主要在TransactionListener的checkLocalTransaction方法,一般重写
checkLocalTransaction方法,实现自定义的回查逻辑。

技术
下载桌面版
GitHub
Microsoft Store
SourceForge
Gitee
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
京东云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信