関数型プログラマのための Rx 入門(前編)

概要

『Observable は単なる非同期データストリームにおけるモナドインスタンスだよ。何か問題でも?』

まともな概要

つまり、Reactive Extensions (Rx) って何だ?

ということでウェブをガサゴソと漁っていたところ、オンライン講義サービス CourseraPrinciples of Reactive Programming に行き当たった。この講座では、Rx の主要開発者の一人である「双対おじさん」こと Erik Meijer 氏自らが一部の章を担当し、Rx の理論的側面を講義している。

この講座の大きな特徴は、Rx を(命令型プログラミングではなく)関数型プログラミング (FP) の側から解き明かしていくことにある。

こう書くと奇をてらっているように見えるかもしれないが、実際には Rx は FRP (Functional Reactive Programming) のバリエーションの一つとされており、Rx を関数型プログラミングの一応用として説明するのはさほど不思議なことではない。

では、あるフレームワークを関数型のパラダイムに則って作るメリットは何だろうか? 一つ挙げるとすれば、それは少数のシンプルな概念の組み合わせで多数の具体例を作り出せることだと思う。

関数型プログラミングには、組み立て可能性 (composability) の高いビルディング・ブロックの作り方について多くの知見が蓄積されている。フレームワークは、関数型のイディオムに沿った API を提供することで、個別のユースケースごとに実装を用意することなく、関数を組み合わせるだけで様々なユースケースに対応できるようになる。

これは、フレームワークを使う側にとっても同じことが言える。フレームワークの使い方を調べる時に、すでに使いどころがよく知られている定番の関数と、その背景にある抽象的な概念をそのまま当てはめられるからだ。

例えば、Rx の機能の中核を担う Observable には実に 200 以上のメソッドがあり、これらの使い方を一個一個まじめに暗記していては日が暮れてしまう。しかし、これらのメソッドを「関数型の眼」で分析してみると、その多くが、お馴染みの高階関数のバリエーションであることに気付くはずだ。

実際に、私はこの講座を視聴してやっと Rx の使い方や設計思想が理解できるようになった。

記事の構成

この記事では、関数型プログラミングについて基本的な知識があることを前提に、Coursera の Meijer 氏の講義内容をベースに Rx を紹介する。

この記事は二つのパートに分かれている。前編では、Rx の API関数型プログラミングの観点から読み解く方法を紹介した後、実際に部品となる関数を組み立てて非同期イベント処理を実装してみる。そして後編では、Rx の理論的側面について少し突っ込んだ議論を紹介していく。

追記: 補足記事(”ReactiveX と「普通のやつらの上を行け」の意外な関係”)を書きました。

また、この記事では Rx を個別のユースケースに当てはめる方法については簡単な紹介に止める。より具体的な例に興味がある方には、id:ninjinkun 氏が翻訳されたあなたが求めていたリアクティブプログラミング入門がとても素晴らしい記事なのでぜひ一読してほしい。

ちなみに、ついさっき気付いたが、元ネタの Coursera の講座が 4/13 から一年半ぶりに第二期を開講!するようだ。ぜひ、この機会に受講してみてはいかがだろうか。

Reactive Extensions の概要

公式サイトでは、Rx を以下のように説明している:

Reactive Extensions (Rx) は、観測可能 (observable) なシーケンスと LINQ スタイルのクエリ演算子を使って、非同期なイベントベースのプログラムを合成するライブラリです。Rx を使うと、開発者は非同期データストリームを Observable で表し、非同期データストリームに対するクエリに LINQ 演算子を使い、非同期ストリームの並行性を Scheduler でパラメタ化します。簡単に言えば、Rx = Observable + LINQ + Scheduler です。

非同期処理を扱うライブラリというと FuturePromise を思い浮かべる人も多いと思う。しかし、Future/Promise が単一の非同期イベントを一つずつ処理するモデルなのに対し、Rx の Observable(時間や順序のある)複数イベントのストリームを扱う処理を対象としている点が異なる。

データストリームの具体例としては、デスクトップアプリにおいては「マウスイベント」、ウェブサービスにおいては「株価情報」や Twitter の「タイムライン」などが分かりやすいだろう。

Rx の APIGoF の Observer パターンを踏襲している。つまり、観測対象 (Observable) のイベントを観測者 (Observer) が購読 (subscribe) するという形式をとる:

trait Observable[T] {
  def subscribe(observer: Observer[T]): Subscription
}

Observable から Observer に通知されるイベントは三種類ある。これらは、Observer のコールバックメソッドとして実装する:

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
  def onComplete(): Unit
}

すなわち、Observable に新しいデータが来るたびに onNext が呼ばれる。そして、ストリームが終了 (terminate) する時に onComplete が呼ばれる。一方で、エラーが起きると onError が呼ばれる。

Rx では、onCompleteonError のどちらかが発生した時点でストリームは終了し、これ以後はイベントは発生しない。逆に言うと、完了イベントやエラーイベントが起きない限りは無限ストリームになる。また、onNext逐次的 (sequential) に呼ばれる。つまり、並行には呼ばれないので競合状態 (race condition) を気にする必要がない。

また、Observable が終了する前でも Observer を明示的に購読解除 (unsubscribe) できる。購読を解除するには、subscribe した時に返される Subscriptionunsubscribe() を呼び出す。

trait Subscription {
  def unsubscribe(): Unit
}

LINQ関数型プログラミング

ところで Observable には、subscribe の他にも mapflatMap といった関数型プログラミングでおなじみの関数も多数用意されている:

def map[R](func: (T) => R): Observable[R]
def flatMap[R](f: (T) => Observable[R]): Observable[R]
def foreach(onNext: (T) => Unit): Unit
def filter(predicate: (T) => Boolean): Observable[T]
def take(n: Int): Observable[T]
def takeWhile(predicate: (T) => Boolean): Observable[T]
def toList: Observable[List[T]]
def zip[U](that: Observable[U]): Observable[(T, U)]

なぜ、ここにこんなものがあるんだろう?

"Rx = Observable + LINQ + Scheduler" だったことを思い出してほしい。オリジナルの .NET 版では、Observable を LINQ で操作できるように 標準クエリ演算子の実装を提供している。上記の関数は RxScalaAPI だが、これは標準クエリ演算子Scala のコレクション操作で用いられる高階関数へと置き換えたものだ。

