ReactiveSocket について

この記事は、Java Advent Calendar 2015 の 22 日目です。前日は、n_slender さんの「PlayFramework 2.4 Java Ebeanでのアプリ開発」でした。

今日の記事では、この半年くらいで仕様と実装が出てきている ReactiveSocket というプロトコル仕様についてお話したいと思います。

なぜ Java Advent Calendar でプロトコルの話を? と訝しがっている方も多いと思いますが、基本的には以下の二つの理由です。

  1. JEP 266 として JDK 9 に追加される予定の Reactive Streams と密接に関わっている
  2. Java 製のサーバサイド向けライブラリを多数 OSS 化している Netflix が中心になって仕様策定を行っており、参照実装も JVM 向けが中心

予定ではプロトコルレベルの話にも踏み込んで解説したいと思っていたのですが、プライベートが色々と立て込んでいるため、概要レベルのご紹介になることをお許しください。

ReactiveSocket って何?

ReactiveSocket is an application protocol providing Reactive Streams semantics over an asynchronous, binary boundary.

(ReactiveSocket とは、非同期バイナリ境界をまたいで Reactive Streams のセマンティクスを提供するアプリケーションプロトコルである。)

ざっくり言うと Reactive Streams の考え方をアプリケーションプロトコルのレイヤで実現するための仕様。

そもそもの Reactive Streams とは何か、については以前に書いた記事でも解説しているのでご参照ください。

okapies.hateblo.jp

要点としては、メッセージ駆動のコンポーネント間でメッセージをやり取りするシステムを組んだ際のフロー制御の方法を定めている。

具体的には、送信側が受信側の処理能力を超える量のメッセージを送信してバッファを溢れさせることのないように、受信側から送信側に対して「次は◯個送っていいよ」というフィードバック (back-pressure) を通知することで、過負荷の際に処理能力を超えるメッセージを受信してシステムがクラッシュする事態を回避することを狙っている。詳細な動作については、以下のスライドの図も参考にしてほしい:

特徴

ReactiveSocket の特徴は以下の通り:

メッセージ駆動

(HTTP2 と同様の)非同期なメッセージ駆動であり、全ての通信は、単一のコネクション上に多重化されたメッセージストリームを介して行う。また、これによってレスポンス待ちでブロックすることがなくなる。

相互作用モデル

ReactiveSocket は複数の相互作用 (interaction) モデルをサポートしている。ユースケースごとに適切なモデルを選んで使用することで、性能やユーザ体験に与える影響を向上できる。

また、後述するようにトランスポートに何を使うか (TCP, WebSocket, Aeron, ...) に依存しないので、これらを使ってアプリケーションを実装すれば、性能特性に合わせてトランスポートを入れ替えることもできる。

  • Fire-and-Forget(撃ちっぱなし)
    • レスポンスが必要ない場合は、これを使うのが一番効率的
    • Future<Void> completionSignalOfSend = socketClient.fireAndForget(message);
  • Request/Response(単一レスポンス)
    • 普通のリクエストレスポンス。「レスポンス1個のストリーム」を最適化したものと考えることができる
    • Future<Payload> response = socketClient.requestResponse(requestPayload);
  • Request/Stream(有限個の複数レスポンス)
    • 「コレクション」や「リスト」に相当
    • Publisher<Payload> response = socketClient.requestStream(requestPayload);
  • Topic Subscription(無限個の複数レスポンス)
    • 「プッシュ通知」や「イベントストリーム」に相当
    • Publisher<Payload> response = socketClient.requestSubscription(topicSubscription);
  • Channel(双方向ストリーム)
    • クライアント側から途中でリクエストの条件を変更したりするような場合に用いる
    • Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input);

フロー制御

二つのフロー制御方式をサポートしている。どちらも、トランスポートレイヤではなくアプリケーションレベルの流量制御に焦点を置いている。

一つは、Reactive Streams が仕様化しているような request(n) の非同期プル。こちらは、リクエスト発行側 (requester) から応答側 (responder) へのキャパシティの通知に使う。

もう一つは ReactiveSocket 独自のリース (leasing) という仕組みで、応答側から発行側へのキャパシティ通知に用いられる。リースは、「規定時間 (TTL) までに◯個まで送ってよし」という形式でリクエストを発行する。これによって、データセンター内のサーバ間通信のようなユースケースで、アプリケーションレベルの負荷分散(クライアント側で、各サーバから通知されたリースの情報を使ってリクエストを分散する)がやりやすくなる。

多言語 (polyglot) サポート

相互作用モデルとフロー制御を言語非依存なプロトコルとして定義しているので、言語を跨いだインタラクションに利用できる(Reactive Streams は JVM 上で動作するミドルウェア同士でしか利用できない)。

様々なトランスポートレイヤをサポート

ReactiveSocket 自体は OSI Layer 5/6 相当のアプリケーションプロトコルであり、TCP 以外にも WebSocket や Aeron (*)、Quic といった様々なトランスポートプロトコルの上に実装できる。

また、ReactiveSocket が定義するアプリケーションレイヤはトランスポートの差異を隠蔽するので、ユースケースに合わせて最適なトランスポートを選ぶことができる。

*: Reactive Manifesto の執筆者の一人である Martin Thompson の会社 Real Logic が開発しているトランスポートプロトコル。元 LMAX の CTO で Disruptor を開発していた御仁、といえば分かる方もいるのでは。

性能

コネクションを使い回すので、コネクションを何度も張り直すような余計な処理を回避できる。また、バイナリプロトコルなので CPU 負荷を削減できる。さらに、フロー制御が組み込まれているので、相手先システムがスローダウンしている時にリトライ地獄を仕掛けてさらに負荷をかけるようなことがない。

同様の課題を解決する仕組みとして、Netflix が自身のマイクロサービス同士のフロー制御に使っている Hystrix があるが、オーバヘッドや複雑さが増すという問題点があった。

なんで HTTP/2 を使わないの?

大雑把に言うと、HTTP/2 は一義的にウェブサイトからドキュメントを取得するブラウザのためのプロトコルで、ReactiveSocket が想定するユースケースに合わないから。

  • リクエスト/レスポンスのみで、それ以外の相互作用モデルをうまくサポートできない
  • アプリケーションレベルのフロー制御の仕組みがない
  • REST は非常に普及しているが、アプリケーションのセマンティクスを定義するのに使うのは非効率で不適切である

対応実装と今後について

コアライブラリとして reactivesocket-java が公開されている。これ自体はプロトコル実装を Reactive Streams API でアクセスできるようにしたもので、実際には以下のような具体的なトランスポートプロトコルの実装でラップして使う:

また、ブラウザや Node.js から使える JavaScript 版の実装も作られている:

今後…については最近あまり追いかけられてないので分からないです。すいません。とりあえず、Netflix 内部のフロー制御を Hystrix から置き換えていきたいのだとは思われる。具体的な進捗を知ってる人がいたら教えて下さい。あとは、Reactive Streams に参加してる他のベンダー(Typesafe とか)が乗っかるのかどうか(いちおう呼びかけはなされていて、Typesafe の人も関心はあるみたい)。

既に見たように、Reactive Streams 自体は JVM に閉じた仕組みだったところを、アプリケーションレイヤープロトコルとして仕様化することで多言語で活用できる可能性が出てきたわけで、個人的には注目しています。

「なぜ関数プログラミングは重要か」を要約してみた(その1)

関数型プログラミング (functional programming) の利点を説く際によく持ち出されるのが、QuickCheck の開発者の一人である John Hughes が 1984 年に著した論文 "Why Functional Programming Matters" だ。「なぜ関数プログラミングは重要か」という題名で日本語訳もされているので、読んだことがある人も多いと思う。

