Redian新闻
>
xxl-job惊艳的设计,怎能叫人不爱

xxl-job惊艳的设计,怎能叫人不爱

公众号新闻

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:c1n.cn/N8Mln


通信底层介绍

xxl-job 使用 netty http 的方式进行通信,虽然也支持 Mina,jetty,netty tcp 等方式,但是代码里面固定写死的是 netty http。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

通信整体流程

我以调度器通知执行器执行任务为例,绘制的活动图:

活动图

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

惊艳的设计

看完了整个处理流程代码,设计上可以说独具匠心,将 netty,多线程的知识运用得行云流水。

我现在就将这些设计上出彩的点总结如下:

使用动态代理模式,隐藏通信细节

xxl-job 定义了两个接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封装了向心跳,暂停,触发执行等操作,AdminBiz 封装了回调,注册,取消注册操作,接口的实现类中,并没有通信相关的处理。

XxlRpcReferenceBean 类的 getObject() 方法会生成一个代理类,这个代理类会进行远程通信。

全异步处理

执行器收到消息进行反序列化,并没有同步执行任务代码,而是将任务信息存储在 LinkedBlockingQueue 中,异步线程从这个队列中获取任务信息,然后执行。

而任务的处理结果,也不是说处理完之后,同步返回的,也是放到回调线程的阻塞队列中,异步的将处理结果返回回去。

这样处理的好处就是减少了 netty 工作线程的处理时间,提升了吞吐量。

对异步处理的包装

对异步处理进行了包装,代码看起来是同步调用的。

我们看下调度器,XxlJobTrigger 类触发任务执行的代码:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //这里面做了很多异步处理,最终同步得到处理结果
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBiz.run 方法我们说过了,是走的动态代理,和执行器进行通信,执行器执行结果也是异步处理完,才返回的,而这里看到的 run 方法是同步等待处理结果返回。

我们看下xxl-job是如何同步获取处理结果的:调度器向执行器发出消息后,该线程阻塞。等到执行器处理完毕后,将处理结果返回,唤醒被阻塞的线程,调用处拿到返回值。

动态代理代码如下:

//代理类中的触发调用
if (CallType.SYNC == callType) {
   // future-response set
   XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
   try {
      // do invoke
      client.asyncSend(finalAddress, xxlRpcRequest);

      // future get
      XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
      if (xxlRpcResponse.getErrorMsg() != null) {
         throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
      }
      return xxlRpcResponse.getResult();
   } catch (Exception e) {
      logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

      throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
   } finally{
      // future-response remove
      futureResponse.removeInvokerFuture();
   }

XxlRpcFutureResponse 类中实现了线程的等待,和线程唤醒的处理:

//返回结果,唤醒线程
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   synchronized (lock) {
      done = true;
      lock.notifyAll();
   }
}

@Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!done) {
            synchronized (lock) {
                try {
                    if (timeout < 0) {
            //线程阻塞
                        lock.wait();
                    } else {
                        long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {
                    throw e;
                }
            }
        }

        if (!done) {
            throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
        }
        return response;
    }

有的同学可能会问了,调度器接收到返回结果,怎么确定唤醒哪个线程呢?

每一次远程调用,都会生成 uuid 的请求 id,这个 id 是在整个调用过程中一直传递的,就像一把钥匙,在你回家的的时候,拿着它就带开门。

这里拿着请求 id 这把钥匙,就能找到对应的 XxlRpcFutureResponse,然后调用 setResponse 方法,设置返回值,唤醒线程。

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

    // 通过requestId找到XxlRpcFutureResponse,
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {

        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // 里面调用lock的notify方法
        futureResponse.setResponse(xxlRpcResponse);
    }

    // do remove
    futureResponsePool.remove(requestId);

}


欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
这100个福字的设计,真的有用!为人生赋能的增额终身寿,我怎能不爱它这么多年终于知道,原来这么惊艳的背影是她!这才叫走心的绿化设计!你那只能叫乱种树!三女儿与父亲同行佛州(杰克逊维尔)聊一聊:今年让你最惊艳的电子产品是什么?封神!她是让奥巴马惊艳的耶鲁华裔才女,21岁凭天才设计征服全美这些奇葩反女性的设计,你真的知道吗?生病老人新常委; 渐行渐远二十大震撼!澳洲人太有福了,绝美公路旅行路线曝光!上帝调色盘打翻,造就了这样惊艳的澳洲!分布式定时调度:xxl-job 万字详解超惊艳的国风红绳!6种天然玉石,美到心坎里酱猪蹄、XXL超大鸡排、黄焖鸡米饭,湾区美食5折开吃!香奈儿是懂流量密码的!一整个珍珠墙的设计,新包“呼啦圈”再杀疯朋友圈,老板成瑞士首富!历史新低!飞利浦 XXL大容量 数字式智能 空气炸锅+烤架套装5.1折!行程卡正式下线!樱花雪山无敌惊艳的小城,我还欠她一场浪漫出逃走资派特色党还是为工农服务吗聊聊今年最让我感到惊艳的笔记本神奇的剧情!惊艳的结局!阿根廷惊险夺冠,梅西如愿捧杯母亲在养老院去世之后阿黛尔, 你是懂惊艳的!Adele就是那个造梦的人!3个「鸟巢」酒店的设计,网红酒店的新概念这样的设计,有点儿辣眼睛!这8种没用的设计,装完即后悔,怪不得99%网友都在吐槽!【装修干货】畅游法国(31)-富豪游乐场有哪些看起来很一般,但是吃起来很惊艳的水果?李铁,对得起我们吗?XXX,退钱!全新设计,来自德国IF红点设计获奖设计团队倾心之作:全触大屏通话智能手表一半古典一半时尚,她手作的设计美到惊艳!简约舒适的设计,缔造独特的个性!好多大咖啊!惊艳的不只是朱一龙一夜爆火!墨尔本西人司机中文搞笑拜年,圈粉无数!这个超暖的城市,怎能不爱?全球最昂贵建筑之一:耗资超50亿美元的洛杉矶索菲体育场打造令人惊艳的顶篷设计!只会用 xxl-job?更强大的新一代分布式任务调度框架来了!美爆了!火爆全网的COACH腕表新款,一眼惊艳的情人节赠礼!
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。