原创 吴就业 110 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://www.wujiuye.com/article/c189b65d3d9743eaafdb0c263d5e78d5
作者:吴就业
链接:https://www.wujiuye.com/article/c189b65d3d9743eaafdb0c263d5e78d5
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
限流需要我们根据不同的硬件条件做好压测,压测出一个接口或者一个服务在某种硬件配置下最大能承受的QPS,根据这个结果配置限流规则,并且在后期需求的不断叠加,也需要对接口重新做压测,或者根据线上的实际表现不断调整限流的阈值。因此,限流可能很少使用,或者限流的阈值都会配置的比压测结果略大,这时就需要结合熔断降级做兜底。
Sentinel支持对同一个资源配置多个相同类型或不同类型的规则,在配置了限流规则的基础上,我们还可以为同一资源配置熔断降级规则。当接口的QPS未达限流阈值却已经有很多请求超时的情况下,就可能达到熔断降级规则的阈值从而触发熔断,这就能很好的保护服务自身。
DegradeRule规则类声明的字段如下。
public class DegradeRule extends AbstractRule {
// 可配置字段
private double count;
private int timeWindow;
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
// 非配置字段
private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);
}
DegradeSlot是实现熔断降级的切入点,它作为ProcessorSlot插入到ProcessorSlotChain链表中,在entry方法中调用Checker去判断是否熔断当前请求,如果熔断则抛出Block异常。
Checker并不是一个接口,而是一种检测行为,限流的ckeck由FlowRuleChecker实现,而熔断的check行为则由DegradeRuleManager负责,真正check逻辑判断由DegradeRule实现,流程如下图所示。
当DegradeSlot#entry方法被调用时,由DegradeSlot调用DegradeRuleManager#checkDegrade方法检查当前请求是否满足某个熔断降级规则。熔断规则配置由DegradeRuleManager加载,所以DegradeSlot将check逻辑交给DegradeRuleManager去完成,checkDegrade方法的源码如下。
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
// 因为我们可以对同一个资源配置多个熔断降级规则,所以返回的将是一个集合。
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
DegradeRuleManager首先根据资源名称获取配置的熔断降级规则,然后遍历熔断降级规则,调用DegradeRule#passCheck方法将检查是否需要触发熔断的逻辑交给DegradeRule完成。如果对一个资源配置多个熔断降级规则,那么只要有一个熔断降级规则满足条件,就会触发熔断。
DegradeRule#passCheck方法源码如下。
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}
// (1)
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
// (2)
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
// (3)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
if (total < minRequestAmount) {
return true;
}
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
// (4)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
// (5)
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
// 熔断
return false;
}
官方文档在介绍DEGRADE_GRADE_EXCEPTION_COUNT策略的地方加了使用注意说明,内容为:
注意由于统计时间窗口是分钟级别的,若 timeWindow 小于 60s,则结束熔断状态后仍可能再进入熔断状态。
这句话并不难理解,因为调用ClusterNode#totalException方法获取的是一分钟内的总异常数。StatisticNode的totalException源码如下。
// 数组大小为60,窗口时间长度为1000毫秒
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
@Override
public long totalException() {
// 获取1分钟的总异常数
return rollingCounterInMinute.exception();
}
也因如此,DEGRADE_GRADE_EXCEPTION_COUNT这个熔断降级策略似乎使用场景不多,笔者也未曾使用过。
timeWindow、passCount、cut是作者出于性能考虑而添加的,在配置熔断规则时,建议不要将timeWindow配置为0或者小于0, 可将timeWindow配置为1000毫秒,一个窗口时间长度,能减少一点计算就能降低一点Sentinel对性能的影响。
系统自适应限流就是在系统负载过高的情况下,自动切断后续请求,以保证服务的稳定运行。系统自适应限流也属于熔断降级的一种实现,而非限流降级,它与熔断降级都有一个共性,在保证服务稳定运行的情况下尽最大可能处理更多请求,一旦系统负载达到阈值就熔断请求。
SystemSlot是实现系统自适应限流的切入点。DegradeSlot在ProcessorSlotChain链表中被放在FlowSlot的后面,作为限流的兜底解决方案,而SystemSlot在ProcessorSlotChain链表中被放在FlowSlot的前面,强制优先考虑系统目前的情况能否处理当前请求,让系统尽可能跑在最大吞吐量的同时保证系统的稳定性。
系统自适应限流规则针对所有流量类型为IN的资源生效,因此不需要配置规则的资源名称。SystemRule定义的字段如下。
public class SystemRule extends AbstractRule {
private double highestSystemLoad = -1;
private double highestCpuUsage = -1;
private double qps = -1;
private long avgRt = -1;
private long maxThread = -1;
}
如果配置了多个SystemRule,则每个配置项只取最小值。例如三个SystemRule都配置了qps,则取这三个规则中最小的qps作为限流阈值,这在调用SystemRuleManager#loadRules方法加载规则时完成。
public static void loadSystemConf(SystemRule rule) {
// 是否开启系统自适应限流判断功能
boolean checkStatus = false;
// highestSystemLoad
if (rule.getHighestSystemLoad() >= 0) {
// 多个规则都配置则取最小值
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
// 开启系统自适应限流检查功能
checkStatus = true;
}
// highestCpuUsage
if (rule.getHighestCpuUsage() >= 0) {
if (rule.getHighestCpuUsage() > 1) {}
// [0,1)
else {
// 多个规则都配置则取最小值
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
}
// avgRt
if (rule.getAvgRt() >= 0) {
// 多个规则都配置则取最小值
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
// maxThread
if (rule.getMaxThread() >= 0) {
// 多个规则都配置则取最小值
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
// qps
if (rule.getQps() >= 0) {
// 多个规则都配置则取最小值
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}
当SystemSlot#entry方法被调用时,由SystemSlot调用SystemRuleManager#checkSystem方法判断是否需要限流,流程如下图所示。
SystemRuleManager#checkSystem方法从全局的资源指标数据统计节点Constans.ENTRY_NODE读取当前时间窗口的指标数据,判断总的QPS、平均耗时这些指标数据是否达到阈值,或者总占用的线程数是否达到阈值,如果达到阈值则抛出Block异常(SystemBlockException)。除此之外,checkSystem方法还实现了根据系统当前Load和CPU使用率限流。
SystemRuleManager#checkSystem方法源码如下。
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// 如果有配置SystemRule,则checkSystemStatus为true
if (!checkSystemStatus.get()) {
return;
}
// 只限流类型为IN的流量
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// qps限流
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// 占用线程数限流
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 平均耗时限流
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// 系统平均负载限流
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu使用率限流
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
使用TOP命令可查看系统的平均负载(Load)和CPU使用率,如下图所示:
Sentinel通过定时任务每秒钟使用OperatingSystemMXBean API获取这两个指标数据的值,代码如下:
@Override
public void run() {
try {
OperatingSystemMXBean osBean = ManagementFactory
.getPlatformMXBean(OperatingSystemMXBean.class);
// getSystemLoadAverage
currentLoad = osBean.getSystemLoadAverage();
// getSystemCpuLoad
currentCpuUsage = osBean.getSystemCpuLoad();
if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
writeSystemStatusLog();
}
} catch (Throwable e) {
RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
}
}
private static boolean checkBbr(int currentThread) {
if (currentThread > 1 &&
currentThread >
Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
return false;
}
return true;
}
假设某接口的最大QPS为800,处理一次请求的最小耗时为5ms,那么至少需要并行的线程数与Min RT和Max QPS的关系为:
Max QPS = Threads * (1000 / Min Rt)
推出:Threads = Max QPS / (1000 / Min Rt) = Max QPS * Min Rt / 1000
替换Min Rt为5ms、Max QPS为800,计算结果:
Threads = 800 * 5 / 1000
= 4
所以,checkBbr方法中,(minRt / 1000)是将最小耗时的单位由毫秒转为秒,表示系统处理最多请求时的最小耗时,maxSuccessQps * (minRt / 1000)表示至少需要每秒多少个线程并行才能达到maxSuccessQps。在系统负载比较高的情况下,只要并行占用的线程数超过该值就限流。但如果Load高不是由当前进程引起的,checkBbr的效果就不明显。
参考文献:
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。