関数型プログラマのための Rx 入門(前編)

概要

『Observable は単なる非同期データストリームにおけるモナドインスタンスだよ。何か問題でも?』

まともな概要

つまり、Reactive Extensions (Rx) って何だ?

ということでウェブをガサゴソと漁っていたところ、オンライン講義サービス CourseraPrinciples of Reactive Programming に行き当たった。この講座では、Rx の主要開発者の一人である「双対おじさん」こと Erik Meijer 氏自らが一部の章を担当し、Rx の理論的側面を講義している。

この講座の大きな特徴は、Rx を(命令型プログラミングではなく)関数型プログラミング (FP) の側から解き明かしていくことにある。

こう書くと奇をてらっているように見えるかもしれないが、実際には Rx は FRP (Functional Reactive Programming) のバリエーションの一つとされており、Rx を関数型プログラミングの一応用として説明するのはさほど不思議なことではない。

では、あるフレームワークを関数型のパラダイムに則って作るメリットは何だろうか? 一つ挙げるとすれば、それは少数のシンプルな概念の組み合わせで多数の具体例を作り出せることだと思う。

関数型プログラミングには、組み立て可能性 (composability) の高いビルディング・ブロックの作り方について多くの知見が蓄積されている。フレームワークは、関数型のイディオムに沿った API を提供することで、個別のユースケースごとに実装を用意することなく、関数を組み合わせるだけで様々なユースケースに対応できるようになる。

これは、フレームワークを使う側にとっても同じことが言える。フレームワークの使い方を調べる時に、すでに使いどころがよく知られている定番の関数と、その背景にある抽象的な概念をそのまま当てはめられるからだ。

例えば、Rx の機能の中核を担う Observable には実に 200 以上のメソッドがあり、これらの使い方を一個一個まじめに暗記していては日が暮れてしまう。しかし、これらのメソッドを「関数型の眼」で分析してみると、その多くが、お馴染みの高階関数のバリエーションであることに気付くはずだ。

実際に、私はこの講座を視聴してやっと Rx の使い方や設計思想が理解できるようになった。

記事の構成

この記事では、関数型プログラミングについて基本的な知識があることを前提に、Coursera の Meijer 氏の講義内容をベースに Rx を紹介する。

この記事は二つのパートに分かれている。前編では、Rx の API関数型プログラミングの観点から読み解く方法を紹介した後、実際に部品となる関数を組み立てて非同期イベント処理を実装してみる。そして後編では、Rx の理論的側面について少し突っ込んだ議論を紹介していく。

追記: 補足記事(”ReactiveX と「普通のやつらの上を行け」の意外な関係”)を書きました。

また、この記事では Rx を個別のユースケースに当てはめる方法については簡単な紹介に止める。より具体的な例に興味がある方には、id:ninjinkun 氏が翻訳されたあなたが求めていたリアクティブプログラミング入門がとても素晴らしい記事なのでぜひ一読してほしい。

ちなみに、ついさっき気付いたが、元ネタの Coursera の講座が 4/13 から一年半ぶりに第二期を開講!するようだ。ぜひ、この機会に受講してみてはいかがだろうか。

Reactive Extensions の概要

公式サイトでは、Rx を以下のように説明している:

Reactive Extensions (Rx) は、観測可能 (observable) なシーケンスと LINQ スタイルのクエリ演算子を使って、非同期なイベントベースのプログラムを合成するライブラリです。Rx を使うと、開発者は非同期データストリームを Observable で表し、非同期データストリームに対するクエリに LINQ 演算子を使い、非同期ストリームの並行性を Scheduler でパラメタ化します。簡単に言えば、Rx = Observable + LINQ + Scheduler です。

非同期処理を扱うライブラリというと FuturePromise を思い浮かべる人も多いと思う。しかし、Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

データストリームの具体例としては、デスクトップアプリにおいては「マウスイベント」、ウェブサービスにおいては「株価情報」や Twitter の「タイムライン」などが分かりやすいだろう。

Rx の APIGoF の Observer パターンを踏襲している。つまり、観測対象 (Observable) のイベントを観測者 (Observer) が購読 (subscribe) するという形式をとる:

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}

Observable から Observer に通知されるイベントは三種類ある。これらは、Observer のコールバックメソッドとして実装する:

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onComplete(): Unit
}

すなわち、Observable に新しいデータが来るたびに onNext が呼ばれる。そして、ストリームが終了 (terminate) する時に onComplete が呼ばれる。一方で、エラーが起きると onError が呼ばれる。

