How to Use MongoDB Change Streams with the Go Driver
Introduction
MongoDB is a NoSQL database that stores data in documents composed of field-value pairs. Documents are stored as BSON, a binary representation of JSON. A MongoDB document is similar to a row in a relational database table, and the document fields (key-value pairs) are similar to columns. A document is part of a collection, and one or more collections are part of a database. The MongoDB API supports regular CRUD operations (create, read, update, delete) along with aggregation, geospatial queries, and text search. MongoDB also provides high availability through replication: you can replicate data across multiple MongoDB servers (called a replica set) for redundancy, and you can use sharding to distribute data across servers.
This article walks you through setting up a MongoDB replica set in Docker, deploying client applications, and exploring change streams with a complete example. It covers the following:
- An overview of MongoDB and change streams.
- How to use the MongoDB Go driver.
- Application setup and deployment.
- Testing the application and understanding how to use resume tokens.
MongoDB provides official support for many client drivers, including Java, Python, Node.js, PHP, Ruby, Swift, C, C++, C#, and Go. In this article, you will be using the Go driver for MongoDB to build a change streams processor and a REST API application.
MongoDB Change streams
With change streams, you can access MongoDB data updates in real time. Applications do not need to deal with the oplog or other low-level operational details. You can subscribe to all data changes on a collection, a database, or an entire deployment. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications. Change streams are available for replica sets (which is what you will use in this article) and for sharded clusters.
By default, the change event for an update operation contains only the fields that changed, not the whole document. You can configure the change stream to return the latest committed version of the updated document instead. You can also provide an array of one or more pipeline stages, such as $addFields, $match, and $project, to control the change stream output.
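To make the shape of such a pipeline concrete, the following sketch builds a $match stage on the operation type followed by a $project stage as plain Go maps and prints them as JSON. It uses only the standard library. buildPipeline is a hypothetical helper written for this illustration; an application that talks to MongoDB would express the same stages with the driver's bson types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// buildPipeline is a hypothetical helper that describes change stream
// stages as plain maps so their structure can be inspected without a driver.
func buildPipeline(ops ...string) []map[string]any {
	return []map[string]any{
		// Keep only events whose operationType is one of ops.
		{"$match": map[string]any{"operationType": map[string]any{"$in": ops}}},
		// Trim each event down to the listed fields.
		{"$project": map[string]any{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}},
	}
}

func main() {
	out, err := json.MarshalIndent(buildPipeline("insert", "update", "replace"), "", "  ")
	if err != nil {
		log.Fatalf("failed to encode pipeline: %v", err)
	}
	fmt.Println(string(out))
}
```

The order of the stages matters: events are filtered first and reshaped second, so the projection only runs on events that passed the filter.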
Resume token
Resume tokens allow applications to save progress and protect against potential data loss. If a change streams application crashes, it cannot detect database changes made while it is down. A resume token lets the application continue processing from where it stopped (before the crash) and receive the events it missed. Change streams can help build decoupled and scalable architectures and make it easier to implement Extract-Transform-Load (ETL), notification, and synchronization services.
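The idea of resuming can be illustrated with a deliberately simplified in-memory model. This is not how MongoDB implements resumption internally. The event type and the eventsAfter function are hypothetical names used only for this sketch: each event carries a token, and a consumer that remembers the last token it processed can ask for everything recorded after it.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a change event: Token plays the role
// of the resume token and Payload the changed document.
type event struct {
	Token   string
	Payload string
}

// eventsAfter returns every event recorded after the one identified by token.
// An empty token means "start from the beginning"; an unknown token is an
// error, because the position it refers to cannot be located.
func eventsAfter(history []event, token string) ([]event, error) {
	if token == "" {
		return history, nil
	}
	for i, e := range history {
		if e.Token == token {
			return history[i+1:], nil
		}
	}
	return nil, fmt.Errorf("resume token %q not found in history", token)
}

func main() {
	history := []event{{"t1", "user-1"}, {"t2", "user-2"}, {"t3", "user-3"}}

	// The listener processed t1 and then crashed; t2 and t3 arrived meanwhile.
	missed, err := eventsAfter(history, "t1")
	if err != nil {
		fmt.Println("cannot resume:", err)
		return
	}
	for _, e := range missed {
		fmt.Println("replayed", e.Payload)
	}
}
```

Run as is, the program prints the two payloads that arrived after t1, which mirrors what the change streams application does later in this article when it restarts with a saved token.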
Application Overview
This article demonstrates the following applications:
- Change Streams listener: It subscribes to the change events stream of a MongoDB Collection using the Watch API. As soon as a document is created or updated, this application is notified in real-time. It also leverages Resume Tokens to continue processing from a specific point in time after restarts or crashes.
- REST API: It exposes a single HTTP endpoint to create a document in MongoDB.
Prerequisites
- Use a local Linux workstation, or deploy a Vultr cloud server as a workstation.
- Make sure Docker is installed on your workstation. You will need it to build and run Docker images.
- Install curl, a popular command-line HTTP client, on the workstation.
- Install a recent version of the Go programming language (version 1.18 or higher) on your workstation.
Prepare MongoDB Change Streams application
Initialize the project
Create a directory and switch to it:
mkdir mongo-change-streams
cd mongo-change-streams
Create a new Go module:
go mod init mongo-change-streams
This will create a new go.mod file.
Create a new file main.go:
touch main.go
Import libraries
To import the required Go modules, add the following to the main.go file:
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
In addition to the Go standard library packages, you will import the following packages from the MongoDB Go driver:
- go.mongodb.org/mongo-driver/mongo - Provides core MongoDB functionality.
- go.mongodb.org/mongo-driver/bson - Package bson is a library for reading, writing, and manipulating BSON.
- go.mongodb.org/mongo-driver/mongo/options - Package options defines the optional configurations for the MongoDB Go Driver.
Add the init function
Add the code below to the main.go file:
var mongoConnectString string
var mongoDatabase string
var mongoCollection string

// msgFormat is the shell command printed on exit so the token can be reused.
const msgFormat = "export RESUME_TOKEN=%s"

// init loads the required settings and exits if any of them is missing.
func init() {
	mongoConnectString = os.Getenv("MONGODB_URI")
	if mongoConnectString == "" {
		log.Fatal("missing environment variable: MONGODB_URI")
	}
	mongoDatabase = os.Getenv("MONGODB_DATABASE")
	if mongoDatabase == "" {
		log.Fatal("missing environment variable: MONGODB_DATABASE")
	}
	mongoCollection = os.Getenv("MONGODB_COLLECTION")
	if mongoCollection == "" {
		log.Fatal("missing environment variable: MONGODB_COLLECTION")
	}
}
The init function retrieves the MongoDB connection string, collection name, and database name from the MONGODB_URI, MONGODB_COLLECTION, and MONGODB_DATABASE environment variables, respectively.
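The three checks follow the same pattern, so they could also be folded into one helper that returns an error instead of exiting. The sketch below is one possible variation, not the code used in this article: config and loadConfig are hypothetical names, and the lookup function is passed in so that the helper can be exercised with a map instead of the real environment.

```go
package main

import "fmt"

// config groups the three settings the application reads at startup.
type config struct {
	URI, Database, Collection string
}

// loadConfig reads the required variables through getenv and reports the
// first one that is missing. Passing os.Getenv reads the real environment.
func loadConfig(getenv func(string) string) (config, error) {
	values := make(map[string]string)
	for _, name := range []string{"MONGODB_URI", "MONGODB_DATABASE", "MONGODB_COLLECTION"} {
		v := getenv(name)
		if v == "" {
			return config{}, fmt.Errorf("missing environment variable: %s", name)
		}
		values[name] = v
	}
	return config{
		URI:        values["MONGODB_URI"],
		Database:   values["MONGODB_DATABASE"],
		Collection: values["MONGODB_COLLECTION"],
	}, nil
}

func main() {
	// A map stands in for the process environment in this demo; a real
	// program would call loadConfig(os.Getenv).
	env := map[string]string{
		"MONGODB_URI":        "mongodb://mongo1:27017",
		"MONGODB_DATABASE":   "test_db",
		"MONGODB_COLLECTION": "test_collection",
	}
	cfg, err := loadConfig(func(name string) string { return env[name] })
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("database=%s collection=%s\n", cfg.Database, cfg.Collection)
}
```

Returning an error keeps the decision to exit in one place and names the missing variable in the message.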
Add the main function
Add the main function to the main.go file:
func main() {
	client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
	if err != nil {
		log.Fatal("failed to create mongo client: ", err)
	}
	fmt.Println("created client object")

	ctx, cancel := context.WithCancel(context.Background())
	err = client.Connect(ctx)
	if err != nil {
		log.Fatal("failed to connect to mongo: ", err)
	}
	fmt.Println("connected to mongodb")

	coll := client.Database(mongoDatabase).Collection(mongoCollection)

	// Disconnect the client when main returns.
	defer func() {
		err = client.Disconnect(context.Background())
		if err != nil {
			fmt.Println("failed to close mongo connection")
		}
	}()

	// Pass on only insert, update, and replace events, trimmed to a few fields.
	match := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace"}}}}}}}
	project := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
	pipeline := mongo.Pipeline{match, project}

	// Ask for the full document on updates instead of only the changed fields.
	opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

	tokenFromEnv := os.Getenv("RESUME_TOKEN")
	if tokenFromEnv != "" {
		fmt.Println("resume token in environment variable", tokenFromEnv)
		t := bson.M{"_data": tokenFromEnv}
		opts.SetResumeAfter(t)
		fmt.Println("set resume token to watch client")
	}

	cs, err := coll.Watch(ctx, pipeline, opts)
	if err != nil {
		log.Fatal("failed to start change stream watch: ", err)
	}
	fmt.Println("watch established")

	// On exit, print the latest resume token and close the change stream.
	defer func() {
		fmt.Println("resume token", cs.ResumeToken().String())
		fmt.Println("use resume token in the next run with following command -", fmt.Sprintf(msgFormat, cs.ResumeToken().Lookup("_data").StringValue()))
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		err := cs.Close(closeCtx)
		if err != nil {
			fmt.Println("failed to close change stream")
		}
		fmt.Println("closed change stream")
	}()

	// Process events in the background until ctx is cancelled.
	go func() {
		fmt.Println("started change stream...")
		for cs.Next(ctx) {
			// Index(1) is the second field of the projected event,
			// which is expected to be fullDocument.
			re := cs.Current.Index(1)
			fmt.Println("change stream event" + re.Value().String())
		}
	}()

	// Block until SIGINT or SIGTERM, then cancel ctx to stop the listener.
	exit := make(chan os.Signal, 1)
	signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
	fmt.Println("waiting for program exit signal")
	<-exit
	fmt.Println("program exit initiated")
	cancel()
}
- Use mongo.NewClient to create a new *mongo.Client object.
- Initiate the connection to MongoDB using Connect.
- Invoke Database to get a mongo.Database and then invoke Collection to get a handle to the mongo.Collection.
- A defer function is used to disconnect and close the *mongo.Client object at the end of the program.
- A mongo.Pipeline is created using match and project stages.
- If a resume token is provided using the RESUME_TOKEN environment variable, it's used to configure the change streams watch client.
- Get a mongo.ChangeStream using Watch. As a part of the program exit process, another defer function is used to provide the resume token information and close the change stream.
- The change stream listener is started as a goroutine. It uses Next to listen to the change stream event, uses Current.Index to get the document, and logs the event to the console.
- Finally, set up a Go channel to get notified on program interrupts and exit cleanly.
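The shutdown sequence in the list above can be made a little stricter by waiting for the listener goroutine to return before the deferred cleanup runs. The sketch below shows one way to do that with the standard library only: signal.NotifyContext cancels a context on SIGINT or SIGTERM, and a sync.WaitGroup blocks until the worker is done. The consume function, the string events, and the 200 ms timeout are stand-ins invented for this example; in the real listener the loop would call Next on the change stream instead of reading from a channel.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// consume handles events until done is closed or the events channel is
// closed, and returns the number of events it handled.
func consume(done <-chan struct{}, events <-chan string) int {
	handled := 0
	for {
		select {
		case <-done:
			return handled
		case e, ok := <-events:
			if !ok {
				return handled
			}
			fmt.Println("event:", e)
			handled++
		}
	}
}

func main() {
	// ctx is cancelled on SIGINT or SIGTERM. The timeout exists only so this
	// demo ends by itself; a long-running listener would leave it out.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	// Hypothetical events standing in for change stream notifications.
	events := make(chan string, 3)
	for _, e := range []string{"insert", "update", "replace"} {
		events <- e
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		consume(ctx.Done(), events)
	}()

	<-ctx.Done() // wait for a signal (or the demo timeout)
	wg.Wait()    // let the worker return before cleanup starts
	fmt.Println("worker stopped, resources can be closed")
}
```

Waiting on the WaitGroup ensures that the worker is no longer touching shared resources when they are closed.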
Prepare MongoDB REST API application
Create a new file api.go:
touch api.go
Import libraries
To import the required Go modules, add the following to the api.go file:
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
In addition to the Go standard library packages, you will import the following packages from the MongoDB Go driver:
- go.mongodb.org/mongo-driver/mongo - Provides core MongoDB functionality.
- go.mongodb.org/mongo-driver/mongo/options - Package options defines the optional configurations for the MongoDB Go Driver.
Add the init function
Add the code below to the api.go file:
var coll *mongo.Collection
var mongoConnectString string
var mongoDatabase string
var mongoCollection string

// init loads the required settings, connects to MongoDB, and stores a
// handle to the target collection in coll.
func init() {
	mongoConnectString = os.Getenv("MONGODB_URI")
	if mongoConnectString == "" {
		log.Fatal("missing environment variable: MONGODB_URI")
	}
	mongoDatabase = os.Getenv("MONGODB_DATABASE")
	if mongoDatabase == "" {
		log.Fatal("missing environment variable: MONGODB_DATABASE")
	}
	mongoCollection = os.Getenv("MONGODB_COLLECTION")
	if mongoCollection == "" {
		log.Fatal("missing environment variable: MONGODB_COLLECTION")
	}

	client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
	if err != nil {
		log.Fatal("failed to create mongo client: ", err)
	}
	fmt.Println("created mongo client object")

	err = client.Connect(context.Background())
	if err != nil {
		log.Fatal("failed to connect to mongo: ", err)
	}
	fmt.Println("connected to mongo")

	coll = client.Database(mongoDatabase).Collection(mongoCollection)
}
The init function retrieves the MongoDB connection string, collection name, and database name from the MONGODB_URI, MONGODB_COLLECTION, and MONGODB_DATABASE environment variables, respectively. It then creates a client, connects to MongoDB, invokes the Database method to get a mongo.Database, and calls Collection to get a handle to the mongo.Collection.
Add the main function
Add the main function to the api.go file:
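func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Insert a document whose user name is derived from the current time.
		res, err := coll.InsertOne(context.Background(), map[string]string{"user": "user-" + strconv.Itoa(int(time.Now().Unix()))})
		if err != nil {
			// Report the failure to the caller instead of stopping the server.
			log.Println("mongo insert failed:", err)
			http.Error(w, "mongo insert failed", http.StatusInternalServerError)
			return
		}
		fmt.Println("created record", res.InsertedID)
	})

	fmt.Println("started http server...")
	log.Fatal(http.ListenAndServe(":8080", nil))
}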
The main function registers an HTTP handler to insert a record into the MongoDB collection on invocation. The HTTP server is started on port 8080.
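A handler like this is easier to test when the database call sits behind a small interface. The sketch below is an illustration rather than the article's code: userStore, fakeStore, createUser, and statusFor are hypothetical names, and the fake store replaces MongoDB so that the example runs with the standard library alone. It also shows the handler answering with HTTP 500 when an insert fails.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// userStore is a hypothetical abstraction over the collection so the handler
// can be exercised without a running database.
type userStore interface {
	InsertUser(name string) (id string, err error)
}

// fakeStore is an in-memory stand-in used only for this sketch.
type fakeStore struct{ fail bool }

func (f fakeStore) InsertUser(name string) (string, error) {
	if f.fail {
		return "", errors.New("insert failed")
	}
	return "id-for-" + name, nil
}

// createUser returns a handler that inserts a record and answers with
// HTTP 500 when the insert fails.
func createUser(store userStore) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		id, err := store.InsertUser("user-demo")
		if err != nil {
			http.Error(w, "insert failed", http.StatusInternalServerError)
			return
		}
		fmt.Fprintln(w, "created record", id)
	}
}

// statusFor sends one request to the handler and returns the status code.
func statusFor(store userStore) int {
	rec := httptest.NewRecorder()
	createUser(store)(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	fmt.Println("healthy store:", statusFor(fakeStore{}))
	fmt.Println("failing store:", statusFor(fakeStore{fail: true}))
}
```

In a real application, a type wrapping the mongo.Collection would implement the same interface, and the handler itself would stay unchanged.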
Prepare Docker images
Add Dockerfiles
Create a file named Dockerfile.change_stream_app:
touch Dockerfile.change_stream_app
Enter the following contents in Dockerfile.change_stream_app:
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
RUN go build -o /mongodb-app
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-app /mongodb-app
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-app"]
- This is a multi-stage Dockerfile that uses golang:1.18-buster as the base image of the first stage.
- The first stage copies the application files, runs go mod download, and builds the application binary.
- In the second stage, gcr.io/distroless/base-debian10 is used as the base image.
- The binary is copied from the first stage, and the ENTRYPOINT is configured to run the application.
Create a file named Dockerfile.rest_api:
touch Dockerfile.rest_api
Enter the following contents in Dockerfile.rest_api:
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY api.go ./
RUN go build -o /mongodb-api
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-api /mongodb-api
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-api"]
- This is a multi-stage Dockerfile that uses golang:1.18-buster as the base image of the first stage.
- The first stage copies the application files, runs go mod download, and builds the application binary.
- In the second stage, gcr.io/distroless/base-debian10 is used as the base image.
- The binary is copied from the first stage, and the ENTRYPOINT is configured to run the application.
Build Docker images
Pull Go modules:
go mod tidy
Build the Docker image for the change streams application:
docker build -t mongo-change-streams -f Dockerfile.change_stream_app .
Build the Docker image for the REST API application:
docker build -t mongo-rest-api -f Dockerfile.rest_api .
Start MongoDB cluster in Docker
Create a Docker network:
docker network create mongodb-cluster
Start the first node, mongo1:
docker run -d --rm -p 27017:27017 --name mongo1 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo1
Start the second node, mongo2:
docker run -d --rm -p 27018:27017 --name mongo2 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo2
Start the third node, mongo3:
docker run -d --rm -p 27019:27017 --name mongo3 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo3
Configure the replica set:
docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
{_id: 0, host: \"mongo1\"},
{_id: 1, host: \"mongo2\"},
{_id: 2, host: \"mongo3\"}
]
})"
You should see this output:
{ ok: 1 }
Start both applications
In a terminal, start the change streams application:
export MONGODB_URI="mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet"
docker run --network mongodb-cluster -e MONGODB_URI="$MONGODB_URI" -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN="$RESUME_TOKEN" mongo-change-streams
You will see an output similar to this:
created client object
connected to mongodb
watch established
started change stream...
waiting for program exit signal
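The connection string used above lists every replica set member followed by the replicaSet option. If you need to assemble it in Go, for example from a list of hosts held in configuration, a small helper along these lines could do it. buildURI is a hypothetical function written for this sketch, and it covers only the replicaSet option used in this article.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildURI is a hypothetical helper that joins host:port pairs into a
// replica set connection string.
func buildURI(hosts []string, replicaSet string) string {
	q := url.Values{}
	q.Set("replicaSet", replicaSet)
	return "mongodb://" + strings.Join(hosts, ",") + "/?" + q.Encode()
}

func main() {
	hosts := []string{"mongo1:27017", "mongo2:27017", "mongo3:27017"}
	fmt.Println(buildURI(hosts, "myReplicaSet"))
	// prints mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet
}
```

Using url.Values for the query part means that a replica set name containing special characters would be escaped correctly.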
In another terminal, start the REST API application:
export MONGODB_URI="mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet"
docker run --network mongodb-cluster -p 8080:8080 -e MONGODB_URI="$MONGODB_URI" -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection mongo-rest-api
You will see an output similar to:
created mongo client object
connected to mongo
started http server...
Test the application
Create a few records in MongoDB. For this, invoke the HTTP endpoint exposed by the REST API application (from another terminal):
curl -i localhost:8080
Repeat the above command two or three times. You should get an HTTP/1.1 200 OK response each time.
Switch to the terminal running the REST API application and check its logs. You should see the records that were created. Note that the object IDs might be different in your case:
created record ObjectID("639c092160078afff212209b")
created record ObjectID("639c097660078afff212209c")
created record ObjectID("639c097760078afff212209d")
Switch to the terminal running the change streams application and check its logs. You should see the same records as above (the ones created by the REST API application). The change streams listener detected them automatically. Note that the object IDs might be different in your case:
change stream event{"_id": {"$oid":"639c092160078afff212209b"},"user": "user-1671170337"}
change stream event{"_id": {"$oid":"639c097660078afff212209c"},"user": "user-1671170422"}
change stream event{"_id": {"$oid":"639c097760078afff212209d"},"user": "user-1671170423"}
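If you want to post-process these log lines, the JSON part of each line can be decoded with the standard library. The following sketch assumes the line format shown above. loggedDoc and parseLoggedDoc are hypothetical names chosen for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// loggedDoc mirrors the two fields visible in the logged documents.
type loggedDoc struct {
	ID struct {
		OID string `json:"$oid"`
	} `json:"_id"`
	User string `json:"user"`
}

// parseLoggedDoc decodes the JSON part of one "change stream event" line.
func parseLoggedDoc(raw string) (loggedDoc, error) {
	var d loggedDoc
	err := json.Unmarshal([]byte(raw), &d)
	return d, err
}

func main() {
	raw := `{"_id": {"$oid":"639c092160078afff212209b"},"user": "user-1671170337"}`
	d, err := parseLoggedDoc(raw)
	if err != nil {
		log.Fatalf("failed to decode event: %v", err)
	}
	fmt.Println(d.ID.OID, d.User)
}
```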
Use Change Streams Resume Token
First, shut down the change streams application by pressing Control+C in the terminal where it is running.
You should see logs similar to this. Note that the token might differ in your case.
resume token {"_data": "82639C09A4000000012B0229296E04"}
use resume token in the next run with following command - export RESUME_TOKEN=82639C09A4000000012B0229296E04
The log message highlights the command that should be used if you want to leverage the Resume token. Make a note of the command.
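Instead of copying the token from the log by hand, an application could also store it in a file on shutdown and read it back on startup. The sketch below shows that idea with the standard library only. The helper names saveToken, loadToken, and roundTrip, as well as the file name, are made up for this example and are not part of the applications in this article.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// saveToken writes the token to path, replacing any previous value.
func saveToken(path, token string) error {
	return os.WriteFile(path, []byte(token+"\n"), 0o600)
}

// loadToken returns the stored token, or "" when no token has been saved yet.
func loadToken(path string) (string, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return "", nil
	}
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(data)), nil
}

// roundTrip saves a token in a temporary directory and reads it back.
func roundTrip(token string) (string, error) {
	dir, err := os.MkdirTemp("", "resume-token")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "resume_token")
	if err := saveToken(path, token); err != nil {
		return "", err
	}
	return loadToken(path)
}

func main() {
	got, err := roundTrip("82639C09A4000000012B0229296E04")
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println("export RESUME_TOKEN=" + got)
}
```

A missing file is treated as "no token yet", so the first run of an application built this way would start the change stream without a resume point.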
Add a few records to MongoDB by invoking the HTTP endpoint exposed by the REST API application:
curl -i localhost:8080
Repeat this a few times.
Switch to the terminal running the REST API application and check its logs. You should see the records that were created. Note that the object IDs might be different in your case:
created record ObjectID("639c09ed60078afff212209e")
created record ObjectID("639c09ee60078afff212209f")
Restart the change streams application. This time, use the Resume token by passing it as an environment variable. You can use the command in the log output above:
export RESUME_TOKEN=82639C09A4000000012B0229296E04
export MONGODB_URI="mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet"
docker run --network mongodb-cluster -e MONGODB_URI="$MONGODB_URI" -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN="$RESUME_TOKEN" mongo-change-streams
You should see logs similar to this. Note that the token and object IDs might differ in your case.
resume token in environment variable 82639C09A4000000012B0229296E04
set resume token to watch client
watch established
started change stream...
change stream event{"_id": {"$oid":"639c09ed60078afff212209e"},"user": "user-1671170541"}
change stream event{"_id": {"$oid":"639c09ee60078afff212209f"},"user": "user-1671170542"}
Verify that you received the records that were added while the change streams application was shut down.
You can continue to add records to MongoDB by invoking the HTTP endpoint exposed by the REST API application:
curl -i localhost:8080
As expected, the change streams application detects and logs these in real time. You should see logs similar to this:
change stream event{"_id": {"$oid":"639c0a5c60078afff21220a0"},"user": "user-1671170652"}
change stream event{"_id": {"$oid":"639c0a5d60078afff21220a1"},"user": "user-1671170653"}
Clean up
Finally, to stop both applications, press Control+C in their respective terminals. Delete the MongoDB cluster instances and the Docker network as well:
docker rm -f mongo1 mongo2 mongo3
docker network rm mongodb-cluster
Conclusion
In this article, you got an overview of MongoDB and change streams. You set up a MongoDB replica set in Docker, deployed client applications, and explored change streams and resume tokens with an end-to-end example.
You can also learn more in the following documentation: