<>1. broker 启动

broker 启动需要指定启动参数,-c D:\IDEA_Project\Rule\rocketmq\conf\broker.conf,
其中的配置文件,主要是包括存储消息的路径、nameServer 地址,刷盘方式等,如下, 然后执行
org.apache.rocketmq.broker.BrokerStartup#start 完成启动。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0
deleteWhen= 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType =
ASYNC_FLUSH# namesrvAddr地址 namesrvAddr=127.0.0.1:9876 deleteWhen = 04
fileReservedTime= 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true autoCreateTopicEnable=true # 存储路径 storePathRootDir=D:
\\IDEA_Project\\Rule\\rocketmq\\data\\rocketmq\\dataDir # commitLog路径
storePathCommitLog=D:\\IDEA_Project\\Rule\\rocketmq\\data\\rocketmq\\dataDir\\
commitlog# 消息队列存储路径 storePathConsumeQueue=D:\\IDEA_Project\\Rule\\rocketmq\\data
\\rocketmq\\dataDir\\consumequeue # 消息索引存储路径 storePathIndex=D:\\IDEA_Project\\
Rule\\rocketmq\\data\\rocketmq\\dataDir\\index # checkpoint文件路径 storeCheckpoint=
D:\\IDEA_Project\\Rule\\rocketmq\\data\\rocketmq\\dataDir\\checkpoint #
abort文件存储路径 abortFile=D:\\IDEA_Project\\Rule\\rocketmq\\data\\rocketmq\\dataDir\
\abort
*
启动大致流程,broker 的启动与 nameServer 很类似,都是先实例化一个控制器,然后调用
org.apache.rocketmq.broker.BrokerController#initialize 初始化控制器,最后调用控制器启动。
public static void main(String[] args) { start(createBrokerController(args)); }
public static BrokerController start(BrokerController controller) { // 控制器启动
controller.start(); // 打印日志 String tip = "The broker[" + controller.
getBrokerConfig().getBrokerName() + ", " + controller.getBrokerAddr() + "] boot
success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) { tip += " and name
server is " + controller.getBrokerConfig().getNamesrvAddr(); } log.info(tip);
System.out.printf("%s%n", tip); return controller; }
<>2 具体流程

<>2.1 创建 broker 控制器

org.apache.rocketmq.broker.BrokerStartup#createBrokerController:创建 broker
控制器,设置存储消息的存储路径、nameServer 地址等。

*
设置 netty 的发送、接收配置, brokerServer 的监听端口,broker 角色等。
// 设置默认 netty 的 socket 发送配置,128字节 if (null == System.getProperty(
NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig
.socketSndbufSize = 131072; } // 设置默认 netty 的 socket 接收配置,128字节 if (null ==
System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE))
{ NettySystemConfig.socketRcvbufSize = 131072; } // broker 服务的监听端口为 10911
nettyServerConfig.setListenPort(10911); final MessageStoreConfig
messageStoreConfig= new MessageStoreConfig(); // 默认为异步 master if (BrokerRole.
SLAVE== messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.
getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.
setAccessMessageInMemoryMaxRatio(ratio); }
*
解析启动参数,将参数中对应的配置文件内容,填充到对应的配置信息中,最后实例化 brokerController。
if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c')
; if (file != null) { configFile = file; InputStream in = new
BufferedInputStream(new FileInputStream(file)); properties = new Properties();
// 加载配置文件 properties.load(in); properties2SystemEnv(properties); // 填充 broker
配置信息 MixAll.properties2Object(properties, brokerConfig); // 填充 nettyServer 配置信息
MixAll.properties2Object(properties, nettyServerConfig); // 填充 nettyClient 配置信息
MixAll.properties2Object(properties, nettyClientConfig); // 填充消息存储配置信息 MixAll.
properties2Object(properties, messageStoreConfig); // 设置 broker 文件地址
BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } // 实例化 broker
final BrokerController controller = new BrokerController( brokerConfig,
nettyServerConfig, nettyClientConfig, messageStoreConfig);
<>2.2 初始化 broker 控制器

调用 org.apache.rocketmq.broker.BrokerController#initialize:初始化 brokerController。

