yoskhdia’s diary

DDDとかプログラミングとかアーキテクチャとか雑多に

ScalikeJDBC streamsモジュールの使い方解説

ScalikeJDBCをReactive Streamsに対応させる記事を公開してから、ScalikeJDBC公式にモジュールが取り込まれる*1こととなり、ついに本日バージョン3.0がリリースされました!

github.com

この scalikejdbc-streams モジュールの使い方を解説するエントリです。

前置き

scalikejdbc-streamsは、全てのResultSetを読み込まずにDBがサポートするCURSORなどの仕組みを使ってストリーム処理を行えるよう設計されたDBアクセスのためのモジュールです。 Reactive Streamsに準拠しており、非同期処理をサポートします。 主にバッチアプリケーションでの利用を想定しています。 現在の実装では、ストリーム処理中にDBコネクションを保持したままとなるため、多くのトラフィックのあるWebアプリケーションでは、コネクションプールの上限などに気を配る必要があります。 WHERE/LIMIT(OFFSET)などで、少しずつデータをフェッチできる手段も将来的に用意したい*2と考えていますが、ストリームにしたいくらい大きなデータをWebアプリケーションで扱うことは少ないでしょう。*3

使い方

scalikejdbc-streamsは、enrich my libraryパターンでScalikeJDBCを拡張しています。 標準のモジュールではなく、追加モジュールである点に注意してください。sbtのlibraryDependenciesに以下のように追加します。

val scalikeJdbcVersion = "3.0.0"
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc" % scalikeJdbcVersion,
  "org.scalikejdbc" %% "scalikejdbc-streams" % scalikeJdbcVersion,
  // 使用DBに合ったJDBCライブラリ等
)

scalikejdbc-streamsでは、標準のScalikeJDBCに対して何も制限を加えません。 DBへのコネクションプールの初期化などは、ScalikeJDBCの標準的な方法で行えばOKです。*4

// Prepare a connection pool in advance.
Class.forName("org.h2.Driver")
ConnectionPool.singleton("jdbc:h2:file:./db/hello", "user", "pass")

Publisher

それでは、Reactive StreamsのPublisherを取得するまでの全体像です。リリースノートから抜粋します。

import scalikejdbc._
import scalikejdbc.streams._
import java.util.concurrent._

// ------------
// publisher
val publisherExecutor = Executors.newFixedThreadPool(5)
implicit val publisherEC = ExecutionContext.fromExecutor(publisherExecutor)
val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id")).iterator
}

前述のようにenrich my libraryパターンによる拡張であるため、 import scalikejdbc.streams._ワイルドカードでインポートします。 これにより、ScalikeJDBCのDBオブジェクト、または、NamedDBオブジェクトから readOnlyStream メソッドを呼ぶことができるようになります。このメソッドにStreamReadySQLオブジェクトを渡すことで、DatabasePublisherがインスタンス化されます。

StreamReadySQLの生成は、SQLオブジェクトへの拡張メソッドである iterator メソッドを呼ぶことで行うことができます。 sql"select id from users".map(r => r.int("id")) のように、ScalikeJDBCの提供する標準的なクエリビルダを利用することができます。*5

scalikejdbc-streamsの提供するDatabasePublisherへは、非同期実行のためにExecutionContextを渡す必要があります。 Scalaのベストプラクティスに則り、異なるExecutionContextを用意することを推奨します。

DBSessionの属性操作

scalikejdbc-streamsでは、対象のDBがMySQLまたはPostgreSQLの場合、自動的にストリーム処理に必要な属性を標準で有効化します。 この動作は、 StreamReadySQL#withDBSessionForceAdjuster に関数を渡すことで変更することができます。

val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id"))
    .iterator
    .withDBSessionForceAdjuster(session => {
      session.conn.setAutoCommit(true)
    })
}

MySQLPostgreSQL以外のDBを使っている場合は、各JDBCライブラリに応じて設定の有効化を行ってください。 自動的な有効化を抑止したい場合は、 session => () のように何もしない関数をこのメソッドに渡してください。

Subscriber

scalikejdbc-streamsはReactive Streams 1.0に準拠しているため、Reactive Streams仕様を満たすSubscriber実装であれば何でも使用することができます。 ここでは、Reactive Streamsで提供されているExampleのAsyncSubscriberを使った例を示します。リリースノートから抜粋します。

import org.reactivestreams.example.unicast.AsyncSubscriber

// ------------
// subscriber
val subscriberExecutor = Executors.newFixedThreadPool(5)
val subscriber = new AsyncSubscriber[Int](subscriberExecutor) {
  override def whenNext(element: Int): Boolean = { 
    // do something here
    log.info(s"element: ${element}")
    true 
  }
  override def whenComplete(): Unit = {
    if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) {
      log.warn("Timed out while waiting for all tasks terminated")
    }
    subscriberExecutor.shutdownNow()
  }
}
publisher.subscribe(subscriber)

ここでのポイントは、 publisher.subscribe(subscriber) の一文のみです。 Publisherに対して、複数のSubscriberが購読することができます。これはReactive StreamsのSpecificationにて定められています。

Reactive StreamsのExampleで示されるAsyncSubscriberは、 whenNext メソッドと whenComplete メソッドをオーバライドする必要があります。 詳しくは、Subscriberの仕様および実装を参照してください。

Akka StreamのSourceにする方法

前回の記事では、例を示せていなかったので軽く紹介しておきたいと思います。 よくあるユースケースとして、SQLSyntaxSupportを使うことも多いかと思いますので、これを想定した例です。

case class Entity(id: Int, name: String)
object Entity extends SQLSyntaxSupport[Entity] {
  // see also http://scalikejdbc.org/documentation/sql-interpolation.html
  val e = Entity.syntax("e")
  def streamBy(name: String): StreamReadySQL[Entity] = {
    sql"select * from ${Entity as e} where ${e.name} = ${name}".map(Entity(e.resultName)).iterator()
  }
}

implicit val publisherEC = ???
val publisher: DatabasePublisher[Entity] = DB readOnlyStream {
  Entity.streamBy("foo")
}

implicit val system = ActorSystem("nexusReporter")
import system.dispatcher
implicit val materializer = ActorMaterializer()
Source.fromPublisher(publisher).filter(_.id % 2 == 0).runForeach(println)

前述のように、StreamReadySQLオブジェクトを DB#readOnlyStream に渡すことでPublisherを取得できますので、ORMでは StreamReadySQLを返すようにすることで再利用性が向上します。

Akka StreamはReactive Streamsに準拠しているため、 Source#fromPublisher メソッドで簡単にストリームのソースにすることができます。便利ですね。*6

さいごに

scalikejdbc-streamsでは、SELECTなどのデータ取得を行うPublisherのみを提供しています。 DDLやINSERTなどはサポートされませんので注意してください。

繰り返しになりますが、現状scalikejdbc-streamsの提供するDatabasePublisherはバッチアプリケーションでの利用を想定しています。 これは、DBコネクションを保持し続けてしまうことによるものですので、将来的にWHERE/LIMIT(OFFSET)などで、少しずつデータをフェッチできる手段が用意できれば、Webアプリケーションでの適用もやりやすくなると思っています。 Akka HTTPの登場によって、WebアプリケーションでもFlowを使った処理が書きやすくなり、Java 9では標準ライブラリにReactive Streamsが取り込まれる予定など、ストリーム処理が少しずつ一般化しつつあります。 JDBCのAsync対応は、まだまだずっと先と思われるため、DBライブラリがReactive StreamsをサポートすることはAkka Streamでの利用に限らず一定の価値があると思います。

引き続き、ScalikeJDBC(とscalikejdbc-streams)をよろしくお願いします。

*1:そのため、オリジナルのリポジトリはメンテナンスを終了しています。

*2:クエリによってはバッチアプリケーションでも有用な手法と思っています。

*3:と願いたい…

*4:個人的にはいつも scalikejdbc-config を利用しています。

*5:勿論、typesafeなクエリビルダを使ってSQLオブジェクトを生成することもできます。

*6:AkkaではSource#unfoldResourceを使うことでscalikejdbc-streamsと同様のことはできちゃうんですが、細かな最適化が行われている点ではscalikejdbc-streamsの方が有利です。多分。