系列文章

Sentinel 原理-调用链
Sentinel 原理-滑动窗口
Sentinel 原理-实体类
Sentinel 实战-限流篇
Sentinel 实战-控制台篇
Sentinel 实战-规则持久化
Sentinel 实战-集群限流篇

Sentinel 系列教程,现已上传到 github 和 gitee 中:

Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。

大家可能会问:Sentinel 和之前常用的熔断降级库 Netflix Hystrix 有什么异同呢?Sentinel官网有一个对比的文章,这里摘抄一个总结的表格,具体的对比可以点此 链接 查看。

对比内容SentinelHystrix
隔离策略信号量隔离线程池隔离/信号量隔离
熔断降级策略基于响应时间或失败比率基于失败比率
实时指标实现滑动窗口滑动窗口(基于 RxJava)
规则配置支持多种数据源支持多种数据源
扩展性多个扩展点插件的形式
基于注解的支持支持支持
限流基于 QPS,支持基于调用关系的限流不支持
流量整形支持慢启动、匀速器模式不支持
系统负载保护支持不支持
控制台开箱即用,可配置规则、查看秒级监控、机器发现等不完善
常见框架的适配Servlet、Spring Cloud、Dubbo、gRPC 等Servlet、Spring Cloud Netflix

从对比的表格可以看到,Sentinel比Hystrix在功能性上还要强大一些,本文让我们一起来了解下Sentinel的源码,揭开Sentinel的神秘面纱。

项目结构

将Sentinel的源码fork到自己的github库中,接着把源码clone到本地,然后开始源码阅读之旅吧。

首先我们看一下Sentinel项目的整个结构:

sentinel-project-structure.png

  • sentinel-core 核心模块,限流、降级、系统保护等都在这里实现
  • sentinel-dashboard 控制台模块,可以对连接上的sentinel客户端实现可视化的管理
  • sentinel-transport 传输模块,提供了基本的监控服务端和客户端的API接口,以及一些基于不同库的实现
  • sentinel-extension 扩展模块,主要对DataSource进行了部分扩展实现
  • sentinel-adapter 适配器模块,主要实现了对一些常见框架的适配
  • sentinel-demo 样例模块,可参考怎么使用sentinel进行限流、降级等
  • sentinel-benchmark 基准测试模块,对核心代码的精确性提供基准测试

运行样例

基本上每个框架都会带有样例模块,有的叫example,有的叫demo,sentinel也不例外。

那我们从sentinel的demo中找一个例子运行下看看大致的情况吧,上面说过了sentinel主要的核心功能是做限流、降级和系统保护,那我们就从“限流”开始看sentinel的实现原理吧。

sentinel-basic-demo-flow-qps.png

可以看到sentinel-demo模块中有很多不同的样例,我们找到basic模块下的flow包,这个包下面就是对应的限流的样例,但是限流也有很多种类型的限流,我们就找根据qps限流的类看吧,其他的限流方式原理上都大差不差。

public class FlowQpsDemo {
 
    private static final String KEY = "abc";
 
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
 
    private static volatile boolean stop = false;
 
    private static final int threadCount = 32;
 
    private static int seconds = 30;
 
    public static void main(String[] args) throws Exception {
        initFlowQpsRule();
 
        tick();
        // first make the system run on a very low condition
        simulateTraffic();
 
        System.out.println("===== begin to do flow control");
        System.out.println("only 20 requests per second can pass");
 
    }
 
    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        // set limit qps to 20
        rule1.setCount(20);
        // 设置限流类型:根据qps
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        // 加载限流的规则
        FlowRuleManager.loadRules(rules);
    }
 
    private static void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }
 
    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }
 
    static class TimerTask implements Runnable {
 
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");
 
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;
 
                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;
 
                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;
 
                System.out.println(seconds + " send qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                    + ", pass:" + oneSecondPass
                    + ", block:" + oneSecondBlock);
 
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
 
            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                + ", block:" + block.get());
            System.exit(0);
        }
    }
 
    static class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;
 
                try {
                    entry = SphU.entry(KEY);
                    // token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // biz exception
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        entry.exit();
                    }
                }
 
                Random random2 = new Random();
                try {
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }
}

执行上面的代码后,打印出如下的结果:

可以看到,上面的结果中,pass的数量和我们的预期并不相同,我们预期的是每秒允许pass的请求数是20个,但是目前有很多pass的请求数是超过20个的。

原因是,我们这里测试的代码使用了多线程,注意看 threadCount 的值,一共有32个线程来模拟,而在RunTask的run方法中执行资源保护时,即在 SphU.entry 的内部是没有加锁的,所以就会导致在高并发下,pass的数量会高于20。

可以用下面这个模型来描述下,有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模拟请求,被访问的business code被资源key保护着,根据规则,每秒只允许20个请求通过。

由于pass、block、total等计数器是全局共享的,而多个RunTask线程在执行SphU.entry申请获取entry时,内部没有锁保护,所以会存在pass的个数超过设定的阈值。

那为了证明在单线程下限流的正确性与可靠性,那我们的模型就应该变成了这样:

那接下来我把 threadCount 的值改为1,只有一个线程来执行这个方法,看下具体的限流结果,执行上面的代码后打印的结果如下:

sentinel-basic-demo-single-thread-flow-qps-result.png

可以看到pass数基本上维持在20,但是第一次统计的pass值还是超过了20。这又是什么原因导致的呢?

其实仔细看下Demo中的代码可以发现,模拟请求是用的一个线程,统计结果是用的另外一个线程,统计线程每1秒钟统计一次结果,这两个线程之间是有时间上的误差的。从TimeTicker线程打印出来的时间戳可以看出来,虽然每隔一秒进行统计,但是当前打印时的时间和上一次的时间还是有误差的,不完全是1000ms的间隔。

要真正验证每秒限制20个请求,保证数据的精准性,需要做基准测试,这个不是本篇文章的重点,有兴趣的同学可以去了解下jmh,sentinel中的基准测试也是通过jmh做的。

深入原理

通过一个简单的示例程序,我们了解了sentinel可以对请求进行限流,除了限流外,还有降级和系统保护等功能。那现在我们就拨开云雾,深入源码内部去一窥sentinel的实现原理吧。

首先从入口开始:SphU.entry() 。这个方法会去申请一个entry,如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表面已经被限流了。

