読者です 読者をやめる 読者になる 読者になる

関数型プログラマのための Rx 入門(後編)

Programming Reactive Scala Java

前編では、Reactive Extensions (Rx) の機能を関数型プログラミングの視点で読み解いた。続いて後編では、前編で紹介した Rx が関数型的な機能を提供している背景、つまり Observable と他の一般的なコンテナの関係に対してスポットライトを当ててみたい。

あらかじめ断っておくと、本編の話題は、実際に Rx を使う上で理解している必要は(あまり)ない。とりあえず、

  1. Observable は、ListFuture と同じくモナドの一種である
  2. 以下の表に出てくるコンテナは、隣同士で互いによく似た(あるいは正反対の)性質を持っている:
単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

…という話だけ記憶に留めてもらえば、ここで回れ右してもオーケー。とはいえ、興味のある人はこの先に目を通しておくと、今後同じような非同期データストリームのライブラリを使ったり、あるいは自分で作るときに役に立つ、かもしれない。

後編では、まず Observable が様々な型クラスのインスタンスであることを説明する。続いて、Observable が Iterable の「双対」の関係にあることや、Observable と Future の関係を紹介する。

Observable は (モナド|モノイド|...) である

flatMap メソッドがあることからも分かる通り、Observableモナドインスタンスだ。

Observable が実際にモナドを満たすことの証明は、@everpeace さんが ScalaCheck によるテストを公開している。*1

just(x).flatMap(f)      === f(x) // (1) 左単位元律
o.flatMap(just)         === o    // (2) 右単位元律
o.flatMap(f).flatMap(g) === o.flatMap(x => f(x).flatMap(g)) // (3) 結合律

また、モナド以外についても、同様に Observable に対する様々な型クラスやモナド変換子を Scalaz で実装した rxscalaz が公開されている。例えば、rxscalaz には Observable に対する Monoid 型クラスの実装がある。すなわち、Observable は二項演算 ++ に関してモノイドになっている。

implicit def observableMonoid[A] = new Monoid[Observable[A]] {
  override def zero: Observable[A] = Observable.empty
  override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2
}

rxscalaz は他にも、型クラス MonadPlus, Traverse, Foldable などや、モナド変換子 ObservableT の実装を提供している。

ところで、Haskell や Scalaz の経験がある人にとっては言うまでもないが、ListFuture もこれらの型クラスのインスタンスだ。例えば、List, Future そして Observable はすべてモナドインスタンスになっている。

前回の記事で「Observable は List や Future と共通した性質を持つ」という話を繰り返し取り上げたが、これはつまり「Observable と◯◯はどちらも (モナド|モノイド|...) のインスタンスである」という話だったのだ、ということが分かると思う。

pull モデルと push モデルの双対性

前編で、Observable はイベントストリームの一種だと述べた。では、Observable は IterableFuture のような他の種類のコンテナと何が同じで、何が違うのだろうか。

ここで、コンテナ同士を共通の議論の土台の上で比較するために、各コンテナの性質を一つの関数として書き表すという抽象化の操作を施してみる。すると…?

コンテナの四象

様々なコレクション(コンテナ)をユースケース別に整理した表を以下に示す:

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

このようにコンテナは、T(あるいは失敗の文脈を付与した Try[T])や Iterable[T] のように、ユーザが値を同期的に pull するコンテナと、Future[T]Observable[T] のように、ユーザの用意したコールバックに値を非同期に push するコンテナの二種類に整理できる。

また、別の軸で見ると、さらに単一の値を格納するコンテナ複数の値を格納するコンテナの二つに分けることができる。

この表を眺めると、縦方向や横方向に隣り合ったコンテナ同士には何か共通の性質がありそうに思える。しかし、当然ながらこれらは API やセマンティクスが互いに異なるので、単純な比較はできない。

この課題を、冒頭で述べた「コンテナの性質を一つの関数に書き表す」というシンプル化の手法(トリック)を導入することで解決してみよう。

Iterable から Observable をつくる

例として、縦方向に隣り合っている IterableObservable を比較してみよう。

まず、Iterable の API を確認する。Iterable から値を取り出すには、iterator() メソッドIterator を生成する必要がある。次いで、生成した Iterator に対して next() メソッドを次々に呼び出すことで値を取得する。

Iterable#iterator: () => Iterator[T]
Iterator#next:     () => T

これを単純化して一つの高階関数に書き直すと、型シグネチャは以下のようになる:

   () => (() => T) // Iterable[T]

ここで、この関数の「矢印をひっくり返した」関数を作ってみよう(無引数は Unit に書き直している):

   (T => Unit) => Unit // ← 反転!
