How to Implement an Event Processing Model with Go, Redis, and MySQL 8

Updated on December 15, 2021

Introduction

An event in computer programming represents a change of state. Common examples include a subscriber submitting registration information to your application, a hardware sensor reporting a temperature spike in a room, a request to validate a payment, and a call to a customer service department. When events occur in your application, you must track and analyze them as soon as they occur. This is called event processing.

Many modern applications need to integrate event processing. The main reasons are scalability and a better user experience through real-time stream processing.

One of the most efficient ways to process events is the publish/subscribe model. In this design pattern, a publishing script channels events as they occur to different subscribers through a broker such as the Redis server.

For instance, if you expect thousands of signups in your online subscription service, you can use Redis to publish the information to a signups channel. Then, under the hood, you can use several Redis subscribers (event processing scripts) to save the information to a MySQL database, send confirmation emails to customers, and process credit card payments.
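
The publish/subscribe flow described above can be illustrated without Redis. The following is a minimal in-memory sketch, not a description of how Redis works internally. The Broker type and its Subscribe and Publish methods are hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory stand-in for a pub/sub server such as Redis.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

// NewBroker returns an empty broker with no subscribers.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new buffered Go channel for the named pub/sub channel.
func (b *Broker) Subscribe(channel string) <-chan string {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers msg to every current subscriber and returns how many
// subscribers received it. A message published while nobody is subscribed
// is dropped, which is also how Redis pub/sub behaves.
func (b *Broker) Publish(channel, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[channel] {
		select {
		case ch <- msg:
			delivered++
		default: // subscriber buffer is full: drop instead of blocking the publisher
		}
	}
	return delivered
}

func main() {
	b := NewBroker()
	dbWorker := b.Subscribe("signups")
	mailWorker := b.Subscribe("signups")

	n := b.Publish("signups", `{"customer_id": 1, "package_id": 2}`)
	fmt.Println("delivered to", n, "subscribers")
	fmt.Println("database worker got:", <-dbWorker)
	fmt.Println("email worker got:", <-mailWorker)
}
```

Every subscriber receives its own copy of each event, so the database worker and the email worker in this sketch can process the same signup independently.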

In this guide, you'll learn how to implement the event processing model with Golang, Redis, and MySQL 8 on your Linux server.

Prerequisites

To complete this tutorial, you require a Linux server with Go, a Redis server, and a MySQL 8 server installed.

1. Set Up a Sample Database and a User Account

In this sample application, you'll capture customers' signup events as they occur in your application using a Redis server. However, to store data permanently, you'll need MySQL. So, SSH to your server and follow the steps below to create a database.

  1. Log in to your MySQL server as root.

     $ sudo mysql -u root -p
  2. Enter the root password for your MySQL server and press Enter to proceed when prompted. Then, issue the SQL commands below to set up a new sample_store database and a sample_store_user account. Replace EXAMPLE_PASSWORD with a strong value.

     mysql> CREATE DATABASE sample_store;
            CREATE USER 'sample_store_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
            GRANT ALL PRIVILEGES ON sample_store.* TO 'sample_store_user'@'localhost';
            FLUSH PRIVILEGES;
  3. Next, switch to the new sample_store database.

     mysql> USE sample_store;
  4. Then, create a customers table. This table stores customers' information, including each customer_id, first_name, last_name, and email_address.

     mysql> CREATE TABLE customers (
                customer_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
                first_name VARCHAR(50),
                last_name VARCHAR(50),
                email_address VARCHAR(255)
            ) ENGINE = InnoDB;
  5. Populate the customers table with sample data.

     mysql> INSERT INTO customers (first_name, last_name, email_address) VALUES ('JOHN', 'DOE', 'john_doe@example.com');
            INSERT INTO customers (first_name, last_name, email_address) VALUES ('JIM', 'JADE', 'jim_jade@example.com');
            INSERT INTO customers (first_name, last_name, email_address) VALUES ('MARY', 'MARK', 'mary_mark@example.com');
  6. Query the customers table to ensure the records are in place.

          mysql> SELECT
                     customer_id,
                     first_name,
                     last_name,
                     email_address
                 FROM customers;

    Output.

          +-------------+------------+-----------+-----------------------+
          | customer_id | first_name | last_name | email_address         |
          +-------------+------------+-----------+-----------------------+
          |           1 | JOHN       | DOE       | john_doe@example.com  |
          |           2 | JIM        | JADE      | jim_jade@example.com  |
          |           3 | MARY       | MARK      | mary_mark@example.com |
          +-------------+------------+-----------+-----------------------+
          3 rows in set (0.00 sec)   
  7. Create a packages table. This table stores different packages that customers can choose when subscribing to your services in your sample company.

     mysql> CREATE TABLE packages (
                package_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
                package_name VARCHAR(50),
                monthly_rate DOUBLE       
            ) ENGINE = InnoDB;      
  8. Populate the packages table with sample data.

     mysql> INSERT INTO packages (package_name, monthly_rate) VALUES ('BASIC PACKAGE', 5);
            INSERT INTO packages (package_name, monthly_rate) VALUES ('ADVANCED PACKAGE', 15);
            INSERT INTO packages (package_name, monthly_rate) VALUES ('PREMIUM PACKAGE', 50);
  9. Query the packages table to make sure you've populated it.

          mysql> SELECT
                     package_id,
                     package_name,
                     monthly_rate
                 FROM packages;

    Output.

          +------------+------------------+--------------+
          | package_id | package_name     | monthly_rate |
          +------------+------------------+--------------+
          |          1 | BASIC PACKAGE    |            5 |
          |          2 | ADVANCED PACKAGE |           15 |
          |          3 | PREMIUM PACKAGE  |           50 |
          +------------+------------------+--------------+
          3 rows in set (0.00 sec)
  10. Next, create a subscriptions table. This table creates a many-to-many relationship between the customers and packages tables. In simple terms, it shows you the services that customers have subscribed to.

    mysql> CREATE TABLE subscriptions (
               subscription_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
               customer_id BIGINT,
               package_id BIGINT,
               subscription_date DATETIME       
           ) ENGINE = InnoDB;   
  11. Don't populate the subscriptions table for now. When customers' subscription events arrive in your application through an HTTP call, you'll publish the data to a Redis channel. Then, you'll create an independent script that subscribes to the channel and inserts the data into the MySQL database. This decouples your system's events from the data processing logic to enhance scalability.

  12. Log out from the MySQL server.

    mysql> QUIT;

2. Create a Directory Structure for Your Application

In this Golang event processing application, you'll need a frontend and a backend script. The frontend script acts as a web server and listens for incoming JSON signup requests. The script publishes the customers' signup information to a Redis signups channel. The backend script subscribes to the signups channel, listens for events, and saves their details permanently to your database.

When completed, the directory structure for your application will have the following levels.

    project
        --frontend
          --main.go
        --backend
          --event_processor.go
  1. Begin by creating a project directory under your home directory.

     $ mkdir ~/project
  2. Next, navigate to the new project directory.

     $ cd ~/project
  3. Make frontend and backend sub-directories under project.

     $ mkdir ~/project/frontend
     $ mkdir ~/project/backend
  4. You now have the correct directory structure for your event streaming project.

3. Create the main.go File

The main.go file is the main entry point for your application. This script runs Go's built-in web server and accepts signup requests submitted as a JSON payload.

  1. To create the main.go file, first navigate to the ~/project/frontend directory.

     $ cd ~/project/frontend
  2. Then, use nano to open a new main.go file.

     $ nano main.go
  3. Enter the following information into the file.

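     package main

     import (
         "bytes"
         "context"
         "fmt"
         "log"
         "net/http"

         // The context-aware API used below belongs to version 8 of the client.
         "github.com/go-redis/redis/v8"
     )

     func main() {
         http.HandleFunc("/signup", signupHandler)

         // ListenAndServe blocks until the server fails, so log the reason.
         log.Fatal(http.ListenAndServe(":8080", nil))
     }

     func signupHandler(w http.ResponseWriter, req *http.Request) {

         // Connect to the local Redis server and release the connection
         // once the request has been handled.
         redisClient := redis.NewClient(&redis.Options{
             Addr:     "localhost:6379",
             Password: "",
             DB:       0,
         })
         defer redisClient.Close()

         ctx := context.TODO()

         // Read the raw JSON request body.
         buf := new(bytes.Buffer)
         if _, err := buf.ReadFrom(req.Body); err != nil {
             http.Error(w, err.Error(), http.StatusBadRequest)
             return
         }
         reqBody := buf.String()

         // Publish the signup event to the signups channel.
         err := redisClient.Publish(ctx, "signups", reqBody).Err()

         if err != nil {
             http.Error(w, err.Error(), http.StatusInternalServerError)
         } else {
             fmt.Fprintf(w, "Success\r\n")
         }

     }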
  4. Save and close the main.go file when you're through with editing.

  5. In the above file, you're accepting signup information via the /signup resource. The web server routes each HTTP request to your signupHandler function, which connects to your Redis server on port 6379. The statement err := redisClient.Publish(ctx, "signups", reqBody).Err() then publishes the signup information to a Redis channel that you've named signups.

4. Create an event_processor.go File

The main.go file you've created in the previous step doesn't directly interact with your database. Instead, it simply creates an unprocessed event on the Redis server in a shared channel that you've named signups.

To process this information, you'll create an event_processor.go file that passes signup data to your database.

  1. Navigate to the ~/project/backend directory.

     $ cd ~/project/backend
  2. Next, open a new event_processor.go file for editing purposes.

     $ nano event_processor.go
  3. Enter the following information into the event_processor.go file.

     package main
    
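     import (
         "context"
         "database/sql"
         "encoding/json"
         "fmt"
         "strings"
         "time"

         // The context-aware API used below belongs to version 8 of the client.
         "github.com/go-redis/redis/v8"
         // Imported for its side effect: registers the "mysql" driver with database/sql.
         _ "github.com/go-sql-driver/mysql"
     )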
    
     func main() {   
    
         ctx := context.TODO()
    
         redisClient := redis.NewClient(&redis.Options{
             Addr:     "localhost:6379",
             Password: "",
             DB:       0,
         })
    
         subscriber := redisClient.Subscribe(ctx, "signups")
    
         for {
    
             msg, err := subscriber.ReceiveMessage(ctx)
    
             if err != nil {
                 fmt.Println(err.Error())
             } else {
    
                params := map[string]interface{}{}    
    
                err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(&params)
    
                if err != nil {
                    fmt.Println(err.Error())
                } else {                       
    
                    err = createSubscription(params)
    
                    if err != nil {
                        fmt.Println(err.Error())
                    } else {
                       fmt.Println("Processed subscription for customer # " + fmt.Sprint(params["customer_id"]) + "\r\n...")
                    }
                }
    
             }
         }
     }
    
     func createSubscription (params map[string]interface{}) error {
    
         db, err := sql.Open("mysql", "sample_store_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/sample_store") 
    
         if err != nil {
             return err
         }
    
         defer db.Close()
    
         queryString := `insert into subscriptions (
                              customer_id, 
                              package_id, 
                              subscription_date
                          ) values (
                              ?,
                              ?,
                              ?
                          )`
    
         stmt, err := db.Prepare(queryString) 
    
         if err != nil {
             return err           
         }
    
         defer stmt.Close() 
    
         dt := time.Now()
         subscriptionDate := dt.Format("2006-01-02 15:04:05")   
    
         _, err = stmt.Exec(params["customer_id"], params["package_id"], subscriptionDate)  
    
         if err != nil {
             return err
         }
    
         return nil
     }
  4. Save and close the file when you're through with editing.

  5. In the above file, you have a main function that opens a connection to your Redis server. The statement subscriber := redisClient.Subscribe(ctx, "signups") subscribes to the signups channel. The blocking for {...} loop then listens for incoming events and passes each customer's signup event to a createSubscription function, which saves the data to the subscriptions table in your MySQL database.

  6. Your frontend and backend scripts are now ready to accept new signups.
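
The backend decodes each payload into a map[string]interface{}, where JSON numbers arrive as float64 values and missing fields go unnoticed. As an alternative sketch, the payload could be decoded into a typed struct and validated before it reaches the database. The signupEvent type and parseSignup function below are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// signupEvent is a hypothetical typed view of the JSON published to the signups channel.
type signupEvent struct {
	CustomerID int64 `json:"customer_id"`
	PackageID  int64 `json:"package_id"`
}

// parseSignup decodes a payload and rejects events whose IDs are missing,
// non-integer, or not positive.
func parseSignup(payload string) (signupEvent, error) {
	var ev signupEvent
	if err := json.Unmarshal([]byte(payload), &ev); err != nil {
		return signupEvent{}, err
	}
	if ev.CustomerID <= 0 || ev.PackageID <= 0 {
		return signupEvent{}, errors.New("customer_id and package_id must be positive integers")
	}
	return ev, nil
}

func main() {
	ev, err := parseSignup(`{"customer_id": 1, "package_id": 2}`)
	fmt.Println(ev.CustomerID, ev.PackageID, err)

	// A payload without package_id is rejected instead of inserting a NULL.
	_, err = parseSignup(`{"customer_id": 1}`)
	fmt.Println(err)
}
```

With this approach, the values passed to the prepared statement would be int64 fields that match the BIGINT columns, rather than interface values taken from a map.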

5. Test the Golang Event Processing Application

Now that you've finished coding the frontend and backend scripts for your event processing application, run them to test the functionality.

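  1. Before that, initialize a Go module in each directory and download the packages you've used in your scripts from GitHub. The module names frontend and backend are only examples.

     $ cd ~/project/frontend
     $ go mod init frontend
     $ go get github.com/go-redis/redis/v8
     $ cd ~/project/backend
     $ go mod init backend
     $ go get github.com/go-redis/redis/v8
     $ go get github.com/go-sql-driver/mysql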
  2. Next, navigate to the backend directory and run the event_processor.go file. This command blocks while it waits for events, so don't run any other command in this SSH session.

     $ cd ~/project/backend
     $ go run ./
  3. Open a second terminal window, navigate to the ~/project/frontend directory, and run the main.go file. This causes your application to listen on port 8080. Don't enter any other command on this terminal.

     $ cd ~/project/frontend   
     $ go run ./
  4. Next, open a third terminal window and run the following curl commands to sign up three customers.

     $ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 1, "package_id": 2}'
     $ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 2, "package_id": 3}'
     $ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 3, "package_id": 2}'

    Output.

     ...
     Success
  5. After you run the commands above, your backend script prints the following output to confirm everything is working as expected.

    Output.

     Processed subscription for customer # 1
     ...
     Processed subscription for customer # 2
     ...
     Processed subscription for customer # 3
     ...
  6. Still in your third terminal window, log in to your MySQL database server to confirm that your scripts are successfully streaming and processing events via the Redis server.

     $ sudo mysql -u root -p
  7. Enter your root password and press Enter to proceed. Then, switch to the sample_store database.

     mysql> USE sample_store;
  8. Query the subscriptions table to check if you can stream and process the new signup entries.

     mysql> SELECT
                customer_id,
                package_id,
                subscription_date
            FROM subscriptions;
  9. You should now get the following output which confirms your application is working as expected.

     +-------------+------------+---------------------+
     | customer_id | package_id | subscription_date   |
     +-------------+------------+---------------------+
     |           1 |          2 | 2021-11-30 09:57:24 |
     |           2 |          3 | 2021-11-30 09:58:22 |
     |           3 |          2 | 2021-11-30 09:58:32 |
     +-------------+------------+---------------------+
     3 rows in set (0.00 sec)

Conclusion

In this guide, you've implemented event streaming with Golang, MySQL 8, and Redis server on your Linux machine. Use the knowledge in this guide to capture and process your system events as they occur, enhance scalability, and increase responsiveness for your complex multi-step applications.
