SQS のキューメッセージのハンドリング。
Java や Kotlin などの場合、spring-cloud-aws のライブラリを使うと便利ですよね。
通常運用であれば、削除ポリシーを「ON_SUCCESS」にすると自動判別してくれて楽なのですが、用途によっては意図的にキューに戻したいケースも。
今回は、手動で acknowledge(ack)する方法を紹介します。
deletionPolicyについて
全部で 5 つありますが、主に使われるのは以下の 3 つでしょうか。
ON_SUCCESS
NEVER
ALWAYS
ON_SUCCESS は成功すればメッセージが削除され、例外がスローされたらメッセージをキューに戻してくれます。
NEVER は意図的に acknowledge しないとメッセージは削除されません。
ALWAYS は常に削除するようですが、処理に失敗しても削除するような場面があまり思い浮かびません。
NEVERを使う場合
まずは、以下のアノテーションでキューの設定を定義します。
@SqsListener
value にキュー名、deletionPolicy は前述した通りです。
キューの名前が queue-hoge だとすると以下の定義となります。
1 2 3 4 | @SqsListener( value = ["queue-hoge"], deletionPolicy = SqsMessageDeletionPolicy.NEVER ) |
リスナーの定義ができたら、メッセージを受信する処理を実装していきます。
ちょっと雑なコードですが、JSON 文字列を Data のクラスにマッピングして、time が現在日時より未来ならキューに戻します。
現在日時を過ぎていたら処理を行って、成功すれば acknowledge してメッセージを削除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | fun handleNotificationMessage( @NotificationSubject subject: String, @NotificationMessage message: String, ack: Acknowledgment ) { runCatching { val data = jacksonObjectMapper().configure().toObject(message, Data::class.java) if (data.time > now()) { return } fuga() .also { // acknowledge ack.acknowledge() // log } }.getOrElse { throw it } } } |
ON_SUCCESSを使う場合
NEVER の時とほぼ同じですが、acknowledge が不要となります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | @SqsListener( value = ["queue-hoge"], deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS ) fun handleNotificationMessage( @NotificationSubject subject: String, @NotificationMessage message: String ) { runCatching { val data = jacksonObjectMapper().configure().toObject(message, Data::class.java) if (data.time > now()) { return } fuga() .also { // log } }.getOrElse { throw it } } } |