// ~~~~~~~~~~~
//  callback

この操作によって作られた関数は何を意味するのか。関数の型シグネチャを読み下すと、「引数に渡されたコールバック関数に値を渡して実行する高階関数に相当することに気付く。つまり、この関数は Observablesubscribe メソッドと本質的に同じものなのだ:

trait Observable[T] {
  def subscribe(onNext: (T) ⇒ Unit): Subscription
}

結果として、IteratorAPI を関数の形に単純化してひっくり返すだけで ObservableAPI が導出できた。つまり、pull モデルのコンテナAPIpush モデルのコンテナAPI はある種の対称性を持っていることが分かる。

Erik Meijer は、Coursera の講義において、この対称性を指してIterableObservable数学的双対 (mathematical dual) である」と呼んでいる。一見すると異なる API を持つコンテナ同士が、実は対称的な関係にあって互いに導出可能である、というわけだ。

エラー状態に対応する

さて、前節では話を簡単にするために正常系 (onNext) のみを議論した。そのため、onCompleted イベントや onError イベントの存在を無視している。*2

なので、次の話に進む前に、Iterable から関数を作る際にエラー状態の文脈を加えることで、同様に「エラー状態に対応した Observable」を導出しておく。

先ほど、Iterator から値を取り出す際に next() メソッドを呼び出すと述べたが、実際にコードを書くときは、要素列の末尾を判定するため、以下のように hasNext() メソッドを組み合わせるはずだ:

val it = iterable.iterator
while (it.hasNext) {
  val a = it.next()
  ...
}

ここで、「hasNext()true の時は値が得られるが、false の場合は得られない」ことを表すため、先ほど作った関数の戻り値を Option で囲う。また、next() は例外を投げることがあるので戻り値をさらに Try(後述)で囲う。

このようにエラー状態をエンコードしたバージョンの Iterable は、以下のような高階関数として表せる:

() => (() => Try[Option[T]])     // Iterable[T]

同様にこれを反転すると、エラー状態を表現する能力を持った Observable の関数が導出される:

(Try[Option[T]] => Unit) => Unit // Observable[T]

なお Try[T] は、演算が「成功した場合」「失敗した場合」の二つの文脈を表す代数的データ型だ。

sealed abstract class Try[+T]
case class Success[+T](value: T) extends Try[T]
case class Failure[+T](exception: Throwable) extends Try[T]

したがって、Observable に渡されるコールバックが受け取るイベントは、以下の三種類のうちのいずれかになる:

  • Success[Some[T]]
  • Success[None]
  • Failure

ここで、Success[None] を Observable の完了イベントをエンコードしたものと解釈すれば、コールバックは、それぞれ onNext, onCompleted, onError イベントを表す代数的データ型を引数にとる関数になっている。

これは、実際の subscribeAPI とも一致している:

def subscribe(onNext: (T) ⇒ Unit, onError: (Throwable) ⇒ Unit, onCompleted: () ⇒ Unit): Subscription
def subscribe(observer: Observer[T]): Subscription
// trait Observer[T] {
//   def onNext(value: T): Unit
//   def onError(error: Throwable): Unit
//   def onComplete(): Unit
// }

ところで、もし subscribeAPI が導出した関数の通りになっているなら、イベントハンドラはこんな風に書けるだろう:

o.subscribe {
  case Success(Some(a)) => ...
  case Failure(t) => ...
  case Success(None) => ...
}

もちろん、実際にはこんなパターンマッチはできないが、一方で Rx には Notification という代数的データ型が含まれている。

sealed trait Notification[+T]
object Notification {
  case class OnNext[+T](value: T) extends Notification[T]
  case class OnError[+T](error: Throwable) extends Notification[T]
  case object OnCompleted extends Notification[Nothing]
}

Notification を使うには、Observable インスタンスmaterialize: Observable[Notification[T]] メソッドを使って、通常のストリームを Notification のストリームへ変換する必要がある。

f:id:okapies:20150310022337p:plain

Notification をパターンマッチするコードはこのように書くことができ、先ほど Iterable の双対として導出した API と一致していることが分かると思う。

o.materialize.foreach {
  case OnNext(a) => ...
  case OnError(t) => ...
  case OnCompleted => ...
}

Future と Observable の関係

前節では、コンテナの表を縦方向(同期と非同期)を比較すると双対になっていることを示した。

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

では、縦方向ではなく横方向の関係(単一と複数)はどうだろうか? 単一の値を格納する Future と、複数の値を格納する Observable を比べてみよう。

