一文看懂分布式链路监控系统
本文通过阿里的Eagleeye(鹰眼)和开源的Skywalking,从数据模型、数据埋点以及数据存储三个方面介绍分布式链路监控系统的实现细节,其中将重点介绍Skywalking字节码增强的实现方案。
应用级透明
低开销
扩展性和开放性
无论是何种软件系统,可扩展性和开放性都是衡量其质量优劣的重要标准。对于链路监控系统这样的基础服务系统来说,上游业务系统对于链路监控系统来说是透明的,在一个规模较大的企业中,一个基础服务系统往往会承载成千上万个上游业务系统。每个业务系统由不同的团队和开发人员负责,虽然使用的框架和中间件在同一个企业中有大致的规范和要求,但是在各方面还是存在差异的。因此作为一个基础设施,链路监控系统需要具有非常好的可扩展性,除了对企业中常用中间件和框架的支撑外,还要能够方便开发人员针对特殊的业务场景进行定制化的开发。
OpenTracing规范
Trace
表示一整条调用链,包括跨进程、跨线程的所有Segment的集合。
Segment
表示一个进程(JVM)或线程内的所有操作的集合,即包含若干个Span。
Span
1、Entry Span:入栈Span。Segment的入口,一个Segment有且仅有一个Entry Span,比如HTTP或者RPC的入口,或者MQ消费端的入口等。
2、Local Span:通常用于记录一个本地方法的调用。
3、Exit Span:出栈Span。Segment的出口,一个Segment可以有若干个Exit Span,比如HTTP或者RPC的出口,MQ生产端,或者DB、Cache的调用等。
唯一id
关系描述
从【OpenTracing规范】一节的调用链路图中可以看出,Trace、Segment可以作为整个调用链路中的逻辑结构,而Span才是真正串联起整个链路的单元,系统可以通过若干个Span串联起整个调用链路。
在Java中,方法是以入栈、出栈的形式进行调用,那么系统在记录Span的时候就可以通过模拟出栈、入栈的动作来记录Span的调用顺序,不难发现最终一个链路中的所有Span呈现树形关系,那么如何描述这棵Span树?Eagleeye中的设计很巧妙,EagleEye设计了RpcId来区别同一个调用链下多个网络调用的顺序和嵌套层次。如下图所示:
- 0
- 0.1
- 0.1.1
- 0.1.2
- 0.1.2.1
- 0.2
- 0.2.1
- 0.3
- 0.3.1
- 0.3.1.1
- 0.3.2
跨进程传输
编码
阿里Eagleye的埋点方式是直接编的码方式,通过中间件预留的扩展点实现。但是按照我们通常的理解来说,编码对于Dapper提出的扩展性和开放性似乎并不友好,那为什Eagleye么要采用这样的方式?个人认为有以下几点:
1、阿里有中间件的使用规范,不是想用什么就用什么,因此对于埋点的覆盖范围是有限的;
2、阿里有给力的中间件团队专门负责中间件的维护,中间件的埋点对于上层应用来说也是应用级透明的,对于埋点的覆盖是全面的;
3、阿里应用有接入Eagleye监控系统的要求,因此对于可插拔的诉求并没有非常强烈。
从上面几点来说,编码方式的埋点完全可以满足Eagleye的需要,并且直接编码的方式在维护、性能消耗方面也是非常有优势的。
字节码增强
相比于Eagleye,SkyWalking这样开源的分布式链路监控系统,在开源环境下就没有这么好做了。开源环境下面临的问题其实和阿里集团内部的环境正好相反:
1、开源环境下每个开发者使用的中间件可能都不一样,想用什么就用什么,因此对于埋点的覆盖范围几乎是无限的;
2、开源环境下,各种中间件都由不同组织或个人进行维护,甚至开发者还可以进行二次开发,不可能说服他们在代码中加入链路监控的埋点;
3、开源环境下,并不一定要接入链路监控体系,大多数个人开发者由于资源有限或其他原因没有接入链路监控系统的需求。
对Java应用实现字节码增强的方式有Attach和Javaagent两种,本文做一个简单的介绍。
Attach
Attach是一种相对动态的方式,在阿尔萨斯(Arthas)这样的诊断系统中广泛使用,利用JVM提供的Attach API可以实现一个JVM对另一个运行中的JVM的通信。用一个具体的场景举例:我们要实现Attach JVM对一个运行中JVM的监控。如下图所示:
1、Attach JVM利用Attach API获取目标JVM的实例,底层会通过socketFile建立两个JVM间的通信;
2、Attach JVM指定目标JVM需要挂载的agent.jar包,挂载成功后会执行agent包中的agentmain方法,此时就可以对目标JVM中类的字节码进行修改;
3、Attach JVM通过Socket向目标JVM发送命令,目标JVM收到后会进行响应,以达到监控的目的。
Javaagent
Javaagent大家应该相对熟悉,他的启动方式是在启动命令中加入javaagent参数,指定需要挂载的agent:
java -javaagent:/path/agent.jar=key1=value1,key2=value2 -jar myJar.jar
1、目标JVM通过javaagent参数启动后找到指定的agent,执行agent的premain方法;
2、agent中通过JVM暴露的接口添加一个Transformer,顾名思义它可以Transform字节码;
3、目标JVM在类加载的时候会触发JVM内置的事件,回调Transformer以实现字节码的增强。
字节码增强类库
Byte Buddy和cglib有较为出色的性能得益于它们底层都是基于ASM构建,如果将ASM也加入对比那么它的性能一定是最高的。但是用过ASM的同学虽然不一定能感受到它的高性能,但一定能感受到它噩梦般的开发体验:
mv.visitFieldInsn(GETSTATIC, "java/lang/System", "out", "Ljava/io/PrintStream;");
mv.visitLdcInsn("begin of sayhello().");
mv.visitMethodInsn(INVOKEVIRTUAL, "java/io/PrintStream", "println", "(Ljava/lang/String;)V", false);
Skywalking案例分析
Skywalking为开发者提供了简单易用的插件接口,对于开发者来说不需要知道怎么增强方法的字节码,只需要关心以下几点:
要增强哪个类的哪个方法?
Skywalking提供了ClassMatch,支持各种类、方法的匹配方式。包括类名、前缀、正则、注解等方式的匹配,除此之外还提供了与、或、非逻辑链接,以支持用户通过各种方式精确定位到一个具体的方法。我们看一个插件中的代码:
需要添加/修改什么逻辑?
知道了需要增强哪个类的哪个方法,那下一步就是如何增强。Java中的方法可以分为静态方法、实例方法和构造方法三类方法,Skywalking对于这三种方法的增强逻辑为用户提供了不同的扩展点:
public interface InstanceMethodsAroundInterceptor {
// 方法执行前置扩展点
void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;
// 方法执行后置扩展点
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable;
// 方法抛出异常时扩展点
void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t);
}
假设我们有一个Controller,里面只有一个sayHello方法返回"Hello",经过Skywalking增强后,反编译一下它被增强后的字节码文件:
1、Skywalking在其中插入了一个名为_$EnhancedClassField_ws的字段,开发者在某些场合可以合理利用该字段存储一些信息。比如存储Spring MVC中Controller的跟路径,或者Jedis、HttpClient链接中对端信息等。
2、原来的syHello方法名被修改了但仍保存下来,并且新生成了一个增强后的sayHello方法,静态代码块里将经过字节码增强后的sayHello方法存入缓存字段。
增强的前置条件是什么?
在某些时候,并不是只要引入了对应插件就一定会对相关的代码进行字节码增强。比如我们想对Spring MVC的Controller进行埋点,我们使用的是Spring 4.x版本,但是插件却是 5.x 版本的,如果直接对源码进行增强可能会因为版本的差别带来意料之外的问题。Skywalking提供了一种witness机制,简单来说就是当我们的代码中存在指定的类或方式时,当前插件才会进行字节码增强。比如Spring 4.x版本中需要witness这两个类:
介绍了Skywalking的插件模型后,下面从Javaagent的入口premain开始介绍下主要的流程:
1、从指定的目录加载所有插件到内存中;
2、构建Byte Buddy核心的AgentBuilder插桩到JVM的Instrumentation API上,包括需要增强哪些类以及核心的增强逻辑Transformer。
private static class Transformer implements AgentBuilder.Transformer {
private PluginFinder pluginFinder;
Transformer(PluginFinder pluginFinder) {
this.pluginFinder = pluginFinder;
}
/**
* 这个方法在类加载的过程中会由JVM调用(Byte Buddy做了封装)
* @param builder 原始类的字节码构建器
* @param typeDescription 类描述信息
* @param classLoader 这个类的类加载器
* @param module jdk9中模块信息
* @return 修改后的类的字节码构建器
*/
@Override
public DynamicType.Builder > transform(final DynamicType.Builder > builder,
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule module) {
LoadedLibraryCollector.registerURLClassLoader(classLoader);
// 根据类信息找到针对这个类进行字节码增强的插件,可能有多个
List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);
if (pluginDefines.size() > 0) {
DynamicType.Builder > newBuilder = builder;
EnhanceContext context = new EnhanceContext();
for (AbstractClassEnhancePluginDefine define : pluginDefines) {
// 调用插件的define方法得到新的字节码
DynamicType.Builder > possibleNewBuilder = define.define(
typeDescription, newBuilder, classLoader, context);
if (possibleNewBuilder != null) {
newBuilder = possibleNewBuilder;
}
}
// 返回增强后的字节码给JVM,完成字节码增强
return newBuilder;
}
return builder;
}
}
public abstract class AbstractClassEnhancePluginDefine {
public DynamicType.Builder<?> define(TypeDescription typeDescription, DynamicType.Builder<?> builder,
ClassLoader classLoader, EnhanceContext context) throws PluginException {
// witness机制
WitnessFinder finder = WitnessFinder.INSTANCE;
//通过类加载器找witness类,没有就直接返回,不进行字节码的改造
String[] witnessClasses = witnessClasses();
if (witnessClasses != null) {
for (String witnessClass : witnessClasses) {
if (!finder.exist(witnessClass, classLoader)) {
return null;
}
}
}
//通过类加载器找witness方法,没有就直接返回,不进行字节码的改造
List<WitnessMethod> witnessMethods = witnessMethods();
if (!CollectionUtil.isEmpty(witnessMethods)) {
for (WitnessMethod witnessMethod : witnessMethods) {
if (!finder.exist(witnessMethod, classLoader)) {
return null;
}
}
}
// enhance开始修改字节码
DynamicType.Builder<?> newClassBuilder = this.enhance(typeDescription, builder, classLoader, context);
// 修改完成,返回新的字节码
context.initializationStageCompleted();
return newClassBuilder;
}
protected DynamicType.Builder<?> enhance(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
ClassLoader classLoader, EnhanceContext context) throws PluginException {
// 增强静态方法
newClassBuilder = this.enhanceClass(typeDescription, newClassBuilder, classLoader);
// 增强实例方法& 构造方法
newClassBuilder = this.enhanceInstance(typeDescription, newClassBuilder, classLoader, context);
return newClassBuilder;
}
}
public abstract class ClassEnhancePluginDefine extends AbstractClassEnhancePluginDefine {
protected DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,
DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,
EnhanceContext context) throws PluginException {
// 获取插件定义的构造方法拦截点ConstructorInterceptPoint
ConstructorInterceptPoint[] constructorInterceptPoints = getConstructorsInterceptPoints();
// 获取插件定义的实例方法拦截点InstanceMethodsInterceptPoint
InstanceMethodsInterceptPoint[] instanceMethodsInterceptPoints = getInstanceMethodsInterceptPoints();
String enhanceOriginClassName = typeDescription.getTypeName();
// 非空校验
boolean existedConstructorInterceptPoint = false;
if (constructorInterceptPoints != null && constructorInterceptPoints.length > 0) {
existedConstructorInterceptPoint = true;
}
boolean existedMethodsInterceptPoints = false;
if (instanceMethodsInterceptPoints != null && instanceMethodsInterceptPoints.length > 0) {
existedMethodsInterceptPoints = true;
}
if (!existedConstructorInterceptPoint && !existedMethodsInterceptPoints) {
return newClassBuilder;
}
// 这里就是之前提到的让类实现EnhancedInstance接口,并添加_$EnhancedClassField_ws字段
if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
if (!context.isObjectExtended()) {
// Object类型、private volatie修饰符、提供方法进行访问
newClassBuilder = newClassBuilder.defineField(
"_$EnhancedClassField_ws", Object.class, ACC_PRIVATE | ACC_VOLATILE)
.implement(EnhancedInstance.class)
.intercept(FieldAccessor.ofField("_$EnhancedClassField_ws"));
context.extendObjectCompleted();
}
}
// 构造方法增强
if (existedConstructorInterceptPoint) {
for (ConstructorInterceptPoint constructorInterceptPoint : constructorInterceptPoints) {
// jdk核心类
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
.intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
.to(BootstrapInstrumentBoost
.forInternalDelegateClass(constructorInterceptPoint
// 非jdk核心类 .getConstructorInterceptor()))));
} else {
// 找到对应的构造方法,并通过插件自定义的InstanceConstructorInterceptor进行增强
newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
.intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
.to(new ConstructorInter(constructorInterceptPoint
.getConstructorInterceptor(), classLoader))));
}
}
}
// 实例方法增强
if (existedMethodsInterceptPoints) {
for (InstanceMethodsInterceptPoint instanceMethodsInterceptPoint : instanceMethodsInterceptPoints) {
// 找到插件自定义的实例方法拦截器InstanceMethodsAroundInterceptor
String interceptor = instanceMethodsInterceptPoint.getMethodsInterceptor();
// 这里在插件自定义的匹配条件上加了一个【不为静态方法】的条件
ElementMatcher.Junction<MethodDescription> junction = not(isStatic()).and(instanceMethodsInterceptPoint.getMethodsMatcher());
// 需要重写入参
if (instanceMethodsInterceptPoint.isOverrideArgs()) {
// jdk核心类
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
// 非jdk核心类
} else {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(new InstMethodsInterWithOverrideArgs(interceptor, classLoader)));
}
// 不需要重写入参
} else {
// jdk核心类
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
// 非jdk核心类
} else {
// 找到对应的实例方法,并通过插件自定义的InstanceMethodsAroundInterceptor进行增强
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.to(new InstMethodsInter(interceptor, classLoader)));
}
}
}
}
return newClassBuilder;
}
}
用户通过方法拦截器实现增强逻辑,但是它是面向用户的,并不能直接用来进行字节码增强,Skywalking加了一个中间层来连接用户逻辑和Byte Buddy类库。上述代码中的XXXInter便是中间层,比如针对实例方法的InstMethodsInter:
public class InstMethodsInter {
private static final ILog LOGGER = LogManager.getLogger(InstMethodsInter.class);
// 用户在插件中定义的实例方法拦截器
private InstanceMethodsAroundInterceptor interceptor;
public InstMethodsInter(String instanceMethodsAroundInterceptorClassName, ClassLoader classLoader) {
try {
// 加载用户在插件中定义的实例方法拦截器
interceptor = InterceptorInstanceLoader.load(instanceMethodsAroundInterceptorClassName, classLoader);
} catch (Throwable t) {
throw new PluginException("Can't create InstanceMethodsAroundInterceptor.", t);
}
}
/**
* 当执行被增强方法时,会执行该intercept方法
*
* @param obj 实例对象(this)
* @param allArguments 方法入参
* @param method 参数描述
* @param zuper 原方法调用的句柄
* @param method 被增强后的方法的引用
* @return 方法返回值
*/
public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @SuperCall Callable<?> zuper,
@Origin Method method) throws Throwable {
EnhancedInstance targetObject = (EnhancedInstance) obj;
MethodInterceptResult result = new MethodInterceptResult();
try {
// 拦截器前置逻辑
interceptor.beforeMethod(targetObject, method, allArguments, method.getParameterTypes(), result);
} catch (Throwable t) {
LOGGER.error(t, "class[{}] before method[{}] intercept failure", obj.getClass(), method.getName());
}
Object ret = null;
try {
// 是否中断方法执行
if (!result.isContinue()) {
ret = result._ret();
} else {
// 执行原方法
ret = zuper.call();
// 为什么不能走method.invoke?因为method已经是被增强后方法,调用就死循环了!
// 可以回到之前的字节码文件查看原因,看一下该intercept执行的时机
}
} catch (Throwable t) {
try {
// 拦截器异常时逻辑
interceptor.handleMethodException(targetObject, method, allArguments, method.getParameterTypes(), t);
} catch (Throwable t2) {
LOGGER.error(t2, "class[{}] handle method[{}] exception failure", obj.getClass(), method.getName());
}
throw t;
} finally {
try {
// 拦截器后置逻辑
ret = interceptor.afterMethod(targetObject, method, allArguments, method.getParameterTypes(), ret);
} catch (Throwable t) {
LOGGER.error(t, "class[{}] after method[{}] intercept failure", obj.getClass(), method.getName());
}
}
return ret;
}
}
存储
鹰眼采用并发环形队列存储Trace数据,如下图所示:
Skywalking在实现上有所区别,采用分区的QueueBuffer存储Trace数据,多个消费线程通过Driver平均分配到各个QueueBuffer上进行数据消费:
有趣的原子下标
普通的Oject数组是无法支持并发的,但只要保证每个线程获取下标的过程是原子的,即可保证数组的线程安全。这需要保证:
1、多线程获取的下标是依次递增的,从0开始到数组容量-1;
2、当某个线程获取的下标超过数组容量,需要从0开始重新获取。
// 提供原子下标的类
public class AtomicRangeInteger {
// JDK提供的原子数组
private AtomicIntegerArray values;
// 固定值15
private static final int VALUE_OFFSET = 15;
// 数组开始下标,固定为0
private int startValue;
// 数组最后一个元素的下标,固定为数组的最大长度-1
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
// 创建一个长度为31的原子数组
this.values = new AtomicIntegerArray(31);
// 将第15位设置为初始值0
this.values.set(VALUE_OFFSET, startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
// 核心方法,获取数组的下一个下标
public final int getAndIncrement() {
int next;
do {
// 原子递增
next = this.values.incrementAndGet(VALUE_OFFSET);
// 如果超过了数组范围,CAS重制到0
if (next > endValue && this.values.compareAndSet(VALUE_OFFSET, next, startValue)) {
return endValue;
}
} while (next > endValue);
return next - 1;
}
}
public class AtomicRangeInteger {
private AtomicInteger value;
private int startValue;
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
this.value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
public final int getAndIncrement() {
int current;
int next;
do {
// 获取当前下标
current = this.value.get();
// 如果超过最大范围则从0开始
next = current >= this.endValue ? this.startValue : current + 1;
// CAS更新下标,失败则循环重试
} while (!this.value.compareAndSet(current, next));
return current;
}
}
public class AtomicRangeInteger extends Number implements Serializable {
// 用原子整型替代V1版本的原子数组
private AtomicInteger value;
private int startValue;
private int endValue;
public AtomicRangeInteger(int startValue, int maxValue) {
this.value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = maxValue - 1;
}
public final int getAndIncrement() {
int next;
do {
next = this.value.incrementAndGet();
if (next > endValue && this.value.compareAndSet(next, startValue)) {
return endValue;
}
}
while (next > endValue);
return next - 1;
}
}
public class AtomicRangeInteger {
private AtomicLong value;
private int mask;
public AtomicRangeInteger(int startValue, int maxValue) {
this.value = new AtomicLong(startValue);
this.mask = maxValue - 1;
}
public final int getAndIncrement() {
return (int)(value.incrementAndGet() % mask);
}
}
Skywalking官方数据(数组大小100):
自己在mac上测试的数据(数组大小100):
自己在mac上测试的数据(数组大小128):
传输
从整体上来看,Skywalking采取了埋点和中间件代码分离的方式,在某种意义上实现了应用级透明,但是在后期维护的过程中中间件版本的升级需要配合插件版本的升级,在维护方面带来了一些问题。而Eagleeye编码方式的埋点由中间件团队维护,对于上层的应用也是透明的,更加适合阿里集团内部的环境。
往期推荐
畅聊云栖
2022云栖大会一起见证科技创新,一起讨论云栖话题。
点击阅读原文查看详情。
微信扫码关注该文公众号作者