yoskhdia’s diary

DDDとかプログラミングとかアーキテクチャとか雑多に

ScalikeJDBCをAkka StreamsのSourceにする (ScalikeJDBC + Reactive Streams)

Scala Advent Calendar 2016の15日目です。

バッチアプリケーションを作ることになったので、Akka Streamsを使おうかと考えました。
Slickだと標準でstreamメソッドが用意されているため、Akka Streamsとも連携させやすいですが、ScalikeJDBCが学習コストが低くて素敵だったので、コレをAkka Streamsに組み込めないか試したエントリです。

DBはMySQLを使う前提ですので、ご注意ください。 (一部Postgresも引き合いにしてますが、OracleSQL Server等は調べてません。)

TL;DR

github.com

2017/05/20 追記

ScalikeJDBC 3.0に公式に取り込まれました。

github.com

JDBCブロッキング問題

現在、JDBCは同期インタフェースしか提供されていません。 asyncな実装を試みている人もいますが、標準仕様として提供されていないものをProductionで使うのは中々勇気がいります。

github.com

DBへの問い合わせはブロッキングされるとして、ExecutionContextを別にするなどスレッドを分離するのが肝要です。 とはいえ、ReactiveStreamsで長時間に渡ってサブスクライブするような場合、コネクションを掴んだままになってしまうため、コネクションプールの調整にも気をつける必要があります。

データがメモリに収まらないよ問題

大きなデータをSourceにするとOut of Memoryが発生することがあります。 これはJDBC実装のデフォルト挙動が一度に全てのデータを読むようになっているためです。 MySQLやPostgresではその回避策としてCURSORが提供されています。

よって、(大きなデータを扱う可能性のある)ストリームを使うための方法としては、2つあります。

  • CURSOR(Streaming)を使う
  • 自分でWHERE条件やLIMITなどで工夫する

前者のCURSORを使うには、Statementにいくつかの属性を付与してあげる必要があります。

MySQLの場合

MySQL :: MySQL Connector/J 5.1 Developer Guide :: 5.2 JDBC API Implementation Notes

の「ResultSet」項にあるとおり、

stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);

と設定すると1行ごと(row by row)に結果を取得できます。 TYPE_FORWARD_ONLYCONCUR_READ_ONLYcreateStatementのデフォルトのようなので、特に変更しなければ気にするのはsetFetchSizeだけです。

Postgresの場合

Chapter 5. Issuing a Query and Processing the Result

の「Getting results based on a cursor」項にあるとおり、

  • V3 protocol(サーバが7.4以上)で繋ぐ
  • オートコミットモードにしない
  • ResultSetタイプを ResultSet.TYPE_FORWARD_ONLY にする
  • シングルステートメントのクエリ(セミコロンで複数クエリにしない)

を満たしている必要があります。これらを満たしていれば stmt.setFetchSize(任意のサイズ); を指定します。

CURSOR(Streaming)と自前調整(WHERE, LIMIT)どっちが早い?

MySQLは1行ごとのため、自分で頑張ってWHEREやLIMITでまとめて回すのと、どちらがコスパが良いかはケースバイケースな気がしますが、以下の記事ではStreamingの方が早かったようです。*1 ただ、MySQLはLIMIT/OFFSETが一定のサイズを超えると遅いという話 *2なので、LIMITで頑張る場合は一工夫必要そうです。 knes1.github.io

SlickでのReactive Streamsの仕組み

Slick 3.1.1時点のdb#streamの仕組みはこんな感じでした。

f:id:yoskhdia:20161130185347p:plain

Reactive Streamsに対応する上でポイントは、以下のようなところでしょうか。

  • SubscriptionとしてSubscriberからリクエストされたときにデータベース(ないしバッファ)から取得する仕組みを実装する
  • Subscription#requestで続きの問い合わせを始める(最初の問い合わせで全件読んだ場合はメモリからの読み出し)
  • Subscription#cancelに対応する
  • Publisher#subscribeSubscriberが渡された際にSubscriber#onSubscribeを呼ぶ(↑のSubscriptionを渡す)
  • データベースからフェッチする数とSubscriberからリクエストされた数のバランスを調整しながらSubscriber#onNextを呼ぶ

Slickではこれを、DatabaseComponent、StreamingContext、StreamingInvokerAction、PositionedResultIterator辺りを主要なクラスとして実現しているようでした。 結構肝になっているのが、StreamingContextの実装で、ここにストリーミングに必要な状態が全て収められています。そのため、内部ではvarが多用されているのですが、異なるスレッドでの可視性を担保するためにメモリバリアを使って制御しています。
メモリバリア(とその前のvolatile)については以下を参照してください。

ScalikeJDBCでのSQL Statement実行の仕組み

大きく、データベースのコネクション周りを管理するクラス群とSQLを組み立てるクラス群があります。 コネクション周りでは scalikejdbc.DB, scalikejdbc.DBConnection, scalikejdbc.DBSession, scalikejdbc.SQL 辺りのコードを読めばクエリ実行の流れを追うことができます。
SQLの組み立てはString Interpolationなど別のクラス群で行われており、 scalikejdbc.SQL はPreparedStatementの一歩手前みたいな情報コンテナになっています。

scalikejdbc.DBscalikejdbc.DBConnection -> scalikejdbc.DBSessionのような感じで、コネクションプールから取り出されたコネクションがDBSessionとなり、scalikejdbc.SQL#applyのimplicitパラメータに渡されることで、クエリが実行されます。 この時、 scalikejdbc.DBSession では、 scalikejdbc.SQL からPreparedStatementを準備し scalikejdbc.StatementExecutor生成してPreparedStatementへ実行を委譲します。 scalikejdbc.StatementExecutor ではPreparedStatementにパラメータをバインドし、データベースに問い合わせて結果を返します。

そのため、CURSORを実現するためには、DBSessionになるまでの間にコネクションに属性(AutoCommitなど)を付与し、SQL#fetchSize(Int) を忘れずに呼んでおけば大丈夫そうです。

余談

fetchSizeは scalikejdbc.SQL にも scalikejdbc.DBSession にもあるのですが、どうやら scalikejdbc.SQL に付与しておくのが一番安全そうです。

余力が出来たら、どこかでトライしてみます…。

2017/05/20 追記

修正しました。

github.com

Reactive Streamsのコード例

Reactive Streamsリポジトリを見ると、Specificationなどの他にExampleが公開されています。

github.com

最初はこれをベースに実装していたのですが、 Subscription#cancel のときの動作がキューが溜まっているとされないのでは?のようなモヤモヤ感があり、結局Slickの実装をベースにしました。

終わりに

Slickは2条項BSDライセンスなので、無保証であることと元の著作権表示をすれば改変できるとのことでLICENSEファイルに併記するようにしたのですけれど、これで良いのですかね…。ライセンスに詳しい人、教えてください…。実装し直した方が良いのかな…。

*1:WHEREで頑張る方がインデックスも選択できて良いよ、みたいな話も? How does MySQL result set streaming perform vs fetching the whole JDBC ResultSet at once | Vlad Mihalcea's Blog

*2:他にもこんな記事も MySQL ORDER BY / LIMIT performance: late row lookups at EXPLAIN EXTENDED