RxJavaリアクティブプログラミング (CodeZine BOOKS)を読んでいる。
RxJava 2が対象になっており、コードのほとんどでFlowable
が使われている。Reactive StreamsやBackpressureに関する説明も豊富で、これまでAndroidなどクライアントサイドでRxJavaを使っていた人が読む本というより、サーバーサイドで使いたいって人の方が合っている気がする。
また、Flowable
は同じくReactive Streams準拠でSpring Framework 5に同梱されるReactorのFlux
/ Mono
とほとんど同じなので、Reactorを学ぶという意味でも十分使える。
RxJava2のメイン開発者であるDavid KarnokはReactorにもcontributeしているし、Reactorのメイン開発者であるStephane Maldiniと頻繁に情報交換しているようなので、APIの使い方や考え方はとても似ている。
(同じReactive Streams準拠でもAkka StreamsはScalaなので読めない...)
実際に「Chapter04 FlowableとObservableのオペレータ」のコードのReactor版を書いてみた。
https://github.com/making/reactor-reactive-programming
一部、Reactorが対応していないメソッドはあるが、サンプルコードのほとんどがそのままReactorにポートできた。
この本をReactorで写経すればFlux
のちょうど良い練習になる。
写経して感じたRxJava2に対するReactorのメリットは以下3点
ReactorはJava8ベースなのでjava.util.function.*
やjava.time.Duration
が使える。
RxJava2はAndroidユーザーもターゲットとなっており、Java 6に対応しているが、ReactorはJava 8必須となっている。
そのため、Reactorではjava.util.function.Function
やjava.util.function.Consumer
、java.util.function.Supplier
など、Java標準の関数クラスをそのまま使える。RxJava2ではio.reactivex.functions.Function
やio.reactivex.functions.Consumer
が用意されている。
また、RxJava2では
Flowable<Long> flowable = Flowable.interval(100, TimeUnit.MILLISECONDS);
と書くところをRactorでは
Flux<Long> flux = Flux.interval(Duration.ofMillis(100));
と書ける。ほんのちょっとの違いだけれど、間隔を指定することが多いので、Duration
を使えるのは嬉しい。
書籍では扱われていないが、Reactorではjava.util.concurrent.CompletionStage
/ CompletableFuture
からMono
への変換やjava.util.stream.Stream
からFlux
への変換もサポートされている。Java 8ベースというのは大きい。
一方、java.util.function
を使うことにはデメリットがある。ラムダ内で例外ハンドリングしないといけない点である。
RxJava2の関数クラスにはthrow Exception
が付いているため、そのままスローしてRxJava側でハンドリングすれば良い。
Reactorの場合、ラムダ内のチェック例外はcatchして処理するかre-throwする必要がある。
ReactorにはTuple
があって、zip
メソッドなどの返り値に使える。
ReactorにはTuple
が用意されている。
RxJava2で
Flowable<Long> flowable1 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(5);
Flowable<Long> flowable2 = Flowable.interval(300, TimeUnit.MILLISECONDS).take(3).map(data -> data + 100);
Flowable<List<Long>> result = Flowable.zip(flowable1, flowable2, (data1, data2) -> Arrays.asList(data1, data2));
というようにzipper
関数を渡す必要があるが、Reactorでは
Flux<Long> flux1 = Flux.interval(Duration.ofMillis(300)).take(5);
Flux<Long> flux2 = Flux.interval(Duration.ofMillis(500)).take(3).map(data -> data + 100);
Flux<Tuple2<Long, Long>> result = Flux.zip(flux1, flux2);
というように複数のFlux
をzip
に渡してそのまま返せる。もちろん通常はその後map
でデータ変換するので、zipper
側で変換するか、一旦Tuple
で返してmap
で変換するかの違いではあるが、個人的にはTuple
の方が好き。
Reactive Streamsに完全対応で用途別にFlux
/ Mono
がある
ReactorはReactive Streamsに準拠したクラスが2つあり、0件または1件のデータを扱うためのMono
とn(>=0)件のデータを扱うFlux
と役割が明確に分かれている。
RxJava2ではReactiveStreamsに準拠したクラスはFlowable
のみであり、1件のデータを扱うSingle
と0件または1件のデータを扱うMaybe
はReactive Streamsに対応していない。(ちなみにRxJava1由来でn件のデータを扱うがBackpressureに対応していないObservable
もある。主にクライアント用途だと思われる。)
例えばRxJava2ではIterable
から変換する場合もCallable
から変換する場合もFlowable
である。
Flowable<String> flowable1 = Flowable.fromIterable(Arrays.asList("A", "B", "C"));
Flowable<Long> flowable2 = Flowable.fromCallable(() -> System.currentTimeMillis());
Callable
の返り値は1件にしかなりえないため、ReactorではCallable
からFlux
を作るメソッドは用意されておらずMono
側で提供される。
Flux<String> flux = Flux.fromIterable(Arrays.asList("A", "B", "C"));
Mono<Long> mono = Mono.fromCallable(() -> System.currentTimeMillis());
返り値が一件の場合、ReactorはMono
を返すが、RxJava2はSingle
を返す。
// RxJava2
Single<Long> single = Flowable.just("a", "b", "c").count();
// Reactor
Mono<Long> single = Flux.just("a", "b", "c").count();
Single
はReactive Streams準拠ではない、つまりorg.reactivestreams.Publisher
を実装していないため、互換性のためにPublisher
を引数にしているメソッドにそのまま渡すことができない。
Java8が使える環境でRxJavaからの以降を考える必要がなければReactorの方が使いやすいと思う。また、Springユーザーは必然的にReactorを選択することになる。
とはいえRxJavaリアクティブプログラミングはReactorを勉強するのにも十分役に立つし、日本語で読めるのはとてもありがたい。
Chapter1やChapter3はReactive StreamsやBackpressure、スレッドの切り替えなどが詳しく書かれていてとても良いが、この手の技術を初めて使う人がいきなり読むには難しいかもしれないので、Chapter4のサンプルコードを写経してRxAPIを十分体験してからChapter1に戻った方が良さそう。
Spring 5でReactorに触れることになるであろう開発者にとってありがたい一冊になりそう。