RabbitMQ

Ack / Nack / Reject の実装とベストプラクティス

basicAck

用途(ようと) :「このメッセージは正常(せいじょう)消費(しょうひ) され処理(しょり)完了(かんりょう) した」ことを(しめ) す。

構文(こうぶん) channel.basicAck(deliveryTag, multiple)

パラメータ説明(せつめい)
deliveryTagメッセージの一意(いちい) 識別子(しきべつし) (RabbitMQ が()() て)
multipletrue場合(ばあい) 、deliveryTag 以下(いか) のすべてのメッセージを一括(いっかつ) ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

ベストプラクティス

  • 通常(つうじょう) 、ビジネスロジックが成功(せいこう) した(あと) に Ack することを推奨(すいしょう)

  • Spring AMQP を使用(しよう) する場合(ばあい)ackMode = MANUALchannel.basicAck()()() わせる

  • try (そと) で ack しないようにし、エラー() のメッセージ損失(そんしつ)(ふせ)

basicNack

用途(ようと) :broker に「処理(しょり) 失敗(しっぱい) 」を明示的(めいじてき)(つた) え、再配信(さいはいしん) するかどうかを選択(せんたく) できる。

構文(こうぶん) channel.basicNack(deliveryTag, multiple, requeue)

パラメータ説明(せつめい)
requeue = trueキューに(もど) す(通常(つうじょう)重複(じゅうふく) 消費(しょうひ)発生(はっせい)
requeue = false破棄(はき) または DLX(デッドレターキュー)に転送(てんそう)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

ベストプラクティス

  • ビジネスが再試行(さいしこう) 可能(かのう)場合(ばあい)requeue = true設定(せってい)

  • データ異常(いじょう) (フォーマットエラー、回復(かいふく) 不可能(ふかのう) )を検出(けんしゅつ) した場合(ばあい)requeue = false → DLX で処理(しょり)

  • 無限(むげん) requeue(メッセージストーム)を() け、retry 機構(きこう) や DLX で回数(かいすう)制限(せいげん)

basicReject

用途(ようと) 単一(たんいつ) のメッセージを拒否(きょひ)複数(ふくすう)一度(いちど)拒否(きょひ) できない)。

構文(こうぶん) channel.basicReject(deliveryTag, requeue)

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

ベストプラクティス

  • 機能(きのう)basicNack類似(るいじ) しているが、単一(たんいつ) メッセージのみ対象(たいしょう)

  • 通常(つうじょう)basicNack(ほう)機能(きのう)豊富(ほうふ) なため、basicNack使用(しよう)推奨(すいしょう)

推奨の実装テンプレート

@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
    long tag = message.getMessageProperties().getDeliveryTag();
    try {
        String body = new String(message.getBody());
        // ビジネスロジックを処理
        processTask(body);

        // 成功 → Ack
        channel.basicAck(tag, false);
    } catch (BusinessException e) {
        // 再試行可能なエラー
        channel.basicNack(tag, false, true);
    } catch (Exception e) {
        // 回復不可能なエラー → デッドレターキューへ
        channel.basicReject(tag, false);
    }
}

Retry 機構(きこう) () きバージョン:

@RabbitListener(queues = "task.queue", ackMode = "MANUAL")
public void onMessage(Message message, Channel channel, CustomMessage payload) throws IOException {
    long tag = message.getMessageProperties().getDeliveryTag();
    Map<String, Object> headers = message.getMessageProperties().getHeaders();
    int retryCount = (int) headers.getOrDefault("x-retry-count", 0);

    try {
        // ビジネス層 payload でロジックを実行
        processBusinessLogic(payload);

        // 成功 → Ack
        channel.basicAck(tag, false);
    } catch (TemporaryException e) {
        // 一時的なエラー → Retry 機構
        if (retryCount < 3) {
            Message newMessage = MessageBuilder
                .withBody(message.getBody())
                .copyHeaders(headers)
                .setHeader("x-retry-count", retryCount + 1)
                .build();

            rabbitTemplate.convertAndSend(
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                newMessage
            );
        } else {
            log.error("Message {} failed 3 times: {}", payload.getTaskId(), e.getMessage());
        }
        channel.basicAck(tag, false);
    } catch (Exception e) {
        // 非一時的なエラー → 破棄
        log.error("Fatal error on message: {}", payload, e);
        channel.basicReject(tag, false);
    }
}

