[{"createTime":1735734952000,"id":1,"img":"hwy_ms_500_252.jpeg","link":"https://activity.huaweicloud.com/cps.html?fromacct=261f35b6-af54-4511-a2ca-910fa15905d1&utm_source=V1g3MDY4NTY=&utm_medium=cps&utm_campaign=201905","name":"华为云秒杀","status":9,"txt":"华为云38元秒杀","type":1,"updateTime":1735747411000,"userId":3},{"createTime":1736173885000,"id":2,"img":"txy_480_300.png","link":"https://cloud.tencent.com/act/cps/redirect?redirect=1077&cps_key=edb15096bfff75effaaa8c8bb66138bd&from=console","name":"腾讯云秒杀","status":9,"txt":"腾讯云限量秒杀","type":1,"updateTime":1736173885000,"userId":3},{"createTime":1736177492000,"id":3,"img":"aly_251_140.png","link":"https://www.aliyun.com/minisite/goods?userCode=pwp8kmv3","memo":"","name":"阿里云","status":9,"txt":"阿里云2折起","type":1,"updateTime":1736177492000,"userId":3},{"createTime":1735660800000,"id":4,"img":"vultr_560_300.png","link":"https://www.vultr.com/?ref=9603742-8H","name":"Vultr","status":9,"txt":"Vultr送$100","type":1,"updateTime":1735660800000,"userId":3},{"createTime":1735660800000,"id":5,"img":"jdy_663_320.jpg","link":"https://3.cn/2ay1-e5t","name":"京东云","status":9,"txt":"京东云特惠专区","type":1,"updateTime":1735660800000,"userId":3},{"createTime":1735660800000,"id":6,"img":"new_ads.png","link":"https://www.iodraw.com/ads","name":"发布广告","status":9,"txt":"发布广告","type":1,"updateTime":1735660800000,"userId":3},{"createTime":1735660800000,"id":7,"img":"yun_910_50.png","link":"https://activity.huaweicloud.com/discount_area_v5/index.html?fromacct=261f35b6-af54-4511-a2ca-910fa15905d1&utm_source=aXhpYW95YW5nOA===&utm_medium=cps&utm_campaign=201905","name":"底部","status":9,"txt":"高性能云服务器2折起","type":2,"updateTime":1735660800000,"userId":3}]
<>1业务背景
最近在做新老系统的mysql数据同步,由于新系统切换为微服务,数据结构设计不同,使用RocketMQ进行异步同步数据。
老系统已经有个老版本的MQ集群,所以在微服务的系统中,需要配置两个MQ集群,一个为新系统服务,另外一个专门为同步数据到老系统服务。
<>2代码
这里仅仅按照官方文档示例设置了生产组名和namesrv集群地址;
新系统如下使用MQ:
private DefaultMQProducer producer=new DefaultMQProducer("accountProducer");
producer.setNamesrvAddr("新系统集群namesrv地址"); producer.start(); String msgStr =
"新系统业务消息内容"; Message msg = new Message(topicName, key, "", msgStr.getBytes());
producer.sendMsg(msg);
老系统如下使用MQ:
private DefaultMQProducer producer=new
DefaultMQProducer("asyncToOldSysProducer");
producer.setNamesrvAddr("老系统集群namesrv地址"); producer.start(); String msgStr =
"需要同步到老系统的消息内容"; Message msg = new Message(topicName, key, "",
msgStr.getBytes()); producer.sendMsg(msg);
<>3接口测试
通过控制台发现后来每次应该发到老系统MQ的消息,都发到新系统MQ集群里了,奇怪了半天,检查老系统的namesrv地址没有配错啊,随后发现是DefaultMQProducer每次都会去用默认的那个MQClient实例,
应该在新系统使用producer实例时,代码里加上这么一段显式设置,才可以将消息发到老系统MQ集群。
producer.setInstanceName("AsyncToOldSysProducer");
这样如愿达成效果,数据同步到老系统。
<>4多个消费者实例
多个消费者实例也需要显式设置,具体如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"SyncUserToNewConsumer"); //多个注册中心时,需要设置instanceName consumer.setInstanceName(
"SyncUserToNewConsumer"); consumer.setNamesrvAddr(namesrvaddr);