例によって、FutureObservable を関数に書き表して比較してみる。Future API の関数への書き直しは onComplete[U](f: (Try[T]) => U): Unit メソッドがそのまま使える(戻り値の U は単純化して Unit にしている):

(Try[T] => Unit) => Unit // Future[T]

これを先ほど Observable の関数と比較してみると、二つは非常によく似た型シグネチャを持つことが分かる:

(Try[       T ] => Unit) => Unit // Future[T]
(Try[Option[T]] => Unit) => Unit // Observable[T]

一方で、型シグネチャを読み下すと、Future のコールバックは2種類のイベント

  • Success[T]
  • Failure

を受け取るのに対して、Observable のコールバックは3種類のイベント

  • Success[Some[T]]
  • Success[None]
  • Failure

を受け取るようになっている。このように二つは似通っているが、Observable だけが完了 (Success[None]) イベントに相当するものを持っている。

これは、単一の値を格納する、つまり値が一つやってきた時点で自動的に「完了」する Future に対して、複数の値を格納する Observable はデータが無限に流れてくる可能性があり、ストリームの終端をシグナルする明示的な完了イベントが必要であることに対応していると考えることができる。

そして、この観察は Observable の実際の挙動とも一致している。RxScala では、Observable の from 関数で FutureObservable に変換できるが、その実装を見ると、Future の成功イベント (Success) が Observable のデータイベントと完了イベントの組 (onNext + onComplete) にマッピングされている。

def from[T](f: Future[T]): Observable[T] = {
  val s = AsyncSubject[T]()
  f.onComplete {
    case Failure(e) => s.onError(e)
    case Success(c) => s.onNext(c); s.onCompleted()
  }
  s
}

f:id:okapies:20150304003357p:plain

以上の議論から、単純化すると Observable複数の値の処理に対応した Future(あるいはその逆)であり、本質的なセマンティクスに大きな差はないことが分かる。逆に言えば、この点をどう扱うかが、非同期処理のプログラミングにおいて Rx を特徴付ける大きなポイントになっていると考えられる。

まとめ

本編では、Observable と、IterableFuture のような他の種類のコンテナとの間には密接な関係があることを示した。

いちおう補足しておくと、これは Observable が本当に Iterable の双対として作られたとか、そういう話ではない。言うまでもなく両者は独立に開発されたものだし、また前編で見たように、実際には各コンテナはそれぞれのユースケースに応じた「肉付け」をしており、それがライブラリの最終的な使いやすさを決めるからだ。

一方で、それらのライブラリの背景に見い出される基本的な、根本的なアイデアを理解しておくことはやはり大切だと思う。なぜなら、誰かが作ったライブラリを使う際に、あるいは自分で作る際に、様々な選択肢の中から一つを選び取る際の確かな判断基準になるからだ。

OSS 全盛の昨今、一つのユースケースに対して様々なライブラリが提案されることが増えている。一方で、パッと見にはそれらの違いがよく分からずに、何となく取っ付きやすくて人気のありそうなものを選んで失敗したり、先人の積み上げてきた体系やノウハウを無視して表層だけ真似たものを作った結果、仕様レベルで欠陥のあるものを作ってしまうといったことが増えているように思う。

では、どうすれば良いのか。個々のライブラリの使い方を丸暗記することも大事だが、同時にプログラミングにおける「良いプラクティス」を体系的に学ぶ機会を作るべきだと思う。みなさんもご存知の通り、そのような体系はいくつか提案されていて、オブジェクト指向におけるデザイン・パターン、ソフトウェア設計におけるドメイン駆動開発 (DDD)、あるいはこの記事でフィーチャーした関数型プログラミングといったものがある。

前編でも述べたように、関数型プログラミングが提唱する様々な原則(第一級関数、参照透過性、モナド、…)は、組み立て可能性 (composability) の高いビルディング・ブロックの作り方を体系的に整理したものだと考えられる。Rx は、その原則に従うことで、関数型プログラミングが持つメリットの多くを受け継ぐことに成功している。

また、Rx は、関数型が提供する機能に加えて、非同期データストリームのプログラミングにおいて必要となる数々の機能を提供している。Rx をよく調べることで、今後出てくる非同期データストリームのライブラリが従うべき「良いプラクティス」が見えてくると思う。

参考文献

*1:下記の式に出てくる === は等号っぽいふわっとした何かということで許してください…。justdef just[A](a: A) = rx.lang.scala.Observable.just(a) のつもり。

*2:Coursera の講義では最初からエラー状態を織り込んだ形で議論している。この記事では、話を分かりやすくするためにこの解説記事を参考にして構成した。