自研API网关实践之路
老网关使用 Spring Cloud Gateway (下称SCG)技术框架搭建,SCG基于webflux 编程范式,webflux是一种响应式编程理念,响应式编程对于提升系统吞吐率和性能有很大帮助; webflux 的底层构建在netty之上性能表现优秀;SCG属于spring生态的产物,具备开箱即用的特点,以较低的使用成本助力得物早期的业务快速发展;但是随着公司业务的快速发展,流量越来越大,网关迭代的业务逻辑越来越多,以及安全审计需求的不断升级和稳定性需求的提高,SCG在以下几个方面逐步暴露了一系列的问题。
网络安全
从网络安全角度来讲,对公网暴露接口无疑是一件风险极高的事情,网关是对外网络流量的重要桥梁,早期的接口暴露采用泛化路由的模式,即通过正则形式( /api/v1/app/order/** )的路由规则开放接口,单个应用服务往往只配置一个泛化路由,后续上线新接口时外部可以直接访问;这带来了极大的安全风险,很多时候业务开发的接口可能仅仅是内部调用,但是一不小心就被泛化路由开放到了公网,甚至很多时候没人讲得清楚某个服务具体有多少接口属于对外,多少对内;另一方面从监控数据来看,黑产势力也在不断对我们的接口做渗透试探。
协同效率
引入了接口注册机制,所有对外暴露接口逐一注册到网关,未注册接口不可访问,安全的问题得到了解决但同时带来了性能问题,SCG采用遍历方式匹配路由规则,接口注册模式推广后路由接口注册数量迅速提升到3W+,路由匹配性能出现严重问题;泛化路由的时代,一个服务只有一个路由配置,变动频率很低,配置工作由网关关开发人员负责,效率尚可,接口注册模式将路由工作转移到了业务开发同学的身上,这就得引入一套完整的路由审核流程,以提升协同效率;由于路由信息早期都存在配置中心,同时这么大的数据量给配置中心也带来极大的压力和稳定性风险。
性能与维护成本
业务迭代的不断增多,也使得API网关堆积了很多的业务逻辑,这些业务逻辑分散在不同的filter中,为了降低开发成本,网关只有一套主线分支,不同集群部署的代码完全相同,但是不同集群的业务属性不同,所需要的filter 逻辑是不一样的;如内网网关集群几乎没什么业务逻辑,但是App集群可能需要几十个filter的逻辑协同工作;这样的一套代码对内网网关而言,存在着大量的性能浪费;如何平衡维护成本和运行效率是个需要思考的问题。
稳定性风险
API网关作为基础服务,承载全站的流量出入,稳定性无疑是第一优先级,但其定位决定了绝不可能是一个简单的代理层,在稳定运行的同时依然需要承接大量业务需求,例如C端用户登录下线能力,App强升能力,B端场景下的鉴权能力等;很难想象较长一段时间以来,网关都保持着双周一次的发版频率;频繁的发版也带来了一些问题,实例启动初期有很多资源需要初始化,此时承接的流量处理时间较长,存在着明显的接口超时现象;早期的每次发版几乎都会导致下游服务的接口短时间内超时率大幅提高,而且往往涉及多个服务一起出现类似情况;为此甚至拉了一个网关发版公告群,提前置顶发版公告,让业务同学和NOC有一个心里预期;在发布升级期间尽可能让业务服务无感知这是个刚需。
定制能力
流量灰度是网关最常见的功能之一,对于新版本迭代,业务服务的某个节点发布新版本后希望引入少部分流量试跑观察,但很遗憾SCG原生并不支持,需要对负载均衡算法进行手动改写才可以,此外基于流量特征的定向节点路由也需要手动开发,在SCG中整个负载均衡算法属于比较核心的模块,不对外直接暴露,存在较高的改造成本。
B端业务和C端业务存在着很大的不同,例如对接口的响应时间的忍受度是不一样的,B端场景下下载一个报表用户可以接受等待10s或者1分钟,但是C端用户现在没有这个耐心。作为代理层针对以上的场景,我们需要针对不同接口定制不同的超时时间,原生的SCG显然也不支持。
诸如此类的定制需求还有很多,我们并不寄希望于开源产品能够开箱即用满足全部需求,但至少定制性拓展性足够好。上手改造成本低。
SCG主要使用了webflux技术,webflux的底层构建在reactor-netty之上,而reactor-netty构建于netty之上;SCG能够和spring cloud 的技术栈的各组件,完美适配,做到开箱即用,以较低的使用成本助力得物早期的业务快速发展;但是使用webflux也是需要付出一定成本,首先它会额外增加编码人员的心智负担,需要理解流的概念和常用的操作函数,诸如map, flatmap, defer 等等;其次异步非阻塞的编码形式,充斥着大量的回调函数,会导致顺序性业务逻辑被割裂开来,增加代码阅读理理解成本;此外经过多方面评估我们发现SCG存在以下缺点:
内存泄露问题
SCG存在较多的内存泄漏问题,排查困难,且官方迟迟未能修复,长期运行会导致服务触发OOM并宕机;以下为github上SCG官方开源仓库的待解决的内存泄漏问题,大约有16个之多。
SCG内存泄漏BUG
下图可以看到SCG在长期运行的过程中内存使用一直在增长,当增长到机器内存上限时当前节点将不可用,联系到网关单节点所承接的QPS 在几千,可想而知节点宕机带来的危害有多大;一段时间以来我们需要对SCG网关做定期重启。
SCG生产实例内存增长趋势
响应式编程范式复杂
基于webflux 中的flux 和mono ,在对request和response信息读取修改时,编码复杂度高,代码理解困难,下图是对body信息进行修改时的代码逻辑。
对requestBody 进行修改的方式
多层抽象的性能损耗
尽管相比于传统的阻塞式网关,SCG的性能已经足够优秀,但相比原生的netty仍然比较低下,SCG依赖于webflux编程范式,webflux构建于reactor-netty之上,reactor-netty 构建于netty 之上, 多层抽象存在较大的性能损耗。
一般认为程序调用栈越深性能越差;下图为只有一个filter的情况下的调用栈,可以看到存在大量的 webflux 中的 subscribe() 和onNext() 方法调用,这些方法的执行不关联任何业务逻辑,属于纯粹的框架运行层代码,粗略估算下没有引入任何逻辑的情况下SCG的调用栈深度在 90+ ,如果引入多个filter处理不同的业务逻辑,线程栈将进一步加深,当前网关的业务复杂度实际栈深度会达到120左右,也就是差不多有四分之三的非业务栈损耗,这个比例是有点夸张的。
SCG filter 调用栈深度
路由能力不完善
原生的的SCG并不支持动态路由管理,路由的配置信息通过大量的KV配置来做,平均一个路由配置需要三到四条KV配置信息来支撑,这些配置数据一般放在诸如Apollo或者ark 这样的配置中心,即使是添加了新的配置SCG并不能动态识别,需要引入动态刷新路由配置的能力。另一方面路由匹配算法通过遍历所有的路由信息逐一匹配的模式,当接口级别的路由数量急剧膨胀时,性能是个严重问题。
SCG路由匹配算法为On时间复杂度
预热时间长,冷启动RT尖刺大
SCG中LoadBalancerClient 会调用choose方法来选择合适的endpoint 作为本次RPC发起调用的真实地址,由于是懒加载,只有在有真实流量触发时才会加载创建相关资源;在触发底层的NamedContextFactory#getContext 方法时存在一个全局锁导致,woker线程在该锁上大量等待。
NamedContextFactory#getContext方法存在全局锁
SCG发布时超时报错增多
定制性差,数据流控制耦合
SCG在开发运维过程中已经出现了较多的针对源码改造的场景,如动态路由,路由匹配性能优化等;其设计理念老旧,控制流和数据流混合使用,架构不清晰,如对路由管理操作仍然耦合在filter中,即使引入spring mvc方式管理,依然绑定使用webflux编程范式,同时也无法做到控制流端口独立,存在一定安全风险。
filter中对路由进行管理
理想中的网关
综合业务需求和技术痛点,我们发现理想型的网关应该是这个样子的:
支持海量接口注册,并能够在运行时支持动态添加修改路由信息,具备出色路由匹配性能
编程范式尽可能简单,降低开发人员心智负担,同时最好是开发人员较为熟悉的语言
性能足够好,至少要等同于目前SCG的性能,RT99线和ART较低
稳定性好,无内存泄漏,能够长时间持续稳定运行,发布升级期间要尽可能下游无感
拓展能力强,支持超时定制,多网络协议支持,http,Dubbo等,生态完善
架构设计清晰,数据流与控制流分离,集成UI控制面
开源网关对比
基于以上需求,我们对市面上的常见网关进行了调研,以下几个开源方案对比。
结合当前团队的技术栈,我们倾向于选择Java技术栈的开源产品,唯一可选的只有zuul2 ,但是zuul2路由注册和稳定性方面也不能够满足我们的需求,也没有实现数控分离的架构设计。因此唯有走上自研之路。
通常而言代理网关分为透明代理与非透明代理,其主要区别在于对于流量是否存在侵入性,这里的侵入性主要是指对请求和响应数据的修改;显然API Gateway的定位决定了必然会对流量进行数据调整,常见的调整主要有 添加或者修改head 信息,加密或者解密 query params head ,以及 requestbody 或者responseBody,可以说http请求的每一个部分数据都存在修改的可能性,这要求代理层必须要完全解析数据包信息,而非简单的做一个路由器转发功能。
传统的服务器架构,以reactor架构为主。boss线程和worker线程的明确分工,boss线程负责连接建立创建;worker线程负责已经建立的连接的读写事件监听处理,同时会将部分复杂业务的处理放到独立的线程池中,进而避免worker线程的执行时间过长影响对网络事件处理的及时性;由于网关是IO密集型服务,相对来说计算内容较少,可以不必引入这样的业务线程池;直接基于netty 原生reactor架构实现。
Reactor多线程架构
为了只求极致性能和降低多线程编码的数据竞争,单个请求从接收到转发后端,再到接收后端服务响应,以及最终的回写给client端,这一系列操作被设计为完全闭合在一个workerEventLoop线程中处理;这需要worker线程中执行的IO类型操作全部实现异步非阻塞化,确保worker线程的高速运转;这样的架构和NGINX很类似;我们称之为 thread-per-core模式。
API网关组件架构
数据流控制流分离
数据面板专注于流量代理,不处理任何admin 类请求,控制流监听独立的端口,接收管理指令。
请求上下文封装
新的API网关底层仍然基于Netty,其自带的http协议解析handler可以直接使用。基于netty框架的编程范式,需要在初始化时逐一注册用到的 Handler。
Client到Proxy链路Handler 执行顺序
HttpServerCodec 负责HTTP请求的解析;对于体积较大的Http请求,客户端可能会拆成多个小的数据包进行发送,因此在服务端需要适当的封装拼接,避免收到不完整的http请求;HttpObjectAggregator 负责整个请求的拼装组合。
拿到HTTP请求的全部信息后在业务handler 中进行处理;如果请求体积过大直接抛弃;使用ServerWebExchange 对象封装请求上下文信息,其中包含了client2Proxy的channel, 以及负责处理该channel 的eventLoop 线程等信息,考虑到整个请求的处理过程中可能在不同阶段传递一些拓展信息,引入了getAttributes 方法 用于存储需要传递的数据;此外ServerWebExchange 接口的基本遵循了SCG的设计规范,保证了在迁移业务逻辑时的最小化改动;具体到实现类,可以参考如下代码:
public class DefaultServerWebExchange implements ServerWebExchange {
private final Channel client2ProxyChannel;
private final Channel proxy2ClientChannel;
private final EventLoop executor;
private ServerHttpRequest request;
private ServerHttpResponse response;
private final Map<String, Object> attributes;
}
DefaultServerWebExchange
Client2ProxyHttpHandler作为核心的入口handler 负责将接收到的FullHttpRequest 进行封装和构建ServerWebExchange 对象,其核心逻辑如下。可以看到对于数据读取封装的逻辑较为简单,并没有植入常见的业务逻辑,封装完对象后随即调用 Request filter chain。
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
try {
Channel client2ProxyChannel = ctx.channel();
DefaultServerHttpRequest serverHttpRequest = new DefaultServerHttpRequest(fullHttpRequest, client2ProxyChannel);
ServerWebExchange serverWebExchange = new DefaultServerWebExchange(client2ProxyChannel,(EventLoop) ctx.executor(), serverHttpRequest, null);
// request filter chain
this.requestFilterChain.filter(serverWebExchange);
}catch (Throwable t){
log.error("Exception caused before filters!\n {}",ExceptionUtils.getStackTrace(t));
ByteBufHelper.safeRelease(fullHttpRequest);
throw t;
}
}
Client2ProxyHttpHandler 精简后的代码
FilterChain设计
FilterChain可以解决异步请求发送出去后,还没收到响应,但是顺序逻辑已经执行完成的尴尬;例如当我们在上文的。
channelRead0 方法中发起某个鉴权RPC调用时,出于性能考虑只能使用非阻塞的方式,按照netty的非阻塞编码API最终要引入类似如下的 callback 机制,在业务逻辑上在没有收到RPC的响应之前该请求的处理应该“暂停”,等待收到响应时才能继续后续的逻辑执行; 也就是下面代码中的下一步执行逻辑并不能执行,正确的做法是将nextBiz() 方法包裹在 callBack() 方法内,由callBack() 触发后续逻辑的执行;这只是发起一次RPC调用的情况,在实际的的日常研发过程中存在着鉴权,风控,集群限流(Redis)等多次RPC调用,这就导致这样的非阻塞代码编写将异常复杂。
ChannelFuture writeFuture = channel.writeAndFlush(asyncRequest.httpRequest);
writeFuture.addListener(future -> {
if(future.isSuccess()) {
callBack();
}
}
);
nextBiz();
非阻塞调用下的业务逻辑编排
对于这样的复杂场景,采用filterChain模式可以很好的解决;首先RequestFilterChain().filter(serverWebExchange); 后不存在任何逻辑;发起请求时 ,当前filter执行结束,由于此时没有调用chain.filter(exchange); 所以不会继续执行下一个filter,发送请求到下游的逻辑也不会执行;当前请求的处理流程暂时中止,eventloop 线程将切换到其他请求的处理过程上;当收到RPC响应时,chain.filter(exchange) 被执行,之前中断的流程被重新拉起。
public void filter(ServerWebExchange exchange) {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
try {
filter.filter(exchange, chain);
}catch (Throwable e){
log.error("Filter chain unhandle backward exception! Request path {}, FilterClass: {}, exception: {}", exchange.getRequest().getPath(), filter.getClass(), ExceptionUtils.getFullStackTrace(e));
ResponseDecorator.failResponse(exchange,500, "网关内部错误!filter chain exception!");
}
}
}
基于filterChain的调用模式
对于filter的执行需要定义先后顺序,这里参考了SCG的方案,每个filter返回一个order值。不同的地方在于DAG的设计不允许 order值重复,因为在order重复的情况下,很难界定到底哪个Filter 先执行,存在模糊地带,这不是我们期望看到的;DAG中的Filter 执行顺序为order值从小到大,且不允许order值重复。为了易于理解,这里将Filter拆分为了 requestFilter,和responseFilter;分别代表请求的处理阶段 和拿到下游响应阶段,responseFilter 遵循同样的逻辑执行顺序与不可重复性。
public interface GatewayFilter extends Ordered {
void filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
public interface ResponseFilter extends GatewayFilter { }
public interface RequestFilter extends GatewayFilter { }
filter接口设计
路由管理与匹配
以SCG网关注册的路由数量为基准,网关节点的需要支撑的路由规则数量是上万级别的,按照得物目前的业务量,上限不超过5W,为了保证匹配性能,路由规则放在分布式缓存中显然是不合适的,需要保存在节点的内存中。类似于在nginx上配置上万条location 规则,手动维护难度可想而知,即使在配置中心管理起来也很麻烦,所以需要引入独立路由管理模块。
在匹配的效率上也需要进一步优化,SCG的路由匹配策略为普通的循环迭代逐一匹配,时间效率为On,在路由规则膨胀到万级别后,性能急剧拉胯,结合得物的接口规范,新网关采用Hash匹配模式,将匹配效率提升到O1;hash的key为接口的path, 需要强调的是在同一个网关集群中,path是唯一的,这里的path并不等价于业务服务的接口path, 绝大多数时候存在一些剪裁,例如在业务服务的编写的/order/detail接口,在网关实际注册的接口可能为/api/v1/app/order/detail;由于使用了path作为key进行hash匹配。常见的restful 接口显然是不支持的,确切的讲基于path传参数模式的接口均不支持;出于某些历史原因,网关保留了类似nginx 的前缀匹配的支持,但是这部分功能不对外开放。
public class Route implements Ordered {
private final String id;
private final int skipCount;
private final URI uri;
}
route类设计
route的URI字段中包含了,需要路由到的具体服务名,这里也可以称之为host ,route 信息会暂存在 exchange对象的 attributes 属性中, 在后续的loadbalance阶段host信息会被进一步替换为真实的 endpoint。
private Route lookupRoute(ServerWebExchange exchange) {
String path = exchange.getRequest().getPath();
CachingRouteLocator locator = (CachingRouteLocator) routeLocator;
Route exactRoute = pathRouteMap.getOrDefault(path, null);
if (exactRoute != null) {
exchange.getAttributes().put(DAGApplicationConfig.GATEWAY_ROUTE_CACHE, route);
return exactRoute;
}
}
路由匹配逻辑
单线程闭环
为了更好地利用CPU,以及减少不必要的数据竞争,将单个请求的处理全部闭合在一个线程当中;这意味着这个请求的业务逻辑处理,RPC调用,权限验证,限流token获取都将始终由某个固定线程处理。netty中 网络连接被抽象为channel,channel 与eventloop线程的对应关系为 N对1,一个channel 仅能被一个eventloop 线程所处理,这在处理用户请求时没有问题,但是在接收请求完毕向下游转发请求时,我们碰到了一些挑战,下游的连接往往是连接池在管理,连接池的管理是另一组eventLoop线程在负责,为了保持闭环需要将连接池的线程设定为处理当前请求的线程,并且只能是这一个线程;这样一来,默认状态下启动的N个线程(N 与机器核心数相同),分别需要管理一个连接池;thread-per-core 模式的性能已经在nginx开源组件上得到验证。
连接管理优化
为了满足单线程闭环,需要将连接池的管理线程设置为当前的 eventloop 线程,最终我们通过threadlocal 进行线程与连接池的绑定;通常情况下netty自带的连接池 FixedChannelPool 可以满足我们大部分场景下的需求,这样的连接池也是适用于多线程的场景;由于新网关使用thread-per-core模式并将请求处理的全生命周期闭合在单个线程中,所有为了线程安全的额外操作不再必要且存在性能浪费;为此需要对原生连接池做一些优化, 连接的获取和释放简化为对链表结构的简单getFirst , addLast。
对于RPC 而言,无论是HTTP,还是Dubbo,Redis等最终底层都需要用到TCP连接,将构建在TCP连接上的数据解析协议与连接剥离后,我们发现这种纯粹的连接管理是可以复用的,对于连接池而言不需要知道具体连接的用途,只需要维持到特定endpoint的连接稳定即可,那么这里的RPC服务的连接仍然可以放入连接池中进行托管;最终的连接池设计架构图。
AsyncClient设计
对于七层流量而言基本全部都是Http请求,同样在RPC请求中 http协议也占了大多数,考虑到还会存在少量的dubbo, Redis 等协议通信的场景。因此需要抽象出一套异步调用框架来支撑;这样的框架需要具备超时管理,回调执行,错误输出等功能,更重要的是具备协议无关性质, 为了更方便使用需要支持链式调用。
发起一次RPC调用通常可以分为以下几步:
获取目标地址和使用的协议, 目标服务为集群部署时,需要使用loadbalance模块
封装发送的请求,这样的请求在应用层可以具体化为某个Request类,网络层序列化为二进制数据流
出于性能考虑选择非阻塞式发送,发送动作完成后开始计算超时
接收数据响应,由于采用非阻塞模式,这里的发送线程并不会以block的方式等待数据
在超时时间内完成数据处理,或者触发超时导致连接取消或者关闭
AsyncClient 模块内容并不复杂,AsyncClient为抽象类不区分使用的网络协议;ConnectionPool 作为连接的管理者被client所引用,获取连接的key 使用 protocol+ip+port 再适合不过;通常在某个具体的连接初始化阶段就已经确定了该channel 所使用的协议,因此初始化时会直接绑定协议Handler;当协议为HTTP请求时,HttpClientCodec 为HTTP请求的编解码handler;也可以是构建在TCP协议上的 Dubbo, Mysql ,Redis 等协议的handler。
首先对于一个请求的不同执行阶段需要引入状态定位,这里引入了 STATE 枚举:
enum STATE{
INIT,SENDING,SEND,SEND_SUCCESS,FAILED,TIMEOUT,RECEIVED
}
其次在执行过程中设计了 AsyncContext作为信息存储的载体,内部包含request和response信息,作用类似于上文提到的ServerWebExchange;channel资源从连接池中获取,使用完成后需要自动放回。
public class AsyncContext<Req, Resp> implements Cloneable{
STATE state = STATE.INIT;
final Channel usedChannel;
final ChannelPool usedChannelPool;
final EventExecutor executor;
final AsyncClient<Req, Resp> agent;
Req request;
Resp response;
ResponseCallback<Resp> responseCallback;
ExceptionCallback exceptionCallback;
int timeout;
long deadline;
long sendTimestamp;
Promise<Resp> responsePromise;
}
AsyncContext
AsyncClient 封装了基本的网络通信能力,不拘泥于某个固定的协议,可以是Redis, http,Dubbo 等。当将数据写出去之后,该channel的非阻塞调用立即结束,在没有收到响应之前无法对AsyncContext 封装的数据做进一步处理,如何在收到数据时将接收到的响应和之前的请求管理起来这是需要面对的问题,channel 对象 的attr 方法可以用于临时绑定一些信息,以便于上下文切换时传递数据,可以在发送数据时将AsyncContext对象绑定到该channel的某个固定key上。当channel收到响应信息时,在相关的 AsyncClientHandler 里面取出AsyncContext。
public abstract class AsyncClient<Req, Resp> implements Client {
private static final int defaultTimeout = 5000;
private final boolean doTryAgain = false;
private final ChannelPoolManager channelPoolManager = ChannelPoolManager.getChannelPoolManager();
protected static AttributeKey<AsyncRequest> ASYNC_REQUEST_KEY = AttributeKey.valueOf("ASYNC_REQUEST");
public abstract ApplicationProtocol getProtocol();
public AsyncContext<Req, Resp> newRequest(EventExecutor executor, String endpoint, Req request) {
final ChannelPoolKey poolKey = genPoolKey(endpoint);
ChannelPool usedChannelPool = channelPoolManager.acquireChannelPool(executor, poolKey);
return new AsyncContext<>(this,executor,usedChannelPool,request, defaultTimeout, executor.newPromise());
}
public void submitSend(AsyncContext<Req, Resp> asyncContext){
asyncContext.state = AsyncContext.STATE.SENDING;
asyncContext.deadline = asyncContext.timeout + System.currentTimeMillis();
ReferenceCountUtil.retain(asyncContext.request);
Future<Resp> responseFuture = trySend(asyncContext);
responseFuture.addListener((GenericFutureListener<Future<Resp>>) future -> {
if(future.isSuccess()){
ReferenceCountUtil.release(asyncContext.request);
Resp response = future.getNow();
asyncContext.responseCallback.callback(response);
}
});
}
/**
* 尝试从连接池中获取连接并发送请求,若失败返回错误
*/
private Promise<Resp> trySend(AsyncContext<Req, Resp> asyncContext){
Future<Channel> acquireFuture = asyncContext.usedChannelPool.acquire();
asyncContext.responsePromise = asyncContext.executor.newPromise();
acquireFuture.addListener(new GenericFutureListener<Future<Channel>>() {
public void operationComplete(Future<Channel> channelFuture) throws Exception {
sendNow(asyncContext,channelFuture);
}
});
return asyncContext.responsePromise;
}
private void sendNow(AsyncContext<Req, Resp> asyncContext, Future<Channel> acquireFuture){
boolean released = false;
try {
if (acquireFuture.isSuccess()) {
NioSocketChannel channel = (NioSocketChannel) acquireFuture.getNow();
released = true;
assert channel.attr(ASYNC_REQUEST_KEY).get() == null;
asyncContext.usedChannel = channel;
asyncContext.state = AsyncContext.STATE.SEND;
asyncContext.sendTimestamp = System.currentTimeMillis();
channel.attr(ASYNC_REQUEST_KEY).set(asyncContext);
ChannelFuture writeFuture = channel.writeAndFlush(asyncContext.request);
channel.eventLoop().schedule(()-> doTimeout(asyncContext), asyncContext.timeout, TimeUnit.MILLISECONDS);
} else {
asyncContext.responsePromise.setFailure(acquireFuture.cause());
}
} catch (Exception e){
throw new Error("Unexpected Exception.............!");
}finally {
if(!released) {
ReferenceCountUtil.safeRelease(asyncContext.request);
}
}
}
}
public class AsyncClientHandler extends SimpleChannelInboundHandler {
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
AsyncContext asyncContext = ctx.attr(AsyncClient.ASYNC_REQUEST_KEY).get();
try {
asyncContext.state = AsyncContext.STATE.RECEIVED;
asyncContext.releaseChannel();
asyncContext.responsePromise.setSuccess(msg);
}catch (Throwable t){
log.error("Exception raised when set Success callback. Exception \n: {}", ExceptionUtils.getFullStackTrace(t));
ByteBufHelper.safeRelease(msg);
throw t;
}
}
}
AsyncClientHandler
通过上面几个类的封装得到了一个易用使用的 AsyncClient,下面的代码为调用权限系统的案例:
final FullHttpRequest httpRequest = HttpRequestUtil.getDefaultFullHttpRequest(newAuthReq, serviceInstance, "/auth/newCheckSls");
asyncClient.newRequest(exchange.getExecutor(), endPoint,httpRequest)
.timeout(timeout)
.onComplete(response -> {
String checkResultJson = response.content().toString(CharsetUtil.UTF_8);
response.release();
NewAuthResult result = Jsons.parse(checkResultJson,NewAuthResult.class);
TokenResult tokenResult = this.buildTokenResult(result);
String body = exchange.getAttribute(DAGApplicationConfig.REQUEST_BODY);
if (tokenResult.getUserInfoResp() != null) {
UserInfoResp userInfo = tokenResult.getUserInfoResp();
headers.set("userid", userInfo.getUserid() == null ? "" : String.valueOf(userInfo.getUserid()));
headers.set("username", StringUtils.isEmpty(userInfo.getUsername()) ? "" : userInfo.getUsername());
headers.set("name", StringUtils.isEmpty(userInfo.getName()) ? "" : userInfo.getName());
chain.filter(exchange);
} else {
log.error("{},heads: {},response: {}", path, headers, tokenResult);
int code = tokenResult.getCode() != null ? tokenResult.getCode().intValue() : ResultCode.UNAUTHO.code;
ResponseDecorator.failResponse(exchange, code, tokenResult.getMsg());
}
})
.onError(throwable -> {
log.error("Request service {},occur an exception {}",endPoint, throwable);
ResponseDecorator.failResponseWithStatus(exchange,HttpResponseStatus.INTERNAL_SERVER_ERROR,"AuthFilter 验证失败");
})
.sendRequest();
请求超时管理
一个请求的处理时间不能无限期拉长, 超过某个阈值的情况下App的页面会被取消 ,长时间的加载卡顿不如快速报错带来的体验良好;显然网关需要针对接口做超时处理,尤其是在向后端服务发起请求的过程,通常我们会设置一个默认值,例如3秒钟,超过这个时间网关会向请求端回写timeout的失败信息,由于网关下游接入的服务五花八门,可能是RT敏感型的C端业务,也可能是逻辑较重B端服务接口,甚至是存在大量计算的监控大盘接口。这就导致不同接口对超时时间的诉求不一样,因此针对每个接口的超时时间设定应该被独立出来,而不是统一配置成一个值。
asyncClient.newRequest(exchange.getExecutor(), endPoint,httpRequest)
.timeout(timeout)
.onComplete(response -> {
String checkResultJson = response.content().toString(CharsetUtil.UTF_8);
//..........
})
.onError(throwable -> {
log.error("Request service {},occur an exception {}",endPoint, throwable);
ResponseDecorator.failResponseWithStatus(exchange,HttpResponseStatus.INTERNAL_SERVER_ERROR,"AuthFilter 验证失败");
})
.sendRequest();
asyncClient 的链式调用设计了 timeout方法,用于传递超时时间,我们可以通过一个全局Map来配置这样的信息。
Map<String,Integer> 其key为全路径的path 信息,V为设定的超时时间,单位为ms, 至于Map的信息在实际配置过程中如何承载,使用ARK配置或者Mysql 都很容易实现。处于并发安全和性能的极致追求,超时事件的设定和调度最好能够在与当前channel绑定的线程中执行,庆幸的是 EventLoop线程自带schedule 方法。具体来看上文的 AsyncClient 的56行。schedule 方法内部以堆结构的方式实现了对超时时间进行管理,整体性能尚可。
堆外内存管理优化
常见的堆外内存手动管理方式无非是引用计数,不同处理逻辑可能针对 RC (引用计数) 的值做调整,到某个环节的业务逻辑处理后已经不记得当前的引用计数值是多少了,甚至是前面的RC增加了,后面的RC忘记减少了;但换个思路,在数据回写给客户端后我们肯定要把这个请求整个生命周期所申请的堆外内存全部释放掉,堆外内存在回收的时候条件只有一个,就是RC值为0 ,那么在最终的release的时候,我们引入一个safeRelase的思路 , 如果当前的RC>0 就不停的 release ,直至为0;因此只要把这样的逻辑放在netty的最后一个Handler中即可保证内存得到有效释放。
public static void safeRelease(Object msg){
if(msg instanceof ReferenceCounted){
ReferenceCounted ref = (ReferenceCounted) msg;
int refCount = ref.refCnt();
for(int i=0; i<refCount; i++){
ref.release();
}
}
}
响应时间尖刺优化
由于DAG 选择了复用spring 的 loadbalance 模块,但这样一来就会和SCG一样存在启动初期的响应时间尖刺问题;为此我们进一步分析RibbonLoadBalancerClient 的构建过程,发现其用到了NamedContextFactory,该类的 contexts 变量保存了每一个serviceName对应的一个独立context,这种使用模式带来大量的性能浪费。
public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>implements DisposableBean, ApplicationContextAware {
//1. contexts 保存 key -> ApplicationContext 的map
private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();
//........
}
在实际运行中 RibbonLoadBalancerClient 会调用choose方法来选择合适的endpoint 作为本次RPC发起调用的真实地址;choose 方法执行过程中会触发 getLoadBalancer() 方法执行,可以看到该方法的可以按照传入的serviceId 获取专属于这个服务的LoadBalancer,事实上这样的设计有点多此一举。大部分情况下,每个服务的负载均衡算法都一致的,完全可以复用一个LoadBalancer对象;该方法最终是从spring 容器中获取 LoadBalancer。
class RibbonLoadBalancerClient{
//..........
private SpringClientFactory clientFactory;
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
//.........
}
RibbonLoadBalancerClient
由于是懒加载,实际流量触发下才会执行,因此第一次执行时,RibbonLoadBalancerClient 对象并不存在,需要初始化创建,创建时大量线程并发调用SpringClientFactory#getContext 方法, 锁在同一个对象上,出现大量的RT尖刺。这也解释了为什么SCG网关在发布期间会出现响应时间大幅度抖动的现象。
public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification>{
//............
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
//.........
}
在后期的压测过程中,发现 DAG的线程数量远超预期,基于thread-per-core的架构模式下,过多的线程对性能损害比较大,尤其是当负载上升到较高水位时。上文提到默认情况下,每个服务都会创建独立loadBalanceClient , 而在其内部又会启动独立的线程去同步当前关联的serviceName对应的可用serverList, 网关的特殊性导致需要接入的服务数量极为庞大,进而导致运行一段时间后DAG的线程数量急剧膨胀,对于同步serverList 这样的动作而言,完全可以采用非阻塞的方式从注册中心拉取相关的serverList , 这种模式下单线程足以满足性能要求。
serverList的更新前后架构对比
通过预先初始化的方式以及全局只使用1个context的方式,可以将这里冷启动尖刺消除,改造后的测试结果符合预期。
通过进一步修改优化spring loadbalance serverList 同步机制,降低90%线程数量的使用。
优化前线程数量(725)
优化后线程数量(72)
集群限流改造优化
首先来看DAG 启动后sentinel相关线程,类似的问题,线程数量非常多,需要针对性优化。
Sentinel 线程数
sentinel线程分析优化:
最终优化后的线程数量为4个
sentinel原生限流源码分析如下,进一步分析SphU#entry方法发现其底调用 FlowRuleCheck#passClusterCheck;在passClusterCheck方法中发现底层网络IO调用为阻塞式,;由于该方法的执行线程为workerEventLoop,因此需要使用上文提到的AsyncClient 进行优化。
private void doSentinelFlowControl(ServerWebExchange exchange, GatewayFilterChain chain, String resource){
Entry urlEntry = null;
try {
if (!StringUtil.isEmpty(resource)) {
//1. 检测是否限流
urlEntry = SphU.entry(resource, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
}
//2. 通过,走业务逻辑
chain.filter(exchange);
} catch (BlockException e) {
//3. 拦截,直接返回503
ResponseDecorator.failResponseWithStatus(exchange, HttpResponseStatus.SERVICE_UNAVAILABLE, ResultCode.SERVICE_UNAVAILABLE.message);
} catch (RuntimeException e2) {
Tracer.traceEntry(e2, urlEntry);
log.error(ExceptionUtils.getFullStackTrace(e2));
ResponseDecorator.failResponseWithStatus(exchange, HttpResponseStatus.INTERNAL_SERVER_ERROR,HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
} finally {
if (urlEntry != null) {
urlEntry.exit();
}
ContextUtil.exit();
}
}
SentinelGatewayFilter(sentinel 适配SCG的逻辑)
public class RedisTokenService implements InitializingBean {
private final RedisAsyncClient client = new RedisAsyncClient();
private final RedisChannelPoolKey connectionKey;
public RedisTokenService(String host, int port, String password, int database, boolean ssl){
connectionKey = new RedisChannelPoolKey(String host, int port, String password, int database, boolean ssl);
}
//请求token
public Future<TokenResult> asyncRequestToken(ClusterFlowRule rule){
....
sendMessage(redisReqMsg,this.connectionKey)
}
private Future<TokenResult> sendMessage(RedisMessage requestMessage, EventExecutor executor, RedisChannelPoolKey poolKey){
AsyncRequest<RedisMessage,RedisMessage> request = client.newRequest(executor, poolKey,requestMessage);
DefaultPromise<TokenResult> tokenResultFuture = new DefaultPromise<>(request.getExecutor());
request.timeout(timeout)
.onComplete(response -> {
...
tokenResultFuture.setSuccess(response);
})
.onError(throwable -> {
...
tokenResultFuture.setFailure(throwable);
}).sendRequest();
return tokenResultFuture;
}
}
RedisTokenService
最终的限流Filter代码如下:
public class SentinelGatewayFilter implements RequestFilter {
RedisTokenService tokenService;\
public void filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//当前为 netty NioEventloop 线程
ServerHttpRequest request = exchange.getRequest();
String resource = request.getPath() != null ? request.getPath() : "";
//判断是否有集群限流规则
ClusterFlowRule rule = ClusterFlowManager.getClusterFlowRule(resource);
if (rule != null) {
//异步非阻塞请求token
tokenService.asyncRequestToken(rule,exchange.getExecutor())
.addListener(future -> {
TokenResult tokenResult;
if (future.isSuccess()) {
tokenResult = (TokenResult) future.getNow();
} else {
tokenResult = RedisTokenService.FAIL;
}
if(tokenResult == RedisTokenService.FAIL || tokenResult == RedisTokenService.ERROR){
log.error("Request cluster token failed, will back to local flowRule check");
}
ClusterFlowManager.setTokenResult(rule.getRuleId(), tokenResult);
doSentinelFlowControl(exchange, chain, resource);
});
} else {
doSentinelFlowControl(exchange, chain, resource);
}
}
}
DAG高压表现
wrk -t32 -c1000 -d60s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
DAG网关的QPS、实时RT、错误率、CPU、内存监控图;在CPU占用80% 情况下,能够支撑的QPS在4.5W。
DAG网关的QPS、RT 折线图;
DAG在CPU占用80% 情况下,能够支撑的QPS在4.5W,ART 19ms
SCG高压表现
wrk -t32 -c1000 -d60s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
SCG网关的QPS、实时RT、错误率、CPU、内存监控图:
SCG网关的QPS、RT 折线图:
SCG在CPU占用95% 情况下,能够支撑的QPS在1.1W,ART 54.1ms
DAG低压表现
wrk -t5 -c20 -d120s -s param-delay1ms.lua --latency http://a.b.c.d:xxxxx
DAG网关的QPS、实时RT、错误率、CPU、内存:
DAG网关的QPS、RT 折线图:
DAG在QPS 1.1W情况下,CPU占用30%,ART 1.56ms
数据对比
结论
满负载情况下,DAG要比SCG的吞吐量高很多,QPS几乎是4倍,RT反而消耗更低,SCG在CPU被打满后,RT表现出现严重性能劣化。DAG的吞吐控制和SCG一样情况下,CPU和RT损耗下降了更多。DAG在最大压力下,内存消耗比较高,达到了75%左右,不过到峰值后,就不再会有大幅变动了。对比压测结果,结论令人欣喜,SCG作为Java生态当前使用最广泛的网关,其性能属于一线水准,DAG的性能达到其4倍以上也是远超意料,这样的结果给与研发同学极大的鼓舞。
安全性提升
完善的接口级路由管理
基于接口注册模式的全新路由上线,包含了接口注册的申请人,申请时间,接口场景备注信息等,接口管理更加严谨规范;结合路由组功能可以方便的查询当前服务的所有对外接口信息,某种程度上具备一定的API查询管理能力;同时为了缓解用户需要检索的接口太多的尴尬,引入了一键收藏功能,大部分时候用户只需要切换到已关注列表即可。
注册接口列表
接口收藏
防渗透能力极大增强
早期的泛化路由,给黑产的渗透带来了极大的想象空间和安全隐患,甚至可以在外网直接访问某些业务的配置信息。
黑产接口渗透
接口注册模式启用后,所有未注册的接口均无法访问,防渗透能力提升一个台阶,同时自动推送异常接口访问信息。
404接口访问异常推送
稳定性增强
内存泄漏问题解决
通过一系列手段改进优化和严格的测试,新网关的内存使用更加稳健,内存增长曲线直接拉平,彻底解决了泄漏问题。
老网关内存增长趋势
新网关内存增长趋势
响应时间尖刺消除
通过预先初始化 & context 共用等手段,去除了运行时并发创建多个context 抢占全局锁的开销,冷启动RT尖刺降低99% ;关于spring load balance 模块的更多优化细节可以参考这篇博客:Spring LoadBalance 存在问题与优化。
压测数据对比
实际生产监控
趋势图上略有差异,但是从非200请求的绝对值上看,这种差异可以忽略, 对比发布期间和非发布期间异常请求的数量,发现基本没有区别,这代表着以往的发布期间的响应时间尖刺基本消除,做到了发布期间业务服务彻底无感知。
1月4日发布期间各节点流量变化
降本增效
资源占用下降50% +
JDK17升级收益
架构演进
技术思考
稳定性把控
团队成长
微信扫码关注该文公众号作者