Redian新闻
>
项目自从用了接口请求合并,效率直接加倍!

项目自从用了接口请求合并,效率直接加倍!

公众号新闻

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn

来源:blog.csdn.net/baidu_19473529


前言

请求合并到底有什么意义呢?我们来看下图。

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

public interface UserService {

    Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private UsersMapper usersMapper;

    @Override
    public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据
                result.put(val.getRequestId(), null);
            }
        });
        return result;
    }
}

合并请求的实现

 package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
 * zzq
 * 包装成批量执行的地方
 * */

@Service
public class UserWrapBatchService {
    @Resource
    private UserService userService;

    /**
     * 最大任务数
     **/

    public static int MAX_TASK_NUM = 100;


    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */

    public class Request {
        // 请求id 唯一
        String requestId;
        // 参数
        Long userId;
        //TODO Java 8 的 CompletableFuture 并没有 timeout 机制
        CompletableFuture<Users> completableFuture;

        public String getRequestId() {
            return requestId;
        }

        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }

        public CompletableFuture getCompletableFuture() {
            return completableFuture;
        }

        public void setCompletableFuture(CompletableFuture completableFuture) {
            this.completableFuture = completableFuture;
        }
    }

    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */

    private final Queue<Request> queue = new LinkedBlockingQueue();

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
            //将处理结果返回各自的请求
            for (Request request : list) {
                Users result = response.get(request.requestId);
                request.completableFuture.complete(result);    //completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
            }
        }, 10010, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-""");
        request.userId = userId;
        CompletableFuture<Users> future = new CompletableFuture<>();
        request.completableFuture = future;
        //将对象传入队列
        queue.offer(request);
        //如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

控制层调用

/***
 * 请求合并
 * */

@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
    return new Callable<Users>() {
        @Override
        public Users call() throws Exception {
            return userBatchService.queryUser(userId);
        }
    };
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

package com.springboot.sample;

import org.springframework.web.client.RestTemplate;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class TestBatch {
    private static int threadCount = 30;

    private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //为保证30个线程同时并发运行

    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {


        for (int i = 0; i < threadCount; i++) {//循环开30个线程
            new Thread(new Runnable() {
                public void run() {
                    COUNT_DOWN_LATCH.countDown();//每次减一
                    try {
                        COUNT_DOWN_LATCH.await(); //此处等待状态,为了让30个线程同时进行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    for (int j = 1; j <= 3; j++) {
                        int param = new Random().nextInt(4);
                        if (param <=0){
                            param++;
                        }
                        String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
                        System.out.println(Thread.currentThread().getName() + "参数 " + param + " 返回值 " + responseBody);
                    }
                }
            }).start();

        }
    }
}

测试效果

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
 * zzq
 * 包装成批量执行的地方,使用queue解决超时问题
 * */

@Service
public class UserWrapBatchQueueService {
    @Resource
    private UserService userService;

    /**
     * 最大任务数
     **/

    public static int MAX_TASK_NUM = 100;


    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */

    public class Request {
        // 请求id
        String requestId;

        // 参数
        Long userId;
        // 队列,这个有超时机制
        LinkedBlockingQueue<Users> usersQueue;


        public String getRequestId() {
            return requestId;
        }

        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }

        public LinkedBlockingQueue<Users> getUsersQueue() {
            return usersQueue;
        }

        public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
            this.usersQueue = usersQueue;
        }
    }

    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */

    private final Queue<Request> queue = new LinkedBlockingQueue();

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
            for (Request userReq : userReqs) {
                // 这里再把结果放到队列里
                Users users = response.get(userReq.getRequestId());
                userReq.usersQueue.offer(users);
            }

        }, 10010, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-""");
        request.userId = userId;
        LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
        request.usersQueue = usersQueue;
        //将对象传入队列
        queue.offer(request);
        //取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
        try {
            return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
...省略..

    @Override
    public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        // 数据分组
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据 , 这里要new,不然加入队列会空指针
                result.put(val.getRequestId(), new Users());
            }
        });
        return result;
    }

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html

欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
高通胀难消,美债收益率直冲5%挑战英伟达H100霸权!IBM模拟人脑造神经网络芯片,效率提升14倍,破解AI模型耗电难题Laronde遭FP合并,新公司价值近8亿美元,业务将关注「完全可编程药物」人生困境与白左自辩这家公司估值已超500亿美元! VinFast 完成SPAC合并,登陆纳斯达克成功上市物价再度失控上涨,美联储或再次加息,房贷利率直冲8%,23年来新纪录【推广】别瞎找啦!NU专属求职群,直接加这个!别瞎找啦!美国留学生专属求职群,直接加这个!5137 血壮山河之武汉会战 信罗战役 10【广而告之】别瞎找啦!UIUC留学生专属求职群,直接加这个!华人团队首次分化出蓝斑去甲肾上腺素能神经元,效率最高达60%,可用于药物筛选你是盲目自信、迷之自信、有条件自信还是无条件自信?第十章 美国社会的方方面面 (1)这样学习,效率高极了!网友:写作业有困难,也能找警察One Last Walk 最后一程 【小说】西部数据和铠侠再谈合并,全球最大NAND供应商或将易主拉布拉多刚被送去日托所20分钟就创下新纪录:退学了接一下突然要求所有接口都用 post 请求,为什么?零食很忙与赵一鸣合并,量贩零售赛道大变天西装是男人最好的医美,一条好的西裤魅力加倍!中国移动游戏发行公司重庆灏瀚公开宣布与SPAC合并,估值5亿美金这样开晨会,员工不累,效率加倍OpenAI董事会被踢爆曾与竞争对手Anthropic讨论合并,Altman 去留仍存变数托福90+,无SSAT,这些美东顶尖寄宿直接加入选校名单!(24年RD申请倒计时!)几秒生成高清商拍大片,电商效率直接Pro Max | 虹软科技推出PhotoStudio® AI资本扎堆、政策护航,国内脑机接口迎来高速成长期丨2023中国脑机接口行业研究报告超重磅!澳洲新八大官宣!正式合并!全澳第一大学或大变天!大批留学生秒变八大学生,澳洲大学掀起合并潮...USB-C 接口成主流,苹果其它产品更新USB-C 接口时间曝光刚宣!墨尔本Glen附近,3所私校合并,这届家长都在见证历史案例分析 | 如何从用户思维转到商业思维脑机接口如何改变未来? ——访“脑机接口之父”米格尔•尼科莱利斯首个国内《芯粒互联接口标准》Chiplet接口测试成功,北极雄芯公布新进展由中国人实际控制的SPAC:Mars Acquisition宣布与标的企业合并,估值1.5亿美金《薇藤花》&《无字碑》小模型编排,让 1+1>2,企业工作更灵活,效率更高
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。