なぜ置き換えられるのか? LINQ はコレクションやデータベースに対して SQL 風のクエリ式を書けるようにするための機能だが、その下にある標準クエリ演算子は、関数型言語のコレクションライブラリが提供する高階関数と実質的に同じものだからだ。例えば、標準クエリ演算子Selectmap 関数に対応する(LINQScala の対応は @eed3si9n さんの「Scala脳のための C# LINQ」が参考になる)。

従来の Observer パターンの考え方からすると、高階関数の導入は唐突に写るかもしれない。しかし、実のところ Observable は一種のストリーム(無限リスト)とみなせるので、関数型ライブラリのコレクション操作関数と非常に相性が良い。

Observer パターンの語彙と関数型の語彙が同じものを指している場合もある。例えば、Observable の foreach メソッドのドキュメントや実装を見てみると「foreachsubscribeエイリアスだ」と明記されている。どちらも「イベントが通知されるたびに何か処理をする」メソッドだからだ。

def subscribe(onNext: (T) ⇒ Unit): Subscription
def foreach(onNext: (T) => Unit): Unit

一方で違いもある。例えば Observable は無限ストリームなので foldRight は定義されてない。また、非同期コレクションなので、foldLeftreduce は、集約された値をそのまま返す代わりに Observable に包んで返すようになっている。

def foldLeft[R](initialValue: R)(accumulator: (R, T) ⇒ R): Observable[R]
def reduce[U >: T](accumulator: (U, U) ⇒ U): Observable[U]

また、非同期データストリームのユースケースに対応するために、switchcombineLatest といったメソッドが数多く追加されている。

しかし、もし未知の関数が出てきてもさほど恐れる必要はない。関数の型シグネチャとマーブルダイアグラム(下図)に注目すれば、どんな機能か把握するのはさほど難しくないからだ。

f:id:okapies:20150224023017p:plain(以下、マーブルダイアグラムは RxJava の Javadoc からの引用)

確かに、ObservableAPI を真正面から読み解こうとすると、大量のメソッド「Observable シーケンスの各要素を Observable シーケンスの新たなシーケンスへ射影し…」のような宇宙語に当惑すること必至だ。けれども、視点を変えてこれらのメソッドを関数型の機能として捉えなおしてみると、一転して強い一貫性が見えてくるはずだ。

Rx の「糊」

有名な「なぜ関数プログラミングは重要か」の中で、John Hughes は「プログラミング言語にとって高階関数と遅延評価はモジュールを貼り合わせる新しい糊である」という趣旨のことを述べている。

そんなわけで、非同期データストリーム処理の「糊」である Observable高階関数の実例をいくつか見てみよう。

  • map は Observable の各要素に「データを別のデータに変換する」関数を適用して新しい Observable を作る
def map[R](func: (T) => R): Observable[R]

f:id:okapies:20150224023017p:plain

  • flatMap は、同様に「データを Observable に変換する関数」を適用して入れ子の Observable を作り、最後にマージする(念の為に付け加えると、flatMapmapflatten を組み合わせた関数だ)
def flatMap[R](f: (T) ⇒ Observable[R]): Observable[R]

f:id:okapies:20150304003356p:plain

  • groupBy は、やはり要素に関数を適用してキーを出力し、そのキーごとに要素をグルーピングした Observable を作る(戻り値型が Observable[(K, Observable[T])] になっている)
def groupBy[K](f: (T) ⇒ K): Observable[(K, Observable[T])]

f:id:okapies:20150304003358p:plain

ところで、以上で紹介した関数のマーブルダイアグラムを見ると、データだけでなく完了 (onComplete) イベントを表す「|」も、ちゃんと新しい Observable に写し取られているのが分かる。

以前の節でも見たように、Observable には onNext, onComplete, onError という三つのイベントがある。そして、Observable の高階関数は、データだけでなく全てのイベントを出力先の Observable へ自然なやり方で写す。このとき、イベント間の制約条件(一度 onComplete になったら以後 onNext は発生しない等)も引き継がれる。このため、map 等の高階関数に渡す関数を書く時に、Observable の内部状態をいちいち気にする必要がない。

以上から、Observable が提供する高階関数ListFuture が提供するものと単に型シグネチャが同じであるだけでなく、同様のセマンティクスを持つ機能として扱える。

このため、List や Future と同様に、これらの高階関数は Observable と無関係に作った好きな関数同士を繋ぎ合わせる「糊」として使うことができる:

tweets.filter(t => t.userName == "okapies").map(t => t.text)

また、RxScala では FutureObservable に変換する関数 from が用意されている。これも、Future と Observable がよく似たセマンティクスを持っており、互換性を持っている証拠と言えるだろう。

def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T]

f:id:okapies:20150304003357p:plain

ただし、Observable には「時間と順序」の両方が関わってくるので、List や Future とは異なるセマンティクスを持つ関数も存在する。その代表例が concatflatten だ。こうした関数の時間に関する挙動を調べるときは、API ドキュメントに載っているマーブルダイアグラムが重要な情報源になる。

この点については、後の節で「二つ以上の Observable をマージする」ケースを検討する際に詳しく見ていくことにする。

スケジューラ (Scheduler)

Scala 標準の FutureExecutionContext を受け取るように、ObservableScheduler を受け取るオプションを持っている。これが "Rx = Observable + LINQ + Scheduler" の三つ目だ。

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}

trait Observable[T] {
  def observeOn(scheduler: Scheduler): Observable[T]
}

どちらもタスクの(非同期)実行を制御するための機能だが、Future は単一の値を返したらそこで完了するのに対して、Observable は複数の値を返し続ける。また、タスクの実行戦略を選んだり、処理の途中で購読解除 (Subscription.unsubscribe()) できる必要がある。

Scheduler は、これらの機能をサポートするために ExecutionContext よりも複雑な仕組みになっている。

実際に使う Scheduler は、自分で実装しなくてもフレームワーク側である程度用意されている。例えば、タスクを現在のスレッド上で実行する immediate や、CPU-bound や IO-bound なタスクのための computationio などがある。

(注: RxJava の Scheduler は 1.0 に上がる時にリファクタリングされたため、Coursera 講座(第一期)での Scheduler の説明は少し内容が古くなっている。ちなみに、チケットを見ると、Meijer (@headinthebox) 氏ご本人も議論に参加していたりする)

Observable のマージ

Rx の API を調べると複数の Observable を一つの Observable に合流(マージ)する」という似たようなメソッドを、アルゴリズムを変えて何通りも提供していることに気付く。これは、それぞれの Observable に含まれるイベントの時刻が重なっている場合、それらをマージする方法や順序の決め方には色々な考え方があって一意には決まらないからだ。

例えば Observable は、List のような通常のコレクションと同様に concatflatten という二つのメソッドを提供する。どちらも「入れ子になった Observable (Observable[Observable[U]]) を一重の Observable に畳みこむ」関数だ。*1

def concat[U]: Observable[U]
def flatten[U]: Observable[U]

ところで、List ではこれらの関数のアルゴリズムconcat = flatten だが、Observable ではそれぞれ挙動が異なる。

concat は、入れ子に入っている Observable 同士の順番が維持されるようにマージする。つまり、二番目の Observable に含まれるイベントは、必ず一番目の Observable のイベントの後ろに置かれる。

一つ目の Observable が完了するまでの間、二つ目の Observable に来たイベントはバッファされ続けることに注意しよう。つまり、場合によってはメモリがオーバーフローする可能性がある。

f:id:okapies:20150223233540p:plain

一方 flatten(あるいは merge)は、イベントの発生タイミングが維持されるようにマージする。つまり、一番目の Observable に含まれるイベントの途中に、二番目の Observable のイベントが挿入されることがある。

f:id:okapies:20150223233541p:plain

このように、「二つの Observable を一つにマージする」という簡単な操作でも二つの実現方法があるので、ユースケースに応じて使い分ける必要がある。

この他にも、マージ系のメソッドには switchamb のような「複数の Observable のうち一つを採用して、他の Observable を捨てる」ものや、zipcombineLatest のように「Observable から来るデータが揃った時にペアにして出力する」ものがある。

様々なサイトに掲載されている具体例を見ると、このマージ機能をうまく組み合わせることが、Rx を活用する際の一つのポイントとなるようだ。

関数を組み立てて非同期処理を作る

この記事の冒頭で、関数型で考えるメリットは「少数のシンプルな概念の組み合わせで多数の具体例を作り出せること」だと述べた。

これを確かめるために、これまでに紹介した関数を組み立てて、実際に非同期イベント処理を実装してみよう。ここでは、Coursera 講座に出てくる Rx のコーディング例を使って説明する。

国ごとの地震データストリーム

まず、非同期データストリームのソースとして、アメリカ地質調査所 (USGS) が提供する API から全世界の地震データのストリームを取得する関数 usgs と、逆ジオコーディングサービスに問い合わせて地理座標を国名に変換する関数 reverseGeocode が使えるとしよう。

以下の関数シグネチャを見ると分かるように、どちらもウェブサービスへの非同期な問い合わせなので、戻り値の型が ObservableFuture になっている。

def usgs(): Observable[EarthQuake] = { ... }
def reverseGeocode(c: GeoCoodinate): Future[Country] = { ... }

では、この二つの関数を組み合わせて、国ごとの地震情報のストリームを作ってみよう。

まず、usgs から取得した地震データと reverseGeocode から取得した国名を map で貼り合わせて、(地震データ, 国) を要素とするストリーム withCountry を作る。reverseGeocode への問い合わせには、usgs から取り出した地震データに含まれる地理座標の情報が必要なので、コードは以下のようになる:

val withCountry: Observable[Observable[(EarthQuake, Country)]] =
  usgs().map { quake =>
    val country: Future[Country] = reverseGeocode(q.location)
    Observable.from(country.map(country => (quake, country)))
  }

ここで withCountry の戻り型を見ると Observable が入れ子になっていることに気付く。reverseGeocodeusgs と同様に非同期関数なので、新しい地震データがやってくる度に map に渡したクロージャの中で結果待ち (Future[Country]) しているからだ。このままでは扱いにくいので、flatten で入れ子の Observable に含まれるイベントを一列にマージする:

val merged: Observable[(EarthQuake, Country)] = withCountry.flatten()

これで (地震データ, 国) を要素とするストリーム merged が得られたので、最後に国をキーにして groupBy でグルーピングする:

val byCountry: Observable[(Country, Observable[(EarthQuake, Country)])] =
  merged.groupBy { case (quake, country) => country }

これで、国ごとにタグ付けされた地震データが得られるようになった。このように mapflattengroupBy といった糊を使って関数を組み立て、byCountry という具体的な関数を作ることができた。

マージ戦略を取り替える

しかし、この実装には一つ問題がある。最終的に国ごとにストリームされる地震データの順番が、ユーザが望む通りになっていない可能性があるのだ。

つまり、入れ子の Observable をマージする時に flatten を使っているので、地震データの順番が「地震の発生順」ではなく「reverseGeocode の結果が返ってきた順」になってしまう。この挙動はエンドユーザへの速報のようなユースケースでは問題ないかもしれないが、地震データを時系列順に解析したいような場合は問題だろう。

どうすればいいか? 答えは簡単で、単に merged 関数の flattenconcat に取り替えればいい。これで、望み通りに発生順の地震データが出力される(既に書いたように concat はメモリがオーバーフローする可能性があるので注意しよう)。

val merged: Observable[(EarthQuake, Country)] = withCountry.concat()

以上のように、シンプルな関数を組み合わせて作ったアプリケーションは、関数を部分的に取り替えるのも容易であり、結果として異なるユースケースにも素早く対応できることが示せたと思う。

まとめ

前編では、Rx を関数型プログラミングの視点で読み解いていくことで、API の習得やユースケースへの適用が容易になることを示した。

続いて後編では、Observable単なるモナドインスタンスだよ?という話や、ObservableIterableFuture との関係といった議論を紹介していきたい。

*1:flatten は処理系によっては単体では提供されない。C# では .SelectMany(a => a) と同等。

マイクロサービスのための Tumblr 製フレームワーク "Colossus"

この記事は Scala Advent Calendar 2014 の 15 日目です。昨日は id:qtamaki さんの”「関数プログラミング 珠玉のアルゴリズムデザイン」をScalaで実装してみる”でした。

今日は、先日に TumblrOSS 化を発表した Scala 製のノンブロッキング I/O (NIO) フレームワーク "Colossus" を紹介したい。”高性能なマイクロサービスを構築するためのフレームワークを謳っており、まだ OSS 化されて日が浅いものの Tumblr ではすでに production で使われているとされる。また、Colossus 自体がアクターフレームワーク Akka のアクターとして実装されており、それを使った独自のスレッドモデルを提供している点も興味深い。

基本的なコンセプト

Tumblr が Colossus を開発した狙いについて、公式サイトの冒頭でこのように述べている。

Colossus は、ノンブロッキングネットワーク I/O を必要とする高性能なアプリケーション構築のための Scala 製軽量フレームワークである。Colossus は特に、低レイテンシのステートレスなマイクロサービス(たいていはデータベースやキャッシュを抽象化したものと大きく変わらない)に焦点を当てている。Colossus は、こうしたユースケースにおいて性能を最大化すると共に、インタフェースをクリーンかつ簡潔に保つことを狙っている。

つまり…どういうことだってばよ?

マイクロサービス・アーキテクチャでは、コンポーネント化された軽量サービス同士を連携させてアプリケーションを組み立てる。つまり、多くのマイクロサービスのビジネスロジックはこのようになっているはずだ:

  1. クライアントからコネクションが張られてリクエストが来る。
  2. 他のバックエンドサービス(データベースとか)にリクエストを飛ばしてレスポンスを待つ。
  3. バックエンドサービスから戻ってきたレスポンスを使ってレスポンスを組み立ててクライアントに返す。

Colossus を使って書くとこんな感じ(Colossus のドキュメントより)。

Service.serve[Http]("http-service", 9000){ context =>
  val redis = context.clientFor[Redis]("localhost", 6379)
  context.handle{ connection => 
    connection.become {
      case request @ Get on Root / "get" / key => redis.send(Commands.Get(key)).map{
        case BulkReply(data) => request.ok(data.utf8String)
        case NilReply => request.notFound("(nil)")
      }
      ...
    }
  }
}

このように、非同期計算を抽象化した型である FuturemapflatMap をつないでいくアプローチは、Scala プログラマにとって見慣れたものだ。

