原创 吴就业 85 0 2020-09-22
本文为博主原创文章,未经博主允许不得转载。
本文链接:https://www.wujiuye.com/article/b3a99bfd781649a9979a33fa6465d646
作者:吴就业
链接:https://www.wujiuye.com/article/b3a99bfd781649a9979a33fa6465d646
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。
黑白名单过滤是使用最为广泛的一种过滤规则,例如,用于实现接口安全的IP黑白名单规则过滤,用于防骚扰的短信、来电拦截黑白名单过滤。所以Sentinel中的黑白名单限流并不难理解,如果配置了黑名单,且请求来源存在黑名单中,则拦截(拒绝)请求,如果配置了白名单,且请求来源存在白名单中则放行。Sentinel不支持一个黑白名单规则同时配置黑名单和白名单,因此不存优先级的问题。
黑白名单过滤功能更像是一种授权机制,它简单的将权限分为有权限和无权限两种情况,如果支持冲突,可使用优先级策略解决冲突问题。Sentinel把黑白名作为授权策略,实现黑白名单限流即实现授权限流。Sentinel在命名上也是使用Authority,而非BlackWhiteList。
一些关键类说明:
授权规则(AuthorityRule)是Sentinel中最易于理解的一种规则,AuthorityRule的配置项如下:
public class AuthorityRule extends AbstractRule {
private int strategy = RuleConstant.AUTHORITY_WHITE;
}
当strategy配置为AUTHORITY_WHITE时,limitApp即为白名单;当strategy配置为AUTHORITY_BLACK时,limitApp即为黑明单。例如:
AuthorityRule rule = new AuthorityRule();
// 资源名称
rule.setResource("GET:/hello");
// 白名单策略
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
// 白名单
rule.setLimitApp("serviceA,serviceC");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));
上述规则用于限制资源”GET:/hello”只允许服务A和服务C访问。
在使用默认的SlotChainBuilder情况下,AuthoritySlot被放在SystemSlot、FlowSlot、DegradeSlot的前面,其优先级更高。原因之一是授权限流不需要使用统计的指标数据,另一个原因则是提升性能,在未授权的情况下没必要判断是否需要熔断、系统负载能否接住这个请求、QPS是否过高等,这与用户授权功能是一样的道理,未登陆无需判断是否有权限访问某个资源。
AuthoritySlot的实现源码如下。
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// (1)
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
// (2)
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// (3)
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
AuthorityRuleChecker负责实现黑白名单的过滤逻辑,其passCheck方法源码如下。
static boolean passCheck(AuthorityRule rule, Context context) {
// 获取来源
String requester = context.getOrigin();
// 来源为空,或者来源等于规则配置的limitApp则拦截请求
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// 字符串查找,这一步起到快速过滤的作用,提升性能
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
// 存在才精确匹配
if (contain) {
boolean exactlyMatch = false;
// 分隔数组
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
// 标志设置为true
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 策略
int strategy = rule.getStrategy();
// 如果是黑名单,且来源存在规则配置的黑名单中
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 如果是白名单,且来源不存在规则配置的白名单中
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
整个方法都比较简单,首先是从当前Context获取调用来源的名称,只有在调用来源不为空且规则配置了黑名单或者白名单的情况下,才会走黑白名单的过滤逻辑,这也说明,要实现黑白名单限流的前提是每个服务消费端在发起请求时都必须要携带自身服务的名称,这取决于Sentinel主流框架适配器;其次,Sentinel通过使用indexOf先简单匹配一次黑名单或白名单,再切割黑名单或白名单数组实现精确匹配,这有助于提升性能;如果当前请求来源存在名单中,则根据策略判断这份名称是黑名单还是白名单,再决定是否需要拒绝请求。
热点参数限流并非在Sentinel的core模块中实现的,但也是非常实用的一种限流方式。并且,Sentinel支持API Gateway网关限流也是基于参数限流实现的,了解热点参数限流的实现原理也有助于我们更好的理解网关限流。
参数限流,即根据方法调用传递的参数实现限流,又或者说是根据接口的请求参数限流;热点参数限流,即针对访问频繁的参数限流,例如,都是调用一个下单接口,但购买的商品不同,比如主播带货的商品下单流量较大,而一般商品购买量很少,同时因为商品数量有限,不太可能每个下单请求都能购买成功,如果能实现根据客户端请求传递的商品ID实现限流,将流量控制在商品的库存总量左右,并且使用QPS限流等兜底,这种有针对性的限流将接口通过的有效流量最大化。
热点参数限流功能在Sentinel源码的扩展功能模块:sentinel-extension,子模块为sentinel-parameter-flow-control。
热点参数限流使用的指标数据不再是core模块中统计的指标数据,而是重新实现了一套指标数据统计功能,依旧是基于滑动窗口。
与core模块的MetricBucket实现不同,MetricBucket只统计每个指标的数值,而ParamMapBucket需要统计每个指标、参数的每种取值的数值,MetricBucket更像是Redis中的String结构,而ParamMapBucket更像Redis中的Hash结构。
ParamMapBucket的源码如下。
public class ParamMapBucket {
// 数组类型为CacheMap<Object, AtomicInteger>
private final CacheMap<Object, AtomicInteger>[] data;
public ParamMapBucket() {
this(DEFAULT_MAX_CAPACITY);
}
public ParamMapBucket(int capacity) {
RollingParamEvent[] events = RollingParamEvent.values();
// 根据需要统计的指标数据创建数组
this.data = new CacheMap[events.length];
// RollingParamEvent可取值为REQUEST_PASSED、REQUEST_BLOCKED
for (RollingParamEvent event : events) {
data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(capacity);
}
}
}
HotParameterLeapArray继承LeapArray,即实现滑动窗口。ParamMapBucket不存储窗口时间信息,窗口时间信息依然由WindowWrap存储,HotParameterLeapArray使用WindowWrap包装ParamMapBucket。
笔者也是看了HotParameterLeapArray之后才明白为什么Sentienl将滑动窗口抽象为LeapArray,这为扩展实现收集自定义指标数据的滑动窗口提供了支持。
HotParameterLeapArray的提供的几个API如下:
public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {
//.....
public void addValue(RollingParamEvent event, int count, Object value) {
// ....
}
public Map<Object, Double> getTopValues(RollingParamEvent event, int number) {
// .....
}
public long getRollingSum(RollingParamEvent event, Object value) {
// .....
}
public double getRollingAvg(RollingParamEvent event, Object value) {
// ....
}
}
可见,如果是分钟级的滑动窗口,一分内参数的取值越多,其占用的内存就越多。
两个需要重点关注的类:
ParameterMetric有三个静态字段,源码如下。
public class ParameterMetric {
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
}
ParameterMetricStorage使用ConcurrentHashMap缓存每个资源对应的ParameterMetric,只会为配置了参数限流规则的资源创建ParameterMetric。其部份源码如下所示。
public final class ParameterMetricStorage {
private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
private static final Object LOCK = new Object();
public static void initParamMetricsFor(ResourceWrapper resourceWrapper,ParamFlowRule rule) {
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return;
}
String resourceName = resourceWrapper.getName();
ParameterMetric metric;
// 双重检测,线程安全,为资源创建全局唯一的ParameterMetric
if ((metric = metricsMap.get(resourceName)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceName)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper.getName(), metric);
}
}
}
// 初始化ParameterMetric
metric.initialize(rule);
}
}
initParamMetricsFor方法用于为资源创建ParameterMetric并初始化,该方法在资源被访问时由ParamFlowSlot调用,并且该方法只在为资源配置了参数限流规则的情况下被调用。
sentinel-parameter-flow-control模块通过Java SPI注册自定义的SlotChainBuilder,即注册HotParamSlotChainBuilder,将ParamFlowSlot放置在StatisticSlot的后面,这个ParamFlowSlot就是实现热点参数限流功能的切入点。
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
既然是参数限流,那么肯定是需要能够拿到参数了,而ProcessorSlot#entry方法的最后一个参数就是请求传递过来的参数,通过SphU#entry方法一层层往下传递。例如:
@GetMapping("/hello")
public String apiHello(String name) throws BlockException {
ContextUtil.enter("my_context");
Entry entry = null;
try {
entry = SphU.entry("GET:/hello", EntryType.IN,1,name);
doBusiness();
return "Hello!";
} catch (Exception e) {
if (!(e instanceof BlockException)) {
Tracer.trace(e);
}
throw e;
} finally {
if (entry != null) {
entry.exit(1);
}
ContextUtil.exit();
}
}
当SphU#entry调用到ParamFlowSlot#entry方法时,ParamFlowSlot调用checkFlow方法判断是否需要限流。checkFlow方法的实现如下。
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
//(1)
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
//(2)
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
在阅读ParamFlowChecker#passCheck方法的源码之前,我们需要先了解参数限流规则的配置,了解每个配置项的作用。
参数限流规则ParamFlowRule的源码如下(有删减)。
public class ParamFlowRule extends AbstractRule {
private int grade = RuleConstant.FLOW_GRADE_QPS;
private double count;
private Integer paramIdx;
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
private int maxQueueingTimeMs = 0;
private long durationInSec = 1;
private int burstCount = 0;
}
假设需要针对资源“GET:/hello”的name参数限流,当name取值为“jackson”时限流QPS阈值为5,则配置如下。
ParamFlowRule rule = new ParamFlowRule();
// 资源为/hello
rule.setResource("GET:/hello");
// 索引0对应的参数为name
rule.setParamIdx(0);
// qps限流
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 阈值为5
rule.setCount(5);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
以此为例,我们分析ParamFlowChecker#passCheck方法源码,passCheck返回true表示放行,返回false表示拒绝。
ParamFlowChecker#passCheck方法源码如下。
public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object... args) {
if (args == null) {
return true;
}
// 判断参数索引是否合法
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// 获取参数值,如果值为空则允许通过
Object value = args[paramIdx];
if (value == null) {
return true;
}
// 集群限流
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
// 单机限流
return passLocalCheck(resourceWrapper, rule, count, value);
}
我们先不讨论集群限流情况,仅看单机本地限流情况。passLocalCheck方法的源码如下。
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
// 基本数据类型
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
// 数组类
else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
// 引用类型
else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
}
return true;
}
由于参数可能是基本数据类型,也可能是数组类型,或者引用类型,所以passLocalCheck方法分三种情况处理。我们只讨论其中一种情况,其它情况的实现类似。
以资源“GET:/hello”为例,其方法apiHello的name参数为String类型,因此会调用passSingleValueCheck方法,该方法源码如下。
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
//(1)
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
}
// (2)
else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
你可能好奇,并行占用线程总数是在哪里自增和自减的呢?这是由ParamFlowStatisticEntryCallback与ParamFlowStatisticExitCallback这两个Callback实现的,分别在StatisticSlot的entry方法和exit方法中被回调执行,这是我们前面分析StatisticSlot源码时故意遗漏的细节。
快速失败基于令牌桶算法实现。passDefaultLocalCheck方法控制每个时间窗口只生产一次令牌,将令牌放入令牌桶,每个请求都从令牌桶中取走令牌,当令牌足够时放行,当令牌不足时直接拒绝。ParameterMetric#tokenCounters用作令牌桶,timeCounters存储最近一次生产令牌的时间。
passDefaultLocalCheck方法源码如下。
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//(1)
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// (2)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
// (3)
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
// (4)
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
//(5)
long passTime = currentTime - lastAddTokenTime.get();
if (passTime > rule.getDurationInSec() * 1000) {
// 确保非NULL
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
lastAddTokenTime.set(currentTime);
return true;
} else {
//(6)
long restQps = oldQps.get();
// 计算需要新增的令牌数,根据时间间隔、限流阈值、窗口时间计算
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
// 计算新的令牌总数,并立即使用(扣减acquireCount个令牌)
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
// (7)
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
// 令牌是否足够
if (oldQpsValue - acquireCount >= 0) {
// 从令牌桶中取走令牌
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
与RateLimiterController实现原理一样,passThrottleLocalCheck方法让请求在虚拟队列中排队,控制请求通过的时间间隔,该时间间隔通过阈值与窗口时间大小计算出来,如果当前请求计算出来的排队等待时间大于限流规则指定的maxQueueingTimeMs,则拒绝当前请求。
passThrottleLocalCheck方法源码如下。
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
//(1)
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// (2)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//(3)
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
//(4)
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
long lastPassTime = timeRecorder.get();
// 计算当前请求的期望通过时间,最近一次请求的期望通过时间 + 请求通过的时间间隔
long expectedTime = lastPassTime + costTime;
//(5)
if (expectedTime <= currentTime
|| expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
黑白名单限流的实现相对简单,热点参数限流的实现相对复杂。热点参数限流自己实现了一个滑动窗口用于收集指标数据,但该滑动窗口并未被使用,而是使用ParameterMetric与ParameterMetricStorage,这应该是出于性能的考虑。热点参数限流对性能的影响和对内存的占用与参数的取值有多少种可能成正比,限流参数的取值可能性越多,占用的内存就越大,对性能的影响也就越大,在使用热点参数限流功能时,一定要考虑参数的取值。
例如,根据商品ID限流,如果有十万个商品下单,那么CacheMap将会存在十万个key-value,并且不会被移除,随着进程运行的时长而增长。如果限流阈值类型选择为THREAD则不会存在这个问题,因为在ParamFlowStatisticExitCallback方法会调用ParameterMetric#decreaseThreadCount方法扣减参数值占用的线程数,当线程数为零时,会将当前参数值对应的key-value从CacheMap中移除。
声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。
本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。
消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。
IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。
Instrumentation之所以难驾驭,在于需要了解Java类加载机制以及字节码,一不小心就能遇到各种陌生的Exception。笔者在实现Java探针时就踩过不少坑,其中一类就是类加载相关的问题,也是本篇所要跟大家分享的。
订阅
订阅新文章发布通知吧,不错过精彩内容!
输入邮箱,提交后我们会给您发送一封邮件,您需点击邮件中的链接完成订阅设置。