ScalikeJDBCをAkka StreamsのSourceにする (ScalikeJDBC + Reactive Streams)
Scala Advent Calendar 2016の15日目です。
バッチアプリケーションを作ることになったので、Akka Streamsを使おうかと考えました。
Slickだと標準でstreamメソッドが用意されているため、Akka Streamsとも連携させやすいですが、ScalikeJDBCが学習コストが低くて素敵だったので、コレをAkka Streamsに組み込めないか試したエントリです。
DBはMySQLを使う前提ですので、ご注意ください。 (一部Postgresも引き合いにしてますが、OracleやSQL Server等は調べてません。)
TL;DR
2017/05/20 追記
ScalikeJDBC 3.0に公式に取り込まれました。
JDBCブロッキング問題
現在、JDBCは同期インタフェースしか提供されていません。 asyncな実装を試みている人もいますが、標準仕様として提供されていないものをProductionで使うのは中々勇気がいります。
DBへの問い合わせはブロッキングされるとして、ExecutionContextを別にするなどスレッドを分離するのが肝要です。 とはいえ、ReactiveStreamsで長時間に渡ってサブスクライブするような場合、コネクションを掴んだままになってしまうため、コネクションプールの調整にも気をつける必要があります。
データがメモリに収まらないよ問題
大きなデータをSourceにするとOut of Memoryが発生することがあります。 これはJDBC実装のデフォルト挙動が一度に全てのデータを読むようになっているためです。 MySQLやPostgresではその回避策としてCURSORが提供されています。
よって、(大きなデータを扱う可能性のある)ストリームを使うための方法としては、2つあります。
- CURSOR(Streaming)を使う
- 自分でWHERE条件やLIMITなどで工夫する
前者のCURSORを使うには、Statementにいくつかの属性を付与してあげる必要があります。
MySQLの場合
MySQL :: MySQL Connector/J 5.1 Developer Guide :: 5.2 JDBC API Implementation Notes
の「ResultSet」項にあるとおり、
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY); stmt.setFetchSize(Integer.MIN_VALUE);
と設定すると1行ごと(row by row)に結果を取得できます。
TYPE_FORWARD_ONLY
と CONCUR_READ_ONLY
はcreateStatementのデフォルトのようなので、特に変更しなければ気にするのはsetFetchSizeだけです。
Postgresの場合
Chapter 5. Issuing a Query and Processing the Result
の「Getting results based on a cursor」項にあるとおり、
- V3 protocol(サーバが7.4以上)で繋ぐ
- オートコミットモードにしない
- ResultSetタイプを
ResultSet.TYPE_FORWARD_ONLY
にする - シングルステートメントのクエリ(セミコロンで複数クエリにしない)
を満たしている必要があります。これらを満たしていれば stmt.setFetchSize(任意のサイズ);
を指定します。
CURSOR(Streaming)と自前調整(WHERE, LIMIT)どっちが早い?
MySQLは1行ごとのため、自分で頑張ってWHEREやLIMITでまとめて回すのと、どちらがコスパが良いかはケースバイケースな気がしますが、以下の記事ではStreamingの方が早かったようです。*1 ただ、MySQLはLIMIT/OFFSETが一定のサイズを超えると遅いという話 *2なので、LIMITで頑張る場合は一工夫必要そうです。 knes1.github.io
SlickでのReactive Streamsの仕組み
Slick 3.1.1時点のdb#streamの仕組みはこんな感じでした。
Reactive Streamsに対応する上でポイントは、以下のようなところでしょうか。
Subscription
としてSubscriber
からリクエストされたときにデータベース(ないしバッファ)から取得する仕組みを実装するSubscription#request
で続きの問い合わせを始める(最初の問い合わせで全件読んだ場合はメモリからの読み出し)Subscription#cancel
に対応するPublisher#subscribe
にSubscriber
が渡された際にSubscriber#onSubscribe
を呼ぶ(↑のSubscriptionを渡す)- データベースからフェッチする数と
Subscriber
からリクエストされた数のバランスを調整しながらSubscriber#onNext
を呼ぶ
Slickではこれを、DatabaseComponent、StreamingContext、StreamingInvokerAction、PositionedResultIterator辺りを主要なクラスとして実現しているようでした。
結構肝になっているのが、StreamingContextの実装で、ここにストリーミングに必要な状態が全て収められています。そのため、内部ではvarが多用されているのですが、異なるスレッドでの可視性を担保するためにメモリバリアを使って制御しています。
メモリバリア(とその前のvolatile)については以下を参照してください。
ScalikeJDBCでのSQL Statement実行の仕組み
大きく、データベースのコネクション周りを管理するクラス群とSQLを組み立てるクラス群があります。
コネクション周りでは scalikejdbc.DB
, scalikejdbc.DBConnection
, scalikejdbc.DBSession
, scalikejdbc.SQL
辺りのコードを読めばクエリ実行の流れを追うことができます。
※SQLの組み立てはString Interpolationなど別のクラス群で行われており、 scalikejdbc.SQL
はPreparedStatementの一歩手前みたいな情報コンテナになっています。
scalikejdbc.DB
≒ scalikejdbc.DBConnection
-> scalikejdbc.DBSession
のような感じで、コネクションプールから取り出されたコネクションがDBSessionとなり、scalikejdbc.SQL#apply
のimplicitパラメータに渡されることで、クエリが実行されます。
この時、 scalikejdbc.DBSession
では、 scalikejdbc.SQL
からPreparedStatementを準備し scalikejdbc.StatementExecutor
を生成してPreparedStatementへ実行を委譲します。
scalikejdbc.StatementExecutor
ではPreparedStatementにパラメータをバインドし、データベースに問い合わせて結果を返します。
そのため、CURSORを実現するためには、DBSessionになるまでの間にコネクションに属性(AutoCommitなど)を付与し、SQL#fetchSize(Int)
を忘れずに呼んでおけば大丈夫そうです。
余談
fetchSizeは scalikejdbc.SQL
にも scalikejdbc.DBSession
にもあるのですが、どうやら scalikejdbc.SQL
に付与しておくのが一番安全そうです。
おや?ScalikeJDBCでDBSession#fetchSizeをしてもSQLインスタンスのもつfetchSizeで上書きされてしまっているような?(動作未確認、コードリーディングレベル) https://t.co/ACbngVBL2B
— Okuda (@yoskhdia) 2016年12月13日
よくよく見るとDBSessionの中で各種クエリ実行時にusingを使ってて、これで最後に初期化もしてるな… https://t.co/LXXHqLjN5D
— Okuda (@yoskhdia) 2016年12月13日
@yoskhdia ありがとうございます。問題点理解しました。不具合といって良さそうに思いますね。もしよろしければなおしていただければと。。
— seratch(ja) (@seratch_ja) 2016年12月13日
余力が出来たら、どこかでトライしてみます…。
2017/05/20 追記
修正しました。
Reactive Streamsのコード例
Reactive Streamsのリポジトリを見ると、Specificationなどの他にExampleが公開されています。
最初はこれをベースに実装していたのですが、 Subscription#cancel
のときの動作がキューが溜まっているとされないのでは?のようなモヤモヤ感があり、結局Slickの実装をベースにしました。
終わりに
Slickは2条項BSDライセンスなので、無保証であることと元の著作権表示をすれば改変できるとのことでLICENSEファイルに併記するようにしたのですけれど、これで良いのですかね…。ライセンスに詳しい人、教えてください…。実装し直した方が良いのかな…。
お客様の中に二条項BSDライセンスのOSSから、一部コードを持ってきて手を入れた場合のライセンス表記をどうすれば良いかご存知の方はいらっしゃいませんか…?
— Okuda (@yoskhdia) 2016年12月13日
*1:WHEREで頑張る方がインデックスも選択できて良いよ、みたいな話も? How does MySQL result set streaming perform vs fetching the whole JDBC ResultSet at once | Vlad Mihalcea's Blog
*2:他にもこんな記事も MySQL ORDER BY / LIMIT performance: late row lookups at EXPLAIN EXTENDED