ところで、Colossus において redis.send の戻り値は Callback であり Scala 標準ライブラリTwitter 製ライブラリに含まれているような Future ではない。

CallbackFuture と同様のインタフェースを持つ(し、実際に Future に変換することもできる)が、スレッドセーフではない。これは、Callback は単一のワーカスレッド上で実行されることを前提にしているからだ(あと、Callback を呼び出す準備ができた際に execute() メソッドを明示的に呼び出す必要がある。この呼び出しは通常はフレームワーク側がやってくれる)。

これは、冒頭に挙げたようなマイクロサービスのユースケースを前提にするなら、基本的にはスレッド間で状態を共有する必要がないからだ。また、データベース接続などの複数のコネクション間で共有したい状態については、イベントループごとに保持してコネクション間で共有すればよい(一つのイベントループは多数のコネクションを処理するが、シングルスレッドなのでイベントループ内では競合を心配する必要がない。また、全てはノンブロッキングに処理されるので、あるコネクションがデータベースからの応答を待っている間に、後続のコネクションがブロックすることはない)。

Future を使う場合、多くの実装はマルチスレッド動作のために ExecutionContext 内で処理を実行するが、これによりわずかだがオーバヘッドが発生する。このオーバヘッドが問題になるようなリクエストの処理においては、Colossus では代わりにシングルスレッドで動作する Callback を使うことでレイテンシを削減できる。これは、リクエストのサイズが小さくステートレスな場合に特に有効だと言える。

アーキテクチャ

Colossus の Core Layer は Akka 上で動作するアクターであり、例えばイベントループは下記のような”自分にメッセージを送るアクター”として実装されている(実際のコードはこのあたりにある)。

receive message {
  case `Select` => {
    event_handlers = selector.select()
    ...
    self ! `Select`
  }
  ...
}

ドキュメントから読み取れるアーキテクチャを図示するとこんな感じか:

f:id:okapies:20141215011648j:plain

Colossus 全体を駆動するのが IOSystem で、作成時に ActorSystem と関連付けて指定した数の worker(イベントループ)を作成する。なお、例えばアプリケーションが複数の独立したサーバを実行するような場合は、一つの ActorSystem に複数の IOSystem を関連付けてもよい。

implicit val actor_system = ActorSystem("system")
val io_system = IOSystem("io-system", numWorkers = 4)

IOSystem は、コネクションがやってくるとそれを各 Worker に振り分ける。各 Worker はアクターであり、ひとたび割り当てられたコネクションはずっと同じ Worker に束縛され続ける。つまり、Worker 内のイベントループが呼び出すイベントハンドラはシングルスレッドで動作することが保証される。

では、イベントループ間やコネクション間で状態を共有したい場合や、ブロックするような処理を実行したい場合はどうするか? その場合は、アクターと Future を使ってイベントループの外側で処理すればよい。

プログラミングモデル

サーバのビジネスロジックの実装方法は、関数型っぽい書き方とオブジェクト指向っぽい書き方の二通りがある。まずは関数型っぽい方から:

Service.serve[Http]("service-name", 456){context: ServiceContext[Http] => 
  //everything here happens once per event-loop
  context.handle{ connecton: ConnectionContext[Http] => 
    //everything here happens once per connection
    connection.become {
      //partial function HttpRequest => Response[HttpResponse]
      case req => req.ok("Hello world!")
    }
  }
}

この記法は、実際には下記のような ConnectionHandler の実装と同じことをやっている(ServiceServer は ConnectionHandler の継承クラス):

import com.tumblr.colossus._
import protocols.Telnet._

class HelloWorldHandler(config: ServiceConfig, worker: WorkerRef) 
  extends ServiceServer[HttpRequest, HttpResponse](new HttpServerCodec, config, worker) {

  def processRequest(request: HttpRequest): Response[HttpResponse] = {
    req.ok("Hello World!")
  }

  def processFailure(request: HttpRequest, reason: Throwable) = {
    request.error(s"Error: $reason")
  }
}

テストサーバ、メトリクス、タスク

時間がないので下記を読んでください…。個人的には、最初のリリースからテスト関係や計測関係のユーティリティが含まれているのが素晴らしいなと思う。

まとめ

ドキュメントには他にも色々と紹介したい話が書いてあるんだけど、全然時間が足りなかった…。もしこれを読んで興味が湧いたら、読んでみたりコードを触ってみたりしてください。あと、Issue を見てるとまだまだ成熟してない感じなので、逆に言えば貢献のチャンスかも。

ところで、Scala 製のマイクロサービス向けフレームワークといえば、真っ先に Finagle の名前が思い浮かぶ。以前のエントリでも書いたように、TumblrScala を採用するに至った大きな理由の一つに Finagle の存在がある。

では、ここに至って Tumblr が独自のフレームワークの実装を始めたのはなぜか。特に、性能については折り紙付きの Netty の代わりに Java NIO + Akka で実装しているのは何故なのか。

一つは上で説明したように、特定のユースケースにおける性能を追求していく中で Future の”リッチな”スレッドモデルがオーバヘッドになったこと。そしてもう一つは、明示はされていないが、性能を追求して下のレイヤに手を入れていく際に、Netty の低レベル操作を中心とした API や複雑なスレッドモデルと密結合した Finagle が扱いにくかった、ということがあるのではないかと思う。例えば、Colossus のコンセプトとして掲げられている四つの原則の中にこんな項がある:

Keep it transparent - Choose how much magic you want! Colossus is built in several layers which allow you to build high-level abstractions without being walled off from taking full control.

実際、Finagle の Netty 3 依存について、2014 年度中としていた Netty 4 移行計画が 2015 2Q にズレこむなど、Finagle が Netty に対して相当に密結合している様子が見てとれる。

その一方で、大規模並列処理を実現するためのイベント駆動なミドルウェアやアプリケーションを構築する上で、Akka が使われるケースがかなり広がってきている。例えば、今回紹介した Colossus の他にも、ストレステストツールGatling の実装に Akka が使われているのは有名かと思う。

また、Akka 自身も Netty に相当するレイヤを Akka I/O として再実装すると共に、大規模ストリームをうまく扱うための背圧制御 (back-pressure) の仕組みを加えて "Reactive Stream" として標準化したりと、低レイヤについても手を打ってきている*1

結果として、分散並列化したいネットワーク I/O からビジネスロジックまでを、アクターモデルという同じ抽象の上で統一的に扱えるのが Akka というランタイムの強みとなってきており、そのような扱いやすさがミドルウェアやアプリケーションの作者から支持を集めつつあるのだとすれば、Netty 陣営の”苦境”は Akka 陣営にとってはチャンスなのかもしれない。

*1:念の為に書いておくと、Colossus 自身は今のところ Akka I/O は使っていない。ただし、似たような背圧制御の仕組みを独自で持っているので、この部分を移行するという話もあり得なくもなさそうな…?

"The Reactive Manifesto v2.0" 日本語訳

