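Fury: A High-Performance, Multi-Language Native Serialization Framework Based on JIT Dynamic Compilation
Introduction
What Is Fury
Supports mainstream programming languages such as Java, Python, C++ and Golang; other languages can be added easily;
Automatically serializes arbitrary objects across languages, with no need to write IDL files, compile schemas into generated code, or convert objects to an intermediate format;
Automatically serializes shared and circular references across languages; users only deal with objects and need not worry about duplicated data or recursion errors;
Uses JIT dynamic compilation to generate serialization code at runtime, enabling more method inlining, code caching and dead-code elimination while reducing virtual method calls, conditional branches, hash lookups and metadata writes, for 20x to more than 200x the performance of other serialization frameworks;
Zero-copy serialization, with support for an out-of-band serialization protocol and for reading and writing off-heap memory;
A cache-friendly binary row format with random access, which supports skipping serialization and partial serialization and converts automatically to and from columnar formats.
Drop-in replacement for Java serialization frameworks such as JDK serialization, Kryo and Hessian without modifying any code, with more than 20x the performance of Kryo, more than 100x that of Hessian and more than 200x that of JDK built-in serialization, which greatly improves RPC and object persistence efficiency in high-performance scenarios;
A Golang serialization framework that supports shared and circular references;
A Golang serialization framework that supports automatic object serialization.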
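To make the shared and circular reference point concrete, here is a small sketch that uses only Python's standard library rather than Fury itself. It contrasts a format without reference tracking (`json`) with one that tracks references (`pickle`). It only illustrates the behaviour described above and says nothing about how Fury implements it.

```python
import json
import pickle

# A container that holds the same dict twice (a shared reference).
shared = {"k1": "v1"}
holder = {"a": shared, "b": shared}

# json has no reference tracking: the shared dict is written twice
# and comes back as two independent copies.
json_copy = json.loads(json.dumps(holder))
print(json_copy["a"] is json_copy["b"])      # False

# pickle tracks references: identity is preserved after a round trip.
pickle_copy = pickle.loads(pickle.dumps(holder))
print(pickle_copy["a"] is pickle_copy["b"])  # True

# A circular reference: the list contains itself.
cycle = []
cycle.append(cycle)

# json detects the cycle and refuses to serialize it.
try:
    json.dumps(cycle)
    json_handles_cycle = True
except ValueError:
    json_handles_cycle = False
print(json_handles_cycle)                    # False

# pickle round-trips the cycle and keeps its shape.
cycle_copy = pickle.loads(pickle.dumps(cycle))
print(cycle_copy[0] is cycle_copy)           # True
```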
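How to Use Fury
Cross-language serialization of custom types
Cross-language serialization of custom types that contain circular references
Cross-language zero-copy serialization
Drop-in replacement for Kryo/Hessian/JDK serialization
Avoiding serialization with Fury Format
Serializing Custom Types
Java serialization example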
import com.google.common.collect.*;
import io.fury.*;
import java.util.*;

public class CustomObjectExample {
  public static class SomeClass1 {
    Object f1;
    Map<Byte, Integer> f2;
  }

  public static class SomeClass2 {
    Object f1;
    String f2;
    List<Object> f3;
    Map<Byte, Integer> f4;
    Byte f5;
    Short f6;
    Integer f7;
    Long f8;
    Float f9;
    Double f10;
    short[] f11;
    List<Short> f12;
  }

  public static Object createObject() {
    SomeClass1 obj1 = new SomeClass1();
    obj1.f1 = true;
    obj1.f2 = ImmutableMap.of((byte) -1, 2);
    SomeClass2 obj = new SomeClass2();
    obj.f1 = obj1;
    obj.f2 = "abc";
    obj.f3 = Arrays.asList("abc", "abc");
    obj.f4 = ImmutableMap.of((byte) 1, 2);
    obj.f5 = Byte.MAX_VALUE;
    obj.f6 = Short.MAX_VALUE;
    obj.f7 = Integer.MAX_VALUE;
    obj.f8 = Long.MAX_VALUE;
    obj.f9 = 1.0f / 2;
    obj.f10 = 1 / 3.0;
    obj.f11 = new short[] {(short) 1, (short) 2};
    obj.f12 = ImmutableList.of((short) -1, (short) 4);
    return obj;
  }
}
// Java-only serialization (Language.JAVA)
public class CustomObjectExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.CustomObjectExample"
  public static void main(String[] args) {
    // Reuse one Fury instance across serializations; do not create a new one each time.
    Fury fury = Fury.builder().withLanguage(Language.JAVA)
      .withReferenceTracking(false)
      .withClassRegistrationRequired(false)
      .build();
    byte[] bytes = fury.serialize(createObject());
    System.out.println(fury.deserialize(bytes));
  }
}

// Cross-language serialization (Language.XLANG)
public class CustomObjectExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.CustomObjectExample"
  public static void main(String[] args) {
    // Reuse one Fury instance across serializations; do not create a new one each time.
    Fury fury = Fury.builder().withLanguage(Language.XLANG)
      .withReferenceTracking(false).build();
    fury.register(SomeClass1.class, "example.SomeClass1");
    fury.register(SomeClass2.class, "example.SomeClass2");
    byte[] bytes = fury.serialize(createObject());
    // bytes can be data serialized by other languages.
    System.out.println(fury.deserialize(bytes));
  }
}
Python serialization example
import array
from dataclasses import dataclass
from typing import Any, Dict, List
import pyfury

@dataclass
class SomeClass2:
    f1: Any = None
    f2: str = None
    f3: List[str] = None
    f4: Dict[pyfury.Int8Type, pyfury.Int32Type] = None
    f5: pyfury.Int8Type = None
    f6: pyfury.Int16Type = None
    f7: pyfury.Int32Type = None
    # int is serialized as a long by default; if the peer uses a narrower
    # type, annotate the field with pyfury.Int32Type or similar.
    f8: int = None  # pyfury.Int64Type can also be used here
    f9: pyfury.Float32Type = None
    f10: float = None  # pyfury.Float64Type can also be used here
    f11: pyfury.Int16ArrayType = None
    f12: List[pyfury.Int16Type] = None

@dataclass
class SomeClass1:
    f1: Any
    f2: Dict[pyfury.Int8Type, pyfury.Int32Type]

if __name__ == "__main__":
    fury_ = pyfury.Fury(reference_tracking=False)
    fury_.register_class(SomeClass1, "example.SomeClass1")
    fury_.register_class(SomeClass2, "example.SomeClass2")
    # SomeClass1 is the small nested object, SomeClass2 the outer one,
    # matching the Java example.
    obj1 = SomeClass1(f1=True, f2={-1: 2})
    obj = SomeClass2(
        f1=obj1,
        f2="abc",
        f3=["abc", "abc"],
        f4={1: 2},
        f5=2 ** 7 - 1,
        f6=2 ** 15 - 1,
        f7=2 ** 31 - 1,
        f8=2 ** 63 - 1,
        f9=1.0 / 2,
        f10=1 / 3.0,
        f11=array.array("h", [1, 2]),
        f12=[-1, 4],
    )
    data = fury_.serialize(obj)
    # bytes can be data serialized by other languages.
    print(fury_.deserialize(data))
Golang serialization example
package main

import "code.alipay.com/ray-project/fury/go/fury"
import "fmt"

func main() {
	// SomeClass1 is the small nested struct and SomeClass2 the outer one,
	// so the registered tags line up with the Java and Python examples.
	type SomeClass1 struct {
		F1 interface{}
		F2 map[int8]int32
	}
	type SomeClass2 struct {
		F1  interface{}
		F2  string
		F3  []interface{}
		F4  map[int8]int32
		F5  int8
		F6  int16
		F7  int32
		F8  int64
		F9  float32
		F10 float64
		F11 []int16
		F12 fury.Int16Slice
	}
	fury_ := fury.NewFury(false)
	if err := fury_.RegisterTagType("example.SomeClass1", SomeClass1{}); err != nil {
		panic(err)
	}
	if err := fury_.RegisterTagType("example.SomeClass2", SomeClass2{}); err != nil {
		panic(err)
	}
	obj1 := &SomeClass1{}
	obj1.F1 = true
	obj1.F2 = map[int8]int32{-1: 2}
	obj := &SomeClass2{}
	obj.F1 = obj1
	obj.F2 = "abc"
	obj.F3 = []interface{}{"abc", "abc"}
	f4 := map[int8]int32{1: 2}
	obj.F4 = f4
	obj.F5 = fury.MaxInt8
	obj.F6 = fury.MaxInt16
	obj.F7 = fury.MaxInt32
	obj.F8 = fury.MaxInt64
	obj.F9 = 1.0 / 2
	obj.F10 = 1 / 3.0
	obj.F11 = []int16{1, 2}
	obj.F12 = []int16{-1, 4}
	bytes, err := fury_.Marshal(obj)
	if err != nil {
		panic(err)
	}
	var newValue interface{}
	// bytes can be data serialized by other languages.
	if err := fury_.Unmarshal(bytes, &newValue); err != nil {
		panic(err)
	}
	fmt.Println(newValue)
}
Serializing Shared and Circular References
Java serialization example
import com.google.common.collect.ImmutableMap;
import io.fury.*;
import java.util.Map;

public class ReferenceExample {
  public static class SomeClass {
    SomeClass f1;
    Map<String, String> f2;
    Map<String, String> f3;
  }

  public static Object createObject() {
    SomeClass obj = new SomeClass();
    obj.f1 = obj;
    obj.f2 = ImmutableMap.of("k1", "v1", "k2", "v2");
    obj.f3 = obj.f2;
    return obj;
  }
}
// Java-only serialization (Language.JAVA)
public class ReferenceExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.ReferenceExample"
  public static void main(String[] args) {
    // Reuse one Fury instance across serializations; do not create a new one each time.
    Fury fury = Fury.builder().withLanguage(Language.JAVA)
      .withReferenceTracking(true)
      .withClassRegistrationRequired(false)
      .build();
    byte[] bytes = fury.serialize(createObject());
    System.out.println(fury.deserialize(bytes));
  }
}

// Cross-language serialization (Language.XLANG)
public class ReferenceExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.ReferenceExample"
  public static void main(String[] args) {
    // Reuse one Fury instance across serializations; do not create a new one each time.
    Fury fury = Fury.builder().withLanguage(Language.XLANG)
      .withReferenceTracking(true).build();
    fury.register(SomeClass.class, "example.SomeClass");
    byte[] bytes = fury.serialize(createObject());
    // bytes can be data serialized by other languages.
    System.out.println(fury.deserialize(bytes));
  }
}
Python serialization example
from typing import Dict
import pyfury

class SomeClass:
    f1: "SomeClass"
    f2: Dict[str, str]
    f3: Dict[str, str]

if __name__ == "__main__":
    fury_ = pyfury.Fury(reference_tracking=True)
    fury_.register_class(SomeClass, "example.SomeClass")
    obj = SomeClass()
    obj.f2 = {"k1": "v1", "k2": "v2"}
    obj.f1, obj.f3 = obj, obj.f2
    data = fury_.serialize(obj)
    # bytes can be data serialized by other languages.
    print(fury_.deserialize(data))
Golang serialization example
package main

import "code.alipay.com/ray-project/fury/go/fury"
import "fmt"

func main() {
	type SomeClass struct {
		F1 *SomeClass
		F2 map[string]string
		F3 map[string]string
	}
	fury_ := fury.NewFury(true)
	if err := fury_.RegisterTagType("example.SomeClass", SomeClass{}); err != nil {
		panic(err)
	}
	value := &SomeClass{F2: map[string]string{"k1": "v1", "k2": "v2"}}
	value.F3 = value.F2
	value.F1 = value
	bytes, err := fury_.Marshal(value)
	if err != nil {
		panic(err)
	}
	var newValue interface{}
	// bytes can be data serialized by other languages.
	if err := fury_.Unmarshal(bytes, &newValue); err != nil {
		panic(err)
	}
	fmt.Println(newValue)
}
Zero-Copy Serialization
Java serialization example
import io.fury.*;
import io.fury.serializers.BufferObject;
import io.fury.memory.MemoryBuffer;
import java.util.*;
import java.util.stream.Collectors;

public class ZeroCopyExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.ZeroCopyExample"
  public static void main(String[] args) {
    // Reuse one Fury instance across serializations; do not create a new one each time.
    Fury fury = Fury.builder()
      .withLanguage(Language.JAVA)
      .withClassRegistrationRequired(false)
      .build();
    List<Object> list = Arrays.asList("str", new byte[1000], new int[100], new double[100]);
    Collection<BufferObject> bufferObjects = new ArrayList<>();
    // The callback collects each buffer object; returning false keeps it out of band.
    byte[] bytes = fury.serialize(list, e -> !bufferObjects.add(e));
    List<MemoryBuffer> buffers =
      bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
    System.out.println(fury.deserialize(bytes, buffers));
  }
}
Cross-language serialization:
import io.fury.*;
import io.fury.serializers.BufferObject;
import io.fury.memory.MemoryBuffer;
import java.util.*;
import java.util.stream.Collectors;

public class ZeroCopyExample {
  // mvn exec:java -Dexec.mainClass="io.fury.examples.ZeroCopyExample"
  public static void main(String[] args) {
    Fury fury = Fury.builder().withLanguage(Language.XLANG).build();
    List<Object> list = Arrays.asList("str", new byte[1000], new int[100], new double[100]);
    Collection<BufferObject> bufferObjects = new ArrayList<>();
    byte[] bytes = fury.serialize(list, e -> !bufferObjects.add(e));
    // bytes can be data serialized by other languages.
    List<MemoryBuffer> buffers =
      bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
    System.out.println(fury.deserialize(bytes, buffers));
  }
}
Python serialization example
import array
import pyfury
import numpy as np

if __name__ == "__main__":
    fury_ = pyfury.Fury()
    list_ = ["str", bytes(bytearray(1000)),
             array.array("i", range(100)), np.full(100, 0.0, dtype=np.double)]
    serialized_objects = []
    data = fury_.serialize(list_, buffer_callback=serialized_objects.append)
    buffers = [o.to_buffer() for o in serialized_objects]
    # bytes can be data serialized by other languages.
    print(fury_.deserialize(data, buffers=buffers))
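The same out-of-band idea exists in Python's standard library as pickle protocol 5 (PEP 574, reference [7]). The sketch below uses only `pickle`, not pyfury, and assumes Python 3.8 or later. It shows what keeping a buffer out of band means: the stream carries a placeholder and the receiver is handed the buffer separately, so the payload is never copied.

```python
import pickle

# A large binary payload we would like to keep out of the serialized stream.
payload = bytearray(b"x" * 1000)
obj = ["str", pickle.PickleBuffer(payload)]

# In-band: the payload bytes are copied into the pickle stream.
in_band = pickle.dumps(obj, protocol=5)

# Out-of-band: the callback receives each PickleBuffer. Returning a falsy
# value (list.append returns None) keeps the buffer out of the stream.
buffers = []
out_of_band = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append)

print(len(in_band) > 1000)     # True
print(len(out_of_band) < 100)  # True, only a placeholder is written

# The receiver supplies the buffers in the same order to rebuild the object.
restored = pickle.loads(out_of_band, buffers=buffers)
view = memoryview(restored[1])

# The restored view aliases the original memory: no copy was made.
payload[0] = ord("y")
print(view[0] == ord("y"))     # True
```

In a real transport the buffers would be sent alongside the stream, for example through shared memory or scatter/gather I/O, rather than passed in-process as they are here.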
Golang serialization example
package main

import "code.alipay.com/ray-project/fury/go/fury"
import "fmt"

func main() {
	// Named fury_ so that the variable does not shadow the fury package.
	fury_ := fury.NewFury(true)
	// The Golang version does not yet support zero-copy for slices of other primitive types.
	list := []interface{}{"str", make([]byte, 1000)}
	buf := fury.NewByteBuffer(nil)
	var serializedObjects []fury.SerializedObject
	fury_.Serialize(buf, list, func(o fury.SerializedObject) bool {
		serializedObjects = append(serializedObjects, o)
		return false
	})
	var newList []interface{}
	var buffers []*fury.ByteBuffer
	for _, o := range serializedObjects {
		buffers = append(buffers, o.ToBuffer())
	}
	if err := fury_.Deserialize(buf, &newList, buffers); err != nil {
		panic(err)
	}
	fmt.Println(newList)
}
Drop-in Replacement for Kryo/Hessian
import io.fury.*;
import java.util.List;
import java.util.Arrays;

public class Example {
  public static void main(String[] args) {
    SomeClass object = new SomeClass();
    // Reuse one Fury instance across serializations; do not create a new one each time.
    {
      // Single Fury instance.
      Fury fury = Fury.builder()
        .withLanguage(Language.JAVA)
        // Setting this to true prevents deserialization of unregistered
        // non-built-in types and so avoids security vulnerabilities.
        .withClassRegistrationRequired(false)
        .withReferenceTracking(true).build();
      // Registering types reduces class name serialization; it is not mandatory.
      // fury.register(SomeClass.class);
      byte[] bytes = fury.serialize(object);
      System.out.println(fury.deserialize(bytes));
    }
    {
      // Thread-safe Fury created through the builder.
      ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
        .withReferenceTracking(true)
        .withClassRegistrationRequired(false)
        .buildThreadSafeFury();
      byte[] bytes = fury.serialize(object);
      System.out.println(fury.deserialize(bytes));
    }
    {
      // Thread-safe Fury created from a factory, which allows per-instance setup.
      ThreadSafeFury fury = new ThreadSafeFury(() -> {
        // Named f because a lambda local cannot reuse the enclosing name fury.
        Fury f = Fury.builder()
          .withLanguage(Language.JAVA)
          .withClassRegistrationRequired(false)
          .withReferenceTracking(true).build();
        // Registering types reduces class name serialization.
        f.register(SomeClass.class);
        return f;
      });
      byte[] bytes = fury.serialize(object);
      System.out.println(fury.deserialize(bytes));
    }
  }
}
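Avoiding Serialization with Fury Format
Reduced Java GC overhead. Because deserialization is avoided, no objects are created, which avoids GC problems.
No Python deserialization. Python is slow, so in cross-language scenarios objects can be serialized to the row format on the Java/C++ side and the Python side can compute on that data directly, avoiding the expensive cost of Python deserialization. Thanks to Python's dynamic nature, Fury's BinaryRow/BinaryArrays implement __getattr__/__getitem__/slice and other special methods, so they behave like ordinary Python objects and lists and the difference is transparent to users.
Cache-friendly, densely stored data.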
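The idea of reading fields straight from the bytes can be sketched with a toy row layout built on the standard `struct` module. This layout is an assumption made for illustration, not Fury's actual row format: each field owns a fixed 8-byte slot, integers are stored inline, and strings are stored as an (offset, length) pair pointing into a tail region. Any single field can then be read without decoding the rest of the row.

```python
import struct

SLOT = 8  # every field owns one fixed 8-byte slot

def encode_row(values):
    """Encode ints inline and strings as (offset, length) into a tail region."""
    slots = bytearray(SLOT * len(values))
    tail = bytearray()
    for i, value in enumerate(values):
        if isinstance(value, int):
            struct.pack_into("<q", slots, i * SLOT, value)
        else:
            raw = value.encode("utf-8")
            offset = len(slots) + len(tail)
            struct.pack_into("<II", slots, i * SLOT, offset, len(raw))
            tail += raw
    return bytes(slots + tail)

def read_int(row, index):
    # Jump to the slot and decode 8 bytes; no other field is touched.
    return struct.unpack_from("<q", row, index * SLOT)[0]

def read_str(row, index):
    # The slot tells us where the variable-length bytes live.
    offset, length = struct.unpack_from("<II", row, index * SLOT)
    return row[offset:offset + length].decode("utf-8")

row = encode_row([10, "hello", -3, "fury"])
print(read_int(row, 2))  # -3
print(read_str(row, 3))  # fury
```

Because every slot has a fixed width, locating field `i` is a single multiplication, which is what makes random access and partial reads cheap.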
Python example
import datetime
import pickle
from dataclasses import dataclass
from typing import Dict, List
import pyarrow as pa  # assumption: `pa` in this example refers to pyarrow
import pyfury

@dataclass
class Bar:
    f1: str
    f2: List[pa.int64]

@dataclass
class Foo:
    f1: pa.int32
    f2: List[pa.int32]
    f3: Dict[str, pa.int32]
    f4: List[Bar]

encoder = pyfury.encoder(Foo)
foo = Foo(f1=10, f2=list(range(1000_000)),
          f3={f"k{i}": i for i in range(1000_000)},
          f4=[Bar(f1=f"s{i}", f2=list(range(10))) for i in range(1000_000)])
binary: bytes = encoder.to_row(foo).to_bytes()
# Row format: read a few fields directly from the bytes, without deserializing.
print(f"start: {datetime.datetime.now()}")
foo_row = pyfury.RowData(encoder.schema, binary)
print(foo_row.f2[100000], foo_row.f4[100000].f1, foo_row.f4[200000].f2[5])
print(f"end: {datetime.datetime.now()}")
# pickle: the whole object must be deserialized before any field can be read.
binary = pickle.dumps(foo)
print(f"pickle start: {datetime.datetime.now()}")
new_foo = pickle.loads(binary)
print(new_foo.f2[100000], new_foo.f4[100000].f1, new_foo.f4[200000].f2[5])
print(f"pickle end: {datetime.datetime.now()}")
Java example
public class Bar {
  String f1;
  List<Long> f2;
}

public class Foo {
  int f1;
  List<Integer> f2;
  Map<String, Integer> f3;
  List<Bar> f4;
}

Encoder<Foo> encoder = Encoders.rowEncoder(Foo.class);
BinaryRow binaryRow = encoder.toRow(foo); // this data can be parsed by Python with zero copy
Foo newFoo = encoder.fromRow(binaryRow); // the row may also be data serialized by Python
BinaryArray binaryArray2 = binaryRow.getArray(1); // zero-copy read of the List<Integer> f2 field
// Field indexes are 0-based, so f4 is at index 3.
BinaryArray binaryArray4 = binaryRow.getArray(3); // zero-copy read of the List<Bar> f4 field
BinaryRow barStruct = binaryArray4.getStruct(10); // zero-copy read of the 11th element of List<Bar> f4
// zero-copy read of the 6th element of the f2 field of the 11th element of List<Bar> f4
long aLong = barStruct.getArray(1).getLong(5);
Encoder<Bar> barEncoder = Encoders.rowEncoder(Bar.class);
// Partially deserialize the object.
Bar newBar = barEncoder.fromRow(barStruct);
Bar newBar2 = barEncoder.fromRow(binaryArray4.getStruct(20));
// Example of creating the object:
// Foo foo = new Foo();
// foo.f1 = 10;
// foo.f2 = IntStream.range(0, 1000000).boxed().collect(Collectors.toList());
// foo.f3 = IntStream.range(0, 1000000).boxed().collect(Collectors.toMap(i -> "k"+i, i->i));
// List<Bar> bars = new ArrayList<>(1000000);
// for (int i = 0; i < 1000000; i++) {
//   Bar bar = new Bar();
//   bar.f1 = "s"+i;
//   bar.f2 = LongStream.range(0, 10).boxed().collect(Collectors.toList());
//   bars.add(bar);
// }
// foo.f4 = bars;
Automatic Conversion to Arrow
Python:
import pyfury
encoder = pyfury.encoder(Foo)
encoder.to_arrow_record_batch([foo] * 10000)
encoder.to_arrow_table([foo] * 10000)
C++:
std::shared_ptr<ArrowWriter> arrow_writer;
EXPECT_TRUE(
    ArrowWriter::Make(schema, ::arrow::default_memory_pool(), &arrow_writer)
        .ok());
for (auto &row : rows) {
  EXPECT_TRUE(arrow_writer->Write(row).ok());
}
std::shared_ptr<::arrow::RecordBatch> record_batch;
EXPECT_TRUE(arrow_writer->Finish(&record_batch).ok());
EXPECT_TRUE(record_batch->Validate().ok());
EXPECT_EQ(record_batch->num_columns(), schema->num_fields());
EXPECT_EQ(record_batch->num_rows(), row_nums);
Java:
Schema schema = TypeInference.inferSchema(BeanA.class);
ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(schema);
Encoder<BeanA> encoder = Encoders.rowEncoder(BeanA.class);
for (int i = 0; i < 10; i++) {
  BeanA beanA = BeanA.createBeanA(2);
  arrowWriter.write(encoder.toRow(beanA));
}
return arrowWriter.finishAsRecordBatch();
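Comparison with Other Serialization Frameworks
Feature Comparison
Multi-language/cross-language: whether multiple languages are supported and whether serialization works across languages.
Automatic serialization: whether a lot of serialization code must be written by hand or the process is fully automatic.
Schema compilation: whether a schema IDL file must be written and compiled to generate code.
Custom types: whether custom types such as POJO/DataClass/Struct are supported.
Non-custom types: whether primitive types, arrays, List, Map and so on can be serialized directly, or must first be wrapped inside a custom type.
References/circular references: whether two references to the same object are serialized only once, and whether circular references can be serialized without a recursion error.
Polymorphic subtypes: for List/Map subtypes such as ArrayList/LinkedList/ImmutableList and HashMap/LinkedHashMap, whether deserialization returns the same type or falls back to ArrayList and HashMap.
Type required for deserialization: whether the type of the data must be known in advance when deserializing. If so, flexibility and ease of use are limited, and passing the wrong type may crash deserialization.
Partial deserialization/random access: whether deserialization can read only some fields or some nested fields, which saves much of the serialization cost for large objects.
Off-heap memory reads and writes: whether native memory can be read and written directly.
Nullable numeric types: whether primitive values may be null, as with Java boxed types such as Integer and Python int/float values that may be None.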
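The "polymorphic subtypes" and "non-custom types" criteria are easy to see with Python's standard library. This is a sketch, not a Fury example. A round trip through an intermediate format such as JSON loses container types and key types, while a native format such as `pickle` keeps them.

```python
import json
import pickle
from collections import OrderedDict

original = {"point": (1, 2), "scores": OrderedDict([(1, "a"), (2, "b")])}

via_json = json.loads(json.dumps(original))
via_pickle = pickle.loads(pickle.dumps(original))

# JSON: the tuple becomes a list, the OrderedDict becomes a plain dict,
# and the integer keys become strings.
print(type(via_json["point"]).__name__)     # list
print(type(via_json["scores"]).__name__)    # dict
print(list(via_json["scores"].keys()))      # ['1', '2']

# pickle: the concrete types survive the round trip.
print(type(via_pickle["point"]).__name__)   # tuple
print(type(via_pickle["scores"]).__name__)  # OrderedDict
print(via_pickle == original)               # True
```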
Performance Comparison (lower is better)
Operating system: 4.9.151-015.ali3000.alios7.x86_64
CPU model: Intel(R) Xeon(R) Platinum 8163 CPU @ 2.50GHz
Byte order: Little Endian
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 33792K
Custom Type Performance
Struct
public class Struct implements Serializable {
  int f1;
  long f2;
  float f3;
  double f4;
  ...
  int f97;
  long f98;
  float f99;
  double f100;
}
Sample
public final class Sample implements Serializable {
  public int intValue;
  public long longValue;
  public float floatValue;
  public double doubleValue;
  public short shortValue;
  public char charValue;
  public boolean booleanValue;
  public Integer IntValue;
  public Long LongValue;
  public Float FloatValue;
  public Double DoubleValue;
  public Short ShortValue;
  public Character CharValue;
  public Boolean BooleanValue;
  public int[] intArray;
  public long[] longArray;
  public float[] floatArray;
  public double[] doubleArray;
  public short[] shortArray;
  public char[] charArray;
  public boolean[] booleanArray;
  public String string; // Can be null.
  public Sample sample; // Can be null.

  public Sample() {}

  public Sample populate(boolean circularReference) {
    intValue = 123;
    longValue = 1230000;
    floatValue = 12.345f;
    doubleValue = 1.234567;
    shortValue = 12345;
    charValue = '!';
    booleanValue = true;
    IntValue = 321;
    LongValue = 3210000L;
    FloatValue = 54.321f;
    DoubleValue = 7.654321;
    ShortValue = 32100;
    CharValue = '$';
    BooleanValue = Boolean.FALSE;
    intArray = new int[] {-1234, -123, -12, -1, 0, 1, 12, 123, 1234};
    longArray = new long[] {-123400, -12300, -1200, -100, 0, 100, 1200, 12300, 123400};
    floatArray = new float[] {-12.34f, -12.3f, -12, -1, 0, 1, 12, 12.3f, 12.34f};
    doubleArray = new double[] {-1.234, -1.23, -12, -1, 0, 1, 12, 1.23, 1.234};
    shortArray = new short[] {-1234, -123, -12, -1, 0, 1, 12, 123, 1234};
    charArray = "asdfASDF".toCharArray();
    booleanArray = new boolean[] {true, false, false, true};
    string = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    if (circularReference) {
      sample = this;
    }
    return this;
  }
}
MediaContent
public final class Media implements java.io.Serializable {
  public String uri;
  public String title; // Can be null.
  public int width;
  public int height;
  public String format;
  public long duration;
  public long size;
  public int bitrate;
  public boolean hasBitrate;
  public List<String> persons;
  public Player player;
  public String copyright; // Can be null.

  public Media() {}

  public enum Player {
    JAVA,
    FLASH;
  }
}

public final class MediaContent implements java.io.Serializable {
  public Media media;
  public List<Image> images;

  public MediaContent() {}

  public MediaContent(Media media, List<Image> images) {
    this.media = media;
    this.images = images;
  }

  public MediaContent populate(boolean circularReference) {
    media = new Media();
    media.uri = "http://javaone.com/keynote.ogg";
    media.width = 641;
    media.height = 481;
    media.format = "video/theora\u1234";
    media.duration = 18000001;
    media.size = 58982401;
    media.persons = new ArrayList();
    media.persons.add("Bill Gates, Jr.");
    media.persons.add("Steven Jobs");
    media.player = Media.Player.FLASH;
    media.copyright = "Copyright (c) 2009, Scooby Dooby Doo";
    images = new ArrayList();
    Media media = circularReference ? this.media : null;
    images.add(
      new Image(
        "http://javaone.com/keynote_huge.jpg",
        "Javaone Keynote\u1234",
        32000,
        24000,
        Image.Size.LARGE,
        media));
    images.add(
      new Image(
        "http://javaone.com/keynote_large.jpg", null, 1024, 768, Image.Size.LARGE, media));
    images.add(
      new Image("http://javaone.com/keynote_small.jpg", null, 320, 240, Image.Size.SMALL, media));
    return this;
  }
}
Buffer Zero-Copy Performance
Primitive arrays
public class ArraysData implements Serializable {
  public boolean[] booleans;
  public byte[] bytes;
  public int[] ints;
  public long[] longs;
  public double[] doubles;

  public ArraysData() {}

  public ArraysData(int arrLength) {
    booleans = new boolean[arrLength];
    bytes = new byte[arrLength];
    ints = new int[arrLength];
    longs = new long[arrLength];
    doubles = new double[arrLength];
    Random random = new Random();
    random.nextBytes(bytes);
    for (int i = 0; i < arrLength; i++) {
      booleans[i] = random.nextBoolean();
      ints[i] = random.nextInt();
      longs[i] = random.nextLong();
      doubles[i] = random.nextDouble();
    }
  }
}
Off-heap buffer
Usability Comparison
class Foo {
  String f1;
  Map<String, Integer> f2;
}

class Bar {
  Foo f1;
  String f2;
  List<Foo> f3;
  Map<Integer, Foo> f4;
  Integer f5;
  Long f6;
  Float f7;
  Double f8;
  short[] f9;
  List<Long> f10;
}
Fury serialization
Fury fury = Fury.builder().withLanguage(Language.XLANG).build();
byte[] data = fury.serialize(bar);
// data here may also have been serialized by the Fury Python/Golang implementation
Bar newBar = (Bar) fury.deserialize(data);
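Compared with Protobuf
First install the protoc compiler [10]; note that the protoc version must not be newer than the version of the protobuf dependency library.
Then define a schema for the objects to be serialized:
syntax = "proto3";
package protobuf;
option java_package = "io.ray.fury.benchmark.state.generated";
option java_outer_classname = "ProtoMessage";

message Foo {
  optional string f1 = 1;
  map<string, int32> f2 = 2;
}

message Bar {
  optional Foo f1 = 1;
  optional string f2 = 2;
  repeated Foo f3 = 3;
  map<int32, Foo> f4 = 4;
  optional int32 f5 = 5;
  optional int64 f6 = 6;
  optional float f7 = 7;
  optional double f8 = 8;
  repeated int32 f9 = 9; // proto does not support int16
  repeated int64 f10 = 10;
}
Then compile the schema with protoc to generate the Java/Python/Golang code files.
Java: protoc --experimental_allow_proto3_optional -I=src/main/java/io/ray/fury/benchmark/state --java_out=src/main/java/ bench.proto
Generate the Python/Golang code.
To avoid committing generated code to the repository, proto must be integrated with the build tool. This is fairly complex and carries a considerable integration cost, and because build tool support is incomplete it still cannot be fully automated: protobuf-maven-plugin [11], for example, still requires protoc to be installed on the machine instead of downloading it automatically.
In most scenarios users already have objects (or object trees) made up of custom, primitive and composite types that need to be serialized, so these objects must first be converted to the protobuf format. This has a significant development cost and must be repeated for every language. The code is verbose, error-prone and hard to maintain, and it adds substantial data conversion and copying overhead. The conversion also ignores the actual runtime type, so type information is lost: a LinkedList, for example, comes back as an ArrayList. The Java serialization code is shown below and takes roughly 130 to 150 lines.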
public static byte[] serializeBar(Bar bar) {
  return build(bar).build().toByteArray();
}

public static ProtoMessage.Bar.Builder build(Bar bar) {
  ProtoMessage.Bar.Builder barBuilder = ProtoMessage.Bar.newBuilder();
  if (bar.f1 == null) {
    barBuilder.clearF1();
  } else {
    barBuilder.setF1(buildFoo(bar.f1));
  }
  if (bar.f2 == null) {
    barBuilder.clearF2();
  } else {
    barBuilder.setF2(bar.f2);
  }
  if (bar.f3 == null) {
    barBuilder.clearF3();
  } else {
    for (Foo foo : bar.f3) {
      barBuilder.addF3(buildFoo(foo));
    }
  }
  if (bar.f4 == null) {
    barBuilder.clearF4();
  } else {
    bar.f4.forEach(
      (k, v) -> {
        ProtoMessage.Foo.Builder fooBuilder1 = ProtoMessage.Foo.newBuilder();
        fooBuilder1.setF1(v.f1);
        v.f2.forEach(fooBuilder1::putF2);
        barBuilder.putF4(k, fooBuilder1.build());
      });
  }
  if (bar.f5 == null) {
    barBuilder.clearF5();
  } else {
    barBuilder.setF5(bar.f5);
  }
  if (bar.f6 == null) {
    barBuilder.clearF6();
  } else {
    barBuilder.setF6(bar.f6);
  }
  if (bar.f7 == null) {
    barBuilder.clearF7();
  } else {
    barBuilder.setF7(bar.f7);
  }
  if (bar.f8 == null) {
    barBuilder.clearF8();
  } else {
    barBuilder.setF8(bar.f8);
  }
  if (bar.f9 == null) {
    barBuilder.clearF9();
  } else {
    for (short i : bar.f9) {
      barBuilder.addF9(i);
    }
  }
  if (bar.f10 == null) {
    barBuilder.clearF10();
  } else {
    barBuilder.addAllF10(bar.f10);
  }
  return barBuilder;
}

public static ProtoMessage.Foo.Builder buildFoo(Foo foo) {
  ProtoMessage.Foo.Builder builder = ProtoMessage.Foo.newBuilder();
  if (foo.f1 == null) {
    builder.clearF1();
  } else {
    builder.setF1(foo.f1);
  }
  if (foo.f2 == null) {
    builder.clearF2();
  } else {
    foo.f2.forEach(builder::putF2);
  }
  return builder;
}

public static Foo fromFooBuilder(ProtoMessage.Foo.Builder builder) {
  Foo foo = new Foo();
  if (builder.hasF1()) {
    foo.f1 = builder.getF1();
  }
  foo.f2 = builder.getF2Map();
  return foo;
}

public static Bar deserializeBar(byte[] bytes) throws InvalidProtocolBufferException {
  Bar bar = new Bar();
  ProtoMessage.Bar.Builder barBuilder = ProtoMessage.Bar.newBuilder();
  barBuilder.mergeFrom(bytes);
  if (barBuilder.hasF1()) {
    bar.f1 = fromFooBuilder(barBuilder.getF1Builder());
  }
  if (barBuilder.hasF2()) {
    bar.f2 = barBuilder.getF2();
  }
  bar.f3 =
    barBuilder.getF3BuilderList().stream()
      .map(ProtoState::fromFooBuilder)
      .collect(Collectors.toList());
  bar.f4 = new HashMap<>();
  barBuilder.getF4Map().forEach((k, v) -> bar.f4.put(k, fromFooBuilder(v.toBuilder())));
  if (barBuilder.hasF5()) {
    bar.f5 = barBuilder.getF5();
  }
  if (barBuilder.hasF6()) {
    bar.f6 = barBuilder.getF6();
  }
  if (barBuilder.hasF7()) {
    bar.f7 = barBuilder.getF7();
  }
  if (barBuilder.hasF8()) {
    bar.f8 = barBuilder.getF8();
  }
  bar.f9 = new short[barBuilder.getF9Count()];
  for (int i = 0; i < barBuilder.getF9Count(); i++) {
    bar.f9[i] = (short) barBuilder.getF9(i);
  }
  bar.f10 = barBuilder.getF10List();
  return bar;
}
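Even if no custom type existed for the data beforehand, the protobuf-generated classes cannot be used directly in business code, because they do not follow object-oriented design [12] and behavior cannot be added to them. Extra wrappers must then be defined, and if the fields contain other custom types, those types must be converted to their wrappers as well, which further limits flexibility.
Compared with Flatbuffer
Install the flatc compiler [13]; on Linux, flatc may need to be built and installed from source.
Define the schema: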
namespace io.ray.fury.benchmark.state.generated;

table FBSFoo {
  string:string;
  f2_key:[string]; // flatbuffers does not support map
  f2_value:[int];
}

table FBSBar {
  f1:FBSFoo;
  f2:string;
  f3:[FBSFoo];
  f4_key:[int]; // flatbuffers does not support map
  f4_value:[FBSFoo];
  f5:int;
  f6:long;
  f7:float;
  f8:double;
  f9:[short];
  f10:[long];
  // fbs does not support nullable primitive types, so a separate set of
  // fields or a vector is also needed to mark which values are null
}

root_type FBSBar;
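Then compile the schema with flatc to generate the Java/Python/Golang code files.
Java: flatc -I=src/main/java/io/ray/fury/benchmark/state -o=src/main/java/ bar.fbs
Generate the Python/Golang code.
To avoid committing generated code to the repository, the schema compilation must be integrated with the build tool. Only bazel currently seems to offer good integration; other build tools such as maven and gradle do not appear to have a good integration path.
Because the generated classes do not follow object-oriented design, behavior cannot be added to them directly. Existing systems also already have the types that need to be serialized, so objects of those types must be serialized into the flatbuffer format. Flatbuffer serialization code is as verbose, error-prone and hard to maintain as the Protobuf code, and it has the following additional problems:
The code is inflexible, hard to write and error-prone. To serialize an object tree, flatbuffer must first traverse the whole tree depth-first in pre-order and manually save the offset of every variable-length field in temporary state, and only then serialize the field offsets or inlined scalar values. This code is very tedious to write, and once a stored offset is wrong, serialization fails with an assert/exception/panic that is hard to track down.
List elements must be serialized in reverse order, which is counter-intuitive. Because the buffer is built from back to front, the elements of a list have to be serialized one by one in reverse.
The map type is not supported. A map must be serialized as two lists or as a table, which adds further development cost.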
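The offset bookkeeping described above can be sketched with a toy encoder in Python. The layout here is invented for illustration and is not the FlatBuffers wire format. For simplicity it appends front to back, whereas FlatBuffers builds its buffer from back to front. It only shows the pattern that makes such code tedious: every variable-length child is written first, its offset is remembered, and the parent record, which stores offsets rather than data, is written last.

```python
import struct

def build_table(name, tags):
    """Toy children-first encoder (not the real FlatBuffers layout)."""
    buf = bytearray()

    def write_string(text):
        # Write a length-prefixed string and return where it starts.
        offset = len(buf)
        raw = text.encode("utf-8")
        buf.extend(struct.pack("<I", len(raw)))
        buf.extend(raw)
        return offset

    # Step 1: serialize every variable-length child and keep its offset.
    name_offset = write_string(name)
    tag_offsets = [write_string(tag) for tag in tags]

    # Step 2: only now write the parent, which stores offsets, not data.
    root = len(buf)
    buf.extend(struct.pack("<II", name_offset, len(tag_offsets)))
    for offset in tag_offsets:
        buf.extend(struct.pack("<I", offset))

    # Step 3: a trailer points at the parent so a reader knows where to start.
    buf.extend(struct.pack("<I", root))
    return bytes(buf)

def read_string(data, offset):
    (length,) = struct.unpack_from("<I", data, offset)
    return data[offset + 4:offset + 4 + length].decode("utf-8")

def read_table(data):
    (root,) = struct.unpack_from("<I", data, len(data) - 4)
    name_offset, count = struct.unpack_from("<II", data, root)
    tag_offsets = struct.unpack_from(f"<{count}I", data, root + 8)
    return read_string(data, name_offset), [read_string(data, o) for o in tag_offsets]

data = build_table("bar", ["a", "bc"])
print(read_table(data))  # ('bar', ['a', 'bc'])
```

If a stored offset is mixed up, for example by swapping two entries in `tag_offsets`, the reader silently returns the wrong strings or fails while decoding, which is the kind of error the text describes as hard to track down.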
public static byte[] serialize(Bar bar) {
  return buildBar(bar).sizedByteArray();
}

public static FlatBufferBuilder buildBar(Bar bar) {
  // Null handling is omitted here.
  FlatBufferBuilder builder = new FlatBufferBuilder();
  int f2_offset = builder.createString(bar.f2);
  int[] f3_offsets = new int[bar.f3.size()];
  for (int i = 0; i < bar.f3.size(); i++) {
    f3_offsets[i] = buildFoo(builder, bar.f3.get(i));
  }
  int f3_offset = FBSBar.createF3Vector(builder, f3_offsets);
  int f4_key_offset;
  int f4_value_offset;
  {
    int[] keys = new int[bar.f4.size()];
    int[] valueOffsets = new int[bar.f4.size()];
    int i = 0;
    for (Map.Entry<Integer, Foo> entry : bar.f4.entrySet()) {
      keys[i] = entry.getKey();
      valueOffsets[i] = buildFoo(builder, entry.getValue());
      i++;
    }
    f4_key_offset = FBSBar.createF4KeyVector(builder, keys);
    f4_value_offset = FBSBar.createF4ValueVector(builder, valueOffsets);
  }
  int f9_offset = FBSBar.createF9Vector(builder, bar.f9);
  int f10_offset = FBSBar.createF10Vector(builder, bar.f10.stream().mapToLong(x -> x).toArray());
  FBSBar.startFBSBar(builder);
  FBSBar.addF1(builder, buildFoo(builder, bar.f1));
  FBSBar.addF2(builder, f2_offset);
  FBSBar.addF3(builder, f3_offset);
  FBSBar.addF4Key(builder, f4_key_offset);
  FBSBar.addF4Value(builder, f4_value_offset);
  FBSBar.addF5(builder, bar.f5);
  FBSBar.addF6(builder, bar.f6);
  FBSBar.addF7(builder, bar.f7);
  FBSBar.addF8(builder, bar.f8);
  FBSBar.addF9(builder, f9_offset);
  FBSBar.addF10(builder, f10_offset);
  builder.finish(FBSBar.endFBSBar(builder));
  return builder;
}

public static int buildFoo(FlatBufferBuilder builder, Foo foo) {
  int stringOffset = builder.createString(foo.f1);
  int[] keyOffsets = new int[foo.f2.size()];
  int[] values = new int[foo.f2.size()];
  int i = 0;
  for (Map.Entry<String, Integer> entry : foo.f2.entrySet()) {
    keyOffsets[i] = builder.createString(entry.getKey());
    values[i] = entry.getValue();
    i++;
  }
  int keyOffset = FBSFoo.createF2KeyVector(builder, keyOffsets);
  int f2ValueOffset = FBSFoo.createF2ValueVector(builder, values);
  return FBSFoo.createFBSFoo(builder, stringOffset, keyOffset, f2ValueOffset);
}

public static Bar deserializeBar(ByteBuffer buffer) {
  Bar bar = new Bar();
  FBSBar fbsBar = FBSBar.getRootAsFBSBar(buffer);
  bar.f1 = deserializeFoo(fbsBar.f1());
  bar.f2 = fbsBar.f2();
  {
    ArrayList<Foo> f3List = new ArrayList<>();
    for (int i = 0; i < fbsBar.f3Length(); i++) {
      f3List.add(deserializeFoo(fbsBar.f3(i)));
    }
    bar.f3 = f3List;
  }
  {
    Map<Integer, Foo> f4 = new HashMap<>();
    for (int i = 0; i < fbsBar.f4KeyLength(); i++) {
      f4.put(fbsBar.f4Key(i), deserializeFoo(fbsBar.f4Value(i)));
    }
    bar.f4 = f4;
  }
  bar.f5 = fbsBar.f5();
  bar.f6 = fbsBar.f6();
  bar.f7 = fbsBar.f7();
  bar.f8 = fbsBar.f8();
  {
    short[] f9 = new short[fbsBar.f9Length()];
    for (int i = 0; i < fbsBar.f9Length(); i++) {
      f9[i] = fbsBar.f9(i);
    }
    bar.f9 = f9;
  }
  {
    List<Long> f10 = new ArrayList<>();
    for (int i = 0; i < fbsBar.f10Length(); i++) {
      f10.add(fbsBar.f10(i));
    }
    bar.f10 = f10;
  }
  return bar;
}

public static Foo deserializeFoo(FBSFoo fbsFoo) {
  Foo foo = new Foo();
  foo.f1 = fbsFoo.string();
  HashMap<String, Integer> map = new HashMap<>();
  foo.f2 = map;
  for (int i = 0; i < fbsFoo.f2KeyLength(); i++) {
    map.put(fbsFoo.f2Key(i), fbsFoo.f2Value(i));
  }
  return foo;
}
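Wrapping Flatbuffer-generated types in classes that follow object-oriented design: because Flatbuffer serialization must keep many intermediate offsets and must write all variable-length objects into the buffer first, modifying flatbuffer data through a wrapper is complicated. Wrapping generated types is therefore only practical for reading deserialized data, which makes adding wrappers difficult as well.
Compared with Msgpack
Summary
Cross-language native serialization, which greatly improves the usability of cross-language serialization and lowers development cost;
JIT-based optimization of serialization performance. This also shows that applying the code generation ideas of databases and big data systems to serialization works well and yields very significant performance gains;
Zero-copy serialization, which avoids all unnecessary memory copies;
Multi-language row format support, which avoids serialization and metadata overhead.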
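The code generation idea can be sketched in a few lines of Python. This is a toy illustration under stated assumptions, not Fury's JIT: the `Point` class, the `FORMATS` table and both function names are hypothetical. The interpreted path walks the fields and looks up each format on every call, while the generated path emits source specialised for one class once and compiles it with `exec`.

```python
import struct
from dataclasses import dataclass, fields

@dataclass
class Point:
    x: int
    y: int
    z: float

FORMATS = {int: "q", float: "d"}  # hypothetical type-to-struct-code table

def generic_serialize(obj):
    """Interpreted path: loop over fields and look up each format per call."""
    out = bytearray()
    for f in fields(obj):
        out += struct.pack("<" + FORMATS[f.type], getattr(obj, f.name))
    return bytes(out)

def generate_serializer(cls):
    """Generated path: emit source specialised for cls once, then compile it."""
    fmt = "<" + "".join(FORMATS[f.type] for f in fields(cls))
    args = ", ".join(f"obj.{f.name}" for f in fields(cls))
    source = f"def serialize(obj):\n    return _pack({args})\n"
    namespace = {"_pack": struct.Struct(fmt).pack}
    exec(source, namespace)
    return namespace["serialize"]

serialize_point = generate_serializer(Point)
p = Point(1, 2, 0.5)
print(serialize_point(p) == generic_serialize(p))  # True
```

The generated function contains no loop, no dictionary lookup and no reflection. This is the same kind of saving the article attributes to runtime code generation, although a JVM JIT can go further by inlining the generated code.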
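Protocol level
A data compression mode supported by JIT code generation
Large-scale data compression further accelerated with SIMD vectorized instructions
Framework level
JIT-compiling more of the Java serialization code;
Improving C++ support by using macros, templates and compile-time reflection to register and capture the type information Fury needs at compile time, enabling automatic C++ serialization;
Supporting a JIT-based Golang serialization implementation through Golang-ASM;
Further speeding up Python serialization by moving more Python code to Cython;
Supporting JavaScript to connect with the NodeJS ecosystem;
Supporting Rust.
Ecosystem level
Integration with RPC frameworks such as SOFA, Dubbo and Akka
Integration with distributed computing frameworks such as Spark and Flink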
[1]https://github.com/EsotericSoftware/kryo
[2]https://spark.apache.org/docs/latest/index.html
[3]https://flink.apache.org/
[4]https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
[5]https://arrow.apache.org/
[6]https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization
[7]https://peps.python.org/pep-0574
[8]https://github.com/EsotericSoftware/kryo/tree/master/benchmarks
[9]https://openjdk.org/projects/code-tools/jmh/
[10]https://developers.google.com/protocol-buffers/docs/downloads
[11]https://www.xolstice.org/protobuf-maven-plugin/usage.html
[12]https://developers.google.com/protocol-buffers/docs/javatutorial#parsing-and-serialization
[13]https://github.com/google/flatbuffers/releases
[14]https://github.com/ray-project/ray