Implement Redis Queue Mechanism in Node.js

Updated on July 22, 2023
Implement Redis Queue Mechanism in Node.js header image

Introduction

In Redis, a job queue is an ordered list of tasks that run through a background scheduler. The Job queuing model improves the application's performance by providing a non-blocking user experience. For instance, consider a long-running application that converts images to PDF documents. The job queuing model allows users to submit their requests in a single call without waiting for the application to process the data. In the background, another sub-process (or worker) processes the jobs and informs the user when the task is complete.

This article explains how to implement a job queuing model with Redis and Node.js on a Ubuntu 22.04 server. The sample gaming application in this article records user scores to a MySQL database using the Redis LPUSH and BLPOP functions.

Prerequisites

Before you begin:

Set up a Project Directory and Create a MySQL Database

  1. Create a new project directory.

     $ mkdir project
  2. Switch to the new project directory.

     $ cd project
  3. Log in to the MySQL server.

     $ sudo mysql
  4. Create a sample gaming_shop database.

     mysql> CREATE DATABASE gaming_shop;
  5. Create a new gaming_shop_user database user.

     mysql>  CREATE USER 'gaming_shop_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
  6. Grant the user full privileges to the gaming_shop database. Replace EXAMPLE_PASSWORD with a strong password.

     mysql>  GRANT ALL PRIVILEGES ON gaming_shop.* TO 'gaming_shop_user'@'localhost';           
  7. Refresh MySQL privileges to apply the user rights.

     mysql> FLUSH PRIVILEGES;
  8. Switch to the gaming_shop database.

     mysql> USE gaming_shop;
  9. Create a new scores table.

     mysql> CREATE TABLE user_scores (
                 score_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 username VARCHAR(20),
                 score  INTEGER   
             ) ENGINE = InnoDB;

    In the above table, score_id is a PRIMARY KEY that uniquely identifies user scores in the table. AUTO_INCREMENT instructs MySQL to assign new score_ids to new records automatically. The created_at column uses a MySQL TIMESTAMP keyword to record the time when the application records a new entry. The username column uniquely identifies the users, and score records the number of points a user makes in a sample gaming application.

  10. Exit the MySQL console.

     mysql> QUIT;

2. Create a mysql_db.js File

In this section, create a separate MySQL module that connects to the MYSQL database and runs a query to insert data in the user_scores table as described in the steps below.

  1. Using a text editor such as Nano. Open a new mysql_db.js file.

     $ nano mysql_db.js
  2. Add the following configurations to the file. Replace EXAMPLE_PASSWORD with the gaming_shop_user password you created earlier.

     class mysql_db { 
    
         executeQuery(userData, callBack) { 
    
             const mysql = require('mysql');  
    
             const db_con = mysql.createConnection({
                 host:     "localhost",
                 user:     "gaming_shop_user",
                 password: "EXAMPLE_PASSWORD",
                 database: "gaming_shop"
              }); 
    
             db_con.connect(function(err) {
                 if (err) {               
                     console.log(err.message);             
                 }                    
             }); 
    
            var params = [];
    
            params.push(userData["username"]);  
            params.push(userData["score"]);  
    
            var sql = "insert into user_scores (username, score) values (?, ?)"   
    
            db_con.query(sql, params, function (err, result) {
    
                if (err) {
                    callBack(err, null);                   
                } else {
                    callBack(null, "Success");          
                }         
           }); 
    
         }      
    
    
     }
    
     module.exports = mysql_db;

    Save and close the file.

    Below is what the file functions represent:

    • executeQuery(userData, callBack) {...} in the mysql_db class module accepts two arguments. The userData argument accepts a JSON payload that contains a username and a score for a user. The callBack argument takes a function that runs after the executeQuery(...) function executes.
    • const mysql = require('mysql'); includes the MySQL driver for Node.js into the project. This adapter connects to the MySQL database from Node.js using the mysql.createConnection(...) and db_con.connect(...) functions.
    • db_con.query(sql, params,...) submits user scores to the database by executing the insert into user_scores (username, score) values (?, ?) SQL query.

3. Create an index.js File

