Reactive Streams が 1.0.0 になった

以前に紹介した Reactive Streams 仕様が 1.0.0 になりました。リリース文はこのへん

okapies.hateblo.jp

以下、Twitter に書いた感想をぺたぺた。途中で言及してる「つらみ」ってのはこのへんの議論とか瀬良さんのこの記事の話ですね。改めて読み返すと「解消される」ってのは言い過ぎだった(他にも課題はいっぱいある)けど、まぁ一つ障壁が取り除かれつつあるよなーという。

ReactiveX と「普通のやつらの上を行け」の意外な関係

これは「関数型プログラマのための Rx 入門」の補足記事です(タイトル変えた)。

前編後編とお送りしてきたこの記事だが、特に後編について「何を言ってるのか分からん」というコメントを何人かの方から頂いた。…なんというか、ごめんなさい

繰り返しになるが、Rx を使う上で関数型プログラミングの知識は必ずしも必要ではないし、むしろ(関数型のコンセプトが基礎にあるのに関わらず)知らなくても使えるようになっている。ライブラリの作者たちは「過度な抽象化は害になる」ということを弁えているのだろう。

しかし、Rx と関数型プログラミングの関係を把握しておくと、非同期データストリームのビルディング・ブロックの作り方について大いに視野が広がるだろう。もし、貴方がこの記事の前提となる「関数型」のパラダイムに興味をお持ちなら、まずは関数プログラミング実践入門」をお勧めしたい。

本の内容そのものは Haskell を前提にしているが、関数型の重要なコンセプトが一通り紹介されているので、今回の記事で出てきたキーワード(高階関数、代数的データ型、モナド、…)が属する世界観を概観するのに良いと思う。

また、このテーマに本気で取り組みたい初学者の方には、つい先日に発売されたばかりのScala関数型デザイン&プログラミング」を併せてお勧めしたい。

この本は、かねてより国内外で高い評価を得ている "Functional Programming in Scala" の日本語訳になる。ざっと見た感じ非常に「歯ごたえがある」感じだが、「関数型でプログラムを組み上げる方法」を基礎から丁寧に解説しており、演習問題も充実しているので、一冊読み通すとかなり力がつくのではないかと思う。

以下、後編を書いた後に気付いた話について少し補足。題して「Rx と『普通のやつらの上を行け』の意外な関係」

Observable の由来

後編で延々と書いたように、Reactive Extensions (Rx) の ObservableIterable の双対になっている。

() => (() => T)     // Iterable[T]
(T => Unit) => Unit // Observable[T]

では、形式的な説明はそれでいいとして*1、実際のところ Observable のアイデアはどこから来たのだろうか?

Erik Meijer の以下の投稿によれば、Observable は Rx の前身である Volta プロジェクト*2で非同期呼び出しをうまく扱う方法を探している時に見出したものだという:

We started working on IObservable/IObserver a long time ago when we were trying to make asynchronous calls that arose from tier-splitting palatable. Initially we used just the continuation monad but then discovered the beautiful duality with IEnumerable and IEnumerator.

F# vs. Rx: Msdn forums - Reactive Extensions (Rx)

そして、これは継続モナド (continuation monad) から着想を得たものであるらしい。*3

class Cont[R, +A](val runCont: (A => R) => R) { ... }

継続渡し形式の関数 (CPS function)

上記の継続モナド Cont が保持している関数 runCont: (A => R) => R継続渡し形式 (Continuation Passing Style; CPS) の関数という名前で呼ばれている。そして、後編で導出した Observable を表す「引数に渡されたコールバック関数に値を渡して実行する高階関数」も同様に CPS 関数だ(RUnit を適用してみよう)。

(A => R   ) => R    // runCont
(T => Unit) => Unit // Observable

継続渡しの「継続」とは、ここでは CPS 関数に渡されるコールバックを指している。CPS の詳しい説明は下記に挙げたページを見てほしいのだが、簡単に言うと「関数を呼び出して、戻ってきたら『続きの処理』を実行する」代わりに「関数に『続きの処理』を渡して呼び出し、その関数の最後で実行してもらう」というやり方だ。この「続きの処理」を継続と呼ぶ。

継続渡しで「普通のやつらの上を行け」

一般に「継続」は扱いの難しいプログラミングコンセプトとされていて、あまり積極的に活用されることがない(と思う)。では、なぜ非同期呼び出しの文脈で継続が出てくるのかというと、「処理をある所で中断してコンテキストを保存し、続きの処理が再開されるときに受け渡す」というパターンが CPS の考え方にバッチリはまるからだろう。以下、「なんでも継続」から引用:

ポイントは、外部の処理を呼びたいのだが、呼び出して戻り値を受け取るという形式が使えないケースにある。

例えばユーザインタフェースだ。処理の途中でユーザーに何か入力を促し、その結果を使って処理を続けたいことは良くある。しかし多くのGUIプログラミングでは、ユーザーの入力を受け付けるためには一度GUIのイベントループに戻らなければならない。したがって、プログラマは 処理をユーザーの入力の前にやる処理Aとユーザーの入力の後にやる処理Bに分けて、

 1. 処理Aの最後に入力ウィンドウをポップアップし、イベントループに戻る

 2. 入力ウィンドウの "OK" ボタンが押されるイベントが発生した時に 処理Bが呼ばれるようにする。

