Implement Worker Queues for Background Job Processing using Vultr Managed Database for Caching
Introduction
Redis® is an open-source, in-memory data structure store. Redis® Lists are collections of string elements sorted by insertion order. They offer a range of commands that enable various operations such as push, pop, and traversal from both ends of the list, along with the ability to access or remove elements by indexing or value.
Redis® List blocking operations enable it to be used as a message queue. This allows a client to wait for a message to be available if the list is empty, hence facilitating asynchronous task processing. In this article, you will implement Worker Queues for Background Job Processing for applications using a Vultr Managed Database for Caching.
Prerequisites
Before you begin:
Deploy a Vultr Managed Database for Caching.
When deployed and ready, copy the database connection string to connect to the Redis® database as below:
rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Deploy a Ubuntu 22.04 management server.
Using SSH, access the server as a non-root sudo user and install:
The latest stable GO programming language.
$ sudo snap install go --classic
The Redis CLI tool.
$ sudo apt-get install redis
Redis® List commands
In this section, explore the Redis® List commands, and implement them in your Redis® database as described in the steps below:
To implement the commands, use redis-cli
to access your Vultr Managed Database for Caching using a connection string as below.
$ redis-cli -u rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Replace DATABASE_PASSWORD
, DATABASE_HOST
, and DATABASE_PORT
with your actual Redis® database values.
When connected, your terminal prompt should change to the Redis® Shell as below:
>
LPUSH
The LPUSH
command inserts all specified values at the head of the list stored at the key.
> LPUSH students "Sam" "Alex"
Output:
(integer) 2
The above command adds two students, Sam and Alex to the list.
RPUSH
RPUSH
inserts specified values at the tail of the list stored at the key.
> RPUSH students "Lily" "John"
Output:
(integer) 4
The command adds two more students, Lily and John to the list. This turns the total number of students to 4
.
LRANGE
LRANGE
gets a range of elements from a list as below.
> LRANGE students 0 -1
Output:
1) "Alex"
2) "Sam"
3) "Lily"
4) "John"
LLEN
LLEN
returns the length of the list stored at the key.
> LLEN students
Output:
(integer) 4
As displayed in the output, the total number of students in the list is 4.
LPOP
LPOP
removes and returns the first element of the list stored at the key.
> LPOP students
Output:
"Alex"
As displayed in the output, the name Alex
which was the first student entry in the list is now removed.
RPOP
RPOP
removes and returns the last element of the list stored at the key.
> RPOP students
Output:
"John"
As per the output, the last student in the list John
is now removed.
LINDEX
LINDEX
returns the element at index position in the list stored at the key.
> LINDEX students 0
Output:
"Sam"
As per the output, Sam
is the first student in the list.
LREM
LREM
removes the first count occurrences of elements equal to a value from the list stored at the key.
> LREM students 1 "Sam"
Output:
(integer) 1
The above output shows that Sam
is removed from the list, and the command affects 1 entry.
Redis® as a Worker Queue
Redis® lists support blocking operations (such as BLPOP
and BRPOP
). This makes it possible to wait for a specified time duration if there is no item in the list. When an item is added to the list (for example, by another application using the LPUSH
method), it's retrieved and processed. This allows you to decouple applications and build asynchronous, event-driven solutions.
This is also known as the worker queue pattern, where:
- A Redis® List acts as a queue.
- The producer applications add items to this queue.
- The worker applications (consumers) retrieve and process them.
In this section, you will implement a sample application that uses Redis® as a worker queue. The application retrieves and processes notification tasks from a Redis® List as described in the steps below.
Initialize the Project
Create a directory.
$ mkdir redis-notification-processor
Switch to the directory.
$ cd redis-notification-processor
Create a new Go module:
$ go mod init redis-notification-processor
The above command creates a new
go.mod
file in the directory.To correctly use your Redis® connection, add your Vultr Managed Database for Caching connection string to the
REDIS_URL
variable.$ export REDIS_URL=rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Replace
DATABASE_PASSWORD
,DATABASE_HOST
, andDATABASE_PORT
with your actual database values.
Create a Consumer Application
Using a text editor such as
Nano
, create a new fileconsumer.go
.$ nano consumer.go
Add the following contents to the file.
package main import ( "context" "crypto/tls" "errors" "fmt" "log" "os" "strings" "time" "github.com/redis/go-redis/v9" ) const listName = "jobs" var client *redis.Client func init() { redisURL := os.Getenv("REDIS_URL") if redisURL == "" { log.Fatal("missing environment variable REDIS_URL") } opt, err := redis.ParseURL(redisURL) if err != nil { log.Fatal("invalid redis url", err) } opt.TLSConfig = &tls.Config{} client = redis.NewClient(opt) _, err = client.Ping(context.Background()).Result() if err != nil { log.Fatal("ping failed. could not connect", err) } fmt.Println("successfully connected to redis") } func main() { fmt.Println("consumer application is ready") for { val, err := client.BRPop(context.Background(), 2*time.Second, listName).Result() if err != nil { if errors.Is(err, redis.Nil) { continue } log.Println("brpop issue", err) } job := val[1] info := strings.Split(job, ",") fmt.Println("received notification job", info) fmt.Printf("sending notification '%s' to %s\n", info[1], info[0]) } }
Save and close the file.
In the above configuration:
- The
init
function reads the Vultr Managed Database for Caching URL from theREDIS_URL
environment variable and fails if it's missing. - Then, it creates a new
redis.Client
instance and verifies connectivity to the Vultr Managed Database for Caching using thePing
utility. If connectivity fails, the program exists with an error message. - The
main
function runs an infinite loop and:- Continuously tries to use
BRPop
to get a message from thejobs
list with a timeout of 2 seconds. This function acts as a blocking operation where the client waits for a message to be available in the list, if the list is empty. - When it receives a message, it splits it using a comma as a delimiter (the message is assumed to be in the format of "notification, message"), and prints out the notification.
- Continuously tries to use
- The
Fetch the Go module dependencies for the program.
$ go get
Run the program.
$ go run consumer.go
Your output should look like the one below:
successfully connected to redis consumer application is ready
In a new terminal session, establish a connection to your Redis® Database and add a new item to the list to simulate a new job.
> LPUSH jobs 'user1@foo.com,application installed'
Output:
(integer) 1
Add another item to the list.
> LPUSH jobs 'user2@foo.com,monthly fee due tomorrow'
Output:
(integer) 1
Add another item to the list.
> LPUSH jobs 'user3@foo.com,please upgrade for faster service'
Output:
(integer) 1
Navigate to your first terminal session, and verify that your output looks like the one below:
received notification job [user1@foo.com application installed] sending notification 'application installed' to user1@foo.com received notification job [user2@foo.com monthly fee due tomorrow] sending notification 'monthly fee due tomorrow' to user2@foo.com received notification job [user3@foo.com please upgrade for faster service] sending notification 'please upgrade for faster service' to user3@foo.com
To stop the application, press
CTRL + C
on your keyboard to send a kill signal.
Reliable Processing with Redis® Lists
In the previous application, the consumer process used BRPop
method to get the job information required for it to send notifications. The process of using BRPop
(or BLPop
) removes the job item from the list.
In a scenario where the job processor fails for some reason. The notification job should not be lost if the consumer process fails. By this, the application needs to be reliable and fault-tolerant. The BLMOVE
command makes this possible by first moving the item from the original list to another list which reduces the possibility of data loss.
BLMOVE
works in an atomic manner, and if it fails, the item remains in the original list, and you can retry the process. When processed, the items can be deleted from the destination list. You can have a monitoring application in the form of another background application that checks the destination list for failed jobs.
To make its operation reliable and fault-tolerant, the consumer application:
- Uses
BLMove
to retrieve notification job information and simultaneously move it to another temporary list. - Processes the notification job:
- If the job finishes successfully, the entry from the temporary list is removed.
- If the job fails, it remains in the list where it can eventually be picked up by another monitoring job.
- When successful, it's moved back to the destination list. Else, the cycle repeats in a loop.
Reliable Consumer Application
Create a new file
reliable_consumer.go
.$ nano reliable_consumer.go
Add the following contents to the file.
package main import ( "context" "crypto/tls" "errors" "fmt" "log" "os" "strings" "time" "github.com/redis/go-redis/v9" ) const listName = "new_jobs" const emailTempQueueList = "jobs_temp" var client *redis.Client func init() { redisURL := os.Getenv("REDIS_URL") if redisURL == "" { log.Fatal("missing environment variable REDIS_URL") } opt, err := redis.ParseURL(redisURL) if err != nil { log.Fatal("invalid redis url", err) } opt.TLSConfig = &tls.Config{} client = redis.NewClient(opt) _, err = client.Ping(context.Background()).Result() if err != nil { log.Fatal("ping failed. could not connect", err) } fmt.Println("successfully connected to redis") } func main() { fmt.Println("reliable consumer application is ready") for { val, err := client.BLMove(context.Background(), listName, emailTempQueueList, "RIGHT", "LEFT", 2*time.Second).Result() if err != nil { if errors.Is(err, redis.Nil) { continue } fmt.Println("blmove issue", err) } job := val info := strings.Split(job, ",") fmt.Println("received notification job", info) fmt.Printf("sending notification '%s' to %s\n", info[1], info[0]) go func() { err = client.LRem(context.Background(), emailTempQueueList, 0, job).Err() if err != nil { log.Fatal("lrem failed for", job, "error", err) } fmt.Println("removed job from temporary list", job) }() } }
Save and close the file.
In the above code:
init
reads the Vultr Managed Database for Caching URL from theREDIS_URL
environment variable.- A new
redis.Client
instance is created to verify connectivity to the Vultr Managed Database for Caching. If the connection fails, the program exits with an error message. - The main function starts an infinite loop and does the following:
- Uses the
BLMove
command to move an element from thenew_jobs
list to thejobs_temp
list (a temporary list used for processing jobs) in blocking mode where it waits up to 2 seconds for an element to be available in the new_jobs list. - Checks whether there is an error in the
BLMove
operation, and when the Redis® list is empty (redis.Nil
), it continues the loop to try again. If the error is due to some other issue, it logs the error and proceeds. - When an element (job) is successfully moved to the
jobs_temp
list, it's split using a comma as a delimiter (the message is assumed to be in the format of "notification, message"), and prints out both the notification and message. - It launches a new goroutine to remove the processed job from the
jobs_temp
list.
- Uses the
Fetch the Go module dependencies for the program:
$ go get
Run the program.
$ go run reliable_consumer.go
Your output should look like the one below:
successfully connected to redis reliable consumer application is ready
In your Redis® connection terminal session, add a new item to the list to simulate a job.
> LPUSH new_jobs 'user1@foo.com,application installed'
Output:
(integer) 1
Add another item to the list:
> LPUSH new_jobs 'user2@foo.com,monthly fee due tomorrow'
Output:
(integer) 1
Navigate back to the application terminal session, and verify that your output looks like the one below.
received notification job [user1@foo.com application installed] sending notification 'application installed' to user1@foo.com removed job from temporary list user1@foo.com,application installed received notification job [user2@foo.com monthly fee due tomorrow] sending notification 'monthly fee due tomorrow' to user2@foo.com removed job from temporary list user2@foo.com,monthly fee due tomorrow
To stop the application, press
Ctrl + C
on your keyboard.
Conclusion
In this article, you implemented Redis® List operations using a Vultr Managed Database for Caching. Then, you used blocking list commands for asynchronous job processing, and also explored a reliable option to avoid data loss in case of consumer application failures. For more information about Redis® lists, visit the official documentation.