从 SphU.entry() 方法往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph ,在CtSph中最终会执行到 entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException 这个方法。

我们来看一下这个方法的具体实现:

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // Init the entry only. No rule checking will occur.
        return new CtEntry(resourceWrapper, null, context);
    }
 
    if (context == null) {
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }
 
    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }
 
    // 获取该资源对应的SlotChain
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
 
    /*
     * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no
     * rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }
 
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // 执行Slot的entry方法
        chain.entry(context, resourceWrapper, null, count, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        // 抛出BlockExecption
        throw e1;
    } catch (Throwable e1) {
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

这个方法可以分为以下几个部分:

  • 1.对参数和全局配置项做检测,如果不符合要求就直接返回了一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。
  • 2.根据包装过的资源对象获取对应的SlotChain
  • 3.执行SlotChain的entry方法
    • 3.1.如果SlotChain的entry方法抛出了BlockException,则将该异常继续向上抛出
    • 3.2.如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
  • 4.如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行

其中比较重要的是第2、3两个步骤,我们来分解一下这两个步骤。

创建SlotChain

首先看一下lookProcessChain的方法实现:

private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
 
                // 具体构造chain的方法
                chain = Env.slotsChainbuilder.build();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

该方法使用了一个HashMap做了缓存,key是资源对象。这里加了锁,并且做了 double check 。具体构造chain的方法是通过: Env.slotsChainbuilder.build() 这句代码创建的。那就进入这个方法看看吧。

public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());
 
    return chain;
}

Chain是链条的意思,从build的方法可看出,ProcessorSlotChain是一个链表,里面添加了很多个Slot。具体的实现需要到DefaultProcessorSlotChain中去看。

public class DefaultProcessorSlotChain extends ProcessorSlotChain {
 
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, args);
        }
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
    
    AbstractLinkedProcessorSlot<?> end = first;
 
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }
 
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
}

DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾节点。

创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾节点,可以用下图表示:

slot-chain-1.png

将第一个节点添加到链表中后,整个链表的结构变成了如下图这样:

slot-chain-2.png

将所有的节点都加入到链表中后,整个链表的结构变成了如下图所示:

slot-chain-3.png

这样就将所有的Slot对象添加到了链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。其实责任链模式在很多框架中都有,比如Netty中是通过pipeline来实现的。

知道了SlotChain是如何创建的了,那接下来就要看下是如何执行Slot的entry方法的了。

执行SlotChain的entry方法

lookProcessChain方法获得的ProcessorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是这样的:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
    throws Throwable {
    first.transformEntry(context, resourceWrapper, t, count, args);
}

也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。

而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点重写了entry方法,具体如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, Object... args)
    throws Throwable {
    super.fireEntry(context, resourceWrapper, t, count, args);
}

first节点的entry方法,实际又是执行的super的fireEntry方法,那继续把目光转移到fireEntry方法,具体如下:

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
    throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, args);
    }
}

从这里可以看到,从fireEntry方法中就开始传递执行entry了,这里会执行当前节点的下一个节点transformEntry方法,上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:

slot-chain-entry-process.png

从图中可以看出,从最初的调用Chain的entry()方法,转变成了调用SlotChain中Slot的entry()方法。从上面的分析可以知道,SlotChain中的第一个Slot节点是NodeSelectorSlot。

执行Slot的entry方法

现在可以把目光转移到SlotChain中的第一个节点NodeSelectorSlot的entry方法中去了,具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args)
    throws Throwable {
    
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
            }
            // Build invocation tree
            ((DefaultNode)context.getLastNode()).addChild(node);
        }
    }
 
    context.setCurNode(node);
    // 由此触发下一个节点的entry方法
    fireEntry(context, resourceWrapper, node, count, args);
}

从代码中可以看到,NodeSelectorSlot节点做了一些自己的业务逻辑处理,具体的大家可以深入源码继续追踪,这里大概的介绍下每种Slot的功能职责:

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatistcSlot 则用于记录,统计不同纬度的 runtime 信息;
  • FlowSlot 则用于根据预设的限流规则,以及前面 slot 统计的状态,来进行限流;
  • AuthorizationSlot 则根据黑白名单,来做黑白名单控制;
  • DegradeSlot 则通过统计信息,以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

执行完业务逻辑处理后,调用了fireEntry()方法,由此触发了下一个节点的entry方法。此时我们就知道了sentinel的责任链就是这样传递的:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。

所以可以将上面的图完整了,具体如下:

slot-chain-entry-whole-process.png

至此就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体是抛出BlockException异常。

总结

sentinel主要是基于7种不同的Slot形成了一个链表,每个Slot都各司其职,自己做完分内的事之后,会把请求传递给下一个Slot,直到在某一个Slot中命中规则后抛出BlockException而终止。

前三个Slot负责做统计,后面的Slot负责根据统计的结果结合配置的规则进行具体的控制,是Block该请求还是放行。

控制的类型也有很多可选项:根据qps、线程数、冷启动等等。

然后基于这个核心的方法,衍生出了很多其他的功能:

  • 1、dashboard控制台,可以可视化的对每个连接过来的sentinel客户端 (通过发送heartbeat消息)进行控制,dashboard和客户端之间通过http协议进行通讯。
  • 2、规则的持久化,通过实现DataSource接口,可以通过不同的方式对配置的规则进行持久化,默认规则是在内存中的
  • 3、对主流的框架进行适配,包括servlet,dubbo,rRpc等

Dashboard控制台

sentinel-dashboard是一个单独的应用,通过spring-boot进行启动,主要提供一个轻量级的控制台,它提供机器发现、单机资源实时监控、集群资源汇总,以及规则管理的功能。

我们只需要对应用进行简单的配置,就可以使用这些功能。

1 启动控制台

1.1 下载代码并编译控制台

  • 下载 控制台 工程
  • 使用以下命令将代码打包成一个 fat jar: mvn clean package

1.2 启动

使用如下命令启动编译后的控制台:

 

$ java -Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -jar target/sentinel-dashboard.jar

上述命令中我们指定了一个JVM参数,-Dserver.port=8080 用于指定 Spring Boot 启动端口为 8080

2 客户端接入控制台

控制台启动后,客户端需要按照以下步骤接入到控制台。

2.1 引入客户端jar包

通过 pom.xml 引入 jar 包:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-transport-simple-http</artifactId>
    <version>x.y.z</version>
</dependency>

2.2 配置启动参数

启动时加入 JVM 参数 -Dcsp.sentinel.dashboard.server=consoleIp:port 指定控制台地址和端口。若启动多个应用,则需要通过 -Dcsp.sentinel.api.port=xxxx 指定客户端监控 API 的端口(默认是 8719)。

除了修改 JVM 参数,也可以通过配置文件取得同样的效果。更详细的信息可以参考 启动配置项

2.3 触发客户端初始化

确保客户端有访问量,Sentinel 会在客户端首次调用的时候进行初始化,开始向控制台发送心跳包。

sentinel-dashboard是一个独立的web应用,可以接受客户端的连接,然后与客户端之间进行通讯,他们之间使用http协议进行通讯。他们之间的关系如下图所示:

dashboard-client-transport.png

dashboard

dashboard启动后会等待客户端的连接,具体的做法是在 MachineRegistryController 中有一个 receiveHeartBeat 的方法,客户端发送心跳消息,就是通过http请求这个方法。

dashboard接收到客户端的心跳消息后,会把客户端的传递过来的ip、port等信息封装成一个 MachineInfo 对象,然后将该对象通过 MachineDiscovery 接口的 addMachine 方法添加到一个ConcurrentHashMap中保存起来。

这里会有问题,因为客户端的信息是保存在dashboard的内存中的,所以当dashboard应用重启后,之前已经发送过来的客户端信息都会丢失掉。

client

client在启动时,会通过CommandCenterInitFunc选择一个,并且只选择一个CommandCenter进行启动。

启动之前会通过spi的方式扫描获取到所有的CommandHandler的实现类,然后将所有的CommandHandler注册到一个HashMap中去,待后期使用。

PS:考虑一下,为什么CommandHandler不需要做持久化,而是直接保存在内存中。

注册完CommandHandler之后,紧接着就启动CommandCenter了,目前CommandCenter有两个实现类:

  • SimpleHttpCommandCenter 通过ServerSocket启动一个服务端,接受socket连接
  • NettyHttpCommandCenter 通过Netty启动一个服务端,接受channel连接

CommandCenter启动后,就等待dashboard发送消息过来了,当接收到消息后,会把消息通过具体的CommandHandler进行处理,然后将处理的结果返回给dashboard。

这里需要注意的是,dashboard给client发送消息是通过异步的httpClient进行发送的,在HttpHelper类中。

但是诡异的是,既然通过异步发送了,又通过一个CountDownLatch来等待消息的返回,然后获取结果,那这样不就失去了异步的意义的吗?具体的代码如下:

private String httpGetContent(String url) {
    final HttpGet httpGet = new HttpGet(url);
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<String> reference = new AtomicReference<>();
    httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(final HttpResponse response) {
            try {
                reference.set(getBody(response));
            } catch (Exception e) {
                logger.info("httpGetContent " + url + " error:", e);
            } finally {
                latch.countDown();
            }
        }
 
        @Override
        public void failed(final Exception ex) {
            latch.countDown();
            logger.info("httpGetContent " + url + " failed:", ex);
        }
 
        @Override
        public void cancelled() {
            latch.countDown();
        }
    });
    try {
        latch.await(5, TimeUnit.SECONDS);
    } catch (Exception e) {
        logger.info("wait http client error:", e);
    }
    return reference.get();
}
 

主流框架的适配

sentinel也对一些主流的框架进行了适配,使得在使用主流框架时,也可以享受到sentinel的保护。目前已经支持的适配器包括以下这些:

  • Web Servlet
  • Dubbo
  • Spring Boot / Spring Cloud
  • gRPC
  • Apache RocketMQ

其实做适配就是通过那些主流框架的扩展点,然后在扩展点上加入sentinel限流降级的代码即可。拿Servlet的适配代码看一下,具体的代码是:

public class CommonFilter implements Filter {
 
    @Override
    public void init(FilterConfig filterConfig) {
 
    }
 
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest)request;
        Entry entry = null;
 
        try {
            // 根据请求生成的资源
            String target = FilterUtil.filterTarget(sRequest);
            target = WebCallbackManager.getUrlCleaner().clean(target);
 
            // “申请”该资源
            ContextUtil.enter(target);
            entry = SphU.entry(target, EntryType.IN);
 
            // 如果能成功“申请”到资源,则说明未被限流
            // 则将请求放行
            chain.doFilter(request, response);
        } catch (BlockException e) {
            // 否则如果捕获了BlockException异常,说明请求被限流了
            // 则将请求重定向到一个默认的页面
            HttpServletResponse sResponse = (HttpServletResponse)response;
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse);
        } catch (IOException e2) {
            // 省略部分代码
        } finally {
            if (entry != null) {
                entry.exit();
            }
            ContextUtil.exit();
        }
    }
 
    @Override
    public void destroy() {
 
    }
}

 

通过Servlet的Filter进行扩展,实现一个Filter,然后在doFilter方法中对请求进行限流控制,如果请求被限流则将请求重定向到一个默认页面,否则将请求放行给下一个Filter。

规则持久化,动态化

Sentinel 的理念是开发者只需要关注资源的定义,当资源定义成功,可以动态增加各种流控降级规则。

Sentinel 提供两种方式修改规则:

  • 通过 API 直接修改 (loadRules)
  • 通过DataSource适配不同数据源修改

通过 API 修改比较直观,可以通过以下三个 API 修改不同的规则:

FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控规则
DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降级规则
SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系统规则

DataSource 扩展

上述 loadRules() 方法只接受内存态的规则对象,但应用重启后内存中的规则就会丢失,更多的时候规则最好能够存储在文件、数据库或者配置中心中。

DataSource 接口给我们提供了对接任意配置源的能力。相比直接通过 API 修改规则,实现 DataSource 接口是更加可靠的做法。

官方推荐通过控制台设置规则后将规则推送到统一的规则中心,用户只需要实现 DataSource 接口,来监听规则中心的规则变化,以实时获取变更的规则

DataSource 拓展常见的实现方式有:

  • 拉模式:客户端主动向某个规则管理中心定期轮询拉取规则,这个规则中心可以是 SQL、文件,甚至是 VCS 等。这样做的方式是简单,缺点是无法及时获取变更;
  • 推模式:规则中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用 Nacos、Zookeeper 等配置中心。这种方式有更好的实时性和一致性保证。

 

 

我们已经知道了sentinel实现限流降级的原理,其核心就是一堆Slot组成的调用链。

这里大概的介绍下每种Slot的功能职责:

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;

  • StatisticsSlot 则用于记录,统计不同维度的 runtime 信息;

  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

  • AuthoritySlot 则根据黑白名单,来做黑白名单控制;

  • FlowSlot 则用于根据预设的限流规则,以及前面 slot 统计的状态,来进行限流;

  • DegradeSlot 则通过统计信息,以及预设的规则,来做熔断降级;

每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。

下面我们就来详细研究下这些Slot的原理。

NodeSelectorSlot

NodeSelectorSlot 是用来构造调用链的,具体的是将资源的调用路径,封装成一个一个的节点,再组成一个树状的结构来形成一个完整的调用链, NodeSelectorSlot是所有Slot中最关键也是最复杂的一个Slot,这里涉及到以下几个核心的概念:

  • Resource

资源是 Sentinel 的关键概念。它可以是 Java 应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它服务,甚至可以是一段代码。

只要通过 Sentinel API 定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。

简单来说,资源就是 Sentinel 用来保护系统的一个媒介。源码中用来包装资源的类是: com.alibaba.csp.sentinel.slotchain.ResourceWrapper,他有两个子类: StringResourceWrapperMethodResourceWrapper,通过名字就知道可以将一段字符串或一个方法包装为一个资源。

打个比方,我有一个服务A,请求非常多,经常会被陡增的流量冲垮,为了防止这种情况,简单的做法,我们可以定义一个 Sentinel 的资源,通过该资源来对请求进行调整,使得允许通过的请求不会把服务A搞崩溃。

图片

每个资源的状态也是不同的,这取决于资源后端的服务,有的资源可能比较稳定,有的资源可能不太稳定。那么在整个调用链中,Sentinel 需要对不稳定资源进行控制。当调用链路中某个资源出现不稳定,例如,表现为 timeout,或者异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终导致雪崩的后果。

  • Context

上下文是一个用来保存调用链当前状态的元数据的类,每次进入一个资源时,就会创建一个上下文。并且相同的资源名只会创建一个上下文。一个Context中包含了三个核心的对象:

1)当前调用链的根节点:EntranceNode

2)当前的入口:Entry

3)当前入口所关联的节点:Node

上下文中只会保存一个当前正在处理的入口Entry,另外还会保存调用链的根节点。需要注意的是,每次进入一个新的资源时,都会创建一个新的上下文。

  • Entry

每次调用 SphU#entry() 都会生成一个Entry入口,该入口中会保存了以下数据:入口的创建时间,当前入口所关联的节点,当前入口所关联的调用源对应的节点。Entry是一个抽象类,他只有一个实现类,在CtSph中的一个静态类:CtEntry

  • Node

节点是用来保存某个资源的各种实时统计信息的,他是一个接口,通过访问节点,就可以获取到对应资源的实时状态,以此为依据进行限流和降级操作。

可能看到这里,大家还是比较懵,这么多类到底有什么用,接下来就让我们更进一步,挖掘一下这些类的作用,在这之前,我先给大家展示一下他们之间的关系,如下图所示:

图片

这里把几种Node的作用先大概介绍下:

节点作用
StatisticNode执行具体的资源统计操作
DefaultNode该节点持有指定上下文中指定资源的统计信息,当在同一个上下文中多次调用entry方法时,该节点可能下会创建有一系列的子节点。
另外每个DefaultNode中会关联一个ClusterNode
ClusterNode该节点中保存了资源的总体的运行时统计信息,包括rt,线程数,qps等等,相同的资源会全局共享同一个ClusterNode,不管他属于哪个上下文
EntranceNode该节点表示一棵调用链树的入口节点,通过他可以获取调用链树中所有的子节点

Context的创建与销毁

首先我们要清楚的一点就是,每次执行entry()方法,试图冲破一个资源时,都会生成一个上下文。这个上下文中会保存着调用链的根节点和当前的入口。

Context是通过ContextUtil创建的,具体的方法是trueEntry,代码如下:

protected static Context trueEnter(String name, String origin) {

    // 先从ThreadLocal中获取
    Context context = contextHolder.get();
    if (context == null) {
        // 如果ThreadLocal中获取不到Context
        // 则根据name从map中获取根节点,只要是相同的资源名,就能直接从map中获取到node
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            // 省略部分代码
            try {
                LOCK.lock();
                node = contextNameNodeMap.get(name);
                if (node == null) {
                    // 省略部分代码
                    // 创建一个新的入口节点
                    node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                    // 省略部分代码
                }
            } finally {
                LOCK.unlock();
            }
        }
        // 创建一个新的Context,并设置Context的根节点,即设置EntranceNode
        context = new Context(node, name);
        context.setOrigin(origin);
        // 将该Context保存到ThreadLocal中去
        contextHolder.set(context);
    }
    return context;
}

上面的代码中我省略了部分代码,只保留了核心的部分。从源码中还是可以比较清晰的看出生成Context的过程:

  • 1.先从ThreadLocal中获取,如果能获取到直接返回,如果获取不到则继续第2步

  • 2.从一个static的map中根据上下文的名称获取,如果能获取到则直接返回,否则继续第3步

  • 3.加锁后进行一次double check,如果还是没能从map中获取到,则创建一个EntranceNode,并把该EntranceNode添加到一个全局的ROOT节点中去,然后将该节点添加到map中去(这部分代码在上述代码中省略了)

  • 4.根据EntranceNode创建一个上下文,并将该上下文保存到ThreadLocal中去,下一个请求可以直接获取

那保存在ThreadLocal中的上下文什么时候会清除呢?从代码中可以看到具体的清除工作在ContextUtil的exit方法中,当执行该方法时,会将保存在ThreadLocal中的context对象清除,具体的代码非常简单,这里就不贴代码了。

那ContextUtil.exit方法什么时候会被调用呢?有两种情况:一是主动调用ContextUtil.exit的时候,二是当一个入口Entry要退出,执行该Entry的trueExit方法的时候,此时会触发ContextUtil.exit的方法。但是有一个前提,就是当前Entry的父Entry为null时,此时说明该Entry已经是最顶层的根节点了,可以清除context。

调用链树

当在一个上下文中多次调用了 SphU#entry() 方法时,就会创建一棵调用链树。具体的代码在entry方法中创建CtEntry对象时:

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
    super(resourceWrapper);
    this.chain = chain;
    this.context = context;
    // 获取 上下文 中上一次的入口
    parent = context.getCurEntry();
    if (parent != null) {
        // 然后将当前入口设置为上一次入口的子节点
        ((CtEntry)parent).child = this;
    }
    // 设置 上下文 的当前入口为该类本身
    context.setCurEntry(this);
}

这里可能看代码没有那么直观,可以用一些图形来描述一下这个过程。

构造树干

创建context

context的创建在上面已经分析过了,初始化的时候,context中的curEntry属性是没有值的,如下图所示:

图片

创建Entry

每创建一个新的Entry对象时,都会重新设置context的curEntry,并将context原来的curEntry设置为该新Entry对象的父节点,如下图所示:

图片

退出Entry

某个Entry退出时,将会重新设置context的curEntry,当该Entry是最顶层的一个入口时,将会把ThreadLocal中保存的context也清除掉,如下图所示:

图片

构造叶子节点

上面的过程是构造了一棵调用链的树,但是这棵树只有树干,没有叶子,那叶子节点是在什么时候创建的呢?DefaultNode就是叶子节点,在叶子节点中保存着目标资源在当前状态下的统计信息。通过分析,我们知道了叶子节点是在NodeSelectorSlot的entry方法中创建的。具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, Object... args) throws Throwable {
    // 根据 上下文 的名称获取DefaultNode
    // 多线程环境下,每个线程都会创建一个context,
    // 只要资源名相同,则context的名称也相同,那么获取到的节点就相同
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                // 如果当前 上下文 中没有该节点,则创建一个DefaultNode节点
                node = Env.nodeBuilder.buildTreeNode(resourceWrapper, null);
                // 省略部分代码
            }
            // 将当前node作为 上下文 的最后一个节点的子节点添加进去
            // 如果context的curEntry.parent.curNode为null,则添加到entranceNode中去
            // 否则添加到context的curEntry.parent.curNode中去
            ((DefaultNode)context.getLastNode()).addChild(node);
        }
    }
    // 将该节点设置为 上下文 中的当前节点
    // 实际是将当前节点赋值给context中curEntry的curNode
    // 在Context的getLastNode中会用到在此处设置的curNode
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, args);
}

上面的代码可以分解成下面这些步骤: 1)获取当前上下文对应的DefaultNode,如果没有的话会为当前的调用新生成一个DefaultNode节点,它的作用是对资源进行各种统计度量以便进行流控; 2)将新创建的DefaultNode节点,添加到context中,作为「entranceNode」或者「curEntry.parent.curNode」的子节点; 3)将DefaultNode节点,添加到context中,作为「curEntry」的curNode。

上面的第2步,不是每次都会执行。我们先看第3步,把当前DefaultNode设置为context的curNode,实际上是把当前节点赋值给context中curEntry的curNode,用图形表示就是这样:

图片

多次创建不同的Entry,并且执行NodeSelectorSlot的entry方法后,就会变成这样一棵调用链树:

图片

PS:这里图中的node0,node1,node2可能是相同的node,因为在同一个context中从map中获取的node是同一个,这里只是为了表述的更清楚所以用了不同的节点名。

保存子节点

上面已经分析了叶子节点的构造过程,叶子节点是保存在各个Entry的curNode属性中的。

我们知道context中只保存了入口节点和当前Entry,那子节点是什么时候保存的呢,其实子节点就是上面代码中的第2步中保存的。

下面我们来分析上面的第2步的情况:

第一次调用NodeSelectorSlot的entry方法时,map中肯定是没有DefaultNode的,那就会进入第2步中,创建一个node,创建完成后会把该节点加入到context的lastNode的子节点中去。我们先看一下context的getLastNode方法:

public Node getLastNode() {
    // 如果curEntry不存在时,返回entranceNode
    // 否则返回curEntry的lastNode,
    // 需要注意的是curEntry的lastNode是获取的parent的curNode,
    // 如果每次进入的资源不同,就会每次都创建一个CtEntry,则parent为null
    // 所以curEntry.getLastNode()也为null
    if (curEntry != null && curEntry.getLastNode() != null) {
        return curEntry.getLastNode();
    } else {
        return entranceNode;
    }
}

代码中我们可以知道,lastNode的值可能是context中的entranceNode也可能是curEntry.parent.curNode,但是他们都是「DefaultNode」类型的节点,DefaultNode的所有子节点是保存在一个HashSet中的。

第一次调用getLastNode方法时,context中curEntry是null,因为curEntry是在第3步中才赋值的。所以,lastNode最初的值就是context的entranceNode。那么将node添加到entranceNode的子节点中去之后就变成了下面这样:

图片

紧接着再进入一次,资源名不同,会再次生成一个新的Entry,上面的图形就变成下图这样:

图片

此时再次调用context的getLastNode方法,因为此时curEntry的parent不再是null了,所以获取到的lastNode是curEntry.parent.curNode,在上图中可以很方便的看出,这个节点就是node0。那么把当前节点node1添加到lastNode的子节点中去,上面的图形就变成下图这样:

图片

然后将当前node设置给context的curNode,上面的图形就变成下图这样:

图片

假如再创建一个Entry,然后再进入一次不同的资源名,上面的图就变成下面这样:

图片

至此NodeSelectorSlot的基本功能已经大致分析清楚了。

PS:以上的分析是基于每次执行SphU.entry(name)时,资源名都是不一样的前提下。如果资源名都一样的话,那么生成的node都相同,则只会再第一次把node加入到entranceNode的子节点中去,其他的时候,只会创建一个新的Entry,然后替换context中的curEntry的值。

ClusterBuilderSlot

NodeSelectorSlot的entry方法执行完之后,会调用fireEntry方法,此时会触发ClusterBuilderSlot的entry方法。

ClusterBuilderSlot的entry方法比较简单,具体代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    if (clusterNode == null) {
        synchronized (lock) {
            if (clusterNode == null) {
                // Create the cluster node.
                clusterNode = Env.nodeBuilder.buildClusterNode();
                // 将clusterNode保存到全局的map中去
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<ResourceWrapper, ClusterNode>(16);
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);

                clusterNodeMap = newMap;
            }
        }
    }
    // 将clusterNode塞到DefaultNode中去
    node.setClusterNode(clusterNode);

    // 省略部分代码
    
    fireEntry(context, resourceWrapper, node, count, args);
}

NodeSelectorSlot的职责比较简单,主要做了两件事:

一、为每个资源创建一个clusterNode,然后把clusterNode塞到DefaultNode中去

二、将clusterNode保持到全局的map中去,用资源作为map的key

PS:一个资源只有一个ClusterNode,但是可以有多个DefaultNode

StatistcSlot

StatisticSlot负责来统计资源的实时状态,具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 触发下一个Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
        // 统计信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代码
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // 省略部分代码
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代码
        throw e;
    }
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode)context.getCurNode();
    if (context.getCurEntry().getError() == null) {
        long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
        if (rt > Constants.TIME_DROP_VALUE) {
            rt = Constants.TIME_DROP_VALUE;
        }
        node.rt(rt);
        // 省略部分代码
        node.decreaseThreadNum();
        // 省略部分代码
    }
    fireExit(context, resourceWrapper, count);
}

代码分成了两部分,第一部分是entry方法,该方法首先会触发后续slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的规则,如果规则不通过,就会抛出BlockException,则会在node中统计被block的数量。反之会在node中统计通过的请求数和线程数等信息。第二部分是在exit方法中,当退出该Entry入口时,会统计rt的时间,并减少线程数。

这些统计的实时数据会被后续的校验规则所使用,具体的统计方式是通过 滑动窗口 来实现的。后面我会详细分析滑动窗口的原理。

SystemSlot

SystemSlot就是根据总的请求统计信息,来做流控,主要是防止系统被搞垮,具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, args);
}

public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    // 省略部分代码
    // total qps
    double currentQps = Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }
    // total thread
    int currentThread = Constants.ENTRY_NODE.curThreadNum();
    if (currentThread ? maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }
    double rt = Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }
    // 完全按照RT,BBR算法来
    if (highestSystemLoadIsSet && getCurrentSystemAbgLoad() > highestSystemLoad) {
        if (currentThread > 1 && currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }
}

其中的Constants.ENTRY_NODE是一个全局的ClusterNode,该节点的值是在StatisticsSlot中进行统计的。

AuthoritySlot

AuthoritySlot做的事也比较简单,主要是根据黑白名单进行过滤,只要有一条规则校验不通过,就抛出异常。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    AuthorityRuleManager.checkAuthority(resourceWrapper, context, node, count);
    fireEntry(context, resourceWrapper, node, count, args);
}

public static void checkAuthority(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
    if (authorityRules == null) {
        return;
    }
    // 根据资源名称获取相应的规则
    List<AuthorityRule> rules = authorityRules.get(resource.getName());
    if (rules == null) {
        return;
    }
    for (AuthorityRule rule : rules) {
        // 只要有一条规则校验不通过,就抛出AuthorityException
        if (!rule.passCheck(context, node, count)) {
            throw new AuthorityException(context.getOrigin());
        }
    }
}

FlowSlot

FlowSlot主要是根据前面统计好的信息,与设置的限流规则进行匹配校验,如果规则校验不通过则进行限流,具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    FlowRuleManager.checkFlow(resourceWrapper, context, node, count);
    fireEntry(context, resourceWrapper, node, count, args);
}

public static void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
    List<FlowRule> rules = flowRules.get(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!rule.passCheck(context, node, count)) {
                throw new FlowException(rule.getLimitApp());
            }
        }
    }
}

DegradeSlot

DegradeSlot主要是根据前面统计好的信息,与设置的降级规则进行匹配校验,如果规则校验不通过则进行降级,具体的代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
    fireEntry(context, resourceWrapper, node, count, args);
}

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException {
    List<DegradeRule> rules = degradeRules.get(resource.getName());
    if (rules != null) {
        for (DegradeRule rule : rules) {
            if (!rule.passCheck(context, node, count)) {
                throw new DegradeException(rule.getLimitApp());
            }
        }
    }
}

总结

sentinel的限流降级等功能,主要是通过一个SlotChain实现的。在链式插槽中,有7个核心的Slot,这些Slot各司其职,可以分为以下几种类型:

一、进行资源调用路径构造的NodeSelectorSlot和ClusterBuilderSlot

二、进行资源的实时状态统计的StatisticsSlot

三、进行系统保护,限流,降级等规则校验的SystemSlot、AuthoritySlot、FlowSlot、DegradeSlot

后面几个Slot依赖于前面几个Slot统计的结果。至此,每种Slot的功能已经基本分析清楚了。

 

基于滑动时间窗口的实时指标统计分析

上文中,我们了解了sentinel是如何构造资源调用链的,以及每种Slot的具体作用,其中最重要的一个Slot非StatisticSlot莫属,因为他做的事是其他所有的Slot的基础。包括各种限流,熔断的规则,都是基于StatisticSlot统计出来的结果进行规则校验的。下文我们将深入研究下sentinel是如何进行qps等指标的统计的,首先要确定的一点是,sentinel是基于滑动时间窗口来实现的。

化整为零

我们已经知道了Slot是从第一个往后一直传递到最后一个的,且当信息传递到StatisticSlot时,这里就开始进行统计了,统计的结果又会被后续的Slot所采用,作为规则校验的依据。我们先来看一段非常熟悉的代码,就是StatisticSlot中的entry方法:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // 触发下一个Slot的entry方法
        fireEntry(context, resourceWrapper, node, count, args);
        // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
        // 统计信息
        node.increaseThreadNum();
        node.addPassRequest();
        // 省略部分代码
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockdQps();
        // 省略部分代码
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // 省略部分代码
        throw e;
    }
}

上面的代码注释写的已经很清晰了,简单的来说,StatisticSlot中就是做了三件事:

  • 1.通过node中的当前的实时统计指标信息进行规则校验

  • 2.如果通过了校验,则重新更新node中的实时指标数据

  • 3.如果被block或出现了异常了,则重新更新node中block的指标或异常指标

从上面的代码中可以很清晰的看到,所有的实时指标的统计都是在node中进行的。这里我们拿qps的指标进行分析,看sentinel是怎么统计出qps的,这里可以事先透露下他是通过滑动时间窗口来统计的,而滑动窗口就是本篇文章的重点。

DefaultNode和ClusterNode

我们可以看到 node.addPassRequest() 这段代码是在fireEntry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addPassRequest() 这行代码,现在我们进入这个代码看看。具体的代码如下所示:

@Override
public void addPassRequest() {
    super.addPassRequest();
    this.clusterNode.addPassRequest();
}

首先我们知道这里的node是一个 DefaultNode 实例,这里特别补充一个 DefaultNodeClusterNode 的区别:

DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode

ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中

StatisticNode

好了,知道了他们的区别后,我们再来看上面的代码,其实都是执行的 StatisticNode 对象的 addPassRequest 方法。进入这个方法中看下具体的代码:

private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);

private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

@Override
public void addPassRequest() {
    rollingCounterInSecond.addPass();
    rollingCounterInMinute.AddPass();
}

Metric

从代码中我们可以看到,具体的增加pass指标是通过一个叫 Metric 的接口进行操作的,并且是通过 ArrayMetric 这种实现类,现在我们在进入 ArrayMetric 中看一下。具体的代码如下所示:

private final WindowLeapArray data;

public ArrayMetric(int windowLength, int interval) {
    this.data = new WindowLeapArray(windowLength, interval);
}

@Override
public void addPass() {
    WindowWrap<Window> wrap = data.currentWindow();
    wrap.value().addPass();
}

LeapArray和Window

本以为在ArrayMetric中应该可以看到具体的统计操作了,谁知道又出现了一个叫 WindowLeapArray 的类,不过从名字上看有点 「窗口」 的意思了。继续跟代码,发现 wrap.value().addPass() 是执行的 wrap 对象所包装的 Window 对象的 addPass 方法,这里就是最终的增加qps中q的值的地方了。进入 Window 类中看一下,具体的代码如下:

private final LongAdder pass = new LongAdder();
private final LongAdder block = new LongAdder();
private final LongAdder exception = new LongAdder();
private final LongAdder rt = new LongAdder();
private final LongAdder success = new LongAdder();

public void addPass() {
    pass.add(1L);
}
public void addException() {
    exception.add(1L);
}
public void addBlock() {
    block.add(1L);
}
public void addSuccess() {
    success.add(1L);
}
public void addRT(long rt) {
    this.rt.add(rt);

    // Not thread-safe, but it's okay.
    if (rt < minRt) {
        minRt = rt;
    }
}

看到这里是不是就放心了,原来 Window 是通过 LongAdder 来保存各种指标的值的,看到 LongAdder 是不是立刻就想到 AtomicLong 了?但是这里为什么不用 AtomicLong ,而是用 LongAdder 呢?主要是 LongAdder 在高并发下有更好的吞吐量,代价是花费了更多的空间,典型的以空间换时间。

完整的流程

分析到这里我们已经把指标统计的完整链路理清楚了,可以用下面这张图来表示整个过程:

图片

有人可能会问了,你不是要分析滑动窗口的吗?搞了半天只画了一张图,而且图上还多了一个 timeId 之类的东西,这个根本没在上面出现过。

好了,现在我们就可以来分析具体的滑动窗口了,这里的 timeId 是用来表示一个 WindowWrap对象的时间id。为什么要用 timeId 来表示呢?我们可以看到每一个 WindowWrap 对象由三个部分组成:

  • windowStart: 时间窗口的开始时间,单位是毫秒

  • windowLength: 时间窗口的长度,单位是毫秒

  • value: 时间窗口的内容,在 WindowWrap 中是用泛型表示这个值的,但实际上就是 Window 类

我们先大致的了解下时间窗口的构成,后面会再来分析 timeId 的作用。首先一个时间窗口是用来在某个固定时间长度内保存一些统计值的虚拟概念。有了这个概念后,我们就可以通过时间窗口来计算统计一段时间内的诸如:qps,rt,threadNum等指标了。

继续深入

我们再回到 ArrayMetric 中看一下:

private final WindowLeapArray data;

public ArrayMetric(int windowLength, int interval) {
    this.data = new WindowLeapArray(windowLength, interval);
}

首先创建了一个 WindowLeapArray 对象,看一下 WindowLeapArray 类的代码:

public class WindowLeapArray extends LeapArray<Window> {
    public WindowLeapArray(int windowLengthInMs, int intervalInSec) {
        super(windowLengthInMs, intervalInSec);
    }
}

该对象的构造方法有两个参数:

  • windowLengthInMs :一个用毫秒做单位的时间窗口的长度

  • intervalInSec ,一个用秒做单位的时间间隔,这个时间间隔具体是做什么的,下面会分析。

然后 WindowLeapArray 继承自 LeapArray ,在初始化 WindowLeapArray 的时候,直接调用了父类的构造方法,再来看一下父类 LeapArray 的代码:

public abstract class LeapArray<T> {
    // 时间窗口的长度
    protected int windowLength;
    // 采样窗口的个数
    protected int sampleCount;
    // 以毫秒为单位的时间间隔
    protected int intervalInMs;
    
    // 采用的时间窗口数组
    protected AtomicReferenceArray<WindowWrap<T>> array;

    /**
     *
     * LeapArray对象
     * @param windowLength 时间窗口的长度,单位:毫秒
     * @param intervalInSec 统计的间隔,单位:秒
     */
    public LeapArray(int windowLength, int intervalInSec) {
        this.windowLength = windowLength;
        // 时间窗口的采样个数,默认为2个采样窗口
        this.sampleCount = intervalInSec * 1000 / windowLength;
        this.intervalInMs = intervalInSec * 1000;

        this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
    }
}