という具合にコーディングしているはずだ。この時、まさに処理Bは処理Aの「継続」なのだ。(Webアプリケーションにも全く同じ原理が使えることを指摘しておこう。 ユーザーからの入力が必要になった時、Webアプリケーションは一度入力フォームを吐き出してhttpサーバに制御を戻さなければならない。「普通のやつらの上を行け」Paul Grahamが述べているYahoo! Storeの システムはまさにこの技術を実装している。

つまり、歴史的にも意味的にも、Observable はまさにユースケースを非同期データストリームに絞って扱いやすくした継続モナドだということになる。

アカデミックな知識が MUST になる時代?

この「なんでも継続」の記事は(ブクマを見る限り)少なくとも 2006 年頃からあって、私も何度か目を通していた。のだが、正直なところ、今回記事を書くために読み返すまで UI や Web アプリへの応用という話は完全に忘れていた。

ぶっちゃけると、00 年代後半(?)にウェブ界隈で何度か不動点コンビネータの話がバズっていた頃、Yコンビネータの話題に関連してこの記事を読んでいるはずなのだが、当時は「いかにもギークの好きそうな頭の体操」として受け止めていて、応用の可能性についてはろくに考えが及んでいなかったと思う。

とかく、こうしたアカデミックな形式知は「小難しくて実用性がない」として軽んじられがちだ。しかし、関数型プログラミングしかり、こうして積み上げられてきた知見が実践的な問題に取り組むためのフレームワークとして活用されるケースは増えているし、海外の新しい OSS の動向を見るに、今後ますます増えていくだろうという感想を持っている。

全てのソフトウェア技術者がこうした方面の知識を習得すべきだとは思わない。しかし、個人的な感想としては、それぞれの現場で新技術の開発や選定に関わるリーダーや、エバンジェリストを自認するオピニオンリーダーにとって、このような学問的知識の習得が MUST になる時代はそう遠くない、という予感は日々強くなっている。

*1:ちなみに、Meijer は Rx の公開前にアップされた紹介動画の時点で既に双対性についての議論を披露している。この双対おじさん、筋金入りである…。

*2:Rx の電気ウナギのアイコンは、Volta のものをそのまま引き継いでいるらしい。

*3:Scala でのコード例は kmizu さんのコードから引用。

関数型プログラマのための Rx 入門(後編)

前編では、Reactive Extensions (Rx) の機能を関数型プログラミングの視点で読み解いた。続いて後編では、前編で紹介した Rx が関数型的な機能を提供している背景、つまり Observable と他の一般的なコンテナの関係に対してスポットライトを当ててみたい。

あらかじめ断っておくと、本編の話題は、実際に Rx を使う上で理解している必要は(あまり)ない。とりあえず、

  1. Observable は、ListFuture と同じくモナドの一種である
  2. 以下の表に出てくるコンテナは、隣同士で互いによく似た(あるいは正反対の)性質を持っている:
単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

…という話だけ記憶に留めてもらえば、ここで回れ右してもオーケー。とはいえ、興味のある人はこの先に目を通しておくと、今後同じような非同期データストリームのライブラリを使ったり、あるいは自分で作るときに役に立つ、かもしれない。

後編では、まず Observable が様々な型クラスのインスタンスであることを説明する。続いて、Observable が Iterable の「双対」の関係にあることや、Observable と Future の関係を紹介する。

Observable は (モナド|モノイド|...) である

flatMap メソッドがあることからも分かる通り、Observableモナドインスタンスだ。

Observable が実際にモナドを満たすことの証明は、@everpeace さんが ScalaCheck によるテストを公開している。*1

just(x).flatMap(f)      === f(x) // (1) 左単位元律
o.flatMap(just)         === o    // (2) 右単位元律
o.flatMap(f).flatMap(g) === o.flatMap(x => f(x).flatMap(g)) // (3) 結合律

また、モナド以外についても、同様に Observable に対する様々な型クラスやモナド変換子を Scalaz で実装した rxscalaz が公開されている。例えば、rxscalaz には Observable に対する Monoid 型クラスの実装がある。すなわち、Observable は二項演算 ++ に関してモノイドになっている。

implicit def observableMonoid[A] = new Monoid[Observable[A]] {
  override def zero: Observable[A] = Observable.empty
  override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2
}

rxscalaz は他にも、型クラス MonadPlus, Traverse, Foldable などや、モナド変換子 ObservableT の実装を提供している。

ところで、Haskell や Scalaz の経験がある人にとっては言うまでもないが、ListFuture もこれらの型クラスのインスタンスだ。例えば、List, Future そして Observable はすべてモナドインスタンスになっている。

前回の記事で「Observable は List や Future と共通した性質を持つ」という話を繰り返し取り上げたが、これはつまり「Observable と◯◯はどちらも (モナド|モノイド|...) のインスタンスである」という話だったのだ、ということが分かると思う。

pull モデルと push モデルの双対性

前編で、Observable はイベントストリームの一種だと述べた。では、Observable は IterableFuture のような他の種類のコンテナと何が同じで、何が違うのだろうか。

ここで、コンテナ同士を共通の議論の土台の上で比較するために、各コンテナの性質を一つの関数として書き表すという抽象化の操作を施してみる。すると…?

コンテナの四象

様々なコレクション(コンテナ)をユースケース別に整理した表を以下に示す:

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

このようにコンテナは、T(あるいは失敗の文脈を付与した Try[T])や Iterable[T] のように、ユーザが値を同期的に pull するコンテナと、Future[T]Observable[T] のように、ユーザの用意したコールバックに値を非同期に push するコンテナの二種類に整理できる。

また、別の軸で見ると、さらに単一の値を格納するコンテナ複数の値を格納するコンテナの二つに分けることができる。

この表を眺めると、縦方向や横方向に隣り合ったコンテナ同士には何か共通の性質がありそうに思える。しかし、当然ながらこれらは API やセマンティクスが互いに異なるので、単純な比較はできない。

この課題を、冒頭で述べた「コンテナの性質を一つの関数に書き表す」というシンプル化の手法(トリック)を導入することで解決してみよう。

Iterable から Observable をつくる

例として、縦方向に隣り合っている IterableObservable を比較してみよう。

まず、Iterable の API を確認する。Iterable から値を取り出すには、iterator() メソッドIterator を生成する必要がある。次いで、生成した Iterator に対して next() メソッドを次々に呼び出すことで値を取得する。

Iterable#iterator: () => Iterator[T]
Iterator#next:     () => T

これを単純化して一つの高階関数に書き直すと、型シグネチャは以下のようになる:

   () => (() => T) // Iterable[T]

ここで、この関数の「矢印をひっくり返した」関数を作ってみよう(無引数は Unit に書き直している):

   (T => Unit) => Unit // ← 反転!
// ~~~~~~~~~~~
//  callback

この操作によって作られた関数は何を意味するのか。関数の型シグネチャを読み下すと、「引数に渡されたコールバック関数に値を渡して実行する高階関数に相当することに気付く。つまり、この関数は Observablesubscribe メソッドと本質的に同じものなのだ:

trait Observable[T] {
  def subscribe(onNext: (T) ⇒ Unit): Subscription
}

結果として、IteratorAPI を関数の形に単純化してひっくり返すだけで ObservableAPI が導出できた。つまり、pull モデルのコンテナAPIpush モデルのコンテナAPI はある種の対称性を持っていることが分かる。

Erik Meijer は、Coursera の講義において、この対称性を指してIterableObservable数学的双対 (mathematical dual) である」と呼んでいる。一見すると異なる API を持つコンテナ同士が、実は対称的な関係にあって互いに導出可能である、というわけだ。

エラー状態に対応する

さて、前節では話を簡単にするために正常系 (onNext) のみを議論した。そのため、onCompleted イベントや onError イベントの存在を無視している。*2

なので、次の話に進む前に、Iterable から関数を作る際にエラー状態の文脈を加えることで、同様に「エラー状態に対応した Observable」を導出しておく。

先ほど、Iterator から値を取り出す際に next() メソッドを呼び出すと述べたが、実際にコードを書くときは、要素列の末尾を判定するため、以下のように hasNext() メソッドを組み合わせるはずだ:

val it = iterable.iterator
while (it.hasNext) {
  val a = it.next()
  ...
}

ここで、「hasNext()true の時は値が得られるが、false の場合は得られない」ことを表すため、先ほど作った関数の戻り値を Option で囲う。また、next() は例外を投げることがあるので戻り値をさらに Try(後述)で囲う。

このようにエラー状態をエンコードしたバージョンの Iterable は、以下のような高階関数として表せる:

() => (() => Try[Option[T]])     // Iterable[T]

同様にこれを反転すると、エラー状態を表現する能力を持った Observable の関数が導出される:

(Try[Option[T]] => Unit) => Unit // Observable[T]

なお Try[T] は、演算が「成功した場合」「失敗した場合」の二つの文脈を表す代数的データ型だ。

sealed abstract class Try[+T]
case class Success[+T](value: T) extends Try[T]
case class Failure[+T](exception: Throwable) extends Try[T]

したがって、Observable に渡されるコールバックが受け取るイベントは、以下の三種類のうちのいずれかになる:

  • Success[Some[T]]
  • Success[None]
  • Failure

ここで、Success[None] を Observable の完了イベントをエンコードしたものと解釈すれば、コールバックは、それぞれ onNext, onCompleted, onError イベントを表す代数的データ型を引数にとる関数になっている。

これは、実際の subscribeAPI とも一致している:

def subscribe(onNext: (T) ⇒ Unit, onError: (Throwable) ⇒ Unit, onCompleted: () ⇒ Unit): Subscription
def subscribe(observer: Observer[T]): Subscription
// trait Observer[T] {
//   def onNext(value: T): Unit
//   def onError(error: Throwable): Unit
//   def onComplete(): Unit
// }

ところで、もし subscribeAPI が導出した関数の通りになっているなら、イベントハンドラはこんな風に書けるだろう:

o.subscribe {
  case Success(Some(a)) => ...
  case Failure(t) => ...
  case Success(None) => ...
}

もちろん、実際にはこんなパターンマッチはできないが、一方で Rx には Notification という代数的データ型が含まれている。

sealed trait Notification[+T]
object Notification {
  case class OnNext[+T](value: T) extends Notification[T]
  case class OnError[+T](error: Throwable) extends Notification[T]
  case object OnCompleted extends Notification[Nothing]
}

Notification を使うには、Observable インスタンスmaterialize: Observable[Notification[T]] メソッドを使って、通常のストリームを Notification のストリームへ変換する必要がある。

f:id:okapies:20150310022337p:plain

Notification をパターンマッチするコードはこのように書くことができ、先ほど Iterable の双対として導出した API と一致していることが分かると思う。

o.materialize.foreach {
  case OnNext(a) => ...
  case OnError(t) => ...
  case OnCompleted => ...
}

Future と Observable の関係

前節では、コンテナの表を縦方向(同期と非同期)を比較すると双対になっていることを示した。

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

では、縦方向ではなく横方向の関係(単一と複数)はどうだろうか? 単一の値を格納する Future と、複数の値を格納する Observable を比べてみよう。

例によって、FutureObservable を関数に書き表して比較してみる。Future API の関数への書き直しは onComplete[U](f: (Try[T]) => U): Unit メソッドがそのまま使える(戻り値の U は単純化して Unit にしている):

(Try[T] => Unit) => Unit // Future[T]

これを先ほど Observable の関数と比較してみると、二つは非常によく似た型シグネチャを持つことが分かる:

(Try[       T ] => Unit) => Unit // Future[T]
(Try[Option[T]] => Unit) => Unit // Observable[T]

一方で、型シグネチャを読み下すと、Future のコールバックは2種類のイベント

  • Success[T]
  • Failure

を受け取るのに対して、Observable のコールバックは3種類のイベント

  • Success[Some[T]]
  • Success[None]
  • Failure

を受け取るようになっている。このように二つは似通っているが、Observable だけが完了 (Success[None]) イベントに相当するものを持っている。

これは、単一の値を格納する、つまり値が一つやってきた時点で自動的に「完了」する Future に対して、複数の値を格納する Observable はデータが無限に流れてくる可能性があり、ストリームの終端をシグナルする明示的な完了イベントが必要であることに対応していると考えることができる。

そして、この観察は Observable の実際の挙動とも一致している。RxScala では、Observable の from 関数で FutureObservable に変換できるが、その実装を見ると、Future の成功イベント (Success) が Observable のデータイベントと完了イベントの組 (onNext + onComplete) にマッピングされている。

def from[T](f: Future[T]): Observable[T] = {
  val s = AsyncSubject[T]()
  f.onComplete {
    case Failure(e) => s.onError(e)
    case Success(c) => s.onNext(c); s.onCompleted()
  }
  s
}

f:id:okapies:20150304003357p:plain

以上の議論から、単純化すると Observable複数の値の処理に対応した Future(あるいはその逆)であり、本質的なセマンティクスに大きな差はないことが分かる。逆に言えば、この点をどう扱うかが、非同期処理のプログラミングにおいて Rx を特徴付ける大きなポイントになっていると考えられる。

まとめ

本編では、Observable と、IterableFuture のような他の種類のコンテナとの間には密接な関係があることを示した。

いちおう補足しておくと、これは Observable が本当に Iterable の双対として作られたとか、そういう話ではない。言うまでもなく両者は独立に開発されたものだし、また前編で見たように、実際には各コンテナはそれぞれのユースケースに応じた「肉付け」をしており、それがライブラリの最終的な使いやすさを決めるからだ。

一方で、それらのライブラリの背景に見い出される基本的な、根本的なアイデアを理解しておくことはやはり大切だと思う。なぜなら、誰かが作ったライブラリを使う際に、あるいは自分で作る際に、様々な選択肢の中から一つを選び取る際の確かな判断基準になるからだ。

OSS 全盛の昨今、一つのユースケースに対して様々なライブラリが提案されることが増えている。一方で、パッと見にはそれらの違いがよく分からずに、何となく取っ付きやすくて人気のありそうなものを選んで失敗したり、先人の積み上げてきた体系やノウハウを無視して表層だけ真似たものを作った結果、仕様レベルで欠陥のあるものを作ってしまうといったことが増えているように思う。

では、どうすれば良いのか。個々のライブラリの使い方を丸暗記することも大事だが、同時にプログラミングにおける「良いプラクティス」を体系的に学ぶ機会を作るべきだと思う。みなさんもご存知の通り、そのような体系はいくつか提案されていて、オブジェクト指向におけるデザイン・パターン、ソフトウェア設計におけるドメイン駆動開発 (DDD)、あるいはこの記事でフィーチャーした関数型プログラミングといったものがある。

前編でも述べたように、関数型プログラミングが提唱する様々な原則(第一級関数、参照透過性、モナド、…)は、組み立て可能性 (composability) の高いビルディング・ブロックの作り方を体系的に整理したものだと考えられる。Rx は、その原則に従うことで、関数型プログラミングが持つメリットの多くを受け継ぐことに成功している。

また、Rx は、関数型が提供する機能に加えて、非同期データストリームのプログラミングにおいて必要となる数々の機能を提供している。Rx をよく調べることで、今後出てくる非同期データストリームのライブラリが従うべき「良いプラクティス」が見えてくると思う。

参考文献

*1:下記の式に出てくる === は等号っぽいふわっとした何かということで許してください…。justdef just[A](a: A) = rx.lang.scala.Observable.just(a) のつもり。

*2:Coursera の講義では最初からエラー状態を織り込んだ形で議論している。この記事では、話を分かりやすくするためにこの解説記事を参考にして構成した。

関数型プログラマのための Rx 入門(前編)

概要

『Observable は単なる非同期データストリームにおけるモナドインスタンスだよ。何か問題でも?』

まともな概要

つまり、Reactive Extensions (Rx) って何だ?

ということでウェブをガサゴソと漁っていたところ、オンライン講義サービス CourseraPrinciples of Reactive Programming に行き当たった。この講座では、Rx の主要開発者の一人である「双対おじさん」こと Erik Meijer 氏自らが一部の章を担当し、Rx の理論的側面を講義している。

この講座の大きな特徴は、Rx を(命令型プログラミングではなく)関数型プログラミング (FP) の側から解き明かしていくことにある。

こう書くと奇をてらっているように見えるかもしれないが、実際には Rx は FRP (Functional Reactive Programming) のバリエーションの一つとされており、Rx を関数型プログラミングの一応用として説明するのはさほど不思議なことではない。

では、あるフレームワークを関数型のパラダイムに則って作るメリットは何だろうか? 一つ挙げるとすれば、それは少数のシンプルな概念の組み合わせで多数の具体例を作り出せることだと思う。

関数型プログラミングには、組み立て可能性 (composability) の高いビルディング・ブロックの作り方について多くの知見が蓄積されている。フレームワークは、関数型のイディオムに沿った API を提供することで、個別のユースケースごとに実装を用意することなく、関数を組み合わせるだけで様々なユースケースに対応できるようになる。

これは、フレームワークを使う側にとっても同じことが言える。フレームワークの使い方を調べる時に、すでに使いどころがよく知られている定番の関数と、その背景にある抽象的な概念をそのまま当てはめられるからだ。

例えば、Rx の機能の中核を担う Observable には実に 200 以上のメソッドがあり、これらの使い方を一個一個まじめに暗記していては日が暮れてしまう。しかし、これらのメソッドを「関数型の眼」で分析してみると、その多くが、お馴染みの高階関数のバリエーションであることに気付くはずだ。

実際に、私はこの講座を視聴してやっと Rx の使い方や設計思想が理解できるようになった。

記事の構成

この記事では、関数型プログラミングについて基本的な知識があることを前提に、Coursera の Meijer 氏の講義内容をベースに Rx を紹介する。

この記事は二つのパートに分かれている。前編では、Rx の API関数型プログラミングの観点から読み解く方法を紹介した後、実際に部品となる関数を組み立てて非同期イベント処理を実装してみる。そして後編では、Rx の理論的側面について少し突っ込んだ議論を紹介していく。

追記: 補足記事(”ReactiveX と「普通のやつらの上を行け」の意外な関係”)を書きました。

また、この記事では Rx を個別のユースケースに当てはめる方法については簡単な紹介に止める。より具体的な例に興味がある方には、id:ninjinkun 氏が翻訳されたあなたが求めていたリアクティブプログラミング入門がとても素晴らしい記事なのでぜひ一読してほしい。

ちなみに、ついさっき気付いたが、元ネタの Coursera の講座が 4/13 から一年半ぶりに第二期を開講!するようだ。ぜひ、この機会に受講してみてはいかがだろうか。

Reactive Extensions の概要

公式サイトでは、Rx を以下のように説明している:

Reactive Extensions (Rx) は、観測可能 (observable) なシーケンスと LINQ スタイルのクエリ演算子を使って、非同期なイベントベースのプログラムを合成するライブラリです。Rx を使うと、開発者は非同期データストリームを Observable で表し、非同期データストリームに対するクエリに LINQ 演算子を使い、非同期ストリームの並行性を Scheduler でパラメタ化します。簡単に言えば、Rx = Observable + LINQ + Scheduler です。

非同期処理を扱うライブラリというと FuturePromise を思い浮かべる人も多いと思う。しかし、Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

データストリームの具体例としては、デスクトップアプリにおいては「マウスイベント」、ウェブサービスにおいては「株価情報」や Twitter の「タイムライン」などが分かりやすいだろう。

Rx の APIGoF の Observer パターンを踏襲している。つまり、観測対象 (Observable) のイベントを観測者 (Observer) が購読 (subscribe) するという形式をとる:

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}

Observable から Observer に通知されるイベントは三種類ある。これらは、Observer のコールバックメソッドとして実装する:

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onComplete(): Unit
}

すなわち、Observable に新しいデータが来るたびに onNext が呼ばれる。そして、ストリームが終了 (terminate) する時に onComplete が呼ばれる。一方で、エラーが起きると onError が呼ばれる。

Rx では、onCompleteonError のどちらかが発生した時点でストリームは終了し、これ以後はイベントは発生しない。逆に言うと、完了イベントやエラーイベントが起きない限りは無限ストリームになる。また、onNext逐次的 (sequential) に呼ばれる。つまり、並行には呼ばれないので競合状態 (race condition) を気にする必要がない。

また、Observable が終了する前でも Observer を明示的に購読解除 (unsubscribe) できる。購読を解除するには、subscribe した時に返される Subscriptionunsubscribe() を呼び出す。

trait Subscription {
  def unsubscribe(): Unit
}

LINQ関数型プログラミング

ところで Observable には、subscribe の他にも mapflatMap といった関数型プログラミングでおなじみの関数も多数用意されている:

def map[R](func: (T) => R): Observable[R]
def flatMap[R](f: (T) => Observable[R]): Observable[R]
def foreach(onNext: (T) => Unit): Unit
def filter(predicate: (T) => Boolean): Observable[T]
def take(n: Int): Observable[T]
def takeWhile(predicate: (T) => Boolean): Observable[T]
def toList: Observable[List[T]]
def zip[U](that: Observable[U]): Observable[(T, U)]

なぜ、ここにこんなものがあるんだろう?

"Rx = Observable + LINQ + Scheduler" だったことを思い出してほしい。オリジナルの .NET 版では、Observable を LINQ で操作できるように 標準クエリ演算子の実装を提供している。上記の関数は RxScalaAPI だが、これは標準クエリ演算子Scala のコレクション操作で用いられる高階関数へと置き換えたものだ。

なぜ置き換えられるのか? LINQ はコレクションやデータベースに対して SQL 風のクエリ式を書けるようにするための機能だが、その下にある標準クエリ演算子は、関数型言語のコレクションライブラリが提供する高階関数と実質的に同じものだからだ。例えば、標準クエリ演算子Selectmap 関数に対応する(LINQScala の対応は @eed3si9n さんの「Scala脳のための C# LINQ」が参考になる)。

従来の Observer パターンの考え方からすると、高階関数の導入は唐突に写るかもしれない。しかし、実のところ Observable は一種のストリーム(無限リスト)とみなせるので、関数型ライブラリのコレクション操作関数と非常に相性が良い。

Observer パターンの語彙と関数型の語彙が同じものを指している場合もある。例えば、Observable の foreach メソッドのドキュメントや実装を見てみると「foreachsubscribeエイリアスだ」と明記されている。どちらも「イベントが通知されるたびに何か処理をする」メソッドだからだ。

def subscribe(onNext: (T) ⇒ Unit): Subscription
def foreach(onNext: (T) => Unit): Unit

一方で違いもある。例えば Observable は無限ストリームなので foldRight は定義されてない。また、非同期コレクションなので、foldLeftreduce は、集約された値をそのまま返す代わりに Observable に包んで返すようになっている。

def foldLeft[R](initialValue: R)(accumulator: (R, T) ⇒ R): Observable[R]
def reduce[U >: T](accumulator: (U, U) ⇒ U): Observable[U]

また、非同期データストリームのユースケースに対応するために、switchcombineLatest といったメソッドが数多く追加されている。

しかし、もし未知の関数が出てきてもさほど恐れる必要はない。関数の型シグネチャとマーブルダイアグラム(下図)に注目すれば、どんな機能か把握するのはさほど難しくないからだ。

f:id:okapies:20150224023017p:plain(以下、マーブルダイアグラムは RxJava の Javadoc からの引用)

確かに、ObservableAPI を真正面から読み解こうとすると、大量のメソッド「Observable シーケンスの各要素を Observable シーケンスの新たなシーケンスへ射影し…」のような宇宙語に当惑すること必至だ。けれども、視点を変えてこれらのメソッドを関数型の機能として捉えなおしてみると、一転して強い一貫性が見えてくるはずだ。

Rx の「糊」

有名な「なぜ関数プログラミングは重要か」の中で、John Hughes は「プログラミング言語にとって高階関数と遅延評価はモジュールを貼り合わせる新しい糊である」という趣旨のことを述べている。

そんなわけで、非同期データストリーム処理の「糊」である Observable高階関数の実例をいくつか見てみよう。

  • map は Observable の各要素に「データを別のデータに変換する」関数を適用して新しい Observable を作る
def map[R](func: (T) => R): Observable[R]

f:id:okapies:20150224023017p:plain

  • flatMap は、同様に「データを Observable に変換する関数」を適用して入れ子の Observable を作り、最後にマージする(念の為に付け加えると、flatMapmapflatten を組み合わせた関数だ)
def flatMap[R](f: (T) ⇒ Observable[R]): Observable[R]

f:id:okapies:20150304003356p:plain

  • groupBy は、やはり要素に関数を適用してキーを出力し、そのキーごとに要素をグルーピングした Observable を作る(戻り値型が Observable[(K, Observable[T])] になっている)
def groupBy[K](f: (T) ⇒ K): Observable[(K, Observable[T])]

f:id:okapies:20150304003358p:plain

ところで、以上で紹介した関数のマーブルダイアグラムを見ると、データだけでなく完了 (onComplete) イベントを表す「|」も、ちゃんと新しい Observable に写し取られているのが分かる。

以前の節でも見たように、Observable には onNext, onComplete, onError という三つのイベントがある。そして、Observable の高階関数は、データだけでなく全てのイベントを出力先の Observable へ自然なやり方で写す。このとき、イベント間の制約条件(一度 onComplete になったら以後 onNext は発生しない等)も引き継がれる。このため、map 等の高階関数に渡す関数を書く時に、Observable の内部状態をいちいち気にする必要がない。

以上から、Observable が提供する高階関数ListFuture が提供するものと単に型シグネチャが同じであるだけでなく、同様のセマンティクスを持つ機能として扱える。

このため、List や Future と同様に、これらの高階関数は Observable と無関係に作った好きな関数同士を繋ぎ合わせる「糊」として使うことができる:

tweets.filter(t => t.userName == "okapies").map(t => t.text)

また、RxScala では FutureObservable に変換する関数 from が用意されている。これも、Future と Observable がよく似たセマンティクスを持っており、互換性を持っている証拠と言えるだろう。

def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T]

f:id:okapies:20150304003357p:plain

ただし、Observable には「時間と順序」の両方が関わってくるので、List や Future とは異なるセマンティクスを持つ関数も存在する。その代表例が concatflatten だ。こうした関数の時間に関する挙動を調べるときは、API ドキュメントに載っているマーブルダイアグラムが重要な情報源になる。