はじめに

いつの間にか "The Reactive Manifesto" のバージョンが上がって v2.0 になっていたので、さっくりと翻訳。従前よりかなりコンパクトになっている。マニフェストに署名したい方は、公式サイトの一番下の "Sign the manifesto" をクリックしてください。

v1.0 の日本語訳は id:kimito_k さんがこちらで公開されています。

追記【2015/03/16】: 公式サイトに掲載されました

追記【2014/12/27】: 公式へ Pull Request してマージしてもらいました。最新版は以下をご覧ください。

v2.0 での変更点

v2.0 になって変わった点についてはこのへんこのへんに記事がある。

リライトの結果として、リアクティブマニフェストが最終的に実現したい価値は”システムの即応性を保ち続けること”であり、そのために”耐障害性””弾力性”という二つの非機能が必要であり、それら三つを下支えするのが”メッセージ駆動”アーキテクチャである、という論理構成が明確になったと思う。

で、v1.0 で書かれていた細かい話はどこに行ったかというと、用語集という形で別ページに集約されている。

あと、特に明示されてないけど、”リアクティブアプリケーション (reactive application)”という呼称が全て取り除かれて”リアクティブシステム (reactive systems)”に置き換わっている。従来からある概念である”リアクティブプログラミング (reactive programming)”と混同しやすい、という批判に配慮した形なのかな。

The Reactive Manifesto v2.0

異なる分野で活動する組織が、同じようなソフトウェア構築のパターンを独立に発見している。このようなシステムはより堅牢で、より耐障害性があり、より柔軟で、より最新の要求を反映しやすくなっている。

こうした変化が起きているのは、近年、アプリケーションの要求が著しく変化してきているからだ。ほんの数年前、巨大アプリケーションは数十のサーバから構成され、数秒の応答時間と数時間のオフラインメンテナンスを許容し、データは数ギガバイトだった。今日のアプリケーションは、モバイル機器から数千のマルチコアプロセッサによって動作するクラウドベースのクラスタまで、あらゆる機器上に配備される。ユーザはミリ秒の応答時間と 100% の稼働率を期待する。データはペタバイト単位で測定される。昨日のソフトウェアアーキテクチャは、今日の要求を全く満たしていない。

求められているのは、システムアーキテクチャに対する明快なアプローチであると我々は考える。そして、必要な側面の全ては既に独立に認識されている: 求めるものは、即応性と、耐障害性と、弾力性と、メッセージ駆動とを備えたシステムだ。我々はこれをリアクティブシステム (Reactive Systems) と呼ぶ。

リアクティブシステムとして構築されたシステムはより柔軟で、疎結合で、スケーラビリティがある。これによって開発が容易になるだけでなく、変更を受け入れやすくなる。これらは障害に対してより著しい耐性を持ち、たとえ障害が起きても災害を起こすことなく優雅に対処する。リアクティブシステムは高い即応性を持ち、ユーザに対して効果的な対話型フィードバックを与える。

リアクティブシステムとは:

  • 即応性 (Responsive): システムは可能な限りすみやかに応答する。即応性とは使い勝手と実用性の基盤だが、しかしそれだけではなく、問題が素早く検出され効果的に対処できることを意味する。即応性のあるシステムは、迅速で、かつ一貫した応答時間を提供することに主眼を置く。システムは応答時間に信頼性のある上限を確立し、一貫した品質のサービスを供給する。この一貫した挙動によってエラー処理が単純化され、エンドユーザの信頼を醸成し、さらなる相互作用を促す。
  • 耐障害性 (Resilient): システムは障害に直面しても即応性を保ち続ける。これが当てはまるのは高可用性のミッションクリティカルシステムだけではない — 耐障害性を持たないシステムは障害が起きると即応性を失う。耐障害性は、レプリケーション、封じ込め、隔離、そして委譲によって実現される。障害はそれぞれのコンポーネントに封じ込められ、コンポーネントは互いに隔離されるので、システムが部分的に故障してもシステム全体を危険に晒すことなしに回復することが保証される。各々のコンポーネントの回復処理は(外部の)他のコンポーネントに委譲され、また必要な場合はレプリケーションによって高可用性を保証する。コンポーネントのクライアントはコンポーネントの障害への対処に苦しめられることがなくなる。
  • 弾力性 (Elastic): システムはワークロードが変動しても即応性を保ち続ける。リアクティブシステムは、入力の提供に割り当てるリソースを増加あるいは減少させることで入力量の変化に反応する。これは、システムの中に競合する場所や中心的なボトルネックが存在しないように設計し、シャーディングしたりレプリケーションしたコンポーネント間に入力を分散させることを意味する。リアクティブシステムは関連するライブな性能測定を提供することで、予測的かつリアクティブなスケーリングアルゴリズムをサポートする。これらは、コモディティなハードウェアとソフトウェアプラットフォーム上で費用対効果の高い弾力性を実現する。
  • メッセージ駆動 (Message Driven): リアクティブシステムは非同期メッセージパッシングに依ってコンポーネント間の境界を確立する。これによって、疎結合性、隔離性、位置透過性を保証すると共に、エラーをメッセージとして委譲する手段を確保する。明示的なメッセージパッシングは負荷の管理と弾力性を可能とする。また、システム内にメッセージキューを作成して監視し、必要ならバックプレッシャーを適用することでフロー制御が可能になる。通信の手段として位置透過なメッセージングを使うことで、通信がクラスタを跨ぐ場合も単一のホスト内の場合も、同じ構成とセマンティクスで障害を管理できる。ノンブロッキング通信により、受信側はアクティブ時のみリソースを消費できるのでシステムのオーバヘッドを抑制できる。

大きなシステムはより小さなシステムからできているので、故にそれらの構成要素のリアクティブな性質に依存する。リアクティブシステムは設計原則を適用してリアクティブな性質をあらゆる規模で適用し、それら構成要素を合成できるようにする。世界最大級のシステムは、これらの性質に基づくアーキテクチャに依存することで数十億の人々のニーズを日々満たしている。こうした設計原則をその都度再発見するのをやめて、最初から自覚的に適用する時だ。

マイクロサービスが Scala を選ぶ3つの理由

 今年も開催される Scala Advent Calendar 2014 の 15 日目にエントリーしていて、ネタとしては先日 Tumblr が発表した "I/O and Microservice library for Scala" を謳う Colossus をやる予定なんだけど、前振りとして「なぜマイクロサービス化を進めるサービスは Scala を選ぶのか」という話をしてみるエントリ。ちなみに、Advent Calendar の前振りと書いたけど、とりあえず Scala をあまり知らない人向け。



そもそもマイクロサービスって何だっけ?

この記事とかよくまとまってると思います。

