本篇文章旨在介绍一些不常用但又非常有用的Reactor Operator

Tip 1: filterWhen

与Java内置的Stream API类似,Reactor也提供了fitler操作符,用于按照某些条件来过滤元素,但该操作符并不能很好处理返回值是Publisher<Boolean>的Predicate,例如希望判断某个key是否在Redis中存在,且使用Reactive的形式访问Redis,类似下面的code,

1
2
3
4
5
6
public Mono<Boolean> keyExists(String key) {

ReactiveStringRedisTemplate template = //injected by container;
return template.hasKey(key);

}

这时,filterWhen操作符就非常有用,其方法签名为:

1
public final Flux<T> filterWhen(Function<? super T,? extends Publisher<Boolean>> asyncPredicate)

我们可以看到用于过滤的函数由一个简单的Predicate变成了一个返回值为Publisher<Boolean>Function,对于上面的判断,处理起来就很自然了,

1
2
3
4
5
6
ReactiveStringRedisTemplate template = //injected by container;

Flux.just("key1", "key2", "key3")
.map(key -> "key_prefix:" + key)
.filterWhen(template::hasKey)
.subscribe();

以下几点需要注意:

  • Publisher的元素为empty时,等同于false;
  • PublisherFlux时,只有第一个元素参与比较,其他元素将被忽略;

Tip 2: expand

我们在日常处理各种程序逻辑时,经常会处理分页或者递归的数据,此类数据在Imperative编程模式下很容易处理,那么如何在Reactove编程模式下处理呢?其实Reactor为我们提供了expand方法,可以方便地处理类似数据,其方法签名如下,

1
public final Flux<T> expand(Function<? super T,? extends Publisher<? extends T>> expander)

我们只需要提供一个具备分页或者递归边界的Function就可以了,expand会自动帮我们把数据整理到Flux中。假设我们有这样的一个方法,用于递归(分页可以理解为递归的一种特殊形式)地加载数据,

1
2
3
4
5
6
7
8
private Mono<String> composeRecursively(String key) {
//A -> AA -> AAA
if (key.length() >= 3) {
return Mono.empty();
} else {
return Mono.just(key + key.charAt(0));
}
}

注意,以上代码中,’key.length() >= 3’就是所谓的递归边界。当使用expand方法按照上面的逻辑处理"A", "B", "C"数据时,

1
2
3
Flux.just("A", "B", "C")
.expand(this::composeRecursively)
.subscribe(log::info);

输出结果为,

1
2
3
4
5
6
7
8
9
A
B
C
AA
BB
CC
AAA
BBB
CCC

我们都知道,递归的本质是对树的遍历,从上面的执行结果,我们可以看到,expand是按照广度优先来遍历树的,那么如果我们的程序逻辑是顺序敏感的,且希望使用深度优先来遍历呢?Reactor为我们提供了expandDeep方法,其防方法名如下,

1
public final Flux<T> expandDeep(Function<? super T,? extends Publisher<? extends T>> expander)

当我们使用该方法处理上面的’composeRecursively’数据时,

1
2
3
Flux.just("A", "B", "C")
.expandDeep(this::composeRecursively)
.subscribe(log::info);

输出结果变为,

1
2
3
4
5
6
7
8
9
A
AA
AAA
B
BB
BBB
C
CC
CCC

由此可见,深度优先遍历的目的就达到了。

Tip 3: transform & as

transformas都属于Mutualization(字面含义为“相互化”)操作,这类操作与mapflatMap等最大的区别为,前者的操作对象是Publisher中的元素,而后者操作的是Publisher本身
首先我们看一下transform的方法签名,

1
public final <V> Flux<V> transform(Function<? super Flux<T>,? extends Publisher<V>> transformer)

我们可以看到作为参数的Function,其入参出参皆为Flux,而不是元素,那么我们就可以在这个Function中完成对Flux的处理或变换,

1
2
3
4
5
6
7

Function<Flux<Integer>, Flux<String>> mapAndApplySchedulers =
f -> f.map(String::valueOf)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel());

Flux.just(1, 2, 3).transform(mapAndApplySchedulers).subscribe();

由此可见,当我们需要处理的是Flux本身而不是其中的元素的时候,transform是最合适的选择。

我们再来看as,其方法签名为,

1
public final <P> P as(Function<? super Flux<T>,P> transformer)

transform相比,其Function输出参数由Flux变成了一个P,不再限定为Publisher,而整个函数的返回值也不再限定为Flux,这样的逻辑使得as可以作为衔接多步Publisher操作并保持方法流式调用(fluent)风格的不二选择,请看下面的例子,

1
2
3
4
5
Flux<Integer> i = Flux.range(1, 10)
.map(i -> i + 2)
.map(i -> i * 10);
Mono<String> s = MathFlux.sumInt(i)
.map(isum -> "sum=" + isum);