この点については、後の節で「二つ以上の Observable をマージする」ケースを検討する際に詳しく見ていくことにする。

スケジューラ (Scheduler)

Scala 標準の FutureExecutionContext を受け取るように、ObservableScheduler を受け取るオプションを持っている。これが "Rx = Observable + LINQ + Scheduler" の三つ目だ。

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

どちらもタスクの(非同期)実行を制御するための機能だが、Future は単一の値を返したらそこで完了するのに対して、Observable は複数の値を返し続ける。また、タスクの実行戦略を選んだり、処理の途中で購読解除 (Subscription.unsubscribe()) できる必要がある。

Scheduler は、これらの機能をサポートするために ExecutionContext よりも複雑な仕組みになっている。

実際に使う Scheduler は、自分で実装しなくてもフレームワーク側である程度用意されている。例えば、タスクを現在のスレッド上で実行する immediate や、CPU-bound や IO-bound なタスクのための computationio などがある。

(注: RxJava の Scheduler は 1.0 に上がる時にリファクタリングされたため、Coursera 講座(第一期)での Scheduler の説明は少し内容が古くなっている。ちなみに、チケットを見ると、Meijer (@headinthebox) 氏ご本人も議論に参加していたりする)

Observable のマージ

Rx の API を調べると複数の Observable を一つの Observable に合流(マージ)する」という似たようなメソッドを、アルゴリズムを変えて何通りも提供していることに気付く。これは、それぞれの Observable に含まれるイベントの時刻が重なっている場合、それらをマージする方法や順序の決め方には色々な考え方があって一意には決まらないからだ。

例えば Observable は、List のような通常のコレクションと同様に concatflatten という二つのメソッドを提供する。どちらも「入れ子になった Observable (Observable[Observable[U]]) を一重の Observable に畳みこむ」関数だ。*1

def concat[U]: Observable[U]
def flatten[U]: Observable[U]

ところで、List ではこれらの関数のアルゴリズムconcat = flatten だが、Observable ではそれぞれ挙動が異なる。

concat は、入れ子に入っている Observable 同士の順番が維持されるようにマージする。つまり、二番目の Observable に含まれるイベントは、必ず一番目の Observable のイベントの後ろに置かれる。

一つ目の Observable が完了するまでの間、二つ目の Observable に来たイベントはバッファされ続けることに注意しよう。つまり、場合によってはメモリがオーバーフローする可能性がある。

f:id:okapies:20150223233540p:plain

一方 flatten(あるいは merge)は、イベントの発生タイミングが維持されるようにマージする。つまり、一番目の Observable に含まれるイベントの途中に、二番目の Observable のイベントが挿入されることがある。

f:id:okapies:20150223233541p:plain

このように、「二つの Observable を一つにマージする」という簡単な操作でも二つの実現方法があるので、ユースケースに応じて使い分ける必要がある。

この他にも、マージ系のメソッドには switchamb のような「複数の Observable のうち一つを採用して、他の Observable を捨てる」ものや、zipcombineLatest のように「Observable から来るデータが揃った時にペアにして出力する」ものがある。

様々なサイトに掲載されている具体例を見ると、このマージ機能をうまく組み合わせることが、Rx を活用する際の一つのポイントとなるようだ。

関数を組み立てて非同期処理を作る

この記事の冒頭で、関数型で考えるメリットは「少数のシンプルな概念の組み合わせで多数の具体例を作り出せること」だと述べた。

これを確かめるために、これまでに紹介した関数を組み立てて、実際に非同期イベント処理を実装してみよう。ここでは、Coursera 講座に出てくる Rx のコーディング例を使って説明する。

国ごとの地震データストリーム

まず、非同期データストリームのソースとして、アメリカ地質調査所 (USGS) が提供する API から全世界の地震データのストリームを取得する関数 usgs と、逆ジオコーディングサービスに問い合わせて地理座標を国名に変換する関数 reverseGeocode が使えるとしよう。

以下の関数シグネチャを見ると分かるように、どちらもウェブサービスへの非同期な問い合わせなので、戻り値の型が ObservableFuture になっている。

def usgs(): Observable[EarthQuake] = { ... }
def reverseGeocode(c: GeoCoodinate): Future[Country] = { ... }

では、この二つの関数を組み合わせて、国ごとの地震情報のストリームを作ってみよう。

まず、usgs から取得した地震データと reverseGeocode から取得した国名を map で貼り合わせて、(地震データ, 国) を要素とするストリーム withCountry を作る。reverseGeocode への問い合わせには、usgs から取り出した地震データに含まれる地理座標の情報が必要なので、コードは以下のようになる:

val withCountry: Observable[Observable[(EarthQuake, Country)]] =
  usgs().map { quake =>
    val country: Future[Country] = reverseGeocode(q.location)
    Observable.from(country.map(country => (quake, country)))
  }

ここで withCountry の戻り型を見ると Observable が入れ子になっていることに気付く。reverseGeocodeusgs と同様に非同期関数なので、新しい地震データがやってくる度に map に渡したクロージャの中で結果待ち (Future[Country]) しているからだ。このままでは扱いにくいので、flatten で入れ子の Observable に含まれるイベントを一列にマージする:

val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()

これで (地震データ, 国) を要素とするストリーム merged が得られたので、最後に国をキーにして groupBy でグルーピングする:

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] =
  merged.groupBy { case (quake, country) => country }

これで、国ごとにタグ付けされた地震データが得られるようになった。このように mapflattengroupBy といった糊を使って関数を組み立て、byCountry という具体的な関数を作ることができた。

マージ戦略を取り替える

しかし、この実装には一つ問題がある。最終的に国ごとにストリームされる地震データの順番が、ユーザが望む通りになっていない可能性があるのだ。

つまり、入れ子の Observable をマージする時に flatten を使っているので、地震データの順番が「地震の発生順」ではなく「reverseGeocode の結果が返ってきた順」になってしまう。この挙動はエンドユーザへの速報のようなユースケースでは問題ないかもしれないが、地震データを時系列順に解析したいような場合は問題だろう。

どうすればいいか? 答えは簡単で、単に merged 関数の flattenconcat に取り替えればいい。これで、望み通りに発生順の地震データが出力される(既に書いたように concat はメモリがオーバーフローする可能性があるので注意しよう)。

val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

以上のように、シンプルな関数を組み合わせて作ったアプリケーションは、関数を部分的に取り替えるのも容易であり、結果として異なるユースケースにも素早く対応できることが示せたと思う。

まとめ

前編では、Rx を関数型プログラミングの視点で読み解いていくことで、API の習得やユースケースへの適用が容易になることを示した。

続いて後編では、Observable単なるモナドインスタンスだよ?という話や、ObservableIterableFuture との関係といった議論を紹介していきたい。

*1:flatten は処理系によっては単体では提供されない。C# では .SelectMany(a => a) と同等。

マイクロサービスのための Tumblr 製フレームワーク "Colossus"

この記事は Scala Advent Calendar 2014 の 15 日目です。昨日は id:qtamaki さんの”「関数プログラミング 珠玉のアルゴリズムデザイン」をScalaで実装してみる”でした。

今日は、先日に TumblrOSS 化を発表した Scala 製のノンブロッキング I/O (NIO) フレームワーク "Colossus" を紹介したい。”高性能なマイクロサービスを構築するためのフレームワークを謳っており、まだ OSS 化されて日が浅いものの Tumblr ではすでに production で使われているとされる。また、Colossus 自体がアクターフレームワーク Akka のアクターとして実装されており、それを使った独自のスレッドモデルを提供している点も興味深い。

基本的なコンセプト

Tumblr が Colossus を開発した狙いについて、公式サイトの冒頭でこのように述べている。

