Akka HTTP クライアントを使う

この記事は Scala Advent Calendar 2017 の三日目です。前回は @poad1010 さんの「JupyterでScala」でした。

Akka HTTP を使って REST API を叩いてみようと思って色々と試したメモ。基本的には下記のドキュメントを読めば良いのだけど、サーバ側はともかくクライアント側の使い方について日本語で書かれたものが少ないので、使う側の目線での話を書き残しておくのも良いでしょうという感じ。ここでは、最も簡単な API である Request-Level Client-Side API について話をする。

doc.akka.io

少し難しい話になるが、簡単に使いたい場合は、記事の最後にラッパーコードを貼っておくのでコピペして使ってもらうと良いかと思う。

HTTP ボディを取得する

ドキュメントを見ると、初っ端からこういう感じで脅されるので嫌な予感がすると思うが、その予感は的中する。

「ストリームが一級市民」ではない HTTP クライアントに慣れている人は、最初に Akka HTTP クライアントの裏側にあるフルスタックなストリーミングの概念について説明した「リクエストとレスポンスのエンティティがストリームの性質を持つとはどういう事か」の章を読むことをお勧めします。

とは言っても、レスポンスを受け取るところまでは特に難しいところはなく、単に Http().singleRequest()HttpRequest を渡すだけだ(以下では、スコープに implicit な ActorSystem, Materializer, ExecutionContext が入っていることを前提に話を進める):

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = ...

val request = HttpRequest(uri = Uri("http://example.com/api/v1/users"))
val response: Future[HttpResponse] = Http().singleRequest(request)

ややこしいのはレスポンスボディをメモリに読み込むところで、HttpResponse を見てもそれらしいメソッドが見当たらない。結論から言うと、以下のようにする必要がある。

val body: Future[String] =
  response.flatMap(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String))

なぜこういう書き方をする必要があるかというと、HttpEntity から取得できる dataBytes は Akka Streams のストリーム (Source[ByteString, Any]) だから。

Akka HTTP はストリームベースのライブラリなので、サーバから到着したレスポンスを全てメモリに貯めこんでからユーザに引き渡すのではなく、バイト列が到着するたびにイベントを発火して処理する。これにより、例えば巨大なデータファイルや終端がない (unbounded) サーバログのようなようなレスポンスを効率的に扱うことができ、細かい制御(チャンクの区切り方とか)も容易になる。

一方で、従来のように文字列として全体を一回でアクセスできるようにするには、

  1. 受信した複数のバイト列を全て足しあわせて一つのバイト列に畳み込む (.runFold(ByteString.empty)(_ ++ _))
  2. バイト列を文字列に変換する (.map(_.utf8String))

という処理を明示的に記述する必要がある(ちなみに、1 と 2 を逆にすると多分文字化けが起きる。理由は考えてみよう)。分かってしまえば大した話ではないが、カジュアルな API を期待していると面食らうのは確かだ。

もう一つ注意点があり、Akka HTTP は送信側のデータ送信量を TCP レベルでスロットリング (TCP back-pressure) しているので、あるレスポンスのエンティティを消費せずに途中で放り出してしまうと、その TCP 接続の送信が詰まってしまい、その接続を利用している他のリクエストの処理に影響が出る。したがって、エラーの場合でも必ず response.discardEntityBytes() を呼び出す必要がある(将来的には自動検出できるようにしたいらしい)。

なお、全てのレスポンスをメモリに読み込む方法はもう一つあり、以下のように toStrict(timeout) を呼ぶと Future[HttpEntity.Strict} を取得できる。Strict を使うと受信したバイト列を集約した data にアクセスできるので、以下のように書ける:

import scala.concurrent.duration._
val timeout: FiniteDuration = 10.seconds
val body: Future[String] = response.flatMap(_.entity.toStrict(timeout).map(_.data.utf8String))

toStrictタイムアウトを指定する必要があるが、レスポンスを待つ時間を明示的に指定するならこちらを使うべきだろう(runFold を使う場合は idle timeout が成立するまで待つ)。

また、タイムアウトFiniteDuration という型名が示すとおり「無限」は指定できない。「タイムアウトが無限」はナンセンスである、というライブラリ作者たちの意思表示なので、いつまでも待ち続けたいお気持ちを表明する場合は適当に 100 万秒とかを指定すると良い。

JSON を変換 (unmarshal) する

メモリに読み込んだ JSON 文字列をアプリケーション内部の形式に変換するには、Akka HTTP が提供する Unmarshal(...).to[A] を使う。ただ、Unmarshal 自体は単なるラッパーで、具体的な処理は他の JSON パーサーに委譲する仕組みになっている。なので、好きなライブラリを直接使っても構わない。

標準では spary-json のラッパーが提供されている。使うには、ライブラリの依存関係に akka-http-spray-json を追加する。

libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.11"

例として、JSONUser 型にマッピング (jsonFromat2()) して Unmarshal してみる。akka-http-spray-json を使うには SprayJsonSupport の配下の implicit をスコープに入れる。こんな感じだろうか:

import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json._

case class User(id: Long, name: String)

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val UserFormat = jsonFormat2(User)
}

class FooClient() extends JsonSupport {

  val res =
    Http().singleRequest(HttpRequest(uri = Uri("http://example.com/api/v1/users/1")))
  res.entity.dataBytes
    .runFold(ByteString.empty)(_ ++ _)
    .flatMap(bs => Unmarshal(bs.utf8String).to[User])

}

spray-json ではなく、他の Jackson や circe などのライブラリを使いたい場合は、それぞれに対して Unmarshaller を実装した akka-http-json というサードパーティのライブラリの開発が進んでいる。

github.com

ラッパーを作る

以上のノウハウをまとめて、こういう感じのラッパーを作っておくと便利なのではないかと思う。言うまでもなく、このコードで 10 GB のデータファイルとかを受け取るとヒープメモリが爆発四散するので、その場合はストリームの作法に従って書こう。

def doRequest[A](req: HttpRequest, unmarshal: String => Future[A]): Future[A] =
  Http().singleRequest(req).transformWith {
    case Success(res) if res.status == StatusCodes.OK =>
      res.entity.dataBytes
        .runFold(ByteString.empty)(_ ++ _)
        .flatMap(bs => unmarshal(bs.utf8String))
    case Success(res) =>
      res.discardEntityBytes()
      Future.failed(new Exception(s"HttpRequest failed: $res"))
    case Failure(t) => Future.failed(t)
  }

追記: UnmarshalHttpEntity を直接与えることで、より短く書くことができる:

def doRequest[A](request: HttpRequest)
                (implicit unmarshaller: FromEntityUnmarshaller[A]): Future[A] =
  Http().singleRequest(request).transformWith {
    case Success(res) if res.status == StatusCodes.OK =>
      Unmarshal(res.entity).to[A]
    case Success(res) =>
      res.discardEntityBytes()
      Future.failed(new Exception(s"HttpRequest failed: $res"))
    case Failure(t) => Future.failed(t)
  }

これは、FromEntityUnmarshaller[A] の実装がバイト列の集約を実装している Unmarshaller.byteStringUnmarshaller を呼び出すため(実装を見るrunFold(...) による畳み込みをしているのが分かる)。

まとめ

Akka HTTP はドキュメントが充実しており、ライブラリの思想や使う上で必要なことは概ね書かれている。また、機能的、非機能的な制約が型として埋め込まれており、自分がどんなコードを書こうとしているのか、常にプログラマに意識させるような作りになっている。

ただ、それはごまかしが効きにくいということでもある。「サーバからデータをガサッと持ってきてババッて変換してさー」的なやり方は技術的にはいくつも穴があるが、ちょっとした作業では不問に付したい場合も多い。なので、Akka HTTP は日々の作業をアドホックにこなしていくツールとしては使いにくい点が多い。ただ、堅牢で高性能なアプリケーションを書く上では強力な道具になりうると感じた。

最近、SoftwareMill が開発する sttp が 1.0 になった。これは、akka-http や async-http-client などをラップしてシンプルな API を提供することを目的としている。こうしたライブラリを、作業に求められている品質に応じて使い分けていくのが良いのではないかと思う。

github.com

参考文献