Instruct RabbitMQ to resend undelivered messages periodically

By : Johan
Source: Stackoverflow.com
Question!

Background

We're using langohr to interact with RabbitMQ. We've tried two different approaches to let RabbitMQ resend messages that has not yet been properly handled by our service. One way that works is to send a basic.nack with requeue set to the true but this will resend the message immediately until the service responds with a basic.ack. This is a bit problematic if the service for example tries to persist the message to a datastore that is currently down (and is down for a while). It would be better for us to just fetch the undelivered messages say every 20 seconds or so (i.e. we neither do a basic.ack or basic.nack if the datastore is down, we just let the messages be retained in the queue). We've tried to implement this using an ExecutorService whose gist is implemented like this:

(let [chan (lch/open conn)]  ; We create a new channel since channels in Langohr are not thread-safe
    (log/info "Triggering \"recover\" for channel" chan)
    (try
      (lb/recover chan)
      (catch Exception e (log/error "Failed to call recover" e))
      (finally (lch/close chan))))

Unfortunately this doesn't seem to work (the messages are not redelivered and just remains in the queue). If we restart the service the queued messages are consumed correctly. However we have other services that are implemented using spring-rabbitmq (in Java) and they seem to be taking care of this out of the box. I've tried looking in the source code to figure out how they do it but I haven't managed to do so yet.

Question

How do you instruct RabbitMQ to (re-)deliver messages in the queue periodically (preferably using Langohr)?

By : Johan


Answers

I am not sure what you are doing with your Spring AMQP apps, but there's nothing built into RabbitMQ for this.

However, it's pretty easy to set up dead-lettering using a TTL to requeue back to the original queue after some period of time. See this answer for examples, links etc.

EDIT

However, Spring AMQP does have a retry interceptor which can be configured to suspend the consumer thread for some period(s) during retry.

Stateful retry rejects and requeues; stateless retry handles the retries internally and has no interaction with the broker during retries.



See this answer which has instructions: we Nack the message, the nack puts the message into a holding queue for N seconds, then it TTLs out of that queue and into another queue that puts it back in the original queue.

It took a little bit of work to setup, but it works great!

By : jhilden


This video can help you solving your question :)
By: admin