Quartz分布式调度原理

原创 吴就业 109 0 2021-12-29

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://www.wujiuye.com/article/211fa63f10164b9796a0de6d2168705f

作者:吴就业
链接:https://www.wujiuye.com/article/211fa63f10164b9796a0de6d2168705f
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

Quartz分布式调度的实现是去中心化的,需要依赖数据库在集群间同步调度状态,基于分布式锁实现一致性调度,而我们当前使用的xxl-job版本(1.9.x)的分布式调度又是基于Quartz实现,因此我们所了解到的xxl-job调度性能差,本质上就是Quartz的调度性能差。

在同一时刻需要触发的Job只有少量的情况下,我们看不到Quartz的性能缺陷,在Job数量明显增加情况下,我们就会发现,调度延迟会有明显增加。尽管横向扩展节点,调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。

我们将带着这些疑惑,分析Quartz 的分布式调度原理,以及过时恢复、故障恢复的实现原理。

核心调度流程分析

核心流程代码在org.quartz.core.QuartzSchedulerThread类的run方法,即首先是调度线程(只有一个线程)启动,QuartzSchedulerThread实例的run执行,源码如下。

class QuartzSchedulerThread{

    @Override

    public void run() {

        int acquiresFailed = 0;

        while (!halted.get()) {

            // .....

                    (1)

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

                if(availThreadCount > 0) {



                    // ....

                    (2)                

                    try {

                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(

                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

                       

                    } //.....



                    if (triggers != null && !triggers.isEmpty()) {

                        (3)        

                        now = System.currentTimeMillis();

                        long triggerTime = triggers.get(0).getNextFireTime().getTime();

                        long timeUntilTrigger = triggerTime - now;

                        while(timeUntilTrigger > 2) {

                            // wait......

                        }



                        // .....

                        if(goAhead) {

                            (4)

                            try {

                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

                                if(res != null)

                                    bndles = res;

                            } //....

                        }

                        (5)

                        for (int i = 0; i < bndles.size(); i++) {

                           // ..........

                            JobRunShell shell = null;

                            try {

                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);

                                shell.initialize(qs);

                            } catch (SchedulerException se) {

                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);

                                continue;

                            }

        

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {

                                (6)

                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);

                            }



                        }



                        continue; // while (!halted)

                    }

                } else { // if(availThreadCount > 0)

                    // should never happen, if threadPool.blockForAvailableThreads() follows contract

                    continue; // while (!halted)

                }



                //....

            

        } // while (!halted)



        // drop references to scheduler stuff to aid garbage collection...

        qs = null;

        qsRsrcs = null;

    }

}

流程分析:

  1. 检查当前线程池可用数量是否大于0,大于0才执行调度逻辑。
  2. 获取分布式锁,根据配置的maxBatchSize(默认maxBatchSize为1),获取30秒内需要调度的Job,根据执行时间排序,最后释放分布式锁。在获取Job时,会将状态由WAITING更新为ACQUIRED,更新成功才表示当时节点获取到了调度该Job的权限。
  3. 遍历调度这一批次任务,休眠等待到当前毫秒”至少有一个”Job(批量)需要触发执行。
  4. 获取分布式锁,批量触发调度,for遍历修改Job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则根据cron更新Job下一次执行时间,以及状态恢复为WAITING,最后释放分布式锁。
  5. for遍历执行这一批次Job,第(4)步将Job状态由ACQUIRED改为EXECUTING状态成功的Job,获取线程池,将Job放入线程池执行。
  6. 如果Job放入线程池失败,就获取一次分布式锁,修改Job状态为ERROR,然后释放锁。

假设只部署一个节点,且maxBatchSize最大为1,即不考虑分布式锁、不考虑批量的情况,那么quartz的调度流程可以简化为:

  1. 查询数据库,获取需要调度的job,并更新job的状态,由WAITING更新为ACQUIRED。
  2. 更新job状态,由ACQUIRED改为EXECUTING状态,如果修改成功,则根据cron更新Job下一次执行时间,以及状态恢复为WAITING。
  3. 如果第(2)步成功,则获取线程池,将Job放入线程池,线程池分配线程调用Job的execute方法执行Job。
  4. 继续循环步骤1。

故障恢复流程分析

故障恢复与错过恢复的启动入口都在JobStoreSupport#schedulerStarted方法中,由QuartzScheduler#start方法调用,源码如下。

class JobStoreSupport{



    public void schedulerStarted()throws SchedulerException{

        (1)

        if(isClustered()){

            //...

            clusterManagementThread.initialize();

        }else{

            (2)

            try{

                recoverJobs();

            }//....

        }

        (3)

        misfireHandler=new MisfireHandler();

        // ...

        misfireHandler.initialize();

        schedulerRunning=true;

        //...

      }

}

分布式调度下的故障恢复流程:

错过恢复流程分析

错过的原因主要有以下几种情况:

错过恢复流程:

#中间件

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Dubbo为什么会提供泛化调用这个功能

Dubbo的泛化调用功能就类似于Java语言提供的泛型功能,目的都是通用。那为什么需要泛化调用功能呢?

基于扩展点,为dubbo支持跨业务调用

很多规模稍大点的公司,内部都会有多个业务部门,这些业务部门都有自己的业务产品。每个业务部门开发的产品部署的环境物理上也都是相对隔离的,但这些业务部门之间可能存在合作关系,业务关联,因此就有了跨业务RPC调用的需求。

kafka消息重复消费排查

业务使用我们基础部门封装的kafka组件,在一次版本迭代中,我们引入了offset缓存,正是这个缓存,在某种条件触发下,会导致出现消息重复消费现象。

重构XXL-JOB,使用响应式编程实现异步RPC提升调度吞吐量

如果同一时刻需要下发几百个执行job的请求给执行器,使用这种阻塞的RPC,意味着需要开启几百个线程,使用几百个连接发送请求,而这几百个线程都需要阻塞等待响应,Job越多,需要的线程数就会越多,对调动中心的性能影响就越大。

重构支持多租户的XXL-JOB,如何实现多个逻辑集群的均衡选主

我们基于XXL-JOB的架构原理,重新架构设计了支持多租户横向扩展的分布式任务调度平台。本篇介绍如何实现多个逻辑集群(多个租户逻辑上是独立的集群)的均衡选主。

BFE原生路由转发功能分析

为什么加上“原生”,因为我们基于BFE开发已经魔改了。路由转发是BFE作为一个七层流量代理服务的核心功能,BFE设计了一套支持多租户、多机房的路由转发模型。