可以很清晰的看出来在 LeapArray 中创建了一个 AtomicReferenceArray 数组,用来对时间窗口中的统计值进行采样。通过采样的统计值再计算出平均值,就是我们需要的最终的实时指标的值了。

可以看到我在上面的代码中通过注释,标明了默认采样的时间窗口的个数是2个,这个值是怎么得到的呢?我们回忆一下 LeapArray 对象创建,是通过在 StatisticNode 中,new了一个 ArrayMetric ,然后将参数一路往上传递后创建的:

private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL);

SampleCountProperty.sampleCount 的默认值是2,所以第一个参数 windowLengthInMs 的值是 500ms,那么1秒钟是1000ms,每个时间窗口的长度是500ms,也就是说总共分了两个采样的时间窗口。

现在继续回到 ArrayMetric.addPass() 方法:

@Override
public void addPass() {
    WindowWrap<Window> wrap = data.currentWindow();
    wrap.value().addPass();
}

获取当前Window

我们已经分析了 wrap.value().addPass() ,现在只需要分析清楚 data.currentWindow()具体做了什么,拿到了当前时间窗口就可以 了。继续深入代码,最终定位到下面的代码:

@Override
public WindowWrap<Window> currentWindow(long time) {
    long timeId = time / windowLength;
    // Calculate current index.
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    long time = time - time % windowLength;
    while (true) {
        WindownWrap<Window> old = array.get(idx);
        if (old == null) {
            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, time, new Window());
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (time == old.windowStart()) {
            return old;
        } else if (time ? old.windowStart()) {
            if (addLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, time);
                } finally {
                    addLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (time < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<Window>(windowLength, time, new Window());
        }
    }
}

初次看到这段代码时,可能会觉得有点懵,但是细细的分析一下,实际可以把他分成以下几步:

  • 1.根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx

  • 2.根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位

  • 3.根据索引idx,在采样窗口数组中取得一个时间窗口old

  • 4.循环判断知道获取到一个当前时间窗口

    • 4.1.如果old为空,则创建一个时间窗口,并将它插入到array的第idx个位置,array上面已经分析过了,是一个 AtomicReferenceArray

    • 4.2.如果当前窗口的开始时间time与old的开始时间相等,那么说明old就是当前时间窗口,直接返回old

    • 4.3.如果当前窗口的开始时间time大于old的开始时间,则说明old窗口已经过时了,将old的开始时间更新为最新值:time,下个循环中会在步骤4.2中返回

    • 4.4.如果当前窗口的开始时间time小于old的开始时间,实际上这种情况是不可能存在的,因为time是当前时间,old是过去的一个时间

上面的代码有个比较容易混淆的地方,就是计算出来的当前时间窗口的开始时间,没有使用一个新的变量来表示,而是直接用time来表示。

另外timeId是会随着时间的增长而增加,当前时间每增长一个windowLength的长度,timeId就加1。但是idx不会增长,只会在0和1之间变换,因为array数组的长度是2,只有两个采样时间窗口。至于为什么默认只有两个采样窗口,个人觉得因为sentinel是比较轻量的框架。时间窗口中保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多内存,另一方面时间窗口过多就意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动。

经过分析,加上注释,并将表示当前窗口开始时间的time变量,重命名成其他变量,使得代码更具可读性,调整后的代码如下:

@Override
public WindowWrap<Window> currentWindow(long time) {
    // time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个
    long timeId = time / windowLength;
    // Calculate current index.
    // idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    long currentWindowStart = time - time % windowLength;

    while (true) {
        // 从采样数组中根据索引获取缓存的时间窗口
        WindowWrap<Window> old = array.get(idx);
        // array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象
        if (old == null) {
            // 如果没有获取到,则创建一个新的
            WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
            // 通过CAS将新窗口设置到数组中去
            if (array.compareAndSet(idx, null, window)) {
                // 如果能设置成功,则将该窗口返回
                return window;
            } else {
                // 否则当前线程让出时间片,等待
                Thread.yield();
            }
        // 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口
        
        } else if (currentWindowStart == old.windowStart()) {
            return old;
            // 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口
            // 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动
        } else if (currentWindowStart > old.windowStart()) {
            if (addLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, currentWindowStart);
                } finally {
                    addLock.unlock();
                }
            } else {
                Thread.yield();
            }
        // 这个条件不可能存在
        } else if (currentWindowStart < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<Window>(windowLength, currentWindowStart, new Window());
        }
    }
}

