[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
SlideShare a Scribd company logo
Copyright©2016 NTT corp. All Rights Reserved.
Spring 5に備える
リアクティブプログラミング入門
2016年11月18日
岩塚 卓弥,堅田 淳也
NTT ソフトウェアイノベーションセンタ
2Copyright©2016 NTT corp. All Rights Reserved.
• 名前:岩塚 卓弥
• 所属:NTT ソフトウェアイノベーションセンタ
• NTTの研究所のうちソフトウェアを専門に扱う
• 自部署ではソフトウェア工学を研究
• Springベースのグループ共通フレームワークの整備を担当
• Spring関連:
• Spring I/O, SpringOne それぞれ2015,2016に参加
• 改訂新版 Spring入門,Spring徹底入門 レビュアー
• JSUG幹事
Introduction
3Copyright©2016 NTT corp. All Rights Reserved.
• 名前:堅田 淳也
• 所属:NTT ソフトウェアイノベーションセンタ
• NTTの研究所のうちソフトウェアを専門に扱う
• 自部署ではソフトウェア工学を研究
• Springベースのグループ共通フレームワークの整備を担当
• Spring経験:
• 元SIerで、プロジェクトへのSpring適用支援などを担当
• 使ったことのあるバージョン:Spring 1, 3, 4, (5)
Introduction
4Copyright©2016 NTT corp. All Rights Reserved.
Spring Framework 5.0
M1
M2
M3 M4
GA
2016/6
2016/9
2016/12
2017 1Q
• ベースラインのアップグレード
• JDK 8+, Servlet 3.1+, JMS 2.0+, JPA 2.1+, JUnit 5
• コアコンテナのオーバーホール
• 柔軟でプログラマティックなBean登録と解決
• 効率的なウェブアプリケーション
• Reactive Streamsベースのwebコントローラ
• ラムダ式によるHTTPルーティングと処理
2016/11
5Copyright©2016 NTT corp. All Rights Reserved.
Spring Framework 5.0 / Reactor
spring-web-reactive reactor-core
実装
依存
依存
For Reactive ProgrammingFor Reactive Web
Spring Framework 5.0 Project Reactor
6Copyright©2016 NTT corp. All Rights Reserved.
What is reactive program
Transformational program
始めに与えられた入力値を使って結果を出力
Interactive program
必要なタイミングでユーザや他のプログラムとやりとり
Reactive program
入力タイミングをコントロールできないソースに対応して動作
[G. Berry 1989] による分類
Program入力 出力
Program
入力A 次は入力Bをくれ
入力B
Program
ソースA
ソースB
ソースの変化に
対応して変化
出力
出力
7Copyright©2016 NTT corp. All Rights Reserved.
Example (Reactive program)
22℃
℃ → °F
Program
71.6 °F
リアクティブシステム
与えられた物理環境に対して
連続的にやりとりするシステム
8Copyright©2016 NTT corp. All Rights Reserved.
Example (Reactive program)
21.5℃
℃ → °F
Program
70.7 °F
リアクティブシステム
与えられた物理環境に対して
連続的にやりとりするシステム
9Copyright©2016 NTT corp. All Rights Reserved.
Example (Reactive program)
21℃
℃ → °F
Program
69.8 °F
リアクティブシステム
与えられた物理環境に対して
連続的にやりとりするシステム
10Copyright©2016 NTT corp. All Rights Reserved.
Invading namespace : Reactive System
The Reactive Manifesto* におけるリアクティブシステム
従来リアクティブシステムと呼ばれてきたものとは別物
名前の乗っ取りは良くない!
* http://www.reactivemanifesto.org/
11Copyright©2016 NTT corp. All Rights Reserved.
Classification of language for
reactive programming
(Synchronous / Data flow) programming
• (本来の意味での)リアクティブシステムを記述する言語
• 実時間の制約を持つ
• 入力の頻度や入出力のレスポンス時間に関する制約
Functional reactive programming(FRP)
• 実時間の制約は無い
• BehaviorとEvent,Switching combinatorがプリミティブと
して提供される(後述)
Cousins of reactive programming
• 実時間の制約もBehaviorも無い
• 値の変更の伝播など,その他のRPの特徴のみを持つ
• Reactive Streamsはここに分類される
[E. Bainomugisha et al. 2013] による分類
12Copyright©2016 NTT corp. All Rights Reserved.
• 「関数型言語の上で実装されたRP=FRP」ではない
• Javaで実装されたFRPもある
• Frappé [A. Courtney 2001]
• Eventだけを扱うRP(≠FRP)が昨今の流行り
• イベントドリブンなアーキテクチャを実現するためのRP
Behavior,Event,Switching combinator
Behavior Event Switching combinator
時間で連続変化する値
例:マウスポジション
Behavior mouseX
Behavior mouseY
値変化の(無限)列
データフローを動的に
切り替えるための連結子
例:キー入力
a b c
Event keyStream
例:マウスクリックで描画
色の切替え
right
left
Behavior color =
switching(
rc -> blue, lc -> red)
* 文法は適当
13Copyright©2016 NTT corp. All Rights Reserved.
Observer Pattern
Subject Observer
observe
notify
Q. イベントドリブンで値の変更の伝搬とかObserver
パターンで実現できるのでは.
A. できます.
非常に雑に言うとObserverパターンの強いやつが
昨今の流行りです.
* http://reactivex.io
*
14Copyright©2016 NTT corp. All Rights Reserved.
Push based / Pull based
reactive programming
Producer Consumer
Producer Consumer
PushベースのRP
PullベースのRP
新しい値を
片っ端から送信 頑張って処理
必要なときに
欲しい分だけ要求
Consumerの
要求に従って送信
✔ Producerで値が生成されてからConsumerが反応するまでのレイテンシが短い
✘ Consumerの処理能力を超えて値が送信されてくる可能性がある
✔ 必要なとき(Consumerが処理可能な場合)にだけ新しい値を取得できる
✘ Producerで値が生成されてからConsumerが反応するまで時間がかかる
15Copyright©2016 NTT corp. All Rights Reserved.
Reactive Streams
• 非同期ストリーム処理
• ノンブロッキングなバックプレッシャ
• バックプレッシャ:受信側による送信制御を行う仕組み*
• 標準を提供
• 最低限のメソッドを備えたインタフェースと,仕様のみを提供
• Publisher, Subscriber, Subscription, Processor
• 実用上必要なメソッドは各実装(Reactor他)が提供
* 元々はネットワーク用語.本来の用法から拡大されて使用されている
.http://moccosblue.blogspot.jp/2015/05/translatebackpressure.html
ノンブロッキングなバックプレッシャを備えた
非同期ストリーム処理の標準を提供するための提案 (直訳)
c.f. パルスのファルシのルシがコクーンでパージ
Producer Consumer
この間のやりとりをノンブロッキングに
それぞれ非同期に処理できる
16Copyright©2016 NTT corp. All Rights Reserved.
Publisher, Subscriber, Subscription
Subscription Subscriber<T>
request(long n)
Publisher<T> onSubscribe(Subscription s)
onNext(T t) × (0 〜 n)
onComplete() / onError(Throwable e)
…
subscribe(Subscriber<? super T> s)
• 各メソッドの戻り値の型はすべてvoid
• Subscriberは送られてきたシグナルに対応して動作
• PushベースRP / PullベースRP の両方を実現可能
• request(Long.MAX_VALUE) とするとPushベースRPになる
• Processor = Publisher ∧ Subscriber
17Copyright©2016 NTT corp. All Rights Reserved.
• Reactive Streamsの各インタフェースを提供
• JDK9から利用可能
• Reactive Streamsとは別物扱いなので,相互変換が必要
java.util.concurrent.Flow
18Copyright©2016 NTT corp. All Rights Reserved.
Project Reactor
Reactor Core Reactor IPC Reactor Add-ons
・Reactor Core
・Reactor Core .Net
・Reactor Core JS
・Reactor Netty
・Reactor Kafka
・Reactor Aeron
・Reactor Adapter
・Reactor Test
・Reactor Logback
Reactive Streamsの実装
JDK9のFlowとの変換
エンコード / デコード
通信(UDP / TCP / HTTP)
他実装との変換
その他便利系
Reactor Coreを中心とした一連のプロジェクト群
19Copyright©2016 NTT corp. All Rights Reserved.
Mono, Flux
Publisher<T>
Mono<T> Flux<T>
0個または1個の値を発行 0個以上の値を発行
• Reactor Coreで提供されるPublisherの実装
• 発行され得る値の数によって使い分ける
• subscribe以外にストリーム処理用の様々なメソッド
を提供
• filter, map, take, skip, …
20Copyright©2016 NTT corp. All Rights Reserved.
Flux.range(0, 10)
.delayMillis(1000)
.filter(n -> n%2 == 0)
.map(n -> n*2);
Example (Flux)
9876543210
0123456789
86420
1612840
delayMillis(1000)
filter(n -> n%2 == 0)
filter(n -> n*2)
21Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
Fluxのサブクラスのインスタンスが作成される.
クラスはパッケージプライベートなので普通は意識しないで良い.
FluxRange
FluxFilter
FluxMap
source
source
Flux.range(..).filter(..).map(..);
22Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
FluxRange
FluxFilter
FluxMap
Flux.range(..).filter(..).map(..).subscribe(..);
source
source
Subscriber
subscribe
23Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
FluxRange
FluxFilter
FluxMap
Flux.range(..).filter(..).map(..).subscribe(..);
source
source
actual
actual
Subscriber
subscribe
new
FluxFilter
.FilterSubscriber
FluxMap
.MapSubscriber
24Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
FluxRange
FluxFilter
FluxMap
Flux.range(..).filter(..).map(..).subscribe(..);
source
source
actual
actual
Subscriber
subscribe
new
FluxFilter
.FilterSubscriber
FluxMap
.MapSubscriber
FluxRange
.RangeSubscription
actual
actual
onSubscribe
s
s
s
25Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
FluxRange
FluxFilter
FluxMap
Flux.range(..).filter(..).map(..).subscribe(..);
source
source
actual
actual
Subscriber
subscribe
new
FluxFilter
.FilterSubscriber
FluxMap
.MapSubscriber
FluxRange
.RangeSubscription
actual
actual
onSubscribe
s
s
s request
26Copyright©2016 NTT corp. All Rights Reserved.
Implementation detail
FluxRange
FluxFilter
FluxMap
Flux.range(..).filter(..).map(..).subscribe(..);
source
source
actual
actual
Subscriber
subscribe
new
FluxFilter
.FilterSubscriber
FluxMap
.MapSubscriber
FluxRange
.RangeSubscription
actual
actual
onSubscribe
s
s
s request
onNext
27Copyright©2016 NTT corp. All Rights Reserved.
Don‘t return void
public void nonsense(Flux<Integer> s){
s.map(n -> n*n);
}
public Flux<Integer> ok(Flux<Integer> s){
return s.map(n -> n*n);
}
ダメな例
修正
「subscribeされない = 何も起こらない」
新しいFluxが生成されて捨てられるだけ
戻り値がどこかでsubscribeされればOK
28Copyright©2016 NTT corp. All Rights Reserved.
Subscriber for Reactive Web
@Controller
public Flux<Hoge> getHoges(){
…
return resultFlux;
}
誰がsubscribeしている?
Servlet 3.1利用の場合
ServletHttpHandlerAdapter#service
…
29Copyright©2016 NTT corp. All Rights Reserved.
Comparison from other types
CompletableFuture
Stream
Optional
✔ 非同期,ノンブロッキング
✔ 処理の合成
✘ Pushにしか対応できない
✘ ストリーム処理に対応できない
✔ ストリーム処理
✔ 処理の合成
✘ 非同期処理のためのAPIがない
✔ 0個または1個の値を扱う(ReactorのMonoと同様)
✔ 処理の合成
✘ 非同期処理のためのAPIがない
30Copyright©2016 NTT corp. All Rights Reserved.
• Reactorの祖先はReactive Extensions(Rx)
• Rxが最初に導入されたのは .NET
• 当時Microsoftに在籍していたErik Meijer氏らが設計
• Erik Meijer氏は関数型言語(主にHaskell)の研究者としても知ら
れている
• 関数型のプログラミングスタイルとは
• 副作用を極力避けるプログラミングスタイル
• そもそも実は「関数型言語」には明確な定義がない*
Functional programming style
Rxは関数型のプログラミングスタイルが活きるよう設計されている
* 東北大学 住井先生によるQiitaエントリ
http://qiita.com/esumii/items/ec589d138e72e22ea97e
31Copyright©2016 NTT corp. All Rights Reserved.
• 副作用を使用しない(=参照透明である)メリット
• バグを作り込みにくくなる
• 正しさを検証しやすくなる
• 関数の独立性が高くなる
Referential Transparency
「状態」を持たないことによる複雑性と依存性の排除
いつどこで実行されるか分からない処理では参照透明性は特に重要
Reactive Streamsにおいては…
・実際の計算はsubscribeされるまで遅延される
→ どのタイミングで実行されるか分からない
・非同期的に実行される可能性がある
→ どのスレッドで実行されるか分からない
32Copyright©2016 NTT corp. All Rights Reserved.
• ラムダ式
• 関数型インタフェースの実装を記述する構文(from Java 8)
• 名前をつけるまでもない小さな関数・述語等の記述に利用
• メソッド参照
• 関数型インタフェースの実装として既存のメソッドを渡すため
の構文(from Java 8)
• 既存のメソッドを高階関数に渡す際に利用
• 高階関数
• 関数を引数や戻り値とする関数 (map, filter, …)
• 関数を組み合わせて処理を記述するために利用
• タプル
• 順序を持った値の組
• 複数の値を返したい関数の定義等に利用
Tools for functional programming
Java
Reactor
33Copyright©2016 NTT corp. All Rights Reserved.
• 関数型言語で多く用いられている一種のデザインパターン
• 関数をチェーンさせて,順番に値に適用させていくことができる
• Mono / Flux はMonadになっている
• StreamやOptional, CompletableFutureもMonad
Monad
map
(a.k.a fmap)
just
(a.k.a unit, pure)
flatMap
(a.k.a bind)
Monad が備える関数
Monad則,Functor,Applicative等との関係等,細かい諸々は省略
→ 全部同じような考え方で取り扱える
34Copyright©2016 NTT corp. All Rights Reserved.
Example
public Flux<Item> findItems(int idxA, int idxB){
return Mono.when(serviceA.findOne(idxA),
serviceB.findOne(idxB))
.flatMap(p -> findItemsByXAndY(
p.getT1.getX(),
p.getT2.getY()));
}
public Mono<long> getTotal(int idxA, int idxB){
return findItems(idxA, idxB)
.map(Item::calculateScore)
.reduce((a, b) -> a+b);
}
35Copyright©2016 NTT corp. All Rights Reserved.
Example
public Flux<Item> findItems(int idxA, int idxB){
return Mono.when(serviceA.findOne(idxA),
serviceB.findOne(idxB))
.flatMap(p -> findItemsByXAndY(
p.getT1.getX(),
p.getT2.getY()));
}
public Mono<long> getTotal(int idxA, int idxB){
return findItems(idxA, idxB)
.map(Item::calculateScore)
.reduce(0, (a, b) -> a+b);
} 2つのMonoから,Monoのタプルを作る
36Copyright©2016 NTT corp. All Rights Reserved.
Example
public Flux<Item> findItems(int idxA, int idxB){
return Mono.when(serviceA.findOne(idxA),
serviceB.findOne(idxB))
.flatMap(p -> findItemsByXAndY(
p.getT1.getX(),
p.getT2.getY()));
}
public Mono<long> getTotal(int idxA, int idxB){
return findItems(idxA, idxB)
.map(Item::calculateScore)
.reduce((a, b) -> a+b);
}
ラムダ式
タプルからの値の取り出し
高階関数
37Copyright©2016 NTT corp. All Rights Reserved.
Example
public Flux<Item> findItems(int idxA, int idxB){
return Mono.when(serviceA.findOne(idxA),
serviceB.findOne(idxB))
.flatMap(p -> findItemsByXAndY(
p.getT1.getX(),
p.getT2.getY()));
}
public Mono<long> getTotal(int idxA, int idxB){
return findItems(idxA, idxB)
.map(Item::calculateScore)
.reduce((a, b) -> a+b);
} 高階関数 メソッド参照
38Copyright©2016 NTT corp. All Rights Reserved.
Example
public Flux<Item> findItems(int idxA, int idxB){
return Mono.when(serviceA.findOne(idxA),
serviceB.findOne(idxB))
.flatMap(p -> findItemsByXAndY(
p.getT1.getX(),
p.getT2.getY()));
}
public Mono<long> getTotal(int idxA, int idxB){
return findItems(idxA, idxB)
.map(Item::calculateScore)
.reduce((a, b) -> a+b);
}
ラムダ式
そこまでの関数の適用結果と次の要素を
繰り返し関数に適用していく高階関数 (畳み込み)
39Copyright©2016 NTT corp. All Rights Reserved.
• Reactorでは,特に指定しない限りsubscribeを呼び
だしたスレッドでシグナルを処理する
• 多くの場合それが一番パフォーマンスが良い
• 必要ならば明示的にバックグラウンド実行させる
• Reactor Coreに含まれるSchedulerを利用する
Parallel processing
Flux.just(“red”, “blue”, “green”)
.map(String::toUpperCase)
.subscribeOn(Schedullers.parallel())
.subscribe();
内部的に保持している複数のスレッドから
一つを選んでタスクを実行
40Copyright©2016 NTT corp. All Rights Reserved.
• すべての要素を並列実行させるには,すべての要素を
別々のPublisherに分配させる
Parallel processing
Flux.range(0, 100)
.flatMap(n ->
Mono.just(n*2)
.subscribeOn(Scheduler.parallel())
)
.subscribe();
別々のPublisherに分配
別のスレッドで実行
[2, 10, 18, 26, 34, 42, 50, …]
[0, 8, 6, 14, 22, 30, 38, …]
flatMap:
concatMap:
• 実行順序は非決定的になる
• flatMapの代わりにconcatMapを使えば順序が担保される
[0, 2, 4, 6, 8, 10, 12, …]
毎回結果が異なる
41Copyright©2016 NTT corp. All Rights Reserved.
• 非同期プログラミングはそもそも非常に複雑で難しい
• Rxは上手くデザインされているが,非同期プログラミ
ングの複雑さがすべて解消するわけではない
• 非決定性に起因する再現性の低いバグ
• デバッグが難しい
• テストが難しい
• 設計が難しい
• etc…
Complexity gain
→ 不必要にRPを導入することはデメリットの方が大きい
42Copyright©2016 NTT corp. All Rights Reserved.
Glitch
値の変更の伝搬の過程で一時的に不整合な状態になることがある
Flux<Integer> a = Flux.range(0,10);
Flux<Integer> b = a.map(n -> n*2);
Flux<Integer> c = Flux.combineLatest(
b, a, (n, m) -> n+m);
a 0 1 2 3 4 5 6 7 8 9
b 0 2 4 6 8 10 12 14 16 18
c 0 3 6 9 12 15 18 21 24 27
理想
現実
a 0 1 1 2 2 3 3 4 4 …
b 0 2 2 4 4 6 6 8 8 …
c 0 2 3 5 6 8 9 11 12 …
bの変更がaの変更より先にcに伝搬される
43Copyright©2016 NTT corp. All Rights Reserved.
• ICSE’16でRPのデバッグに関する研究が発表されている
• 対象はScala
• EclipseプラグインとしてRP用デバッガを実装
• この領域はまだ他にも解決すべき課題は色々ありそう
Research effort
依存関係を可視化
ブレークポイントの設定
[G. Salvaneschi, M. Mezini 2016]
44Copyright©2016 NTT corp. All Rights Reserved.
Spring Web Reactive
Spring Framework 5.0
45Copyright©2016 NTT corp. All Rights Reserved.
Spring Framework 5.0
Web MVC vs Web Reactive
@Controller, @RequestMapping
Spring MVC
Servlet API
Servlet Container
Spring Web Reactive
Reactive HTTP
Servlet 3.1,
Netty, Undertow
46Copyright©2016 NTT corp. All Rights Reserved.
Spring Framework 5.0
Web MVC vs Web Reactive
@Controller, @RequestMapping
Spring MVC
Servlet API
Servlet Container
Spring Web Reactive
Reactive HTTP
Servlet 3.1,
Netty, Undertow
spring-web-reactive.jarが
新規追加
Servlet 3.1 で追加されたNon-blocking I/O を利用
Servlet以外の環境にも対応
従来のMVCと同様の
アノテーションが使える
47Copyright©2016 NTT corp. All Rights Reserved.
• Spring Web MVC とは独立した存在
• しかし、多くのアルゴリズムはMVCと共通
• プログラミングモデルもMVCと変わらない
• @Controller, @RequestMapping, @RequestBody, etc…
• Reactive HTTP request/responseを処理
• リクエスト/レスポンスの読み書きをNon-Blockingで処理
• 少ないスレッド数でスケール
• 現在の実装状況など
• Spring Framework 5.0 M3 が 2016/11/8 にリリース
• まずはRESTにフォーカスして実装してきた
• HTMLレンダリング系も徐々に実装されてきている
Spring Web Reactive Framework
48Copyright©2016 NTT corp. All Rights Reserved.
• Spring Initializr
• http://start.spring.io
Getting start with Spring Boot
2.0.0(SNAPSHOT)を選択すると、
Reactive Webが追加可能になる
49Copyright©2016 NTT corp. All Rights Reserved.
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>
<!-- omit -->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-dependencies-web-reactive</artifactId>
<version>0.1.0.BUILD-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
50Copyright©2016 NTT corp. All Rights Reserved.
• ControllerからFluxやMonoが返せる
• Flux/Mono の subscribe は呼ばない
• subscribe するのは Spring
Reactive Web Controller
@RestController
public class UserController {
@Autowired
UserRepository userRepository;
@GetMapping("/find")
public Mono<User> find(@RequestParam("id") long id) {
return userRepository.findById(id);
}
@GetMapping("/listAdult")
public Flux<User> listAdult() {
// 20歳以上のユーザを返す
return userRepository.findAll()
.filter(u -> u.getAge() >= 20);
}
}
public interface UserRepository {
Mono<User> findById(long id);
Flux<User> findAll();
Mono<Void> save(User user);
}
/listAdult の結果
51Copyright©2016 NTT corp. All Rights Reserved.
• @ResponseBody(@RestController)メソッドの
戻り値として返せる型
Reactive Web Controller
※ “User”は任意のJavaBeanを表す
• Reactor
• Mono<User>
• Mono<Void>
• Flux<User>
• Flux<ServerSentEvent>
• RxJava
• Single<User>
• Observable<User>
• Flowable<User>
• Not reactive types
• User
• void
52Copyright©2016 NTT corp. All Rights Reserved.
• ダメな例
• スレッドをブロックするメソッドを使ってFlux/Monoから値を
取り出す
Reactive Web Controller
@GetMapping("/findBlocking")
public User findBlocking(@RequestParam("id") long id) {
return userRepository.findById(id).block();
}
@GetMapping("/listAdultBlocking")
public List<User> listAdult() {
List<User> list = new ArrayList<>();
// 20歳以上のユーザを返す
userRepository.findAll()
.filter(u -> u.getAge() >= 20)
.toIterable()
.forEach(list::add);
return list;
}
Userが返ってくるまでブロック!
FluxをIterableに変換
IterableからUserを取得するときにブロック!
53Copyright©2016 NTT corp. All Rights Reserved.
• Controllerの引数
• @RequestBody を付与することで、JSON(Jackson)や
XML(JAXB)を受け取れる
• 引数の型にはFlux/Mono が指定可能
• Monoを使わずに単体のBeanで受けることもできる
• @PathVariableや@RequestParam も使える
Reactive Web Controller
@RequestMapping("/helloMono")
public Mono<String> hello(@RequestBody Mono<User> user) {
return user.map(u -> "Hello " + u.getName() + "!!");
}
@RequestMapping("/hello")
public Mono<String> hello(@RequestBody User user) {
return Mono.just("Hello " + user.getName() + "!!");
}
Mono<User>じゃなくてもブロッキング
にはならない
54Copyright©2016 NTT corp. All Rights Reserved.
• @RequestBody が付与された引数に指定可能な型
Reactive Web Controller
※ “User”は任意のJavaBeanを表す
• Reactor
• Mono<User>
• Flux<User>
• RxJava
• Single<User>
• Observable<User>
• Not reactive type
• User
55Copyright©2016 NTT corp. All Rights Reserved.
• @ModelAttribute 引数への対応
• 従来通り@ModelAttributeの省略も可能
Reactive Web Controller
@RequestMapping("/helloAnnotation")
public Mono<String> helloAnnotation(@ModelAttribute("user") Mono<User> user) {
return user.map(u -> "Hello " + u.getName());
}
@RequestMapping("/hello")
public Mono<String> hello(Mono<User> user) {
return user.map(u -> "Hello " + u.getName());
}
Spring 5.0 M3 から
POST /form/hello HTTP/1.1
Host: localhost:8080
Content-Type: application/x-www-form-urlencoded
id=10&name=taro&age=25
HTTPリクエスト
id= 10
name= “taro”
age= 25
:User
56Copyright©2016 NTT corp. All Rights Reserved.
• Bean Validation も使える
• 従来通り@Validatedによる検証が可能
• pom.xmlにstarterの追加が必要
Reactive Web Controller
Bean Validation
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
57Copyright©2016 NTT corp. All Rights Reserved.
• Monoに対して@Validatedを付与した場合は、検証失
敗時に例外が流れてくる
Reactive Web Controller
Bean Validation
@RequestMapping("/validate")
public Mono<String> formValidate(@Validated User user, BindingResult bindingResult) {
if (bindingResult.hasErrors()) {
return Mono.just("Error!");
}
}
@RequestMapping("/validateMono")
public Mono<String> formValidate(@Validated Mono<User> user) {
return user
.map(u -> "Hello " + u.getName())
.otherwise(WebExchangeBindException.class, e -> Mono.just("Error!"));
}
検証失敗時はWebExchangeBindExceptionが発生
58Copyright©2016 NTT corp. All Rights Reserved.
• ちなみに・・・
• Monoの引数に対してBindingResultの引数を追加すると
IllegalArgumentException発生
Reactive Web Controller
Bean Validation
// Not work!
@RequestMapping("/validateMonoWithBindingResult")
public Mono<String> form(@Validated Mono<User> user, BindingResult bindingResult) {
if (bindingResult.hasErrors()) {
・・・
Caused by: java.lang.IllegalArgumentException:
Errors/BindingResult cannot be used with an async model attribute. Either declare the model
attribute without the async wrapper type or handle WebExchangeBindException through the
async type.
at org.springframework.util.Assert.isNull(Assert.java:126) ~[spring-core-5.0.0.BUILD-
SNAPSHOT.jar:5.0.0.BUILD-SNAPSHOT]
…
例外メッセージ
59Copyright©2016 NTT corp. All Rights Reserved.
• HTML5のサーバプッシュ技術
• Springでは4.2から対応している
Reactive Web Controller
Server-Sent Events
@GetMapping("/connect")
public SseEmitter connect() {
SseEmitter sseEmitter = new SseEmitter();
sseEmitters.add(sseEmitter);
sseEmitter.onCompletion(() -> sseEmitters.remove(sseEmitter));
return sseEmitter;
}
@PostMapping("/send")
public void send(@RequestBody Message message) {
for (SseEmitter sseEmitter : this.sseEmitters) {
try {
sseEmitter.send(message, MediaType.APPLICATION_JSON);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Controllerメソッドから
SseEmitterを返す
Spring 4.3
60Copyright©2016 NTT corp. All Rights Reserved.
• Spring Web Reactive では
• ControllerからFluxを返すだけでServer-Sent Eventsに対応
• リトライ間隔などを細かく制御する場合は、
Flux<ServerSentEvent>を返す
Reactive Web Controller
Server-Sent Events
private FluxProcessor<Message, Message> processor
= ReplayProcessor.<Message> create().serialize();
@GetMapping("/connect")
public Flux<String> connect() {
return processor.connect().map(m -> formatMessage(m));
}
@PostMapping("/send")
public Mono<Void> send(@RequestBody Mono<Message> message) {
return message.doOnNext(m -> processor.onNext(m)).then();
}
Fluxを返すだけでOK
Spring 5
Web Reactive
61Copyright©2016 NTT corp. All Rights Reserved.
• Servletベースでの起動の場合は・・・
• Servlet3.1から入ったNon-blocking I/O を使ってノンブロッ
キングを実現している
How to realize non-blocking
ControllerからFluxやMonoを返せばいいのは
分かったが、中はどうなっているのか?
62Copyright©2016 NTT corp. All Rights Reserved.
• Servlet 3.0 から入った非同期処理サポートと組み合
わせて使う
• ネットワークの待ち時間からスレッドを解放
• リクエストの受信待ち
• レスポンスの送信待ち
• 主要なインターフェース
• AsyncContext
• ReadListener
• WriteListener
Servlet 3.1 – Non-blocking I/O
63Copyright©2016 NTT corp. All Rights Reserved.
Servlet 3.1 – Non-blocking I/O
Servlet
ReadListener
WriteListener
レスポンスが書
き込めるように
なったので、書
き込み処理開始
リクエストが読み
込めるようになっ
たので処理開始
ReadListener, WriteListenerを登録し
たら、Servletの処理は終了
64Copyright©2016 NTT corp. All Rights Reserved.
• Servletの実装例
Servlet 3.1 – Non-blocking I/O
@WebServlet(urlPatterns = "/nonblocking", asyncSupported = true)
public class MyNonBlockingServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest req,
HttpServletResponse resp)
throws ServletException, IOException {
AsyncContext asyncContext = req.startAsync(req, resp);
ServletInputStream input = req.getInputStream();
ReadListener readListener = new ReadListenerImpl(input, resp, asyncContext);
input.setReadListener(readListener);
}
//omit
}
Servletの非同期サポートを有効化
AsyncContextの取得
ReadListenerインターフェー
スを実装したクラスを作成し、
リスナとして登録
65Copyright©2016 NTT corp. All Rights Reserved.
• ReadListenerの実装例
Servlet 3.1 – Non-blocking I/O
public class ReadListenerImpl implements ReadListener {
@Override // データ読み込み可能になるとコールバックされる
public void onDataAvailable() throws IOException {
int len;
byte[] b = new byte[1024];
while (input.isReady() && !input.isFinished() && (len = input.read(b)) != -1) {
sb.append(new String(b, 0, len));
}
}
@Override // 全データを読み終わるとコールバックされる
public void onAllDataRead() throws IOException {
ServletOutputStream output = resp.getOutputStream();
WriteListener writeListener = new WriteListenerImpl(output,
asyncContext, sb.toString());
output.setWriteListener(writeListener);
}
@Override // エラー時にコールバックされる
public void onError(Throwable throwable) {・・・}
同様にWriteListenerイン
ターフェースを実装した
クラスを作成し、リスナ
として登録
66Copyright©2016 NTT corp. All Rights Reserved.
• WriteListenerの実装例
Servlet 3.1 – Non-blocking I/O
public class WriteListenerImpl implements WriteListener {
@Override // データ書き込み可能になるとコールバックされる
public void onWritePossible() throws IOException {
output.print("<body>" + result + "</body>");
output.flush();
asyncContext.complete();
}
@Override // エラー時にコールバックされる
public void onError(Throwable throwable) {・・・}
}
非同期処理の完了
67Copyright©2016 NTT corp. All Rights Reserved.
• ServletHttpHandlerAdapter
• Servlet 3.1 の Non-blocking I/O を利用する Servletの実装
クラス
• Spring MVC の DispatcherServlet で行っていたような処理
は、このServletではなく DispatcherHandler に委譲
Spring Web Reactive + Servlet 3.1
Servlet API
<<Servlet>>
Dispatcher
Servlet
Servlet API
<<Servlet>>
ServletHttp
HandlerAdapter
Dispatcher
Handler
Spring MVC Spring Web Reactive
68Copyright©2016 NTT corp. All Rights Reserved.
• ServletHttpHandlerAdapter
Spring Web Reactive + Servlet 3.1
@WebServlet(asyncSupported = true)
@SuppressWarnings("serial")
public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport
implements Servlet {
// omit
@Override
public void service(ServletRequest servletRequest,
ServletResponse servletResponse) throws IOException {
// Start async before Read/WriteListener registration
AsyncContext asyncContext = servletRequest.startAsync();
// omit
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber(asyncContext);
getHttpHandler().handle(request, response)
.subscribe(resultSubscriber);
}
ここでsubscribeメソッドが呼ばれている
Servletの非同期サポートを有効化
AsyncContextの取得
69Copyright©2016 NTT corp. All Rights Reserved.
Spring Web Reactive + Servlet 3.1
ReadListener
private class RequestBodyReadListener implements ReadListener {
@Override
public void onDataAvailable() throws IOException {
RequestBodyPublisher.this.onDataAvailable();
}
@Override
public void onAllDataRead() throws IOException {
RequestBodyPublisher.this.onAllDataRead();
}
@Override
public void onError(Throwable throwable) {
RequestBodyPublisher.this.onError(throwable);
}
}
ServletServerHttpRequest$
RequestBodyPublisher$
RequestBodyReadListener
70Copyright©2016 NTT corp. All Rights Reserved.
Spring Web Reactive + Servlet 3.1
ReadListener
RequestBody
ReadListener
RequestBody
Publisher
<<State>>
DEMAND
リクエストのInputStreamか
らデータを読み込む
Subscriber
onDataAvailable()
onDataAvailable()
onDataAvailable()
readAndPublish()
onNext()
読み込んだデータ
をSubscriberへ
リクエストデータの読み込みを
要求している状態を表すオブジェクト
71Copyright©2016 NTT corp. All Rights Reserved.
Spring Web Reactive + Servlet 3.1
WriteListener
private class ResponseBodyWriteListener implements WriteListener {
@Override
public void onWritePossible() throws IOException {
if (bodyProcessor != null) {
bodyProcessor.onWritePossible();
}
}
@Override
public void onError(Throwable ex) {
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
}
}
}
ServletServerHttpResponse$
ResponseBodyWriteListener
72Copyright©2016 NTT corp. All Rights Reserved.
Spring Web Reactive + Servlet 3.1
WriteListener
ResponseBody
WriteListener
ResponseBody
Processor
<<State>>
RECEIVED
レスポンスの
OutputStreamへ出力
Subscription
onWritePossible()
onWritePossible()
onWritePossible()
write()
request(1)
(出力が完了していなければ)
次のデータを1つ要求
出力可能なデータを受け取って
いる状態を表すオブジェクト
73Copyright©2016 NTT corp. All Rights Reserved.
• Controllerメソッドの起動タイミング
• 引数がFlux/Monoかどうかでタイミングが違う
Spring Web Reactive + Servlet 3.1
@RequestMapping("/helloMono")
public Mono<String> hello(@RequestBody Mono<User> user) {
return user.map(u -> "Hello " + u.getName() + "!!");
}
@RequestMapping("/hello")
public Mono<String> hello(@RequestBody User user) {
return Mono.just("Hello " + user.getName() + "!!");
}
メソッド起動の前にはUser(=リクエストデータ)が必要
→ ReadListenerのonAllDataRead()後に起動される
Mono<User>なので、User(リクエストデータ)はまだ不要
→ ReadListenerを待たずに、Servletの実行スレッドから起動される
74Copyright©2016 NTT corp. All Rights Reserved.
HTML rendering ?
75Copyright©2016 NTT corp. All Rights Reserved.
• 状況
• REST対応にフォーカスして開発が進められてきたため、HTML
レンダリングなどの画面遷移系の対応は少し遅れぎみ
• テンプレートエンジン
• Freemarker
• Springとの統合モジュールはSpring側が提供
• spring-web-reactive にサポートクラスがすでに存在する
• 基本的なHTMLレンダリングは可能
• 一応動きました
• Thymeleaf
• Thymeleaf自体にReactiveを意識した改善がすでに入っている
• Springとの統合モジュールはThymelaf側が提供
• 統合モジュールはまだ実験段階
• 一応動いてました
HTML rendering
76Copyright©2016 NTT corp. All Rights Reserved.
• 2016年5月にリリース済み
• 2016/9/28に3.0.2をリリース
• Reactive フレームワークを意識した改善
• Servlet API から独立
• Engine throttling の導入
• Engine throttling
• 出力チャネルからのバックプレッシャー要求に応じて、テンプ
レート処理結果を小分けで出力
• 上記の出力処理を単一スレッドで実行
• data-driven モードで動作させれば、Publisher から流れてく
るデータに応じて、少しずつ処理結果を出力
Thymeleaf 3.0
77Copyright©2016 NTT corp. All Rights Reserved.
• ThymeleafとSpringを統合するためのモジュール
• メインリポジトリ上での Spring 5 対応はまだ
• Thymeleaf Sandbox: Spring + Spring
Reactive
• Spring Web Reactive 対応の実験用リポジトリ
• https://github.com/thymeleaf/thymeleafsandbox-
springreactive
• ThymeleafView や ThymeleafViewResolver の
Reactive対応版の実装が含まれている
• 3つのテンプレート処理モード
1. NORMAL
2. BUFFERED
3. DATA-DRIVEN
thymeleaf-spring
※ テンプレート処理モードについては ThymeleafView#renderFragmentInternal メソッド内のコメントが参考になる
78Copyright©2016 NTT corp. All Rights Reserved.
• Mode: DATA-DRIVEN
• Model に格納された Publisher (Flux/Mono) の onNext(X)
に反応してレンダリングを実行していく
• Non-blocking !!
Thymeleaf Sandbox
Template processing mode: DATA-DRIVEN
@RequestMapping("/biglist-buffered.thymeleaf")
public String bigListBufferedThymeleaf(final Model model) {
final Publisher<PlaylistEntry> playlistFlow =
this.playlistEntryRepository.findLargeCollectionPlaylistEntries();
// No need to fully resolve the Publisher! We will just let it drive
model.addAttribute("dataSource", playlistFlow);
return "thymeleaf/biglist-buffered";
}
Controllerの実装例
PublisherをModelに格納できる!
Flux/MonoでもOK
(data-drivenで処理するデータのModel Attribute名を別
途設定しておく必要あり)
79Copyright©2016 NTT corp. All Rights Reserved.
• Mode: DATA-DRIVEN
• Model に格納された Publisher (Flux/Mono) の onNext(X)
に反応してレンダリングを実行していく
• Non-blocking !!
Thymeleaf Sandbox
Template processing mode: DATA-DRIVEN
@RequestMapping("/biglist-buffered.thymeleaf")
public String bigListBufferedThymeleaf(final Model model) {
final Publisher<PlaylistEntry> playlistFlow =
this.playlistEntryRepository.findLargeCollectionPlaylistEntries();
// No need to fully resolve the Publisher! We will just let it drive
model.addAttribute("dataSource", playlistFlow);
return "thymeleaf/biglist-buffered";
}
Controllerの実装例
PublisherをModelに格納できる!
Flux/MonoでもOK
Spring Framework 5.0 M3 で
動かなくなりました
80Copyright©2016 NTT corp. All Rights Reserved.
• Spring 5.0 M3 では…
• レンダリング前にModel内のPublisher(Flux/Mono)が解決さ
れるようになった
• ThymeleafがModelからオブジェクトを取り出すときには、
Publisher(Flux/Mono)ではなくなっている…
Thymeleaf Sandbox
Template processing mode: DATA-DRIVEN
@RequestMapping("/biglist-buffered.thymeleaf")
public String bigListBufferedThymeleaf(final Model model) {
final Publisher<PlaylistEntry> playlistFlow =
this.playlistEntryRepository.findLargeCollectionPlaylistEntries();
// No need to fully resolve the Publisher! We will just let it drive
model.addAttribute("dataSource", playlistFlow);
return "thymeleaf/biglist-buffered";
}
Controllerの実装例
Publisherを入れてるが、ThymeleafView
が取り出すときにはPublisherではなく
なっている
81Copyright©2016 NTT corp. All Rights Reserved.
Database ?
82Copyright©2016 NTT corp. All Rights Reserved.
• NoSQL
• MongoDB
• Reactive Streams driver
• Couchbase
• RxJava driver
• Redis
• lettuce
• RDB
• JDBCにNon-Blockingな仕組みがない
• PostgreSQL と MySQL の async-driver というものが
GitHub上に見つかるが・・・
• PostgreSQL / MySQL
• https://github.com/mauricio/postgresql-async
• PostgreSQL
• https://github.com/alaisi/postgres-async-driver
Database
Spring Data 2.0 で
対応予定
83Copyright©2016 NTT corp. All Rights Reserved.
MongoDB - Reactive Streams driver
@Repository
public class MongoRepository {
private final ObjectMapper mapper;
private final MongoCollection<Document> col;
@Autowired
public MongoRepository(MongoDatabase db, ObjectMapper mapper) {
this.mapper = mapper;
this.col = db.getCollection("user");
}
public Mono<Void> insert(Mono<User> user) {
return user.flatMap(u -> col.insertOne(Document.parse(toJson(u)))).then();
}
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-
reactivestreams</artifactId>
<version>1.2.0</version>
</dependency>
pom.xml
Repository
Publisherを返してくる
@Bean
MongoDatabase mongoDatabase() {
return MongoClients.create().getDatabase("demo");
}
MongoConfig
84Copyright©2016 NTT corp. All Rights Reserved.
• JavaOne 2016 (9/18-22)にて Non-blocking
JDBC に関するセッションがあった
• セッション名:
• “JDBC Next – A new non-blocking API for connecting to a
database”
• 発表者はOracleの人で、JDBC Expert Group メンバ
• 内容
• 現行JDBCの拡張や置き換えではなく、現行JDBCと選択するも
のになる(らしい)
• Oracle が Oracle DB のドライバとしてプロトタイプを実装し
た(らしい)
• 今後はJDBC Expert Group に開発が引き継がれる(らしい)
• Java10? (まだまだ未定っぽい)
Non-Blocking JDBC API ?
※ https://static.rainfocus.com/oracle/oow16/sess/1461693351182001EmRq/ppt/CONF1578%2020160916.pdf
85Copyright©2016 NTT corp. All Rights Reserved.
Future
86Copyright©2016 NTT corp. All Rights Reserved.
• M4で対応? (予定は未定)
• [SPR-14527] Reactive WebSocket adapter layer
• WebSocketHandlerのリアクティブ版
• [SPR-14546] Reactive multipart request support
• Multipartリクエストへの対応
• [SPR-14534] Reactive HTTP response based RedirectView
• “redirect:”プリフィクスによるリダイレクトに対応
• [SPR-14535] Reactive request and response in SpEL
expression within @MVC annotations
• Controllerメソッド等に使用するアノテーションのSpEL内で、
requestやresponseを参照できるようにする
• @PathVariable, @RequestParam, @RequestHeader, etc
• など…
Future
87Copyright©2016 NTT corp. All Rights Reserved.
Enjoy Reactive !!
88Copyright©2016 NTT corp. All Rights Reserved.
• Spring Framework Reference Documentation 5.0.0 M3 - 23. Web Reactive Framework
• http://docs .s pri n g .io/s pri n g -fram ework / docs /5 .0 .0 . M 3/ sprin g -fram ewor k -r efer ence/ ht ml/ web -r eacti v e.html
• The Spring Blog
• Notes on Reactive Programming Part I: The Reactive Landscape
• h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 6 / 0 7 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i - t h e - r e a c t i v e - l a n d s c a p e
• Notes on Reactive Programming Part II: Writing Some Code
• h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 6 / 1 3 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i i - w r i t i n g - s o m e - c o d e
• Notes on Reactive Programming Part III: A Simple HTTP Server Application
• h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 7 / 2 0 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i i i - a - s i m p l e - h t t p - s e r v e r - a p p l i c a t i o n
• Understandi n g Reactive types
• h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 4 / 1 9 / u n d e r s t a n d i n g - r e a c t i v e - t y p e s
• thymeleafsandbox-springreactive
• https://gith u b.com/t h ym el eaf/t hy m elea fsan dbox -sprin g reactiv e
• MongoDB Reactive Streams Java Driver
• https://mong odb.gi th u b.i o/ mon g o -java -driv er -rea ctiv estr eams /
• Database async driver
• Postgres-as yn c -dri ver
• h t t p s :// g i t h u b . c o m /a l a i s i / p o s t g r e s - a s y n c - d r i v e r
• PostgreSQL and MySQL async driver
• h t t p s :// g i t h u b . c o m /m a u r i c i o / p o s t g r e s q l - a s y n c
• サンプルコード
• Spring-r eacti ve -pla yg rou n d (Spring Web Reactive と Mongo/ Cou ch ebas e/Post gr eSql のサンプル)
• h t t p s :// g i t h u b . c o m /s d e l e u z e / s p r i n g - r e a c t i v e - p l a y g r o u n d
• スライド
• Imperative to Reactive Web Applications
• h t t p ://w w w .s l i d e s h a r e . n e t / S p r i n g C e n t r a l /i m p e r a t i v e - t o - r e a c t i v e - w e b - a p p l i c a t i o n s
• Reactive Webアプリケ ーシ ョ ン - そしてSprin g 5へ
• h t t p :// w w w .s l i d e s h a r e . n e t / m a k i n g x / r e a c t i v e - w e b - s p r i n g - 5 - j j u g c c c - c c c e f 3
• Servlet 3.1 Async I/O
• h t t p :// w w w .s l i d e s h a r e . n e t / S i m o n e B o r d e t / s e r v l e t - 3 1 - a s y n c - i o
• JDBC Next – A new non-blocki n g API for connectin g to a database
• h t t p s :// s t a t i c . r a i n f o c u s . c o m /o r a c l e / o o w 1 6 / s e s s / 1 4 6 1 6 9 3 3 5 1 1 8 2 0 0 1 E m R q /p p t /C O N F 1 5 7 8 % 2 0 2 0 1 6 0 9 1 6 .p d f
• 書籍
• パーフェクトJava EE
• h t t p:// g i h y o . j p /b o o k / 2 0 1 6 / 9 7 8 - 4 - 7 7 4 1 - 8 3 1 6 - 9
• Spring徹底入 門 Spring Framework に よる Javaア プ リケ ー ショ ン 開発
• h t t p:// w w w .s h o e i s h a . c o . j p / b o o k / d e t a i l /9 7 8 4 7 9 8 1 4 2 4 7 0
• 論文
• Guido Salvaneschi , Mira Mezini. 2016. Debuggin g reactive programming with reactive inspector .
• Engineer Bainomugisha , Andoni Lombide Carreton , Tom Van Cutsem, Stijn Mostinckx , Wolfgang De Meuter. 2013. A survey on reactive
programming .
• Gérard Berry. 1989. Real Time Programming: Special Purpose or General Purpose Languages.
• Antony Courtney . 2001.Frappé : Functional Reactive Programming in Java .
参考文献

More Related Content

Spring 5に備えるリアクティブプログラミング入門

  • 1. Copyright©2016 NTT corp. All Rights Reserved. Spring 5に備える リアクティブプログラミング入門 2016年11月18日 岩塚 卓弥,堅田 淳也 NTT ソフトウェアイノベーションセンタ
  • 2. 2Copyright©2016 NTT corp. All Rights Reserved. • 名前:岩塚 卓弥 • 所属:NTT ソフトウェアイノベーションセンタ • NTTの研究所のうちソフトウェアを専門に扱う • 自部署ではソフトウェア工学を研究 • Springベースのグループ共通フレームワークの整備を担当 • Spring関連: • Spring I/O, SpringOne それぞれ2015,2016に参加 • 改訂新版 Spring入門,Spring徹底入門 レビュアー • JSUG幹事 Introduction
  • 3. 3Copyright©2016 NTT corp. All Rights Reserved. • 名前:堅田 淳也 • 所属:NTT ソフトウェアイノベーションセンタ • NTTの研究所のうちソフトウェアを専門に扱う • 自部署ではソフトウェア工学を研究 • Springベースのグループ共通フレームワークの整備を担当 • Spring経験: • 元SIerで、プロジェクトへのSpring適用支援などを担当 • 使ったことのあるバージョン:Spring 1, 3, 4, (5) Introduction
  • 4. 4Copyright©2016 NTT corp. All Rights Reserved. Spring Framework 5.0 M1 M2 M3 M4 GA 2016/6 2016/9 2016/12 2017 1Q • ベースラインのアップグレード • JDK 8+, Servlet 3.1+, JMS 2.0+, JPA 2.1+, JUnit 5 • コアコンテナのオーバーホール • 柔軟でプログラマティックなBean登録と解決 • 効率的なウェブアプリケーション • Reactive Streamsベースのwebコントローラ • ラムダ式によるHTTPルーティングと処理 2016/11
  • 5. 5Copyright©2016 NTT corp. All Rights Reserved. Spring Framework 5.0 / Reactor spring-web-reactive reactor-core 実装 依存 依存 For Reactive ProgrammingFor Reactive Web Spring Framework 5.0 Project Reactor
  • 6. 6Copyright©2016 NTT corp. All Rights Reserved. What is reactive program Transformational program 始めに与えられた入力値を使って結果を出力 Interactive program 必要なタイミングでユーザや他のプログラムとやりとり Reactive program 入力タイミングをコントロールできないソースに対応して動作 [G. Berry 1989] による分類 Program入力 出力 Program 入力A 次は入力Bをくれ 入力B Program ソースA ソースB ソースの変化に 対応して変化 出力 出力
  • 7. 7Copyright©2016 NTT corp. All Rights Reserved. Example (Reactive program) 22℃ ℃ → °F Program 71.6 °F リアクティブシステム 与えられた物理環境に対して 連続的にやりとりするシステム
  • 8. 8Copyright©2016 NTT corp. All Rights Reserved. Example (Reactive program) 21.5℃ ℃ → °F Program 70.7 °F リアクティブシステム 与えられた物理環境に対して 連続的にやりとりするシステム
  • 9. 9Copyright©2016 NTT corp. All Rights Reserved. Example (Reactive program) 21℃ ℃ → °F Program 69.8 °F リアクティブシステム 与えられた物理環境に対して 連続的にやりとりするシステム
  • 10. 10Copyright©2016 NTT corp. All Rights Reserved. Invading namespace : Reactive System The Reactive Manifesto* におけるリアクティブシステム 従来リアクティブシステムと呼ばれてきたものとは別物 名前の乗っ取りは良くない! * http://www.reactivemanifesto.org/
  • 11. 11Copyright©2016 NTT corp. All Rights Reserved. Classification of language for reactive programming (Synchronous / Data flow) programming • (本来の意味での)リアクティブシステムを記述する言語 • 実時間の制約を持つ • 入力の頻度や入出力のレスポンス時間に関する制約 Functional reactive programming(FRP) • 実時間の制約は無い • BehaviorとEvent,Switching combinatorがプリミティブと して提供される(後述) Cousins of reactive programming • 実時間の制約もBehaviorも無い • 値の変更の伝播など,その他のRPの特徴のみを持つ • Reactive Streamsはここに分類される [E. Bainomugisha et al. 2013] による分類
  • 12. 12Copyright©2016 NTT corp. All Rights Reserved. • 「関数型言語の上で実装されたRP=FRP」ではない • Javaで実装されたFRPもある • Frappé [A. Courtney 2001] • Eventだけを扱うRP(≠FRP)が昨今の流行り • イベントドリブンなアーキテクチャを実現するためのRP Behavior,Event,Switching combinator Behavior Event Switching combinator 時間で連続変化する値 例:マウスポジション Behavior mouseX Behavior mouseY 値変化の(無限)列 データフローを動的に 切り替えるための連結子 例:キー入力 a b c Event keyStream 例:マウスクリックで描画 色の切替え right left Behavior color = switching( rc -> blue, lc -> red) * 文法は適当
  • 13. 13Copyright©2016 NTT corp. All Rights Reserved. Observer Pattern Subject Observer observe notify Q. イベントドリブンで値の変更の伝搬とかObserver パターンで実現できるのでは. A. できます. 非常に雑に言うとObserverパターンの強いやつが 昨今の流行りです. * http://reactivex.io *
  • 14. 14Copyright©2016 NTT corp. All Rights Reserved. Push based / Pull based reactive programming Producer Consumer Producer Consumer PushベースのRP PullベースのRP 新しい値を 片っ端から送信 頑張って処理 必要なときに 欲しい分だけ要求 Consumerの 要求に従って送信 ✔ Producerで値が生成されてからConsumerが反応するまでのレイテンシが短い ✘ Consumerの処理能力を超えて値が送信されてくる可能性がある ✔ 必要なとき(Consumerが処理可能な場合)にだけ新しい値を取得できる ✘ Producerで値が生成されてからConsumerが反応するまで時間がかかる
  • 15. 15Copyright©2016 NTT corp. All Rights Reserved. Reactive Streams • 非同期ストリーム処理 • ノンブロッキングなバックプレッシャ • バックプレッシャ:受信側による送信制御を行う仕組み* • 標準を提供 • 最低限のメソッドを備えたインタフェースと,仕様のみを提供 • Publisher, Subscriber, Subscription, Processor • 実用上必要なメソッドは各実装(Reactor他)が提供 * 元々はネットワーク用語.本来の用法から拡大されて使用されている .http://moccosblue.blogspot.jp/2015/05/translatebackpressure.html ノンブロッキングなバックプレッシャを備えた 非同期ストリーム処理の標準を提供するための提案 (直訳) c.f. パルスのファルシのルシがコクーンでパージ Producer Consumer この間のやりとりをノンブロッキングに それぞれ非同期に処理できる
  • 16. 16Copyright©2016 NTT corp. All Rights Reserved. Publisher, Subscriber, Subscription Subscription Subscriber<T> request(long n) Publisher<T> onSubscribe(Subscription s) onNext(T t) × (0 〜 n) onComplete() / onError(Throwable e) … subscribe(Subscriber<? super T> s) • 各メソッドの戻り値の型はすべてvoid • Subscriberは送られてきたシグナルに対応して動作 • PushベースRP / PullベースRP の両方を実現可能 • request(Long.MAX_VALUE) とするとPushベースRPになる • Processor = Publisher ∧ Subscriber
  • 17. 17Copyright©2016 NTT corp. All Rights Reserved. • Reactive Streamsの各インタフェースを提供 • JDK9から利用可能 • Reactive Streamsとは別物扱いなので,相互変換が必要 java.util.concurrent.Flow
  • 18. 18Copyright©2016 NTT corp. All Rights Reserved. Project Reactor Reactor Core Reactor IPC Reactor Add-ons ・Reactor Core ・Reactor Core .Net ・Reactor Core JS ・Reactor Netty ・Reactor Kafka ・Reactor Aeron ・Reactor Adapter ・Reactor Test ・Reactor Logback Reactive Streamsの実装 JDK9のFlowとの変換 エンコード / デコード 通信(UDP / TCP / HTTP) 他実装との変換 その他便利系 Reactor Coreを中心とした一連のプロジェクト群
  • 19. 19Copyright©2016 NTT corp. All Rights Reserved. Mono, Flux Publisher<T> Mono<T> Flux<T> 0個または1個の値を発行 0個以上の値を発行 • Reactor Coreで提供されるPublisherの実装 • 発行され得る値の数によって使い分ける • subscribe以外にストリーム処理用の様々なメソッド を提供 • filter, map, take, skip, …
  • 20. 20Copyright©2016 NTT corp. All Rights Reserved. Flux.range(0, 10) .delayMillis(1000) .filter(n -> n%2 == 0) .map(n -> n*2); Example (Flux) 9876543210 0123456789 86420 1612840 delayMillis(1000) filter(n -> n%2 == 0) filter(n -> n*2)
  • 21. 21Copyright©2016 NTT corp. All Rights Reserved. Implementation detail Fluxのサブクラスのインスタンスが作成される. クラスはパッケージプライベートなので普通は意識しないで良い. FluxRange FluxFilter FluxMap source source Flux.range(..).filter(..).map(..);
  • 22. 22Copyright©2016 NTT corp. All Rights Reserved. Implementation detail FluxRange FluxFilter FluxMap Flux.range(..).filter(..).map(..).subscribe(..); source source Subscriber subscribe
  • 23. 23Copyright©2016 NTT corp. All Rights Reserved. Implementation detail FluxRange FluxFilter FluxMap Flux.range(..).filter(..).map(..).subscribe(..); source source actual actual Subscriber subscribe new FluxFilter .FilterSubscriber FluxMap .MapSubscriber
  • 24. 24Copyright©2016 NTT corp. All Rights Reserved. Implementation detail FluxRange FluxFilter FluxMap Flux.range(..).filter(..).map(..).subscribe(..); source source actual actual Subscriber subscribe new FluxFilter .FilterSubscriber FluxMap .MapSubscriber FluxRange .RangeSubscription actual actual onSubscribe s s s
  • 25. 25Copyright©2016 NTT corp. All Rights Reserved. Implementation detail FluxRange FluxFilter FluxMap Flux.range(..).filter(..).map(..).subscribe(..); source source actual actual Subscriber subscribe new FluxFilter .FilterSubscriber FluxMap .MapSubscriber FluxRange .RangeSubscription actual actual onSubscribe s s s request
  • 26. 26Copyright©2016 NTT corp. All Rights Reserved. Implementation detail FluxRange FluxFilter FluxMap Flux.range(..).filter(..).map(..).subscribe(..); source source actual actual Subscriber subscribe new FluxFilter .FilterSubscriber FluxMap .MapSubscriber FluxRange .RangeSubscription actual actual onSubscribe s s s request onNext
  • 27. 27Copyright©2016 NTT corp. All Rights Reserved. Don‘t return void public void nonsense(Flux<Integer> s){ s.map(n -> n*n); } public Flux<Integer> ok(Flux<Integer> s){ return s.map(n -> n*n); } ダメな例 修正 「subscribeされない = 何も起こらない」 新しいFluxが生成されて捨てられるだけ 戻り値がどこかでsubscribeされればOK
  • 28. 28Copyright©2016 NTT corp. All Rights Reserved. Subscriber for Reactive Web @Controller public Flux<Hoge> getHoges(){ … return resultFlux; } 誰がsubscribeしている? Servlet 3.1利用の場合 ServletHttpHandlerAdapter#service …
  • 29. 29Copyright©2016 NTT corp. All Rights Reserved. Comparison from other types CompletableFuture Stream Optional ✔ 非同期,ノンブロッキング ✔ 処理の合成 ✘ Pushにしか対応できない ✘ ストリーム処理に対応できない ✔ ストリーム処理 ✔ 処理の合成 ✘ 非同期処理のためのAPIがない ✔ 0個または1個の値を扱う(ReactorのMonoと同様) ✔ 処理の合成 ✘ 非同期処理のためのAPIがない
  • 30. 30Copyright©2016 NTT corp. All Rights Reserved. • Reactorの祖先はReactive Extensions(Rx) • Rxが最初に導入されたのは .NET • 当時Microsoftに在籍していたErik Meijer氏らが設計 • Erik Meijer氏は関数型言語(主にHaskell)の研究者としても知ら れている • 関数型のプログラミングスタイルとは • 副作用を極力避けるプログラミングスタイル • そもそも実は「関数型言語」には明確な定義がない* Functional programming style Rxは関数型のプログラミングスタイルが活きるよう設計されている * 東北大学 住井先生によるQiitaエントリ http://qiita.com/esumii/items/ec589d138e72e22ea97e
  • 31. 31Copyright©2016 NTT corp. All Rights Reserved. • 副作用を使用しない(=参照透明である)メリット • バグを作り込みにくくなる • 正しさを検証しやすくなる • 関数の独立性が高くなる Referential Transparency 「状態」を持たないことによる複雑性と依存性の排除 いつどこで実行されるか分からない処理では参照透明性は特に重要 Reactive Streamsにおいては… ・実際の計算はsubscribeされるまで遅延される → どのタイミングで実行されるか分からない ・非同期的に実行される可能性がある → どのスレッドで実行されるか分からない
  • 32. 32Copyright©2016 NTT corp. All Rights Reserved. • ラムダ式 • 関数型インタフェースの実装を記述する構文(from Java 8) • 名前をつけるまでもない小さな関数・述語等の記述に利用 • メソッド参照 • 関数型インタフェースの実装として既存のメソッドを渡すため の構文(from Java 8) • 既存のメソッドを高階関数に渡す際に利用 • 高階関数 • 関数を引数や戻り値とする関数 (map, filter, …) • 関数を組み合わせて処理を記述するために利用 • タプル • 順序を持った値の組 • 複数の値を返したい関数の定義等に利用 Tools for functional programming Java Reactor
  • 33. 33Copyright©2016 NTT corp. All Rights Reserved. • 関数型言語で多く用いられている一種のデザインパターン • 関数をチェーンさせて,順番に値に適用させていくことができる • Mono / Flux はMonadになっている • StreamやOptional, CompletableFutureもMonad Monad map (a.k.a fmap) just (a.k.a unit, pure) flatMap (a.k.a bind) Monad が備える関数 Monad則,Functor,Applicative等との関係等,細かい諸々は省略 → 全部同じような考え方で取り扱える
  • 34. 34Copyright©2016 NTT corp. All Rights Reserved. Example public Flux<Item> findItems(int idxA, int idxB){ return Mono.when(serviceA.findOne(idxA), serviceB.findOne(idxB)) .flatMap(p -> findItemsByXAndY( p.getT1.getX(), p.getT2.getY())); } public Mono<long> getTotal(int idxA, int idxB){ return findItems(idxA, idxB) .map(Item::calculateScore) .reduce((a, b) -> a+b); }
  • 35. 35Copyright©2016 NTT corp. All Rights Reserved. Example public Flux<Item> findItems(int idxA, int idxB){ return Mono.when(serviceA.findOne(idxA), serviceB.findOne(idxB)) .flatMap(p -> findItemsByXAndY( p.getT1.getX(), p.getT2.getY())); } public Mono<long> getTotal(int idxA, int idxB){ return findItems(idxA, idxB) .map(Item::calculateScore) .reduce(0, (a, b) -> a+b); } 2つのMonoから,Monoのタプルを作る
  • 36. 36Copyright©2016 NTT corp. All Rights Reserved. Example public Flux<Item> findItems(int idxA, int idxB){ return Mono.when(serviceA.findOne(idxA), serviceB.findOne(idxB)) .flatMap(p -> findItemsByXAndY( p.getT1.getX(), p.getT2.getY())); } public Mono<long> getTotal(int idxA, int idxB){ return findItems(idxA, idxB) .map(Item::calculateScore) .reduce((a, b) -> a+b); } ラムダ式 タプルからの値の取り出し 高階関数
  • 37. 37Copyright©2016 NTT corp. All Rights Reserved. Example public Flux<Item> findItems(int idxA, int idxB){ return Mono.when(serviceA.findOne(idxA), serviceB.findOne(idxB)) .flatMap(p -> findItemsByXAndY( p.getT1.getX(), p.getT2.getY())); } public Mono<long> getTotal(int idxA, int idxB){ return findItems(idxA, idxB) .map(Item::calculateScore) .reduce((a, b) -> a+b); } 高階関数 メソッド参照
  • 38. 38Copyright©2016 NTT corp. All Rights Reserved. Example public Flux<Item> findItems(int idxA, int idxB){ return Mono.when(serviceA.findOne(idxA), serviceB.findOne(idxB)) .flatMap(p -> findItemsByXAndY( p.getT1.getX(), p.getT2.getY())); } public Mono<long> getTotal(int idxA, int idxB){ return findItems(idxA, idxB) .map(Item::calculateScore) .reduce((a, b) -> a+b); } ラムダ式 そこまでの関数の適用結果と次の要素を 繰り返し関数に適用していく高階関数 (畳み込み)
  • 39. 39Copyright©2016 NTT corp. All Rights Reserved. • Reactorでは,特に指定しない限りsubscribeを呼び だしたスレッドでシグナルを処理する • 多くの場合それが一番パフォーマンスが良い • 必要ならば明示的にバックグラウンド実行させる • Reactor Coreに含まれるSchedulerを利用する Parallel processing Flux.just(“red”, “blue”, “green”) .map(String::toUpperCase) .subscribeOn(Schedullers.parallel()) .subscribe(); 内部的に保持している複数のスレッドから 一つを選んでタスクを実行
  • 40. 40Copyright©2016 NTT corp. All Rights Reserved. • すべての要素を並列実行させるには,すべての要素を 別々のPublisherに分配させる Parallel processing Flux.range(0, 100) .flatMap(n -> Mono.just(n*2) .subscribeOn(Scheduler.parallel()) ) .subscribe(); 別々のPublisherに分配 別のスレッドで実行 [2, 10, 18, 26, 34, 42, 50, …] [0, 8, 6, 14, 22, 30, 38, …] flatMap: concatMap: • 実行順序は非決定的になる • flatMapの代わりにconcatMapを使えば順序が担保される [0, 2, 4, 6, 8, 10, 12, …] 毎回結果が異なる
  • 41. 41Copyright©2016 NTT corp. All Rights Reserved. • 非同期プログラミングはそもそも非常に複雑で難しい • Rxは上手くデザインされているが,非同期プログラミ ングの複雑さがすべて解消するわけではない • 非決定性に起因する再現性の低いバグ • デバッグが難しい • テストが難しい • 設計が難しい • etc… Complexity gain → 不必要にRPを導入することはデメリットの方が大きい
  • 42. 42Copyright©2016 NTT corp. All Rights Reserved. Glitch 値の変更の伝搬の過程で一時的に不整合な状態になることがある Flux<Integer> a = Flux.range(0,10); Flux<Integer> b = a.map(n -> n*2); Flux<Integer> c = Flux.combineLatest( b, a, (n, m) -> n+m); a 0 1 2 3 4 5 6 7 8 9 b 0 2 4 6 8 10 12 14 16 18 c 0 3 6 9 12 15 18 21 24 27 理想 現実 a 0 1 1 2 2 3 3 4 4 … b 0 2 2 4 4 6 6 8 8 … c 0 2 3 5 6 8 9 11 12 … bの変更がaの変更より先にcに伝搬される
  • 43. 43Copyright©2016 NTT corp. All Rights Reserved. • ICSE’16でRPのデバッグに関する研究が発表されている • 対象はScala • EclipseプラグインとしてRP用デバッガを実装 • この領域はまだ他にも解決すべき課題は色々ありそう Research effort 依存関係を可視化 ブレークポイントの設定 [G. Salvaneschi, M. Mezini 2016]
  • 44. 44Copyright©2016 NTT corp. All Rights Reserved. Spring Web Reactive Spring Framework 5.0
  • 45. 45Copyright©2016 NTT corp. All Rights Reserved. Spring Framework 5.0 Web MVC vs Web Reactive @Controller, @RequestMapping Spring MVC Servlet API Servlet Container Spring Web Reactive Reactive HTTP Servlet 3.1, Netty, Undertow
  • 46. 46Copyright©2016 NTT corp. All Rights Reserved. Spring Framework 5.0 Web MVC vs Web Reactive @Controller, @RequestMapping Spring MVC Servlet API Servlet Container Spring Web Reactive Reactive HTTP Servlet 3.1, Netty, Undertow spring-web-reactive.jarが 新規追加 Servlet 3.1 で追加されたNon-blocking I/O を利用 Servlet以外の環境にも対応 従来のMVCと同様の アノテーションが使える
  • 47. 47Copyright©2016 NTT corp. All Rights Reserved. • Spring Web MVC とは独立した存在 • しかし、多くのアルゴリズムはMVCと共通 • プログラミングモデルもMVCと変わらない • @Controller, @RequestMapping, @RequestBody, etc… • Reactive HTTP request/responseを処理 • リクエスト/レスポンスの読み書きをNon-Blockingで処理 • 少ないスレッド数でスケール • 現在の実装状況など • Spring Framework 5.0 M3 が 2016/11/8 にリリース • まずはRESTにフォーカスして実装してきた • HTMLレンダリング系も徐々に実装されてきている Spring Web Reactive Framework
  • 48. 48Copyright©2016 NTT corp. All Rights Reserved. • Spring Initializr • http://start.spring.io Getting start with Spring Boot 2.0.0(SNAPSHOT)を選択すると、 Reactive Webが追加可能になる
  • 49. 49Copyright©2016 NTT corp. All Rights Reserved. pom.xml <dependencies> <dependency> <groupId>org.springframework.boot.experimental</groupId> <artifactId>spring-boot-starter-web-reactive</artifactId> </dependency> <!-- omit --> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot.experimental</groupId> <artifactId>spring-boot-dependencies-web-reactive</artifactId> <version>0.1.0.BUILD-SNAPSHOT</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
  • 50. 50Copyright©2016 NTT corp. All Rights Reserved. • ControllerからFluxやMonoが返せる • Flux/Mono の subscribe は呼ばない • subscribe するのは Spring Reactive Web Controller @RestController public class UserController { @Autowired UserRepository userRepository; @GetMapping("/find") public Mono<User> find(@RequestParam("id") long id) { return userRepository.findById(id); } @GetMapping("/listAdult") public Flux<User> listAdult() { // 20歳以上のユーザを返す return userRepository.findAll() .filter(u -> u.getAge() >= 20); } } public interface UserRepository { Mono<User> findById(long id); Flux<User> findAll(); Mono<Void> save(User user); } /listAdult の結果
  • 51. 51Copyright©2016 NTT corp. All Rights Reserved. • @ResponseBody(@RestController)メソッドの 戻り値として返せる型 Reactive Web Controller ※ “User”は任意のJavaBeanを表す • Reactor • Mono<User> • Mono<Void> • Flux<User> • Flux<ServerSentEvent> • RxJava • Single<User> • Observable<User> • Flowable<User> • Not reactive types • User • void
  • 52. 52Copyright©2016 NTT corp. All Rights Reserved. • ダメな例 • スレッドをブロックするメソッドを使ってFlux/Monoから値を 取り出す Reactive Web Controller @GetMapping("/findBlocking") public User findBlocking(@RequestParam("id") long id) { return userRepository.findById(id).block(); } @GetMapping("/listAdultBlocking") public List<User> listAdult() { List<User> list = new ArrayList<>(); // 20歳以上のユーザを返す userRepository.findAll() .filter(u -> u.getAge() >= 20) .toIterable() .forEach(list::add); return list; } Userが返ってくるまでブロック! FluxをIterableに変換 IterableからUserを取得するときにブロック!
  • 53. 53Copyright©2016 NTT corp. All Rights Reserved. • Controllerの引数 • @RequestBody を付与することで、JSON(Jackson)や XML(JAXB)を受け取れる • 引数の型にはFlux/Mono が指定可能 • Monoを使わずに単体のBeanで受けることもできる • @PathVariableや@RequestParam も使える Reactive Web Controller @RequestMapping("/helloMono") public Mono<String> hello(@RequestBody Mono<User> user) { return user.map(u -> "Hello " + u.getName() + "!!"); } @RequestMapping("/hello") public Mono<String> hello(@RequestBody User user) { return Mono.just("Hello " + user.getName() + "!!"); } Mono<User>じゃなくてもブロッキング にはならない
  • 54. 54Copyright©2016 NTT corp. All Rights Reserved. • @RequestBody が付与された引数に指定可能な型 Reactive Web Controller ※ “User”は任意のJavaBeanを表す • Reactor • Mono<User> • Flux<User> • RxJava • Single<User> • Observable<User> • Not reactive type • User
  • 55. 55Copyright©2016 NTT corp. All Rights Reserved. • @ModelAttribute 引数への対応 • 従来通り@ModelAttributeの省略も可能 Reactive Web Controller @RequestMapping("/helloAnnotation") public Mono<String> helloAnnotation(@ModelAttribute("user") Mono<User> user) { return user.map(u -> "Hello " + u.getName()); } @RequestMapping("/hello") public Mono<String> hello(Mono<User> user) { return user.map(u -> "Hello " + u.getName()); } Spring 5.0 M3 から POST /form/hello HTTP/1.1 Host: localhost:8080 Content-Type: application/x-www-form-urlencoded id=10&name=taro&age=25 HTTPリクエスト id= 10 name= “taro” age= 25 :User
  • 56. 56Copyright©2016 NTT corp. All Rights Reserved. • Bean Validation も使える • 従来通り@Validatedによる検証が可能 • pom.xmlにstarterの追加が必要 Reactive Web Controller Bean Validation <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency>
  • 57. 57Copyright©2016 NTT corp. All Rights Reserved. • Monoに対して@Validatedを付与した場合は、検証失 敗時に例外が流れてくる Reactive Web Controller Bean Validation @RequestMapping("/validate") public Mono<String> formValidate(@Validated User user, BindingResult bindingResult) { if (bindingResult.hasErrors()) { return Mono.just("Error!"); } } @RequestMapping("/validateMono") public Mono<String> formValidate(@Validated Mono<User> user) { return user .map(u -> "Hello " + u.getName()) .otherwise(WebExchangeBindException.class, e -> Mono.just("Error!")); } 検証失敗時はWebExchangeBindExceptionが発生
  • 58. 58Copyright©2016 NTT corp. All Rights Reserved. • ちなみに・・・ • Monoの引数に対してBindingResultの引数を追加すると IllegalArgumentException発生 Reactive Web Controller Bean Validation // Not work! @RequestMapping("/validateMonoWithBindingResult") public Mono<String> form(@Validated Mono<User> user, BindingResult bindingResult) { if (bindingResult.hasErrors()) { ・・・ Caused by: java.lang.IllegalArgumentException: Errors/BindingResult cannot be used with an async model attribute. Either declare the model attribute without the async wrapper type or handle WebExchangeBindException through the async type. at org.springframework.util.Assert.isNull(Assert.java:126) ~[spring-core-5.0.0.BUILD- SNAPSHOT.jar:5.0.0.BUILD-SNAPSHOT] … 例外メッセージ
  • 59. 59Copyright©2016 NTT corp. All Rights Reserved. • HTML5のサーバプッシュ技術 • Springでは4.2から対応している Reactive Web Controller Server-Sent Events @GetMapping("/connect") public SseEmitter connect() { SseEmitter sseEmitter = new SseEmitter(); sseEmitters.add(sseEmitter); sseEmitter.onCompletion(() -> sseEmitters.remove(sseEmitter)); return sseEmitter; } @PostMapping("/send") public void send(@RequestBody Message message) { for (SseEmitter sseEmitter : this.sseEmitters) { try { sseEmitter.send(message, MediaType.APPLICATION_JSON); } catch (Exception e) { e.printStackTrace(); } } } Controllerメソッドから SseEmitterを返す Spring 4.3
  • 60. 60Copyright©2016 NTT corp. All Rights Reserved. • Spring Web Reactive では • ControllerからFluxを返すだけでServer-Sent Eventsに対応 • リトライ間隔などを細かく制御する場合は、 Flux<ServerSentEvent>を返す Reactive Web Controller Server-Sent Events private FluxProcessor<Message, Message> processor = ReplayProcessor.<Message> create().serialize(); @GetMapping("/connect") public Flux<String> connect() { return processor.connect().map(m -> formatMessage(m)); } @PostMapping("/send") public Mono<Void> send(@RequestBody Mono<Message> message) { return message.doOnNext(m -> processor.onNext(m)).then(); } Fluxを返すだけでOK Spring 5 Web Reactive
  • 61. 61Copyright©2016 NTT corp. All Rights Reserved. • Servletベースでの起動の場合は・・・ • Servlet3.1から入ったNon-blocking I/O を使ってノンブロッ キングを実現している How to realize non-blocking ControllerからFluxやMonoを返せばいいのは 分かったが、中はどうなっているのか?
  • 62. 62Copyright©2016 NTT corp. All Rights Reserved. • Servlet 3.0 から入った非同期処理サポートと組み合 わせて使う • ネットワークの待ち時間からスレッドを解放 • リクエストの受信待ち • レスポンスの送信待ち • 主要なインターフェース • AsyncContext • ReadListener • WriteListener Servlet 3.1 – Non-blocking I/O
  • 63. 63Copyright©2016 NTT corp. All Rights Reserved. Servlet 3.1 – Non-blocking I/O Servlet ReadListener WriteListener レスポンスが書 き込めるように なったので、書 き込み処理開始 リクエストが読み 込めるようになっ たので処理開始 ReadListener, WriteListenerを登録し たら、Servletの処理は終了
  • 64. 64Copyright©2016 NTT corp. All Rights Reserved. • Servletの実装例 Servlet 3.1 – Non-blocking I/O @WebServlet(urlPatterns = "/nonblocking", asyncSupported = true) public class MyNonBlockingServlet extends HttpServlet { @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { AsyncContext asyncContext = req.startAsync(req, resp); ServletInputStream input = req.getInputStream(); ReadListener readListener = new ReadListenerImpl(input, resp, asyncContext); input.setReadListener(readListener); } //omit } Servletの非同期サポートを有効化 AsyncContextの取得 ReadListenerインターフェー スを実装したクラスを作成し、 リスナとして登録
  • 65. 65Copyright©2016 NTT corp. All Rights Reserved. • ReadListenerの実装例 Servlet 3.1 – Non-blocking I/O public class ReadListenerImpl implements ReadListener { @Override // データ読み込み可能になるとコールバックされる public void onDataAvailable() throws IOException { int len; byte[] b = new byte[1024]; while (input.isReady() && !input.isFinished() && (len = input.read(b)) != -1) { sb.append(new String(b, 0, len)); } } @Override // 全データを読み終わるとコールバックされる public void onAllDataRead() throws IOException { ServletOutputStream output = resp.getOutputStream(); WriteListener writeListener = new WriteListenerImpl(output, asyncContext, sb.toString()); output.setWriteListener(writeListener); } @Override // エラー時にコールバックされる public void onError(Throwable throwable) {・・・} 同様にWriteListenerイン ターフェースを実装した クラスを作成し、リスナ として登録
  • 66. 66Copyright©2016 NTT corp. All Rights Reserved. • WriteListenerの実装例 Servlet 3.1 – Non-blocking I/O public class WriteListenerImpl implements WriteListener { @Override // データ書き込み可能になるとコールバックされる public void onWritePossible() throws IOException { output.print("<body>" + result + "</body>"); output.flush(); asyncContext.complete(); } @Override // エラー時にコールバックされる public void onError(Throwable throwable) {・・・} } 非同期処理の完了
  • 67. 67Copyright©2016 NTT corp. All Rights Reserved. • ServletHttpHandlerAdapter • Servlet 3.1 の Non-blocking I/O を利用する Servletの実装 クラス • Spring MVC の DispatcherServlet で行っていたような処理 は、このServletではなく DispatcherHandler に委譲 Spring Web Reactive + Servlet 3.1 Servlet API <<Servlet>> Dispatcher Servlet Servlet API <<Servlet>> ServletHttp HandlerAdapter Dispatcher Handler Spring MVC Spring Web Reactive
  • 68. 68Copyright©2016 NTT corp. All Rights Reserved. • ServletHttpHandlerAdapter Spring Web Reactive + Servlet 3.1 @WebServlet(asyncSupported = true) @SuppressWarnings("serial") public class ServletHttpHandlerAdapter extends HttpHandlerAdapterSupport implements Servlet { // omit @Override public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException { // Start async before Read/WriteListener registration AsyncContext asyncContext = servletRequest.startAsync(); // omit HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(asyncContext); getHttpHandler().handle(request, response) .subscribe(resultSubscriber); } ここでsubscribeメソッドが呼ばれている Servletの非同期サポートを有効化 AsyncContextの取得
  • 69. 69Copyright©2016 NTT corp. All Rights Reserved. Spring Web Reactive + Servlet 3.1 ReadListener private class RequestBodyReadListener implements ReadListener { @Override public void onDataAvailable() throws IOException { RequestBodyPublisher.this.onDataAvailable(); } @Override public void onAllDataRead() throws IOException { RequestBodyPublisher.this.onAllDataRead(); } @Override public void onError(Throwable throwable) { RequestBodyPublisher.this.onError(throwable); } } ServletServerHttpRequest$ RequestBodyPublisher$ RequestBodyReadListener
  • 70. 70Copyright©2016 NTT corp. All Rights Reserved. Spring Web Reactive + Servlet 3.1 ReadListener RequestBody ReadListener RequestBody Publisher <<State>> DEMAND リクエストのInputStreamか らデータを読み込む Subscriber onDataAvailable() onDataAvailable() onDataAvailable() readAndPublish() onNext() 読み込んだデータ をSubscriberへ リクエストデータの読み込みを 要求している状態を表すオブジェクト
  • 71. 71Copyright©2016 NTT corp. All Rights Reserved. Spring Web Reactive + Servlet 3.1 WriteListener private class ResponseBodyWriteListener implements WriteListener { @Override public void onWritePossible() throws IOException { if (bodyProcessor != null) { bodyProcessor.onWritePossible(); } } @Override public void onError(Throwable ex) { if (bodyProcessor != null) { bodyProcessor.cancel(); bodyProcessor.onError(ex); } } } ServletServerHttpResponse$ ResponseBodyWriteListener
  • 72. 72Copyright©2016 NTT corp. All Rights Reserved. Spring Web Reactive + Servlet 3.1 WriteListener ResponseBody WriteListener ResponseBody Processor <<State>> RECEIVED レスポンスの OutputStreamへ出力 Subscription onWritePossible() onWritePossible() onWritePossible() write() request(1) (出力が完了していなければ) 次のデータを1つ要求 出力可能なデータを受け取って いる状態を表すオブジェクト
  • 73. 73Copyright©2016 NTT corp. All Rights Reserved. • Controllerメソッドの起動タイミング • 引数がFlux/Monoかどうかでタイミングが違う Spring Web Reactive + Servlet 3.1 @RequestMapping("/helloMono") public Mono<String> hello(@RequestBody Mono<User> user) { return user.map(u -> "Hello " + u.getName() + "!!"); } @RequestMapping("/hello") public Mono<String> hello(@RequestBody User user) { return Mono.just("Hello " + user.getName() + "!!"); } メソッド起動の前にはUser(=リクエストデータ)が必要 → ReadListenerのonAllDataRead()後に起動される Mono<User>なので、User(リクエストデータ)はまだ不要 → ReadListenerを待たずに、Servletの実行スレッドから起動される
  • 74. 74Copyright©2016 NTT corp. All Rights Reserved. HTML rendering ?
  • 75. 75Copyright©2016 NTT corp. All Rights Reserved. • 状況 • REST対応にフォーカスして開発が進められてきたため、HTML レンダリングなどの画面遷移系の対応は少し遅れぎみ • テンプレートエンジン • Freemarker • Springとの統合モジュールはSpring側が提供 • spring-web-reactive にサポートクラスがすでに存在する • 基本的なHTMLレンダリングは可能 • 一応動きました • Thymeleaf • Thymeleaf自体にReactiveを意識した改善がすでに入っている • Springとの統合モジュールはThymelaf側が提供 • 統合モジュールはまだ実験段階 • 一応動いてました HTML rendering
  • 76. 76Copyright©2016 NTT corp. All Rights Reserved. • 2016年5月にリリース済み • 2016/9/28に3.0.2をリリース • Reactive フレームワークを意識した改善 • Servlet API から独立 • Engine throttling の導入 • Engine throttling • 出力チャネルからのバックプレッシャー要求に応じて、テンプ レート処理結果を小分けで出力 • 上記の出力処理を単一スレッドで実行 • data-driven モードで動作させれば、Publisher から流れてく るデータに応じて、少しずつ処理結果を出力 Thymeleaf 3.0
  • 77. 77Copyright©2016 NTT corp. All Rights Reserved. • ThymeleafとSpringを統合するためのモジュール • メインリポジトリ上での Spring 5 対応はまだ • Thymeleaf Sandbox: Spring + Spring Reactive • Spring Web Reactive 対応の実験用リポジトリ • https://github.com/thymeleaf/thymeleafsandbox- springreactive • ThymeleafView や ThymeleafViewResolver の Reactive対応版の実装が含まれている • 3つのテンプレート処理モード 1. NORMAL 2. BUFFERED 3. DATA-DRIVEN thymeleaf-spring ※ テンプレート処理モードについては ThymeleafView#renderFragmentInternal メソッド内のコメントが参考になる
  • 78. 78Copyright©2016 NTT corp. All Rights Reserved. • Mode: DATA-DRIVEN • Model に格納された Publisher (Flux/Mono) の onNext(X) に反応してレンダリングを実行していく • Non-blocking !! Thymeleaf Sandbox Template processing mode: DATA-DRIVEN @RequestMapping("/biglist-buffered.thymeleaf") public String bigListBufferedThymeleaf(final Model model) { final Publisher<PlaylistEntry> playlistFlow = this.playlistEntryRepository.findLargeCollectionPlaylistEntries(); // No need to fully resolve the Publisher! We will just let it drive model.addAttribute("dataSource", playlistFlow); return "thymeleaf/biglist-buffered"; } Controllerの実装例 PublisherをModelに格納できる! Flux/MonoでもOK (data-drivenで処理するデータのModel Attribute名を別 途設定しておく必要あり)
  • 79. 79Copyright©2016 NTT corp. All Rights Reserved. • Mode: DATA-DRIVEN • Model に格納された Publisher (Flux/Mono) の onNext(X) に反応してレンダリングを実行していく • Non-blocking !! Thymeleaf Sandbox Template processing mode: DATA-DRIVEN @RequestMapping("/biglist-buffered.thymeleaf") public String bigListBufferedThymeleaf(final Model model) { final Publisher<PlaylistEntry> playlistFlow = this.playlistEntryRepository.findLargeCollectionPlaylistEntries(); // No need to fully resolve the Publisher! We will just let it drive model.addAttribute("dataSource", playlistFlow); return "thymeleaf/biglist-buffered"; } Controllerの実装例 PublisherをModelに格納できる! Flux/MonoでもOK Spring Framework 5.0 M3 で 動かなくなりました
  • 80. 80Copyright©2016 NTT corp. All Rights Reserved. • Spring 5.0 M3 では… • レンダリング前にModel内のPublisher(Flux/Mono)が解決さ れるようになった • ThymeleafがModelからオブジェクトを取り出すときには、 Publisher(Flux/Mono)ではなくなっている… Thymeleaf Sandbox Template processing mode: DATA-DRIVEN @RequestMapping("/biglist-buffered.thymeleaf") public String bigListBufferedThymeleaf(final Model model) { final Publisher<PlaylistEntry> playlistFlow = this.playlistEntryRepository.findLargeCollectionPlaylistEntries(); // No need to fully resolve the Publisher! We will just let it drive model.addAttribute("dataSource", playlistFlow); return "thymeleaf/biglist-buffered"; } Controllerの実装例 Publisherを入れてるが、ThymeleafView が取り出すときにはPublisherではなく なっている
  • 81. 81Copyright©2016 NTT corp. All Rights Reserved. Database ?
  • 82. 82Copyright©2016 NTT corp. All Rights Reserved. • NoSQL • MongoDB • Reactive Streams driver • Couchbase • RxJava driver • Redis • lettuce • RDB • JDBCにNon-Blockingな仕組みがない • PostgreSQL と MySQL の async-driver というものが GitHub上に見つかるが・・・ • PostgreSQL / MySQL • https://github.com/mauricio/postgresql-async • PostgreSQL • https://github.com/alaisi/postgres-async-driver Database Spring Data 2.0 で 対応予定
  • 83. 83Copyright©2016 NTT corp. All Rights Reserved. MongoDB - Reactive Streams driver @Repository public class MongoRepository { private final ObjectMapper mapper; private final MongoCollection<Document> col; @Autowired public MongoRepository(MongoDatabase db, ObjectMapper mapper) { this.mapper = mapper; this.col = db.getCollection("user"); } public Mono<Void> insert(Mono<User> user) { return user.flatMap(u -> col.insertOne(Document.parse(toJson(u)))).then(); } <dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver- reactivestreams</artifactId> <version>1.2.0</version> </dependency> pom.xml Repository Publisherを返してくる @Bean MongoDatabase mongoDatabase() { return MongoClients.create().getDatabase("demo"); } MongoConfig
  • 84. 84Copyright©2016 NTT corp. All Rights Reserved. • JavaOne 2016 (9/18-22)にて Non-blocking JDBC に関するセッションがあった • セッション名: • “JDBC Next – A new non-blocking API for connecting to a database” • 発表者はOracleの人で、JDBC Expert Group メンバ • 内容 • 現行JDBCの拡張や置き換えではなく、現行JDBCと選択するも のになる(らしい) • Oracle が Oracle DB のドライバとしてプロトタイプを実装し た(らしい) • 今後はJDBC Expert Group に開発が引き継がれる(らしい) • Java10? (まだまだ未定っぽい) Non-Blocking JDBC API ? ※ https://static.rainfocus.com/oracle/oow16/sess/1461693351182001EmRq/ppt/CONF1578%2020160916.pdf
  • 85. 85Copyright©2016 NTT corp. All Rights Reserved. Future
  • 86. 86Copyright©2016 NTT corp. All Rights Reserved. • M4で対応? (予定は未定) • [SPR-14527] Reactive WebSocket adapter layer • WebSocketHandlerのリアクティブ版 • [SPR-14546] Reactive multipart request support • Multipartリクエストへの対応 • [SPR-14534] Reactive HTTP response based RedirectView • “redirect:”プリフィクスによるリダイレクトに対応 • [SPR-14535] Reactive request and response in SpEL expression within @MVC annotations • Controllerメソッド等に使用するアノテーションのSpEL内で、 requestやresponseを参照できるようにする • @PathVariable, @RequestParam, @RequestHeader, etc • など… Future
  • 87. 87Copyright©2016 NTT corp. All Rights Reserved. Enjoy Reactive !!
  • 88. 88Copyright©2016 NTT corp. All Rights Reserved. • Spring Framework Reference Documentation 5.0.0 M3 - 23. Web Reactive Framework • http://docs .s pri n g .io/s pri n g -fram ework / docs /5 .0 .0 . M 3/ sprin g -fram ewor k -r efer ence/ ht ml/ web -r eacti v e.html • The Spring Blog • Notes on Reactive Programming Part I: The Reactive Landscape • h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 6 / 0 7 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i - t h e - r e a c t i v e - l a n d s c a p e • Notes on Reactive Programming Part II: Writing Some Code • h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 6 / 1 3 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i i - w r i t i n g - s o m e - c o d e • Notes on Reactive Programming Part III: A Simple HTTP Server Application • h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 7 / 2 0 / n o t e s - o n - r e a c t i v e - p r o g r a m m i n g - p a r t - i i i - a - s i m p l e - h t t p - s e r v e r - a p p l i c a t i o n • Understandi n g Reactive types • h t t p s :// s p r i n g .i o / b l o g / 2 0 1 6 / 0 4 / 1 9 / u n d e r s t a n d i n g - r e a c t i v e - t y p e s • thymeleafsandbox-springreactive • https://gith u b.com/t h ym el eaf/t hy m elea fsan dbox -sprin g reactiv e • MongoDB Reactive Streams Java Driver • https://mong odb.gi th u b.i o/ mon g o -java -driv er -rea ctiv estr eams / • Database async driver • Postgres-as yn c -dri ver • h t t p s :// g i t h u b . c o m /a l a i s i / p o s t g r e s - a s y n c - d r i v e r • PostgreSQL and MySQL async driver • h t t p s :// g i t h u b . c o m /m a u r i c i o / p o s t g r e s q l - a s y n c • サンプルコード • Spring-r eacti ve -pla yg rou n d (Spring Web Reactive と Mongo/ Cou ch ebas e/Post gr eSql のサンプル) • h t t p s :// g i t h u b . c o m /s d e l e u z e / s p r i n g - r e a c t i v e - p l a y g r o u n d • スライド • Imperative to Reactive Web Applications • h t t p ://w w w .s l i d e s h a r e . n e t / S p r i n g C e n t r a l /i m p e r a t i v e - t o - r e a c t i v e - w e b - a p p l i c a t i o n s • Reactive Webアプリケ ーシ ョ ン - そしてSprin g 5へ • h t t p :// w w w .s l i d e s h a r e . n e t / m a k i n g x / r e a c t i v e - w e b - s p r i n g - 5 - j j u g c c c - c c c e f 3 • Servlet 3.1 Async I/O • h t t p :// w w w .s l i d e s h a r e . n e t / S i m o n e B o r d e t / s e r v l e t - 3 1 - a s y n c - i o • JDBC Next – A new non-blocki n g API for connectin g to a database • h t t p s :// s t a t i c . r a i n f o c u s . c o m /o r a c l e / o o w 1 6 / s e s s / 1 4 6 1 6 9 3 3 5 1 1 8 2 0 0 1 E m R q /p p t /C O N F 1 5 7 8 % 2 0 2 0 1 6 0 9 1 6 .p d f • 書籍 • パーフェクトJava EE • h t t p:// g i h y o . j p /b o o k / 2 0 1 6 / 9 7 8 - 4 - 7 7 4 1 - 8 3 1 6 - 9 • Spring徹底入 門 Spring Framework に よる Javaア プ リケ ー ショ ン 開発 • h t t p:// w w w .s h o e i s h a . c o . j p / b o o k / d e t a i l /9 7 8 4 7 9 8 1 4 2 4 7 0 • 論文 • Guido Salvaneschi , Mira Mezini. 2016. Debuggin g reactive programming with reactive inspector . • Engineer Bainomugisha , Andoni Lombide Carreton , Tom Van Cutsem, Stijn Mostinckx , Wolfgang De Meuter. 2013. A survey on reactive programming . • Gérard Berry. 1989. Real Time Programming: Special Purpose or General Purpose Languages. • Antony Courtney . 2001.Frappé : Functional Reactive Programming in Java . 参考文献