<>一、RocketMQ介绍

<>1.1. 简介

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
能够保证严格的消息顺序
提供丰富的消息拉取模式
高效的订阅者水平扩展能力
实时的消息订阅机制
亿级消息堆积能力

选用理由:

* 强调集群无单点,可扩展,任意一点高可用,水平可扩展。
* 海量消息堆积能力,消息堆积后,写入低延迟。
* 支持上万个队列
* 消息失败重试机制
* 消息可查询
* 开源社区活跃
* 成熟度(经过双十一考验)
<>1.2. 关键概念

<>1.2.1. 主题与标签

主题 Tpoic:第一级消息类型,书的标题
标签 Tags:第二级消息类型,书的目录,可以基于 Tag 做消息过滤
例如:
主题:
订单交易
标签:
订单交易-创建 订单交易-付款 订单交易-完成
<>1.2.2. 发送与订阅群组

生产组:用于消息的发送。
消费组:用于消息的订阅处理。

生产组和消费组,方便扩缩机器,增减处理能力,集群组的名字,用于标记用途中的一员。每次只会随机的发给每个集群中的一员。

<>二、RocketMQ集群方式

推荐的几种 Broker 集群部署方式,这里的Slave 不可写,但可读,类似于 Mysql 主备方式。

<>2.1. 单个 Master

这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

<>2.2. 多 Master 模式

    一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
    优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10
磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
### 先启动 NameServer
### 在机器 A,启动第一个 Master
### 在机器 B,启动第二个 Master

<>2.3. 多 Master 多 Slave 模式,异步复制

    每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave
消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
### 先启动 NameServer
### 在机器 A,启动第一个 Master
### 在机器 B,启动第二个 Master
### 在机器 C,启动第一个 Slave
### 在机器 D,启动第二个 Slave

<>2.4. 多 Master 多 Slave 模式,同步双写

    每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。
    优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
### 先启动 NameServer
### 在机器 A,启动第一个 Master
### 在机器 B,启动第二个 Master
### 在机器 C,启动第一个 Slave
### 在机器 D,启动第二个 Slave
以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的
BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave通过指定不同的
BrokerId 来区分。

<>三、RocketMQ部署【Master-Slave方式】

<>3.1. 服务器环境

序号 IP 用户名 密码 角色 模式
1 192.168.11.128 root *** nameServer1,brokerServer1 Master1
2 192.168.11.129 root *** nameServer2,brokerServer2 Master2
<>3.2. Hosts添加信息

