文章基于rocketmq demo为入口分析

<>目录

* 初始化流控规则
* 流控
* 常用slot链节点处理
* 常用流控算法学习

<>初始化流控规则

* PullConsumerDemo.main启动消费消息前初始化流控规则_initFlowControlRule_
* 创建流控规则:FlowRule
* 设置资源resource,例如:分组名GROUP_NAME与主题名TOPIC_NAME
* 设置流控阈值count
* 设置流控类型grade:FLOW_GRADE_THREAD基于线程流控,FLOW_GRADE_QPS基于QPS流控
* 设置要限制的应用名称limitApp:根据源数据限流,默认为default代表所有应用
* 设置流控行为controlBehavior:
CONTROL_BEHAVIOR_DEFAULT、CONTROL_BEHAVIOR_WARM_UP、CONTROL_BEHAVIOR_RATE_LIMITER(令牌桶算法)、CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER
*
设置最大队列超时时间maxQueueingTimeMs:如果有更多的请求进来,他们将被放入队列等待,队列等待有一个超时时间,请求如果超过超时设置将会立即被阻塞
* 流控管理器FlowRuleManager加载规则_loadRules_
* 当前属性更新规则_currentProperty(DynamicSentinelProperty
).updateValue(rules),绑定至value属性并通知监听列表如果存在的话_
好啦,配置还是灰常之简单滴,而且我们可以看到sentinel对算法的支持更加灵活。核心的流控算法是参考google的guava中的算法

<>流控

* 消费者消费消息
* 遍历消息
* 根据key进入调用上下文ContextUtil.enter
* 从ThreadLocal中获取,如果不存在则创建执行上下文Context
*
根据name获取缓存中DefaultNode属性,如果不存在,则判断是否超过最大限制2000,超过则返回_NULL_CONTEXT。否则再加锁判断是否为空,是否存在DefaultNode,不存在则根据name(即key:group_name及topic_name)创建_EntranceNode,并向ROOT节点添加子节点,缓存Node、上下文,返回上下文
* SphU根据key进入,类型为EntryType.OUT,按照优先级进入(默认为false,不开启优先级)
* 如果是NULL_CONTEXT类型则返回没有处理slot链的CtEntry
* 如果Context为null,返回默认的CtEntry
* 如果设置全局开关不启用处理链则同NULL_CONTEXT类型一样返回没有处理slot链的CtEntry
* 根据资源查找处理slot链lookProcessChain,不存在则创建并缓存
* 创建slot链SlotChainProvider.newSlotChain
*
如果builder不为空直接build构建返回,否则按照SPI加载builder,如果加载依然为空则使用默认的DefaultSlotChainBuilder
* 默认slot链构建器构建slot链 public ProcessorSlotChain build() { ProcessorSlotChain
chain= new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.
addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new
AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()
); return chain; }
* 根据资源、slot链、上下文创建CtEntry
* slot链进入entry,默认count步进为1,优先级false
* 返回CtEntry

<>默认Slot链各Slot节点处理

DefaultProcessorSlotChain链为链表结构AbstractLinkedProcessorSlot,即FIFO,也就是在存在next的情况下会不断传递至下一个节点,所以实际的顺序是从最后一个节点开始执行,我们按照顺序来具体看看每个slot节点的处理

<>NodeSelectorSlot节点选择slot

* 根据名称选择DefaultNode节点并放入上下文
* 如果上下文为空则创建集群节点为空(即非集群模式)

<>ClusterBuilderSlot集群构建slot

* 创建集群节点clusterNode
* 如果origin节点为空字符串,则创建对应的StatisticNode节点

<>LogSlot日志slot

* 直接fire下个节点,catch异常并记录日志,阻塞异常打印鹰眼日志并抛出,其他异常直接记录日志:“Unexpected entry
exception”

<>StatisticSlot监控统计slot

