一种新的流:为Java加入生成器(Generator)特性
阿里妹导读
一种全新的设计模式,数学美感与工程实用价值兼备,且不限编程语言。本文将以Java为样例,从无到有实现出完整的流式API,引入生成器特性,并介绍诸多应用场景。
前言
这一设计模式并非因Java而生,而是诞生于一个十分简陋的脚本语言。它对语言特性的要求非常之低,因而其价值对众多现代编程语言都是普适的。
关于Stream
首先大概回顾下Java里传统的流式API。自Java8引入lambda表达式和Stream以来,Java的开发便捷性有了质的飞跃,Stream在复杂业务逻辑的处理上让人效率倍增,是每一位Java开发者都应该掌握的基础技能。但排除掉parallelStream也即并发流之外,它其实并不是一个好的设计。
第一,封装过重,实现过于复杂,源码极其难读。我能理解这或许是为了兼容并发流所做的妥协,但毕竟耦合太深,显得艰深晦涩。每一位初学者被源码吓到之后,想必都会产生流是一种十分高级且实现复杂的特性的印象。实际上并不是这样,流其实可以用非常简单的方式构建。
第二、API过于冗长。冗长体现在stream.collect这一部分,作为对比,Kotlin提供的toList/toSet/associate(toMap)等等丰富操作是可以直接作用在流上的。Java直到16才抠抠索索加进来一个Stream可以直接调用的toList,他们甚至不肯把toSet/toMap一起加上。
第三、API功能简陋。对于链式操作,在最初的Java8里只有map/filter/skip/limit/peek/distinct/sorted这七个,Java9又加上了takeWhile/dropWhile。然而在Kotlin中,除了这几个之外人还有许多额外的实用功能,
例如:
mapIndexed, mapNotNull, filterIndexed, filterNotNull, onEachIndexed, distinctBy, sortedBy, sortedWith, zip, zipWithNext等等,翻倍了不止。这些东西实现起来并不复杂,就是个顺手的事,但对于用户而言有和没有的体验差异可谓巨大。
在这篇文章里,我将提出一种全新的机制用于构建流。这个机制极其简单,任何能看懂lambda表达式(闭包)的同学都能亲手实现,任何支持闭包的编程语言都能利用该机制实现自己的流。也正是由于这个机制足够简单,所以开发者可以以相当低的成本撸出大量的实用API,使用体验甩开Stream两条街,不是问题。
关于生成器
生成器(Generator)[1]是许多现代编程语言里一个广受好评的重要特性,在Python/Kotlin/C#/Javascript等等语言中均有直接支持。它的核心API就是一个yield关键字(或者方法)。有了生成器之后,无论是iterable/iterator,还是一段乱七八糟的闭包,都可以直接映射为一个流。举个例子,假设你想实现一个下划线字符串转驼峰的方法,在Python里你可以利用生成器这么玩
def underscore_to_camelcase(s):
def camelcase():
yield str.lower
while True:
yield str.capitalize
return ''.join(f(sub) for sub, f in zip(s.split('_'), camelcase()))
以上代码里的操作, 在任何支持生成器的语言里都可以轻易完成,但是在Java里你恐怕连想都不敢想。Java有史以来,无论是历久弥新的Java8,还是最新的引入了Project Loom[3]的OpenJDK19,连协程都有了,依然没有直接支持生成器。
本质上,生成器的实现要依赖于continuation[4]的挂起和恢复,所谓continuation可以直观理解为程序执行到指定位置后的断点,协程就是指在这个函数的断点挂起后跳到另一个函数的某个断点继续执行,而不会阻塞线程,生成器亦如是。Python通过栈帧的保存与恢复实现函数重入以及生成器[5],Kotlin在编译阶段利用CPS(Continuation Passing Style)[6]技术对字节码进行了变换,从而在JVM上模拟了协程[7]。其他的语言要么大体如此,要么有更直接的支持。
那么,有没有一种办法,可以在没有协程的Java里,实现或者至少模拟出一个yield关键字,从而动态且高性能地创建流呢。答案是,有。
正文
概念定义
public interface Seq<T> {
void consume(Consumer<T> consumer);
}
List<Integer> list = Arrays.asList(1, 2, 3);
Seq<Integer> seq = list::forEach;
可以看到,在这个例子里consume和forEach是完全等价的,事实上这个接口我最早就是用forEach命名的,几轮迭代之后才改成含义更准确的consume。
static <T> Seq<T> unit(T t) {
return c -> c.accept(t);
}
这个方法在数学上很重要(实操上其实用的不多),它定义了Seq这个泛型类型的单位元操作,即T -> Seq<T>的映射。
map与flatMap
map
从forEach的直观角度出发,我们很容易写出map[8],将类型为T的流,转换为类型为E的流,也即根据函数T -> E得到Seq<T> -> Seq<E>的映射。
default <E> Seq<E> map(Function<T, E> function) {
return c -> consume(t -> c.accept(function.apply(t)));
}
flatMap
default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {
return c -> consume(t -> function.apply(t).consume(c));
}
大家可以自己在IDEA里写写这两个方法,结合智能提示,写起来其实非常方便。如果你觉得理解起来不太直观,就把Seq看作是List,把consume看作是forEach就好。
filter与take/drop
filter
default Seq<T> filter(Predicate<T> predicate) {
return c -> consume(t -> {
if (predicate.test(t)) {
c.accept(t);
}
});
}
take
public final class StopException extends RuntimeException {
public static final StopException INSTANCE = new StopException();
public synchronized Throwable fillInStackTrace() {
return this;
}
}
以及相应的方法
static <T> T stop() {
throw StopException.INSTANCE;
}
default void consumeTillStop(C consumer) {
try {
consume(consumer);
} catch (StopException ignore) {}
}
然后就可以实现take了:
default Seq<T> take(int n) {
return c -> {
int[] i = {n};
consumeTillStop(t -> {
if (i[0]-- > 0) {
c.accept(t);
} else {
stop();
}
});
};
}
drop
default Seq<T> drop(int n) {
return c -> {
int[] a = {n - 1};
consume(t -> {
if (a[0] < 0) {
c.accept(t);
} else {
a[0]--;
}
});
};
}
onEach
对流的某个元素添加一个操作consumer,但是不执行流——对应Stream.peek。
default Seq<T> onEach(Consumer<T> consumer) {
return c -> consume(consumer.andThen(c));
}
zip
default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {
return c -> {
Iterator<E> iterator = iterable.iterator();
consumeTillStop(t -> {
if (iterator.hasNext()) {
c.accept(function.apply(t, iterator.next()));
} else {
stop();
}
});
};
}
终端操作
default String join(String sep) {
StringJoiner joiner = new StringJoiner(sep);
consume(t -> joiner.add(t.toString()));
return joiner.toString();
}
以及toList。
default List<T> toList() {
List<T> list = new ArrayList<>();
consume(list::add);
return list;
}
至此为止,我们仅仅只用几十行代码,就实现出了一个五脏俱全的流式API。在大部分情况下,这些API已经能覆盖百分之八九十的使用场景。你完全可以依样画葫芦,在其他编程语言里照着玩一玩,比如Go(笑)。
生成器的推导
List<Integer> list = Arrays.asList(1, 2, 3);
Seq<Integer> seq = list::forEach;
没看出来?那把这个方法推导改写为普通lambda函数,有
Seq<Integer> seq = c -> list.forEach(c);
再进一步,把这个forEach替换为更传统的for循环,有
Seq<Integer> seq = c -> {
for (Integer i : list) {
c.accept(i);
}
};
由于已知这个list就是[1, 2, 3],所以以上代码可以进一步等价写为
Seq<Integer> seq = c -> {
c.accept(1);
c.accept(2);
c.accept(3);
};
是不是有点眼熟?不妨看看Python里类似的东西长啥样:
def seq():
yield 1
yield 2
yield 3
二者相对比,形式几乎可以说一模一样——这其实就已经是生成器了,这段代码里的accept就扮演了yield的角色,consume这个接口之所以取这个名字,含义就是指它是一个消费操作,所有的终端操作都是基于这个消费操作实现的。功能上看,它完全等价于Iterable的forEach,之所以又不直接叫forEach,是因为它的元素并不是本身自带的,而是通过闭包内的代码块临时生成的。
这种生成器,并非传统意义上利用continuation挂起的生成器,而是利用闭包来捕获代码块里临时生成的元素,哪怕没有挂起,也能高度模拟传统生成器的用法和特性。其实上文所有链式API的实现,本质上也都是生成器,只不过生成的元素来自于原始的流罢了。
有了生成器,我们就可以把前文提到的下划线转驼峰的操作用Java也依样画葫芦写出来了。
static String underscoreToCamel(String str) {
// Java没有首字母大写方法,随便现写一个
UnaryOperator<String> capitalize = s -> s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
// 利用生成器构造一个方法的流
Seq<UnaryOperator<String>> seq = c -> {
// yield第一个小写函数
c.accept(String::toLowerCase);
// 这里IDEA会告警,提示死循环风险,无视即可
while (true) {
// 按需yield首字母大写函数
c.accept(capitalize);
}
};
List<String> split = Arrays.asList(str.split("_"));
// 这里的zip和join都在上文给出了实现
return seq.zip(split, (f, sub) -> f.apply(sub)).join("");
}
大家可以把这几段代码拷下来跑一跑,看它是不是真的实现了其目标功能。
生成器的本质
生产者-消费者模式
生产者与消费者的关系不止出现在多线程或者协程语境下,在单线程里也有一些经典场景。比如A和B两名同学合作一个项目,分别开发两个模块:A负责产出数据,B负责使用数据。A不关心B怎么处理数据,可能要先过滤一些,进行聚合后再做计算,也可能是写到某个本地或者远程的存储;B自然也不关心A的数据是怎么来的。这里边唯一的问题在于,数据条数实在是太多了,内存一次性放不下。在这种情况下,传统的做法是让A提供一个带回调函数consumer的接口,B在调用A的时候传入一个具体的consumer。
public void produce(Consumer<String> callback) {
// do something that produce strings
// then use the callback consumer to eat them
}
这种基于回调函数的交互方式实在是过于经典了,原本没啥可多说的。但是在已经有了生成器之后,我们不妨胆子放大一点稍微做一下改造:仔细观察上面这个produce接口,它输入一个consumer,返回void——咦,所以它其实也是一个Seq嘛!
Seq<String> producer = this::produce;
接下来,我们只需要稍微调整下代码,就能对这个原本基于回调函数的接口进行一次升级,将它变成一个生成器。
public Seq<String> produce() {
return c -> {
// still do something that produce strings
// then use the callback consumer to eat them
};
}
基于这一层抽象,作为生产者的A和作为消费者的B就真正做到完全的、彻底的解耦了。A只需要把数据生产过程放到生成器的闭包里,期间涉及到的所有副作用,例如IO操作等,都被这个闭包完全隔离了。B则直接拿到一个干干净净的流,他不需要关心流的内部细节,当然想关心也关心不了,他只用专注于自己想做的事情即可。
更重要的是,A和B虽然在操作逻辑上完全解耦,互相不可见,但在CPU调度时间上它们却是彼此交错的,B甚至还能直接阻塞、中断A的生产流程——可以说没有协程,胜似协程。
至此,我们终于成功发现了Seq作为生成器的真正本质:consumer of callback。明明是一个回调函数的消费者,摇身一变就成了生产者,实在是有点奇妙。不过仔细一想倒也合理:能够满足消费者需求(callback)的家伙,不管这需求有多么奇怪,可不就是生产者么。
容易发现,基于callback机制的生成器,其调用开销完全就只有生成器闭包内部那堆代码块的执行开销,加上一点点微不足道的闭包创建开销。在诸多涉及到流式计算与控制的业务场景里,这将带来极为显著的内存与性能优势。后面我会给出展现其性能优势的具体场景实例。
另外,观察这段改造代码,会发现produce输出的东西,根本就还是个函数,没有任何数据被真正执行和产出。这就是生成器作为一个匿名接口的天生优势:惰性计算——消费者看似得到了整个流,实际那只是一张爱的号码牌,可以涂写,可以废弃,但只有在拿着货真价实的callback去兑换的那一刻,才会真正的执行流。
生成器的本质,正是人类本质的反面:鸽子克星——没有任何人可以鸽它
IO隔离与流输出
Stream<String> lines = new BufferedReader(new InputStreamReader(new FileInputStream("file"))).lines();
那么有没有更普适做法呢,毕竟不是所有人都清楚BufferedReader.lines和Files.lines会有这种安全性上的区别,也不是所有的Closeable都能提供类似的安全关闭的流式接口,甚至大概率压根就没有流式接口。
好在现在我们有了Seq,它的闭包特性自带隔离副作用的先天优势。恰巧在涉及大量数据IO的场景里,利用callback交互又是极为经典的设计方式——这里简直就是它大展拳脚的最佳舞台。
用生成器实现IO的隔离非常简单,只需要整个包住try-with-resources代码即可,它同时就包住了IO的整个生命周期。
Seq<String> seq = c -> {
try (BufferedReader reader = Files.newBufferedReader(Paths.get("file"))) {
String s;
while ((s = reader.readLine()) != null) {
c.accept(s);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
};
核心代码其实就3行,构建数据源,挨个读数据,然后yield(即accept)。后续对流的任何操作看似发生在创建流之后,实际执行起来都被包进了这个IO生命周期的内部,读一个消费一个,彼此交替,随用随走。
换句话讲,生成器的callback机制,保证了哪怕Seq可以作为变量四处传递,但涉及到的任何副作用操作,都是包在同一个代码块里惰性执行的。它不需要像Monad那样,还得定义诸如IOMonad,StateMonad等等花样众多的Monad。
与之类似,这里不妨再举个阿里中间件的例子,利用Tunnel将大家熟悉的ODPS表数据下载为一个流:
public static Seq<Record> downloadRecords(TableTunnel.DownloadSession session) {
return c -> {
long count = session.getRecordCount();
try (TunnelRecordReader reader = session.openRecordReader(0, count)) {
for (long i = 0; i < count; i++) {
c.accept(reader.read());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
};
}
有了Record流之后,如果再能实现出一个map函数,就可以非常方便的将Record流map为带业务语义的DTO流——这其实就等价于一个ODPS Reader。
异步流
public static Seq<Integer> asyncSeq() {
return c -> {
CompletableFuture.runAsync(() -> c.accept(1));
CompletableFuture.runAsync(() -> c.accept(2));
};
}
这就是一个简单而粗暴的异步流生成器。对于外部使用者来说,异步流除了不能保证元素顺序,它和同步流没有任何区别,本质上都是一段可运行的代码,边运行边产生数据。一个callback函数,谁给用不是用呢。
并发流
对此我们不妨采用最为暴力而简单的思路,构造一个ForkJoinTask的list,依次将元素提交forkJoinPool后,产生一个task并添加进这个list,等所有元素全部提交完毕后,再对这个list里的所有task统一join。
default Seq<T> parallel() {
ForkJoinPool pool = ForkJoinPool.commonPool();
return c -> map(t -> pool.submit(() -> c.accept(t))).cache().consume(ForkJoinTask::join);
}
这就是基于生成器的并发流,它的实现仅仅只需要两行代码——正如本文开篇所说,流可以用非常简单的方式构建。哪怕是Stream费了老大劲的并发流,换一种方式,实现起来可以简单到令人发指。
这里值得再次强调的是,这种机制并非Java限定,而是任何支持闭包的编程语言都能玩。事实上,这种流机制的最早验证和实现,就是我在AutoHotKey_v2[10]这个软件自带的简陋的脚本语言上完成的。
再谈生产者-消费者模式
回想一下,Seq作为一种中间数据结构,能够完全解耦生产者与消费者,一方只管生产数据交给它,另一方只管从它那里拿数据消费。这种构造有没有觉得有点眼熟?不错,正是Java开发者常见的阻塞队列,以及支持协程的语言里的通道(Channel),比如Go和Kotlin。
通道某种意义上也是一种阻塞队列,它和传统阻塞队列的主要区别,在于当通道里的数据超出限制或为空时,对应的生产者/消费者会挂起而不是阻塞,两种方式都会暂停生产/消费,只是协程挂起后能让出CPU,让它去别的协程里继续干活。
那Seq相比Channel有什么优势呢?优势可太多了:首先,生成器闭包里callback的代码块,严格确保了生产和消费必然交替执行,也即严格的先进先出、进了就出、不进不出,所以不需要单独开辟堆内存去维护一个队列,那没有队列自然也就没有锁,没有锁自然也就没有阻塞或挂起。其次,Seq本质上是消费监听生产,没有生产自然没有消费,如果生产过剩了——啊,生产永远不会过剩,因为Seq是惰性的,哪怕生产者在那儿while死循环无限生产,也不过是个司空见惯的无限流罢了。
为了更直观的理解,这里给一个简单的通道示例。先随便实现一个基于ForkJoinPool的异步消费接口,该接口允许用户自由选择消费完后是否join。
default void asyncConsume(Consumer<T> consumer) {
ForkJoinPool pool = ForkJoinPool.commonPool();
map(t -> pool.submit(() -> consumer.accept(t))).cache().consume(ForkJoinTask::join);
}
@Test
public void testChan() {
// 生产无限的自然数,放入通道seq,这里流本身就是通道,同步流还是异步流都无所谓
Seq<Long> seq = c -> {
long i = 0;
while (true) {
c.accept(i++);
}
};
long start = System.currentTimeMillis();
// 通道seq交给消费者,消费者表示只要偶数,只要5个
seq.filter(i -> (i & 1) == 0).take(5).asyncConsume(i -> {
try {
Thread.sleep(1000);
System.out.printf("produce %d and consume\n", i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.printf("elapsed time: %dms\n", System.currentTimeMillis() - start);
}
运行结果
produce 0 and consume
produce 8 and consume
produce 6 and consume
produce 4 and consume
produce 2 and consume
elapsed time: 1032ms
可以看到,由于消费是并发执行的,所以哪怕每个元素的消费都要花1秒钟,最终总体耗时也就比1秒多一点点。当然,这和传统的通道模式还是不太一样,比如实际工作线程就有很大区别。更全面的设计是在流的基础上加上无锁非阻塞队列实现正经Channel,可以附带解决Go通道的许多问题同时提升性能,后面我会另写文章专门讨论。
上文介绍了生成器的本质特性,它是一个consumer of callback,它可以以闭包的形式完美封装IO操作,它可以无缝切换为异步流和并发流,并在异步交互中扮演一个无锁的通道角色。除去这些核心特性带来的优势外,它还有非常多有趣且有价值的应用场景。
树遍历
def scan_tree(node):
yield node.value
if node.left:
yield from scan_tree(node.left)
if node.right:
yield from scan_tree(node.right)
//static <T> Seq<T> of(T... ts) {
// return Arrays.asList(ts)::forEach;
//}
// 递归函数
public static <N> void scanTree(Consumer<N> c, N node, Function<N, Seq<N>> sub) {
c.accept(node);
sub.apply(node).consume(n -> {
if (n != null) {
scanTree(c, n, sub);
}
});
}
// 通用方法,可以遍历任何树
public static <N> Seq<N> ofTree(N node, Function<N, Seq<N>> sub) {
return c -> scanTree(c, node, sub);
}
// 遍历一个二叉树
public static Seq<Node> scanTree(Node node) {
return ofTree(node, n -> Seq.of(n.left, n.right));
}
static Seq<Object> ofJson(Object node) {
return Seq.ofTree(node, n -> c -> {
if (n instanceof Iterable) {
((IterableforEach(c); >)n).
} else if (n instanceof Map) {
((MapforEach(c); , )n).values().
}
});
}
boolean hasInteger = ofJson(node).any(t -> t instanceof Integer);
这个方法的厉害之处不仅在于它足够简单,更在于它是一个短路操作。用正常代码在一个深度优先的递归函数里执行短路,要不就抛出异常,要不就额外添加一个上下文参数参与递归(只有在返回根节点后才能停止),总之实现起来都挺麻烦。但是使用Seq,你只需要一个any/all/none。
boolean isIllegal = ofJson(node).any(n -> (n instanceof String) && ((String)n).contains("114514"));
对了,JSON的前辈XML也是树的结构,结合众多成熟的XML的解析器,我们也可以实现出类似的流式扫描工具。比如说,更快的Excel解析器?
更好用的笛卡尔积
笛卡尔积对大部分开发而言可能用处不大,但它在函数式语言中是一种颇为重要的构造,在运筹学领域构建最优化模型时也极其常见。此前Java里若要利用Stream构建多重笛卡尔积,需要多层flatMap嵌套。
public static Stream<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
return list1.stream().flatMap(i1 ->
list2.stream().flatMap(i2 ->
list3.stream().map(i3 ->
i1 + i2 + i3)));
}
对于这样的场景,Scala提供了一种语法糖,允许用户以for循环+yield[11]的方式来组合笛卡尔积。不过Scala的yield就是个纯语法糖,与生成器并无直接关系,它会在编译阶段将代码翻译为上面flatMap的形式。这种糖形式上等价于Haskell里的do annotation[12]。
好在现在有了生成器,我们有了更好的选择,可以在不增加语法、不引入关键字、不麻烦编译器的前提下,直接写个嵌套for循环并输出为流。且形式更为自由——你可以在for循环的任意一层随意添加代码逻辑。
public static Seq<Integer> cartesian(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
return c -> {
for (Integer i1 : list1) {
for (Integer i2 : list2) {
for (Integer i3 : list3) {
c.accept(i1 + i2 + i3);
}
}
}
};
}
换言之,Java不需要这样的糖。Scala或许原本也可以不要。
可能是Java下最快的CSV/Excel解析器
Excel则不一样,有集团开源软件EasyExcel[16]珠玉在前,我只能确保比它快,很难也不打算比它功能覆盖全。
改造EasyExcel,让它可以直接输出流
public static <T> void readEasyExcel(String file, Class<T> cls, Consumer<T> consumer) {
EasyExcel.read(file, cls, new PageReadListener<T>(list -> {
for (T person : list) {
consumer.accept(person);
}
})).sheet().doRead();
}
消费者需要关心生产者的内部缓存,比如这里的缓存就是一个list。
消费者如果想拿走全部数据,需要放一个list进去挨个add或者每次addAll。这个操作是非惰性的。
难以把读取过程转变为Stream,任何流式操作都必须要用list存完并转为流后,才能再做处理。灵活性很差。
消费者不方便干预数据生产过程,比如达到某种条件(例如个数)后直接中断,除非你在实现回调监听器时把这个逻辑override进去[17]。
public static <T> Seq<T> readExcel(String pathName, Class<T> head) {
return c -> {
ReadListener<T> listener = new ReadListener<T>() {
public void invoke(T data, AnalysisContext context) {
c.accept(data);
}
public void doAfterAllAnalysed(AnalysisContext context) {}
};
EasyExcel.read(pathName, head, listener).sheet().doRead();
};
}
这一改造我已经给EasyExcel官方提了PR[18],不过不是输出Seq,而是基于生成器原理构建的Stream,后文会有构建方式的具体介绍。
更进一步的,完全可以将对Excel的解析过程改造为生成器方式,利用一次性的callback调用避免内部大量状态的存储与修改,从而带来可观的性能提升。这一工作由于要依赖上文CsvReader的一系列API,所以暂时没法提交给EasyExcel。
用生成器构建Stream
生成器作为一种全新的设计模式,固然可以提供更为强大的流式API特性,但是毕竟不同于大家最为熟悉Stream,总会有个适应成本或者迁移成本。对于既有的已经成熟的库而言,使用Stream依然是对用户最为负责的选择。值得庆幸的是,哪怕机制完全不同,Stream和Seq仍是高度兼容的。
Stream<Integer> stream = Stream.of(1, 2, 3);
Seq<Integer> seq = stream::forEach;
那反过来Seq能否转化为Stream呢?在Java Stream提供的官方实现里,有一个StreamSupport.stream的构造工具,可以帮助用户将一个iterator转化为stream。针对这个入口,我们其实可以用生成器来构造一个非标准的iterator:不实现hastNext和next,而是单独重载forEachRemaining方法,从而hack进Stream的底层逻辑——在那迷宫一般的源码里,有一个非常隐秘的角落,一个叫AbstractPipeline.copyInto的方法,会在真正执行流的时候调用Spliterator的forEachRemaining方法来遍历元素——虽然这个方法原本是通过next和hasNext实现的,但当我们把它重载之后,就可以做到假狸猫换真太子。
public static <T> Stream<T> stream(Seq<T> seq) {
Iterator<T> iterator = new Iterator<T>() {
public boolean hasNext() {
throw new NoSuchElementException();
}
public T next() {
throw new NoSuchElementException();
}
public void forEachRemaining(Consumer<? super T> action) {
seq.consume(action::accept);
}
};
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
false);
}
public static void main(String[] args) {
Stream<Integer> stream = stream(c -> {
c.accept(0);
for (int i = 1; i < 5; i++) {
c.accept(i);
}
});
System.out.println(stream.collect(Collectors.toList()));
}
无限递推数列
public static Seq<Integer> fibonaaci() {
return c -> {
int i = 1, j = 2;
c.accept(i);
c.accept(j);
while (true) {
c.accept(j = i + (i = j));
}
};
}
流的更多特性
流的聚合
如何设计流的聚合接口是一个很复杂的话题,若要认真讨论几乎又可以整出大几千字,限于篇幅这里简单提几句好了。在我看来,好的流式API应该要让流本身能直接调用聚合函数,而不是像Stream那样,先用Collectors构造一个Collector,再用stream去调用collect。可以对比下以下两种方式,孰优孰劣一目了然:
Set<Integer> set1 = stream.collect(Collectors.toSet());
String string1 = stream.map(Integer::toString).collect(Collectors.joinning(","));
Set<Integer> set2 = seq.toSet();
String string2 = seq.join(",", Integer::toString);
public static void main(String[] args) {
Seq<Integer> seq = Seq.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
double avg1 = seq.average(i -> i, i -> i); // = 6.3333
double avg2 = seq.reduce(Reducer.average(i -> i, i -> i)); // = 6.3333
Map<Integer, Double> avgMap = seq.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)); // = {0=6.0, 1=6.6}
Map<Integer, Double> avgMap2 = seq.reduce(Reducer.groupBy(i -> i % 2, Reducer.average(i -> i, i -> i)));
}
流的分段处理
static String underscoreToCamel(String str, UnaryOperator<String> capitalize) {
// split=>分段map=>join
return Seq.of(str.split("_")).map(capitalize, 1, String::toLowerCase).join("");
}
一次性流还是可重用流?
生成器的本质和人类一样,都是复读机
public class ArraySeq<T> extends ArrayList<T> implements Seq<T> {
public void consume(Consumer<T> consumer) {
forEach(consumer);
}
}
default Seq<T> cache() {
ArraySeq<T> arraySeq = new ArraySeq<>();
consume(t -> arraySeq.add(t));
return arraySeq;
}
二元流
public interface BiSeq<K, V> {
void consume(BiConsumer<K, V> consumer);
}
for i, j in zip([1, 2, 3], [4, 5, 6]):
pass
Map<Integer, String> map = new HashMap<>();
BiSeq<Integer, String> biSeq = map::forEach;
结束语
public interface Seq<T> {
void consume(Consumer<T> consumer);
}
附录
Monad
推导map的实现
default <E> Seq<E> map2(Function<T, E> function) {
return flatMap(t -> unit(function.apply(t)));
}
static <T> Seq<T> unit(T t) {
return c -> c.accept(t);
}
default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {
return c -> supply(t -> function.apply(t).supply(c));
}
default <E> Seq<E> map3(Function<T, E> function) {
return flatMap(t -> c -> c.accept(function.apply(t)));
}
default <E> Seq<E> map4(Function<T, E> function) {
Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));
return consumer -> supply(t -> flatFunction.apply(t).supply(consumer));
}
Function<T, Seq<E>> flatFunction = t -> c -> c.accept(function.apply(t));
// 等价于
BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));
default <E> Seq<E> map5(Function<T, E> function) {
BiConsumer<T, Consumer<E>> biConsumer = (t, c) -> c.accept(function.apply(t));
return c -> supply(t -> biConsumer.accept(t, c));
}
default <E> Seq<E> map6(Function<T, E> function) {
return c -> supply(t -> c.accept(function.apply(t)));
}
到这一步,这个map6,就和前文从流式概念出发直接写出来的map完全一致了。证毕!
参考链接:
[1]https://en.wikipedia.org/wiki/Generator_(computer_programming)
[2]https://www.pythonlikeyoumeanit.com/Module2_EssentialsOfPython/Generators_and_Comprehensions.html
[3]https://openjdk.org/projects/loom/
[4]https://en.wikipedia.org/wiki/Continuation
[5]https://hackernoon.com/the-magic-behind-python-generator-functions-bc8eeea54220
[6]https://en.wikipedia.org/wiki/Continuation-passing_style
[7]https://kotlinlang.org/spec/asynchronous-programming-with-coroutines.html
[8]https://zh.wikipedia.org/wiki/Map_(%E9%AB%98%E9%98%B6%E5%87%BD%E6%95%B0)
[9]https://crypto.stanford.edu/~blynn/haskell/io.html
[10]https://www.autohotkey.com/docs/v2/
[11]https://stackoverflow.com/questions/1052476/what-is-scalas-yield
[12]https://stackoverflow.com/questions/10441559/scala-equivalent-of-haskells-do-notation-yet-again
[13]https://opencsv.sourceforge.net/
[14]https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv
[15]https://github.com/uniVocity/univocity-parsers
[16]https://github.com/alibaba/easyexcel
[17]https://github.com/alibaba/easyexcel/issues/1566
[18]https://github.com/alibaba/easyexcel/pull/3052
[20]https://github.com/alibaba/easyexcel/pull/3052
[21]https://github.com/alibaba/fastjson2/blob/f30c9e995423603d5b80f3efeeea229b76dc3bb8/extension/src/main/java/com/alibaba/fastjson2/support/csv/CSVParser.java#L197
[22]https://www.bilibili.com/video/BV1ha41137oW/?is_story_h5=false&p=1&share_from=ugc&share_medium=android&share_plat=android&share_session_id=96a03926-820b-4c9f-a2fd-162944103bed&share_source=COPY&share_tag=s_i×tamp=1663058544&unique_k=p94n8tD
[24]https://en.wikipedia.org/wiki/Monad_(functional_programming)
微信扫码关注该文公众号作者