マイクロサービスへの移行と Scala

 成功を収めたウェブサービスが、自身のビジネスを持続的にスケールさせるため、巨大化・複雑化したモノリシックなアプリケーションを解体し、単機能のコンポーネントであるマイクロサービスからなるサービス指向アーキテクチャ (SOA) へと移行する…。近年、こんなストーリーを耳にする機会が多い。

 この際、ランタイムをスクリプト言語から JVM へ置き換えたり、特に使用言語として Scala を採用するケースが目立つ。以下は一例だが、名だたる有名サービスが Scala を使ったマイクロサービス化に取り組んでいることがわかる:

なぜ Scala が選ばれるのか?

 なぜ、マイクロサービス化で Scala が選ばれるのか? Scala 移行の事例を見ると、次の三つのポイントが指摘されることが多い。

  1. JVM 言語である
  2. 非同期 RPC フレームワーク "Finagle" の存在
  3. 静的型付き言語である

 以下に一つずつ見て行こう。

1. JVM 言語である

We were enamored by the level of performance that the JVM gave us. It wasn’t going to be easy to get our performance, reliability, and efficiency goals out of the Ruby VM, so we embarked on writing code to be run on the JVM instead. We estimated that rewriting our codebase could get us > 10x performance improvement, on the same hardware –– and now, today, we push on the order of 10 - 20K requests / sec / host.

New Tweets per second record, and how!

 近年、様々なサービスが、自社サービスの再構築にあたって、大規模なトラフィックに耐える性能・信頼性・効率性の要件を達成するために Java VM (JVM) を選んだと証言している。JVM は、過去 10 年以上にも渡ってサンやオラクル等によって大きな開発リソースが投じられてきたこともあって、高い性能と安定性を実現している。

 つまり、JVM 言語として作られた Scala は、当然その恩恵を受けることができる。

 また I/O 性能についても、JVMVM 内のプログラムから OS の低レベル I/O を直接叩ける NIO (Non-blocking I/O) API を備えている。よって、クライアント−サービス間に加えて、内部サービス間の I/O 性能が特に重要となるマイクロサービス・アーキテクチャとも相性が良い。

 また、採用面で技術者の確保が容易である点もしばしば挙げられる。

Changed to a JVM centric approach for hiring and speed of development reasons.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 高負荷環境での JVM の運用は GC(ガベージ・コレクション)との戦いになると言われるが、他の言語ランタイムに比べて、そのあたりのチューニングのノウハウを持った人材を確保しやすいということもあるのかもしれない。

2. Finagle の存在

Finagle was a compelling factor in choosing Scala. It is a library from Twitter. It handles most of the distributed issues like distributed tracing, service discovery, and service registration. You don’t have to implement all this stuff. It just comes for free.

Tumblr Architecture - 15 Billion Page Views a Month and Harder to Scale than Twitter - High Scalability -

 Scala 採用の理由として、Twitter が開発した Scala 製非同期 RPC フレームワーク "Finagle" の存在を挙げるサービスは多い。Finagle の興味深い点はたくさんあるが、マイクロサービスのためのフレームワーク、という観点から言うと以下の三つ(性能、プログラミングモデル、運用ツールとの連携)が挙げられる。

性能

 Finagle は、先に挙げた NIO フレームワークの定番である NettyScala ラッパーであり、高い性能を誇る。また、Twitter 自身が Netty の開発に大きくコミットしており、Finagle の開発とも密接に連携している。

 Finagle の高性能は、”バルス祭り”の大規模トラフィックにも耐えうるシステムの構築に大きく貢献した。

Our new stack has enabled us to reach new records in throughput and as of this writing our record tweets per second is 143,199.

Netty at Twitter with Finagle

プログラミングモデル

 マイクロサービス・アーキテクチャでは、各サービスの実装にあたって、必然的に他の内部サービスに対する非同期 RPC のコーディングが必要になるが、このスタイルには厄介な点がいくつもある。

  • ある RPC の結果を使って次の RPC をリクエストするような逐次処理や、並列にリクエストした複数の RPC の結果が揃うのを待って集約するような並列処理を書こうとすると、どうしてもコードが煩雑になる。
  • リクエストした処理が長時間戻ってこない場合、スレッドをブロックしないようにプログラムする必要がある。
  • あらゆるリモートへのリクエストは、想定外の理由で失敗する可能性がある(ネットワーク障害、リモートホストの障害、等々)。したがって、エラー処理やリトライ、タイムアウト等を考慮したコーディングが必要になる。

 Finagle は、非同期 RPC を FutureService/Filter というインタフェースで抽象化する。これらの API は、オブジェクト指向言語であると同時に関数型言語である Scala の特徴をよく活かしたものになっている。

 FutureService、そして Filter を組み合わせると、例えば、以下のようにマイクロサービスを組み合わせてサービスを作る際に、逐次処理と並列処理が絡み合った複雑な非同期 RPC 処理をクリーンかつシンプル、そして安全に記述できる(引用元)。

  1. 認証サービス (AuthService) に問い合わせて、ユーザ認証を行う
  2. タイムラインサービス(TimelineService) に問い合わせて、指定したユーザの Tweet ID の一覧を受け取る
  3. ツイートサービス (TweetService) に各 ID に対応するツイート本文を並列に問い合わせて、全ての結果が戻ってきたら集約してクライアントに返す
val timelineSvc = Thrift.newIface[TimelineService](...) // #1
val tweetSvc = Thrift.newIface[TweetService](...)
val authSvc = Thrift.newIface[AuthService](...)
  
val authFilter = Filter.mk[Req, AuthReq, Res, Res] { (req, svc) => // #2
  authSvc.authenticate(req) flatMap svc(_)
}
  
val apiService = Service.mk[AuthReq, Res] { req =>
  timelineSvc(req.userId) flatMap { tl =>
    val tweets = tl map tweetSvc.getById(_)
    Future.collect(tweets) map tweetsToJson(_)
  }
} //#3
Http.serve(":80", authFilter andThen apiService) // #4
  
// #1 サービスごとのクライアントを作成する
// #2 入ってくるリクエストを認証する Filter を作成する
// #3 認証されたタイムラインリクエストを JSON に変換して返す service を作成する
// #4 認証 filter と service を使って 80 番ポートで動作する HTTP サーバを開始する

 ここではエラー処理が明示的に書かれていないが、上記の処理のいずれかが失敗した段階(認証が失敗した場合とか、タイムラインサービスへの問い合わせがタイムアウトした場合とか)で処理全体が失敗するようになっている。もちろん、明示的に復旧処理を書くこともできる。

 Service の実装は一般的な HTTP (REST) だけではなく Thrift も使える*1し、MemcachedMySQL、あるいは Redis などのデータベース向けプロトコルも用意されている。

 このように様々なプロトコルService として抽象化しているため、アプリケーション非依存な機能である Filter を様々なプロトコルに対して直交的に適用できる。Filter で追加できる機能には、認証やリトライ、そして後述するモニタリングやトレースといったものがある。