よく使う Class / Method / Property 一覧表

クラス / コンポーネント属性(ぞくせい) / メソッド説明(せつめい)
ChannelbasicAck(long tag, boolean multiple)手動(しゅどう) でメッセージ処理(しょり) 成功(せいこう)確認(かくにん)
ChannelbasicNack(long tag, boolean multiple, boolean requeue)手動(しゅどう) 拒否(きょひ) (キューに(もど) すか選択(せんたく) 可能(かのう)
ChannelbasicReject(long tag, boolean requeue)単一(たんいつ) メッセージを拒否(きょひ)
ChannelbasicConsume(...)キューからメッセージの消費(しょうひ)開始(かいし)
MessagegetBody()メッセージ内容(ないよう)取得(しゅとく) (byte[])
MessagegetMessageProperties()メッセージ属性(ぞくせい) オブジェクトを取得(しゅとく)
MessagePropertiesgetDeliveryTag()delivery tag(メッセージ一意(いちい) 識別子(しきべつし) )を取得(しゅとく)
MessagePropertiesgetHeaders()カスタム headers(metadata)
MessagePropertiesgetCorrelationId()Request-Response パターン(よう)
MessagePropertiesgetConsumerQueue()メッセージが(ぞく) するキュー
@RabbitListenerackModeAck モードを設定(せってい)AUTOMANUALNONE
AcknowledgeModeAUTO / MANUAL / NONEコンシューマー確認(かくにん) モード
AmqpRejectAndDontRequeueException例外(れいがい) クラス拒否(きょひ) して(さい) キューしない標準(ひょうじゅん) 例外(れいがい)
RabbitTemplateconvertAndSend(exchange, routingKey, message)指定(してい) の Exchange にメッセージを送信(そうしん)
SimpleMessageListenerContainersetAcknowledgeMode(AcknowledgeMode.MANUAL)手動(しゅどう) ack モードを設定(せってい)

Ack モード比較

モード動作(どうさ)適用(てきよう) シナリオ
AUTO処理(しょり) 成功(せいこう) ()自動(じどう) ack、例外(れいがい)() げると自動(じどう) nack して requeueビジネスロジックが単純(たんじゅん)
MANUALbasicAck / basicNack自分(じぶん)()()必要(ひつよう) あり複雑(ふくざつ) なビジネス、正確(せいかく) なリトライ制御(せいぎょ)必要(ひつよう)
NONEack を(かえ) さない、メッセージが重複(じゅうふく) する可能性(かのうせい) あり非常(ひじょう)(まれ) 、テスト(よう) のみ

実務上の推奨事項(ベストプラクティスチェックリスト)

カテゴリ推奨(すいしょう)
リトライ制御(せいぎょ)Spring RetryTemplate またはカスタム retry 回数(かいすう)使用(しよう) し、無限(むげん) requeue を() ける
デッドレターキュー (DLX)消費(しょうひ) できないメッセージを処理(しょり) するデッドレターエクスチェンジを設定(せってい)
冪等性(べきとうせい)消費(しょうひ) ロジックで重複(じゅうふく) メッセージが副作用(ふくさよう)() こさないことを確認(かくにん)
ロギング & トレーシングdeliveryTagmessageIdcorrelationId記録(きろく) して追跡(ついせき)容易(ようい)
一括(いっかつ) AckbasicAck(tag, true) でパフォーマンス向上(こうじょう) (データ整合性(せいごうせい)注意(ちゅうい)
モニタリングRabbitMQ Management Plugin で unacked 状態(じょうたい)監視(かんし)