* 直接fire下个节点
* 记录统计数据,递增线程数increaseThreadNum,按照步进count增加通过请求数addPassRequest
* 如果origin节点不为空同样操作线程数,请求数
* 如果Entry类型为IN则同样操作常量中Constants.ENTRY_NODE的线程数请求数
* 回调handle链
* 捕获优先级等待异常PriorityWaitException,则仅增加线程数
* 捕获阻塞异常BlockException,则仅增加阻塞请求数
* 其他异常则仅增加异常请求数

<>SystemSlot系统保护slot

* 系统规则管理器校验资源resourceWrapper
* 校验系统是否开启_checkSystemStatus=true(默认为false即不校验)_
* 如果资源Entry类型为IN直接返回
* 校验qps、thread、rt、系统负载(load. BBR
algorithm.)、cpu使用率是否超过配置阈值,是则抛出SystemBlockException异常
* fire下个节点

<>AuthoritySlot黑白名单认证slot

* 校验黑白名单
* 如果不存在认证规则AuthorityRule直接返回
* 根据资源名称获取对应的认证规则AuthorityRule列表,如果为空直接返回
* 遍历校验规则校验,校验失败则抛出AuthorityException异常
* fire下个节点

<>FlowSlot流控slot

* 校验流
* 流控规则管理器FlowRuleManager获取流控规则map
* 根据资源名称获取流控规则列表
* 遍历流控规则,校验流,校验失败则抛出FlowException异常
* fire下个节点

<>DegradeSlot熔断降级slot

* 熔断降级规则管理器校验降级:DegradeRuleManager.checkDegrade
* 根据资源名称获取熔断降级规则列表
* 如果为空直接返回
* 遍历规则校验,校验不通过则抛出DegradeException异常
* fire下个节点

<>常用流控算法学习

sentinel流控算法种类,总共一下几种:直接拒绝、Warm Up、匀速排队
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule)
{ if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.
getControlBehavior()) { //令牌桶算法 case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor); //漏桶算法 case RuleConstant.
CONTROL_BEHAVIOR_RATE_LIMITER: return new RateLimiterController(rule.
getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.
CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(
rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(),
ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default: // Default mode or unknown mode: default traffic shaping controller
(fast-reject). } } //超出直接拒绝 return new DefaultController(rule.getCount(), rule.
getGrade()); }

<>令牌桶算法

sentinel中的RateLimiterController便是使用的令牌桶算法实现

* 比率限制控制器重要属性
* count:阈值count数
* maxQueueingTimeMs:最大等待队列超时时间毫秒数
* 如果申请acquireCount<=0则直接通过
* 如果count<=0则拒绝
* 计算平均每次请求耗时毫秒数 // Calculate the interval between every two requests. long
costTime= Math.round(1.0 * (acquireCount) / count * 1000);
* 计算平均每次请求耗时毫秒数
* 根据上次通过时间计算当前请求期望通过时间expectedTime // Expected pass time of this request. long
expectedTime= costTime + latestPassedTime.get();
* 如果期望时间小于等于当前时间
* 设置上次通过时间
* 返回通过true
* 如果期望时间大于当前时间(即需要等待) long waitTime = costTime + latestPassedTime.get() -
TimeUtil.currentTimeMillis();
* 计算需要等待的时间waitTime
* 如果waitTime大于最大等待队列时间则返回拒绝false
* 为上次通过时间增加花费时间cost减去当前时间再次判断是否超过最大等待队列时间
* 如果大于则恢复上次通过时间并返回拒绝false
* 否则休眠后返回true通过 // Calculate the time to wait. long waitTime = costTime +
latestPassedTime.get() - TimeUtil.currentTimeMillis(); if (waitTime >
maxQueueingTimeMs) { return false; } else { long oldTime = latestPassedTime.
addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if
(waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return
false; } // in race condition waitTime may <= 0 if (waitTime > 0) { Thread.sleep
(waitTime); } return true; } catch (InterruptedException e) { } }

