Using Ruby and Amazon SQS FIFO Queues
06 Aug 2017
For those not familiar with Amazon SQS, it’s a reliable, highly scalable, fully managed queue service. If you need background processing and all that jazz, but without the overhead of managing your own queue service, go for it. It can also be free, depending on your usage.
This post is about Amazon SQS FIFO Queues, which is a type of queue designed to guarantee that messages are processed exactly once and in the exact order that they are sent.
Shoryuken
The examples in this post are based on Shoryuken, a thread-based SQS message processor for Ruby.
Why Shoryuken? aws-sdk-ruby is great, but it’s just an API client. Shoryuken builds on the SDK by adding continuous polling and thread-based processing. It also adds abstractions that simplify the integration with SQS, including Active Job support.
FIFO Queues
There are two key attributes when working with FIFO Queues: Message Group ID and Message Deduplication ID.
Message Group ID
For the Message Group ID, you can either use the same message group for all messages or group them by some business logic, let’s say account_id, or whatever makes sense to you. Messages are received in the order they were sent within their group.
Shoryuken automatically sets the Message Group ID to ShoryukenMessage if it is not provided.
# Shoryuken will set the message_group_id to ShoryukenMessage
MyWorker.perform_async('test')
# User-defined message_group_id
MyWorker.perform_async('test', message_group_id: '...')
See Message Group ID.
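To make the per-group ordering guarantee concrete, here is a small in-memory model. It is a hypothetical sketch, not an SQS client: FifoMessage and order_by_group are illustrative names, and the account-N group IDs stand in for grouping by account_id.

```ruby
# Hypothetical in-memory model of FIFO ordering per message group.
# It is not an SQS client; it only illustrates the ordering guarantee.
FifoMessage = Struct.new(:group_id, :body)

# Group messages by Message Group ID, keeping send order within each group.
# Messages from different groups carry no ordering guarantee relative to each other.
def order_by_group(messages)
  messages.group_by(&:group_id).transform_values { |msgs| msgs.map(&:body) }
end

sent = [
  FifoMessage.new('account-1', 'a1'),
  FifoMessage.new('account-2', 'b1'),
  FifoMessage.new('account-1', 'a2'),
  FifoMessage.new('account-2', 'b2')
]

per_group = order_by_group(sent)
per_group.each { |group, bodies| puts "#{group}: #{bodies.join(', ')}" }
```

Within account-1, a1 always comes before a2, but nothing is promised about a1 versus b1.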
Message Deduplication ID
For exactly-once processing, FIFO Queues use the Message Deduplication ID to identify and reject duplicates sent within a 5-minute interval. If you enable content-based deduplication when creating your queue, the Message Deduplication ID is auto-generated from the message body.
Shoryuken automatically sets the Deduplication ID to a SHA-256 hash of the message body if it is not provided.
# Shoryuken will set the message_deduplication_id to a SHA-256 hash of the message body
MyWorker.perform_async('test')
# User-defined message_deduplication_id
MyWorker.perform_async('test', message_deduplication_id: '...')
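The deduplication rule can be sketched as a small time window. This is a hypothetical model (DeduplicationWindow is not part of Shoryuken or SQS): it hashes the body with SHA-256 when no ID is given, as described above, but the exact string Shoryuken hashes is not shown here. Time is passed in as plain seconds to keep it deterministic. Note that SQS accepts the duplicate send call; it just doesn't deliver the duplicate.

```ruby
require 'digest'

# Hypothetical model of FIFO deduplication: the deduplication ID defaults to a
# SHA-256 hash of the body, and a message whose ID was already accepted within
# the last 5 minutes is dropped. Class and method names are illustrative only.
class DeduplicationWindow
  WINDOW_SECONDS = 5 * 60

  def initialize
    @accepted_at = {} # deduplication ID => time (in seconds) it was accepted
  end

  # Returns true if the message would be delivered, false if dropped as a duplicate.
  def accept?(body, now:, deduplication_id: nil)
    id = deduplication_id || Digest::SHA256.hexdigest(body)
    last = @accepted_at[id]
    return false if last && now - last < WINDOW_SECONDS

    @accepted_at[id] = now
    true
  end
end

window = DeduplicationWindow.new
puts window.accept?('test', now: 0)   # => true
puts window.accept?('test', now: 60)  # => false (same body within 5 minutes)
puts window.accept?('test', now: 301) # => true (the 5-minute window has passed)
puts window.accept?('test', now: 302, deduplication_id: 'explicit-id') # => true
```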
Getting started
A few steps for using message groups with Shoryuken.
Create a FIFO Queue
shoryuken sqs create queue.fifo
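If you create the queue with aws-sdk-ruby instead of the Shoryuken CLI, the relevant pieces are the .fifo name suffix (required by SQS for FIFO queues) and the FifoQueue attribute. The helper below is a hypothetical sketch that only builds the attributes hash you could pass as attributes: to Aws::SQS::Client#create_queue; it makes no AWS calls, and ContentBasedDeduplication is included only as an option.

```ruby
# Hypothetical helper that builds the attributes for a FIFO queue, e.g. for
# Aws::SQS::Client#create_queue(queue_name:, attributes:). It does not call AWS.
def fifo_queue_attributes(queue_name, content_based_deduplication: false)
  # SQS requires FIFO queue names to end with ".fifo".
  unless queue_name.end_with?('.fifo')
    raise ArgumentError, "FIFO queue names must end with .fifo: #{queue_name}"
  end

  attributes = { 'FifoQueue' => 'true' }
  # Optional: let SQS derive the deduplication ID from the message body.
  attributes['ContentBasedDeduplication'] = 'true' if content_based_deduplication
  attributes
end

p fifo_queue_attributes('queue.fifo', content_based_deduplication: true)
```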
Create a worker
class HelloWorker
include Shoryuken::Worker
shoryuken_options queue: 'queue.fifo', auto_delete: true
def perform(sqs_msg, name)
puts "Hello, #{name}"
end
end
Send messages
HelloWorker.perform_async('Ken 1', message_group_id: 'hello')
HelloWorker.perform_async('Ken 2', message_group_id: 'hello')
HelloWorker.perform_async('Ken 3', message_group_id: 'hello')
Start Shoryuken
shoryuken -q queue.fifo -r ./hello_worker.rb
That’s it. Shoryuken will start processing your messages.
Sequential processing per group
By default, Shoryuken tries to receive as many messages as it can, limited by the number of available threads (see concurrency) and the SQS limit of 10 messages per request. If there are 10 available threads, it may receive up to 10 messages for the same Message Group ID and process them all in parallel.
If you want to process one message at a time, sequentially, set max_number_of_messages: 1 (see receive options). With that, if you send messages as follows:
ExpireMembershipsWorker.perform_async(date)
ActivatePendingMembershipsWorker.perform_async(date)
RenewMembershipsWorker.perform_async(date)
Shoryuken will receive the message for ExpireMembershipsWorker and process it, then receive the message for ActivatePendingMembershipsWorker and process it, then receive the message for RenewMembershipsWorker and process it, one by one.
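The one-by-one flow can be simulated without AWS. FakeFifoGroup below is a hypothetical stand-in for a single message group: it hands out at most max_number_of_messages messages and returns nothing while a message is still in flight, which is the group lock behavior described in the next section. The message names are shortened versions of the worker names above.

```ruby
# Hypothetical in-memory stand-in for one message group in a FIFO queue,
# used to illustrate max_number_of_messages. It is not an SQS client.
class FakeFifoGroup
  def initialize(bodies)
    @pending = bodies.dup
    @in_flight = []
  end

  # Mirrors the group lock: nothing is returned while a message is in flight.
  def receive(max_number_of_messages:)
    return [] unless @in_flight.empty?

    @in_flight = @pending.shift(max_number_of_messages)
    @in_flight.dup # hand out a copy so deletes don't mutate the caller's batch
  end

  # Deleting a message (as auto_delete: true does) releases it from in-flight.
  def delete(body)
    @in_flight.delete(body)
  end

  def empty?
    @pending.empty? && @in_flight.empty?
  end
end

group = FakeFifoGroup.new(%w[ExpireMemberships ActivatePendingMemberships RenewMemberships])
processed = []

until group.empty?
  group.receive(max_number_of_messages: 1).each do |body|
    processed << body  # "perform" the message
    group.delete(body) # then delete it, which unlocks the group
  end
end

puts processed.join(' -> ')
```

With max_number_of_messages: 10 instead, the first receive would return all three messages at once, which is what lets them run in parallel.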
How about multiple processes and other clients?
Every time Shoryuken (or any other SQS client) receives a message associated with a Message Group ID, SQS locks the group until the message is deleted or its visibility timeout expires, preventing Shoryuken or other SQS clients from receiving messages for the locked group.
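Here is a minimal sketch of that lock from the point of view of two clients sharing one group. FakeGroupLock is a hypothetical name, time is passed in as plain seconds, and the 30-second visibility timeout is an arbitrary example value.

```ruby
# Hypothetical model of the per-group lock described above. Time is passed in
# explicitly (in seconds) so the example is deterministic; no SQS calls are made.
class FakeGroupLock
  def initialize(visibility_timeout:)
    @visibility_timeout = visibility_timeout
    @locked_until = nil # when the in-flight message becomes visible again
  end

  # Returns true if any client may receive from the group at time `now`.
  def try_receive(now)
    return false if @locked_until && now < @locked_until

    @locked_until = now + @visibility_timeout
    true
  end

  # Deleting the in-flight message releases the group immediately.
  def delete
    @locked_until = nil
  end
end

lock = FakeGroupLock.new(visibility_timeout: 30)
puts lock.try_receive(0)  # => true  (client A receives the message)
puts lock.try_receive(10) # => false (client B: the group is locked)
puts lock.try_receive(31) # => true  (not deleted, so it became visible again)
lock.delete
puts lock.try_receive(32) # => true  (deleted, so the group is free)
```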
Using FIFO Queues for rate limiting
Given that you can sequentially process messages per group, you can easily use FIFO Queues to control the number of requests per second.
Shoryuken.sqs_client_receive_message_opts = {
max_number_of_messages: 1
}
class SyncWithExternalAPIWorker
include Shoryuken::Worker
shoryuken_options queue: 'queue.fifo', auto_delete: true
def perform(sqs_msg, name)
# call an external API that supports at most 10 calls per second...
# sleeping 0.1s per message caps processing at about 10 messages per second per message group
sleep(0.1)
end
end
I don’t recommend long-running workers; I would use sleep only for short delays. For longer delays, you can try taking advantage of the visibility timeout or delay queues.
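A fixed sleep(0.1) adds the delay on top of the API call itself, so the real rate ends up below 10 per second. A possible variant, sketched below under that assumption, sleeps only for whatever is left of the per-message time budget. The throttled helper is hypothetical, and the cap still applies per message group, since other groups can be processed in parallel.

```ruby
# Hypothetical variant of the sleep(0.1) approach: sleep only for the part of
# the per-message time budget that the work itself did not use.
def throttled(max_per_second)
  budget = 1.0 / max_per_second
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  sleep(budget - elapsed) if elapsed < budget
  result
end

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
5.times { |i| throttled(10) { "call #{i}" } } # stand-in for the external API call
total = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
puts format('5 calls took %.2fs', total)
```

Inside the worker, perform would wrap its API call in throttled(10) { ... }, where the call itself is whatever client you use.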
Should I use FIFO Queues?
If you want to reject duplicate messages sent within a 5-minute interval.
Yes.
If you want to control the order that the messages are received.
Yes.
If you want to control the order but still want to receive duplicates.
Yes. Just set message_deduplication_id to something unique (SecureRandom.uuid) and SQS won’t reject the message as a duplicate.
The same applies if you want to reject duplicates but don’t want to control the order: just set message_group_id to something unique.
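Both tricks boil down to the options hash passed to perform_async. The two helpers below are hypothetical and only build hashes; you would pass the result as the second argument, e.g. MyWorker.perform_async('test', ordered_allow_duplicates('hello')).

```ruby
require 'securerandom'

# Hypothetical helpers that build perform_async options; they send nothing.

# Keep ordering within a group, but never treat two sends as duplicates.
def ordered_allow_duplicates(group_id)
  { message_group_id: group_id, message_deduplication_id: SecureRandom.uuid }
end

# Reject duplicates (deduplication ID left to the default body hash),
# but put every message in its own group so no ordering is imposed.
def unordered_reject_duplicates
  { message_group_id: SecureRandom.uuid }
end

p ordered_allow_duplicates('hello')
p unordered_reject_duplicates
```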
If you don’t care about order or duplicates.
No. Go with standard queues.
If you plan to make more than 300 transactions per second.
Maybe. FIFO Queues are limited to 300 transactions per second (TPS). If you can split the transactions across multiple queues to stay under the 300 TPS per-queue limit, you are good; otherwise, you had better go with standard queues.
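One way to split the load, sketched here as an assumption rather than a prescribed approach, is to route each message group to a fixed queue by hashing its group ID. Ordering is then preserved within each group, while messages in different queues have no relative order. The 300 TPS figure comes from the text above; the queue names and helper names are hypothetical.

```ruby
require 'digest'

# Hypothetical sketch: spread load over several FIFO queues to stay under a
# per-queue TPS limit. Queue names and routing scheme are illustrative.
TPS_LIMIT_PER_QUEUE = 300

def queues_needed(expected_tps)
  (expected_tps.to_f / TPS_LIMIT_PER_QUEUE).ceil
end

# Route every message of a group to the same queue, so the per-group
# ordering guarantee still holds (ordering across queues is not guaranteed).
def queue_for(group_id, queue_names)
  index = Digest::SHA256.hexdigest(group_id).to_i(16) % queue_names.size
  queue_names[index]
end

count = queues_needed(1_000)
queues = Array.new(count) { |i| "queue-#{i}.fifo" }
puts "#{count} queues: #{queues.join(', ')}"
puts "account-42 -> #{queue_for('account-42', queues)}"
```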
If you want per-message delays.
No. FIFO queues don’t support per-message delays, only per-queue delays.