看图理解

为了更好的理解,下面我用几幅图来描述下这个过程。

图片

初始的时候arrays数组中只有一个窗口(可能是第一个,也可能是第二个),每个时间窗口的长度是500ms,这就意味着只要当前时间与时间窗口的差值在500ms之内,时间窗口就不会向前滑动。例如,假如当前时间走到300或者500时,当前时间窗口仍然是相同的那个:

图片

时间继续往前走,当超过500ms时,时间窗口就会向前滑动到下一个,这时就会更新当前窗口的开始时间:

图片

时间继续往前走,只要不超过1000ms,则当前窗口不会发生变化:

图片

当时间继续往前走,当前时间超过1000ms时,就会再次进入下一个时间窗口,此时arrays数组中的窗口将会有一个失效,会有另一个新的窗口进行替换:

图片

以此类推随着时间的流逝,时间窗口也在发生变化,在当前时间点中进入的请求,会被统计到当前时间对应的时间窗口中。计算qps时,会用当前采样的时间窗口中对应的指标统计值除以时间间隔,就是具体的qps。具体的代码在StatisticNode中:

@Override
public long totalQps() {
    return passQps() + blockQps();
}

@Override
public long blockedQps() {
    return rollingCounterInSecond.block() / IntervalProperty.INTERVAL;
}

