什么是事务消息?
事务消息,可以被认为是一种基于两阶段提交理论的消息实现,用来确保分布式系统中的最终一致性。事务性消息保证了本地事务的执行和消息的发送可以原子性地执行。
使用限制
(1) 不支持定时消息和批量消息。
(2) 为了避免单个消息被检查次数过多,导致半队列消息累积,我们将
单个消息的检查次数默认限制为15次,但是用户可以通过更改broker配置中的“transactionCheckMax”参数,来更改此限制
。如果一条消息检查了“TransactionCheckMax”次,那么默认情况下,broker将放弃此消息,并同时打印错误日志。用户可以通过重写“AbstractTransactionCheckListener”类来更改此行为。
(3) 事务性消息,支持超时时间设置
。它将在broker的配置参数“transactionTimeout”指定的一段时间后,进行检查。用户也可以在发送事务性消息时,通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来更改此限制。此参数优先于“TransactionMsgtimeout”参数。
(4) 事务性消息可能被检查或使用多次。
(5) 将已提交的消息重新发送到用户的目标主题上,这可以会失败。目前,这取决于日志记录。Rocketmq本身的高可用性机制保证了高可用性。
如果要确保事务性消息不会丢失,并且事务完整性得到保证,建议使用同步的double write机制。
(6) 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,
事务性消息允许反向查询,即MQ服务器可以根据生产者ID来查询生产者实例。
应用
1、 事务状态
事务性消息有三种状态:
(1) TransactionStatus.CommitTransaction: 提交事务,这意味着允许消费者消费此消息。
(2) TransactionStatus.RollbackTransaction: 回滚事务,意味着消息将被删除,且不允许消费。
(3) TransactionStatus.Unknown: 中间状态,这意味着MQ需要再次检查以确定状态。
2、发送事务消息
(1)创建事务性生产者
使用TransactionMQProducer类创建Producer客户端,并指定唯一的ProducerGroup,您可以设置自定义线程池来处理检查请求。
在执行本地事务之后,您需要根据执行结果回复MQ服务器,回复的事务状态在上述部分中描述。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import java.util.List;
public class TransactionProducer { public static void main(String[] args)
throws MQClientException, InterruptedException { TransactionListener
transactionListener = new TransactionListenerImpl();
//创建TransactionMQProducer事务性生产者实例 TransactionMQProducer producer = new
TransactionMQProducer("please_rename_unique_group_name"); ExecutorService
executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public
Thread newThread(Runnable r) { Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread"); return thread; } });
//设置自定义线程池,来处理检查请求 producer.setExecutorService(executorService); //设置事务监听器
producer.setTransactionListener(transactionListener); producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int
i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i %
tags.length], "KEY" + i, ("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//调用sendMessageInTransaction方法,来发送事务性消息 SendResult sendResult =
producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n",
sendResult); Thread.sleep(10); } catch (MQClientException |
UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i <
100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } ```
(2)实现TransactionListener接口
“executeLocalTransaction”方法用于在发送half消息成功时,执行本地事务。它返回上一节中提到的三个事务状态之一。
“check local transaction”方法用于检查本地事务状态,并响应MQ检查请求。它也返回上一节中提到的三个事务状态之一。
import ... public class TransactionListenerImpl implements
TransactionListener { private AtomicInteger transactionIndex = new
AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new
ConcurrentHashMap<>(); @Override public LocalTransactionState
executeLocalTransaction(Message msg, Object arg) { int value =
transactionIndex.getAndIncrement(); int status = value % 3;
localTrans.put(msg.getTransactionId(), status); return
LocalTransactionState.UNKNOW; } @Override public LocalTransactionState
checkLocalTransaction(MessageExt msg) { Integer status =
localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) {
case 0: return LocalTransactionState.UNKNOW; case 1: return
LocalTransactionState.COMMIT_MESSAGE; case 2: return
LocalTransactionState.ROLLBACK_MESSAGE; } } return
LocalTransactionState.COMMIT_MESSAGE; } } ```