Implementing Message Queuing with Golang, Redis®, and MySQL 8 on Linux Server

Updated on June 22, 2024

Introduction

Message queuing is a microservice communication pattern that allows you to move data between different applications for further processing. In this model, end-users send data to your web application. Then, your system queues this data in a message broker like Redis® Server. Finally, you run one or several worker processes to work on the queued jobs.

Unlike the publish/subscribe (pub/sub) model, where every subscriber receives every message, each job in the message queuing architecture must be processed by only one worker and deleted from the queue when completed. In simple terms, the message queuing strategy allows you to have several workers processing the job list, while a blocking pop operation hands each job to exactly one worker, which prevents duplicate processing.
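The one-worker-per-job behaviour can be illustrated without Redis® at all. The following is a minimal sketch that uses a Go channel as a stand-in for the queue. The processJobs function and the job names are hypothetical and exist only for this illustration, and the sketch assumes every job name is unique.

```go
package main

import (
	"fmt"
	"sync"
)

// processJobs fans the given jobs out to workerCount workers over a shared
// channel and returns how many times each job was handled.
// A channel receive, like a queue pop, hands each item to exactly one receiver.
func processJobs(jobs []string, workerCount int) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue {
				mu.Lock()
				handled[job]++
				mu.Unlock()
			}
		}()
	}

	// Queue every job, then signal the workers that no more jobs will arrive.
	for _, job := range jobs {
		queue <- job
	}
	close(queue)
	wg.Wait()

	return handled
}

func main() {
	handled := processJobs([]string{"payment-1", "payment-2", "payment-3"}, 2)
	for job, count := range handled {
		fmt.Printf("%s handled %d time(s)\n", job, count)
	}
}
```

Every job should be reported as handled once, no matter how many workers compete for the channel. In a pub/sub model, by contrast, each subscriber would see all three jobs.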

Real-life examples that implement message queuing logic include online payment processing, order fulfilment, server intercommunication, and more. Because this message queuing setup keeps the queue in an in-memory database like Redis®, it works well on systems that require high availability, scalability, and real-time responses to improve user experience.

In this tutorial, you'll implement the message queuing protocol on a payment processing application with Golang, Redis®, and MySQL 8 database on your Linux server.

Prerequisites

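To follow along with this tutorial, you require a Linux server with Golang, a Redis® server, and a MySQL 8 server installed, plus a user account with sudo privileges.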

1. Create a MySQL Database and a User Account

In this sample payment processing application, you'll create an HTTP endpoint that listens to requests for payments and then RPushes them to a Redis® queue. Then, you'll run another backend script that continuously BLPops the queue to process and save the payments to a MySQL database.

  1. Connect to your server via SSH. Then, log in to MySQL as root.

     $ sudo mysql -u root -p
  2. Enter the root password for the MySQL server and press Enter to proceed. Then, issue the commands below to create a web_payments database and a web_payments_user account. Replace EXAMPLE_PASSWORD with a strong value.

     mysql> CREATE DATABASE web_payments;
            CREATE USER 'web_payments_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
            GRANT ALL PRIVILEGES ON web_payments.* TO 'web_payments_user'@'localhost';
            FLUSH PRIVILEGES;
  3. Switch to the new web_payments database.

     mysql> USE web_payments;
  4. Create a payments table. A Redis® worker script that gets payment details from a queue will automatically populate this table.

     mysql> CREATE TABLE payments (
                payment_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
                payment_date DATETIME,
                first_name VARCHAR(50),
                last_name VARCHAR(50),
                payment_mode VARCHAR(255),
                payment_ref_no VARCHAR (255),
                amount DECIMAL(17,4)
            ) ENGINE = InnoDB;
  5. Log out from the MySQL server.

     mysql> QUIT;
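To make the link between the payments table and the queued JSON documents concrete, here is a minimal sketch of a Go struct that mirrors the client-supplied columns. The Payment type and the parsePayment function are hypothetical names, not part of the scripts in this tutorial, which decode into a generic map instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payment is a hypothetical struct whose fields mirror the client-supplied
// columns of the payments table. payment_id and payment_date are omitted
// because the database and the worker fill them in.
type Payment struct {
	FirstName    string      `json:"first_name"`
	LastName     string      `json:"last_name"`
	PaymentMode  string      `json:"payment_mode"`
	PaymentRefNo string      `json:"payment_ref_no"`
	Amount       json.Number `json:"amount"` // keeps the number's original text instead of converting it to a float
}

// parsePayment decodes one queued JSON document into a Payment.
func parsePayment(raw string) (Payment, error) {
	var p Payment
	err := json.Unmarshal([]byte(raw), &p)
	return p, err
}

func main() {
	p, err := parsePayment(`{"first_name": "JOHN", "last_name": "DOE", "payment_mode": "CASH", "payment_ref_no": "-", "amount": 5000.25}`)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(p.FirstName, p.LastName, p.Amount.String())
}
```

Using json.Number for the amount is a design choice in this sketch: it preserves the digits exactly as the client sent them, which suits a DECIMAL(17,4) column better than a binary floating-point value.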

2. Create a Directory Structure for the Project

Your application requires the following directory structure to keep its source files separate from the rest of your Linux file system.

    payment_gateway
        --queue
            --main.go
        --worker
            --main.go
  1. Begin by creating the payment_gateway directory under your home directory.

     $ mkdir ~/payment_gateway
  2. Switch to the new directory.

     $ cd ~/payment_gateway
  3. Create the queue and worker sub-directories under payment_gateway.

     $ mkdir queue
     $ mkdir worker
  4. Your directory structure is now ready, and you'll create subsequent Golang source code files under it.

3. Create a Message Queuing Script

In this step, you'll create a message queuing script to listen for incoming payment requests and send them directly to a Redis® queue.

  1. Navigate to the ~/payment_gateway/queue directory.

     $ cd ~/payment_gateway/queue
  2. Use nano to create and open a new main.go file.

     $ nano main.go
  3. Enter the following information into the main.go file.

     package main
    
     import (
         "fmt"
         "io"
         "log"
         "net/http"
    
         "github.com/go-redis/redis/v8"
     )
    
     // redisClient is created once and shared by all requests.
     var redisClient = redis.NewClient(&redis.Options{
         Addr:     "localhost:6379",
         Password: "",
         DB:       0,
     })
    
     func main() {
         http.HandleFunc("/payments", paymentsHandler)
         log.Fatal(http.ListenAndServe(":8080", nil))
     }
    
     func paymentsHandler(w http.ResponseWriter, req *http.Request) {
    
         // Include validation logic here to sanitize the req.Body when working in a production environment
    
         body, err := io.ReadAll(req.Body)
    
         if err != nil {
             http.Error(w, err.Error(), http.StatusBadRequest)
             return
         }
    
         // Append the raw payment details to the tail of the payments list.
         err = redisClient.RPush(req.Context(), "payments", string(body)).Err()
    
         if err != nil {
             http.Error(w, err.Error(), http.StatusInternalServerError)
             return
         }
    
         fmt.Fprint(w, "Payment details accepted successfully\r\n")
     }
  4. Save and close the main.go file.

  5. In the above file, you're listening for incoming payment requests on the URL /payments on port 8080. Then, you're routing each request to the paymentsHandler(...) function, which uses a client connected to the Redis® server on port 6379 to queue the payment details under the payments key with the Redis® RPush command.
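The queue script leaves request validation as a comment. As one possible starting point, the following minimal sketch checks a request body before it is queued. The validatePayment function and its rules (non-empty text fields, a positive amount) are assumptions made for illustration, so adjust them to your own payment rules.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// validatePayment is a hypothetical helper: it checks that the body is a JSON
// object, that the required text fields are non-empty strings, and that
// amount is a positive number.
func validatePayment(body []byte) error {
	var params map[string]interface{}
	if err := json.Unmarshal(body, &params); err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}

	for _, field := range []string{"first_name", "last_name", "payment_mode", "payment_ref_no"} {
		value, ok := params[field].(string)
		if !ok || value == "" {
			return errors.New("missing or empty field: " + field)
		}
	}

	// encoding/json decodes JSON numbers into float64 when the target is interface{}.
	amount, ok := params["amount"].(float64)
	if !ok || amount <= 0 {
		return errors.New("amount must be a positive number")
	}

	return nil
}

func main() {
	fmt.Println(validatePayment([]byte(`{"first_name": "ANN", "last_name": "JACOBS", "payment_mode": "PAYPAL", "payment_ref_no": "ABC-XYZ", "amount": 15.25}`)))
	fmt.Println(validatePayment([]byte(`{"first_name": "ANN", "amount": -1}`)))
}
```

A handler could call such a function on the request body and reply with an HTTP 400 status when it returns an error, so that malformed payments never reach the queue.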

4. Create a Message Worker Script

In this step, you'll create a message worker script that implements the Redis® BLPOP command to retrieve, process, and dequeue (delete/remove to avoid duplicate processing) the payment details logged under the payments key.

  1. Navigate to the ~/payment_gateway/worker directory.

     $ cd ~/payment_gateway/worker
  2. Next, create a main.go file.

     $ nano main.go
  3. Enter the following information into the main.go file. Replace EXAMPLE_PASSWORD with the correct password you used for the web_payments_user account under the web_payments database.

     package main
    
     import (
         "context"
         "database/sql"
         "encoding/json"
         "fmt"
         "strconv"
         "strings"
         "time"
    
         "github.com/go-redis/redis/v8"
         _ "github.com/go-sql-driver/mysql"
     )
    
     func main() {   
    
         ctx := context.TODO()
    
         redisClient := redis.NewClient(&redis.Options{
             Addr: "localhost:6379",
             Password: "",
             DB: 0,
         })
    
         for {
    
             // A timeout of 0 blocks until a job is available. The result holds
             // the key name at index 0 and the popped value at index 1.
             result, err := redisClient.BLPop(ctx, 0 * time.Second, "payments").Result()
    
             if err != nil {
                 fmt.Println(err.Error())
                 // Pause briefly so a Redis outage doesn't cause a tight error loop.
                 time.Sleep(time.Second)
                 continue
             }
    
             params := map[string]interface{}{}
    
             err = json.NewDecoder(strings.NewReader(result[1])).Decode(&params)
    
             if err != nil {
                 fmt.Println(err.Error())
                 continue
             }
    
             paymentId, err := savePayment(params)
    
             if err != nil {
                 fmt.Println(err.Error())
                 continue
             }
    
             fmt.Println("Payment # " + strconv.FormatInt(paymentId, 10) + " processed successfully.\r\n")
         }
     }
    
     func savePayment (params map[string]interface{}) (int64, error) {
    
         db, err := sql.Open("mysql", "web_payments_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/web_payments") 
    
         if err != nil {
             return 0, err
         }
    
         defer db.Close()
    
         queryString := `insert into payments (
                             payment_date,
                             first_name,
                             last_name,
                             payment_mode,
                             payment_ref_no,
                             amount
                         ) values (
                             ?,
                             ?,
                             ?,
                             ?,
                             ?,
                             ?
                         )`
    
         stmt, err := db.Prepare(queryString) 
    
         if err != nil {
             return 0, err       
         }
    
         defer stmt.Close() 
    
         paymentDate  := time.Now().Format("2006-01-02 15:04:05")
         firstName    := params["first_name"]
         lastName     := params["last_name"]
         paymentMode  := params["payment_mode"]
         paymentRefNo := params["payment_ref_no"]
         amount       := params["amount"]     
    
         res, err := stmt.Exec(paymentDate, firstName, lastName, paymentMode, paymentRefNo, amount)  
    
         if err != nil {
             return 0, err
         }
    
         paymentId, err := res.LastInsertId()
    
         if err != nil {
             return 0, err
         }
    
         return paymentId, nil
     }
  4. Save and close the main.go file when you're through with editing.

  5. In the above file, you're connecting to the Redis® server and calling redisClient.BLPop(...) with a timeout of 0 seconds. The call blocks until a payment is available, then retrieves and removes the payment details from the queue.

  6. Then, you're sending the payment details to the MySQL database via the savePayment(params) function, which returns the paymentId for each successful payment that you insert into the payments table.
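Because BLPop removes a job before the worker saves it, a job whose database insert fails is no longer in the queue. One way to avoid losing such jobs is to park them on a second list for later inspection. The following is a minimal sketch of that idea. It uses a hypothetical in-memory memoryQueue type instead of Redis® so it runs on its own, and the drain function and the failed queue are assumptions, not part of the worker script above.

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated stands in for a database error in this sketch.
var errSimulated = errors.New("simulated database error")

// memoryQueue is a hypothetical in-memory stand-in for a Redis list,
// so the sketch runs without a Redis server.
type memoryQueue struct {
	items []string
}

// push appends a job to the tail, like RPUSH.
func (q *memoryQueue) push(job string) {
	q.items = append(q.items, job)
}

// pop removes and returns the job at the head, like a non-blocking LPOP.
func (q *memoryQueue) pop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	job := q.items[0]
	q.items = q.items[1:]
	return job, true
}

// drain pops every job from queue and passes it to save. Jobs that fail are
// pushed to the failed queue instead of being silently lost.
// It returns the number of jobs saved successfully.
func drain(queue, failed *memoryQueue, save func(string) error) int {
	saved := 0
	for {
		job, ok := queue.pop()
		if !ok {
			return saved
		}
		if err := save(job); err != nil {
			failed.push(job)
			continue
		}
		saved++
	}
}

func main() {
	queue := &memoryQueue{}
	failed := &memoryQueue{}
	queue.push(`{"first_name": "JOHN"}`)
	queue.push("bad job")

	// save is a stub that rejects one job to simulate a failed insert.
	save := func(job string) error {
		if job == "bad job" {
			return errSimulated
		}
		return nil
	}

	fmt.Println("saved:", drain(queue, failed, save), "failed:", len(failed.items))
}
```

In the real worker, the equivalent step would be an RPush to a separate key whenever savePayment(params) returns an error. The key name for such a list is up to you.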

5. Test the Golang/Redis® Message Queuing Application

Your message queuing application is now ready for testing.

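  1. Initialize a Go module for the project, then download the packages you've used in this payment processing project.

     $ cd ~/payment_gateway
     $ go mod init payment_gateway
     $ go get github.com/go-sql-driver/mysql
     $ go get github.com/go-redis/redis/v8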
  2. Navigate to the ~/payment_gateway/queue directory, and run the Redis® queue script, which runs Golang's built-in web server and allows your application to listen for incoming payment requests on port 8080.

     $ cd ~/payment_gateway/queue
     $ go run ./
  3. Next, SSH to your server on a second terminal window, navigate to the ~/payment_gateway/worker directory, and run the Redis® worker script.

     $ cd ~/payment_gateway/worker 
     $ go run ./
  4. Your application is now listening for incoming payment requests.

  5. Connect to your server on a third terminal window and use curl to send the following sample payments to your application one by one.

     $ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "JOHN", "last_name": "DOE", "payment_mode": "CASH", "payment_ref_no": "-", "amount" : 5000.25}'
     $ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "MARY", "last_name": "SMITH", "payment_mode": "CHEQUE", "payment_ref_no": "985", "amount" : 985.65}'
     $ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "ANN", "last_name": "JACOBS", "payment_mode": "PAYPAL", "payment_ref_no": "ABC-XYZ", "amount" : 15.25}'
  6. You should get the response below as you run each curl command above.

     Payment details accepted successfully
     ...
  7. After submitting each payment, your second terminal window that runs the worker script should display the following output. This means the script is dequeuing the jobs and processing the payments successfully.

     Payment # 1 processed successfully.
    
     Payment # 2 processed successfully.
    
     Payment # 3 processed successfully.
    
     ...
  8. The next step is verifying whether the payments are reflected in your database. Still on your third terminal window, log in to your MySQL server as root.

     $ sudo mysql -u root -p
  9. Enter your MySQL server root password and press Enter to proceed. Next, switch to the web_payments database.

     mysql> USE web_payments;
  10. Query the payments table.

    mysql> SELECT
               payment_id,
               payment_date,
               first_name,
               last_name,
               payment_mode,
               payment_ref_no,
               amount
           FROM payments;
  11. You should now get the following records from your payments table as processed by your Redis® worker script.

    +------------+---------------------+------------+-----------+--------------+----------------+-----------+
    | payment_id | payment_date        | first_name | last_name | payment_mode | payment_ref_no | amount    |
    +------------+---------------------+------------+-----------+--------------+----------------+-----------+
    |          1 | 2021-12-01 09:48:32 | JOHN       | DOE       | CASH         | -              | 5000.2500 |
    |          2 | 2021-12-01 09:48:42 | MARY       | SMITH     | CHEQUE       | 985            |  985.6500 |
    |          3 | 2021-12-01 09:48:55 | ANN        | JACOBS    | PAYPAL       | ABC-XYZ        |   15.2500 |
    +------------+---------------------+------------+-----------+--------------+----------------+-----------+
    3 rows in set (0.00 sec)
  12. The output above confirms that your message queuing application is working as expected.
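The manual curl checks above can also be approximated in code. The following minimal sketch exercises a payments handler in-process with Go's net/http/httptest package. The newPaymentsHandler and postPayment functions and the enqueue hook are hypothetical. The hook replaces the Redis® RPush call so the sketch runs without a Redis® server.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newPaymentsHandler returns a handler that passes each request body to enqueue.
// enqueue is a hypothetical hook; in the tutorial this role is played by RPush.
func newPaymentsHandler(enqueue func(string) error) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		body, err := io.ReadAll(req.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if err := enqueue(string(body)); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "Payment details accepted successfully\r\n")
	}
}

// postPayment sends payload to the handler in-process and returns the
// status code and response body.
func postPayment(handler http.Handler, payload string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, "/payments", strings.NewReader(payload))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	// queued collects the jobs that would otherwise go to the Redis list.
	var queued []string
	handler := newPaymentsHandler(func(job string) error {
		queued = append(queued, job)
		return nil
	})

	status, body := postPayment(handler, `{"first_name": "JOHN", "last_name": "DOE", "payment_mode": "CASH", "payment_ref_no": "-", "amount": 5000.25}`)
	fmt.Println(status, strings.TrimSpace(body), len(queued))
}
```

Separating the handler from the queue in this way is a design choice of the sketch, not of the tutorial's scripts. It lets you check the HTTP behaviour quickly, while the curl commands above remain the end-to-end test against Redis® and MySQL.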

Conclusion

In this tutorial, you've implemented a message queuing application with Golang, Redis®, and MySQL 8 on a Linux server. You've used the Redis® RPush and BLPop functions to create a payment processing platform that decouples payment logging from payment processing to enhance the reliability and scalability of your application.