IP NAME
192.168.11.128 rocketmq-nameserver1
192.168.11.128 rocketmq-master1
192.168.11.129 rocketmq-nameserver2
192.168.11.129 rocketmq-master1-slave vi /etc/hosts
<>3.3. 上传解压【两台机器】
# 上传apache-rocketmq.tar.gz文件至/usr/local tar -zxvf apache-rocketmq.tar.gz -C
/usr/localln -s apache-rocketmq rocketmq ll /usr/local
<>3.4. 创建存储路径【两台机器】
mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir
/usr/local/rocketmq/store/consumequeuemkdir /usr/local/rocketmq/store/index
<>3.5. RocketMQ配置文件【两台机器】
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties vim
/usr/local/rocketmq/conf/2m-2s-async /broker-a-s.properties #所属集群名字
brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=
broker-a|broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker
自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker
自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口
listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时
fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000
#检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=
/usr/local/rocketmq/store#commitLog 存储路径 storePathCommitLog=
/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径 storePathConsumeQueue=
/usr/local/rocketmq/store/consumequeue#消息索引存储路径 storePathIndex=
/usr/local/rocketmq/store/index#checkpoint 文件存储路径 storeCheckpoint=
/usr/local/rocketmq/store/checkpoint#abort 文件存储路径 abortFile=
/usr/local/rocketmq/store/abort#限制的消息大小 maxMessageSize=65536
#flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000
#Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量
#sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
<>3.6. 修改日志配置文件【两台机器】
mkdir -p /usr/local/rocketmq/logs cd /usr/local/rocketmq/conf && sed -i 's#
${user.home}#/usr/local/rocketmq#g' *.xml
<>3.7. 修改启动脚本参数【两台机器】
vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================
# 开发环境JVM Configuration
#==============================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m
-XX:MaxPermSize=320m" vim /usr/local/rocketmq/bin/runserver.sh JAVA_OPT="
${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m
-XX:MaxPermSize=320m"
<>3.8. 启动NameServer【两台机器】
cd /usr/local/rocketmq/bin nohup sh mqnamesrv &
<>3.9. 启动BrokerServer A【192.168.11.128】
cd /usr/local/rocketmq/bin nohup sh mqbroker -c
/usr/local/rocketmq/conf/2m-noslave/broker-a.properties>/dev/null 2>&1 & netstat
-ntlp jpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log tail
-f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
<>3.10. 启动BrokerServer B【192.168.11.129】
cd /usr/local/rocketmq/bin nohup sh mqbroker -c
/usr/local/rocketmq/conf/2m-noslave/broker-a-s.properties>/dev/null 2>&1 &
netstat -ntlp jps tail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
<>3.11. RocketMQ Console

在tomcat中部署rocketmq-console.war

<>3.12. 数据清理
cd /usr/local/rocketmq/bin sh mqshutdown broker sh mqshutdown namesrv --等待停止 rm
-rf /usr/local/rocketmq/storemkdir /usr/local/rocketmq/store mkdir
/usr/local/rocketmq/store/commitlogmkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index --按照上面步骤重启NameServer与BrokerServer
<>四、使用注意事项

<>4.1. 消费重试机制

第几次重试 每次重试间隔时间
1 10秒
2 30秒
3 1 分钟
4 2 分钟
5 3 分钟
6 4 分钟
7 5 分钟
8 6 分钟
9 7 分钟
10 8 分钟
11 9 分钟
12 10 分钟
13 20 分钟
14 30 分钟
15 1小时
16 2小时
<>4.2. 消费端做幂等处理

RocketMQ 无法避免消息重复,如果业务对消费重复非常敏感,务必要在业务局面去重,有以下几种去重方式:

* 将消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等。 建议最好使用消息内容中的唯一标识字段去重。
* 使用业务局面的状态机去重 。RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消息, RocketMQ
使用长轮询方式,可以保证消息实时性同Push 方式一致。’
<>五、多主多从模式

* 多主多从模式分为俩种方式,第一种为异步复制,第二种为同步双写,我们暂且考虑其中的一种情况,注意:*(RocketMQ
每一种集群环境配置会对应一个不同的目录)
双主模式,文件夹配置为: conf/2m-noslave/
多主多从模式(异步复制),文件夹配置为: conf/2m-2s-async/
多主多从模式(同步双写),文件夹配置为: conf/2m-2s-sync/
* 集群规划如下:【四台机器】
IP NAME
192.168.11.121 rocketmq-nameserver1、rocketmq-master1
192.168.11.122 rocketmq-nameserver2、rocketmq-master2
192.168.11.123 rocketmq-nameserver3、rocketmq-master1-slave
192.168.11.124 rocketmq-nameserver4、rocketmq-master2-slave
* 添加hosts信息 vi /etc/hosts
* 解压上传【113、114俩台机器】 上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/local tar -zxvf
alibaba-rocketmq-3.2.6.tar.gz -C /usr/localmv alibaba-rocketmq
alibaba-rocketmq-3.2.6ln -s alibaba-rocketmq-3.2.6 rocketmq ll /usr/local
* 创建存储目录【113、114俩台机器】 mkdir /usr/local/rocketmq/store mkdir
/usr/local/rocketmq/store/commitlogmkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
* RocketMQ配置文件【四台机器】
vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties vim
/usr/local/rocketmq/conf/2m-2s-async/broker-b.properties vim
/usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties vim
/usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties
broker-a.properties、broker-b.properties配置如下:
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b #0 表示 Master,>0 表示 Slave brokerId=0
#nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;
rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker
自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker
自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口
listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时
fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000
#检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=
/usr/local/rocketmq/store#commitLog 存储路径 storePathCommitLog=
/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径 storePathConsumeQueue=
/usr/local/rocketmq/store/consumequeue#消息索引存储路径 storePathIndex=
/usr/local/rocketmq/store/index#checkpoint 文件存储路径 storeCheckpoint=
/usr/local/rocketmq/store/checkpoint#abort 文件存储路径 abortFile=
/usr/local/rocketmq/store/abort#限制的消息大小 maxMessageSize=65536
#flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000
#Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量
#sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
broker-a-s.properties、broker-b-s.properties配置如下:
#所属集群名字 brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样,与Master通过brokerName来配对 brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=
rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;
rocketmq-nameserver4:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums
=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker
自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口
listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时
fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000
#检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=
/usr/local/rocketmq/store#commitLog 存储路径 storePathCommitLog=
/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径 storePathConsumeQueue=
/usr/local/rocketmq/store/consumequeue#消息索引存储路径 storePathIndex=
/usr/local/rocketmq/store/index#checkpoint 文件存储路径 storeCheckpoint=
/usr/local/rocketmq/store/checkpoint#abort 文件存储路径 abortFile=
/usr/local/rocketmq/store/abort#限制的消息大小 maxMessageSize=65536
#flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000
#Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE
brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=
ASYNC_FLUSH#checkTransactionMessageEnable=false #发消息线程池数量
#sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
* 修改日志配置文件【113、114俩台机器】 mkdir -p /usr/local/rocketmq/logs cd
/usr/local/rocketmq/conf&& sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
* 修改脚本启动参数【113、114俩台机器】 vim /usr/local/rocketmq/bin/runbroker.sh
#==============================================================================
# 开发环境JVM Configuration
#==============================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m
-XX:MaxPermSize=320m" vim /usr/local/rocketmq/bin/runserver.sh JAVA_OPT="
${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m
-XX:MaxPermSize=320m"
* 启动NameServer【四台机器】 cd /usr/local/rocketmq/bin nohup sh mqnamesrv &
* 启动Master1:BrokerServerA【192.168.11.121】 cd /usr/local/rocketmq/bin nohup sh
mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties>/dev/null 2
>&1 & netstat -ntlp jps tail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
* 启动Master2:BrokerServerB【192.168.11.122】 cd /usr/local/rocketmq/bin nohup sh
mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties>/dev/null 2
>&1 & netstat -ntlp jps tail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
* 启动Master1-Slave:BrokerServerC【192.168.11.123】 cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties
>/dev/null 2>&1 & netstat -ntlp jps tail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
* 启动Master2-Slave:BrokerServerD【192.168.11.124】 cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties
>/dev/null 2>&1 & netstat -ntlp jps tail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500
/usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
* 服务停止(首先关闭4个BrokerServer,再关闭4个NameServer): cd /usr/local/rocketmq/bin sh
mqshutdown broker sh mqshutdown namesrv

技术
今日推荐
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信