本篇文章旨在介绍一些不常用但又非常有用的Reactor Operator。
Tip 1: filterWhen
与Java内置的Stream API类似,Reactor
也提供了fitler操作符,用于按照某些条件来过滤元素,但该操作符并不能很好处理返回值是Publisher<Boolean>的Predicate,例如希望判断某个key是否在Redis中存在,且使用Reactive的形式访问Redis,类似下面的code,
1 | public Mono<Boolean> keyExists(String key) { |
这时,filterWhen操作符就非常有用,其方法签名为:
1 | public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate) |
我们可以看到用于过滤的函数由一个简单的Predicate变成了一个返回值为Publisher<Boolean>的Function,对于上面的判断,处理起来就很自然了,
1 | ReactiveStringRedisTemplate template = //injected by container; |
以下几点需要注意:
Tip 2: expand
我们在日常处理各种程序逻辑时,经常会处理分页或者递归的数据,此类数据在Imperative编程模式下很容易处理,那么如何在Reactove编程模式下处理呢?其实Reactor
为我们提供了expand方法,可以方便地处理类似数据,其方法签名如下,
1 | public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander) |
我们只需要提供一个具备分页或者递归边界的Function就可以了,expand会自动帮我们把数据整理到Flux中。假设我们有这样的一个方法,用于递归(分页可以理解为递归的一种特殊形式)地加载数据,
1 | private Mono<String> composeRecursively(String key) { |
注意,以上代码中,’key.length() >= 3’就是所谓的递归边界。当使用expand
方法按照上面的逻辑处理"A", "B", "C"
数据时,
1 | Flux.just("A", "B", "C") |
输出结果为,
1 | A |
我们都知道,递归的本质是对树的遍历,从上面的执行结果,我们可以看到,expand
是按照广度优先来遍历树的,那么如果我们的程序逻辑是顺序敏感的,且希望使用深度优先来遍历呢?Reactor
为我们提供了expandDeep方法,其防方法名如下,
1 | public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander) |
当我们使用该方法处理上面的’composeRecursively’数据时,
1 | Flux.just("A", "B", "C") |
输出结果变为,
1 | A |
由此可见,深度优先遍历的目的就达到了。
Tip 3: transform & as
transform和as都属于Mutualization(字面含义为“相互化”)操作,这类操作与map、flatMap等最大的区别为,前者的操作对象是Publisher
中的元素,而后者操作的是Publisher
的本身。
首先我们看一下transform的方法签名,
1 | public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer) |
我们可以看到作为参数的Function,其入参出参皆为Flux,而不是元素,那么我们就可以在这个Function
中完成对Flux
的处理或变换,
1 |
|
由此可见,当我们需要处理的是Flux
本身而不是其中的元素的时候,transform
是最合适的选择。
我们再来看as,其方法签名为,
1 | public final <P> P as(Function<? super Flux<T>,P> transformer) |
与transform
相比,其Function
输出参数由Flux
变成了一个P
,不再限定为Publisher
,而整个函数的返回值也不再限定为Flux
,这样的逻辑使得as
可以作为衔接多步Publisher
操作并保持方法流式调用(fluent)风格的不二选择,请看下面的例子,
1 | Flux<Integer> i = Flux.range(1, 10) |
MathFlux.sumInt
接收一个Flux<Integer>
作为参数进行后续运算,多个步骤导致这种调用方式破坏了流式调用(fluent)风格,那么使用as
可以解决,流式调用(fluent)风格得以保留,
1 | Mono<String> s = Flux.range(1, 10) |
再看一个在Spring Webflux
里经常出现的场景,
1 | public Mono<ServerResponse> handle(ServerRequest request) { |
以上代码中,在第一步,产生了作为response body的result,在第二步再组装成了ServerResponse
,流式调用(fluent)风格遭到破坏,可以使用as
处理,
1 | return Flux.just("A", "B", "C") |
当然,使用transform
亦可解决问题,就是稍显冗长一些,
1 | return Flux.just("A", "B", "C") |
以上代码对比,在某些场景下,我们可以把as
理解为transform + next
的简写方式。
Tip 4: using
Reactor
提供及其丰富的API来创建一个Publisher
,其中有很多方法用于桥接非Reactive的数据类型,之中比较有用的是fromStream,通过使用该API,可以非常容易的创建一个Publisher
,由于Stream
的懒运行特性,使得Stream
作为imperative
编程风格下的产物,却具备了先天的Reactive
编程风格的适配能力。例如下面的代码,
1 | Flux.fromStream(Stream.of("A", "B", "C")) |
但这里又一个陷阱,并不是所有的Stream
在使用后都不需要close
,很多与IO或者下层资源有关的Stream
在使用后是需要显式关闭的,或者采用等效的隐式方式关闭,例如try-with-resources
。有两个非常典型的这一类Stream
的例子,一个是Files.lines(Path path)返回的Stream
,另一个就是在String Data JPA的Repo中返回的Stream
。假设我们需要从文件中读取每一行并作为Flux
的源,为了保证读取文件而产生的Stream
可以被正确关闭,代码上要做额外的处理,而不只是使用Flux.fromStream
。
1 | var lines = Files.lines(Paths.get("test.txt")); |
首先肯定,以上代码可以达到目的,但仍有不足之处:
- 破坏了流式调用(fluent)风格;
- 在使用副作用(side-effect)来处理
Publisher
外部的资源。
那么如何可以比较优雅地关闭Stream
而又避免上面的两个不足呢?答案就是using,其方法签名如下,
1 | public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier, |
我们可以看到,using
方法包含3个参数,resourceSupplier
表示一个数据源,例如产生Stream
的各种方式;sourceSupplier
代表一个使用该数据源产生Publisher
的一个函数,例如Flux.fromStream()
,最后一个resourceCleanup
代表一个清理回调,可以清理由resourceSupplier
产生的数据源。OK,一切水到渠成啦,用using
改写文件读取的例子,如下面的代码所示,
1 | Flux.using(() -> Files.lines(Paths.get("test.txt")), Flux::fromStream, BaseStream::close) |
如代码所以,流式调用(fluent)风格得以保留,也避免了副作用(side-effect)。using
还有更复杂的方式,usingWhen,请自行参考文档。
总结
Reactor
所提供的API及其丰富,如套用Java Stream
API的部分理念,对于大部分API,相对容易理解,而真正难于掌握的,是在Reactive
编程风格的大前提下,产生的Reactive
独有的方法及其操作风格。本篇文章只是介绍了在日常的使用过程中,沉淀下来的几个例子,虽不常用,但在某些场景下,却非常有用,甚至不容易被其他方式替代。相信在项目的驱动下,在使用Reactive
编程风格及Reactor API
的过程中,还会遇到新的问题和挑战,如有积累,会再找机会与大家分享。