*
加载 config 路径下的配置文件,例如主题配置、消费偏移量等配置文件,最后实例化消息存储服务。
// 主题配置管理加载,配置文件为:xx/config/topics.json boolean result = this.
topicConfigManager.load(); // 主题消费偏移量加载,配置文件为:xx/config/consumerOffset.json
result= result && this.consumerOffsetManager.load(); //
订阅组加载,配置文件为:xx/config/subscriptionGroup.json result = result && this.
subscriptionGroupManager.load(); // 消费过滤加载,配置文件为:xx/config/consumerFilter.json
result= result && this.consumerFilterManager.load(); if (result) { //
上述文件都加载成功,实例化消息存储服务 this.messageStore = new DefaultMessageStore(this.
messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.
brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.
messageStore); //load plugin MessageStorePluginContext context = new
MessageStorePluginContext(messageStoreConfig, brokerStatsManager,
messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.
build(context, this.messageStore); this.messageStore.getDispatcherList().
addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.
consumerFilterManager)); } public boolean load() { String fileName = null; try {
// ScheduleMessageService: 路径在 xx/config/delayOffset.json //
TopicConfigManager:路径在 xx/config/topics.json // .... fileName = this.
configFilePath(); String jsonString = MixAll.file2String(fileName); //
获取不到,则从备份文件中获取 if (null == jsonString || jsonString.length() == 0) { return this
.loadBak(); } else { this.decode(jsonString); log.info("load " + fileName + "
OK"); return true; } } catch (Exception e) { log.error("load " + fileName + "
failed, and try to load backup file", e); return this.loadBak(); } }
*
调用 org.apache.rocketmq.store.MessageStore#load:使用 messageStore 消息存储服务,加载
commitlog、消费队列文件。
// 加载具体的进度文件,commitlog、主题消费队列文件等 result = result && this.messageStore.load();
public boolean load() { boolean result = true; // 加载 abort 文件,判断是否正常退出,正常退出会产生
abort 文件 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown
{}", lastExitOK ? "normally" : "abnormally"); // 加载延迟级别对应偏移量文件 delayOffset.json
if (null != scheduleMessageService) { result = result && this.
scheduleMessageService.load(); } // load Commit Log result = result && this.
commitLog.load(); // load Consume Queue result = result && this.loadConsumeQueue
(); if (result) { // 加载 checkpoint 文件 this.storeCheckpoint = new StoreCheckpoint
(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.
getStorePathRootDir())); // 加载 index 文件,如果不是正常退出 并且比 checkpoint 文件时间大,则删除 index
文件 this.indexService.load(lastExitOK); // 还原 cosumequeue 文件,接着根据是否异常退出,恢复不正常的
commitlog 文件 this.recover(lastExitOK); log.info("load over, and the max phy
offset = {}", this.getMaxPhyOffset()); } return result; }
*
启动 broker 远程服务,并且启动 vip 通道服务。
// 启动 broker 服务 this.remotingServer = new NettyRemotingServer(this.
nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig
= (NettyServerConfig) this.nettyServerConfig.clone(); // vip 通道的监听端口-2
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.
fastRemotingServer= new NettyRemotingServer(fastConfig, this.
clientHousekeepingService);
*
定义线程池,用于发送消息、拉取消息、查询消息、监控 broker 、心跳检测、消费者管理线程池。并注册处理器,其中的请求任务,放入刚定义好的线程池中。
// 发送消息线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.
brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.
getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.
sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); // 拉取消息线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.
getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums()
, 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new
ThreadFactoryImpl("PullMessageThread_")); // 查询消息线程池 this.queryMessageExecutor =
new BrokerFixedThreadPoolExecutor( this.brokerConfig.
getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums
(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new
ThreadFactoryImpl("QueryMessageThread_")); // 监控 broker 线程池 this.
adminBrokerExecutor= Executors.newFixedThreadPool(this.brokerConfig.
getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_"));
// 客户端管理线程池 this.clientManageExecutor = new ThreadPoolExecutor( this.
brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.
getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.
clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); //
发送心跳线程池 this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( this.
brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.
getHeartbeatThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.
heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_",true)); //
消费者管理线程池 this.consumerManageExecutor = Executors.newFixedThreadPool(this.
brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_")); // 注册处理器 this.registerProcessor();
*
定义一系列的定时任务,按照频率检查 broker状态、统计接收、拉取消息数量、持久化消息文件、修改 nameServer 地址等。
final long period = 1000 * 60 * 60 * 24; // 每隔 1天定时记录 broker 状态:发送、拉取消息 this.
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.getBrokerStats().record(); } catch (
Throwable e) { log.error("schedule record error.", e); } } }, initialDelay,
period, TimeUnit.MILLISECONDS); // 每隔 10s 持久化消息队列偏移文件 this.
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.consumerOffsetManager.persist(); }
catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); }
} }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.
MILLISECONDS); // 每隔 10s 持久化消费者过滤文件 this.scheduledExecutorService.
scheduleAtFixedRate(new Runnable() { @Override public void run() { try {
BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) {
log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000
* 10, TimeUnit.MILLISECONDS); // 每隔 3min
检查消费队列是否消费缓慢,如果disableConsumeIfConsumerReadSlowly=true,则停止消费 this.
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e)
{ log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); // 每隔
1s 打印统计的流控日志 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { BrokerController.this.printWaterMark(); }
catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1,
TimeUnit.SECONDS); // 每隔 1min 记录剩余需要重放消息字节数 this.scheduledExecutorService.
scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.
info("dispatch behind commit log {} bytes", BrokerController.this.
getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error(
"schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit
.MILLISECONDS); if (this.brokerConfig.getNamesrvAddr() != null) { // 设置修改
nameServer 地址 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.
getNamesrvAddr()); log.info("Set user specified name server address: {}", this.
brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.
isFetchNamesrvAddrByAddressServer()) { // 每隔 2min 获取 nameServer 地址 this.
scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); }
catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e
); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); }
*
从 master 节点同步 broker 的 salve 节点的数据。
// broker 为从节点时,更新主节点的 HA if (BrokerRole.SLAVE == this.messageStoreConfig.
getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null &&
this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.
updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.
updateMasterHAServerAddrPeriodically= false; } else { this.
updateMasterHAServerAddrPeriodically= true; } // 每隔 1min,从节点从 master 节点同步数据 this
.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (
Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } },
1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } else { // 每隔 1min,打印主从节点的差异 this
.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public
void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (
Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } },
1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); }
*
实例化文件监听服务,并且初始化事务消息服务。
fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.
tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.
tlsServerTrustCertPath}, new FileWatchService.Listener() { boolean certChanged,
keyChanged= false; @Override public void onChanged(String path) { if (path.
equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust
certificate changed, reload the ssl context"); reloadServerSslContext(); } if (
path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path
.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (
certChanged&& keyChanged) { log.info("The certificate and private key changed,
reload the ssl context"); certChanged = keyChanged = false;
reloadServerSslContext(); } } private void reloadServerSslContext() { ((
NettyRemotingServer) remotingServer).loadSslContext(); ((NettyRemotingServer)
fastRemotingServer).loadSslContext(); } }); private void initialTransaction() {
// 实例化事务消息服务 this.transactionalMessageService = ServiceProvider.loadClass(
ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (
null == this.transactionalMessageService) { this.transactionalMessageService =
new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.
getMessageStore())); log.warn("Load default transaction message hook service:
{}", TransactionalMessageServiceImpl.class.getSimpleName()); } // 实例化事务消息回查服务
this.transactionalMessageCheckListener = ServiceProvider.loadClass(
ServiceProvider.TRANSACTION_LISTENER_ID,
AbstractTransactionalMessageCheckListener.class); if (null == this.
transactionalMessageCheckListener) { this.transactionalMessageCheckListener =
new DefaultTransactionalMessageCheckListener(); log.warn("Load default discard
message hook service: {}", DefaultTransactionalMessageCheckListener.class.
getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(
this); this.transactionalMessageCheckService = new
TransactionalMessageCheckService(this); }
<>2.3 broker 关闭

broker 正常关闭时,会执行钩子方法,其中会停止上述实例化、初始化的服务、向 nameServer 请求注销服务,删除 abort 文件。
// broker 正常 shutdown 时,执行该钩子方法 Runtime.getRuntime().addShutdownHook(new Thread
(new Runnable() { private volatile boolean hasShutdown = false; private
AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run()
{ synchronized (this) { log.info("Shutdown hook was invoked, {}", this.
shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown =
true; long beginTime = System.currentTimeMillis(); // 下线操作 controller.shutdown()
; long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info(
"Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }
, "ShutdownHook"));
<>2.4 启动 broker 控制器

调用 org.apache.rocketmq.broker.BrokerController#start:开启实例化时注册的各种的服务,并注册 broker。
public void start() throws Exception { // 开启消息存储服务 if (this.messageStore !=
null) { this.messageStore.start(); } // 开启与 nameServer 通信服务 if (this.
remotingServer!= null) { this.remotingServer.start(); } // 开启 vip 通道,与
nameServer 通信服务 if (this.fastRemotingServer != null) { this.fastRemotingServer.
start(); } // 文件监听服务 if (this.fileWatchService != null) { this.fileWatchService.
start(); } if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } if
(this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); }
if (this.clientHousekeepingService != null) { this.clientHousekeepingService.
start(); } if (this.filterServerManager != null) { this.filterServerManager.
start(); } // 注册 broker this.registerBrokerAll(true, false, true); // 每隔最少 10s
注册 broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { try { BrokerController.this.registerBrokerAll(true
, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error(
"registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(
brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if
(this.brokerStatsManager != null) { this.brokerStatsManager.start(); } if (this.
brokerFastFailure!= null) { this.brokerFastFailure.start(); } // master
节点开启事务回查服务 if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { if (
this.transactionalMessageCheckService != null) { log.info("Start transaction
service!"); this.transactionalMessageCheckService.start(); } } }

技术
下载桌面版
GitHub
Gitee
SourceForge
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信