Colossus は、ノンブロッキングネットワーク I/O を必要とする高性能なアプリケーション構築のための Scala 製軽量フレームワークである。Colossus は特に、低レイテンシのステートレスなマイクロサービス(たいていはデータベースやキャッシュを抽象化したものと大きく変わらない)に焦点を当てている。Colossus は、こうしたユースケースにおいて性能を最大化すると共に、インタフェースをクリーンかつ簡潔に保つことを狙っている。

つまり…どういうことだってばよ?

マイクロサービス・アーキテクチャでは、コンポーネント化された軽量サービス同士を連携させてアプリケーションを組み立てる。つまり、多くのマイクロサービスのビジネスロジックはこのようになっているはずだ:

  1. クライアントからコネクションが張られてリクエストが来る。
  2. 他のバックエンドサービス(データベースとか)にリクエストを飛ばしてレスポンスを待つ。
  3. バックエンドサービスから戻ってきたレスポンスを使ってレスポンスを組み立ててクライアントに返す。

Colossus を使って書くとこんな感じ(Colossus のドキュメントより)。

Service.serve[Http]("http-service", 9000){ context =>
  val redis = context.clientFor[Redis]("localhost", 6379)
  context.handle{ connection => 
    connection.become {
      case request @ Get on Root / "get" / key => redis.send(Commands.Get(key)).map{
        case BulkReply(data) => request.ok(data.utf8String)
        case NilReply => request.notFound("(nil)")
      }
      ...
    }
  }
}

このように、非同期計算を抽象化した型である FuturemapflatMap をつないでいくアプローチは、Scala プログラマにとって見慣れたものだ。

ところで、Colossus において redis.send の戻り値は Callback であり Scala 標準ライブラリTwitter 製ライブラリに含まれているような Future ではない。

CallbackFuture と同様のインタフェースを持つ(し、実際に Future に変換することもできる)が、スレッドセーフではない。これは、Callback は単一のワーカスレッド上で実行されることを前提にしているからだ(あと、Callback を呼び出す準備ができた際に execute() メソッドを明示的に呼び出す必要がある。この呼び出しは通常はフレームワーク側がやってくれる)。

これは、冒頭に挙げたようなマイクロサービスのユースケースを前提にするなら、基本的にはスレッド間で状態を共有する必要がないからだ。また、データベース接続などの複数のコネクション間で共有したい状態については、イベントループごとに保持してコネクション間で共有すればよい(一つのイベントループは多数のコネクションを処理するが、シングルスレッドなのでイベントループ内では競合を心配する必要がない。また、全てはノンブロッキングに処理されるので、あるコネクションがデータベースからの応答を待っている間に、後続のコネクションがブロックすることはない)。

Future を使う場合、多くの実装はマルチスレッド動作のために ExecutionContext 内で処理を実行するが、これによりわずかだがオーバヘッドが発生する。このオーバヘッドが問題になるようなリクエストの処理においては、Colossus では代わりにシングルスレッドで動作する Callback を使うことでレイテンシを削減できる。これは、リクエストのサイズが小さくステートレスな場合に特に有効だと言える。

アーキテクチャ

Colossus の Core Layer は Akka 上で動作するアクターであり、例えばイベントループは下記のような”自分にメッセージを送るアクター”として実装されている(実際のコードはこのあたりにある)。

receive message {
  case `Select` => {
    event_handlers = selector.select()
    ...
    self ! `Select`
  }
  ...
}

ドキュメントから読み取れるアーキテクチャを図示するとこんな感じか:

f:id:okapies:20141215011648j:plain

Colossus 全体を駆動するのが IOSystem で、作成時に ActorSystem と関連付けて指定した数の worker(イベントループ)を作成する。なお、例えばアプリケーションが複数の独立したサーバを実行するような場合は、一つの ActorSystem に複数の IOSystem を関連付けてもよい。

implicit val actor_system = ActorSystem("system")
val io_system = IOSystem("io-system", numWorkers = 4)

IOSystem は、コネクションがやってくるとそれを各 Worker に振り分ける。各 Worker はアクターであり、ひとたび割り当てられたコネクションはずっと同じ Worker に束縛され続ける。つまり、Worker 内のイベントループが呼び出すイベントハンドラはシングルスレッドで動作することが保証される。

では、イベントループ間やコネクション間で状態を共有したい場合や、ブロックするような処理を実行したい場合はどうするか? その場合は、アクターと Future を使ってイベントループの外側で処理すればよい。

プログラミングモデル

サーバのビジネスロジックの実装方法は、関数型っぽい書き方とオブジェクト指向っぽい書き方の二通りがある。まずは関数型っぽい方から:

Service.serve[Http]("service-name", 456){context: ServiceContext[Http] => 
  //everything here happens once per event-loop
  context.handle{ connecton: ConnectionContext[Http] => 
    //everything here happens once per connection
    connection.become {
      //partial function HttpRequest => Response[HttpResponse]
      case req => req.ok("Hello world!")
    }
  }
}

この記法は、実際には下記のような ConnectionHandler の実装と同じことをやっている(ServiceServer は ConnectionHandler の継承クラス):

import com.tumblr.colossus._
import protocols.Telnet._

class HelloWorldHandler(config: ServiceConfig, worker: WorkerRef) 
  extends ServiceServer[HttpRequest, HttpResponse](new HttpServerCodec, config, worker) {

  def processRequest(request: HttpRequest): Response[HttpResponse] = {
    req.ok("Hello World!")
  }

  def processFailure(request: HttpRequest, reason: Throwable) = {
    request.error(s"Error: $reason")
  }
}

テストサーバ、メトリクス、タスク

時間がないので下記を読んでください…。個人的には、最初のリリースからテスト関係や計測関係のユーティリティが含まれているのが素晴らしいなと思う。

まとめ

ドキュメントには他にも色々と紹介したい話が書いてあるんだけど、全然時間が足りなかった…。もしこれを読んで興味が湧いたら、読んでみたりコードを触ってみたりしてください。あと、Issue を見てるとまだまだ成熟してない感じなので、逆に言えば貢献のチャンスかも。

ところで、Scala 製のマイクロサービス向けフレームワークといえば、真っ先に Finagle の名前が思い浮かぶ。以前のエントリでも書いたように、TumblrScala を採用するに至った大きな理由の一つに Finagle の存在がある。

では、ここに至って Tumblr が独自のフレームワークの実装を始めたのはなぜか。特に、性能については折り紙付きの Netty の代わりに Java NIO + Akka で実装しているのは何故なのか。

一つは上で説明したように、特定のユースケースにおける性能を追求していく中で Future の”リッチな”スレッドモデルがオーバヘッドになったこと。そしてもう一つは、明示はされていないが、性能を追求して下のレイヤに手を入れていく際に、Netty の低レベル操作を中心とした API や複雑なスレッドモデルと密結合した Finagle が扱いにくかった、ということがあるのではないかと思う。例えば、Colossus のコンセプトとして掲げられている四つの原則の中にこんな項がある:

Keep it transparent - Choose how much magic you want! Colossus is built in several layers which allow you to build high-level abstractions without being walled off from taking full control.

実際、Finagle の Netty 3 依存について、2014 年度中としていた Netty 4 移行計画が 2015 2Q にズレこむなど、Finagle が Netty に対して相当に密結合している様子が見てとれる。

その一方で、大規模並列処理を実現するためのイベント駆動なミドルウェアやアプリケーションを構築する上で、Akka が使われるケースがかなり広がってきている。例えば、今回紹介した Colossus の他にも、ストレステストツールGatling の実装に Akka が使われているのは有名かと思う。

また、Akka 自身も Netty に相当するレイヤを Akka I/O として再実装すると共に、大規模ストリームをうまく扱うための背圧制御 (back-pressure) の仕組みを加えて "Reactive Stream" として標準化したりと、低レイヤについても手を打ってきている*1

結果として、分散並列化したいネットワーク I/O からビジネスロジックまでを、アクターモデルという同じ抽象の上で統一的に扱えるのが Akka というランタイムの強みとなってきており、そのような扱いやすさがミドルウェアやアプリケーションの作者から支持を集めつつあるのだとすれば、Netty 陣営の”苦境”は Akka 陣営にとってはチャンスなのかもしれない。

