Don't Repeat Yourself

Don't Repeat Yourself (DRY) is a principle of software development aimed at reducing repetition of all kinds. -- wikipedia

作って学ぶ、cats.effect.IO モナドのしくみ

今回は cats.effect.IO (以下、 IO ) を理解する上で最低限必要となる*1実装を選定して、cats を参考に IO を実装してみました。具体的には、下記の機能を実装してみます。

  • IO#pure
  • IO#delay or IO#apply
  • IO#raiseError
  • IO#map
  • IO#flatMap
  • IO#unsafeRunSync

この記事のサンプルを実装し切ると、最終的にはたとえば次のようなコードが動きます。

object Main extends App {

  val io = for {
    num <- IO.pure(123)
    numStr <- IO.pure(num.toString)
    withHello <- IO.pure(s"${numStr}, Hello!")
    _ <- IO(println(withHello))
  } yield ()

    // "123, Hello!"
  io.unsafeRunSync()
}

逆に、紹介するにとどめて終わりそうな機能は下記です。

  • IO#async: 非同期処理を開始することができます。
  • IO#start: これを使用するとグリーンスレッドを扱うことができるようになります。実装的には、上記の async と、ExecutionContext の切り替え機能 (IO#contextShift) と、Fiber を実装すれば実現できます。

もちろん、IO では、ポーリング時に保有することになるスタックの大きさと深さの細かい管理が必要です。今回は、パフォーマンス面はまったく追求せず、単に IO の中身のコンセプトを理解するという点に主眼を置いています。

IO とは

遅延評価つき Future

IO は、もともとは Haskell にある概念です。IO[A] は、評価されるときに、A 型の値を返す前に副作用のある振る舞いをする計算であることを表現する型です。IO の値は純粋でイミュータブルであり、そのため参照透過です。

IO を使うことで、同期計算あるいは非同期計算を記述できます。for-yield でつないで書いていくことができますし、また内部処理をキャンセル可能にすることもできます。

もし、一般的に Scala で使用される概念である Future をご存知でしたら、IO は遅延評価つき Future といった対応関係にあります*2

ステートマシンである

実は、IO ( Future も) は、ステートマシンなのです*3。IO は、中のステート = 状態が受理になるまで遷移していき、受理になった時点で unsafeRunSync などの値を取り出す動作を行うと、値を取り出せるという仕組みになっています。

事実、cats の IO は下記のようなデータ構造をもっています。実際の IO.scalahttps://github.com/typelevel/cats-effect/blob/master/core/shared/src/main/scala/cats/effect/IO.scala#L1483

  private[effect] final case class Pure[+A](a: A)
    extends IO[A]

  private[effect] final case class Delay[+A](thunk: () => A)
    extends IO[A]

  private[effect] final case class RaiseError(e: Throwable)
    extends IO[Nothing]

  private[effect] final case class Suspend[+A](thunk: () => IO[A])
    extends IO[A]

  private[effect] final case class Bind[E, +A](source: IO[E], f: E => IO[A])
    extends IO[A]

  private[effect] final case class Map[E, +A](source: IO[E], f: E => A, index: Int)
    extends IO[A] with (E => IO[A]) {

    override def apply(value: E): IO[A] =
      new Pure(f(value))
  }

  private[effect] final case class Async[+A](
    k: (IOConnection, Either[Throwable, A]  => Unit) => Unit,
    trampolineAfter: Boolean = false)
    extends IO[A]

  private[effect] final case class ContextSwitch[A](
    source: IO[A],
    modify: IOConnection => IOConnection,
    restore: (A, Throwable, IOConnection, IOConnection) => IOConnection)
    extends IO[A]

多くの代数データがあります。cats では、同期処理であれば IORunLoop#step、非同期処理であれば IORunLoop#loop を使って、これらの代数データを遷移させていきます。これが、IO がステートマシンである所以なのです。

実装してみる

実装していきます。この IO は並行性を獲得できてはいませんが、IO の挙動の学習用には同期処理側を学ぶのが複雑性が少なく最適だと思うので、あえて実装していません。

用意した代数データ

今回は Pure, Delay, FlatMap, Map, そして RaiseError を切り出して用意しました。

object IO {
  final case class Pure[+A](a: A) extends IO[A]
  final case class Delay[+A](thunk: () => A) extends IO[A]
  final case class RaiseError(err: Throwable) extends IO[Nothing]
  final case class FlatMap[E, +A](original: IO[E], f: E => IO[A]) extends IO[A]
  final case class Map[E, +A](original: IO[E], f: E => A, index: Int)
      extends IO[A]
      with (E => IO[A]) {
    override def apply(value: E): IO[A] = Pure(f(value))
  }
}

https://github.com/yuk1ty/scratch-io-monad/blob/master/src/main/scala/effect/IO.scala

状態遷移を見てみる

今回は cats の IO を参考に、IOExecutor というクラスを実装してみました。というか、まるっと切り出しただけです。このクラスは、IO の状態遷移をかけつつ、状態に応じた関数適用や値の取り出しを逐次行っていくクラスです。

実装はこちらにあります→ https://github.com/yuk1ty/scratch-io-monad/blob/master/src/main/scala/effect/internal/IOExecutor.scala

IOExecutor#step と代数データを見比べながら、処理を少し理解してみましょう*4

登場人物を整理する

step 関数内の登場人物を少し整理します。

object IOExecutor {

  private[this] type Current = IO[Any]

  private[this] type Bind = Any => IO[Any]

  private[this] type CallBack = Either[Throwable, Any] => Unit

  private[this] type CallStack = mutable.ArrayStack[Bind]

  def step[A](source: Current): IO[A] = {
    var currentIO = source
    var bindFirst: Bind = null
    var bindRest: CallStack = null
    var hasUnboxed = false
    var unboxed: AnyRef = null
(...)
  • currentIO: 現在処理中の IO を指し示します。
  • bindFirst: 次にやってくる予定の処理を一時的に保管しておくための変数です。
  • bindRest: 2個、3個と bindFirst が積み上がることは容易に想像されます。それを保管するためのスタックを用意しています。
  • hasUnboxed: 中身が評価されたかどうかを制御しています。Delay か Pure に到達すると true になります。後続の処理で使用するためのフラグ。
  • unboxed: 評価された値を一時保存しておきます。

Pure に到達すると受理状態になる

IO は、状態遷移した後に Pure あるいは RaiseError に到達すると受理状態になります。つまり、この step 関数のなかで行っている処理は、再帰的に代数データの中身をたどっていき、Pure 状態 (例外時は RaiseError に寄せられます) に到達するまで状態遷移を繰り返していく処理です。

たとえば次のような IO の処理を組み立てたとしましょう。

for {
    num <- IO.pure(123)
    plusOne <- IO.pure(num + 1)
    plusThree <- IO.pure(plusOne + 3)
} yield plusThree

生成される代数データは次のようになっています。

FlatMap(Pure(123),Main$$$Lambda$2/892529689@6b9651f3)

中身について少し見ていきましょう。FlatMap と Delay で囲まれていることがわかります。FlatMap の代数データは、先ほどの代数データ一覧であったように、元のデータが左側に入っていて、これから連結したい対象の処理が右側に入っているという構図です。Main$$Lambda... については、左は「num + 1」、右は「plusOne + 3」が入っています。for-yield は flatMap のシンタックスシュガーなためです。flatMap の中身は Function ですよね。

この代数データに対して step 関数を適用して処理を実行していくと、次のような遷移をします。

// IO.pure(123) と IO.pure(num + 1) が入っている。
FlatMap(Pure(123),Main$$$Lambda$2/892529689@6b9651f3) 
// IO.pure(123) をたどっている。
Pure(123)
 // num + 1 適用後値と、IO.pure(plusOne + 3) が入っている。
FlatMap(Pure(124),Main$$$Lambda$6/1728790703@12f41634)
// IO.pure(123) をたどっている。
Pure(124) 
// plusOne + 3 をたどっている。
<function1>
// ↑が適用された 
Pure(127)
// 受理状態になった
Pure(127) 

Delay の挙動

公式ドキュメントに習って、IO は、遅延評価つきの IO であると先ほど紹介しました。その根幹を担うのがこの Delay という代数データです。IO#apply をすると基本的に Delay に状態遷移するようにできています。

すこしわかりにくいですが、Delay と Pure を織り交ぜて見ると、下記のような動きをします。

for {
    num <- IO.pure(123)
    plusOne <- IO(num + 1)
    plus3 <- IO(plusOne + 3)
} yield plus3
FlatMap(Pure(123),Main$$$Lambda$2/892529689@6b9651f3)
Pure(123)
FlatMap(Delay(Main$$$Lambda$6/1415157681@16f7c8c1),Main$$$Lambda$7/641853239@2f0a87b3)
Delay(Main$$$Lambda$6/1415157681@16f7c8c1)
<function1>
Delay(Main$$$Lambda$9/1338841523@b3d7190)
Pure(127)

Delay は、保持する値 (thunk) を評価せずに、一度遅延状態で保持する状態です。Delay は、step 関数に入れられてはじめて thunk の評価がかけられ、値が評価されていきます。

case Delay(thunk) => {
    Try {
       unboxed = thunk().asInstanceOf[AnyRef]
       hasUnboxed = true
       currentIO = null
    }.recover {
        case NonFatal(err) =>
          currentIO = RaiseError(err)
    }.get
}

unboxed に標準出力関数を噛ませて、中身の状態を見ていきましょう。

FlatMap(Pure(123),Main$$$Lambda$2/892529689@6b9651f3)
Pure(123)
FlatMap(Delay(Main$$$Lambda$6/1415157681@16f7c8c1),Main$$$Lambda$7/641853239@2f0a87b3)
Delay(Main$$$Lambda$6/1415157681@16f7c8c1)
delay unboxed: 124
<function1>
Delay(Main$$$Lambda$9/1338841523@b3d7190)
delay unboxed: 127
Pure(127)

きちんと関数適用が行われていたことがわかりました。

つなげる FlatMap

IO#flatMap をすると、FlatMap 状態に遷移します。この代数データの step 関数側の処理を見てみると、次のようなことをしています。

    do {
      currentIO match {
        case FlatMap(fa, bindNext) => {
          if (bindFirst != null) {
            if (bindRest == null) {
              bindRest = new mutable.ArrayStack()
            }
            bindRest.push(bindFirst)
          }

          bindFirst = bindNext.asInstanceOf[Bind]
          currentIO = fa
        }
(...)

FlatMap は一度パターンマッチに入ってくると、まず次に評価予定の内容をスタックの一番上に詰め込んでおきます。その後、現在の評価中 IO を自身のもともと評価予定だった IO に変え、次のループでその中身を処理していきます。これが、flatMap によって IO モナドがまるでひとつであるかのように連結されていくように見える所以です*5

flatMap を二重に重ねてしまう→「発火しない!」

実際にプロダクションで IO を使っていて、不意に実装中に flatMap が2回重なってしまって、IO が発火されずに困っているケースが散見されました。なぜこれが起きるかも、ステートマシンの動きを見ていくとわかります。

for {
    num <- IO.pure(123)
    plusOne <- IO(num + 1)
    plus3 <- IO(IO(plusOne + 3)) // 2回重ねてみた
} yield plus3

さて、ステートマシンの動きはどうなっているかというと…

FlatMap(Pure(123),Main$$$Lambda$2/892529689@6b9651f3)
Pure(123)
FlatMap(Delay(Main$$$Lambda$6/1415157681@16f7c8c1),Main$$$Lambda$7/641853239@2f0a87b3)
Delay(Main$$$Lambda$6/1415157681@16f7c8c1)
<function1>
Delay(Main$$$Lambda$9/1338841523@5d11346a)
Delay(Main$$$Lambda$11/2050404090@17211155)
Pure(Delay(Main$$$Lambda$11/2050404090@17211155))

Pure(Delay(...)) となってしまっていることがわかります。内側の Delay は関数適用が行われていないために評価されておらず、結果値を取り出せていません。

「そんなケースあるのか?」という話なのですが、プロダクションで見たケースとしては、たとえば IO(num + 1) の間に KVS への保存処理などを含んでいたとします。下記のように書いたとしましょう。

// num は外で定義されているものとします

// KvsRepository.scala
object KvsRepository {
    def insert(i: Int): IO[Unit] = IO(kvs.set(i)) // kvs への保存処理
}

// Main.scala
IO {
    KvsRepository.insert(num)
    num + 1
}.unsafeRunSync()

この場合は発火しませんね。なぜなら、KvsRepository 内の処理は Delay になっているからです。したがって、この insert は発火されず、num + 1 の結果だけが返ります。

IO の使い所の注意としては、きちんと最終的に Pure 状態になるようにコンビネータをつなげ続ける必要があるという点です。先ほどの KVS への保存の例のコードを修正するとしたら、次のようになるでしょう。

// num は外で定義されているものとします
val task = for {
    _ <- KVSRepository.insert(num)
    n <- num + 1
} yield n

task.unsafeRunSync()

発火しない問題には注意が必要です。が、かといって、副作用を含む処理を IO#pure に入れるのは、たしかに即時評価を起こせますが IO の本来の使い方や目的とは違っています。なので、面倒臭がらずに、きちんとコンビネータをつなげることを私はおすすめします。発火しない問題が起きた際には、一度 IO モナドを print するなどして、現在の代数データの状況がどうなっているかを確認するとよいです。

まとめ

  • IO はステートマシンである。
  • このステートマシンは、Pure か RaiseError になると受理状態になる。

リポジトリ

今回記事の長さの都合 (とわたしの気力の都合) でこの記事では解説しきれなかった、例外処理側や step 関数の実装全体もこのリポジトリに置いてあります。本当は RaiseError 時に復旧手段として用意されている IOFrame も解説するべきだったかもしれません…。

実装そのものはそこまでハードでもないですし、IO モナドを深く理解したい方には一度フルスクラッチしてみるのはとてもおすすめだと私は思いました。もともとは会社のチームメンバーのみなさんに使ってもらえるようにリポジトリを用意しましたが、2020年のチャレンジにどうぞ。

github.com

参考

*1:というか、私がよく使っているだけとも言うけれど

*2:ちなみに Future パターンに関する解説はこちらに詳しく載っています。

*3:この話については、この記事がとてもわかりやすいです→ステートマシン抽象化としてのFuture | κeenのHappy Hacκing Blog

*4:実装を見ると、再代入をかなり繰り返しますし、かつ Any や AnyRef などを多用していて型安全性も少し低めです。再代入を繰り返しているのは、パフォーマンス面を考慮してのことではないかと思いました。また、Any や AnyRef を全廃しようとすると、多くの型に対して個別実装を生やす必要が出てきて、実装の手間が膨大になりそうだなと思いました。それに加えて、Any(Ref) の範囲はこの IO 内に限られていて外には露出していませんので、問題ではないのかもしれませんね。

*5:このスタックの深さは、効率よく制御しないとパフォーマンスネックになる可能性があると想像できます。今回の実装では Scala の標準でついている ArrayStack を使用しましたが、cats では独自に Stack 実装を用意してありました。