<>1业务背景
最近在做新老系统的mysql数据同步,由于新系统切换为微服务,数据结构设计不同,使用RocketMQ进行异步同步数据。
老系统已经有个老版本的MQ集群,所以在微服务的系统中,需要配置两个MQ集群,一个为新系统服务,另外一个专门为同步数据到老系统服务。
<>2代码
这里仅仅按照官方文档示例设置了生产组名和namesrv集群地址;
新系统如下使用MQ:
private DefaultMQProducer producer=new DefaultMQProducer("accountProducer");
producer.setNamesrvAddr("新系统集群namesrv地址"); producer.start(); String msgStr =
"新系统业务消息内容"; Message msg = new Message(topicName, key, "", msgStr.getBytes());
producer.sendMsg(msg);
老系统如下使用MQ:
private DefaultMQProducer producer=new
DefaultMQProducer("asyncToOldSysProducer");
producer.setNamesrvAddr("老系统集群namesrv地址"); producer.start(); String msgStr =
"需要同步到老系统的消息内容"; Message msg = new Message(topicName, key, "",
msgStr.getBytes()); producer.sendMsg(msg);
<>3接口测试
通过控制台发现后来每次应该发到老系统MQ的消息,都发到新系统MQ集群里了,奇怪了半天,检查老系统的namesrv地址没有配错啊,随后发现是DefaultMQProducer每次都会去用默认的那个MQClient实例,
应该在新系统使用producer实例时,代码里加上这么一段显式设置,才可以将消息发到老系统MQ集群。
producer.setInstanceName("AsyncToOldSysProducer");
这样如愿达成效果,数据同步到老系统。
<>4多个消费者实例
多个消费者实例也需要显式设置,具体如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"SyncUserToNewConsumer"); //多个注册中心时,需要设置instanceName consumer.setInstanceName(
"SyncUserToNewConsumer"); consumer.setNamesrvAddr(namesrvaddr);