Rx では、onCompleteonError のどちらかが発生した時点でストリームは終了し、これ以後はイベントは発生しない。逆に言うと、完了イベントやエラーイベントが起きない限りは無限ストリームになる。また、onNext逐次的 (sequential) に呼ばれる。つまり、並行には呼ばれないので競合状態 (race condition) を気にする必要がない。

また、Observable が終了する前でも Observer を明示的に購読解除 (unsubscribe) できる。購読を解除するには、subscribe した時に返される Subscriptionunsubscribe() を呼び出す。

trait Subscription {
  def unsubscribe(): Unit
}

LINQ関数型プログラミング

ところで Observable には、subscribe の他にも mapflatMap といった関数型プログラミングでおなじみの関数も多数用意されている:

def map[R](func: (T) => R): Observable[R]
def flatMap[R](f: (T) => Observable[R]): Observable[R]
def foreach(onNext: (T) => Unit): Unit
def filter(predicate: (T) => Boolean): Observable[T]
def take(n: Int): Observable[T]
def takeWhile(predicate: (T) => Boolean): Observable[T]
def toList: Observable[List[T]]
def zip[U](that: Observable[U]): Observable[(T, U)]

なぜ、ここにこんなものがあるんだろう?

"Rx = Observable + LINQ + Scheduler" だったことを思い出してほしい。オリジナルの .NET 版では、Observable を LINQ で操作できるように 標準クエリ演算子の実装を提供している。上記の関数は RxScalaAPI だが、これは標準クエリ演算子Scala のコレクション操作で用いられる高階関数へと置き換えたものだ。

なぜ置き換えられるのか? LINQ はコレクションやデータベースに対して SQL 風のクエリ式を書けるようにするための機能だが、その下にある標準クエリ演算子は、関数型言語のコレクションライブラリが提供する高階関数と実質的に同じものだからだ。例えば、標準クエリ演算子Selectmap 関数に対応する(LINQScala の対応は @eed3si9n さんの「Scala脳のための C# LINQ」が参考になる)。

従来の Observer パターンの考え方からすると、高階関数の導入は唐突に写るかもしれない。しかし、実のところ Observable は一種のストリーム(無限リスト)とみなせるので、関数型ライブラリのコレクション操作関数と非常に相性が良い。

Observer パターンの語彙と関数型の語彙が同じものを指している場合もある。例えば、Observable の foreach メソッドのドキュメントや実装を見てみると「foreachsubscribeエイリアスだ」と明記されている。どちらも「イベントが通知されるたびに何か処理をする」メソッドだからだ。

def subscribe(onNext: (T) ⇒ Unit): Subscription
def foreach(onNext: (T) => Unit): Unit

一方で違いもある。例えば Observable は無限ストリームなので foldRight は定義されてない。また、非同期コレクションなので、foldLeftreduce は、集約された値をそのまま返す代わりに Observable に包んで返すようになっている。

def foldLeft[R](initialValue: R)(accumulator: (R, T) ⇒ R): Observable[R]
def reduce[U >: T](accumulator: (U, U) ⇒ U): Observable[U]

また、非同期データストリームのユースケースに対応するために、switchcombineLatest といったメソッドが数多く追加されている。

しかし、もし未知の関数が出てきてもさほど恐れる必要はない。関数の型シグネチャとマーブルダイアグラム(下図)に注目すれば、どんな機能か把握するのはさほど難しくないからだ。

f:id:okapies:20150224023017p:plain(以下、マーブルダイアグラムは RxJava の Javadoc からの引用)

確かに、ObservableAPI を真正面から読み解こうとすると、大量のメソッド「Observable シーケンスの各要素を Observable シーケンスの新たなシーケンスへ射影し…」のような宇宙語に当惑すること必至だ。けれども、視点を変えてこれらのメソッドを関数型の機能として捉えなおしてみると、一転して強い一貫性が見えてくるはずだ。

Rx の「糊」

有名な「なぜ関数プログラミングは重要か」の中で、John Hughes は「プログラミング言語にとって高階関数と遅延評価はモジュールを貼り合わせる新しい糊である」という趣旨のことを述べている。

そんなわけで、非同期データストリーム処理の「糊」である Observable高階関数の実例をいくつか見てみよう。

  • map は Observable の各要素に「データを別のデータに変換する」関数を適用して新しい Observable を作る
def map[R](func: (T) => R): Observable[R]

f:id:okapies:20150224023017p:plain

  • flatMap は、同様に「データを Observable に変換する関数」を適用して入れ子の Observable を作り、最後にマージする(念の為に付け加えると、flatMapmapflatten を組み合わせた関数だ)
def flatMap[R](f: (T) ⇒ Observable[R]): Observable[R]

f:id:okapies:20150304003356p:plain

  • groupBy は、やはり要素に関数を適用してキーを出力し、そのキーごとに要素をグルーピングした Observable を作る(戻り値型が Observable[(K, Observable[T])] になっている)
def groupBy[K](f: (T) ⇒ K): Observable[(K, Observable[T])]

f:id:okapies:20150304003358p:plain

ところで、以上で紹介した関数のマーブルダイアグラムを見ると、データだけでなく完了 (onComplete) イベントを表す「|」も、ちゃんと新しい Observable に写し取られているのが分かる。

以前の節でも見たように、Observable には onNext, onComplete, onError という三つのイベントがある。そして、Observable の高階関数は、データだけでなく全てのイベントを出力先の Observable へ自然なやり方で写す。このとき、イベント間の制約条件(一度 onComplete になったら以後 onNext は発生しない等)も引き継がれる。このため、map 等の高階関数に渡す関数を書く時に、Observable の内部状態をいちいち気にする必要がない。

以上から、Observable が提供する高階関数ListFuture が提供するものと単に型シグネチャが同じであるだけでなく、同様のセマンティクスを持つ機能として扱える。

このため、List や Future と同様に、これらの高階関数は Observable と無関係に作った好きな関数同士を繋ぎ合わせる「糊」として使うことができる:

tweets.filter(t => t.userName == "okapies").map(t => t.text)

また、RxScala では FutureObservable に変換する関数 from が用意されている。これも、Future と Observable がよく似たセマンティクスを持っており、互換性を持っている証拠と言えるだろう。

def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T]

