原创 吴就业 84 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://www.wujiuye.com/article/71696ddb3c174831899b0b59ea49ede4
作者:吴就业
链接:https://www.wujiuye.com/article/71696ddb3c174831899b0b59ea49ede4
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
StatisticSlot才是实现资源各项指标数据统计的ProcessorSlot,它与NodeSelectorSlot、ClusterBuilderSlot组成了资源指标数据统计流水线,分工明确。首先NodeSelectorSlot为资源创建DefaultNode,将DefaultNode向下传递,ClusterBuilderSlot负责给资源的DefaultNode加工,添加ClusterNode这个零部件,再将DefaultNode向下传递给StatisticSlot,如下图所示。
StatisticSlot在统计指标数据之前会先调用后续的ProcessorSlot,根据后续ProcessorSlot判断是否需要拒绝该请求的结果决定记录哪些指标数据,这也是为什么Sentinel设计的责任链需要由前一个ProcessorSlot在entry或者exit方法中调用fireEntry或者fireExit完成调用下一个ProcessorSlot的entry或exit方法,而不是使用for循环遍历调用ProcessorSlot的原因。每个ProcessorSlot都有权决定是先等后续的ProcessorSlot执行完成再做自己的事情,还是先完成自己的事情再让后续ProcessorSlot执行,与流水线有所区别。
StatisticSlot源码框架如下。
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// .....
} catch (PriorityWaitException ex) {
// .....
} catch (BlockException e) {
// ....
throw e;
} catch (Throwable e) {
// .....
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
// ....
fireExit(context, resourceWrapper, count);
}
}
第一种情况:当后续的ProcessorSlot未抛出任何异常时,表示不需要拒绝该请求,放行当前请求。
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
可调用StatisticSlotCallbackRegistry#addEntryCallback静态方法注册ProcessorSlotEntryCallback,ProcessorSlotEntryCallback接口的定义如下。
public interface ProcessorSlotEntryCallback<T> {
void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;
void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}
第二种情况:捕获到类型为PriorityWaitException的异常。
这是特殊情况,在需要对请求限流时,只有使用默认流量效果控制器才可能会抛出PriorityWaitException异常,这部分内容将在分析FlowSlot的实现源码时再作分析。
当捕获到PriorityWaitException异常时,说明当前请求已经被休眠了一会了,但请求还是允许通过的,只是不需要为DefaultNode记录这个请求的指标数据了,只自增当前资源并行占用的线程数,同时,DefaultNode也会为ClusterNode自增并行占用的线程数。最后也会回调所有ProcessorSlotEntryCallback#onPass方法。这部分源码如下。
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
第三种情况:捕获到BlockException异常,BlockException异常只在需要拒绝请求时抛出。
当捕获到BlockException异常时,将异常记录到调用链路上下文的当前Entry(StatisticSlot的exit方法会用到),然后调用DefaultNode#increaseBlockQps方法记录当前请求被拒绝,将当前时间窗口的block qps这项指标数据的值加1。如果调用来源不为空,让调用来源的StatisticsNode也记录当前请求被拒绝;如果流量类型为IN,则让用于统计所有资源指标数据的ClusterNode也记录当前请求被拒绝。这部分的源码如下。
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
StatisticSlot捕获BlockException异常只是为了收集被拒绝的请求,BlockException异常还是会往上抛出。抛出异常的目的是为了拦住请求,让入口处能够执行到catch代码块完成请求被拒绝后的服务降级处理。
第四种情况:捕获到其它异常。
其它异常并非指业务异常,因为此时业务代码还未执行,而业务代码抛出的异常是通过调用Tracer#trace方法记录的。
当捕获到非BlockException异常时,除PriorityWaitException异常外,其它类型的异常都同样处理。让DefaultNode记录当前请求异常,将当前时间窗口的exception qps这项指标数据的值加1。调用来源的StatisticsNode、用于统计所有资源指标数据的ClusterNode也记录下这个异常。这部分源码如下。
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
exit方法被调用时,要么请求被拒绝,要么请求被放行并且已经执行完成,所以exit方法需要知道当前请求是否正常执行完成,这正是StatisticSlot在捕获异常时将异常记录到当前Entry的原因,exit方法中通过Context可获取到当前CtEntry,从当前CtEntry可获取entry方法中写入的异常。
exit方法源码如下(有删减)。
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
// 计算耗时
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
// 记录执行耗时与成功总数
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
// 自减当前资源占用的线程数
node.decreaseThreadNum();
// origin不为空
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
// 流量类型为in时
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
exit方法中通过Context可获取当前资源的DefaultNode,如果entry方法中未出现异常,那么说明请求是正常完成的,在请求正常完成情况下需要记录请求的执行耗时以及响应是否成功,可将当前时间减去调用链路上当前Entry的创建时间作为请求的执行耗时。
ClusterNode才是一个资源全局的指标数据统计节点,但我们并未在StatisticSlot#entry方法与exit方法中看到其被使用。因为ClusterNode被ClusterBuilderSlot交给了DefaultNode掌管,在DefaultNode的相关指标数据收集方法被调用时,ClusterNode的对应方法也会被调用,如下代码所示。
public class DefaultNode extends StatisticNode {
......
private ClusterNode clusterNode;
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
}
记录某项指标数据指的是:针对当前请求,记录当前请求的某项指标数据,例如请求被放行、请求被拒绝、请求的执行耗时等。
假设当前请求被成功处理,StatisticSlot会调用DefaultNode#addRtAndSuccess方法记录请求处理成功、并且记录处理请求的耗时,DefaultNode先调用父类的addRtAndSuccess方法,然后DefaultNode会调用ClusterNode#addRtAndSuccess方法。ClusterNode与DefaultNode都是StatisticNode的子类,StatisticNode#addRtAndSuccess方法源码如下。
@Override
public void addRtAndSuccess(long rt, int successCount) {
// 秒级滑动窗口
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
// 分钟级的滑动窗口
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
rollingCounterInSecond是一个秒级的滑动窗口,rollingCounterInMinute是一个分钟级的滑动窗口,类型为ArrayMetric。分钟级的滑动窗口一共有60个MetricBucket,每个MetricBucket都被WindowWrap包装,每个MetricBucket统计一秒钟内的各项指标数据,如下图所示。
当调用rollingCounterInMinute#addSuccess方法时,由ArrayMetric根据当前时间戳获取当前时间窗口的MetricBucket,再调用MetricBucket#addSuccess方法将success这项指标的值加上方法参数传递进来的值(一般是1)。MetricBucket使用LongAdder记录各项指标数据的值。
Sentinel在MetricEvent枚举类中定义了Sentinel会收集哪些指标数据,MetricEvent枚举类的源码如下。
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
其它的指标数据都可通过以上这些指标数据计算得出,例如,平均耗时可根据总耗时除以成功总数计算得出。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。