ReactiveSocket について
この記事は、Java Advent Calendar 2015 の 22 日目です。前日は、n_slender さんの「PlayFramework 2.4 Java Ebeanでのアプリ開発」でした。
今日の記事では、この半年くらいで仕様と実装が出てきている ReactiveSocket というプロトコル仕様についてお話したいと思います。
なぜ Java Advent Calendar でプロトコルの話を? と訝しがっている方も多いと思いますが、基本的には以下の二つの理由です。
- JEP 266 として JDK 9 に追加される予定の Reactive Streams と密接に関わっている
- Java 製のサーバサイド向けライブラリを多数 OSS 化している Netflix が中心になって仕様策定を行っており、参照実装も JVM 向けが中心
予定ではプロトコルレベルの話にも踏み込んで解説したいと思っていたのですが、プライベートが色々と立て込んでいるため、概要レベルのご紹介になることをお許しください。
ReactiveSocket って何?
ReactiveSocket is an application protocol providing Reactive Streams semantics over an asynchronous, binary boundary.
(ReactiveSocket とは、非同期バイナリ境界をまたいで Reactive Streams のセマンティクスを提供するアプリケーションプロトコルである。)
ざっくり言うと Reactive Streams の考え方をアプリケーションプロトコルのレイヤで実現するための仕様。
そもそもの Reactive Streams とは何か、については以前に書いた記事でも解説しているのでご参照ください。
要点としては、メッセージ駆動のコンポーネント間でメッセージをやり取りするシステムを組んだ際のフロー制御の方法を定めている。
具体的には、送信側が受信側の処理能力を超える量のメッセージを送信してバッファを溢れさせることのないように、受信側から送信側に対して「次は◯個送っていいよ」というフィードバック (back-pressure) を通知することで、過負荷の際に処理能力を超えるメッセージを受信してシステムがクラッシュする事態を回避することを狙っている。詳細な動作については、以下のスライドの図も参考にしてほしい:
特徴
ReactiveSocket の特徴は以下の通り:
メッセージ駆動
(HTTP2 と同様の)非同期なメッセージ駆動であり、全ての通信は、単一のコネクション上に多重化されたメッセージストリームを介して行う。また、これによってレスポンス待ちでブロックすることがなくなる。
相互作用モデル
ReactiveSocket は複数の相互作用 (interaction) モデルをサポートしている。ユースケースごとに適切なモデルを選んで使用することで、性能やユーザ体験に与える影響を向上できる。
また、後述するようにトランスポートに何を使うか (TCP, WebSocket, Aeron, ...) に依存しないので、これらを使ってアプリケーションを実装すれば、性能特性に合わせてトランスポートを入れ替えることもできる。
- Fire-and-Forget(撃ちっぱなし)
- レスポンスが必要ない場合は、これを使うのが一番効率的
Future<Void> completionSignalOfSend = socketClient.fireAndForget(message);
- Request/Response(単一レスポンス)
- 普通のリクエストレスポンス。「レスポンス1個のストリーム」を最適化したものと考えることができる
Future<Payload> response = socketClient.requestResponse(requestPayload);
- Request/Stream(有限個の複数レスポンス)
- 「コレクション」や「リスト」に相当
Publisher<Payload> response = socketClient.requestStream(requestPayload);
- Topic Subscription(無限個の複数レスポンス)
- 「プッシュ通知」や「イベントストリーム」に相当
Publisher<Payload> response = socketClient.requestSubscription(topicSubscription);
- Channel(双方向ストリーム)
- クライアント側から途中でリクエストの条件を変更したりするような場合に用いる
Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input);
フロー制御
二つのフロー制御方式をサポートしている。どちらも、トランスポートレイヤではなくアプリケーションレベルの流量制御に焦点を置いている。
一つは、Reactive Streams が仕様化しているような request(n)
の非同期プル。こちらは、リクエスト発行側 (requester) から応答側 (responder) へのキャパシティの通知に使う。
もう一つは ReactiveSocket 独自のリース (leasing) という仕組みで、応答側から発行側へのキャパシティ通知に用いられる。リースは、「規定時間 (TTL) までに◯個まで送ってよし」という形式でリクエストを発行する。これによって、データセンター内のサーバ間通信のようなユースケースで、アプリケーションレベルの負荷分散(クライアント側で、各サーバから通知されたリースの情報を使ってリクエストを分散する)がやりやすくなる。
多言語 (polyglot) サポート
相互作用モデルとフロー制御を言語非依存なプロトコルとして定義しているので、言語を跨いだインタラクションに利用できる(Reactive Streams は JVM 上で動作するミドルウェア同士でしか利用できない)。
様々なトランスポートレイヤをサポート
ReactiveSocket 自体は OSI Layer 5/6 相当のアプリケーションプロトコルであり、TCP 以外にも WebSocket や Aeron (*)、Quic といった様々なトランスポートプロトコルの上に実装できる。
また、ReactiveSocket が定義するアプリケーションレイヤはトランスポートの差異を隠蔽するので、ユースケースに合わせて最適なトランスポートを選ぶことができる。
*: Reactive Manifesto の執筆者の一人である Martin Thompson の会社 Real Logic が開発しているトランスポートプロトコル。元 LMAX の CTO で Disruptor を開発していた御仁、といえば分かる方もいるのでは。
性能
コネクションを使い回すので、コネクションを何度も張り直すような余計な処理を回避できる。また、バイナリプロトコルなので CPU 負荷を削減できる。さらに、フロー制御が組み込まれているので、相手先システムがスローダウンしている時にリトライ地獄を仕掛けてさらに負荷をかけるようなことがない。
同様の課題を解決する仕組みとして、Netflix が自身のマイクロサービス同士のフロー制御に使っている Hystrix があるが、オーバヘッドや複雑さが増すという問題点があった。
なんで HTTP/2 を使わないの?
大雑把に言うと、HTTP/2 は一義的にウェブサイトからドキュメントを取得するブラウザのためのプロトコルで、ReactiveSocket が想定するユースケースに合わないから。
- リクエスト/レスポンスのみで、それ以外の相互作用モデルをうまくサポートできない
- アプリケーションレベルのフロー制御の仕組みがない
- REST は非常に普及しているが、アプリケーションのセマンティクスを定義するのに使うのは非効率で不適切である
対応実装と今後について
コアライブラリとして reactivesocket-java が公開されている。これ自体はプロトコル実装を Reactive Streams API でアクセスできるようにしたもので、実際には以下のような具体的なトランスポートプロトコルの実装でラップして使う:
また、ブラウザや Node.js から使える JavaScript 版の実装も作られている:
- reactivesocket-js
- reactivesocket-js-ws(今のところ空っぽ)
今後…については最近あまり追いかけられてないので分からないです。すいません。とりあえず、Netflix 内部のフロー制御を Hystrix から置き換えていきたいのだとは思われる。具体的な進捗を知ってる人がいたら教えて下さい。あとは、Reactive Streams に参加してる他のベンダー(Typesafe とか)が乗っかるのかどうか(いちおう呼びかけはなされていて、Typesafe の人も関心はあるみたい)。
既に見たように、Reactive Streams 自体は JVM に閉じた仕組みだったところを、アプリケーションレイヤープロトコルとして仕様化することで多言語で活用できる可能性が出てきたわけで、個人的には注目しています。