MathFlux.sumInt接收一个Flux<Integer>作为参数进行后续运算,多个步骤导致这种调用方式破坏了流式调用(fluent)风格,那么使用as可以解决,流式调用(fluent)风格得以保留,

1
2
3
4
5
Mono<String> s =  Flux.range(1, 10)
.map(i -> i + 2)
.map(i -> i * 10)
.as(MathFlux::sumInt)
.map(isum -> "sum=" + isum);

再看一个在Spring Webflux里经常出现的场景,

1
2
3
4
5
public Mono<ServerResponse> handle(ServerRequest request) {
var result = Flux.just("A", "B", "C")
.map(String::toLowerCase);
return ServerResponse.ok().body(result, String.class);
}

以上代码中,在第一步,产生了作为response body的result,在第二步再组装成了ServerResponse流式调用(fluent)风格遭到破坏,可以使用as处理,

1
2
3
return Flux.just("A", "B", "C")
.map(String::toLowerCase)
.as(f -> ServerResponse.ok().body(f, String.class));

当然,使用transform亦可解决问题,就是稍显冗长一些,

1
2
3
4
return Flux.just("A", "B", "C")
.map(String::toLowerCase)
.transform(f -> ServerResponse.ok().body(f, String.class))
.next();

以上代码对比,在某些场景下,我们可以把as理解为transform + next的简写方式。

Tip 4: using

Reactor提供及其丰富的API来创建一个Publisher,其中有很多方法用于桥接非Reactive的数据类型,之中比较有用的是fromStream,通过使用该API,可以非常容易的创建一个Publisher,由于Stream的懒运行特性,使得Stream作为imperative编程风格下的产物,却具备了先天的Reactive编程风格的适配能力。例如下面的代码,

1
2
3
Flux.fromStream(Stream.of("A", "B", "C"))
.map(String::toLowerCase)
.subscribe();

但这里又一个陷阱,并不是所有的Stream在使用后都不需要close,很多与IO或者下层资源有关的Stream在使用后是需要显式关闭的,或者采用等效的隐式方式关闭,例如try-with-resources。有两个非常典型的这一类Stream的例子,一个是Files.lines(Path path)返回的Stream,另一个就是在String Data JPA的Repo中返回的Stream。假设我们需要从文件中读取每一行并作为Flux的源,为了保证读取文件而产生的Stream可以被正确关闭,代码上要做额外的处理,而不只是使用Flux.fromStream

1
2
3
4
5
6
var lines = Files.lines(Paths.get("test.txt"));
Flux.fromStream(lines)
.filter(s -> s.length() > 10)
.map(String::toLowerCase)
.doFinally(signalType -> lines.close())
.subscribe();

首先肯定,以上代码可以达到目的,但仍有不足之处:

  1. 破坏了流式调用(fluent)风格;
  2. 在使用副作用(side-effect)来处理Publisher外部的资源。

那么如何可以比较优雅地关闭Stream而又避免上面的两个不足呢?答案就是using,其方法签名如下,

1
2
3
public static <T,D> Flux<T> using(Callable<? extends D> resourceSupplier,
Function<? super D,? extends Publisher<? extends T>> sourceSupplier,
Consumer<? super D> resourceCleanup)

我们可以看到,using方法包含3个参数,resourceSupplier表示一个数据源,例如产生Stream的各种方式;sourceSupplier代表一个使用该数据源产生Publisher的一个函数,例如Flux.fromStream(),最后一个resourceCleanup代表一个清理回调,可以清理由resourceSupplier产生的数据源。OK,一切水到渠成啦,用using改写文件读取的例子,如下面的代码所示,

1
2
3
4
Flux.using(() -> Files.lines(Paths.get("test.txt")), Flux::fromStream, BaseStream::close)
.filter(s -> s.length() > 10)
.map(String::toLowerCase)
.subscribe();

如代码所以,流式调用(fluent)风格得以保留,也避免了副作用(side-effect)using还有更复杂的方式,usingWhen,请自行参考文档。

总结

Reactor所提供的API及其丰富,如套用Java StreamAPI的部分理念,对于大部分API,相对容易理解,而真正难于掌握的,是在Reactive编程风格的大前提下,产生的Reactive独有的方法及其操作风格。本篇文章只是介绍了在日常的使用过程中,沉淀下来的几个例子,虽不常用,但在某些场景下,却非常有用,甚至不容易被其他方式替代。相信在项目的驱动下,在使用Reactive编程风格及Reactor API的过程中,还会遇到新的问题和挑战,如有积累,会再找机会与大家分享。

参考资料

  1. compose() vs. transform() vs. as() vs. map() in Flux and Mono
  2. Reactor - Using transform Operation
  3. File Reading in Reactor