以前、PHP を使った PubSub からのメッセージ取得(プル)について触れました。

Java も同様にメッセージプルの実装サンプルが用意されていますが、同期プルのサンプルプログラムについて少し疑問がありました。
Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。
それは、サブスクライバのインスタンスをクローズしないとエラーが発生することです。
今回はそのエラーの対処方法とリソースについて紹介したいと思います。
発生するエラー
サブスクライバ(GrpcSubscriberStub)をクローズさせずに処理を終了させると、以下のエラーが発生します。
1 2 | *~*~*~ Channel ManagedChannelImpl{logId=93, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~* Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true. |
shutdown() または shutdownNow() を呼び出せということでしょうか。
サンプルコードにそのような処理は書かれていないのですが、サブスクライバにメソッドがあるということは個別に対処しないといけないのでしょう。
ちなみに、サブスクライバには close() というメソッドもあるのですが、次のメジャーバージョンで「throws Exception」が削除されるとコメントが書かれていました。
現状は内部的に shutdown() が呼び出されていますが、何が違うのでしょうね・・・。
1 2 3 4 5 | // NEXT_MAJOR_VER: remove 'throws Exception' @Override public final void close() throws Exception { shutdown(); } |
イメージとしてはこんな感じ。finally 使いたかったので runCatching じゃなくて一旦 try, catch で。
1 2 3 4 5 6 7 8 9 10 | subscriber: GrpcSubscriberStub = null try { // pull && ack } catch (e: Exception) { // error log } finally { if (subscriber != null && !subscriber.isShutdown) { subscriber.shutdown() } } |
シャットダウンの呼び出し
エラーメッセージで指摘のあった通り、shutdownNow() を呼び出すようにしたところエラーは出力されなくなりました。
しかし、メモリ使用率が緩やかに右肩上がりなのが気になります。
かなり緩やかなのですが、解放される気配がないので何かガベージコレクトされてないものがあるのではないかと心配になります。
じゃあ shutdown() の方が正解なのかっと悩んだりもしたのですが、エラーメッセージの以下の部分を見逃していました。
and wait until awaitTermination() returns true.
shutdown() の後に awaitTermination() を呼んで true が返るまで待てというやつですね。
試しに以下のように実装してみましたが、これでもまだメモリの解放が甘そうに感じます。
1 2 | subscriber.shutdown() subscriber.awaitTermination(1, TimeUnit.MINUTES) |
まとめ
また原因が判明していないので消化不良なのですが、awaitTermination() を入れていなかった時は、ある一定のところまでメモリ消費するとそこからは横ばいになっていました。
よってメモリ不足に陥るということはなさそうですが、awaitTermination() を追加したことでメモリ使用率の変動がどうなるか様子見したいと思います。
また状況がわかり次第、追記していきます。