Redian新闻
>
项目终于用上了 xxl-job!

项目终于用上了 xxl-job!

公众号新闻

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn

来源:码猿技术专栏


任务调度是java项目中常用的一种组件,可以指定任务在何时进行触发,最熟悉的是spring框架里面的quartz

较流行的有一些分布式调度组件,比如elastic-job /azkaban ,都是基于quartz 二次开发

今天介绍一款分布式的任务调度框架:xxl-job

项目介绍

xxl-job是一款极容易学习上手的轻量级开源分布式调度框架,分为管理端和执行端两块,管理端负责配置任务信息以及查看任务执行日志,执行端只需要配置与管理端的连接信息就可以进行具体的任务逻辑开发了,目前版本还在持续迭代中,使用简单,功能强大,具体功能特性可以看下官方介绍。废话不多说,直接进入实战吧。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

实战

1.服务端部署

https://github.com/xuxueli/xxl-job下载项目,用mysql客户端工具Navicat执行项目根目录下doc/db/table_xxl_job.sql文件,库名自己可以自行修改,一共8张表,如下:

创建一个新的spring boot项目,将下载的xxl-job-admin目录下的文件以及pom.xml文件都拷贝到新建的项目中(如果不想新建项目可以直接用下载下来的项目进行修改部署),修改application.properties中的数据库连接信息。

小编是自己创建的新项目,需要手动改了pom.xml依赖xxl-job-core的版本为2.2.0

修改logback.xml中的日志输出路径。

好了,以上3步曲就搞定整个服务端配置了,启动项目,并访问http://localhost:8080/xxl-job-admin/ ,默认管理员账号admin/123456进行登录。

这交互,可以啊,是不是很带感。

2.执行端配置

创建一个新的module,跟服务端一样,也需要修改下logback.xml以及在pom.xml添加xxl-job-core的依赖。

为了模拟分布式效果,小编创建了2个配置文件来区分2个执行服务。

application-9998.properties

application-9999.properties

细心的童鞋会发现只有server.port和xxl.job.executor.port不同,执行器服务跟spring boot一样,自带内嵌tomcat,也会暴露一个端口注册到服务端,进行高可用负载。

创建一个java config类,定义一个使用配置的XxlJobSpringExecutor执行类,如下

配置2个启动配置,分别启动,效果如下:

完美启动2个服务,看下服务端平台是不是有这两台执行服务的注册信息。

注意:为了演示,事先创建了一个执行器,AppName一定要与配置文件中xxl.job.executor.appname一致。

3.任务开发

3.1 基于方法注解任务

话不多说,直接上代码把,毕竟代码是程序员最好的交流方式。

3.2 基于api任务

3.3 分片广播任务

上面是整理的比较实用的任务创建方式,个人偏好于注解形式,方法上加一个注解就完事了。

4.任务执行

剩下的就是傻白甜的界面操作了,走起。

4.1 单任务执行

创建一个路由策略为轮询的任务,指定corn表达式,并填入JobHandler为myJobAnnotationHandler,myJobAnnotationHandler其实就是spring IOC容器中管理bean的名称,有兴趣的童鞋可以看下源码。

为了演示效果,点击执行一次并进行任务参数输入。

轮询调用执行器服务效果如下:

4.2 子任务执行

更新任务,并指定子任务id为5,多个子任务的需要以逗号隔开

执行任务结果如下

4.3 分片广播任务执行

分片任务其实就是广播功能,每次触发,每个执行服务的业务执行类都会被调用,类似于kafka里面的不同消费组都要对同一个topic进行消费一样。

执行后的效果如下

太强势了,需要定时刷新项目中的配置信息,用这个方式很完美。

5.任务日志

任务日志其实是很重要的一块,方便回溯任务历史执行情况, 以便跟踪问题并矫正丢失的业务数据

查看调度备注,父子任务调度信息非常详细,子任务可以通过执行备注查看执行情况

查看控制台输出,里面的日志是执行器中XxlJobLogger类打印出来的

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

通信底层介绍

xxl-job 使用 netty http 的方式进行通信,虽然也支持 Mina,jetty,netty tcp 等方式,但是代码里面固定写死的是 netty http。

通信整体流程

我以调度器通知执行器执行任务为例,绘制的活动图:

活动图

惊艳的设计

看完了整个处理流程代码,设计上可以说独具匠心,将 netty,多线程的知识运用得行云流水。

我现在就将这些设计上出彩的点总结如下:

1. 使用动态代理模式,隐藏通信细节

xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。

XxlRpcReferenceBean 类的 getObject() 方法会生成一个代理类,这个代理类会进行远程通信。

2. 全异步处理

执行器收到消息进行反序列化,并没有同步执行任务代码,而是将任务信息存储在 LinkedBlockingQueue 中,异步线程从这个队列中获取任务信息,然后执行。

而任务的处理结果,也不是说处理完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。

这样处理的好处就是减少了 netty 工作线程的处理时间,提升了吞吐量。