f:id:okapies:20150304003357p:plain

ただし、Observable には「時間と順序」の両方が関わってくるので、List や Future とは異なるセマンティクスを持つ関数も存在する。その代表例が concatflatten だ。こうした関数の時間に関する挙動を調べるときは、API ドキュメントに載っているマーブルダイアグラムが重要な情報源になる。

この点については、後の節で「二つ以上の Observable をマージする」ケースを検討する際に詳しく見ていくことにする。

スケジューラ (Scheduler)

Scala 標準の FutureExecutionContext を受け取るように、ObservableScheduler を受け取るオプションを持っている。これが "Rx = Observable + LINQ + Scheduler" の三つ目だ。

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

どちらもタスクの(非同期)実行を制御するための機能だが、Future は単一の値を返したらそこで完了するのに対して、Observable は複数の値を返し続ける。また、タスクの実行戦略を選んだり、処理の途中で購読解除 (Subscription.unsubscribe()) できる必要がある。

Scheduler は、これらの機能をサポートするために ExecutionContext よりも複雑な仕組みになっている。

実際に使う Scheduler は、自分で実装しなくてもフレームワーク側である程度用意されている。例えば、タスクを現在のスレッド上で実行する immediate や、CPU-bound や IO-bound なタスクのための computationio などがある。

(注: RxJava の Scheduler は 1.0 に上がる時にリファクタリングされたため、Coursera 講座(第一期)での Scheduler の説明は少し内容が古くなっている。ちなみに、チケットを見ると、Meijer (@headinthebox) 氏ご本人も議論に参加していたりする)

Observable のマージ

Rx の API を調べると複数の Observable を一つの Observable に合流(マージ)する」という似たようなメソッドを、アルゴリズムを変えて何通りも提供していることに気付く。これは、それぞれの Observable に含まれるイベントの時刻が重なっている場合、それらをマージする方法や順序の決め方には色々な考え方があって一意には決まらないからだ。

例えば Observable は、List のような通常のコレクションと同様に concatflatten という二つのメソッドを提供する。どちらも「入れ子になった Observable (Observable[Observable[U]]) を一重の Observable に畳みこむ」関数だ。*1

def concat[U]: Observable[U]
def flatten[U]: Observable[U]

ところで、List ではこれらの関数のアルゴリズムconcat = flatten だが、Observable ではそれぞれ挙動が異なる。

concat は、入れ子に入っている Observable 同士の順番が維持されるようにマージする。つまり、二番目の Observable に含まれるイベントは、必ず一番目の Observable のイベントの後ろに置かれる。

一つ目の Observable が完了するまでの間、二つ目の Observable に来たイベントはバッファされ続けることに注意しよう。つまり、場合によってはメモリがオーバーフローする可能性がある。