@Override
public long passQps() {
    return rollingCounterInSecond.pass() / IntervalProperty.INTERVAL;
}

到这里就基本上把滑动窗口的原理分析清楚了,还有不清楚的地方,最好能够借助代码继续分析下,最好的做法就是debug,这里贴一下笔者在分析 currentWindow 方法时采取的测试代码:

public static void main(String[] args) throws InterruptedException {
    int windowLength = 500;
    int arrayLength = 2;
    calculate(windowLength,arrayLength);

    Thread.sleep(100);
    calculate(windowLength,arrayLength);

    Thread.sleep(200);
    calculate(windowLength,arrayLength);

    Thread.sleep(200);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);

    Thread.sleep(500);
    calculate(windowLength,arrayLength);
}

private static void calculate(int windowLength,int arrayLength) {
    long time = System.currentTimeMillis();
    long timeId = time/windowLength;
    long currentWindowStart = time - time % windowLength;
    int idx = (int)(timeId % arrayLength);
    System.out.println("time=" + time+", currentWindowStart="+currentWindowStart+"timeId="+timeId +",idx="+idx);
}

这里假设时间窗口的长度为500ms,数组的大小为2,当前时间作为输入参数,计算出当前时间窗口的timeId、windowStart、idx等值。执行上面的代码后,将打印出如下的结果:

可以看出来,windowStart每增加500ms,timeId就加1,这时就是时间窗口发生滑动的时候。

 

 

 

 

至此,sentinel的基本情况都已经分析了,更加详细的内容,可以继续阅读源码来研究。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