In order for the application to accept data through HTTP, create an HTTP server that runs on the local port 8080. Through the port, the application receives incoming HTTP requests containing a JSON payload with user scores as described in this section.

  1. Create a new index.js file.

     $ nano index.js
  2. Add the following configurations to the file.

     const redis  = require('redis');    
    
     const http     = require('http');
     const hostname = 'localhost';
     const port     = 8080;
    
     const server = http.createServer(httpHandler);
    
     server.listen(port, hostname, () => {
         console.log(`The HTTP Server is running at http://${hostname}:${port}/`);
     });
    
    
     function httpHandler(req, res) {
    
         var json_payload = ""; 
    
         req.on('data', function (data) {
             json_payload += data;
         }); 
    
         req.on('end', function () {
    
             var response = '';
    
             if (req.method == "POST") {  
    
                 const client = redis.createClient();
                 client.connect();         
    
                 client.LPUSH("user_scores", JSON.stringify(json_payload));
    
                 response = "You've added a new job to the queue.\r\n";
    
             } else {
    
                 response = "Method not allowed.\r\n";
             } 
    
             res.write(response);
    
             res.end();
        });
     }

    Save and close the file.

    Below is what the statements do:

    • const redis = require('redis'); imports the Redis driver for Node.js. This driver uses the const client = redis.createClient() and client.connect() functions to connect to the Redis server.
    • const http = require('http'); imports HTTP functions into the application and allows you to establish and run an HTTP server using the const server = http.createServer(httpHandler); and server.listen(port, hostname, (...); functions.
    • httpHandler(req, res) creates a handler function for the HTTP server that processes the HTTP POST method (if (req.method == "POST") {...}).
    • client.LPUSH(...) pushes a JSON payload containing user scores to the user_scores Redis list. The LPUSH command adds a new job to the queue.
    • The res.write(response); statement writes a response to the calling HTTP client.

4. Create a job_processor File

In a job queuing model, you must create a background scheduler that connects the Redis server to check for new jobs. The scheduler in this section dequeues the jobs from the list and saves them permanently to the MySQL database. Create the scheduler through the following steps.

  1. Create a new job_processor.js file.

     $ nano job_processor.js
  2. Add the following configurations to the file.

     const mysql_db = require('./mysql_db.js');
     const redis  = require('redis');
     const client = redis.createClient();
    
     client.connect();
    
     function callBack(err, result) {
         if (err) { 
             console.log(err.message);
         } else {
             console.log("Data added to database. \r\n");
         }
     }
    
     function process_queue() {
         client.BLPOP('user_scores', 2).then((res) => {
    
              if (res != null) {    
                  var redisQueueData = JSON.parse(res['element']);
                  userData = JSON.parse(redisQueueData);
                  var dg = new mysql_db(); 
                  dg.executeQuery(userData, callBack);  
              }       
    
              process_queue();
    
         });
     }
    
     process_queue();

    Save and close the file.

    Below is what the configuration does:

    • const mysql_db = require('./mysql_db.js'); imports and uses the mysql_db module you created earlier.
    • const redis = require('redis'); imports the Redis server driver and establishes a new Redis connection using the ...redis.createClient(), and client.connect(); functions.
    • The application processes callBack(err, result) {...} after executing the dg.executeQuery(userData, callBack); function.
    • Within the process_queue() recursive function, the application uses the Redis BLPOP(...) function to dequeue tasks from the user_scores list. The BLPOP() function removes and gets the first element from a list. After parsing the data using the JSON.parse(redisQueueData) function, the application pushes data to the MySQL database using the var dg = new mysql_db(); and dg.executeQuery(userData, callBack); functions.

5. Test the Application

After creating all the necessary Node.js source code files, test the application as described below.

  1. Update the npm package.

     $ sudo npm install npm -g
  2. Initialize the project.

     $ npm init -yes 
  3. Install the redis adapter for Node.js.

     $ npm install redis
  4. Install the mysql adapter.

     $ npm install mysql
  5. Run the application's start-up file in the background.

     $ node index.js &

    Output:

     The HTTP Server is running at http://localhost:8080/
  6. Switch to the project directory.

     $ cd project
  7. Run the job_processor.js file.

     $ node job_processor.js
  8. In a new terminal session, establish another SSH connection to the server.

     $ ssh user@SERVER-IP
  9. Run the following curl commands to queue sample jobs to the Redis server.

     $ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"john_doe","score":"429"}'
    
     $ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"mary_jane","score":"897"}'
    
     $ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"ian_job","score":"678"}'

    Output:

     ...
     You've added a new job to the queue.
  10. In your main terminal session that runs the job_processor.js file. Verify that the following output displays.

     ...
     Data added to database.
  11. Log in to the MySQL console.

     $ sudo mysql
  12. Switch to the gaming_shop database.

     mysql> USE gaming_shop;
  13. View the user_scores table data to verify that new data is added.

     mysql> SELECT
                score_id,
                created_at,
                username,
                score
            FROM user_scores;

    If your output looks like the one below. The job-queuing model is working as expected.

     +----------+---------------------+-----------+-------+
     | score_id | created_at          | username  | score |
     +----------+---------------------+-----------+-------+
     |        1 | 2023-07-06 13:12:02 | john_doe  |   429 |
     |        2 | 2023-07-06 13:12:13 | mary_jane |   897 |
     |        3 | 2023-07-06 13:12:22 | ian_job   |   678 |
     +----------+---------------------+-----------+-------+
     3 rows in set (0.00 sec)

Conclusion

In this article, you implemented a Redis queueing model with Node.js on Ubuntu 20.04. You created a sample MySQL database, and set up Node.js modules that use the Redis LPUSH and BLPOP functions to queue, and process data from a Redis list.

Next Steps

For more Redis use cases, visit the following resources.