Java 22 新增利器: 使用 Java Stream Gather 优雅地处理流中的状态
阿里妹导读
一、背景
二、什么 是 Stream ?
三、各种库有什么不一样
四、为什么需要 Stream Gather API?
问题空间
Java Stream API 目前主要缺乏丰富的 操作符,而如果像其他的库一样添加很多的操作符到 Stream API,又会有较大的维护负担,如果希望只添加有限的操作符,却可以解决绝大部分的问题。则新的操作符需要支持下面的特性:
支持多种类型的数据转换: 1 :1, 如 map 操作符 1 :N, 如 filter ,flatmap N :1, 如 buffer N:M, 如 intersperse 支持 “有限流” 和 “无限流”
支持 “有状态” 和 “无状态”操作
支持“提前终止”和 “全量消费”
支持 “单线程顺序处理”和 “多线程并行处理”
支持感知 “结束”,如实现一个 fold 操作符
Stream Collector & Gather
五、分析和对比
Stream Gather API 拆解
supplier:产生最初的 State
integrator + downstream:转换 和 传递值;
map操作符无状态, 1 :1 地产生元素
filter 操作符无状态, 1 : 0 ... 1 地产生元素
public static <T, R> Gatherer<T, ?, R> map(final Function<T, R> mapper) {
return Gatherer.of((notused, element, downstream) -> downstream.push(mapper.apply(element)));
}
public static <T> Gatherer<T, ?, T> filter(final Predicate<T> predicate) {
return Gatherer.of((notused, element, downstream) -> {
if (predicate.test(element)) {
return downstream.push(element);
}
return true;
});
}
和其他的类库对比
def statefulMap[S, T](
create: function.Creator[S],
f: function.Function2[S, Out, Pair[S, T]],
onComplete: function.Function[S, Optional[T]]): javadsl.Flow[In, T, Mat]
“操作符”地址:
public static <T> Source<Pair<T, Integer>, NotUsed> zipWithIndex(final Source<T, NotUsed> source) {
return source.statefulMap(
() -> 0,
(state, element) -> Pair.create(state + 1, Pair.create(element, state)),
notused -> Optional.empty()
);
}
public static <T> Stream<Pair<T, Integer>> zipWithIndex(final Stream<T> stream) {
class State {
int index;
}
return stream.gather(Gatherer.ofSequential(
State::new,
(state, element, downstream) -> downstream.push(Pair.create(element, state.index++))
));
}
自带的 gathers 分析
mapConcurrent
public static <T, R> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency, //限制:最大并行度
final Function<? super T, ? extends R> mapper) //虚拟线程中执行的转换
整个实现非常的简单:
public static <T, R> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency,
final Function<? super T, ? extends R> mapper) {
class State {
final ArrayDeque<Future<R>> window =
new ArrayDeque<>(Math.min(maxConcurrency, 16));
//使用信号量,不需要复杂的判断逻辑
final Semaphore windowLock = new Semaphore(maxConcurrency);
final boolean integrate(T element,
Downstream<? super R> downstream) {
if (!downstream.isRejecting())
createTaskFor(element);
return flush(0, downstream);
}
final void createTaskFor(T element) {
//阻塞等待permit,这里不是虚拟线程
windowLock.acquireUninterruptibly();
var task = new FutureTask<R>(() -> {
try {
return mapper.apply(element);
} finally {
//处理完成后释放信号量permit
windowLock.release();
}
});
var wasAddedToWindow = window.add(task);
//使用虚拟线程来执行具体的任务
Thread.startVirtualThread(task);
}
final boolean flush(long atLeastN,
Downstream<? super R> downstream) {
//....省略很多代码,将结果值推送给下一个处理节点
downstream.push(current.get());
}
}
return Gatherer.ofSequential(
State::new,
Integrator.<State, T, R>ofGreedy(State::integrate),
(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
);
}
我们可以看到:利用 虚拟线程 来并发的执行 mapper, 并结合 信号量来实现 maxConcurrency的限制。整个实现非常简单,感兴趣的同学可以对比下 Reactor-core 中 flatmap和 Pekko-Stream 中 mapAsync的实现。
fold
public static <T, R> Gatherer<T, ?, R> fold(
Supplier<R> initial,
BiFunction<? super R, ? super T, ? extends R> folder) {
class State {
R value = initial.get(); //初始状态,记录聚合结果值
State() {}
}
return Gatherer.ofSequential(
State::new,
Integrator.ofGreedy((state, element, downstream) -> {
state.value = folder.apply(state.value, element);
return true;
}),
//流处理结束,返回结果值给流的下一个处理节点
(state, downstream) -> downstream.push(state.value)
);
}
同样将状态保持在了 局部的 State类中,并且在结束时,调用了 finisher返回的方法,将最终的值推送给了流的下一个处理节点。因为是 fold方法不一定满足 结合律,所以上面使用的是 Gatherer.ofSequential, 来保证串行执行。同时,Stream Gather API 也支持多个 gather 之间组合,相当于其他库中的 fuse ,继而提高性能。
六、未来展望和小结
在底层技术上,我们具备深厚的Android和iOS底层技术积累,拥有丰富的编译器、链接器、解释器技术应用实践。
在研发模式上,我们负责原生研发模式DX演进,服务数千开发者、承载数百亿日PV,深耕系统原生渲染技术,致力于建立下一代终端研发模式。
在网络技术上,我们在终端网络、传输和超大规模网关有深厚技术积累,负责开源方案XQUIC/Tengine,承载亿级长连和千万级QPS;在国际IETF标准、顶会SIGCOMM均有建树。
在终端技术上,我们打造领先行业的移动技术产品,涵盖多端架构、性能体验、组件框架、用户增长等关键领域,致力于移动端系统及厂商特性前沿探索。
在后端技术上,我们负责移动基础设施,有百万级QPS API网关、消息/推送、Serverless平台、自适应流控等柔性高可用解决方案。打造覆盖移动App全生命周期工程技术平台。
在跨端技术上,我们负责Weex2.0和核心Web容器,研究领域涉及W3C标准、WebKit内核、脚本引擎和自绘渲染引擎,面向Web标准提供一流跨端能力。通过卡片级小部件和小游戏技术,丰富创意供给,提供差异化的购物体验。
在前端技术上,我们在前端框架、工程、低代码领域长期深耕,支撑大促营销ProCode、LowCode、NoCode跨端页面研发;配套前沿的页面托管;负责ICE、微前端等开源方案,致力于提供简单友好的研发体系。
微信扫码关注该文公众号作者