Java

KotlinでGCPのPubSubから同期プルした際のクローズ処理

以前、PHP を使った PubSub からのメッセージ取得(プル)について触れました。

Androidの月額課金でPub/Subからデベロッパー通知を受信する以前、Google の月額課金の状態はリアルタイムデベロッパー通知で制御すると楽だという話をしました。 https://itnek...

Java も同様にメッセージプルの実装サンプルが用意されていますが、同期プルのサンプルプログラムについて少し疑問がありました。

Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。

pull を使用したメッセージの受信(同期 pull)

それは、サブスクライバのインスタンスをクローズしないとエラーが発生することです。

今回はそのエラーの対処方法とリソースについて紹介したいと思います。

発生するエラー

サブスクライバ(GrpcSubscriberStub)をクローズさせずに処理を終了させると、以下のエラーが発生します。

shutdown() または shutdownNow() を呼び出せということでしょうか。

サンプルコードにそのような処理は書かれていないのですが、サブスクライバにメソッドがあるということは個別に対処しないといけないのでしょう。

ちなみに、サブスクライバには close() というメソッドもあるのですが、次のメジャーバージョンで「throws Exception」が削除されるとコメントが書かれていました。

現状は内部的に shutdown() が呼び出されていますが、何が違うのでしょうね・・・。

イメージとしてはこんな感じ。finally 使いたかったので runCatching じゃなくて一旦 try, catch で。

シャットダウンの呼び出し

エラーメッセージで指摘のあった通り、shutdownNow() を呼び出すようにしたところエラーは出力されなくなりました。

しかし、メモリ使用率が緩やかに右肩上がりなのが気になります。

かなり緩やかなのですが、解放される気配がないので何かガベージコレクトされてないものがあるのではないかと心配になります。

じゃあ shutdown() の方が正解なのかっと悩んだりもしたのですが、エラーメッセージの以下の部分を見逃していました。

and wait until awaitTermination() returns true.

shutdown() の後に awaitTermination() を呼んで true が返るまで待てというやつですね。

試しに以下のように実装してみましたが、これでもまだメモリの解放が甘そうに感じます。

まとめ

また原因が判明していないので消化不良なのですが、awaitTermination() を入れていなかった時は、ある一定のところまでメモリ消費するとそこからは横ばいになっていました。

よってメモリ不足に陥るということはなさそうですが、awaitTermination() を追加したことでメモリ使用率の変動がどうなるか様子見したいと思います。

また状況がわかり次第、追記していきます。