<>漏桶算法


sentinel中的WarmUpController便是采用的漏桶算法实现,主要解决系统接收的脉冲类的请求,即使系统在稳定期间内可能拥有很大的处理能力,脉冲类的请求也可能会将系统拖慢甚至宕机。那我们来看看这个类的实现

* 热身控制器构造器重要属性
* count:阈值count
* warmUpPeriodInSec:热身时间秒数
* coldFactor:冷却因子
* warningToken:警戒令牌=(int)(warmUpPeriodInSec * count) / (coldFactor -
1);每秒count个令牌*热身秒数,即热身期间所有令牌数/冷却因子-1,计算得出警戒值。
* maxToken:最大令牌=warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 +
coldFactor))
* slope:斜坡=(coldFactor - 1.0) / count / (maxToken - warningToken)
* 最大令牌数平均每秒约为下图中的值再乘以warmUpPeriodInSec
* 例如:count=10,coldFactor=3,warmUpPeriodInSec=3。则warningToken=310/(3-1) =
15,maxToken=15+23*10/(1+3)=30,slope=(3-1)/10/(30-15)≈0.013
* 判断是否可以通过canPass public boolean canPass(Node node, int acquireCount, boolean
prioritized) { long passQps = (long) node.passQps(); long previousQps = (long)
node.previousPassQps(); syncToken(previousQps); // 开始计算它的斜率 //
如果进入了警戒线,开始调整他的qps long restToken = storedTokens.get(); if (restToken >=
warningToken) { long aboveToken = restToken - warningToken; //
消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope+1/count double
warningQps= Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps +
acquireCount<= warningQps) { return true; } } else { if (passQps + acquireCount
<= count) { return true; } } return false; }
* 同步令牌syncToken protected void syncToken(long passQps) { long currentTime =
TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime)
{ return; } long oldValue = storedTokens.get(); long newValue = coolDownTokens(
currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) {
storedTokens.set(0L); } lastFilledTime.set(currentTime); } }
* 当前时间抹去百位、十位、个位等位数取整
* 获取上次填充token时间,如果处理过的当前时间小于等于上次填充时间直接返回(因为抹掉了秒之后的所有位的数取整,即每秒填充一次)
* 获取上次填充值storedTokens.get()
* 冷却令牌 private long coolDownTokens(long currentTime, long passQps) { long
oldValue= storedTokens.get(); long newValue = oldValue; // 添加令牌的判断前提条件: //
当令牌的消耗程度远远低于警戒线的时候 if (oldValue < warningToken) { newValue = (long)(oldValue + (
currentTime- lastFilledTime.get()) * count / 1000); } else if (oldValue >
warningToken) { if (passQps < (int)count / coldFactor) { newValue = (long)(
oldValue+ (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.
min(newValue, maxToken); }
* 如果老令牌小于警戒令牌,新令牌=(当前时间-上次填充时间/1000(即转为秒))*count(阈值count)+老令牌
* 如果老令牌大于等于警戒令牌,上个时间窗的qps如果小于(count/冷却因子)。新令牌=同样按照小于警戒令牌的算法计算。如果大于等于则不再增加新令牌
* 返回新令牌与最大令牌两者之间的最小值
* 设置新令牌如果成功则将上个时间窗通过的qps减去,如果减去之后小于0则设置为0
* 设置填充时间
* 同步令牌完成
* 从令牌桶storeTokens中获取令牌
* 如果当前令牌大于等于警戒令牌
* 当前令牌减去警戒令牌(超出警戒令牌部分)aboveToken
* 根据斜率计算警戒qps
* 当前qps+申请制acquireCount<=警戒qps;返回通过true,否则返回拒绝false
* 如果当前令牌小于警戒令牌
* 当前qps+申请值acquireCount<=count(阈值count)返回通过true,否则返回拒绝false

技术
下载桌面版
GitHub
Gitee
SourceForge
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信