How to Implement an Event Processing Model with Go, Redis®, and MySQL 8
Introduction
An event in computer programming represents a change of state. Common examples include a subscriber submitting registration information to your application, a hardware sensor reporting a spike of temperature in a room, a request to validate a payment, a call to a customer service department, and more. When events occur in your application, you must track and analyze them as soon as they occur. This is called event processing.
In today's world, you'll encounter different situations where you must integrate event processing in your application. The main reason you need this technology is scalability and enhancing the user experience through real-time stream processing.
One of the most efficient ways for processing events is using the publish/subscribe model. In this software design architecture, you simply create a publishing script that channels events as they occur to different subscribers through a broker such as the Redis® server.
For instance, if you expect thousands of signups in your online subscription service, you can use Redis® to publish the information to a signups
channel. Then, under the hood, you can use several Redis® subscribers (event processing scripts) to save the information to a MySQL database, send confirmation emails to customers, and process credit card payments.
In this guide, you'll learn how to implement the event processing model with Golang, Redis®, and MySQL 8 on your Linux server.
Prerequisites
To complete this tutorial, you require:
- A Linux server.
- A non-root
sudo
user. - A MySQL database.
- A Redis® server.
- A Golang package.
1. Set Up a Sample Database and a User Account
In this sample application, you'll capture customers' signup events as they occur in your application using a Redis® server. However, to store data permanently, you'll need MySQL. So, SSH to your server and follow the steps below to create a database.
Log in to your MySQL server as
root
.$ sudo mysql -u root -p
Enter the
root
password for your MySQL server and press Enter to proceed when prompted. Then, issue the SQL commands below to set up a newsample_store
database and asample_store_user
account. ReplaceEXAMPLE_PASSWORD
with a strong value.mysql> CREATE DATABASE sample_store; CREATE USER 'sample_store_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD'; GRANT ALL PRIVILEGES ON sample_store.* TO 'sample_store_user'@'localhost'; FLUSH PRIVILEGES;
Next, switch to the new
sample_store
database.mysql> USE sample_store;
Then, create a
customers
table. This table stores customers' information including theircustomer_ids
,first_names
, andlast_names
.mysql> CREATE TABLE customers ( customer_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(50), last_name VARCHAR(50), email_address VARCHAR(255) ) ENGINE = InnoDB;
Populate the
customers
table with sample data.mysql> INSERT INTO customers (first_name, last_name, email_address) VALUES ('JOHN', 'DOE', 'john_doe@example.com'); INSERT INTO customers (first_name, last_name, email_address) VALUES ('JIM', 'JADE', 'jim_jade@example.com'); INSERT INTO customers (first_name, last_name, email_address) VALUES ('MARY', 'MARK', 'mary_mark@example.com');
Query the
customers
table to ensure the records are in place.mysql> SELECT customer_id, first_name, last_name, email_address FROM customers;
Output.
+-------------+------------+-----------+-----------------------+ | customer_id | first_name | last_name | email_address | +-------------+------------+-----------+-----------------------+ | 1 | JOHN | DOE | john_doe@example.com | | 2 | JIM | JADE | jim_jade@example.com | | 3 | MARY | MARK | mary_mark@example.com | +-------------+------------+-----------+-----------------------+ 3 rows in set (0.00 sec)
Create a
packages
table. This table stores different packages that customers can choose when subscribing to your services in your sample company.mysql> CREATE TABLE packages ( package_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, package_name VARCHAR(50), monthly_rate DOUBLE ) ENGINE = InnoDB;
Populate the
packages
table with sample data.mysql> INSERT INTO packages (package_name, monthly_rate) VALUES ('BASIC PACKAGE', 5); INSERT INTO packages (package_name, monthly_rate) VALUES ('ADVANCED PACKAGE', 15); INSERT INTO packages (package_name, monthly_rate) VALUES ('PREMIUM PACKAGE', 50);
Query the
packages
table to make sure you've populated it.mysql> SELECT package_id, package_name, monthly_rate FROM packages;
Output.
+------------+------------------+--------------+ | package_id | package_name | monthly_rate | +------------+------------------+--------------+ | 1 | BASIC PACKAGE | 5 | | 2 | ADVANCED PACKAGE | 15 | | 3 | PREMIUM PACKAGE | 50 | +------------+------------------+--------------+ 3 rows in set (0.00 sec)
Next, create a
subscriptions
table. This table creates a many-to-many relationship between thecustomers
andpackages
tables. In simple terms, it shows you the services that customers have subscribed to.mysql> CREATE TABLE subscriptions ( subscription_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, customer_id DOUBLE, package_id BIGINT, subscription_date DATETIME ) ENGINE = InnoDB;
Don't populate the
subscriptions
table for now. When customer events for subscribing to your services arrive in your application through an HTTP call, you'll capture the data in a Redis® channel. Then, you will create an independent Redis® script that subscribes to the channel and insert the data into the MySQL database. This will de-couple your system's events and the data processing logic to enhance scalability.Log out from the MySQL server.
mysql> QUIT;
2. Create a Directory Structure For your Application
In this Golang event processing application, you'll need a frontend and a backend script. The frontend script acts as a web server and listens for incoming JSON signup requests. The script publishes the customers' signup information to a Redis® signups
channel. On the other hand, the backend script subscribes to the signups
channel to listen and save events' details permanently to your database.
When completed, the directory structure for your applications will have the following levels.
project
--frontend
--main.go
--backend
--event_processor.go
Begin by creating a
project
directory under your home directory.$ mkdir ~/project
Next, navigate to the new
project
directory.$ cd ~/project
Make
frontend
andbackend
sub-directories underproject
.$ mkdir ~/project/frontend $ mkdir ~/project/backend
You now have the correct directory structure for your event streaming project.
3. Create the main.go
File
The main.go
file is the main entry point for your application. This script runs the Golang inbuilt web server and accepts signup requests submitted as JSON payload.
To create the
main.go
file, first navigate to the~/project/frontend
directory.$ cd ~/project/frontend
Then, use
nano
to open a newmain.go
file.$ nano main.go
Enter the following information into the file.
package main import ( "fmt" "net/http" "github.com/go-redis/redis" "context" "bytes" ) func main() { http.HandleFunc("/signup", signupHandler) http.ListenAndServe(":8080", nil) } func signupHandler(w http.ResponseWriter, req *http.Request) { redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) ctx := context.TODO() buf := new(bytes.Buffer) buf.ReadFrom(req.Body) reqBody := buf.String() err := redisClient.Publish(ctx, "signups", reqBody).Err(); if err != nil { fmt.Fprintf(w, err.Error() + "\r\n") } else { fmt.Fprintf(w, "Success\r\n") } }
Save and close the
main.go
file when you're through with editing.In the above file, you're accepting signup information via the
/signup
resource. Then, you're redirecting the HTTP request to yoursignupHandler
function that connects to your Redis® server on port6379
. You're then using the statementerr := redisClient.Publish(ctx, "signups", reqBody).Err();
to publish the signup information to a Redis® channel that you've namedsignups
.
4. Create an event_processor.go
File
The main.go
file you've created in the previous step doesn't directly interact with your database. Instead, it simply creates an unprocessed event on the Redis® server in a shared channel that you've named signups
.
To process this information, you'll create an event_processor.go
file that passes signup data to your database.
Navigate to the
~/project/backend
directory.$ cd ~/project/backend
Next, open a new
event_processor.go
file for editing purposes.$ nano event_processor.go
Enter the following information into the
event_processor.go
file.package main import ( "github.com/go-redis/redis" _"github.com/go-sql-driver/mysql" "database/sql" "encoding/json" "context" "fmt" "strings" "time" ) func main() { ctx := context.TODO() redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) subscriber := redisClient.Subscribe(ctx, "signups") for { msg, err := subscriber.ReceiveMessage(ctx) if err != nil { fmt.Println(err.Error()) } else { params := map[string]interface{}{} err := json.NewDecoder(strings.NewReader(msg.Payload)).Decode(¶ms) if err != nil { fmt.Println(err.Error()) } else { err = createSubscription(params) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("Processed subscription for customer # " + fmt.Sprint(params["customer_id"]) + "\r\n...") } } } } } func createSubscription (params map[string]interface{}) error { db, err := sql.Open("mysql", "sample_store_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/sample_store") if err != nil { return err } defer db.Close() queryString := `insert into subscriptions ( customer_id, package_id, subscription_date ) values ( ?, ?, ? )` stmt, err := db.Prepare(queryString) if err != nil { return err } defer stmt.Close() dt := time.Now() subscriptionDate := dt.Format("2006-01-02 15:04:05") _, err = stmt.Exec(params["customer_id"], params["package_id"], subscriptionDate) if err != nil { return err } return nil }
Save and close the file when you're through with editing.
In the above file, you have a
main
function that opens a connection to your Redis® server. Then, you're using the statementsubscriber := redisClient.Subscribe(ctx, "signups")
to subscribe to thesignups
channel. You're then using the Golangfor {...}
blocking loop to listen for incoming events. Next, you redirect the customers' signup events to acreateSubscription
function which saves data to your MySQL database under thesubscriptions
table.Your frontend and backend scripts are now ready to accept new signups.
5. Test the Golang Event Processing Application
Once you've finalized coding the frontend and backend scripts for your event processing application, you'll now run them to test the functionalities.
Before that, download the packages you've used in your scripts from GitHub.
$ go get github.com/go-sql-driver/mysql $ go get github.com/go-redis/redis
Next, navigate to the backend directory and run the
main.go
file. This command has a blocking function, and you should not run any other command on this SSH session.$ cd ~/project/backend $ go run ./
Open a second terminal window, navigate to the
~/project/frontend
directory, and run theevent_processor.go
file. This causes your application to listen on port8080
. Don't enter any other command on this terminal.$ cd ~/project/frontend $ go run ./
Next, open a third terminal window and run the following
curl
commands to signup three customers.$ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 1, "package_id": 2}' $ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 2, "package_id": 3}' $ curl -i -X POST localhost:8080/signup -H "Content-Type: application/json" -d '{"customer_id": 3, "package_id": 2}'
Output.
... Success
After running the commands above, your backend scripts prints the following output to confirm everything is working as expected.
Output.
Processed subscription for customer # 1 ... Processed subscription for customer # 2 ... Processed subscription for customer # 3 ...
Still on your third terminal window, log in to your MySQL database server to confirm if your scripts are successfully streaming and processing events via the Redis® server.
$ sudo mysql -u root -p
Enter your root password and press Enter to proceed. Then, switch to the
sample_store
database.mysql> USE sample_store;
Query the
subscriptions
table to check if you can stream and process the new signup entries.mysql> SELECT customer_id, package_id, subscription_date FROM subscriptions;
You should now get the following output which confirms your application is working as expected.
+-------------+------------+---------------------+ | customer_id | package_id | subscription_date | +-------------+------------+---------------------+ | 1 | 2 | 2021-11-30 09:57:24 | | 2 | 3 | 2021-11-30 09:58:22 | | 3 | 2 | 2021-11-30 09:58:32 | +-------------+------------+---------------------+ 3 rows in set (0.00 sec)
Conclusion
In this guide, you've implemented event streaming with Golang, MySQL 8, and Redis® server on your Linux machine. Use the knowledge in this guide to capture and process your system events as they occur, enhance scalability, and increase responsiveness for your complex multi-step applications.
Visit the following resources to read more Golang tutorials: