ScalikeJDBC streamsモジュールの使い方解説
ScalikeJDBCをReactive Streamsに対応させる記事を公開してから、ScalikeJDBC公式にモジュールが取り込まれる*1こととなり、ついに本日バージョン3.0がリリースされました!
この scalikejdbc-streams モジュールの使い方を解説するエントリです。
前置き
scalikejdbc-streamsは、全てのResultSetを読み込まずにDBがサポートするCURSORなどの仕組みを使ってストリーム処理を行えるよう設計されたDBアクセスのためのモジュールです。 Reactive Streamsに準拠しており、非同期処理をサポートします。 主にバッチアプリケーションでの利用を想定しています。 現在の実装では、ストリーム処理中にDBコネクションを保持したままとなるため、多くのトラフィックのあるWebアプリケーションでは、コネクションプールの上限などに気を配る必要があります。 WHERE/LIMIT(OFFSET)などで、少しずつデータをフェッチできる手段も将来的に用意したい*2と考えていますが、ストリームにしたいくらい大きなデータをWebアプリケーションで扱うことは少ないでしょう。*3
使い方
scalikejdbc-streamsは、enrich my libraryパターンでScalikeJDBCを拡張しています。 標準のモジュールではなく、追加モジュールである点に注意してください。sbtのlibraryDependenciesに以下のように追加します。
val scalikeJdbcVersion = "3.0.0" libraryDependencies ++= Seq( "org.scalikejdbc" %% "scalikejdbc" % scalikeJdbcVersion, "org.scalikejdbc" %% "scalikejdbc-streams" % scalikeJdbcVersion, // 使用DBに合ったJDBCライブラリ等 )
scalikejdbc-streamsでは、標準のScalikeJDBCに対して何も制限を加えません。 DBへのコネクションプールの初期化などは、ScalikeJDBCの標準的な方法で行えばOKです。*4
// Prepare a connection pool in advance. Class.forName("org.h2.Driver") ConnectionPool.singleton("jdbc:h2:file:./db/hello", "user", "pass")
Publisher
それでは、Reactive StreamsのPublisherを取得するまでの全体像です。リリースノートから抜粋します。
import scalikejdbc._ import scalikejdbc.streams._ import java.util.concurrent._ // ------------ // publisher val publisherExecutor = Executors.newFixedThreadPool(5) implicit val publisherEC = ExecutionContext.fromExecutor(publisherExecutor) val publisher: DatabasePublisher[Int] = DB readOnlyStream { sql"select id from users".map(r => r.int("id")).iterator }
前述のようにenrich my libraryパターンによる拡張であるため、 import scalikejdbc.streams._
とワイルドカードでインポートします。
これにより、ScalikeJDBCのDBオブジェクト、または、NamedDBオブジェクトから readOnlyStream
メソッドを呼ぶことができるようになります。このメソッドにStreamReadySQLオブジェクトを渡すことで、DatabasePublisherがインスタンス化されます。
StreamReadySQLの生成は、SQLオブジェクトへの拡張メソッドである iterator
メソッドを呼ぶことで行うことができます。
sql"select id from users".map(r => r.int("id"))
のように、ScalikeJDBCの提供する標準的なクエリビルダを利用することができます。*5
scalikejdbc-streamsの提供するDatabasePublisherへは、非同期実行のためにExecutionContextを渡す必要があります。 Scalaのベストプラクティスに則り、異なるExecutionContextを用意することを推奨します。
DBSessionの属性操作
scalikejdbc-streamsでは、対象のDBがMySQLまたはPostgreSQLの場合、自動的にストリーム処理に必要な属性を標準で有効化します。
この動作は、 StreamReadySQL#withDBSessionForceAdjuster
に関数を渡すことで変更することができます。
val publisher: DatabasePublisher[Int] = DB readOnlyStream { sql"select id from users".map(r => r.int("id")) .iterator .withDBSessionForceAdjuster(session => { session.conn.setAutoCommit(true) }) }
MySQLやPostgreSQL以外のDBを使っている場合は、各JDBCライブラリに応じて設定の有効化を行ってください。
自動的な有効化を抑止したい場合は、 session => ()
のように何もしない関数をこのメソッドに渡してください。
Subscriber
scalikejdbc-streamsはReactive Streams 1.0に準拠しているため、Reactive Streams仕様を満たすSubscriber実装であれば何でも使用することができます。 ここでは、Reactive Streamsで提供されているExampleのAsyncSubscriberを使った例を示します。リリースノートから抜粋します。
import org.reactivestreams.example.unicast.AsyncSubscriber // ------------ // subscriber val subscriberExecutor = Executors.newFixedThreadPool(5) val subscriber = new AsyncSubscriber[Int](subscriberExecutor) { override def whenNext(element: Int): Boolean = { // do something here log.info(s"element: ${element}") true } override def whenComplete(): Unit = { if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) { log.warn("Timed out while waiting for all tasks terminated") } subscriberExecutor.shutdownNow() } } publisher.subscribe(subscriber)
ここでのポイントは、 publisher.subscribe(subscriber)
の一文のみです。
Publisherに対して、複数のSubscriberが購読することができます。これはReactive StreamsのSpecificationにて定められています。
Reactive StreamsのExampleで示されるAsyncSubscriberは、 whenNext
メソッドと whenComplete
メソッドをオーバライドする必要があります。
詳しくは、Subscriberの仕様および実装を参照してください。
Akka StreamのSourceにする方法
前回の記事では、例を示せていなかったので軽く紹介しておきたいと思います。 よくあるユースケースとして、SQLSyntaxSupportを使うことも多いかと思いますので、これを想定した例です。
case class Entity(id: Int, name: String) object Entity extends SQLSyntaxSupport[Entity] { // see also http://scalikejdbc.org/documentation/sql-interpolation.html val e = Entity.syntax("e") def streamBy(name: String): StreamReadySQL[Entity] = { sql"select * from ${Entity as e} where ${e.name} = ${name}".map(Entity(e.resultName)).iterator() } } implicit val publisherEC = ??? val publisher: DatabasePublisher[Entity] = DB readOnlyStream { Entity.streamBy("foo") } implicit val system = ActorSystem("streams") import system.dispatcher implicit val materializer = ActorMaterializer() Source.fromPublisher(publisher).filter(_.id % 2 == 0).runForeach(println)
前述のように、StreamReadySQLオブジェクトを DB#readOnlyStream
に渡すことでPublisherを取得できますので、ORMでは StreamReadySQLを返すようにすることで再利用性が向上します。
Akka StreamはReactive Streamsに準拠しているため、 Source#fromPublisher
メソッドで簡単にストリームのソースにすることができます。便利ですね。*6
さいごに
scalikejdbc-streamsでは、SELECTなどのデータ取得を行うPublisherのみを提供しています。 DDLやINSERTなどはサポートされませんので注意してください。
繰り返しになりますが、現状scalikejdbc-streamsの提供するDatabasePublisherはバッチアプリケーションでの利用を想定しています。 これは、DBコネクションを保持し続けてしまうことによるものですので、将来的にWHERE/LIMIT(OFFSET)などで、少しずつデータをフェッチできる手段が用意できれば、Webアプリケーションでの適用もやりやすくなると思っています。 Akka HTTPの登場によって、WebアプリケーションでもFlowを使った処理が書きやすくなり、Java 9では標準ライブラリにReactive Streamsが取り込まれる予定など、ストリーム処理が少しずつ一般化しつつあります。 JDBCのAsync対応は、まだまだずっと先と思われるため、DBライブラリがReactive StreamsをサポートすることはAkka Streamでの利用に限らず一定の価値があると思います。
引き続き、ScalikeJDBC(とscalikejdbc-streams)をよろしくお願いします。