yoskhdia’s diary

DDDとかプログラミングとかアーキテクチャとか雑多に

誰得: finagle-mysql + quill code reading

誰得情報ですが、せっかく読んだので公開しておきます。

Version

Clientの生成

  1. io.getquill.FinagleMysqlContextConfig にtypesafe Configを渡して #client が出発点
    1. io.getquill.FinagleMysqlContext のコンストラクタに生成したClientを渡すことで、DaoなどはこのContextで定義されている #executeQuery#transaction を経由してClientにアクセスする
  2. com.twitter.finagle.Mysql#client が呼ばれる
    1. = Mysql.Client#apply
      1. stack & params 引数は渡さないのでデフォルトが指定される
        1. stackパラメータには Mysql.Client.stack が設定される。これには StackClient#newStack をベースにしてTracingFilterとConnection管理をちょっと便利にする系のヘルパーを組み込んだStackが設定されている。
        2. paramsパラメータには StackClient.defaultParams をベースにしてDefaultPoolの設定のデフォルト値などが設定されている。
      2. Stackのなかには ClosableService がある。
        1. https://github.com/twitter/finagle/commit/c64bea0939a2c41dcc2addd8fcea3ad6f2af63e2
          1. (DeepL翻訳)newClientの ServiceFactory で作成されたセッションを閉じても、接続は接続プール内で生きているため、再利用を妨げることはありません。新しいクライアントスタックモジュールは DefaultPool の上にあります。FactoryToServiceが有効になっていない場合、DefaultPoolから返されるサービスは ClosableService でラップされ、閉じたセッションが再利用されないようにします。
      3. Stackにコネクションプールの設定が含まれる
        1. Finagle標準の仕組みでServiceインスタンスをプールできる。=DefaultPool
        2. DefaultPoolもServiceFactory
      4. Mysql.ClientMySQLサーバとの間のTransportを設定するために存在する
        1. com.twitter.finagle.client.StdStackClient をextendsしていて、TransporterとDispatcherを実装することで endpointer: EndpointerModule が動くよう定義されている。この endpointer が後述の EndpointerStackClient#newClient のなかで呼ばれて、 com.twitter.finagle.ServiceFactory が生成される。
    2. FinagleMysqlContextConfig に処理が戻って、 Mysql.Client#withXxx を使ってConfigを反映
    3. 最後に Mysql#newRichClient を呼んで Client インスタンスを生成する
      1. = com.twitter.finagle.mysql.Client#apply が呼ばれる ※小文字のmysqlに注意
      2. 第一引数に ServiceFactory をとり、ここで Client#newClient を呼ぶことで、そのImplである Mysql#newClient が呼ばれる。これはそのまま Mysql.Client#newClient にデリゲートされる。
      3. Mysql.Client#newClientEndpointerStackClient#newClient そのものであり、このなかでは Mysql.Client.stack に前述のendpointerをjoinして Stack#make によって ServiceFactory が生成される。これで、Stackに載せていた各種機能ががっちゃんこされる。
        1. また、 Mysql#newRitchClient に渡される dest: Stringcom.twitter.finagle.Resolver#evalLabeled によって名前解決されて com.twitter.finagle.Name になる。これがアドレスを指す。
          1. EndpointerStackClient#newClient ではこのNameを clientParams として他のparamsとくっつけて Stack#make に渡す
          2. 謎の魔法によって EndpointAddr になり、Transporterに設定される。
      4. 生成した ServiceFactory といくらかの設定(StatsReceiverなど)を mysql.Client#apply に渡す
        1. com.twitter.finagle.mysql.StdClient が生成される。こいつが、実際にクエリ実行やトランザクションの開始・終了をコントロールする。

Points

  • コネクションまわりの制御は Mysql.Client
    • Stackを組み立てて、MySQLトランスポートを組み込んで、ServiceFactoryをつくるところまで
  • 実行部分の制御は mysql.Client
    • ServiceFactoryからServiceをとって、クエリなどを投げる

DBコネクションの確立

  1. Mysql.Client がextendsしている com.twitter.finagle.client.StdStackClient#endpointerEndpointerModule をStackに載せる
  2. EndpointerModuleStack#make が呼ばれたとき、 com.twitter.finagle.mysql.MysqlTransporter を取得し ServiceFactory を生成する
  3. ServiceFactory#apply が呼ばれたとき MysqlTransporter#apply メソッドを呼ぶ
    1. = Service を取得するタイミング
    2. https://github.com/twitter/finagle/blob/finagle-19.12.0/finagle-core/src/main/scala/com/twitter/finagle/client/StdStackClient.scala#L75
    3. netty4 でコネクション(Transport)を取得して com.twitter.finagle.mysql.transport.MysqlTransportインスタンス
    4. com.twitter.finagle.mysql.Handshake#apply を呼び設定に従ってHandshakeオブジェクトを取得する
      1. PlainかSecureかはparamから解決される
    5. Handshake.connectionPhase が呼ばれ MysqlTransport を通してネゴシエーションがされる

クエリ実行

  1. StdClient が生成されるタイミングで ServiceFactory#toService が呼ばれる
    1. val で束縛しているが、 FactoryToService#apply(Request) では都度 ServiceFactory#apply() によってServiceを取得している
    2. #prepare#transaction も新しく ServiceFactory#apply() によってServiceを都度取得する
  2. ServiceFactory#toService でServiceを取得
    1. = new com.twitter.finagle.FactoryToService
  3. #query などでは Service#apply にRequestオブジェクトを投げる
    1. このRequestオブジェクトは com.twitter.finagle.mysql.QueryRequest など
    2. com.twitter.finagle.mysql.Request#toPacket によってPacketオブジェクトを取り出し、先述の Mysql.Client で定義されているTransporterによってプロトコル制御に渡される。

トランザクション

  1. StdClient#session が呼ばれる
    1. ServiceFactory#apply() によって(恐らくService自体にコネクションは組み込まれているので、ClientConnectionはnilでよい) Future[Service[..]] を取得
    2. sessionブロックのなかで使うClientインスタンスを用意しており、これは↑のServiceインスタンスをSingletonに利用する。これによって、前述のとおり FactoryToService#apply が返すServiceが同じになるため、トランザクションセッション中に同じコネクションが使われることが担保される。
      1. sessionブロックのなかでもう一段ならセッションを開始できるみたい(だけど、あまりやるべきでなはない)
    3. sessionブロックを抜けるときにServiceインスタンス#close が呼ばれる
      1. Client#closeServiceFactory#closeService#close で回収される
  2. sessionメソッドに f ブロックを渡す
    1. Optional SET TRANSACTION ISOLATION LEVEL -> START TRANSACTION -> f(client) -> COMMIT
    2. 途中のどこかで Throw(e @ WrappedChannelClosedException()) 結果となるとROLLBACKせずに終わる
      1. ChannelClosedException を cause まで再帰的に見てたどり着けばこの分岐
    3. それ以外の Throw でROLLBACKが実行される
      1. このROLLBACKにも失敗すると、 Client#discard (実体は StdClient#session のなかでつくられる StdClient with Session)を呼んで PoisonConnectionRequest をServiceに渡して自死させる。=コネクションを再利用されないよう閉じる
        1. Client生成時点でStackに PoisonConnection.module が積まれており、Serviceは PoisonableService に包まれている。PoisonConnectionRequest を受け取ると、 Service#status をClosedにし、 #apply(Request) が呼ばれると PoisonedConnectionException 例外を投げるようになる。実際の close メソッドはこのなかでは呼ばず、回収されるのを待つ感じになる。トランザクションに関しては前述の session 出口の close 地点。

Points