「なぜ関数プログラミングは重要か」を要約してみた(その1)
関数型プログラミング (functional programming) の利点を説く際によく持ち出されるのが、QuickCheck の開発者の一人である John Hughes が 1984 年に著した論文 "Why Functional Programming Matters" だ。「なぜ関数プログラミングは重要か」という題名で日本語訳もされているので、読んだことがある人も多いと思う。
要旨としては、冒頭の1章および2章で述べられている「関数型プログラミングが優れているのは、高階関数と遅延評価という、モジュール同士を貼り合わせる強力な『糊』を持っているからだ」という話がほぼ全てで、以降はそれを具体例に基づいて説明する構成になっている。ただ、その具体例として「数値計算アルゴリズム」やら「ゲーム用人工知能アルゴリズム」やらの話が延々と続くし、しかもコード例が Haskell の先祖にあたる Miranda という言語で書かれているのでなかなか取っ付きづらい。
今回、来年の1月に ScalaMatsuri で「なぜリアクティブは重要か」というお題で話をさせて頂けることになったこともあって、少し頑張って、元ネタであるこの論文を通読したので要約を公開したいと思う。なお、コード例は Scala で書いた(Scala 版のコードはこの記事を参考にしている)。
1. イントロダクション
- 本論文の目的は、関数型プログラミングの重要性を示すと共に、その利点を明確にしてフル活用できるようにすること
- 関数型プログラミングでは、プログラム全体を関数だけで構成する
- メインプログラム自身が関数であり、プログラムへの入力を引数として受け取り、結果をプログラムの出力として供給する
- メイン関数はさらに多くの関数を使って定義されるので、プログラムの最下層に至るまで関数は言語のプリミティブとなっている
- 関数型プログラミングの「利点」:
- 関数型プログラムには副作用(≒代入文)がないのでバグが減らせる
- 参照透明なので実行順序を気にしなくてよく、式をどの時点で評価してもよいのでプログラムをより数学的に扱える
- それはそうなんだけど…
- 「〜ではない」についてばかり語っている(代入文がない、副作用がない、制御フローがない)
- 「〜である」について語らないと、物質的な利益に興味がある人にはピンと来ないだろう
- 関数型プログラミングの力を語るだけでなく、それが目指す理想を示さねばならない
2. 構造化プログラミングとの類似
- 関数型プログラミングと構造化プログラミングを比較してみる
- 構造化プログラミングとは、「goto 文を含まず」「ブロックが複数の入口や出口を持たない」
- さきほどの関数型プログラミングの「利点」と同様に、否定形の説明になっている
- 「本質的な goto」のような実りのない議論の温床になった
- 構造化プログラミングの核心はモジュール化であり、大きな生産性向上をもたらす
- 小さなモジュールは素早く簡潔にコーディングできる
- 汎用モジュールの再利用によって、プログラムをより速く開発できる
- モジュールは独立してテストできるので、デバッグが容易になる
- goto は小規模プログラミングでしか役立たないが、モジュール化設計は大規模プログラミングにおいても役立つ
- プログラミング言語が問題をモジュール化する能力を高めるには、モジュール同士を貼り合わせる糊が重要
- 問題を部分問題に分割し、部分問題を解き、その解を合成する。つまり、問題を分割する方法は、解を合成する方法に依存する
- 例: 椅子を部品(座部、脚、背もたれなど)に分けて作れるのは、ジョイントや木工接着剤があるから。さもなければ、一つの木の塊から椅子を掘り出すしかない
3. 関数の貼り合せ
この章では、二種類の糊の一つ目である「高階関数」について紹介している。sum
のような単純な関数を、高階関数とその引数の組み合わせとしてモジュール化することで、reduce
のような汎用的な関数を導出する。
論文では、二つのデータ型(リストとツリー)に対して適用できる高階関数について述べている。まず、リスト操作関数の汎用化を進めて、最終的に reduce
関数と map
関数を導出する。次に、木(ツリー)構造に対する操作についても同様に redtree
と maptree
を導出する。
章の最後では、「汎用の高階関数と特有の特殊関数の組み合わせとして部品化することで、たくさんの操作を容易にプログラムできる」「新たなデータ型を定義したときは、それを処理する高階関数を書くべきだ」と結んでいる。
リスト編
リスト処理の問題を例に説明する。リストのデータ構造を(Scala で)書くとこうなる:
sealed trait ListOf[+X] case object ListNil extends ListOf[Nothing] case class Cons[X](head: X, rest: ListOf[X]) extends ListOf[X]
以上のデータ構造を使って具体的なリストを表すとこうなる:
[] は ListNil [1] は Cons(1, ListNil) [1, 2, 3] は Cons(1, Cons(2, Cons(3, ListNil)))
次に、リストの要素を足し上げる関数 sum
を定義してみる:
def sum: ListOf[Int] => Int = _ match { case ListNil => 0 case Cons(num, list) => num + sum(list) }
この定義を調べると、sum
に固有なのは初期値 0
と演算 +
だけなのが分かる*1。つまり、sum
は
- 一般的な再帰パターン(
reduce
と呼ばれる) sum
固有の部分(0
と+
)
の二つにモジュール化して、後で貼り合わせることでも作ることができる:
def add(x: Int, y: Int) = x + y def reduce[A, B](f: (A, B) => B, x: B)(list: ListOf[A]): B = list match { case ListNil => x case Cons(a, l) => f(a, reduce(f, x)(l)) } def sum: ListOf[Int] => Int = reduce(add, 0)
(注: Scala でこの書き方をするとスタックが溢れるけど、回避方法は色々なところで紹介されているので略。あと、カリー化の話も略。)
reduce
は(初期値と演算を入れ替えるだけで)様々な用途に再利用できる:
// リストの全要素の積 def product: ListOf[Int] => Int = reduce(multiply, 1 ) // リストの要素のいずれかが true か調べる def anytrue: ListOf[Boolean] => Boolean = reduce(or, false) // リストの全要素が true か調べる def alltrue: ListOf[Boolean] => Boolean = reduce(and, true )
ところで reduce
は、リストの Cons
の部分を f
で、ListNil
の部分を a
で置き換えたものと看做せる:
l = Cons(1, Cons(2, Cons(3, ListNil))) reduce(add)(0)(l) = add(1, add(2, add(3, 0))) reduce(multiply)(1)(l) = multiply(1, multiply(2, multiply(3, 1)))
つまり、reduce(Cons, ListNil)
は(Cons
を Cons
に、ListNil
を ListNil
に置き換えているだけなので)リストからリストを複写する関数とみなせるし、リスト a
と b
に対して reduce(Cons, b)(a)
は二つのリストを連結する関数となる:
def copy[A](l: ListOf[A]) = reduce(Cons[A], ListNil)(l) def append[A](a: ListOf[A], b: ListOf[A]) = reduce(Cons[A], b )(a)
次に、リストの全ての要素を2倍したリストを返す関数 doubleAll
は、doubleAndCons
関数を使って以下のように定義できる:
def doubleAndCons(num: Int, list: ListOf[Int]) = Cons(2 * num, list) def doubleAll(l: ListOf[A]) = reduce(doubleAndCons, ListNil)(l)
doubleAndCons
関数は、以下のように double
関数と fAndCons
関数の組み合わせに置き換えられる:
def double(n: Int) = 2 * n def fAndCons[A, B](f: A => B)(el: A, list: ListOf[B]) = Cons(f(el), list) // 2 * num => f(el) def doubleAndCons: (Int, ListOf[Int]) => ListOf[Int] = fAndCons(double)
ところで fAndCons
関数は f
と Cons
を合成した関数 (Cons . f
) として定義することもできる:
def fAndCons[A, B](f: A => B): (A, ListOf[B]) => ListOf[B] = (Cons[B] _).compose(f)
(注: 上記を実行するには、あらかじめ以下の暗黙変換を定義して import RichFunction2._
しておく必要がある。)
implicit class RichFunction2[T1, T2, R](f: Function2[T1, T2, R]) { def compose[A](g: (A) => T1): (A, T2) => R = (x: A, y: T2) => f(g(x), y) }
したがって、doubleAll
関数は double
, Cons
, reduce
の組み合わせで定義できる:
def doubleAll: ListOf[Int] => ListOf[Int] = reduce((Cons[Int] _).compose(double), ListNil)
ここで、double
関数をパラメータ化すると、リストの全要素に f
を適用する map
関数を導出できる:
def map[A, B](f: A => B): ListOf[A] => ListOf[B] = reduce((Cons[B] _).compose(f), ListNil) def doubleAll: ListOf[Int] => ListOf[Int] = map(double)
map
は汎用的に使える有用な関数で、例えば行列(=リストのリスト)の要素を足し上げる関数を作りたくなっても、以下のように簡潔に書ける:
def sumMatrix: ListOf[ListOf[Int]] => Int = sum.compose(map(sum))
ツリー編
ラベル付きの順序付きツリーについて考えてみる。(Scala で)データ構造を書くとこんな感じ:
sealed abstract class TreeOf[A] { def label: A def subtrees: ListOf[TreeOf[A]] } case class Node[A](label: A, subtrees: ListOf[TreeOf[A]]) extends TreeOf[A]
例えば、以下のようなツリーを:
1 o / \ / \ / \ 2 o o 3 | | | o 4
上記で定義したデータ構造で表すとこうなる:
Node(1, Cons(Node(2, ListNil), Cons(Node(3, Cons(Node(4, ListNil), ListNil)), ListNil)))
リストの時と同様に、reduce
と同じ役割を果たす redtree
関数を考えてみる。reduce
は「Cons
を置き換える何か」と「ListNil
を置き換える何か」の二つを引数に取る関数だった。同じ方針で考えてみると、redtree
は Node
と Cons
と ListNil
を置き換えた三つの何かを引数に取る関数になるはずだ。
def redtree[A, B, X](f: (X, A) => B, g: (B, A) => A, a: A)(tree: TreeOf[X]): B = { def redtreeImpl[A, B, X] (f: (X, A) => B, g: (B, A) => A, a: A)(subtrees: ListOf[TreeOf[X]]): A = subtrees match { case Cons(subtree, rest) => g(redtree(f, g, a)(subtree), redtreeImpl(f, g, a)(rest)) case ListNil => a } f(tree.label, redtreeImpl(f, g, a)(tree.subtrees)) }
reduce
と同様に、redtree
を他の関数と組み合わせて様々な関数が定義できる:
// ツリー全体の label を足し合わせる関数 def sumtree: TreeOf[Int] => Int = redtree(add, add, 0 ) // ツリー全体の label のリストを作る関数 def labels[A]: TreeOf[A] => ListOf[A] = redtree(Cons[A], append[A], ListNil)
最後に、ツリー用の map
関数である maptree
を redtree
を使って定義しておく(5章でゲーム用人工知能を実装する際に使う):
def maptree[A, B](f: A => B): TreeOf[A] => TreeOf[B] =
redtree((Node[B] _).compose(f), Cons[TreeOf[B]], ListNil)
インターミッション
ちょっと力尽きたので、今回はここまで。続きはやる気が湧いたら、ということにさせてください…。
論文では、高階関数がプログラムのモジュール化に役立つ理由について「データ型の詳細に関する知識を高階関数の中に局所化できる」と述べているが、これを逆に言うと「特定のビジネスロジックを実装した関数を、それを適用する(データ型の)文脈から切り離せる」ということになる。
で、この考え方を推し進めると、一例として「抽象的な Future」が述べているような、「本番用の非同期実行の文脈」と「テスト用の同期実行の文脈」を同じコードで切り替えるみたいな仕掛けが実現できるようになる、というわけですね。
JJUG ナイトセミナーで Reactive Streams について発表しました
6月24日の JJUG ナイトセミナーで「Reactive Streams 入門」のタイトルで発表させて頂きました。最近話題の Reactive Programming、気がついたら一万人以上が署名している Reactive Manifesto、そして Java 9 で標準化という話が進んでいる Reactive Streams をまとめて俯瞰してみました、という感じの内容になっています。
かなり戦々恐々だったのですが、思いのほかご好評をいただきとてもとてもほっとしています。発表の機会を与えて下さった JJUG スタッフの皆様、会場をご提供頂いたオラクル様、発表を聴いてくださった参加者の方々、ありがとうございました。
発表でも触れましたが、"Reactive" という概念が何を指すかについては大きな混乱があり、様々な論者が異なる定義を提唱しているのが現状です。一方で、そうした定義の背景には、それぞれに体系的な知見や学術的な議論の積み上げがあるのも確かで、その辺をちゃんと掘り下げた解説を書いてみたいなぁ、と思っていました。
そんなわけで、この半年ほど継続的に資料を収集したり、V2 にアップデートされた Reactive Manifesto の翻訳をやったりしていました。構成については、記事を書くことを念頭に以前からぼんやりと考えてはいたのですが、今回の発表準備にあたって参考にした、英語版 Wikipedia の「データフローを記述する宣言的なプログラミングモデル」と「その実行モデルを実装したランタイム」という定義を軸に据えると、Reactive の名を冠した要素技術群をそこそこ総括的に整理できるのではないか、と考えて作ったのがこのスライドになります。
このテーマについては、まだまだ考えるべきことも多そうですし、今後も継続的に研究していきたいと思っています。改めて、今回は貴重な機会を頂きありがとうございました。
余談1
"Back-Pressure" のあたりでカンバン方式とかザ・ゴール(TOC)とかを連想した / “Reactive Streams 入門 #jjug // Speaker Deck” http://t.co/nYj4AJ1LzE
— Terada Yuichiro (@u_1roh) 2015, 6月 25
たしかに、次に使う個数を書いたモノを前工程に送るのって、完全にカンバンだなぁ。日本の製造業のプラクティスがまた世界を変えてしまった(違う。
余談2
今回の発表では、Reactive Programming のような非同期プログラミングについて、私が最近考えている『プログラミングモデルについての議論は概ね決着がついていて、焦点は「いかに高機能なランタイムを提供するか」という所に移りつつあるのではないか』という話を入れてみましたが、さて、実際のところどうなんでしょうか…。みなさんはどう思われますか?
これはずっとそうだと思うのですが、特に昨今、UI プログラミングやマイクロサービスといった文脈で非同期プログラミングの需要が高まる中でも、「非同期を同期的な文法で書きたい」というニーズは非常に根強いものがあります。
しかし、以前に「マイクロサービスが Scala を選ぶ3つの理由」という記事でも書いたように、特に分散システムの文脈では(有名な「分散コンピューティングの落とし穴」が述べるように)、レイテンシや処理の失敗、ネットワークの不安定性、あるいはそれを補うための運用監視といった話題は無視できません。過去に、「同期的な文法で非同期プログラミングができる」というコンセプトを打ち出したプログラミングモデルが大体失敗に終わったのは、そういった事情でしょう。
そう考えると、「同期プログラミングにとっての異物」をプログラマの目から隠してしまうのではなく、少し考え方を変えて明示的に扱った方が、最終的には幸せになれる気がしてきます。そして、Future/Promise や(Rx の)Observable のような関数型インタフェースは、非同期な実行モデルに基づくデータフローを記述する上で、優れた抽象化を提供してくれます。Reactive Programming が、多くの場合で関数型 Reactive Programming (FRP) の同義語として扱われるのは、そうした抽象化がもたらす利便性が大きな理由でしょう。
そんなわけで、本来であれば、近年の Reactive Programming の実践は「関数型プログラミング」と極めて密接な関係があります。しかし、今回は大前提として Java ユーザの方に向けた発表なので、そういった話題は基本的に除外することを心がけました。実際、スライドを見ていただければお分かりになると思いますが、「関数型とは何か」という議論に立ち入らなくても Reactive Programming を理解し活用することは可能です。
一方で、これらのライブラリをより効果的に活用したいと望むなら、関数型の考え方を調べておくと非常に役立ちます。この手の話に関心がある方は、以前このブログで書いた「関数型プログラマのための Rx 入門」シリーズをご覧になってみてください。
Reactive Streams が 1.0.0 になった
以前に紹介した Reactive Streams 仕様が 1.0.0 になりました。リリース文はこのへん。
以下、Twitter に書いた感想をぺたぺた。途中で言及してる「つらみ」ってのはこのへんの議論とか瀬良さんのこの記事の話ですね。改めて読み返すと「解消される」ってのは言い過ぎだった(他にも課題はいっぱいある)けど、まぁ一つ障壁が取り除かれつつあるよなーという。
リアクティブストリームの仕様が 1.0.0 になったようですね。 http://t.co/TK1GEGsYJh
— seratch_ja (@seratch_ja) 2015, 5月 7
MongoDB の公式実装もあるのか。 http://t.co/YnGP5lTk6V https://t.co/BiI0B4b86Y
— seratch_ja (@seratch_ja) 2015, 5月 7
正直、最初に見た時はどうなることやらと思っていたけど、とうとう辿り着いてしまった。Typesafe って、何だかんだでエコシステムを作るのがうまい気がする。 http://t.co/DYl85vr7Ux
— Yuta Okamoto (@okapies) 2015, 5月 7
実は Rx とか Akka とかよりも、DB の対応が進んだ事の方が影響大きそう。"Reactive Slick" とか Mongo の公式実装 http://t.co/ddIuYaF3hd とか。このまま行けば、以前話題になってたノンブロッキングのつらみが解消されていくことに。
— Yuta Okamoto (@okapies) 2015, 5月 7
Reactive Streams は分散環境での話がまだ全然入ってないからまだまだ使い物になるのは先だなー、とか思ってたけど、実は v1.0 で注目すべきはそこでは無く、データベース周りがノンブロッキングのエコシステムに入ってくることだったんだ、と今気付いた。
— Yuta Okamoto (@okapies) 2015, 5月 7
うわ、Reactive Rabbit なんてのもある。RabbitMQ/AMQP の Reactive Streams 対応ドライバ。 https://t.co/sm6C1Wljff
— Yuta Okamoto (@okapies) 2015, 5月 7
ReactiveX と「普通のやつらの上を行け」の意外な関係
これは「関数型プログラマのための Rx 入門」の補足記事です(タイトル変えた)。
前編、後編とお送りしてきたこの記事だが、特に後編について「何を言ってるのか分からん」というコメントを何人かの方から頂いた。…なんというか、ごめんなさい。
繰り返しになるが、Rx を使う上で関数型プログラミングの知識は必ずしも必要ではないし、むしろ(関数型のコンセプトが基礎にあるのに関わらず)知らなくても使えるようになっている。ライブラリの作者たちは「過度な抽象化は害になる」ということを弁えているのだろう。
しかし、Rx と関数型プログラミングの関係を把握しておくと、非同期データストリームのビルディング・ブロックの作り方について大いに視野が広がるだろう。もし、貴方がこの記事の前提となる「関数型」のパラダイムに興味をお持ちなら、まずは「関数プログラミング実践入門」をお勧めしたい。
関数プログラミング実践入門 ──簡潔で、正しいコードを書くために (WEB+DB PRESS plus)
- 作者: 大川徳之
- 出版社/メーカー: 技術評論社
- 発売日: 2014/11/14
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (4件) を見る
本の内容そのものは Haskell を前提にしているが、関数型の重要なコンセプトが一通り紹介されているので、今回の記事で出てきたキーワード(高階関数、代数的データ型、モナド、…)が属する世界観を概観するのに良いと思う。
また、このテーマに本気で取り組みたい初学者の方には、つい先日に発売されたばかりの「Scala関数型デザイン&プログラミング」を併せてお勧めしたい。
Scala関数型デザイン&プログラミング ―Scalazコントリビューターによる関数型徹底ガイド (impress top gear)
- 作者: Paul Chiusano,Rúnar Bjarnason,株式会社クイープ
- 出版社/メーカー: インプレス
- 発売日: 2015/03/20
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (4件) を見る
この本は、かねてより国内外で高い評価を得ている "Functional Programming in Scala" の日本語訳になる。ざっと見た感じ非常に「歯ごたえがある」感じだが、「関数型でプログラムを組み上げる方法」を基礎から丁寧に解説しており、演習問題も充実しているので、一冊読み通すとかなり力がつくのではないかと思う。
以下、後編を書いた後に気付いた話について少し補足。題して「Rx と『普通のやつらの上を行け』の意外な関係」。
Observable の由来
後編で延々と書いたように、Reactive Extensions (Rx) の Observable は Iterable の双対になっている。
() => (() => T) // Iterable[T] (T => Unit) => Unit // Observable[T]
では、形式的な説明はそれでいいとして*1、実際のところ Observable のアイデアはどこから来たのだろうか?
Erik Meijer の以下の投稿によれば、Observable は Rx の前身である Volta プロジェクト*2で非同期呼び出しをうまく扱う方法を探している時に見出したものだという:
We started working on IObservable/IObserver a long time ago when we were trying to make asynchronous calls that arose from tier-splitting palatable. Initially we used just the continuation monad but then discovered the beautiful duality with IEnumerable and IEnumerator.
そして、これは継続モナド (continuation monad) から着想を得たものであるらしい。*3
class Cont[R, +A](val runCont: (A => R) => R) { ... }
継続渡し形式の関数 (CPS function)
上記の継続モナド Cont
が保持している関数 runCont: (A => R) => R
は継続渡し形式 (Continuation Passing Style; CPS) の関数という名前で呼ばれている。そして、後編で導出した Observable を表す「引数に渡されたコールバック関数に値を渡して実行する高階関数」も同様に CPS 関数だ(R
に Unit
を適用してみよう)。
(A => R ) => R // runCont (T => Unit) => Unit // Observable
継続渡しの「継続」とは、ここでは CPS 関数に渡されるコールバックを指している。CPS の詳しい説明は下記に挙げたページを見てほしいのだが、簡単に言うと「関数を呼び出して、戻ってきたら『続きの処理』を実行する」代わりに「関数に『続きの処理』を渡して呼び出し、その関数の最後で実行してもらう」というやり方だ。この「続きの処理」を継続と呼ぶ。
継続渡しで「普通のやつらの上を行け」
一般に「継続」は扱いの難しいプログラミングコンセプトとされていて、あまり積極的に活用されることがない(と思う)。では、なぜ非同期呼び出しの文脈で継続が出てくるのかというと、「処理をある所で中断してコンテキストを保存し、続きの処理が再開されるときに受け渡す」というパターンが CPS の考え方にバッチリはまるからだろう。以下、「なんでも継続」から引用:
ポイントは、外部の処理を呼びたいのだが、呼び出して戻り値を受け取るという形式が使えないケースにある。
例えばユーザインタフェースだ。処理の途中でユーザーに何か入力を促し、その結果を使って処理を続けたいことは良くある。しかし多くのGUIプログラミングでは、ユーザーの入力を受け付けるためには一度GUIのイベントループに戻らなければならない。したがって、プログラマは 処理をユーザーの入力の前にやる処理Aとユーザーの入力の後にやる処理Bに分けて、
1. 処理Aの最後に入力ウィンドウをポップアップし、イベントループに戻る
2. 入力ウィンドウの "OK" ボタンが押されるイベントが発生した時に 処理Bが呼ばれるようにする。
という具合にコーディングしているはずだ。この時、まさに処理Bは処理Aの「継続」なのだ。(Webアプリケーションにも全く同じ原理が使えることを指摘しておこう。 ユーザーからの入力が必要になった時、Webアプリケーションは一度入力フォームを吐き出してhttpサーバに制御を戻さなければならない。「普通のやつらの上を行け」でPaul Grahamが述べているYahoo! Storeの システムはまさにこの技術を実装している。
つまり、歴史的にも意味的にも、Observable はまさに「ユースケースを非同期データストリームに絞って扱いやすくした継続モナド」だということになる。
アカデミックな知識が MUST になる時代?
この「なんでも継続」の記事は(ブクマを見る限り)少なくとも 2006 年頃からあって、私も何度か目を通していた。のだが、正直なところ、今回記事を書くために読み返すまで UI や Web アプリへの応用という話は完全に忘れていた。
ぶっちゃけると、00 年代後半(?)にウェブ界隈で何度か不動点コンビネータの話がバズっていた頃、Yコンビネータの話題に関連してこの記事を読んでいるはずなのだが、当時は「いかにもギークの好きそうな頭の体操」として受け止めていて、応用の可能性についてはろくに考えが及んでいなかったと思う。
とかく、こうしたアカデミックな形式知は「小難しくて実用性がない」として軽んじられがちだ。しかし、関数型プログラミングしかり、こうして積み上げられてきた知見が実践的な問題に取り組むためのフレームワークとして活用されるケースは増えているし、海外の新しい OSS の動向を見るに、今後ますます増えていくだろうという感想を持っている。
全てのソフトウェア技術者がこうした方面の知識を習得すべきだとは思わない。しかし、個人的な感想としては、それぞれの現場で新技術の開発や選定に関わるリーダーや、エバンジェリストを自認するオピニオンリーダーにとって、このような学問的知識の習得が MUST になる時代はそう遠くない、という予感は日々強くなっている。
関数型プログラマのための Rx 入門(後編)
前編では、Reactive Extensions (Rx) の機能を関数型プログラミングの視点で読み解いた。続いて後編では、前編で紹介した Rx が関数型的な機能を提供している背景、つまり Observable と他の一般的なコンテナの関係に対してスポットライトを当ててみたい。
あらかじめ断っておくと、本編の話題は、実際に Rx を使う上で理解している必要は(あまり)ない。とりあえず、
- Observable は、List や Future と同じくモナドの一種である
- 以下の表に出てくるコンテナは、隣同士で互いによく似た(あるいは正反対の)性質を持っている:
単数 | 複数 | |
---|---|---|
同期 (pull) | T /Try[T] |
Iterable[T] |
非同期 (push) | Future[T] |
Observable[T] |
…という話だけ記憶に留めてもらえば、ここで回れ右してもオーケー。とはいえ、興味のある人はこの先に目を通しておくと、今後同じような非同期データストリームのライブラリを使ったり、あるいは自分で作るときに役に立つ、かもしれない。
後編では、まず Observable が様々な型クラスのインスタンスであることを説明する。続いて、Observable が Iterable の「双対」の関係にあることや、Observable と Future の関係を紹介する。
Observable は (モナド|モノイド|...) である
flatMap メソッドがあることからも分かる通り、Observable はモナドのインスタンスだ。
Observable が実際にモナド則を満たすことの証明は、@everpeace さんが ScalaCheck によるテストを公開している。*1
just(x).flatMap(f) === f(x) // (1) 左単位元律 o.flatMap(just) === o // (2) 右単位元律 o.flatMap(f).flatMap(g) === o.flatMap(x => f(x).flatMap(g)) // (3) 結合律
また、モナド以外についても、同様に Observable に対する様々な型クラスやモナド変換子を Scalaz で実装した rxscalaz が公開されている。例えば、rxscalaz には Observable に対する Monoid
型クラスの実装がある。すなわち、Observable は二項演算 ++
に関してモノイドになっている。
implicit def observableMonoid[A] = new Monoid[Observable[A]] { override def zero: Observable[A] = Observable.empty override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2 }
rxscalaz は他にも、型クラス MonadPlus
, Traverse
, Foldable
などや、モナド変換子 ObservableT
の実装を提供している。
ところで、Haskell や Scalaz の経験がある人にとっては言うまでもないが、List
や Future
もこれらの型クラスのインスタンスだ。例えば、List
, Future
そして Observable
はすべてモナドのインスタンスになっている。
前回の記事で「Observable は List や Future と共通した性質を持つ」という話を繰り返し取り上げたが、これはつまり「Observable と◯◯はどちらも (モナド|モノイド|...) のインスタンスである」という話だったのだ、ということが分かると思う。
pull モデルと push モデルの双対性
前編で、Observable はイベントストリームの一種だと述べた。では、Observable は Iterable や Future のような他の種類のコンテナと何が同じで、何が違うのだろうか。
ここで、コンテナ同士を共通の議論の土台の上で比較するために、各コンテナの性質を一つの関数として書き表すという抽象化の操作を施してみる。すると…?
コンテナの四象限
様々なコレクション(コンテナ)をユースケース別に整理した表を以下に示す:
単数 | 複数 | |
---|---|---|
同期 (pull) | T /Try[T] |
Iterable[T] |
非同期 (push) | Future[T] |
Observable[T] |
このようにコンテナは、T
(あるいは失敗の文脈を付与した Try[T]
)や Iterable[T]
のように、ユーザが値を同期的に pull するコンテナと、Future[T]
や Observable[T]
のように、ユーザの用意したコールバックに値を非同期に push するコンテナの二種類に整理できる。
また、別の軸で見ると、さらに単一の値を格納するコンテナと複数の値を格納するコンテナの二つに分けることができる。
この表を眺めると、縦方向や横方向に隣り合ったコンテナ同士には何か共通の性質がありそうに思える。しかし、当然ながらこれらは API やセマンティクスが互いに異なるので、単純な比較はできない。
この課題を、冒頭で述べた「コンテナの性質を一つの関数に書き表す」というシンプル化の手法(トリック)を導入することで解決してみよう。
Iterable から Observable をつくる
例として、縦方向に隣り合っている Iterable と Observable を比較してみよう。
まず、Iterable の API を確認する。Iterable から値を取り出すには、iterator() メソッドで Iterator を生成する必要がある。次いで、生成した Iterator に対して next() メソッドを次々に呼び出すことで値を取得する。
Iterable#iterator: () => Iterator[T] Iterator#next: () => T
これを単純化して一つの高階関数に書き直すと、型シグネチャは以下のようになる:
() => (() => T) // Iterable[T]
ここで、この関数の「矢印をひっくり返した」関数を作ってみよう(無引数は Unit
に書き直している):
(T => Unit) => Unit // ← 反転! // ~~~~~~~~~~~ // callback
この操作によって作られた関数は何を意味するのか。関数の型シグネチャを読み下すと、「引数に渡されたコールバック関数に値を渡して実行する高階関数」に相当することに気付く。つまり、この関数は Observable の subscribe
メソッドと本質的に同じものなのだ:
trait Observable[T] { def subscribe(onNext: (T) ⇒ Unit): Subscription }
結果として、Iterator の API を関数の形に単純化してひっくり返すだけで Observable の API が導出できた。つまり、pull モデルのコンテナの API と push モデルのコンテナの API はある種の対称性を持っていることが分かる。
Erik Meijer は、Coursera の講義において、この対称性を指して「Iterable と Observable は数学的双対 (mathematical dual) である」と呼んでいる。一見すると異なる API を持つコンテナ同士が、実は対称的な関係にあって互いに導出可能である、というわけだ。
エラー状態に対応する
さて、前節では話を簡単にするために正常系 (onNext) のみを議論した。そのため、onCompleted イベントや onError イベントの存在を無視している。*2
なので、次の話に進む前に、Iterable から関数を作る際にエラー状態の文脈を加えることで、同様に「エラー状態に対応した Observable」を導出しておく。
先ほど、Iterator から値を取り出す際に next()
メソッドを呼び出すと述べたが、実際にコードを書くときは、要素列の末尾を判定するため、以下のように hasNext()
メソッドを組み合わせるはずだ:
val it = iterable.iterator while (it.hasNext) { val a = it.next() ... }
ここで、「hasNext()
が true
の時は値が得られるが、false
の場合は得られない」ことを表すため、先ほど作った関数の戻り値を Option で囲う。また、next()
は例外を投げることがあるので戻り値をさらに Try(後述)で囲う。
このようにエラー状態をエンコードしたバージョンの Iterable は、以下のような高階関数として表せる:
() => (() => Try[Option[T]]) // Iterable[T]
同様にこれを反転すると、エラー状態を表現する能力を持った Observable
の関数が導出される:
(Try[Option[T]] => Unit) => Unit // Observable[T]
なお Try[T]
は、演算が「成功した場合」と「失敗した場合」の二つの文脈を表す代数的データ型だ。
sealed abstract class Try[+T] case class Success[+T](value: T) extends Try[T] case class Failure[+T](exception: Throwable) extends Try[T]
したがって、Observable に渡されるコールバックが受け取るイベントは、以下の三種類のうちのいずれかになる:
Success[Some[T]]
Success[None]
Failure
ここで、Success[None]
を Observable の完了イベントをエンコードしたものと解釈すれば、コールバックは、それぞれ onNext, onCompleted, onError イベントを表す代数的データ型を引数にとる関数になっている。
これは、実際の subscribe
の API とも一致している:
def subscribe(onNext: (T) ⇒ Unit, onError: (Throwable) ⇒ Unit, onCompleted: () ⇒ Unit): Subscription def subscribe(observer: Observer[T]): Subscription // trait Observer[T] { // def onNext(value: T): Unit // def onError(error: Throwable): Unit // def onComplete(): Unit // }
ところで、もし subscribe
の API が導出した関数の通りになっているなら、イベントハンドラはこんな風に書けるだろう:
o.subscribe { case Success(Some(a)) => ... case Failure(t) => ... case Success(None) => ... }
もちろん、実際にはこんなパターンマッチはできないが、一方で Rx には Notification という代数的データ型が含まれている。
sealed trait Notification[+T] object Notification { case class OnNext[+T](value: T) extends Notification[T] case class OnError[+T](error: Throwable) extends Notification[T] case object OnCompleted extends Notification[Nothing] }
Notification
を使うには、Observable インスタンスの materialize: Observable[Notification[T]]
メソッドを使って、通常のストリームを Notification のストリームへ変換する必要がある。
Notification をパターンマッチするコードはこのように書くことができ、先ほど Iterable の双対として導出した API と一致していることが分かると思う。
o.materialize.foreach { case OnNext(a) => ... case OnError(t) => ... case OnCompleted => ... }
Future と Observable の関係
前節では、コンテナの表を縦方向(同期と非同期)を比較すると双対になっていることを示した。
単数 | 複数 | |
---|---|---|
同期 (pull) | T /Try[T] |
Iterable[T] |
非同期 (push) | Future[T] |
Observable[T] |
では、縦方向ではなく横方向の関係(単一と複数)はどうだろうか? 単一の値を格納する Future
と、複数の値を格納する Observable
を比べてみよう。
例によって、Future と Observable を関数に書き表して比較してみる。Future API の関数への書き直しは onComplete[U](f: (Try[T]) => U): Unit
メソッドがそのまま使える(戻り値の U
は単純化して Unit
にしている):
(Try[T] => Unit) => Unit // Future[T]
これを先ほど Observable の関数と比較してみると、二つは非常によく似た型シグネチャを持つことが分かる:
(Try[ T ] => Unit) => Unit // Future[T] (Try[Option[T]] => Unit) => Unit // Observable[T]
一方で、型シグネチャを読み下すと、Future のコールバックは2種類のイベント
Success[T]
Failure
を受け取るのに対して、Observable のコールバックは3種類のイベント
Success[Some[T]]
Success[None]
Failure
を受け取るようになっている。このように二つは似通っているが、Observable だけが完了 (Success[None]
) イベントに相当するものを持っている。
これは、単一の値を格納する、つまり値が一つやってきた時点で自動的に「完了」する Future に対して、複数の値を格納する Observable はデータが無限に流れてくる可能性があり、ストリームの終端をシグナルする明示的な完了イベントが必要であることに対応していると考えることができる。
そして、この観察は Observable の実際の挙動とも一致している。RxScala では、Observable の from
関数で Future を Observable に変換できるが、その実装を見ると、Future の成功イベント (Success
) が Observable のデータイベントと完了イベントの組 (onNext
+ onComplete
) にマッピングされている。
def from[T](f: Future[T]): Observable[T] = { val s = AsyncSubject[T]() f.onComplete { case Failure(e) => s.onError(e) case Success(c) => s.onNext(c); s.onCompleted() } s }
以上の議論から、単純化すると Observable は複数の値の処理に対応した Future(あるいはその逆)であり、本質的なセマンティクスに大きな差はないことが分かる。逆に言えば、この点をどう扱うかが、非同期処理のプログラミングにおいて Rx を特徴付ける大きなポイントになっていると考えられる。
まとめ
本編では、Observable と、Iterable や Future のような他の種類のコンテナとの間には密接な関係があることを示した。
いちおう補足しておくと、これは Observable が本当に Iterable の双対として作られたとか、そういう話ではない。言うまでもなく両者は独立に開発されたものだし、また前編で見たように、実際には各コンテナはそれぞれのユースケースに応じた「肉付け」をしており、それがライブラリの最終的な使いやすさを決めるからだ。
一方で、それらのライブラリの背景に見い出される基本的な、根本的なアイデアを理解しておくことはやはり大切だと思う。なぜなら、誰かが作ったライブラリを使う際に、あるいは自分で作る際に、様々な選択肢の中から一つを選び取る際の確かな判断基準になるからだ。
OSS 全盛の昨今、一つのユースケースに対して様々なライブラリが提案されることが増えている。一方で、パッと見にはそれらの違いがよく分からずに、何となく取っ付きやすくて人気のありそうなものを選んで失敗したり、先人の積み上げてきた体系やノウハウを無視して表層だけ真似たものを作った結果、仕様レベルで欠陥のあるものを作ってしまうといったことが増えているように思う。
では、どうすれば良いのか。個々のライブラリの使い方を丸暗記することも大事だが、同時にプログラミングにおける「良いプラクティス」を体系的に学ぶ機会を作るべきだと思う。みなさんもご存知の通り、そのような体系はいくつか提案されていて、オブジェクト指向におけるデザイン・パターン、ソフトウェア設計におけるドメイン駆動開発 (DDD)、あるいはこの記事でフィーチャーした関数型プログラミングといったものがある。
前編でも述べたように、関数型プログラミングが提唱する様々な原則(第一級関数、参照透過性、モナド、…)は、組み立て可能性 (composability) の高いビルディング・ブロックの作り方を体系的に整理したものだと考えられる。Rx は、その原則に従うことで、関数型プログラミングが持つメリットの多くを受け継ぐことに成功している。
また、Rx は、関数型が提供する機能に加えて、非同期データストリームのプログラミングにおいて必要となる数々の機能を提供している。Rx をよく調べることで、今後出てくる非同期データストリームのライブラリが従うべき「良いプラクティス」が見えてくると思う。
参考文献
- Principles of Reactive Programming
- Rx (Reactive Extensions)
- RxJava Javadoc
- RxScala Scaladoc
- あなたが求めていたリアクティブプログラミング入門 - ninjinkun's diary
- Introduction to Rx
- Reactive Extensions学習ノート
- Rx入門 - xin9le.net
- Reactive Extensions再入門 - かずきのBlog@hatena
- RxのHotとColdについて
- everpeace/observable-canbe-monad
- everpeace/rxscalaz
- Certified Rx Developer - Michiel Overeem
- Subject/Observer is Dual to Iterator
- Introduction to the Reactive Framework Part II | Matthew Podwysocki
- Duality of IEnumerable/IObservable - josemiguel.torres
関数型プログラマのための Rx 入門(前編)
概要
『Observable は単なる非同期データストリームにおけるモナドのインスタンスだよ。何か問題でも?』
まともな概要
つまり、Reactive Extensions (Rx) って何だ?
ということでウェブをガサゴソと漁っていたところ、オンライン講義サービス Coursera の Principles of Reactive Programming に行き当たった。この講座では、Rx の主要開発者の一人である「双対おじさん」こと Erik Meijer 氏自らが一部の章を担当し、Rx の理論的側面を講義している。
この講座の大きな特徴は、Rx を(命令型プログラミングではなく)関数型プログラミング (FP) の側から解き明かしていくことにある。
こう書くと奇をてらっているように見えるかもしれないが、実際には Rx は FRP (Functional Reactive Programming) のバリエーションの一つとされており、Rx を関数型プログラミングの一応用として説明するのはさほど不思議なことではない。
では、あるフレームワークを関数型のパラダイムに則って作るメリットは何だろうか? 一つ挙げるとすれば、それは少数のシンプルな概念の組み合わせで多数の具体例を作り出せることだと思う。
関数型プログラミングには、組み立て可能性 (composability) の高いビルディング・ブロックの作り方について多くの知見が蓄積されている。フレームワークは、関数型のイディオムに沿った API を提供することで、個別のユースケースごとに実装を用意することなく、関数を組み合わせるだけで様々なユースケースに対応できるようになる。
これは、フレームワークを使う側にとっても同じことが言える。フレームワークの使い方を調べる時に、すでに使いどころがよく知られている定番の関数と、その背景にある抽象的な概念をそのまま当てはめられるからだ。
例えば、Rx の機能の中核を担う Observable
には実に 200 以上のメソッドがあり、これらの使い方を一個一個まじめに暗記していては日が暮れてしまう。しかし、これらのメソッドを「関数型の眼」で分析してみると、その多くが、お馴染みの高階関数のバリエーションであることに気付くはずだ。
実際に、私はこの講座を視聴してやっと Rx の使い方や設計思想が理解できるようになった。
記事の構成
この記事では、関数型プログラミングについて基本的な知識があることを前提に、Coursera の Meijer 氏の講義内容をベースに Rx を紹介する。
この記事は二つのパートに分かれている。前編では、Rx の API を関数型プログラミングの観点から読み解く方法を紹介した後、実際に部品となる関数を組み立てて非同期イベント処理を実装してみる。そして後編では、Rx の理論的側面について少し突っ込んだ議論を紹介していく。
追記: 補足記事(”ReactiveX と「普通のやつらの上を行け」の意外な関係”)を書きました。
また、この記事では Rx を個別のユースケースに当てはめる方法については簡単な紹介に止める。より具体的な例に興味がある方には、id:ninjinkun 氏が翻訳されたあなたが求めていたリアクティブプログラミング入門がとても素晴らしい記事なのでぜひ一読してほしい。
ちなみに、ついさっき気付いたが、元ネタの Coursera の講座が 4/13 から一年半ぶりに第二期を開講!するようだ。ぜひ、この機会に受講してみてはいかがだろうか。
Reactive Extensions の概要
公式サイトでは、Rx を以下のように説明している:
Reactive Extensions (Rx) は、観測可能 (observable) なシーケンスと LINQ スタイルのクエリ演算子を使って、非同期なイベントベースのプログラムを合成するライブラリです。Rx を使うと、開発者は非同期データストリームを Observable で表し、非同期データストリームに対するクエリに LINQ 演算子を使い、非同期ストリームの並行性を Scheduler でパラメタ化します。簡単に言えば、Rx = Observable + LINQ + Scheduler です。
非同期処理を扱うライブラリというと Future
や Promise
を思い浮かべる人も多いと思う。しかし、Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable
は(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。
データストリームの具体例としては、デスクトップアプリにおいては「マウスイベント」、ウェブサービスにおいては「株価情報」や Twitter の「タイムライン」などが分かりやすいだろう。
Rx の API は GoF の Observer パターンを踏襲している。つまり、観測対象 (Observable) のイベントを観測者 (Observer) が購読 (subscribe) するという形式をとる:
trait Observable[T] { def subscribe(observer: Observer[T]): Subscription }
Observable から Observer に通知されるイベントは三種類ある。これらは、Observer
のコールバックメソッドとして実装する:
trait Observer[T] { def onNext(value: T): Unit def onError(error: Throwable): Unit def onComplete(): Unit }
すなわち、Observable に新しいデータが来るたびに onNext
が呼ばれる。そして、ストリームが終了 (terminate) する時に onComplete
が呼ばれる。一方で、エラーが起きると onError
が呼ばれる。
Rx では、onComplete
と onError
のどちらかが発生した時点でストリームは終了し、これ以後はイベントは発生しない。逆に言うと、完了イベントやエラーイベントが起きない限りは無限ストリームになる。また、onNext
は逐次的 (sequential) に呼ばれる。つまり、並行には呼ばれないので競合状態 (race condition) を気にする必要がない。
また、Observable が終了する前でも Observer を明示的に購読解除 (unsubscribe) できる。購読を解除するには、subscribe
した時に返される Subscription
の unsubscribe()
を呼び出す。
trait Subscription { def unsubscribe(): Unit }
LINQ と関数型プログラミング
ところで Observable
には、subscribe
の他にも map
や flatMap
といった関数型プログラミングでおなじみの関数も多数用意されている:
def map[R](func: (T) => R): Observable[R] def flatMap[R](f: (T) => Observable[R]): Observable[R] def foreach(onNext: (T) => Unit): Unit def filter(predicate: (T) => Boolean): Observable[T] def take(n: Int): Observable[T] def takeWhile(predicate: (T) => Boolean): Observable[T] def toList: Observable[List[T]] def zip[U](that: Observable[U]): Observable[(T, U)]
なぜ、ここにこんなものがあるんだろう?
"Rx = Observable + LINQ + Scheduler" だったことを思い出してほしい。オリジナルの .NET 版では、Observable を LINQ で操作できるように 標準クエリ演算子の実装を提供している。上記の関数は RxScala の API だが、これは標準クエリ演算子を Scala のコレクション操作で用いられる高階関数へと置き換えたものだ。
なぜ置き換えられるのか? LINQ はコレクションやデータベースに対して SQL 風のクエリ式を書けるようにするための機能だが、その下にある標準クエリ演算子は、関数型言語のコレクションライブラリが提供する高階関数と実質的に同じものだからだ。例えば、標準クエリ演算子 の Select
は map
関数に対応する(LINQ と Scala の対応は @eed3si9n さんの「Scala脳のための C# LINQ」が参考になる)。
従来の Observer パターンの考え方からすると、高階関数の導入は唐突に写るかもしれない。しかし、実のところ Observable は一種のストリーム(無限リスト)とみなせるので、関数型ライブラリのコレクション操作関数と非常に相性が良い。
Observer パターンの語彙と関数型の語彙が同じものを指している場合もある。例えば、Observable の foreach
メソッドのドキュメントや実装を見てみると「foreach
は subscribe
のエイリアスだ」と明記されている。どちらも「イベントが通知されるたびに何か処理をする」メソッドだからだ。
def subscribe(onNext: (T) ⇒ Unit): Subscription def foreach(onNext: (T) => Unit): Unit
一方で違いもある。例えば Observable は無限ストリームなので foldRight
は定義されてない。また、非同期コレクションなので、foldLeft
や reduce
は、集約された値をそのまま返す代わりに Observable に包んで返すようになっている。
def foldLeft[R](initialValue: R)(accumulator: (R, T) ⇒ R): Observable[R] def reduce[U >: T](accumulator: (U, U) ⇒ U): Observable[U]
また、非同期データストリームのユースケースに対応するために、switch
や combineLatest
といったメソッドが数多く追加されている。
しかし、もし未知の関数が出てきてもさほど恐れる必要はない。関数の型シグネチャとマーブルダイアグラム(下図)に注目すれば、どんな機能か把握するのはさほど難しくないからだ。
(以下、マーブルダイアグラムは RxJava の Javadoc からの引用)
確かに、Observable
の API を真正面から読み解こうとすると、大量のメソッドや「Observable シーケンスの各要素を Observable シーケンスの新たなシーケンスへ射影し…」のような宇宙語に当惑すること必至だ。けれども、視点を変えてこれらのメソッドを関数型の機能として捉えなおしてみると、一転して強い一貫性が見えてくるはずだ。
Rx の「糊」
有名な「なぜ関数プログラミングは重要か」の中で、John Hughes は「プログラミング言語にとって高階関数と遅延評価はモジュールを貼り合わせる新しい糊である」という趣旨のことを述べている。
そんなわけで、非同期データストリーム処理の「糊」である Observable
の高階関数の実例をいくつか見てみよう。
map
は Observable の各要素に「データを別のデータに変換する」関数を適用して新しい Observable を作る
def map[R](func: (T) => R): Observable[R]
flatMap
は、同様に「データを Observable に変換する関数」を適用して入れ子の Observable を作り、最後にマージする(念の為に付け加えると、flatMap
はmap
とflatten
を組み合わせた関数だ)
def flatMap[R](f: (T) ⇒ Observable[R]): Observable[R]
groupBy
は、やはり要素に関数を適用してキーを出力し、そのキーごとに要素をグルーピングした Observable を作る(戻り値型がObservable[(K, Observable[T])]
になっている)
def groupBy[K](f: (T) ⇒ K): Observable[(K, Observable[T])]
ところで、以上で紹介した関数のマーブルダイアグラムを見ると、データだけでなく完了 (onComplete
) イベントを表す「|」も、ちゃんと新しい Observable に写し取られているのが分かる。
以前の節でも見たように、Observable には onNext
, onComplete
, onError
という三つのイベントがある。そして、Observable の高階関数は、データだけでなく全てのイベントを出力先の Observable へ自然なやり方で写す。このとき、イベント間の制約条件(一度 onComplete
になったら以後 onNext
は発生しない等)も引き継がれる。このため、map
等の高階関数に渡す関数を書く時に、Observable の内部状態をいちいち気にする必要がない。
以上から、Observable
が提供する高階関数は List
や Future
が提供するものと単に型シグネチャが同じであるだけでなく、同様のセマンティクスを持つ機能として扱える。
このため、List や Future と同様に、これらの高階関数は Observable と無関係に作った好きな関数同士を繋ぎ合わせる「糊」として使うことができる:
tweets.filter(t => t.userName == "okapies").map(t => t.text)
また、RxScala では Future
を Observable
に変換する関数 from
が用意されている。これも、Future と Observable がよく似たセマンティクスを持っており、互換性を持っている証拠と言えるだろう。
def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T]
ただし、Observable には「時間と順序」の両方が関わってくるので、List や Future とは異なるセマンティクスを持つ関数も存在する。その代表例が concat
や flatten
だ。こうした関数の時間に関する挙動を調べるときは、API ドキュメントに載っているマーブルダイアグラムが重要な情報源になる。
この点については、後の節で「二つ以上の Observable をマージする」ケースを検討する際に詳しく見ていくことにする。
スケジューラ (Scheduler)
Scala 標準の Future
が ExecutionContext
を受け取るように、Observable
も Scheduler
を受け取るオプションを持っている。これが "Rx = Observable + LINQ + Scheduler" の三つ目だ。
object Future { def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] } trait Observable[T] { def observeOn(scheduler: Scheduler): Observable[T] }
どちらもタスクの(非同期)実行を制御するための機能だが、Future は単一の値を返したらそこで完了するのに対して、Observable は複数の値を返し続ける。また、タスクの実行戦略を選んだり、処理の途中で購読解除 (Subscription.unsubscribe()
) できる必要がある。
Scheduler は、これらの機能をサポートするために ExecutionContext よりも複雑な仕組みになっている。
実際に使う Scheduler は、自分で実装しなくてもフレームワーク側である程度用意されている。例えば、タスクを現在のスレッド上で実行する immediate
や、CPU-bound や IO-bound なタスクのための computation
や io
などがある。
(注: RxJava の Scheduler
は 1.0 に上がる時にリファクタリングされたため、Coursera 講座(第一期)での Scheduler の説明は少し内容が古くなっている。ちなみに、チケットを見ると、Meijer (@headinthebox) 氏ご本人も議論に参加していたりする)
Observable のマージ
Rx の API を調べると「複数の Observable を一つの Observable に合流(マージ)する」という似たようなメソッドを、アルゴリズムを変えて何通りも提供していることに気付く。これは、それぞれの Observable に含まれるイベントの時刻が重なっている場合、それらをマージする方法や順序の決め方には色々な考え方があって一意には決まらないからだ。
例えば Observable は、List のような通常のコレクションと同様に concat
と flatten
という二つのメソッドを提供する。どちらも「入れ子になった Observable (Observable[Observable[U]]
) を一重の Observable に畳みこむ」関数だ。*1
def concat[U]: Observable[U] def flatten[U]: Observable[U]
ところで、List
ではこれらの関数のアルゴリズムは concat = flatten
だが、Observable
ではそれぞれ挙動が異なる。
concat
は、入れ子に入っている Observable 同士の順番が維持されるようにマージする。つまり、二番目の Observable に含まれるイベントは、必ず一番目の Observable のイベントの後ろに置かれる。
一つ目の Observable が完了するまでの間、二つ目の Observable に来たイベントはバッファされ続けることに注意しよう。つまり、場合によってはメモリがオーバーフローする可能性がある。
一方 flatten
(あるいは merge
)は、イベントの発生タイミングが維持されるようにマージする。つまり、一番目の Observable に含まれるイベントの途中に、二番目の Observable のイベントが挿入されることがある。
このように、「二つの Observable を一つにマージする」という簡単な操作でも二つの実現方法があるので、ユースケースに応じて使い分ける必要がある。
この他にも、マージ系のメソッドには switch
や amb
のような「複数の Observable のうち一つを採用して、他の Observable を捨てる」ものや、zip
や combineLatest
のように「Observable から来るデータが揃った時にペアにして出力する」ものがある。
様々なサイトに掲載されている具体例を見ると、このマージ機能をうまく組み合わせることが、Rx を活用する際の一つのポイントとなるようだ。
関数を組み立てて非同期処理を作る
この記事の冒頭で、関数型で考えるメリットは「少数のシンプルな概念の組み合わせで多数の具体例を作り出せること」だと述べた。
これを確かめるために、これまでに紹介した関数を組み立てて、実際に非同期イベント処理を実装してみよう。ここでは、Coursera 講座に出てくる Rx のコーディング例を使って説明する。
国ごとの地震データストリーム
まず、非同期データストリームのソースとして、アメリカ地質調査所 (USGS) が提供する API から全世界の地震データのストリームを取得する関数 usgs
と、逆ジオコーディングサービスに問い合わせて地理座標を国名に変換する関数 reverseGeocode
が使えるとしよう。
以下の関数シグネチャを見ると分かるように、どちらもウェブサービスへの非同期な問い合わせなので、戻り値の型が Observable
や Future
になっている。
def usgs(): Observable[EarthQuake] = { ... } def reverseGeocode(c: GeoCoodinate): Future[Country] = { ... }
では、この二つの関数を組み合わせて、国ごとの地震情報のストリームを作ってみよう。
まず、usgs
から取得した地震データと reverseGeocode
から取得した国名を map
で貼り合わせて、(地震データ, 国)
を要素とするストリーム withCountry
を作る。reverseGeocode
への問い合わせには、usgs
から取り出した地震データに含まれる地理座標の情報が必要なので、コードは以下のようになる:
val withCountry: Observable[Observable[(EarthQuake, Country)]] = usgs().map { quake => val country: Future[Country] = reverseGeocode(q.location) Observable.from(country.map(country => (quake, country))) }
ここで withCountry
の戻り型を見ると Observable
が入れ子になっていることに気付く。reverseGeocode
も usgs
と同様に非同期関数なので、新しい地震データがやってくる度に map
に渡したクロージャの中で結果待ち (Future[Country]
) しているからだ。このままでは扱いにくいので、flatten
で入れ子の Observable
に含まれるイベントを一列にマージする:
val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()
これで (地震データ, 国)
を要素とするストリーム merged
が得られたので、最後に国をキーにして groupBy
でグルーピングする:
val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] = merged.groupBy { case (quake, country) => country }
これで、国ごとにタグ付けされた地震データが得られるようになった。このように map
や flatten
、groupBy
といった糊を使って関数を組み立て、byCountry
という具体的な関数を作ることができた。
マージ戦略を取り替える
しかし、この実装には一つ問題がある。最終的に国ごとにストリームされる地震データの順番が、ユーザが望む通りになっていない可能性があるのだ。
つまり、入れ子の Observable
をマージする時に flatten
を使っているので、地震データの順番が「地震の発生順」ではなく「reverseGeocode
の結果が返ってきた順」になってしまう。この挙動はエンドユーザへの速報のようなユースケースでは問題ないかもしれないが、地震データを時系列順に解析したいような場合は問題だろう。
どうすればいいか? 答えは簡単で、単に merged
関数の flatten
を concat
に取り替えればいい。これで、望み通りに発生順の地震データが出力される(既に書いたように concat
はメモリがオーバーフローする可能性があるので注意しよう)。
val merged: Observable[(EarthQuake, Country)] = withCountry.concat()
以上のように、シンプルな関数を組み合わせて作ったアプリケーションは、関数を部分的に取り替えるのも容易であり、結果として異なるユースケースにも素早く対応できることが示せたと思う。
まとめ
前編では、Rx を関数型プログラミングの視点で読み解いていくことで、API の習得やユースケースへの適用が容易になることを示した。
続いて後編では、Observable
は単なるモナドのインスタンスだよ?という話や、Observable
と Iterable
や Future
との関係といった議論を紹介していきたい。
マイクロサービスのための Tumblr 製フレームワーク "Colossus"
この記事は Scala Advent Calendar 2014 の 15 日目です。昨日は id:qtamaki さんの”「関数プログラミング 珠玉のアルゴリズムデザイン」をScalaで実装してみる”でした。
今日は、先日に Tumblr が OSS 化を発表した Scala 製のノンブロッキング I/O (NIO) フレームワーク "Colossus" を紹介したい。”高性能なマイクロサービスを構築するためのフレームワーク”を謳っており、まだ OSS 化されて日が浅いものの Tumblr ではすでに production で使われているとされる。また、Colossus 自体がアクターフレームワーク Akka のアクターとして実装されており、それを使った独自のスレッドモデルを提供している点も興味深い。
基本的なコンセプト
Tumblr が Colossus を開発した狙いについて、公式サイトの冒頭でこのように述べている。
Colossus は、ノンブロッキングネットワーク I/O を必要とする高性能なアプリケーション構築のための Scala 製軽量フレームワークである。Colossus は特に、低レイテンシのステートレスなマイクロサービス(たいていはデータベースやキャッシュを抽象化したものと大きく変わらない)に焦点を当てている。Colossus は、こうしたユースケースにおいて性能を最大化すると共に、インタフェースをクリーンかつ簡潔に保つことを狙っている。
つまり…どういうことだってばよ?
マイクロサービス・アーキテクチャでは、コンポーネント化された軽量サービス同士を連携させてアプリケーションを組み立てる。つまり、多くのマイクロサービスのビジネスロジックはこのようになっているはずだ:
- クライアントからコネクションが張られてリクエストが来る。
- 他のバックエンドサービス(データベースとか)にリクエストを飛ばしてレスポンスを待つ。
- バックエンドサービスから戻ってきたレスポンスを使ってレスポンスを組み立ててクライアントに返す。
Colossus を使って書くとこんな感じ(Colossus のドキュメントより)。
Service.serve[Http]("http-service", 9000){ context => val redis = context.clientFor[Redis]("localhost", 6379) context.handle{ connection => connection.become { case request @ Get on Root / "get" / key => redis.send(Commands.Get(key)).map{ case BulkReply(data) => request.ok(data.utf8String) case NilReply => request.notFound("(nil)") } ... } } }
このように、非同期計算を抽象化した型である Future
を map
や flatMap
をつないでいくアプローチは、Scala プログラマにとって見慣れたものだ。
ところで、Colossus において redis.send
の戻り値は Callback
であり Scala 標準ライブラリや Twitter 製ライブラリに含まれているような Future
ではない。
Callback は Future
と同様のインタフェースを持つ(し、実際に Future に変換することもできる)が、スレッドセーフではない。これは、Callback は単一のワーカスレッド上で実行されることを前提にしているからだ(あと、Callback を呼び出す準備ができた際に execute()
メソッドを明示的に呼び出す必要がある。この呼び出しは通常はフレームワーク側がやってくれる)。
これは、冒頭に挙げたようなマイクロサービスのユースケースを前提にするなら、基本的にはスレッド間で状態を共有する必要がないからだ。また、データベース接続などの複数のコネクション間で共有したい状態については、イベントループごとに保持してコネクション間で共有すればよい(一つのイベントループは多数のコネクションを処理するが、シングルスレッドなのでイベントループ内では競合を心配する必要がない。また、全てはノンブロッキングに処理されるので、あるコネクションがデータベースからの応答を待っている間に、後続のコネクションがブロックすることはない)。
Future を使う場合、多くの実装はマルチスレッド動作のために ExecutionContext
内で処理を実行するが、これによりわずかだがオーバヘッドが発生する。このオーバヘッドが問題になるようなリクエストの処理においては、Colossus では代わりにシングルスレッドで動作する Callback を使うことでレイテンシを削減できる。これは、リクエストのサイズが小さくステートレスな場合に特に有効だと言える。
アーキテクチャ
Colossus の Core Layer は Akka 上で動作するアクターであり、例えばイベントループは下記のような”自分にメッセージを送るアクター”として実装されている(実際のコードはこのあたりにある)。
receive message {
case `Select` => {
event_handlers = selector.select()
...
self ! `Select`
}
...
}
ドキュメントから読み取れるアーキテクチャを図示するとこんな感じか:
Colossus 全体を駆動するのが IOSystem
で、作成時に ActorSystem
と関連付けて指定した数の worker(イベントループ)を作成する。なお、例えばアプリケーションが複数の独立したサーバを実行するような場合は、一つの ActorSystem に複数の IOSystem を関連付けてもよい。
implicit val actor_system = ActorSystem("system") val io_system = IOSystem("io-system", numWorkers = 4)
IOSystem は、コネクションがやってくるとそれを各 Worker に振り分ける。各 Worker はアクターであり、ひとたび割り当てられたコネクションはずっと同じ Worker に束縛され続ける。つまり、Worker 内のイベントループが呼び出すイベントハンドラはシングルスレッドで動作することが保証される。
では、イベントループ間やコネクション間で状態を共有したい場合や、ブロックするような処理を実行したい場合はどうするか? その場合は、アクターと Future を使ってイベントループの外側で処理すればよい。
プログラミングモデル
サーバのビジネスロジックの実装方法は、関数型っぽい書き方とオブジェクト指向っぽい書き方の二通りがある。まずは関数型っぽい方から:
Service.serve[Http]("service-name", 456){context: ServiceContext[Http] => //everything here happens once per event-loop context.handle{ connecton: ConnectionContext[Http] => //everything here happens once per connection connection.become { //partial function HttpRequest => Response[HttpResponse] case req => req.ok("Hello world!") } } }
この記法は、実際には下記のような ConnectionHandler
の実装と同じことをやっている(ServiceServer
は ConnectionHandler の継承クラス):
import com.tumblr.colossus._ import protocols.Telnet._ class HelloWorldHandler(config: ServiceConfig, worker: WorkerRef) extends ServiceServer[HttpRequest, HttpResponse](new HttpServerCodec, config, worker) { def processRequest(request: HttpRequest): Response[HttpResponse] = { req.ok("Hello World!") } def processFailure(request: HttpRequest, reason: Throwable) = { request.error(s"Error: $reason") } }
テストサーバ、メトリクス、タスク
時間がないので下記を読んでください…。個人的には、最初のリリースからテスト関係や計測関係のユーティリティが含まれているのが素晴らしいなと思う。
まとめ
ドキュメントには他にも色々と紹介したい話が書いてあるんだけど、全然時間が足りなかった…。もしこれを読んで興味が湧いたら、読んでみたりコードを触ってみたりしてください。あと、Issue を見てるとまだまだ成熟してない感じなので、逆に言えば貢献のチャンスかも。
ところで、Scala 製のマイクロサービス向けフレームワークといえば、真っ先に Finagle の名前が思い浮かぶ。以前のエントリでも書いたように、Tumblr が Scala を採用するに至った大きな理由の一つに Finagle の存在がある。
では、ここに至って Tumblr が独自のフレームワークの実装を始めたのはなぜか。特に、性能については折り紙付きの Netty の代わりに Java NIO + Akka で実装しているのは何故なのか。
一つは上で説明したように、特定のユースケースにおける性能を追求していく中で Future
の”リッチな”スレッドモデルがオーバヘッドになったこと。そしてもう一つは、明示はされていないが、性能を追求して下のレイヤに手を入れていく際に、Netty の低レベル操作を中心とした API や複雑なスレッドモデルと密結合した Finagle が扱いにくかった、ということがあるのではないかと思う。例えば、Colossus のコンセプトとして掲げられている四つの原則の中にこんな項がある:
Keep it transparent - Choose how much magic you want! Colossus is built in several layers which allow you to build high-level abstractions without being walled off from taking full control.
実際、Finagle の Netty 3 依存について、2014 年度中としていた Netty 4 移行計画が 2015 2Q にズレこむなど、Finagle が Netty に対して相当に密結合している様子が見てとれる。
その一方で、大規模並列処理を実現するためのイベント駆動なミドルウェアやアプリケーションを構築する上で、Akka が使われるケースがかなり広がってきている。例えば、今回紹介した Colossus の他にも、ストレステストツールの Gatling の実装に Akka が使われているのは有名かと思う。
また、Akka 自身も Netty に相当するレイヤを Akka I/O として再実装すると共に、大規模ストリームをうまく扱うための背圧制御 (back-pressure) の仕組みを加えて "Reactive Stream" として標準化したりと、低レイヤについても手を打ってきている*1。
結果として、分散並列化したいネットワーク I/O からビジネスロジックまでを、アクターモデルという同じ抽象の上で統一的に扱えるのが Akka というランタイムの強みとなってきており、そのような扱いやすさがミドルウェアやアプリケーションの作者から支持を集めつつあるのだとすれば、Netty 陣営の”苦境”は Akka 陣営にとってはチャンスなのかもしれない。
*1:念の為に書いておくと、Colossus 自身は今のところ Akka I/O は使っていない。ただし、似たような背圧制御の仕組みを独自で持っているので、この部分を移行するという話もあり得なくもなさそうな…?