f:id:okapies:20150223233540p:plain

一方 flatten(あるいは merge)は、イベントの発生タイミングが維持されるようにマージする。つまり、一番目の Observable に含まれるイベントの途中に、二番目の Observable のイベントが挿入されることがある。

f:id:okapies:20150223233541p:plain

このように、「二つの Observable を一つにマージする」という簡単な操作でも二つの実現方法があるので、ユースケースに応じて使い分ける必要がある。

この他にも、マージ系のメソッドには switchamb のような「複数の Observable のうち一つを採用して、他の Observable を捨てる」ものや、zipcombineLatest のように「Observable から来るデータが揃った時にペアにして出力する」ものがある。

様々なサイトに掲載されている具体例を見ると、このマージ機能をうまく組み合わせることが、Rx を活用する際の一つのポイントとなるようだ。

関数を組み立てて非同期処理を作る

この記事の冒頭で、関数型で考えるメリットは「少数のシンプルな概念の組み合わせで多数の具体例を作り出せること」だと述べた。

これを確かめるために、これまでに紹介した関数を組み立てて、実際に非同期イベント処理を実装してみよう。ここでは、Coursera 講座に出てくる Rx のコーディング例を使って説明する。

国ごとの地震データストリーム

まず、非同期データストリームのソースとして、アメリカ地質調査所 (USGS) が提供する API から全世界の地震データのストリームを取得する関数 usgs と、逆ジオコーディングサービスに問い合わせて地理座標を国名に変換する関数 reverseGeocode が使えるとしよう。

以下の関数シグネチャを見ると分かるように、どちらもウェブサービスへの非同期な問い合わせなので、戻り値の型が ObservableFuture になっている。

def usgs(): Observable[EarthQuake] = { ... }
def reverseGeocode(c: GeoCoodinate): Future[Country] = { ... }

では、この二つの関数を組み合わせて、国ごとの地震情報のストリームを作ってみよう。

まず、usgs から取得した地震データと reverseGeocode から取得した国名を map で貼り合わせて、(地震データ, 国) を要素とするストリーム withCountry を作る。reverseGeocode への問い合わせには、usgs から取り出した地震データに含まれる地理座標の情報が必要なので、コードは以下のようになる:

val withCountry: Observable[Observable[(EarthQuake, Country)]] =
  usgs().map { quake =>
    val country: Future[Country] = reverseGeocode(q.location)
    Observable.from(country.map(country => (quake, country)))
  }

ここで withCountry の戻り型を見ると Observable が入れ子になっていることに気付く。reverseGeocodeusgs と同様に非同期関数なので、新しい地震データがやってくる度に map に渡したクロージャの中で結果待ち (Future[Country]) しているからだ。このままでは扱いにくいので、flatten で入れ子の Observable に含まれるイベントを一列にマージする:

val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()

これで (地震データ, 国) を要素とするストリーム merged が得られたので、最後に国をキーにして groupBy でグルーピングする:

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] =
  merged.groupBy { case (quake, country) => country }

これで、国ごとにタグ付けされた地震データが得られるようになった。このように mapflattengroupBy といった糊を使って関数を組み立て、byCountry という具体的な関数を作ることができた。

マージ戦略を取り替える

しかし、この実装には一つ問題がある。最終的に国ごとにストリームされる地震データの順番が、ユーザが望む通りになっていない可能性があるのだ。

つまり、入れ子の Observable をマージする時に flatten を使っているので、地震データの順番が「地震の発生順」ではなく「reverseGeocode の結果が返ってきた順」になってしまう。この挙動はエンドユーザへの速報のようなユースケースでは問題ないかもしれないが、地震データを時系列順に解析したいような場合は問題だろう。

どうすればいいか? 答えは簡単で、単に merged 関数の flattenconcat に取り替えればいい。これで、望み通りに発生順の地震データが出力される(既に書いたように concat はメモリがオーバーフローする可能性があるので注意しよう)。

val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

以上のように、シンプルな関数を組み合わせて作ったアプリケーションは、関数を部分的に取り替えるのも容易であり、結果として異なるユースケースにも素早く対応できることが示せたと思う。

まとめ

前編では、Rx を関数型プログラミングの視点で読み解いていくことで、API の習得やユースケースへの適用が容易になることを示した。

続いて後編では、Observable単なるモナドインスタンスだよ?という話や、ObservableIterableFuture との関係といった議論を紹介していきたい。

*1:flatten は処理系によっては単体では提供されない。C# では .SelectMany(a => a) と同等。