Implement Worker Queues for Background Job Processing using Vultr Managed Database for Caching

Updated on June 22, 2024

Introduction

Redis® is an open-source, in-memory data structure store. Redis® Lists are collections of string elements sorted by insertion order. They offer commands to push, pop, and traverse elements from both ends of the list, and to access or remove elements by index or value.

Blocking list operations let a Redis® List serve as a message queue: a client can wait for a message to arrive when the list is empty, which enables asynchronous task processing. In this article, you will implement worker queues for background job processing using a Vultr Managed Database for Caching.

Prerequisites

Before you begin:

Redis® List commands

In this section, explore the Redis® List commands, and implement them in your Redis® database as described in the steps below:

To implement the commands, use redis-cli to access your Vultr Managed Database for Caching using a connection string as below.

$ redis-cli -u rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]

Replace DATABASE_PASSWORD, DATABASE_HOST, and DATABASE_PORT with your actual Redis® database values.
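To see how the parts of the connection string fit together, the following minimal sketch splits a URL of the same shape using only the Go standard library. The host, port, and password in it are made-up placeholders, and the describe helper is a hypothetical name used only for this illustration. The rediss scheme (with a double s) indicates that the connection uses TLS.

```go
package main

import (
	"fmt"
	"net/url"
)

// describe breaks a Redis connection URL into its parts.
// A "rediss" scheme (double s) means the connection uses TLS.
func describe(raw string) (user, host, port string, useTLS bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", false, err
	}
	return u.User.Username(), u.Hostname(), u.Port(), u.Scheme == "rediss", nil
}

func main() {
	// Placeholder values, not a real database.
	user, host, port, useTLS, err := describe("rediss://default:example-password@db.example.com:16752")
	if err != nil {
		fmt.Println("invalid url:", err)
		return
	}
	fmt.Println(user, host, port, useTLS) // default db.example.com 16752 true
}
```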

When connected, your terminal prompt should change to the Redis® Shell as below:

>

LPUSH

The LPUSH command inserts all specified values at the head of the list stored at the key.

> LPUSH students "Sam" "Alex"

Output:

(integer) 2

The above command adds two students, Sam and Alex, to the list.

RPUSH

RPUSH inserts specified values at the tail of the list stored at the key.

> RPUSH students "Lily" "John"

Output:

(integer) 4

The command adds two more students, Lily and John, to the list. This brings the total number of students to 4.

LRANGE

LRANGE returns a range of elements from a list. The range 0 -1 covers the whole list, from the first element to the last.

> LRANGE students 0 -1

Output:

1) "Alex"
2) "Sam"
3) "Lily"
4) "John"
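The order may look surprising at first: LPUSH inserts its arguments at the head one after another, so the last argument (Alex) ends up first, while RPUSH appends in argument order. The following sketch mirrors that behavior on a plain Go slice instead of Redis®. The lpush and rpush helpers are hypothetical names used only for illustration.

```go
package main

import "fmt"

// lpush mimics LPUSH: each value is inserted at the head in turn,
// so the last argument ends up as the first element.
func lpush(list []string, values ...string) []string {
	for _, v := range values {
		list = append([]string{v}, list...)
	}
	return list
}

// rpush mimics RPUSH: values are appended at the tail in argument order.
func rpush(list []string, values ...string) []string {
	return append(list, values...)
}

func main() {
	var students []string
	students = lpush(students, "Sam", "Alex")
	students = rpush(students, "Lily", "John")
	fmt.Println(students) // [Alex Sam Lily John]
}
```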

LLEN

LLEN returns the length of the list stored at the key.

> LLEN students

Output:

(integer) 4

As displayed in the output, the total number of students in the list is 4.

LPOP

LPOP removes and returns the first element of the list stored at the key.

> LPOP students

Output:

"Alex"

As displayed in the output, Alex, the first student in the list, is now removed.

RPOP

RPOP removes and returns the last element of the list stored at the key.

> RPOP students

Output:

"John"

As per the output, John, the last student in the list, is now removed.

LINDEX

LINDEX returns the element at the given index position in the list stored at the key.

> LINDEX students 0

Output:

"Sam"

As per the output, Sam is the first student in the list.

LREM

LREM removes the first count occurrences of elements equal to a value from the list stored at the key.

> LREM students 1 "Sam"

Output:

(integer) 1

The above output shows that Sam is removed from the list, and the command affects 1 entry.
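The count argument of LREM also controls the scan direction: a positive count removes matches from head to tail, a negative count removes them from tail to head, and a count of 0 removes every match. The sketch below reproduces these rules on an in-memory Go slice. The lrem helper is a hypothetical stand-in and does not talk to Redis®.

```go
package main

import "fmt"

// lrem mimics LREM on an in-memory slice and returns the new list
// plus the number of removed elements.
//
//	count > 0: remove up to count matches, scanning head to tail
//	count < 0: remove up to |count| matches, scanning tail to head
//	count = 0: remove every match
func lrem(list []string, count int, value string) ([]string, int) {
	limit := count
	if limit < 0 {
		limit = -limit
	}
	n := len(list)
	drop := make([]bool, n)
	removed := 0
	for i := 0; i < n; i++ {
		idx := i
		if count < 0 {
			idx = n - 1 - i // walk from the tail
		}
		if list[idx] == value && (limit == 0 || removed < limit) {
			drop[idx] = true
			removed++
		}
	}
	kept := make([]string, 0, n-removed)
	for i, v := range list {
		if !drop[i] {
			kept = append(kept, v)
		}
	}
	return kept, removed
}

func main() {
	list := []string{"Sam", "Lily", "Sam", "Sam"}
	fmt.Println(lrem(list, 1, "Sam"))  // [Lily Sam Sam] 1
	fmt.Println(lrem(list, -2, "Sam")) // [Sam Lily] 2
	fmt.Println(lrem(list, 0, "Sam"))  // [Lily] 3
}
```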

Redis® as a Worker Queue

Redis® lists support blocking operations (such as BLPOP and BRPOP). These make it possible to wait for a specified time duration when there is no item in the list. When an item is added to the list (for example, by another application using the LPUSH command), it's retrieved and processed. This allows you to decouple applications and build asynchronous, event-driven solutions.

This is also known as the worker queue pattern, where:

  • A Redis® List acts as a queue.
  • The producer applications add items to this queue.
  • The worker applications (consumers) retrieve and process them.
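The three roles can be illustrated without Redis® at all. In the following sketch, a buffered Go channel stands in for the list, and a select with a timeout stands in for a blocking pop with a timeout. This is only an analogy under those assumptions; the pop and worker helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// pop waits up to timeout for a job, much like a blocking pop with a timeout.
// ok is false when the wait timed out with nothing to process.
func pop(queue <-chan string, timeout time.Duration) (job string, ok bool) {
	select {
	case job = <-queue:
		return job, true
	case <-time.After(timeout):
		return "", false
	}
}

// worker drains the queue and returns the jobs it handled, stopping after
// the first timeout so that this demo terminates.
func worker(queue <-chan string, timeout time.Duration) []string {
	var handled []string
	for {
		job, ok := pop(queue, timeout)
		if !ok {
			return handled
		}
		handled = append(handled, job)
	}
}

func main() {
	queue := make(chan string, 10) // stands in for the Redis list

	// Producer: adds jobs to the queue.
	go func() {
		for _, j := range []string{"job-1", "job-2", "job-3"} {
			queue <- j
		}
	}()

	// Worker (consumer): retrieves and processes the jobs.
	for _, j := range worker(queue, 200*time.Millisecond) {
		fmt.Println("processed", j)
	}
}
```

Unlike this demo, a real worker usually keeps polling after a timeout instead of returning.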

In this section, you will implement a sample application that uses Redis® as a worker queue. The application retrieves and processes notification tasks from a Redis® List as described in the steps below.

Initialize the Project

  1. Create a directory.

     $ mkdir redis-notification-processor
  2. Switch to the directory.

     $ cd redis-notification-processor
  3. Create a new Go module:

     $ go mod init redis-notification-processor

    The above command creates a new go.mod file in the directory.

  4. To correctly use your Redis® connection, add your Vultr Managed Database for Caching connection string to the REDIS_URL variable.

     $ export REDIS_URL=rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]

    Replace DATABASE_PASSWORD, DATABASE_HOST, and DATABASE_PORT with your actual database values.

Create a Consumer Application

  1. Using a text editor such as Nano, create a new file consumer.go.

     $ nano consumer.go
  2. Add the following contents to the file.

     package main
    
     import (
        "context"
        "crypto/tls"
        "errors"
        "fmt"
        "log"
        "os"
        "strings"
        "time"
    
        "github.com/redis/go-redis/v9"
     )
    
     const listName = "jobs"
    
     var client *redis.Client
    
     func init() {
    
        redisURL := os.Getenv("REDIS_URL")
        if redisURL == "" {
            log.Fatal("missing environment variable REDIS_URL")
        }
    
        opt, err := redis.ParseURL(redisURL)
        if err != nil {
            log.Fatal("invalid redis url", err)
        }
    
        opt.TLSConfig = &tls.Config{}
    
        client = redis.NewClient(opt)
    
        _, err = client.Ping(context.Background()).Result()
    
        if err != nil {
            log.Fatal("ping failed. could not connect", err)
        }
    
        fmt.Println("successfully connected to redis")
     }
    
     func main() {
    
        fmt.Println("consumer application is ready")
        for {
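            // BRPop blocks for up to 2 seconds and returns [list name, value].
            val, err := client.BRPop(context.Background(), 2*time.Second, listName).Result()
            if err != nil {
                if errors.Is(err, redis.Nil) {
                    // Timed out with an empty list; poll again.
                    continue
                }
                log.Println("brpop issue", err)
                // Skip this iteration: val is empty when the call fails.
                continue
            }
    
            job := val[1]
            // Split at the first comma only, so the message may contain commas.
            info := strings.SplitN(job, ",", 2)
            if len(info) != 2 {
                log.Println("discarding malformed job", job)
                continue
            }
    
            fmt.Println("received notification job", info)
    
            fmt.Printf("sending notification '%s' to %s\n", info[1], info[0])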
        }
     }

    Save and close the file.

    In the above configuration:

    • The init function reads the Vultr Managed Database for Caching URL from the REDIS_URL environment variable and fails if it's missing.
    • Then, it creates a new redis.Client instance and verifies connectivity to the Vultr Managed Database for Caching using the Ping command. If connectivity fails, the program exits with an error message.
    • The main function runs an infinite loop and:
      • Repeatedly calls BRPop to get a message from the jobs list with a timeout of 2 seconds. BRPop is a blocking operation: if the list is empty, the client waits for a message to become available.
      • When it receives a message, it splits it using a comma as a delimiter (the message is assumed to be in the format "recipient,message"), and prints out the notification.
  3. Fetch the Go module dependencies for the program.

     $ go get
  4. Run the program.

     $ go run consumer.go

    Your output should look like the one below:

     successfully connected to redis
     consumer application is ready
  5. In a new terminal session, establish a connection to your Redis® Database and add a new item to the list to simulate a new job.

     > LPUSH jobs 'user1@foo.com,application installed'

    Output:

     (integer) 1
  6. Add another item to the list.

     > LPUSH jobs 'user2@foo.com,monthly fee due tomorrow'

    Output:

     (integer) 1
  7. Add another item to the list.

     > LPUSH jobs 'user3@foo.com,please upgrade for faster service'

    Output:

     (integer) 1
  8. Navigate to your first terminal session, and verify that your output looks like the one below:

     received notification job [user1@foo.com application installed]
     sending notification 'application installed' to user1@foo.com
     received notification job [user2@foo.com monthly fee due tomorrow]
     sending notification 'monthly fee due tomorrow' to user2@foo.com
     received notification job [user3@foo.com please upgrade for faster service]
     sending notification 'please upgrade for faster service' to user3@foo.com
  9. To stop the application, press Ctrl + C on your keyboard to send an interrupt signal.
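The consumer assumes that every payload has the form "recipient,message". A small validating parser can make that assumption explicit and reject payloads that do not match. The following is a minimal sketch using only the standard library; the job struct, the parseJob function, and the error value are hypothetical names that are not part of the consumer above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// job is a hypothetical struct for the "recipient,message" payload.
type job struct {
	Recipient string
	Message   string
}

var errMalformedJob = errors.New("malformed job: expected \"recipient,message\"")

// parseJob splits the payload at the first comma only, so a message
// may itself contain commas.
func parseJob(payload string) (job, error) {
	recipient, message, found := strings.Cut(payload, ",")
	recipient, message = strings.TrimSpace(recipient), strings.TrimSpace(message)
	if !found || recipient == "" || message == "" {
		return job{}, errMalformedJob
	}
	return job{Recipient: recipient, Message: message}, nil
}

func main() {
	for _, p := range []string{
		"user1@foo.com,application installed",
		"user2@foo.com,fee due tomorrow, please pay",
		"no-delimiter",
	} {
		j, err := parseJob(p)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		fmt.Printf("sending notification '%s' to %s\n", j.Message, j.Recipient)
	}
}
```

Cutting at the first comma keeps the rest of the payload intact, which matters when the notification text itself contains commas.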

Reliable Processing with Redis® Lists

In the previous application, the consumer process used the BRPop method to get the job information required to send notifications. Using BRPop (or BLPop) removes the job item from the list.

If the consumer process fails after it removes a job, the notification job is lost. To prevent this, the application needs to be reliable and fault-tolerant. The BLMOVE command makes this possible by moving the item from the original list to another list instead of simply removing it, which reduces the possibility of data loss.

BLMOVE works atomically: if it fails, the item remains in the original list, and you can retry the operation. After a job is processed, you can delete it from the destination list. A separate background monitoring application can check the destination list for failed jobs.

To make its operation reliable and fault-tolerant, the consumer application:

  • Uses BLMove to retrieve notification job information and simultaneously move it to another temporary list.
  • Processes the notification job:
    • If the job finishes successfully, the entry from the temporary list is removed.
    • If the job fails, the entry remains in the temporary list, where a separate monitoring job can eventually pick it up.
    • The monitoring job can then retry the entry, repeating the cycle until the job succeeds.
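The flow above can be modeled in memory to show which list holds a job at each stage. The following sketch uses two Go slices in place of the Redis® lists; the queues type and its move, ack, and drain methods are hypothetical names, and the failure of job-2 is simulated.

```go
package main

import (
	"errors"
	"fmt"
)

var errSimulated = errors.New("simulated failure")

// queues models the two lists in memory: pending (source) and
// processing (temporary). Index 0 is the head (left) of each list.
type queues struct {
	pending    []string
	processing []string
}

// move pops from the tail of pending and pushes onto the head of
// processing in one step, like a move with RIGHT and LEFT.
func (q *queues) move() (string, bool) {
	if len(q.pending) == 0 {
		return "", false
	}
	last := len(q.pending) - 1
	job := q.pending[last]
	q.pending = q.pending[:last]
	q.processing = append([]string{job}, q.processing...)
	return job, true
}

// ack removes one occurrence of a finished job from processing.
func (q *queues) ack(job string) {
	for i, j := range q.processing {
		if j == job {
			q.processing = append(q.processing[:i], q.processing[i+1:]...)
			return
		}
	}
}

// drain processes every pending job; failed jobs stay in processing.
func (q *queues) drain(handle func(string) error) {
	for {
		job, ok := q.move()
		if !ok {
			return
		}
		if err := handle(job); err != nil {
			fmt.Println("job failed, left in temporary list:", job)
			continue
		}
		q.ack(job)
	}
}

func main() {
	q := &queues{pending: []string{"job-3", "job-2", "job-1"}}
	q.drain(func(job string) error {
		if job == "job-2" {
			return errSimulated
		}
		return nil
	})
	fmt.Println("pending:", q.pending, "temporary:", q.processing)
	// pending: [] temporary: [job-2]
}
```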

Reliable Consumer Application

  1. Create a new file reliable_consumer.go.

     $ nano reliable_consumer.go
  2. Add the following contents to the file.

     package main
    
     import (
        "context"
        "crypto/tls"
        "errors"
        "fmt"
        "log"
        "os"
        "strings"
        "time"
    
        "github.com/redis/go-redis/v9"
     )
    
     const listName = "new_jobs"
     const emailTempQueueList = "jobs_temp"
    
     var client *redis.Client
    
     func init() {
    
        redisURL := os.Getenv("REDIS_URL")
        if redisURL == "" {
            log.Fatal("missing environment variable REDIS_URL")
        }
    
        opt, err := redis.ParseURL(redisURL)
        if err != nil {
            log.Fatal("invalid redis url", err)
        }
    
        opt.TLSConfig = &tls.Config{}
    
        client = redis.NewClient(opt)
    
        _, err = client.Ping(context.Background()).Result()
    
        if err != nil {
            log.Fatal("ping failed. could not connect", err)
        }
    
        fmt.Println("successfully connected to redis")
     }
    
     func main() {
    
        fmt.Println("reliable consumer application is ready")
    
        for {
    
            val, err := client.BLMove(context.Background(), listName, emailTempQueueList, "RIGHT", "LEFT", 2*time.Second).Result()
    
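            if err != nil {
                if errors.Is(err, redis.Nil) {
                    // Timed out with an empty list; poll again.
                    continue
                }
                log.Println("blmove issue", err)
                // Skip this iteration: val is empty when the call fails.
                continue
            }
    
            job := val
            // Split at the first comma only, so the message may contain commas.
            info := strings.SplitN(job, ",", 2)
            if len(info) != 2 {
                log.Println("skipping malformed job, left in temporary list", job)
                continue
            }
    
            fmt.Println("received notification job", info)
    
            fmt.Printf("sending notification '%s' to %s\n", info[1], info[0])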
    
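            go func() {
                // Use a goroutine-local err, and log instead of exiting so that
                // a failed removal leaves the job in the temporary list.
                if err := client.LRem(context.Background(), emailTempQueueList, 0, job).Err(); err != nil {
                    log.Println("lrem failed for", job, "error", err)
                    return
                }
                fmt.Println("removed job from temporary list", job)
            }()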
        }
    
     }

    Save and close the file.

    In the above code:

    • init reads the Vultr Managed Database for Caching URL from the REDIS_URL environment variable.
    • It then creates a new redis.Client instance and verifies connectivity to the Vultr Managed Database for Caching using Ping. If the connection fails, the program exits with an error message.
    • The main function starts an infinite loop and does the following:
      • Uses the BLMove command to move an element from the new_jobs list to the jobs_temp list (a temporary list used for processing jobs) in blocking mode, where it waits up to 2 seconds for an element to be available in the new_jobs list.
      • Checks whether the BLMove operation returned an error. When the Redis® list is empty (redis.Nil), it continues the loop to try again. If the error is due to some other issue, it logs the error.
      • When an element (job) is successfully moved to the jobs_temp list, it's split using a comma as a delimiter (the message is assumed to be in the format "recipient,message"), and the program prints out both the recipient and the message.
      • It launches a new goroutine to remove the processed job from the jobs_temp list.
  3. Fetch the Go module dependencies for the program:

     $ go get
  4. Run the program.

     $ go run reliable_consumer.go

    Your output should look like the one below:

     successfully connected to redis
     reliable consumer application is ready
  5. In your Redis® connection terminal session, add a new item to the list to simulate a job.

     > LPUSH new_jobs 'user1@foo.com,application installed'

    Output:

     (integer) 1
  6. Add another item to the list:

     > LPUSH new_jobs 'user2@foo.com,monthly fee due tomorrow'

    Output:

     (integer) 1
  7. Navigate back to the application terminal session, and verify that your output looks like the one below.

     received notification job [user1@foo.com application installed]
     sending notification 'application installed' to user1@foo.com
     removed job from temporary list user1@foo.com,application installed
     received notification job [user2@foo.com monthly fee due tomorrow]
     sending notification 'monthly fee due tomorrow' to user2@foo.com
     removed job from temporary list user2@foo.com,monthly fee due tomorrow
  8. To stop the application, press Ctrl + C on your keyboard.
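Jobs that stay in the jobs_temp list can be returned to the main list by a separate monitoring process, as described earlier. The following is a minimal sketch of such a requeue loop, written against an in-memory stand-in so that it runs without Redis®. The requeue function, the moveFunc type, the errEmpty value, and the fakeLists type are all hypothetical. With go-redis, the move function might wrap client.LMove(ctx, "jobs_temp", "new_jobs", "RIGHT", "LEFT").Result() and map redis.Nil to errEmpty, which is an assumption and not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty signals that the temporary list has no more entries.
var errEmpty = errors.New("list is empty")

// moveFunc moves one job from the temporary list back to the main
// list and returns it. Hypothetical adapter around the real client.
type moveFunc func() (string, error)

// requeue moves stranded jobs back, at most limit per run, so that a
// job that keeps failing cannot keep the monitor spinning forever.
func requeue(move moveFunc, limit int) (int, error) {
	moved := 0
	for moved < limit {
		job, err := move()
		if errors.Is(err, errEmpty) {
			return moved, nil
		}
		if err != nil {
			return moved, err
		}
		fmt.Println("requeued", job)
		moved++
	}
	return moved, nil
}

// fakeLists is an in-memory stand-in for the two Redis lists.
type fakeLists struct{ temp, jobs []string }

// move pops from the tail of temp and pushes onto the head of jobs.
func (f *fakeLists) move() (string, error) {
	if len(f.temp) == 0 {
		return "", errEmpty
	}
	last := len(f.temp) - 1
	job := f.temp[last]
	f.temp = f.temp[:last]
	f.jobs = append([]string{job}, f.jobs...)
	return job, nil
}

func main() {
	lists := &fakeLists{temp: []string{"user2@foo.com,monthly fee due tomorrow"}}
	n, err := requeue(lists.move, 100)
	fmt.Println("requeued jobs:", n, "error:", err)
}
```

A real monitor would also need a way to tell stalled jobs apart from jobs that a consumer is still processing, for example by age. That is outside the scope of this sketch.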

Conclusion

In this article, you implemented Redis® List operations using a Vultr Managed Database for Caching. Then, you used blocking list commands for asynchronous job processing, and also explored a reliable option to avoid data loss in case of consumer application failures. For more information about Redis® lists, visit the official documentation.