運用ツールとの連携

 RPC ベースの分散システムへ移行する際の厄介事は、運用・監視の面でもたくさんある。例えば、以下のうちいずれを欠いても、多数のノードで構成される複雑なマイクロサービス群の運用やデバッグは難しくなる。

 また、個々の開発者が、担当するマイクロサービスに対してこれらのツールを簡単に組み込める必要がある。

 Finagle には、ZooKeeper や分散トレースシステムの Zipkin 等と連携する機能が最初から組み込まれている。つまり、必要な設定を追加してやるだけで、ZooKeeper を使ってクラスタを組み、リクエスト数などのメトリクスをリアルタイムに監視し、各ノードでリクエストの処理にかかった時間を Zipkin へ自動的に集約して可視化したりできる。

3. 静的型付き言語である

運用が必要なシステムで1万、2万行越えだすと、静的型付けであることによる保守性の高さは、結果的にコンパイル時間等を払拭できるほどの安全性、生産性、心の平安を生むと思っていて、要は静的型付けである事は非常に価値があって、特にテストが難しいテンプレート(twirl)の静的型付けは素晴しいという事を言いたかった。

ScalaMatsuri 2014 で「国技と Scala」というタイトルで発表しました - sandbox

 マイクロサービス化の究極的な目的は、ビジネス要求の変化に合わせて継続的にサービスを更新できる体制を作ることにある。つまり、Scala の静的型付け (static typing) が提供する保守性(≒安心してコードを書き換えられる性質)は、常に変化を必要とするようなシステムでこそ大きなメリットがある。

余談

 後日に紹介予定の Colossus は、TumblrFinagle に相当するものを自分たちの要件に合わせて作ったものだと考えると良さそう。

 また、Finagle が Netty ベースであるのに対し、Colossus は Scala 言語の開発元が作っている Akkaアクターモデルの分散フレームワーク)ベースで、さらに Akka 自身が Netty に相当するレイヤ (Akka I/O) の再実装を進めていたりと、そうした「フレームワーク同士の競争」という野次馬的な面でも興味深い。

*1:Twitter の内部サービス同士のやりとりは主に Thrift を使っているようだ。

Future/Promise はいつモナドになったのか

「非同期計算をモナドで合成し、依存関係に従ってパイプライン化する」というアイデアはいつ誰が提案したのか、というのを調べてみたけどよく分からなかった記録。網羅的な調べ方はしてないので、何か知ってる人がいたら教えてください。

明示的 vs. 暗黙的

id:xuwei さんに教えて頂いた Wikipedia の記事によると「まだ完了していない計算結果へのプロキシオブジェクト」というコンセプトが FuturePromise と名付けられたのは 1976〜1977 年頃らしい。

1976 年に出た Daniel P. Friedman と David Wise の論文や Peter Hibbard の論文で言及されていた Promise(あるいは Eventual)は明示的 (explicit) に使うものだった。つまり、Java の(Completable じゃない方の)Future のように、promise から値を取り出すのに get のようなメソッドを呼ぶ必要があるということ。

一方で、アクターモデルの研究者である Henry Baker と Carl Hewitt による論文(1977 年)が言及している Future は、そのまま普通の参照のように使える暗黙的 (implicit) のものとされていた。

Promise Pipelining

次に、Promise 同士をつなぎあわせてパイプライン化しましょうというアイデアの登場は、Promise の発明から約 10 年後、1988 年の Barbara Liskov と Liuba Shrira の論文を待つことになる。また、同様のアイデアは、一部のアレゲな紳士諸君に著名な Xanadu プロジェクトの一環として Mark S. Miller*1 や Dean Tribble らからも 1989 年に提出されている。

ただ、これらの Promise Pipelining に関する論文の主眼は、同一マシン上に配置した Promise 間のネットワーク通信を削減したりして性能を向上させることなので、「非同期計算の合成」という点に限れば、もっと以前からアイデアはあったのかもしれない。

また、当時、これらの論文のまともな形の実装が世に出ることは無かったらしい。Wikipedia では、Promise Pipelining を実装した処理系として、後に Miller らが 1997 年に発表した E 言語 や、同様に Tribble らが 1996 年に発表した Joule 言語が挙げられている。

ちなみに、E 言語での記法はこんな感じ(x <- a() は「x にメッセージ a() を送る」と読ます):

t3 := (x <- a()) <- c(y <- b())

これを展開するとこうなる:

t1 := x <- a();
t2 := y <- b();
t3 := t1 <- c(t2);

どことなく Reactive Programming っぽい…?

JavaScript の Deferred

「Deferred に A => Deferred[B] を受け取って Deferred[B] を返すメソッド then() を持たせましょう」みたいなのは JavaScript ではかなり以前からあり、CommonJS では、先行例として MochiKitDojo's Deferred挙げている。少なくとも 2006 年頃にはあった模様。Twisted の Deferred は「明示的」なスタイルなのでちょっと違うかな。

最近は、Promises/A+ 仕様として標準化されて Thenable と呼ばれている。

Future/Promise モナド

結論から言うと、少なくとも Scala の実装においては、「Future/Promise はモナドである」というようなことを書いてる論文等があったり、それを参照して実装したりした、というわけではなさそう。

まず、Akka において公開レポジトリで辿れる最古の Future 実装を見ると完全に「明示的」なスタイルで、mapflatMap も見当たらない。

sealed trait Future[T] {
  def await : Future[T]
  def awaitBlocking : Future[T]
  def isCompleted: Boolean
  def isExpired: Boolean
  def timeoutInNanos: Long
  def result: Option[T]
  def exception: Option[Throwable]
}

その後 map追加されたりしたけど、ちゃんとモナドであることを意識した実装になったのはこの辺(2011 年 2 月)。下記のディスカッションによると、提案者の当初のモチベーションとしては、Akka に Scalaz の(ような?)型クラスを導入したいということだったらしい。

ちなみに、Twitter-Util の Future は、2010 年 8 月の公開当初からモナディックになってる。Marius さんは、Twitter の非同期処理系の API について Concurrent ML を参照しているとか言ってたので、あるいはそっち方面からアイデアを得たのかなぁ…?

追記: Scalaz の昔のコードを調べたらもっと古い実装があった(2009 年 5 月)。Scala 界隈だとこれが最古? どちらにせよ、リファクタリングの一環で自然に入ってきた的なノリを感じる。

まとめ

Haskell 方面の歴史を知ってれば一発なのかもしれないけど、Haskell の Future/Promise モナドに相当する型クラスってどれなんですかね…。Continuation モナド

追記

この Par モナドに成功/失敗の文脈を組み合わせると近いのかな。

追記2

*1:Wikipedia によると、今は Google のひとで、ECMAScript の仕様策定をやってる TC39 のメンバーでもあるらしい。

Java/Scala で風景から歩行者を消してみる

一昨日くらいにホッテントリ入りしてた記事↓を見て、

Export["result.jpg",
 Image[Mean[Map[ImageData,
    Import["movie.mov", "ImageList"]]]]]

このくらいのコードで済むなら Java/Scala でもすぐに書けるかも? と思ってやってみた。

理想

ヤッター、こんなに簡単にできたよー^^

