Implement Redis® Queue Mechanism in Node.js
Introduction
In Redis®, a job queue is an ordered list of tasks that run through a background scheduler. The Job queuing model improves the application's performance by providing a non-blocking user experience. For instance, consider a long-running application that converts images to PDF documents. The job queuing model allows users to submit their requests in a single call without waiting for the application to process the data. In the background, another sub-process (or worker) processes the jobs and informs the user when the task is complete.
This article explains how to implement a job queuing model with Redis® and Node.js on a Ubuntu 22.04 server. The sample gaming application in this article records user scores to a MySQL database using the Redis® LPUSH
and BLPOP
functions.
Prerequisites
Before you begin:
- Deploy a Ubuntu 22.04 server on Vultr.
- Use SSH to access the server and Create a non-root
sudo
user and install:
Set up a Project Directory and Create a MySQL Database
Create a new
project
directory.$ mkdir project
Switch to the new
project
directory.$ cd project
Log in to the MySQL server.
$ sudo mysql
Create a sample
gaming_shop
database.mysql> CREATE DATABASE gaming_shop;
Create a new
gaming_shop_user
database user.mysql> CREATE USER 'gaming_shop_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
Grant the user full privileges to the
gaming_shop
database. ReplaceEXAMPLE_PASSWORD
with a strong password.mysql> GRANT ALL PRIVILEGES ON gaming_shop.* TO 'gaming_shop_user'@'localhost';
Refresh MySQL privileges to apply the user rights.
mysql> FLUSH PRIVILEGES;
Switch to the
gaming_shop
database.mysql> USE gaming_shop;
Create a new
scores
table.mysql> CREATE TABLE user_scores ( score_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, username VARCHAR(20), score INTEGER ) ENGINE = InnoDB;
In the above table,
score_id
is aPRIMARY KEY
that uniquely identifies user scores in the table.AUTO_INCREMENT
instructs MySQL to assign newscore_ids
to new records automatically. Thecreated_at
column uses a MySQLTIMESTAMP
keyword to record the time when the application records a new entry. Theusername
column uniquely identifies the users, andscore
records the number of points a user makes in a sample gaming application.Exit the MySQL console.
mysql> QUIT;
2. Create a mysql_db.js
File
In this section, create a separate MySQL module that connects to the MYSQL database and runs a query to insert data in the user_scores
table as described in the steps below.
Using a text editor such as
Nano
. Open a newmysql_db.js
file.$ nano mysql_db.js
Add the following configurations to the file. Replace
EXAMPLE_PASSWORD
with thegaming_shop_user
password you created earlier.class mysql_db { executeQuery(userData, callBack) { const mysql = require('mysql'); const db_con = mysql.createConnection({ host: "localhost", user: "gaming_shop_user", password: "EXAMPLE_PASSWORD", database: "gaming_shop" }); db_con.connect(function(err) { if (err) { console.log(err.message); } }); var params = []; params.push(userData["username"]); params.push(userData["score"]); var sql = "insert into user_scores (username, score) values (?, ?)" db_con.query(sql, params, function (err, result) { if (err) { callBack(err, null); } else { callBack(null, "Success"); } }); } } module.exports = mysql_db;
Save and close the file.
Below is what the file functions represent:
executeQuery(userData, callBack) {...}
in themysql_db
class module accepts two arguments. TheuserData
argument accepts a JSON payload that contains ausername
and ascore
for a user. ThecallBack
argument takes a function that runs after theexecuteQuery(...)
function executes.const mysql = require('mysql');
includes the MySQL driver for Node.js into the project. This adapter connects to the MySQL database from Node.js using themysql.createConnection(...)
anddb_con.connect(...)
functions.db_con.query(sql, params,...)
submits user scores to the database by executing theinsert into user_scores (username, score) values (?, ?)
SQL query.
3. Create an index.js
File
In order for the application to accept data through HTTP, create an HTTP server that runs on the local port 8080
. Through the port, the application receives incoming HTTP requests containing a JSON payload with user scores as described in this section.
Create a new
index.js
file.$ nano index.js
Add the following configurations to the file.
const redis = require('redis'); const http = require('http'); const hostname = 'localhost'; const port = 8080; const server = http.createServer(httpHandler); server.listen(port, hostname, () => { console.log(`The HTTP Server is running at http://${hostname}:${port}/`); }); function httpHandler(req, res) { var json_payload = ""; req.on('data', function (data) { json_payload += data; }); req.on('end', function () { var response = ''; if (req.method == "POST") { const client = redis.createClient(); client.connect(); client.LPUSH("user_scores", JSON.stringify(json_payload)); response = "You've added a new job to the queue.\r\n"; } else { response = "Method not allowed.\r\n"; } res.write(response); res.end(); }); }
Save and close the file.
Below is what the statements do:
const redis = require('redis');
imports the Redis® driver for Node.js. This driver uses theconst client = redis.createClient()
andclient.connect()
functions to connect to the Redis® server.const http = require('http');
imports HTTP functions into the application and allows you to establish and run an HTTP server using theconst server = http.createServer(httpHandler);
andserver.listen(port, hostname, (...);
functions.httpHandler(req, res)
creates a handler function for the HTTP server that processes the HTTPPOST
method (if (req.method == "POST") {...}
).client.LPUSH(...)
pushes a JSON payload containing user scores to theuser_scores
Redis® list. TheLPUSH
command adds a new job to the queue.- The
res.write(response);
statement writes a response to the calling HTTP client.
4. Create a job_processor
File
In a job queuing model, you must create a background scheduler that connects the Redis® server to check for new jobs. The scheduler in this section dequeues the jobs from the list and saves them permanently to the MySQL database. Create the scheduler through the following steps.
Create a new
job_processor.js
file.$ nano job_processor.js
Add the following configurations to the file.
const mysql_db = require('./mysql_db.js'); const redis = require('redis'); const client = redis.createClient(); client.connect(); function callBack(err, result) { if (err) { console.log(err.message); } else { console.log("Data added to database. \r\n"); } } function process_queue() { client.BLPOP('user_scores', 2).then((res) => { if (res != null) { var redisQueueData = JSON.parse(res['element']); userData = JSON.parse(redisQueueData); var dg = new mysql_db(); dg.executeQuery(userData, callBack); } process_queue(); }); } process_queue();
Save and close the file.
Below is what the configuration does:
const mysql_db = require('./mysql_db.js');
imports and uses themysql_db
module you created earlier.const redis = require('redis');
imports the Redis® server driver and establishes a new Redis® connection using the...redis.createClient()
, andclient.connect();
functions.- The application processes
callBack(err, result) {...}
after executing thedg.executeQuery(userData, callBack);
function. - Within the
process_queue()
recursive function, the application uses the Redis®BLPOP(...)
function to dequeue tasks from theuser_scores
list. TheBLPOP()
function removes and gets the first element from a list. After parsing the data using theJSON.parse(redisQueueData)
function, the application pushes data to the MySQL database using thevar dg = new mysql_db();
anddg.executeQuery(userData, callBack);
functions.
5. Test the Application
After creating all the necessary Node.js source code files, test the application as described below.
Update the
npm
package.$ sudo npm install npm -g
Initialize the project.
$ npm init -yes
Install the
redis
adapter for Node.js.$ npm install redis
Install the
mysql
adapter.$ npm install mysql
Run the application's start-up file in the background.
$ node index.js &
Output:
The HTTP Server is running at http://localhost:8080/
Switch to the project directory.
$ cd project
Run the
job_processor.js
file.$ node job_processor.js
In a new terminal session, establish another
SSH
connection to the server.$ ssh user@SERVER-IP
Run the following
curl
commands to queue sample jobs to the Redis® server.$ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"john_doe","score":"429"}' $ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"mary_jane","score":"897"}' $ curl -X POST http://127.0.0.1:8080 -H 'Content-Type: application/json' -d '{"username":"ian_job","score":"678"}'
Output:
... You've added a new job to the queue.
In your main terminal session that runs the
job_processor.js
file. Verify that the following output displays.... Data added to database.
Log in to the MySQL console.
$ sudo mysql
Switch to the
gaming_shop
database.mysql> USE gaming_shop;
View the
user_scores
table data to verify that new data is added.mysql> SELECT score_id, created_at, username, score FROM user_scores;
If your output looks like the one below. The job-queuing model is working as expected.
+----------+---------------------+-----------+-------+ | score_id | created_at | username | score | +----------+---------------------+-----------+-------+ | 1 | 2023-07-06 13:12:02 | john_doe | 429 | | 2 | 2023-07-06 13:12:13 | mary_jane | 897 | | 3 | 2023-07-06 13:12:22 | ian_job | 678 | +----------+---------------------+-----------+-------+ 3 rows in set (0.00 sec)
Conclusion
In this article, you implemented a Redis® queueing model with Node.js on Ubuntu 20.04. You created a sample MySQL database, and set up Node.js modules that use the Redis® LPUSH
and BLPOP
functions to queue, and process data from a Redis® list.
Next Steps
For more Redis® use cases, visit the following resources.