要旨としては、冒頭の1章および2章で述べられている「関数型プログラミングが優れているのは、高階関数遅延評価という、モジュール同士を貼り合わせる強力な『糊』を持っているからだ」という話がほぼ全てで、以降はそれを具体例に基づいて説明する構成になっている。ただ、その具体例として「数値計算アルゴリズム」やら「ゲーム用人工知能アルゴリズム」やらの話が延々と続くし、しかもコード例が Haskell の先祖にあたる Miranda という言語で書かれているのでなかなか取っ付きづらい。

今回、来年の1月に ScalaMatsuri で「なぜリアクティブは重要か」というお題で話をさせて頂けることになったこともあって、少し頑張って、元ネタであるこの論文を通読したので要約を公開したいと思う。なお、コード例は Scala で書いた(Scala 版のコードはこの記事を参考にしている)。

1. イントロダクション

  • 本論文の目的は、関数型プログラミングの重要性を示すと共に、その利点を明確にしてフル活用できるようにすること
  • 関数型プログラミングでは、プログラム全体を関数だけで構成する
    • メインプログラム自身が関数であり、プログラムへの入力を引数として受け取り、結果をプログラムの出力として供給する
    • メイン関数はさらに多くの関数を使って定義されるので、プログラムの最下層に至るまで関数は言語のプリミティブとなっている
  • 関数型プログラミングの「利点」:
    • 関数型プログラムには副作用(≒代入文)がないのでバグが減らせる
    • 参照透明なので実行順序を気にしなくてよく、式をどの時点で評価してもよいのでプログラムをより数学的に扱える
  • それはそうなんだけど…
    • 「〜ではない」についてばかり語っている(代入文がない、副作用がない、制御フローがない
    • 「〜である」について語らないと、物質的な利益に興味がある人にはピンと来ないだろう
  • 関数型プログラミングの力を語るだけでなく、それが目指す理想を示さねばならない

2. 構造化プログラミングとの類似

  • 関数型プログラミングと構造化プログラミングを比較してみる
  • 構造化プログラミングとは、「goto 文を含まず」「ブロックが複数の入口や出口を持たない」
    • さきほどの関数型プログラミングの「利点」と同様に、否定形の説明になっている
    • 「本質的な goto」のような実りのない議論の温床になった
  • 構造化プログラミングの核心はモジュール化であり、大きな生産性向上をもたらす
    1. 小さなモジュールは素早く簡潔にコーディングできる
    2. 汎用モジュールの再利用によって、プログラムをより速く開発できる
    3. モジュールは独立してテストできるので、デバッグが容易になる
  • goto は小規模プログラミングでしか役立たないが、モジュール化設計は大規模プログラミングにおいても役立つ
  • プログラミング言語が問題をモジュール化する能力を高めるには、モジュール同士を貼り合わせるが重要
    • 問題を部分問題に分割し、部分問題を解き、その解を合成する。つまり、問題を分割する方法は、解を合成する方法に依存する
    • 例: 椅子を部品(座部、脚、背もたれなど)に分けて作れるのは、ジョイントや木工接着剤があるから。さもなければ、一つの木の塊から椅子を掘り出すしかない

3. 関数の貼り合せ

この章では、二種類のの一つ目である高階関数について紹介している。sum のような単純な関数を、高階関数とその引数の組み合わせとしてモジュール化することで、reduce のような汎用的な関数を導出する。

論文では、二つのデータ型(リストとツリー)に対して適用できる高階関数について述べている。まず、リスト操作関数の汎用化を進めて、最終的に reduce 関数と map 関数を導出する。次に、木(ツリー)構造に対する操作についても同様に redtreemaptree を導出する。

章の最後では、「汎用の高階関数と特有の特殊関数の組み合わせとして部品化することで、たくさんの操作を容易にプログラムできる」「新たなデータ型を定義したときは、それを処理する高階関数を書くべきだ」と結んでいる。

リスト編

リスト処理の問題を例に説明する。リストのデータ構造を(Scala で)書くとこうなる:

sealed trait ListOf[+X]
case object ListNil extends ListOf[Nothing]
case class Cons[X](head: X, rest: ListOf[X]) extends ListOf[X]

以上のデータ構造を使って具体的なリストを表すとこうなる:

[]        は ListNil
[1]       は Cons(1, ListNil)
[1, 2, 3] は Cons(1, Cons(2, Cons(3, ListNil)))

次に、リストの要素を足し上げる関数 sum を定義してみる:

def sum: ListOf[Int] => Int = _ match {
  case ListNil         => 0
  case Cons(num, list) => num + sum(list)
}

この定義を調べると、sum に固有なのは初期値 0 と演算 + だけなのが分かる*1。つまり、sum

  • 一般的な再帰パターン(reduce と呼ばれる)
  • sum 固有の部分(0+

の二つにモジュール化して、後で貼り合わせることでも作ることができる:

def add(x: Int, y: Int) = x + y
def reduce[A, B](f: (A, B) => B, x: B)(list: ListOf[A]): B = list match {
  case ListNil    => x
  case Cons(a, l) => f(a, reduce(f, x)(l))
}

def sum: ListOf[Int] => Int = reduce(add, 0)

(注: Scala でこの書き方をするとスタックが溢れるけど、回避方法は色々なところで紹介されているので略。あと、カリー化の話も略。)

reduce は(初期値と演算を入れ替えるだけで)様々な用途に再利用できる:

// リストの全要素の積
def product: ListOf[Int]     => Int     = reduce(multiply, 1    )
// リストの要素のいずれかが true か調べる
def anytrue: ListOf[Boolean] => Boolean = reduce(or,       false)
// リストの全要素が true か調べる
def alltrue: ListOf[Boolean] => Boolean = reduce(and,      true )

ところで reduce は、リストの Cons の部分を f で、ListNil の部分を a で置き換えたものと看做せる:

                     l =     Cons(1,     Cons(2,     Cons(3, ListNil)))
     reduce(add)(0)(l) =      add(1,      add(2,      add(3,       0)))
reduce(multiply)(1)(l) = multiply(1, multiply(2, multiply(3,       1)))

つまり、reduce(Cons, ListNil) は(ConsCons に、ListNilListNil に置き換えているだけなので)リストからリストを複写する関数とみなせるし、リスト ab に対して reduce(Cons, b)(a) は二つのリストを連結する関数となる:

def copy[A](l: ListOf[A])                 = reduce(Cons[A], ListNil)(l)
def append[A](a: ListOf[A], b: ListOf[A]) = reduce(Cons[A], b      )(a)

次に、リストの全ての要素を2倍したリストを返す関数 doubleAll は、doubleAndCons 関数を使って以下のように定義できる:

def doubleAndCons(num: Int, list: ListOf[Int]) = Cons(2 * num, list)

def doubleAll(l: ListOf[A]) = reduce(doubleAndCons, ListNil)(l)

doubleAndCons 関数は、以下のように double 関数と fAndCons 関数の組み合わせに置き換えられる:

def double(n: Int) = 2 * n
def fAndCons[A, B](f: A => B)(el: A, list: ListOf[B]) = Cons(f(el), list) // 2 * num => f(el)

def doubleAndCons: (Int, ListOf[Int]) => ListOf[Int] = fAndCons(double)

ところで fAndCons 関数は fCons を合成した関数 (Cons . f) として定義することもできる:

def fAndCons[A, B](f: A => B): (A, ListOf[B]) => ListOf[B] = (Cons[B] _).compose(f)

(注: 上記を実行するには、あらかじめ以下の暗黙変換を定義して import RichFunction2._ しておく必要がある。)

implicit class RichFunction2[T1, T2, R](f: Function2[T1, T2, R]) {
  def compose[A](g: (A) => T1): (A, T2) => R = (x: A, y: T2) => f(g(x), y)
}

したがって、doubleAll 関数は double, Cons, reduce の組み合わせで定義できる:

def doubleAll: ListOf[Int] => ListOf[Int] = reduce((Cons[Int] _).compose(double), ListNil)

ここで、double 関数をパラメータ化すると、リストの全要素に f を適用する map 関数を導出できる:

def map[A, B](f: A => B): ListOf[A] => ListOf[B] = reduce((Cons[B] _).compose(f), ListNil)

def doubleAll: ListOf[Int] => ListOf[Int] = map(double)

map は汎用的に使える有用な関数で、例えば行列(=リストのリスト)の要素を足し上げる関数を作りたくなっても、以下のように簡潔に書ける:

def sumMatrix: ListOf[ListOf[Int]] => Int = sum.compose(map(sum))

ツリー編

ラベル付きの順序付きツリーについて考えてみる。(Scala で)データ構造を書くとこんな感じ:

sealed abstract class TreeOf[A] {
  def label: A
  def subtrees: ListOf[TreeOf[A]]
}
case class Node[A](label: A, subtrees: ListOf[TreeOf[A]]) extends TreeOf[A]

例えば、以下のようなツリーを:

    1 o
     / \
    /   \
   /     \
2 o       o 3
          |
          |
          |
          o 4

上記で定義したデータ構造で表すとこうなる:

Node(1,
     Cons(Node(2, ListNil),
          Cons(Node(3,
               Cons(Node(4,
                    ListNil),
               ListNil)),
          ListNil)))

リストの時と同様に、reduce と同じ役割を果たす redtree 関数を考えてみる。reduce は「Cons を置き換える何か」と「ListNil を置き換える何か」の二つを引数に取る関数だった。同じ方針で考えてみると、redtreeNodeConsListNil を置き換えた三つの何かを引数に取る関数になるはずだ。

def redtree[A, B, X](f: (X, A) => B, g: (B, A) => A, a: A)(tree: TreeOf[X]): B = {
  def redtreeImpl[A, B, X]
      (f: (X, A) => B, g: (B, A) => A, a: A)(subtrees: ListOf[TreeOf[X]]): A =
    subtrees match {
      case Cons(subtree, rest) => g(redtree(f, g, a)(subtree), redtreeImpl(f, g, a)(rest))
      case ListNil => a
    }

  f(tree.label, redtreeImpl(f, g, a)(tree.subtrees))
}

reduce と同様に、redtree を他の関数と組み合わせて様々な関数が定義できる:

// ツリー全体の label を足し合わせる関数
def sumtree: TreeOf[Int] => Int       = redtree(add,     add,       0      )
// ツリー全体の label のリストを作る関数
def labels[A]: TreeOf[A] => ListOf[A] = redtree(Cons[A], append[A], ListNil)

最後に、ツリー用の map 関数である maptreeredtree を使って定義しておく(5章でゲーム用人工知能を実装する際に使う):

def maptree[A, B](f: A => B): TreeOf[A] => TreeOf[B] =
  redtree((Node[B] _).compose(f), Cons[TreeOf[B]], ListNil)

インターミッション

ちょっと力尽きたので、今回はここまで。続きはやる気が湧いたら、ということにさせてください…。

論文では、高階関数がプログラムのモジュール化に役立つ理由について「データ型の詳細に関する知識を高階関数の中に局所化できる」と述べているが、これを逆に言うと「特定のビジネスロジックを実装した関数を、それを適用する(データ型の)文脈から切り離せる」ということになる。

で、この考え方を推し進めると、一例として「抽象的な Future」が述べているような、「本番用の非同期実行の文脈」と「テスト用の同期実行の文脈」を同じコードで切り替えるみたいな仕掛けが実現できるようになる、というわけですね。

*1:分かってる人向けの言い方をするなら、モノイドの単位元と二項演算。

JJUG ナイトセミナーで Reactive Streams について発表しました

6月24日の JJUG ナイトセミナーで「Reactive Streams 入門」のタイトルで発表させて頂きました。最近話題の Reactive Programming、気がついたら一万人以上が署名している Reactive Manifesto、そして Java 9 で標準化という話が進んでいる Reactive Streams をまとめて俯瞰してみました、という感じの内容になっています。

かなり戦々恐々だったのですが、思いのほかご好評をいただきとてもとてもほっとしています。発表の機会を与えて下さった JJUG スタッフの皆様、会場をご提供頂いたオラクル様、発表を聴いてくださった参加者の方々、ありがとうございました。

発表でも触れましたが、"Reactive" という概念が何を指すかについては大きな混乱があり、様々な論者が異なる定義を提唱しているのが現状です。一方で、そうした定義の背景には、それぞれに体系的な知見や学術的な議論の積み上げがあるのも確かで、その辺をちゃんと掘り下げた解説を書いてみたいなぁ、と思っていました。

そんなわけで、この半年ほど継続的に資料を収集したり、V2 にアップデートされた Reactive Manifesto の翻訳をやったりしていました。構成については、記事を書くことを念頭に以前からぼんやりと考えてはいたのですが、今回の発表準備にあたって参考にした、英語版 Wikipedia「データフローを記述する宣言的なプログラミングモデル」「その実行モデルを実装したランタイム」という定義を軸に据えると、Reactive の名を冠した要素技術群をそこそこ総括的に整理できるのではないか、と考えて作ったのがこのスライドになります。

このテーマについては、まだまだ考えるべきことも多そうですし、今後も継続的に研究していきたいと思っています。改めて、今回は貴重な機会を頂きありがとうございました。

余談1

たしかに、次に使う個数を書いたモノを前工程に送るのって、完全にカンバンだなぁ。日本の製造業のプラクティスがまた世界を変えてしまった(違う。

余談2

今回の発表では、Reactive Programming のような非同期プログラミングについて、私が最近考えている『プログラミングモデルについての議論は概ね決着がついていて、焦点は「いかに高機能なランタイムを提供するか」という所に移りつつあるのではないか』という話を入れてみましたが、さて、実際のところどうなんでしょうか…。みなさんはどう思われますか?

これはずっとそうだと思うのですが、特に昨今、UI プログラミングやマイクロサービスといった文脈で非同期プログラミングの需要が高まる中でも、「非同期を同期的な文法で書きたい」というニーズは非常に根強いものがあります。

しかし、以前に「マイクロサービスが Scala を選ぶ3つの理由」という記事でも書いたように、特に分散システムの文脈では(有名な「分散コンピューティングの落とし穴」が述べるように)、レイテンシや処理の失敗、ネットワークの不安定性、あるいはそれを補うための運用監視といった話題は無視できません。過去に、「同期的な文法で非同期プログラミングができる」というコンセプトを打ち出したプログラミングモデルが大体失敗に終わったのは、そういった事情でしょう。

そう考えると、「同期プログラミングにとっての異物」をプログラマの目から隠してしまうのではなく、少し考え方を変えて明示的に扱った方が、最終的には幸せになれる気がしてきます。そして、Future/Promise や(Rx の)Observable のような関数型インタフェースは、非同期な実行モデルに基づくデータフローを記述する上で、優れた抽象化を提供してくれます。Reactive Programming が、多くの場合で関数型 Reactive Programming (FRP) の同義語として扱われるのは、そうした抽象化がもたらす利便性が大きな理由でしょう。

そんなわけで、本来であれば、近年の Reactive Programming の実践は「関数型プログラミング」と極めて密接な関係があります。しかし、今回は大前提として Java ユーザの方に向けた発表なので、そういった話題は基本的に除外することを心がけました。実際、スライドを見ていただければお分かりになると思いますが、「関数型とは何か」という議論に立ち入らなくても Reactive Programming を理解し活用することは可能です。

一方で、これらのライブラリをより効果的に活用したいと望むなら、関数型の考え方を調べておくと非常に役立ちます。この手の話に関心がある方は、以前このブログで書いた「関数型プログラマのための Rx 入門」シリーズをご覧になってみてください。

Reactive Streams が 1.0.0 になった

以前に紹介した Reactive Streams 仕様が 1.0.0 になりました。リリース文はこのへん

okapies.hateblo.jp

以下、Twitter に書いた感想をぺたぺた。途中で言及してる「つらみ」ってのはこのへんの議論とか瀬良さんのこの記事の話ですね。改めて読み返すと「解消される」ってのは言い過ぎだった(他にも課題はいっぱいある)けど、まぁ一つ障壁が取り除かれつつあるよなーという。

ReactiveX と「普通のやつらの上を行け」の意外な関係

これは「関数型プログラマのための Rx 入門」の補足記事です(タイトル変えた)。

前編後編とお送りしてきたこの記事だが、特に後編について「何を言ってるのか分からん」というコメントを何人かの方から頂いた。…なんというか、ごめんなさい

繰り返しになるが、Rx を使う上で関数型プログラミングの知識は必ずしも必要ではないし、むしろ(関数型のコンセプトが基礎にあるのに関わらず)知らなくても使えるようになっている。ライブラリの作者たちは「過度な抽象化は害になる」ということを弁えているのだろう。

しかし、Rx と関数型プログラミングの関係を把握しておくと、非同期データストリームのビルディング・ブロックの作り方について大いに視野が広がるだろう。もし、貴方がこの記事の前提となる「関数型」のパラダイムに興味をお持ちなら、まずは関数プログラミング実践入門」をお勧めしたい。

本の内容そのものは Haskell を前提にしているが、関数型の重要なコンセプトが一通り紹介されているので、今回の記事で出てきたキーワード(高階関数、代数的データ型、モナド、…)が属する世界観を概観するのに良いと思う。

また、このテーマに本気で取り組みたい初学者の方には、つい先日に発売されたばかりのScala関数型デザイン&プログラミング」を併せてお勧めしたい。

この本は、かねてより国内外で高い評価を得ている "Functional Programming in Scala" の日本語訳になる。ざっと見た感じ非常に「歯ごたえがある」感じだが、「関数型でプログラムを組み上げる方法」を基礎から丁寧に解説しており、演習問題も充実しているので、一冊読み通すとかなり力がつくのではないかと思う。

以下、後編を書いた後に気付いた話について少し補足。題して「Rx と『普通のやつらの上を行け』の意外な関係」

Observable の由来

後編で延々と書いたように、Reactive Extensions (Rx) の ObservableIterable の双対になっている。

() => (() => T)     // Iterable[T]
(T => Unit) => Unit // Observable[T]

では、形式的な説明はそれでいいとして*1、実際のところ Observable のアイデアはどこから来たのだろうか?

Erik Meijer の以下の投稿によれば、Observable は Rx の前身である Volta プロジェクト*2で非同期呼び出しをうまく扱う方法を探している時に見出したものだという:

We started working on IObservable/IObserver a long time ago when we were trying to make asynchronous calls that arose from tier-splitting palatable. Initially we used just the continuation monad but then discovered the beautiful duality with IEnumerable and IEnumerator.

F# vs. Rx: Msdn forums - Reactive Extensions (Rx)

そして、これは継続モナド (continuation monad) から着想を得たものであるらしい。*3

class Cont[R, +A](val runCont: (A => R) => R) { ... }

継続渡し形式の関数 (CPS function)

上記の継続モナド Cont が保持している関数 runCont: (A => R) => R継続渡し形式 (Continuation Passing Style; CPS) の関数という名前で呼ばれている。そして、後編で導出した Observable を表す「引数に渡されたコールバック関数に値を渡して実行する高階関数」も同様に CPS 関数だ(RUnit を適用してみよう)。

(A => R   ) => R    // runCont
(T => Unit) => Unit // Observable

継続渡しの「継続」とは、ここでは CPS 関数に渡されるコールバックを指している。CPS の詳しい説明は下記に挙げたページを見てほしいのだが、簡単に言うと「関数を呼び出して、戻ってきたら『続きの処理』を実行する」代わりに「関数に『続きの処理』を渡して呼び出し、その関数の最後で実行してもらう」というやり方だ。この「続きの処理」を継続と呼ぶ。

継続渡しで「普通のやつらの上を行け」

一般に「継続」は扱いの難しいプログラミングコンセプトとされていて、あまり積極的に活用されることがない(と思う)。では、なぜ非同期呼び出しの文脈で継続が出てくるのかというと、「処理をある所で中断してコンテキストを保存し、続きの処理が再開されるときに受け渡す」というパターンが CPS の考え方にバッチリはまるからだろう。以下、「なんでも継続」から引用:

ポイントは、外部の処理を呼びたいのだが、呼び出して戻り値を受け取るという形式が使えないケースにある。

例えばユーザインタフェースだ。処理の途中でユーザーに何か入力を促し、その結果を使って処理を続けたいことは良くある。しかし多くのGUIプログラミングでは、ユーザーの入力を受け付けるためには一度GUIのイベントループに戻らなければならない。したがって、プログラマは 処理をユーザーの入力の前にやる処理Aとユーザーの入力の後にやる処理Bに分けて、

 1. 処理Aの最後に入力ウィンドウをポップアップし、イベントループに戻る

 2. 入力ウィンドウの "OK" ボタンが押されるイベントが発生した時に 処理Bが呼ばれるようにする。

という具合にコーディングしているはずだ。この時、まさに処理Bは処理Aの「継続」なのだ。(Webアプリケーションにも全く同じ原理が使えることを指摘しておこう。 ユーザーからの入力が必要になった時、Webアプリケーションは一度入力フォームを吐き出してhttpサーバに制御を戻さなければならない。「普通のやつらの上を行け」Paul Grahamが述べているYahoo! Storeの システムはまさにこの技術を実装している。

つまり、歴史的にも意味的にも、Observable はまさにユースケースを非同期データストリームに絞って扱いやすくした継続モナドだということになる。

アカデミックな知識が MUST になる時代?

この「なんでも継続」の記事は(ブクマを見る限り)少なくとも 2006 年頃からあって、私も何度か目を通していた。のだが、正直なところ、今回記事を書くために読み返すまで UI や Web アプリへの応用という話は完全に忘れていた。

ぶっちゃけると、00 年代後半(?)にウェブ界隈で何度か不動点コンビネータの話がバズっていた頃、Yコンビネータの話題に関連してこの記事を読んでいるはずなのだが、当時は「いかにもギークの好きそうな頭の体操」として受け止めていて、応用の可能性についてはろくに考えが及んでいなかったと思う。

とかく、こうしたアカデミックな形式知は「小難しくて実用性がない」として軽んじられがちだ。しかし、関数型プログラミングしかり、こうして積み上げられてきた知見が実践的な問題に取り組むためのフレームワークとして活用されるケースは増えているし、海外の新しい OSS の動向を見るに、今後ますます増えていくだろうという感想を持っている。

全てのソフトウェア技術者がこうした方面の知識を習得すべきだとは思わない。しかし、個人的な感想としては、それぞれの現場で新技術の開発や選定に関わるリーダーや、エバンジェリストを自認するオピニオンリーダーにとって、このような学問的知識の習得が MUST になる時代はそう遠くない、という予感は日々強くなっている。

*1:ちなみに、Meijer は Rx の公開前にアップされた紹介動画の時点で既に双対性についての議論を披露している。この双対おじさん、筋金入りである…。

*2:Rx の電気ウナギのアイコンは、Volta のものをそのまま引き継いでいるらしい。

*3:Scala でのコード例は kmizu さんのコードから引用。

関数型プログラマのための Rx 入門(後編)

前編では、Reactive Extensions (Rx) の機能を関数型プログラミングの視点で読み解いた。続いて後編では、前編で紹介した Rx が関数型的な機能を提供している背景、つまり Observable と他の一般的なコンテナの関係に対してスポットライトを当ててみたい。

あらかじめ断っておくと、本編の話題は、実際に Rx を使う上で理解している必要は(あまり)ない。とりあえず、

  1. Observable は、ListFuture と同じくモナドの一種である
  2. 以下の表に出てくるコンテナは、隣同士で互いによく似た(あるいは正反対の)性質を持っている:
単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

…という話だけ記憶に留めてもらえば、ここで回れ右してもオーケー。とはいえ、興味のある人はこの先に目を通しておくと、今後同じような非同期データストリームのライブラリを使ったり、あるいは自分で作るときに役に立つ、かもしれない。

後編では、まず Observable が様々な型クラスのインスタンスであることを説明する。続いて、Observable が Iterable の「双対」の関係にあることや、Observable と Future の関係を紹介する。

Observable は (モナド|モノイド|...) である

flatMap メソッドがあることからも分かる通り、Observableモナドインスタンスだ。

Observable が実際にモナドを満たすことの証明は、@everpeace さんが ScalaCheck によるテストを公開している。*1

just(x).flatMap(f)      === f(x) // (1) 左単位元律
o.flatMap(just)         === o    // (2) 右単位元律
o.flatMap(f).flatMap(g) === o.flatMap(x => f(x).flatMap(g)) // (3) 結合律

また、モナド以外についても、同様に Observable に対する様々な型クラスやモナド変換子を Scalaz で実装した rxscalaz が公開されている。例えば、rxscalaz には Observable に対する Monoid 型クラスの実装がある。すなわち、Observable は二項演算 ++ に関してモノイドになっている。

implicit def observableMonoid[A] = new Monoid[Observable[A]] {
  override def zero: Observable[A] = Observable.empty
  override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2
}

rxscalaz は他にも、型クラス MonadPlus, Traverse, Foldable などや、モナド変換子 ObservableT の実装を提供している。

ところで、Haskell や Scalaz の経験がある人にとっては言うまでもないが、ListFuture もこれらの型クラスのインスタンスだ。例えば、List, Future そして Observable はすべてモナドインスタンスになっている。

前回の記事で「Observable は List や Future と共通した性質を持つ」という話を繰り返し取り上げたが、これはつまり「Observable と◯◯はどちらも (モナド|モノイド|...) のインスタンスである」という話だったのだ、ということが分かると思う。

pull モデルと push モデルの双対性

前編で、Observable はイベントストリームの一種だと述べた。では、Observable は IterableFuture のような他の種類のコンテナと何が同じで、何が違うのだろうか。

ここで、コンテナ同士を共通の議論の土台の上で比較するために、各コンテナの性質を一つの関数として書き表すという抽象化の操作を施してみる。すると…?

コンテナの四象

様々なコレクション(コンテナ)をユースケース別に整理した表を以下に示す:

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

このようにコンテナは、T(あるいは失敗の文脈を付与した Try[T])や Iterable[T] のように、ユーザが値を同期的に pull するコンテナと、Future[T]Observable[T] のように、ユーザの用意したコールバックに値を非同期に push するコンテナの二種類に整理できる。

また、別の軸で見ると、さらに単一の値を格納するコンテナ複数の値を格納するコンテナの二つに分けることができる。

この表を眺めると、縦方向や横方向に隣り合ったコンテナ同士には何か共通の性質がありそうに思える。しかし、当然ながらこれらは API やセマンティクスが互いに異なるので、単純な比較はできない。

この課題を、冒頭で述べた「コンテナの性質を一つの関数に書き表す」というシンプル化の手法(トリック)を導入することで解決してみよう。

Iterable から Observable をつくる

例として、縦方向に隣り合っている IterableObservable を比較してみよう。

まず、Iterable の API を確認する。Iterable から値を取り出すには、iterator() メソッドIterator を生成する必要がある。次いで、生成した Iterator に対して next() メソッドを次々に呼び出すことで値を取得する。

Iterable#iterator: () => Iterator[T]
Iterator#next:     () => T

これを単純化して一つの高階関数に書き直すと、型シグネチャは以下のようになる:

   () => (() => T) // Iterable[T]

ここで、この関数の「矢印をひっくり返した」関数を作ってみよう(無引数は Unit に書き直している):

   (T => Unit) => Unit // ← 反転!
// ~~~~~~~~~~~
//  callback

この操作によって作られた関数は何を意味するのか。関数の型シグネチャを読み下すと、「引数に渡されたコールバック関数に値を渡して実行する高階関数に相当することに気付く。つまり、この関数は Observablesubscribe メソッドと本質的に同じものなのだ:

trait Observable[T] {
  def subscribe(onNext: (T) ⇒ Unit): Subscription
}

結果として、IteratorAPI を関数の形に単純化してひっくり返すだけで ObservableAPI が導出できた。つまり、pull モデルのコンテナAPIpush モデルのコンテナAPI はある種の対称性を持っていることが分かる。

Erik Meijer は、Coursera の講義において、この対称性を指してIterableObservable数学的双対 (mathematical dual) である」と呼んでいる。一見すると異なる API を持つコンテナ同士が、実は対称的な関係にあって互いに導出可能である、というわけだ。

エラー状態に対応する

さて、前節では話を簡単にするために正常系 (onNext) のみを議論した。そのため、onCompleted イベントや onError イベントの存在を無視している。*2

なので、次の話に進む前に、Iterable から関数を作る際にエラー状態の文脈を加えることで、同様に「エラー状態に対応した Observable」を導出しておく。

先ほど、Iterator から値を取り出す際に next() メソッドを呼び出すと述べたが、実際にコードを書くときは、要素列の末尾を判定するため、以下のように hasNext() メソッドを組み合わせるはずだ:

val it = iterable.iterator
while (it.hasNext) {
  val a = it.next()
  ...
}

ここで、「hasNext()true の時は値が得られるが、false の場合は得られない」ことを表すため、先ほど作った関数の戻り値を Option で囲う。また、next() は例外を投げることがあるので戻り値をさらに Try(後述)で囲う。

このようにエラー状態をエンコードしたバージョンの Iterable は、以下のような高階関数として表せる:

() => (() => Try[Option[T]])     // Iterable[T]

同様にこれを反転すると、エラー状態を表現する能力を持った Observable の関数が導出される:

(Try[Option[T]] => Unit) => Unit // Observable[T]

なお Try[T] は、演算が「成功した場合」「失敗した場合」の二つの文脈を表す代数的データ型だ。

sealed abstract class Try[+T]
case class Success[+T](value: T) extends Try[T]
case class Failure[+T](exception: Throwable) extends Try[T]

したがって、Observable に渡されるコールバックが受け取るイベントは、以下の三種類のうちのいずれかになる:

  • Success[Some[T]]
  • Success[None]
  • Failure

ここで、Success[None] を Observable の完了イベントをエンコードしたものと解釈すれば、コールバックは、それぞれ onNext, onCompleted, onError イベントを表す代数的データ型を引数にとる関数になっている。

これは、実際の subscribeAPI とも一致している:

def subscribe(onNext: (T) ⇒ Unit, onError: (Throwable) ⇒ Unit, onCompleted: () ⇒ Unit): Subscription
def subscribe(observer: Observer[T]): Subscription
// trait Observer[T] {
//   def onNext(value: T): Unit
//   def onError(error: Throwable): Unit
//   def onComplete(): Unit
// }

ところで、もし subscribeAPI が導出した関数の通りになっているなら、イベントハンドラはこんな風に書けるだろう:

o.subscribe {
  case Success(Some(a)) => ...
  case Failure(t) => ...
  case Success(None) => ...
}

もちろん、実際にはこんなパターンマッチはできないが、一方で Rx には Notification という代数的データ型が含まれている。

sealed trait Notification[+T]
object Notification {
  case class OnNext[+T](value: T) extends Notification[T]
  case class OnError[+T](error: Throwable) extends Notification[T]
  case object OnCompleted extends Notification[Nothing]
}

Notification を使うには、Observable インスタンスmaterialize: Observable[Notification[T]] メソッドを使って、通常のストリームを Notification のストリームへ変換する必要がある。

f:id:okapies:20150310022337p:plain

Notification をパターンマッチするコードはこのように書くことができ、先ほど Iterable の双対として導出した API と一致していることが分かると思う。

o.materialize.foreach {
  case OnNext(a) => ...
  case OnError(t) => ...
  case OnCompleted => ...
}

Future と Observable の関係

前節では、コンテナの表を縦方向(同期と非同期)を比較すると双対になっていることを示した。

単数 複数
同期 (pull) T/Try[T] Iterable[T]
非同期 (push) Future[T] Observable[T]

では、縦方向ではなく横方向の関係(単一と複数)はどうだろうか? 単一の値を格納する Future と、複数の値を格納する Observable を比べてみよう。

例によって、FutureObservable を関数に書き表して比較してみる。Future API の関数への書き直しは onComplete[U](f: (Try[T]) => U): Unit メソッドがそのまま使える(戻り値の U は単純化して Unit にしている):

(Try[T] => Unit) => Unit // Future[T]

これを先ほど Observable の関数と比較してみると、二つは非常によく似た型シグネチャを持つことが分かる:

(Try[       T ] => Unit) => Unit // Future[T]
(Try[Option[T]] => Unit) => Unit // Observable[T]

一方で、型シグネチャを読み下すと、Future のコールバックは2種類のイベント

  • Success[T]
  • Failure

を受け取るのに対して、Observable のコールバックは3種類のイベント

  • Success[Some[T]]
  • Success[None]
  • Failure

を受け取るようになっている。このように二つは似通っているが、Observable だけが完了 (Success[None]) イベントに相当するものを持っている。

これは、単一の値を格納する、つまり値が一つやってきた時点で自動的に「完了」する Future に対して、複数の値を格納する Observable はデータが無限に流れてくる可能性があり、ストリームの終端をシグナルする明示的な完了イベントが必要であることに対応していると考えることができる。

そして、この観察は Observable の実際の挙動とも一致している。RxScala では、Observable の from 関数で FutureObservable に変換できるが、その実装を見ると、Future の成功イベント (Success) が Observable のデータイベントと完了イベントの組 (onNext + onComplete) にマッピングされている。

def from[T](f: Future[T]): Observable[T] = {
  val s = AsyncSubject[T]()
  f.onComplete {
    case Failure(e) => s.onError(e)
    case Success(c) => s.onNext(c); s.onCompleted()
  }
  s
}

f:id:okapies:20150304003357p:plain

以上の議論から、単純化すると Observable複数の値の処理に対応した Future(あるいはその逆)であり、本質的なセマンティクスに大きな差はないことが分かる。逆に言えば、この点をどう扱うかが、非同期処理のプログラミングにおいて Rx を特徴付ける大きなポイントになっていると考えられる。

まとめ

本編では、Observable と、IterableFuture のような他の種類のコンテナとの間には密接な関係があることを示した。

いちおう補足しておくと、これは Observable が本当に Iterable の双対として作られたとか、そういう話ではない。言うまでもなく両者は独立に開発されたものだし、また前編で見たように、実際には各コンテナはそれぞれのユースケースに応じた「肉付け」をしており、それがライブラリの最終的な使いやすさを決めるからだ。

一方で、それらのライブラリの背景に見い出される基本的な、根本的なアイデアを理解しておくことはやはり大切だと思う。なぜなら、誰かが作ったライブラリを使う際に、あるいは自分で作る際に、様々な選択肢の中から一つを選び取る際の確かな判断基準になるからだ。

OSS 全盛の昨今、一つのユースケースに対して様々なライブラリが提案されることが増えている。一方で、パッと見にはそれらの違いがよく分からずに、何となく取っ付きやすくて人気のありそうなものを選んで失敗したり、先人の積み上げてきた体系やノウハウを無視して表層だけ真似たものを作った結果、仕様レベルで欠陥のあるものを作ってしまうといったことが増えているように思う。

では、どうすれば良いのか。個々のライブラリの使い方を丸暗記することも大事だが、同時にプログラミングにおける「良いプラクティス」を体系的に学ぶ機会を作るべきだと思う。みなさんもご存知の通り、そのような体系はいくつか提案されていて、オブジェクト指向におけるデザイン・パターン、ソフトウェア設計におけるドメイン駆動開発 (DDD)、あるいはこの記事でフィーチャーした関数型プログラミングといったものがある。

前編でも述べたように、関数型プログラミングが提唱する様々な原則(第一級関数、参照透過性、モナド、…)は、組み立て可能性 (composability) の高いビルディング・ブロックの作り方を体系的に整理したものだと考えられる。Rx は、その原則に従うことで、関数型プログラミングが持つメリットの多くを受け継ぐことに成功している。

また、Rx は、関数型が提供する機能に加えて、非同期データストリームのプログラミングにおいて必要となる数々の機能を提供している。Rx をよく調べることで、今後出てくる非同期データストリームのライブラリが従うべき「良いプラクティス」が見えてくると思う。

参考文献

*1:下記の式に出てくる === は等号っぽいふわっとした何かということで許してください…。justdef just[A](a: A) = rx.lang.scala.Observable.just(a) のつもり。

*2:Coursera の講義では最初からエラー状態を織り込んだ形で議論している。この記事では、話を分かりやすくするためにこの解説記事を参考にして構成した。

関数型プログラマのための Rx 入門(前編)

概要

『Observable は単なる非同期データストリームにおけるモナドインスタンスだよ。何か問題でも?』

まともな概要

つまり、Reactive Extensions (Rx) って何だ?

ということでウェブをガサゴソと漁っていたところ、オンライン講義サービス CourseraPrinciples of Reactive Programming に行き当たった。この講座では、Rx の主要開発者の一人である「双対おじさん」こと Erik Meijer 氏自らが一部の章を担当し、Rx の理論的側面を講義している。

この講座の大きな特徴は、Rx を(命令型プログラミングではなく)関数型プログラミング (FP) の側から解き明かしていくことにある。

こう書くと奇をてらっているように見えるかもしれないが、実際には Rx は FRP (Functional Reactive Programming) のバリエーションの一つとされており、Rx を関数型プログラミングの一応用として説明するのはさほど不思議なことではない。

では、あるフレームワークを関数型のパラダイムに則って作るメリットは何だろうか? 一つ挙げるとすれば、それは少数のシンプルな概念の組み合わせで多数の具体例を作り出せることだと思う。

関数型プログラミングには、組み立て可能性 (composability) の高いビルディング・ブロックの作り方について多くの知見が蓄積されている。フレームワークは、関数型のイディオムに沿った API を提供することで、個別のユースケースごとに実装を用意することなく、関数を組み合わせるだけで様々なユースケースに対応できるようになる。

これは、フレームワークを使う側にとっても同じことが言える。フレームワークの使い方を調べる時に、すでに使いどころがよく知られている定番の関数と、その背景にある抽象的な概念をそのまま当てはめられるからだ。

例えば、Rx の機能の中核を担う Observable には実に 200 以上のメソッドがあり、これらの使い方を一個一個まじめに暗記していては日が暮れてしまう。しかし、これらのメソッドを「関数型の眼」で分析してみると、その多くが、お馴染みの高階関数のバリエーションであることに気付くはずだ。

実際に、私はこの講座を視聴してやっと Rx の使い方や設計思想が理解できるようになった。

記事の構成

この記事では、関数型プログラミングについて基本的な知識があることを前提に、Coursera の Meijer 氏の講義内容をベースに Rx を紹介する。

この記事は二つのパートに分かれている。前編では、Rx の API関数型プログラミングの観点から読み解く方法を紹介した後、実際に部品となる関数を組み立てて非同期イベント処理を実装してみる。そして後編では、Rx の理論的側面について少し突っ込んだ議論を紹介していく。

追記: 補足記事(”ReactiveX と「普通のやつらの上を行け」の意外な関係”)を書きました。

また、この記事では Rx を個別のユースケースに当てはめる方法については簡単な紹介に止める。より具体的な例に興味がある方には、id:ninjinkun 氏が翻訳されたあなたが求めていたリアクティブプログラミング入門がとても素晴らしい記事なのでぜひ一読してほしい。

ちなみに、ついさっき気付いたが、元ネタの Coursera の講座が 4/13 から一年半ぶりに第二期を開講!するようだ。ぜひ、この機会に受講してみてはいかがだろうか。

Reactive Extensions の概要

公式サイトでは、Rx を以下のように説明している:

Reactive Extensions (Rx) は、観測可能 (observable) なシーケンスと LINQ スタイルのクエリ演算子を使って、非同期なイベントベースのプログラムを合成するライブラリです。Rx を使うと、開発者は非同期データストリームを Observable で表し、非同期データストリームに対するクエリに LINQ 演算子を使い、非同期ストリームの並行性を Scheduler でパラメタ化します。簡単に言えば、Rx = Observable + LINQ + Scheduler です。

非同期処理を扱うライブラリというと FuturePromise を思い浮かべる人も多いと思う。しかし、Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

データストリームの具体例としては、デスクトップアプリにおいては「マウスイベント」、ウェブサービスにおいては「株価情報」や Twitter の「タイムライン」などが分かりやすいだろう。

Rx の APIGoF の Observer パターンを踏襲している。つまり、観測対象 (Observable) のイベントを観測者 (Observer) が購読 (subscribe) するという形式をとる:

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}

Observable から Observer に通知されるイベントは三種類ある。これらは、Observer のコールバックメソッドとして実装する:

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onComplete(): Unit
}

すなわち、Observable に新しいデータが来るたびに onNext が呼ばれる。そして、ストリームが終了 (terminate) する時に onComplete が呼ばれる。一方で、エラーが起きると onError が呼ばれる。

Rx では、onCompleteonError のどちらかが発生した時点でストリームは終了し、これ以後はイベントは発生しない。逆に言うと、完了イベントやエラーイベントが起きない限りは無限ストリームになる。また、onNext逐次的 (sequential) に呼ばれる。つまり、並行には呼ばれないので競合状態 (race condition) を気にする必要がない。

また、Observable が終了する前でも Observer を明示的に購読解除 (unsubscribe) できる。購読を解除するには、subscribe した時に返される Subscriptionunsubscribe() を呼び出す。

trait Subscription {
  def unsubscribe(): Unit
}

LINQ関数型プログラミング

ところで Observable には、subscribe の他にも mapflatMap といった関数型プログラミングでおなじみの関数も多数用意されている:

def map[R](func: (T) => R): Observable[R]
def flatMap[R](f: (T) => Observable[R]): Observable[R]
def foreach(onNext: (T) => Unit): Unit
def filter(predicate: (T) => Boolean): Observable[T]
def take(n: Int): Observable[T]
def takeWhile(predicate: (T) => Boolean): Observable[T]
def toList: Observable[List[T]]
def zip[U](that: Observable[U]): Observable[(T, U)]

なぜ、ここにこんなものがあるんだろう?

"Rx = Observable + LINQ + Scheduler" だったことを思い出してほしい。オリジナルの .NET 版では、Observable を LINQ で操作できるように 標準クエリ演算子の実装を提供している。上記の関数は RxScalaAPI だが、これは標準クエリ演算子Scala のコレクション操作で用いられる高階関数へと置き換えたものだ。

なぜ置き換えられるのか? LINQ はコレクションやデータベースに対して SQL 風のクエリ式を書けるようにするための機能だが、その下にある標準クエリ演算子は、関数型言語のコレクションライブラリが提供する高階関数と実質的に同じものだからだ。例えば、標準クエリ演算子Selectmap 関数に対応する(LINQScala の対応は @eed3si9n さんの「Scala脳のための C# LINQ」が参考になる)。

従来の Observer パターンの考え方からすると、高階関数の導入は唐突に写るかもしれない。しかし、実のところ Observable は一種のストリーム(無限リスト)とみなせるので、関数型ライブラリのコレクション操作関数と非常に相性が良い。

Observer パターンの語彙と関数型の語彙が同じものを指している場合もある。例えば、Observable の foreach メソッドのドキュメントや実装を見てみると「foreachsubscribeエイリアスだ」と明記されている。どちらも「イベントが通知されるたびに何か処理をする」メソッドだからだ。

def subscribe(onNext: (T) ⇒ Unit): Subscription
def foreach(onNext: (T) => Unit): Unit

一方で違いもある。例えば Observable は無限ストリームなので foldRight は定義されてない。また、非同期コレクションなので、foldLeftreduce は、集約された値をそのまま返す代わりに Observable に包んで返すようになっている。

def foldLeft[R](initialValue: R)(accumulator: (R, T) ⇒ R): Observable[R]
def reduce[U >: T](accumulator: (U, U) ⇒ U): Observable[U]

また、非同期データストリームのユースケースに対応するために、switchcombineLatest といったメソッドが数多く追加されている。

しかし、もし未知の関数が出てきてもさほど恐れる必要はない。関数の型シグネチャとマーブルダイアグラム(下図)に注目すれば、どんな機能か把握するのはさほど難しくないからだ。

f:id:okapies:20150224023017p:plain(以下、マーブルダイアグラムは RxJava の Javadoc からの引用)

確かに、ObservableAPI を真正面から読み解こうとすると、大量のメソッド「Observable シーケンスの各要素を Observable シーケンスの新たなシーケンスへ射影し…」のような宇宙語に当惑すること必至だ。けれども、視点を変えてこれらのメソッドを関数型の機能として捉えなおしてみると、一転して強い一貫性が見えてくるはずだ。

Rx の「糊」

有名な「なぜ関数プログラミングは重要か」の中で、John Hughes は「プログラミング言語にとって高階関数と遅延評価はモジュールを貼り合わせる新しい糊である」という趣旨のことを述べている。

そんなわけで、非同期データストリーム処理の「糊」である Observable高階関数の実例をいくつか見てみよう。

  • map は Observable の各要素に「データを別のデータに変換する」関数を適用して新しい Observable を作る
def map[R](func: (T) => R): Observable[R]

f:id:okapies:20150224023017p:plain

  • flatMap は、同様に「データを Observable に変換する関数」を適用して入れ子の Observable を作り、最後にマージする(念の為に付け加えると、flatMapmapflatten を組み合わせた関数だ)
def flatMap[R](f: (T) ⇒ Observable[R]): Observable[R]

f:id:okapies:20150304003356p:plain

  • groupBy は、やはり要素に関数を適用してキーを出力し、そのキーごとに要素をグルーピングした Observable を作る(戻り値型が Observable[(K, Observable[T])] になっている)
def groupBy[K](f: (T) ⇒ K): Observable[(K, Observable[T])]

f:id:okapies:20150304003358p:plain

ところで、以上で紹介した関数のマーブルダイアグラムを見ると、データだけでなく完了 (onComplete) イベントを表す「|」も、ちゃんと新しい Observable に写し取られているのが分かる。

以前の節でも見たように、Observable には onNext, onComplete, onError という三つのイベントがある。そして、Observable の高階関数は、データだけでなく全てのイベントを出力先の Observable へ自然なやり方で写す。このとき、イベント間の制約条件(一度 onComplete になったら以後 onNext は発生しない等)も引き継がれる。このため、map 等の高階関数に渡す関数を書く時に、Observable の内部状態をいちいち気にする必要がない。

以上から、Observable が提供する高階関数ListFuture が提供するものと単に型シグネチャが同じであるだけでなく、同様のセマンティクスを持つ機能として扱える。

このため、List や Future と同様に、これらの高階関数は Observable と無関係に作った好きな関数同士を繋ぎ合わせる「糊」として使うことができる:

tweets.filter(t => t.userName == "okapies").map(t => t.text)

また、RxScala では FutureObservable に変換する関数 from が用意されている。これも、Future と Observable がよく似たセマンティクスを持っており、互換性を持っている証拠と言えるだろう。

def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T]

f:id:okapies:20150304003357p:plain

ただし、Observable には「時間と順序」の両方が関わってくるので、List や Future とは異なるセマンティクスを持つ関数も存在する。その代表例が concatflatten だ。こうした関数の時間に関する挙動を調べるときは、API ドキュメントに載っているマーブルダイアグラムが重要な情報源になる。

この点については、後の節で「二つ以上の Observable をマージする」ケースを検討する際に詳しく見ていくことにする。

スケジューラ (Scheduler)

Scala 標準の FutureExecutionContext を受け取るように、ObservableScheduler を受け取るオプションを持っている。これが "Rx = Observable + LINQ + Scheduler" の三つ目だ。

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

どちらもタスクの(非同期)実行を制御するための機能だが、Future は単一の値を返したらそこで完了するのに対して、Observable は複数の値を返し続ける。また、タスクの実行戦略を選んだり、処理の途中で購読解除 (Subscription.unsubscribe()) できる必要がある。

Scheduler は、これらの機能をサポートするために ExecutionContext よりも複雑な仕組みになっている。

実際に使う Scheduler は、自分で実装しなくてもフレームワーク側である程度用意されている。例えば、タスクを現在のスレッド上で実行する immediate や、CPU-bound や IO-bound なタスクのための computationio などがある。

(注: RxJava の Scheduler は 1.0 に上がる時にリファクタリングされたため、Coursera 講座(第一期)での Scheduler の説明は少し内容が古くなっている。ちなみに、チケットを見ると、Meijer (@headinthebox) 氏ご本人も議論に参加していたりする)

Observable のマージ

Rx の API を調べると複数の Observable を一つの Observable に合流(マージ)する」という似たようなメソッドを、アルゴリズムを変えて何通りも提供していることに気付く。これは、それぞれの Observable に含まれるイベントの時刻が重なっている場合、それらをマージする方法や順序の決め方には色々な考え方があって一意には決まらないからだ。

例えば Observable は、List のような通常のコレクションと同様に concatflatten という二つのメソッドを提供する。どちらも「入れ子になった Observable (Observable[Observable[U]]) を一重の Observable に畳みこむ」関数だ。*1

def concat[U]: Observable[U]
def flatten[U]: Observable[U]

ところで、List ではこれらの関数のアルゴリズムconcat = flatten だが、Observable ではそれぞれ挙動が異なる。

concat は、入れ子に入っている Observable 同士の順番が維持されるようにマージする。つまり、二番目の Observable に含まれるイベントは、必ず一番目の Observable のイベントの後ろに置かれる。

一つ目の Observable が完了するまでの間、二つ目の Observable に来たイベントはバッファされ続けることに注意しよう。つまり、場合によってはメモリがオーバーフローする可能性がある。

f:id:okapies:20150223233540p:plain

一方 flatten(あるいは merge)は、イベントの発生タイミングが維持されるようにマージする。つまり、一番目の Observable に含まれるイベントの途中に、二番目の Observable のイベントが挿入されることがある。

f:id:okapies:20150223233541p:plain

このように、「二つの Observable を一つにマージする」という簡単な操作でも二つの実現方法があるので、ユースケースに応じて使い分ける必要がある。

この他にも、マージ系のメソッドには switchamb のような「複数の Observable のうち一つを採用して、他の Observable を捨てる」ものや、zipcombineLatest のように「Observable から来るデータが揃った時にペアにして出力する」ものがある。

様々なサイトに掲載されている具体例を見ると、このマージ機能をうまく組み合わせることが、Rx を活用する際の一つのポイントとなるようだ。

関数を組み立てて非同期処理を作る

この記事の冒頭で、関数型で考えるメリットは「少数のシンプルな概念の組み合わせで多数の具体例を作り出せること」だと述べた。

これを確かめるために、これまでに紹介した関数を組み立てて、実際に非同期イベント処理を実装してみよう。ここでは、Coursera 講座に出てくる Rx のコーディング例を使って説明する。

国ごとの地震データストリーム

まず、非同期データストリームのソースとして、アメリカ地質調査所 (USGS) が提供する API から全世界の地震データのストリームを取得する関数 usgs と、逆ジオコーディングサービスに問い合わせて地理座標を国名に変換する関数 reverseGeocode が使えるとしよう。

以下の関数シグネチャを見ると分かるように、どちらもウェブサービスへの非同期な問い合わせなので、戻り値の型が ObservableFuture になっている。

def usgs(): Observable[EarthQuake] = { ... }
def reverseGeocode(c: GeoCoodinate): Future[Country] = { ... }

では、この二つの関数を組み合わせて、国ごとの地震情報のストリームを作ってみよう。

まず、usgs から取得した地震データと reverseGeocode から取得した国名を map で貼り合わせて、(地震データ, 国) を要素とするストリーム withCountry を作る。reverseGeocode への問い合わせには、usgs から取り出した地震データに含まれる地理座標の情報が必要なので、コードは以下のようになる:

val withCountry: Observable[Observable[(EarthQuake, Country)]] =
  usgs().map { quake =>
    val country: Future[Country] = reverseGeocode(q.location)
    Observable.from(country.map(country => (quake, country)))
  }

ここで withCountry の戻り型を見ると Observable が入れ子になっていることに気付く。reverseGeocodeusgs と同様に非同期関数なので、新しい地震データがやってくる度に map に渡したクロージャの中で結果待ち (Future[Country]) しているからだ。このままでは扱いにくいので、flatten で入れ子の Observable に含まれるイベントを一列にマージする:

val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()

これで (地震データ, 国) を要素とするストリーム merged が得られたので、最後に国をキーにして groupBy でグルーピングする:

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] =
  merged.groupBy { case (quake, country) => country }

これで、国ごとにタグ付けされた地震データが得られるようになった。このように mapflattengroupBy といった糊を使って関数を組み立て、byCountry という具体的な関数を作ることができた。

マージ戦略を取り替える

しかし、この実装には一つ問題がある。最終的に国ごとにストリームされる地震データの順番が、ユーザが望む通りになっていない可能性があるのだ。

つまり、入れ子の Observable をマージする時に flatten を使っているので、地震データの順番が「地震の発生順」ではなく「reverseGeocode の結果が返ってきた順」になってしまう。この挙動はエンドユーザへの速報のようなユースケースでは問題ないかもしれないが、地震データを時系列順に解析したいような場合は問題だろう。

どうすればいいか? 答えは簡単で、単に merged 関数の flattenconcat に取り替えればいい。これで、望み通りに発生順の地震データが出力される(既に書いたように concat はメモリがオーバーフローする可能性があるので注意しよう)。

val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

以上のように、シンプルな関数を組み合わせて作ったアプリケーションは、関数を部分的に取り替えるのも容易であり、結果として異なるユースケースにも素早く対応できることが示せたと思う。

まとめ

前編では、Rx を関数型プログラミングの視点で読み解いていくことで、API の習得やユースケースへの適用が容易になることを示した。

続いて後編では、Observable単なるモナドインスタンスだよ?という話や、ObservableIterableFuture との関係といった議論を紹介していきたい。

*1:flatten は処理系によっては単体では提供されない。C# では .SelectMany(a => a) と同等。