import opencv._                               // ← ん?
System.loadLibrary(Core.NATIVE_LIBRARY_NAME)  // ← んんん???
saveImage("result.jpg", loadVideo("movie.mov")(mean))

現実

Isolator requires Java bindings for OpenCV. 

$ curl -OL https://github.com/Itseez/opencv/archive/2.4.9.zip -o opencv-2.4.9.zip
$ unzip opencv-2.4.9.zip
$ cd opencv-2.4.9
$ mkdir build
$ cd build/
$ cmake -DBUILD_SHARED_LIBS=OFF ..
$ make -j8
import org.opencv.core.{Core, CvType, Mat, Scalar, Size}
import org.opencv.highgui.{Highgui, VideoCapture}

package object opencv {
  ...
  def mean(frames: Iterator[Mat]): Mat =
    if (frames.hasNext) {
      val head = convertTo(CvType.CV_64FC3)(frames.next)
      val (count, out) =
        frames.
          map(convertTo(CvType.CV_64FC3)).
          foldLeft((1, head)) { case ((cnt, sum), f) => (cnt + 1, add(sum, f)) }

      divide(out, new Scalar(count, count, count))
    } else {
      new Mat
    }

  def loadVideo[A](filename: String)(f: Iterator[Mat] => A): A = {
    val cap = new VideoCapture(filename)
    try {
      f(Iterator.continually(nextFrame(cap)).takeWhile(_ != None).map(_.get))
    } finally {
      cap.release()
    }
  }
  ...
  def saveImage(filename: String, m: Mat) = Highgui.imwrite(filename, m)
}

^^;;;

考察

https://raw.githubusercontent.com/okapies/isolator/master/examples/mean-opencv.jpg https://raw.githubusercontent.com/okapies/isolator/master/examples/mean-opencv.jpg

Mathematica、画像や動画の読み書きをネイティブサポートしているのはホントにいいなぁ、という感想。

なんか、動画を扱える Pure Java の成熟したソリューションって未だにあんまりないらしくて、したがって OpenCVJava から叩くのが一番堅実な選択肢ということになり、その道の先には jar をソースからビルドする楽しい作業が待ってたり、コーディング時もリソースの明示的な解放をサボると一瞬でメモリが爆裂したりと、色々めんどくさい…。

コード

今回書いたコードは GitHub に置いておきますので、世紀末ごっこして遊ぶなり改造するなりどうぞ。

あと、mean のアルゴリズムはもう少しマシな方法があるように思うので、OpenCV に詳しい方、どなたかご教示くださいませ…。

追記

@colspan一晩でやってくれました

非同期ストリーム処理の標準化を目指す "Reactive Streams" とは

TL でこんなのが流れてたので少し調べてみた。

Reactive Streams って?

JVM 上でのノンブロッキングなバックプレッシャーを持つ非同期ストリーム処理の標準の提案”(公式サイトより)。

ざっくり言うと、既にある JVM ベースの様々な非同期ストリーム処理フレームワーク実装の共通部分を括りだして API 化、SPI 化しようというもの。最終的には JSR での標準化を目指している。

ここで言う”非同期ストリーム処理”とは、(広義の)リアルタイム性が求められるデータ処理中心のアプリケーション、より具体的にはビデオストリーミングや数百万ユーザのトランザクション処理など。

下のインタビュー記事では、Reactive Streams を策定する理由として、API 策定による相互運用性の向上と共に、バックプレッシャー (back pressure) による流量制御の必要性が挙げられている。

以上から分かるように、この "API" はユーザが直接使うものではなく、各フレームワークが、この API を使ってエンドユーザ向けの DSL を提供するためのもの。これにより、異なるフレームワーク間の相互運用性を担保できる。逆に言うと、ユーザがデータ変換や分割・結合を記述する方法は守備範囲ではない

なお、この仕様が出てきた背景としては、去年の秋くらいから "Reactive Manifesto"日本語訳)というキャンペーンが始まっていて、Reactive Streams はその流れに連なっていると思われる。

どんな仕様なの?

概要がこの辺に、より詳細な仕様がこの辺に書かれている。

現状、SPI コンポーネントの構成要素は以下の三つ。普通の Pub/Sub だけど、バックプレッシャーを伝える方法が定義されているのが特徴?

  • Publisher
    • Subscriber からの要求に応じて、潜在的に無制個の順序付けされた要素を提供する。Publisher は複数の Subscriber に配信することができ、処理率に応じて配分する。
    • これ以上、要素を提供できないときは Subscriber の onComplete メソッドを呼ぶ。
  • Subscriber
    • 一つ以上の Publisher を購読して、順序付けされた要素ストリームを受け取る。Producer が Subscriber に要素を渡すときは onNext コールバックを呼ぶ。Subscriber は、Producer をブロックせずに非同期処理するかキューイングする必要がある。
  • Subscription
    • Subscriber から Producer へ要求を伝えるときに使う。Subscriber が Subscription の requestMore(int) メソッドを呼ぶと、Publisher は 制限時間 T が経つ前に最高 N 回まで onNext メソッドを呼び出せる。

API レベルでは、上記の Publisher に対応する型として Producer が、Subscriber に対応するものとして Consumer が定義される。また、両者を組み合わせて入力と出力の両方を行う Processor も提供される。

【追記】組み合わせるとこんな感じ?

誰が関わってるの?

Scala 言語Akka フレームワークの開発をしている Typesafe 社が中心となって仕様策定を進めている。Akka 開発者による解説記事はこちら

それ以外にも、以下のような人々やプロダクトが関わっている。

JVM エコシステムの各方面で実績のある人物・プロダクトが、ズラッと一堂に会しているのが印象的。特に、Oracle の人や Doug Lea 氏が関わっている辺りを見ても、標準化に対する本気が伺える。(元)Microsoft の人が一枚噛んでるのも興味深い。

批判とか

Reactive Streams の”思想的根拠”である Reactive Manifesto に対する批判。たしかに、”そもそも "Reactive" って何だよ” というのがよく分からない感じはある。

ただ、Akka というプロダクトは明らかに Erlang/OTP に対するリスペクトから出てきたものだし、本人達もそれを隠しているわけではない(例えば、Akka の公式サイトの名前は "Let it crash")。アクターモデルを広めたいという意図こそあれ、Erlang の成果を横取りしようとしている、というのは考え過ぎじゃないかなぁと。【追記】: @pokarim さんから「(引用した発言に)そのような意図はない」とご指摘を受けたので訂正します。失礼しました。

まとめ

非同期ストリーム処理フレームワークは乱立気味、というか思い切り乱立しているので、使う側からすると「標準化してくれるのは助かるなぁ」という感想。

とりあえず、これから出てくる実装を開発者が各々の立場から調べて、フレームワークの相互運用性や流量制御といったフィーチャーが、自分たちのユースケースに対してどの程度メリットがあるのか判断していけば良いのではないかと思う。

あと、そういえば Storm に声は掛かってないんだろうか? 明らかにこのモデルに乗っけられそうだけど。