*1:念の為に書いておくと、Colossus 自身は今のところ Akka I/O は使っていない。ただし、似たような背圧制御の仕組みを独自で持っているので、この部分を移行するという話もあり得なくもなさそうな…?

"The Reactive Manifesto v2.0" 日本語訳

はじめに

いつの間にか "The Reactive Manifesto" のバージョンが上がって v2.0 になっていたので、さっくりと翻訳。従前よりかなりコンパクトになっている。マニフェストに署名したい方は、公式サイトの一番下の "Sign the manifesto" をクリックしてください。

v1.0 の日本語訳は id:kimito_k さんがこちらで公開されています。

追記【2015/03/16】: 公式サイトに掲載されました

追記【2014/12/27】: 公式へ Pull Request してマージしてもらいました。最新版は以下をご覧ください。

v2.0 での変更点

v2.0 になって変わった点についてはこのへんこのへんに記事がある。

リライトの結果として、リアクティブマニフェストが最終的に実現したい価値は”システムの即応性を保ち続けること”であり、そのために”耐障害性””弾力性”という二つの非機能が必要であり、それら三つを下支えするのが”メッセージ駆動”アーキテクチャである、という論理構成が明確になったと思う。

で、v1.0 で書かれていた細かい話はどこに行ったかというと、用語集という形で別ページに集約されている。

あと、特に明示されてないけど、”リアクティブアプリケーション (reactive application)”という呼称が全て取り除かれて”リアクティブシステム (reactive systems)”に置き換わっている。従来からある概念である”リアクティブプログラミング (reactive programming)”と混同しやすい、という批判に配慮した形なのかな。

The Reactive Manifesto v2.0

異なる分野で活動する組織が、同じようなソフトウェア構築のパターンを独立に発見している。このようなシステムはより堅牢で、より耐障害性があり、より柔軟で、より最新の要求を反映しやすくなっている。

こうした変化が起きているのは、近年、アプリケーションの要求が著しく変化してきているからだ。ほんの数年前、巨大アプリケーションは数十のサーバから構成され、数秒の応答時間と数時間のオフラインメンテナンスを許容し、データは数ギガバイトだった。今日のアプリケーションは、モバイル機器から数千のマルチコアプロセッサによって動作するクラウドベースのクラスタまで、あらゆる機器上に配備される。ユーザはミリ秒の応答時間と 100% の稼働率を期待する。データはペタバイト単位で測定される。昨日のソフトウェアアーキテクチャは、今日の要求を全く満たしていない。

求められているのは、システムアーキテクチャに対する明快なアプローチであると我々は考える。そして、必要な側面の全ては既に独立に認識されている: 求めるものは、即応性と、耐障害性と、弾力性と、メッセージ駆動とを備えたシステムだ。我々はこれをリアクティブシステム (Reactive Systems) と呼ぶ。

リアクティブシステムとして構築されたシステムはより柔軟で、疎結合で、スケーラビリティがある。これによって開発が容易になるだけでなく、変更を受け入れやすくなる。これらは障害に対してより著しい耐性を持ち、たとえ障害が起きても災害を起こすことなく優雅に対処する。リアクティブシステムは高い即応性を持ち、ユーザに対して効果的な対話型フィードバックを与える。

リアクティブシステムとは:

  • 即応性 (Responsive): システムは可能な限りすみやかに応答する。即応性とは使い勝手と実用性の基盤だが、しかしそれだけではなく、問題が素早く検出され効果的に対処できることを意味する。即応性のあるシステムは、迅速で、かつ一貫した応答時間を提供することに主眼を置く。システムは応答時間に信頼性のある上限を確立し、一貫した品質のサービスを供給する。この一貫した挙動によってエラー処理が単純化され、エンドユーザの信頼を醸成し、さらなる相互作用を促す。
  • 耐障害性 (Resilient): システムは障害に直面しても即応性を保ち続ける。これが当てはまるのは高可用性のミッションクリティカルシステムだけではない — 耐障害性を持たないシステムは障害が起きると即応性を失う。耐障害性は、レプリケーション、封じ込め、隔離、そして委譲によって実現される。障害はそれぞれのコンポーネントに封じ込められ、コンポーネントは互いに隔離されるので、システムが部分的に故障してもシステム全体を危険に晒すことなしに回復することが保証される。各々のコンポーネントの回復処理は(外部の)他のコンポーネントに委譲され、また必要な場合はレプリケーションによって高可用性を保証する。コンポーネントのクライアントはコンポーネントの障害への対処に苦しめられることがなくなる。
  • 弾力性 (Elastic): システムはワークロードが変動しても即応性を保ち続ける。リアクティブシステムは、入力の提供に割り当てるリソースを増加あるいは減少させることで入力量の変化に反応する。これは、システムの中に競合する場所や中心的なボトルネックが存在しないように設計し、シャーディングしたりレプリケーションしたコンポーネント間に入力を分散させることを意味する。リアクティブシステムは関連するライブな性能測定を提供することで、予測的かつリアクティブなスケーリングアルゴリズムをサポートする。これらは、コモディティなハードウェアとソフトウェアプラットフォーム上で費用対効果の高い弾力性を実現する。
  • メッセージ駆動 (Message Driven): リアクティブシステムは非同期メッセージパッシングに依ってコンポーネント間の境界を確立する。これによって、疎結合性、隔離性、位置透過性を保証すると共に、エラーをメッセージとして委譲する手段を確保する。明示的なメッセージパッシングは負荷の管理と弾力性を可能とする。また、システム内にメッセージキューを作成して監視し、必要ならバックプレッシャーを適用することでフロー制御が可能になる。通信の手段として位置透過なメッセージングを使うことで、通信がクラスタを跨ぐ場合も単一のホスト内の場合も、同じ構成とセマンティクスで障害を管理できる。ノンブロッキング通信により、受信側はアクティブ時のみリソースを消費できるのでシステムのオーバヘッドを抑制できる。

大きなシステムはより小さなシステムからできているので、故にそれらの構成要素のリアクティブな性質に依存する。リアクティブシステムは設計原則を適用してリアクティブな性質をあらゆる規模で適用し、それら構成要素を合成できるようにする。世界最大級のシステムは、これらの性質に基づくアーキテクチャに依存することで数十億の人々のニーズを日々満たしている。こうした設計原則をその都度再発見するのをやめて、最初から自覚的に適用する時だ。

マイクロサービスが Scala を選ぶ3つの理由

 今年も開催される Scala Advent Calendar 2014 の 15 日目にエントリーしていて、ネタとしては先日 Tumblr が発表した "I/O and Microservice library for Scala" を謳う Colossus をやる予定なんだけど、前振りとして「なぜマイクロサービス化を進めるサービスは Scala を選ぶのか」という話をしてみるエントリ。ちなみに、Advent Calendar の前振りと書いたけど、とりあえず Scala をあまり知らない人向け。



そもそもマイクロサービスって何だっけ?

この記事とかよくまとまってると思います。

マイクロサービスへの移行と Scala

 成功を収めたウェブサービスが、自身のビジネスを持続的にスケールさせるため、巨大化・複雑化したモノリシックなアプリケーションを解体し、単機能のコンポーネントであるマイクロサービスからなるサービス指向アーキテクチャ (SOA) へと移行する…。近年、こんなストーリーを耳にする機会が多い。

 この際、ランタイムをスクリプト言語から JVM へ置き換えたり、特に使用言語として Scala を採用するケースが目立つ。以下は一例だが、名だたる有名サービスが Scala を使ったマイクロサービス化に取り組んでいることがわかる:

なぜ Scala が選ばれるのか?

 なぜ、マイクロサービス化で Scala が選ばれるのか? Scala 移行の事例を見ると、次の三つのポイントが指摘されることが多い。

  1. JVM 言語である
  2. 非同期 RPC フレームワーク "Finagle" の存在
  3. 静的型付き言語である

 以下に一つずつ見て行こう。

1. JVM 言語である

