<>事务消息源码学习
<>TransactionMQProducer发送事务消息
事务消息发送时,需要打上相应的标记,与普通消息进行区分
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED,
"true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
this.defaultMQProducer.getProducerGroup());
给broker发送消息后,根据返回状态,进行相应处理
*
SEND_OK:LocalTransactionExecuter或者TransactionListener执行本地事务
并返回本地事务完成状态,包括UNKNOW、ROLLBACK、COMMIT
*
FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE:
都是消息发送失败状态, 标记本地事务状态为ROLLBACK_MESSAGE
之后通过endTransaction将相应本地事务执行状态信息回传给broker.注意发送消息的方式为one way
<>broker端处理事务消息
TransactionMessageBridge,负责主要的事务消息存储逻辑。
事务消息的topic均设置为TransactionalMessageUtil.buildHalfTopic(),即
RMQ_SYS_TRANS_HALF_TOPIC.queueId设置为0
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner)); } private
MessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInner msgInner) {
//隐藏真实的topic和queueId MessageAccessor.putProperty(msgInner, MessageConst.
PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner,
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(
TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.
setPropertiesString(MessageDecoder.messageProperties2String(msgInner.
getProperties())); return msgInner; }
EndTransactionProcessor的processRequest方法,处理producer端回传的事务状态
如果事务状态是commit,将消息还原成原来的topic和queueId,存储到commitLog
中,并且删除预处理消息(prepare),其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC
的主题中,代表这些消息已经被处理(提交或回滚)
如果事务状态是rollbackMessage,删除掉prepare消息,同样也是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC
的主题中,代表这些消息已经被处理
<>half消息队列和op消息队列
half消息消费队列: prepare消息消费队列,事务消息首先进入此消息消费队列
op消息消费队列:事务消息处理完成后,进入op消息消费队列,op消息消费队列主要用来记录事务消息完成状态
<>定时任务回查
如果第一次producer返回的事务消息为UNKNOW,则需要进行事务回查
事务回查,broker端主要逻辑在TransactionalMessageService的check方法
prepare消息,会存储在RMQ_SYS_TRANS_HALF_TOPIC消息队列中
prepare消息,被处理后,会存储在RMQ_SYS_TRANS_OP_HALF_TOPIC消息队列中
所以通过回查prepare消息队列,可以对一些失败的事务消息,进行重试。
为了充分利用commitLog顺序写的特性,
回查时,只要发送了回查消息,pepare
消息消费队列消费进度会往前推动,同时往prepare消息队列写入一条新的消息,如果回查失败,新增的消息可以再次发送回查消息。如果回查成功,可以根据op消息队列中的消息,判断重复,避免重复发送回查消息。
producer端事务回查处理逻辑主要在TransactionListener的checkLocalTransaction方法,一般重写
checkLocalTransaction方法,实现自定义的回查逻辑。