3. 对异步处理的包装

对异步处理进行了包装,代码看起来是同步调用的。

我们看下调度器,XxlJobTrigger 类触发任务执行的代码:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //这里面做了很多异步处理,最终同步得到处理结果
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBiz.run 方法我们说过了,是走的动态代理,和执行器进行通信,执行器执行结果也是异步处理完,才返回的,而这里看到的 run 方法是同步等待处理结果返回。

我们看下xxl-job是如何同步获取处理结果的:调度器向执行器发出消息后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。

动态代理代码如下:

//代理类中的触发调用
if (CallType.SYNC == callType) {
   // future-response set
   XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
   try {
      // do invoke
      client.asyncSend(finalAddress, xxlRpcRequest);

      // future get
      XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
      if (xxlRpcResponse.getErrorMsg() != null) {
         throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
      }
      return xxlRpcResponse.getResult();
   } catch (Exception e) {
      logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

      throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
   } finally{
      // future-response remove
      futureResponse.removeInvokerFuture();
   }

XxlRpcFutureResponse 类中实现了线程的等待,和线程唤醒的处理:

//返回结果,唤醒线程
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   synchronized (lock) {
      done = true;
      lock.notifyAll();
   }
}

@Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!done) {
            synchronized (lock) {
                try {
                    if (timeout < 0) {
            //线程阻塞
                        lock.wait();
                    } else {
                        long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {
                    throw e;
                }
            }
        }

        if (!done) {
            throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
        }
        return response;
    }

有的同学可能会问了,调度器接收到返回结果,怎么确定唤醒哪个线程呢?

每一次远程调用,都会生成 uuid 的请求 id,这个 id 是在整个调用过程中一直传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。

这里拿着请求 id 这把钥匙,就能找到对应的 XxlRpcFutureResponse,然后调用 setResponse 方法,设置返回值,唤醒线程。

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

    // 通过requestId找到XxlRpcFutureResponse,
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {

        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // 里面调用lock的notify方法
        futureResponse.setResponse(xxlRpcResponse);
    }

    // do remove
    futureResponsePool.remove(requestId);

}

欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
为了让脾气大又死犟的熊孩子听话合作,我用上了兵法……920XXX,来了明天面试一个dream job 求求求bless!BK华男赢得漂亮!连庭都不用上了!果然坚持就是胜利!!!![干货] “加入俱乐部”,用 join 还 join in?暂缓摆烂步伐,然而只看钱的航司常旅客项目终究要到来于向真:湖南之行今早看到的,wework倒掉的影响【Citigroup considers deep job cuts】小说:兰欣与乌茶 34职场|香港打响“人才争夺战”,手握XXX的港漂是“香饽饽”?!AmEx 开始搞“Up To XXX”的“薛定谔的”开卡奖励了招11,000名工人!加州到拉斯维加斯高铁项目终于要开工了!《花信风之寒露》今天巴伐利亚州议会选举项目终于用上了插入式注解,真香!宾大荣誉教授发起新倡导,三年制美本项目终于要火起来了?市区 Loop 房源L03 | Studio$19xx, 1b$24xx/不收中介费/性价比最高的湖景公寓【友情转发】【Major Fair】| 华大课友Major Fair: 新生Huskies快来看看有没有你感兴趣的专业吧记一次 xxl-job 实战带香烟飞纽约被查?33岁男子“耍小聪明” 往行李藏9磅xxx 下地JFK就被捕“自己人” 买房泡汤了!(今日世界日报)市区South Loop房源SL06 | Studio$19xx, 1b$21xx, 2b$32xx/不收中介费14岁华裔男孩杀父弑母 重伤妹妹! 砍刀 猎枪全用上了! 报警撒谎有人闯入 弟弟目睹全程!分布式定时调度:xxl-job 最佳实践儿童肺炎支原体感染增多!关于用药,专家提醒→Jo Johnson警告:英国取消留学生毕业工签PSW将带来严重影响!政府立场难以捉摸...【友情转发】【Major Fair】| 华大Major Fair: 探索专业之旅!年薪$8万-10万5美金+H1B!硅谷公司诚招升学顾问!特稿丨世界首创!雄安“地下城”用上了2024 了,这些年轻人却用上了「老人机」招11,000名工人!南加到拉斯维加斯高铁项目终于要开工了!那些年穿过New Balance的人,才是真NB!最新 Amex Offers 汇总【eBay $15 返 $5, Exxon Mobile $25 返 $5】伦敦转租真便宜!步行10分钟到LCC和象堡,仅需£3XX/周,温布利多间高端公寓仅£2XX/周,点击查看更多!(1月10日更新)以为在美“钱途无限”?走线客被拍成功进入 可现实或是月薪$2xxx刷盘子端侧AI推理,高效部署PyTorch模型:官方新工具开源,Meta已经用上了Trader Joe's第15届年度最受顾客欢迎的产品出炉啦!下次逛Trader Joe’s就去试试看吧
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。