We were enamored by the level of performance that the JVM gave us. It wasn’t going to be easy to get our performance, reliability, and efficiency goals out of the Ruby VM, so we embarked on writing code to be run on the JVM instead. We estimated that rewriting our codebase could get us > 10x performance improvement, on the same hardware –– and now, today, we push on the order of 10 - 20K requests / sec / host.

New Tweets per second record, and how!

 近年、様々なサービスが、自社サービスの再構築にあたって、大規模なトラフィックに耐える性能・信頼性・効率性の要件を達成するために Java VM (JVM) を選んだと証言している。JVM は、過去 10 年以上にも渡ってサンやオラクル等によって大きな開発リソースが投じられてきたこともあって、高い性能と安定性を実現している。

 つまり、JVM 言語として作られた Scala は、当然その恩恵を受けることができる。

 また I/O 性能についても、JVMVM 内のプログラムから OS の低レベル I/O を直接叩ける NIO (Non-blocking I/O) API を備えている。よって、クライアント−サービス間に加えて、内部サービス間の I/O 性能が特に重要となるマイクロサービス・アーキテクチャとも相性が良い。

 また、採用面で技術者の確保が容易である点もしばしば挙げられる。

Changed to a JVM centric approach for hiring and speed of development reasons.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 高負荷環境での JVM の運用は GC(ガベージ・コレクション)との戦いになると言われるが、他の言語ランタイムに比べて、そのあたりのチューニングのノウハウを持った人材を確保しやすいということもあるのかもしれない。

2. Finagle の存在

Finagle was a compelling factor in choosing Scala. It is a library from Twitter. It handles most of the distributed issues like distributed tracing, service discovery, and service registration. You don’t have to implement all this stuff. It just comes for free.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 Scala 採用の理由として、Twitter が開発した Scala 製非同期 RPC フレームワーク "Finagle" の存在を挙げるサービスは多い。Finagle の興味深い点はたくさんあるが、マイクロサービスのためのフレームワーク、という観点から言うと以下の三つ(性能、プログラミングモデル、運用ツールとの連携)が挙げられる。

性能

 Finagle は、先に挙げた NIO フレームワークの定番である NettyScala ラッパーであり、高い性能を誇る。また、Twitter 自身が Netty の開発に大きくコミットしており、Finagle の開発とも密接に連携している。

 Finagle の高性能は、”バルス祭り”の大規模トラフィックにも耐えうるシステムの構築に大きく貢献した。

Our new stack has enabled us to reach new records in throughput and as of this writing our record tweets per second is 143,199.

Netty at Twitter with Finagle

プログラミングモデル

 マイクロサービス・アーキテクチャでは、各サービスの実装にあたって、必然的に他の内部サービスに対する非同期 RPC のコーディングが必要になるが、このスタイルには厄介な点がいくつもある。

  • ある RPC の結果を使って次の RPC をリクエストするような逐次処理や、並列にリクエストした複数の RPC の結果が揃うのを待って集約するような並列処理を書こうとすると、どうしてもコードが煩雑になる。
  • リクエストした処理が長時間戻ってこない場合、スレッドをブロックしないようにプログラムする必要がある。
  • あらゆるリモートへのリクエストは、想定外の理由で失敗する可能性がある(ネットワーク障害、リモートホストの障害、等々)。したがって、エラー処理やリトライ、タイムアウト等を考慮したコーディングが必要になる。

 Finagle は、非同期 RPC を FutureService/Filter というインタフェースで抽象化する。これらの API は、オブジェクト指向言語であると同時に関数型言語である Scala の特徴をよく活かしたものになっている。

 FutureService、そして Filter を組み合わせると、例えば、以下のようにマイクロサービスを組み合わせてサービスを作る際に、逐次処理と並列処理が絡み合った複雑な非同期 RPC 処理をクリーンかつシンプル、そして安全に記述できる(引用元)。

  1. 認証サービス (AuthService) に問い合わせて、ユーザ認証を行う
  2. タイムラインサービス(TimelineService) に問い合わせて、指定したユーザの Tweet ID の一覧を受け取る
  3. ツイートサービス (TweetService) に各 ID に対応するツイート本文を並列に問い合わせて、全ての結果が戻ってきたら集約してクライアントに返す
val timelineSvc = Thrift.newIface[TimelineService](...) // #1
val tweetSvc = Thrift.newIface[TweetService](...)
val authSvc = Thrift.newIface[AuthService](...)
  
val authFilter = Filter.mk[Req, AuthReq, Res, Res] { (req, svc) => // #2
  authSvc.authenticate(req) flatMap svc(_)
}
  
val apiService = Service.mk[AuthReq, Res] { req =>
  timelineSvc(req.userId) flatMap { tl =>
    val tweets = tl map tweetSvc.getById(_)
    Future.collect(tweets) map tweetsToJson(_)
  }
} //#3
Http.serve(":80", authFilter andThen apiService) // #4
  
// #1 サービスごとのクライアントを作成する
// #2 入ってくるリクエストを認証する Filter を作成する
// #3 認証されたタイムラインリクエストを JSON に変換して返す service を作成する
// #4 認証 filter と service を使って 80 番ポートで動作する HTTP サーバを開始する

 ここではエラー処理が明示的に書かれていないが、上記の処理のいずれかが失敗した段階(認証が失敗した場合とか、タイムラインサービスへの問い合わせがタイムアウトした場合とか)で処理全体が失敗するようになっている。もちろん、明示的に復旧処理を書くこともできる。

 Service の実装は一般的な HTTP (REST) だけではなく Thrift も使える*1し、MemcachedMySQL、あるいは Redis などのデータベース向けプロトコルも用意されている。

 このように様々なプロトコルService として抽象化しているため、アプリケーション非依存な機能である Filter を様々なプロトコルに対して直交的に適用できる。Filter で追加できる機能には、認証やリトライ、そして後述するモニタリングやトレースといったものがある。

運用ツールとの連携

 RPC ベースの分散システムへ移行する際の厄介事は、運用・監視の面でもたくさんある。例えば、以下のうちいずれを欠いても、多数のノードで構成される複雑なマイクロサービス群の運用やデバッグは難しくなる。

 また、個々の開発者が、担当するマイクロサービスに対してこれらのツールを簡単に組み込める必要がある。

 Finagle には、ZooKeeper や分散トレースシステムの Zipkin 等と連携する機能が最初から組み込まれている。つまり、必要な設定を追加してやるだけで、ZooKeeper を使ってクラスタを組み、リクエスト数などのメトリクスをリアルタイムに監視し、各ノードでリクエストの処理にかかった時間を Zipkin へ自動的に集約して可視化したりできる。

3. 静的型付き言語である

運用が必要なシステムで1万、2万行越えだすと、静的型付けであることによる保守性の高さは、結果的にコンパイル時間等を払拭できるほどの安全性、生産性、心の平安を生むと思っていて、要は静的型付けである事は非常に価値があって、特にテストが難しいテンプレート(twirl)の静的型付けは素晴しいという事を言いたかった。

ScalaMatsuri 2014 で「国技と Scala」というタイトルで発表しました - sandbox

 マイクロサービス化の究極的な目的は、ビジネス要求の変化に合わせて継続的にサービスを更新できる体制を作ることにある。つまり、Scala の静的型付け (static typing) が提供する保守性(≒安心してコードを書き換えられる性質)は、常に変化を必要とするようなシステムでこそ大きなメリットがある。

余談

 後日に紹介予定の Colossus は、TumblrFinagle に相当するものを自分たちの要件に合わせて作ったものだと考えると良さそう。

 また、Finagle が Netty ベースであるのに対し、Colossus は Scala 言語の開発元が作っている Akkaアクターモデルの分散フレームワーク)ベースで、さらに Akka 自身が Netty に相当するレイヤ (Akka I/O) の再実装を進めていたりと、そうした「フレームワーク同士の競争」という野次馬的な面でも興味深い。

*1:Twitter の内部サービス同士のやりとりは主に Thrift を使っているようだ。