How to Use MongoDB Change Streams with the Go Driver

Updated on June 23, 2024
How to Use MongoDB Change Streams with the Go Driver header image

Introduction

MongoDB is a NoSQL database that stores data in documents comprised of field-value pairs. These are stored using BSON, a binary representation of JSON. A MongoDB document is similar to a row in a relational database table, and the document fields (key-value pairs) are similar to columns. A MongoDB document is part of a collection, and one or more collections are part of a database. MongoDB API supports regular CRUD operations (create, read, update, delete) along with aggregation, geospatial queries, and text search capabilities. MongoDB also provides high availability and replication. You can distribute data across multiple MongoDB servers (called Replica sets) to provide redundancy and sharding.

This article will walk you through how to set up a MongoDB replica set in Docker, deploy client applications and explore change streams with a complete example. It will cover the following:

  • An overview of MongoDB and change streams.
  • How to use the MongoDB Go driver.
  • Application setup and deployment
  • Testing the application and understanding how to use resume tokens.

MongoDB provides official support for many client drivers, including Java, Python, Node.js, PHP, Ruby, Swift, C, C++, C#, and Go. In this article, you will be using the Go driver for MongoDB to build a change streams processor and a REST API application.

MongoDB Change streams

With Change streams, you can access MongoDB data updates in real-time. Applications do not need to be concerned about dealing with oplog and low-level operational details of change streams. You can either subscribe to all data changes on a collection, a database, or an entire deployment. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications. It's possible to use change streams for replica sets (this is what you will be using in this article) and a sharded setup.

Change streams only return the difference of fields during an update operation (this is the default behavior). But you can configure it to return the latest committed version of the updated document. You can also provide an array of one or more pipeline stages to control change stream output. For example - $addFields, $match, $project, and so on.

Resume token

Resume tokens allow applications to save progress and protect against potential data loss. If a change streams application crashes, it will not be able to detect database changes during this period. A resume token can be used to continue the processing from where the application stopped (before the crash) and receive all the past events. Change streams can help build decoupled and scalable architectures and make it easier to implement Extract-Transform- Load (ETL), notification, synchronization services, and so on.

Application Overview

The following applications have been demonstrated in this article:

  1. Change Streams listener: It subscribes to the change events stream of a MongoDB Collection using the Watch API. As soon as a document is created or updated, this application is notified in real-time. It also leverages Resume Tokens to continue processing from a specific point in time after restarts or crashes.
  2. REST API: It exposes a single HTTP endpoint to create a document in MongoDB.

Prerequisites

  1. Use a local Linux workstation, or deploy a Vultr cloud server as a workstation.
  2. Make sure Docker is installed on your workstation. You will need it to build and run Docker images.
  3. Install curl, a popular command-line HTTP client on the workstation.
  4. Install a recent version of the Go programming language (version 1.18 or higher) on your workstation.

Prepare MongoDB Change Streams application

Initialize the project

Create a directory and switch to it:

mkdir mongo-change-streams
cd mongo-change-streams

Create a new Go module:

go mod init mongo-change-streams

This will create a new go.mod file

Create a new file main.go:

touch main.go

Import libraries

To import required Go modules, add the following to main.go file:

package main

import (
  "context"
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"
  "time"

  "go.mongodb.org/mongo-driver/bson"
  "go.mongodb.org/mongo-driver/mongo"
  "go.mongodb.org/mongo-driver/mongo/options"
)

In addition to the Go standard library packages, you will import the following packages from MongoDB Go driver:

  • go.mongodb.org/mongo-driver/mongo - Provides core MongoDB functionalities.
  • go.mongodb.org/mongo-driver/bson - Package bson is a library for reading, writing, and manipulating BSON.
  • go.mongodb.org/mongo-driver/mongo/options - Package options defines the optional configurations for the MongoDB Go Driver.

Add the init function

Add the code below to main.go file:

var mongoConnectString string
var mongoDatabase string
var mongoCollection string
const msgFormat = "export RESUME_TOKEN=%s"

func init() {
  mongoConnectString = os.Getenv("MONGODB_URI")
  if mongoConnectString == "" {
    log.Fatal("missing environment variable", "MONGODB_URI")
  }

  mongoDatabase = os.Getenv("MONGODB_DATABASE")
  if mongoDatabase == "" {
    log.Fatal("missing environment variable", "MONGODB_DATABASE")
  }

  mongoCollection = os.Getenv("MONGODB_COLLECTION")
  if mongoCollection == "" {
    log.Fatal("missing environment variable", "MONGODB_COLLECTION")
  }
}

The init function retrieves the MongoDB connection string, collection name, and database name from the MONGODB_URI, MONGODB_COLLECTION, and MONGODB_DATABASE environment variables, respectively.

Add the main function

Add the main function to main.go file:

func main() {
  client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
  if err != nil {
    log.Fatal("failed to create mongo client", err)
  }

  fmt.Println("created client object")

  ctx, cancel := context.WithCancel(context.Background())

  err = client.Connect(ctx)
  if err != nil {
    log.Fatal("failed to connect to mongo", err)
  }

  fmt.Println("connected to mongodb")

  coll := client.Database(mongoDatabase).Collection(mongoCollection)

  defer func() {
    err = client.Disconnect(context.Background())
    if err != nil {
      fmt.Println("failed to close mongo connection")
    }
  }()

  match := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace" }}}}}}}
  project := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
  pipeline := mongo.Pipeline{match, project}

  opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)

  tokenFromEnv := os.Getenv("RESUME_TOKEN")

  if tokenFromEnv != "" {
    fmt.Println("resume token in enviroment variable", tokenFromEnv)

    t := bson.M{"_data": tokenFromEnv}
    opts.SetResumeAfter(t)

    fmt.Println("set resume token to watch client")
  }

  cs, err := coll.Watch(ctx, pipeline, opts)
  if err != nil {
    log.Fatal("failed to start change stream watch: ", err)
  }

  fmt.Println("watch established")

  defer func() {
    fmt.Println("resume token ", cs.ResumeToken().String())
    fmt.Println("use resume token in the next run with following command -", fmt.Sprintf(msgFormat, cs.ResumeToken().Lookup("_data").StringValue()))

    close, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err := cs.Close(close)
    if err != nil {
      fmt.Println("failed to close change stream")
    }

    fmt.Println("closed change stream")
  }()

  go func() {
    fmt.Println("started change stream...")

    for cs.Next(ctx) {
      re := cs.Current.Index(1)
      fmt.Println("change stream event" + re.Value().String())
    }
  }()

  exit := make(chan os.Signal, 1)
  signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)

  fmt.Println("waiting for program exit signal")

  <-exit
  fmt.Println("program exit initiated")
  cancel()
}
  • Use mongo.NewClient to create a new *mongo.Client object.
  • Initiate connection to MongoDB using Connect.
  • Invoke Database to get a mongo.Database and then invoke Collection to get a handle to the mongo.Collection.
  • A defer function is used to disconnect and close the *mongo.Client object at the end of the program.
  • A mongo.Pipeline is created using match and project stages.
  • If a resume token is provided using the RESUME_TOKEN environment variable, it's used to configure the change streams watch client.
  • Get a mongo.ChangeStream using Watch. As a part of the program exit process, another defer function is used to provide the resume token information and close the change stream.
  • The change stream listener is started as a goroutine. It uses Next to listen to the change stream event, uses Current.Index to get the document, and logs the event to the console.
  • Finally, set up a Go channel to get notified on program interrupts and exit cleanly.

Prepare MongoDB REST API application

Create a new file api.go:

touch api.go

Import libraries

To import required Go modules, add the following to api.go file:

package main

import (
  "context"
  "fmt"
  "log"
  "net/http"
  "os"
  "strconv"
  "time"

  "go.mongodb.org/mongo-driver/mongo"
  "go.mongodb.org/mongo-driver/mongo/options"
)

In addition to the Go standard library packages, we import the following packages from MongoDB Go driver:

  • go.mongodb.org/mongo-driver/mongo - Provides core MongoDB functionalities.
  • go.mongodb.org/mongo-driver/mongo/options - Package options defines the optional configurations for the MongoDB Go Driver.

Add the init function

Add the code below to api.go file:

var coll *mongo.Collection
var mongoConnectString string
var mongoDatabase string
var mongoCollection string

func init() {
  mongoConnectString = os.Getenv("MONGODB_URI")
  if mongoConnectString == "" {
    log.Fatal("missing environment variable", "MONGODB_URI")
  }

  mongoDatabase = os.Getenv("MONGODB_DATABASE")
  if mongoDatabase == "" {
    log.Fatal("missing environment variable", "MONGODB_DATABASE")
  }

  mongoCollection = os.Getenv("MONGODB_COLLECTION")
  if mongoCollection == "" {
    log.Fatal("missing environment variable", "MONGODB_COLLECTION")
  }

  client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
  if err != nil {
    log.Fatal("failed to create mongo client", err)
  }

  fmt.Println("created mongo client object")

  err = client.Connect(context.Background())
  if err != nil {
    log.Fatal("failed to connect to mongo", err)
  }

  fmt.Println("connected to mongo")

  coll = client.Database(mongoDatabase).Collection(mongoCollection)
}

The init function retrieves the MongoDB connection string, collection name, and database name from the MONGODB_URI, MONGODB_COLLECTION, and MONGODB_DATABASE environment variables, respectively. It invokes the Database method to get a mongo.Database and then calls Collection to get a handle to the mongo.Collection.

Add the main function

Add the main function to api.go file:

func main() {
  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

    res, err := coll.InsertOne(context.Background(), map[string]string{"user": "user-" + strconv.Itoa(int(time.Now().Unix()))})
    if err != nil {
      log.Fatal("mongo insert failed", err)
    }

    fmt.Println("created record", res.InsertedID)
  })

  fmt.Println("started http server...")
  log.Fatal(http.ListenAndServe(":8080", nil))
}

The main function registers an HTTP handler to insert a record into the MongoDB collection on invocation. The HTTP server is started on port 8080.

Prepare Docker images

Add Dockerfiles

Create a file named Dockerfile.change_stream_app:

touch Dockerfile.change_stream_app

Enter the below contents in Dockerfile.change_stream_app:

FROM golang:1.18-buster AS build

WORKDIR /app
COPY go.mod ./
COPY go.sum ./

RUN go mod download

COPY main.go ./
RUN go build -o /mongodb-app

FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-app /mongodb-app
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-app"]
  • This is a multi-stage Dockerfile that uses golang:1.18-buster as the base image of the first stage.
  • Copy the application files, run go mod download, and build the application binary.
  • In the second stage, gcr.io/distroless/base-debian10 is used as the base image.
  • The binary is copied from the first stage, and the ENTRYPOINT is configured to run the application.

Create a file named Dockerfile.rest_api:

touch Dockerfile.rest_api

Enter the below contents in Dockerfile.rest_api:

FROM golang:1.18-buster AS build

WORKDIR /app
COPY go.mod ./
COPY go.sum ./

RUN go mod download

COPY api.go ./
RUN go build -o /mongodb-api

FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-api /mongodb-api
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-api"]
  • This is a multi-stage Dockerfile that uses golang:1.18-buster as the base image of the first stage.
  • Copy the application files, run go mod download, and build the application binary.
  • In the second stage, gcr.io/distroless/base-debian10 is used as the base image.
  • The binary is copied from the first stage, and the ENTRYPOINT is configured to run the application.

Build Docker images

Pull Go modules:

go mod tidy

Build Docker image for change streams application:

docker build -t mongo-change-streams -f Dockerfile.change_stream_app .

Build Docker image for REST API application:

docker build -t mongo-rest-api -f Dockerfile.rest_api .

Start MongoDB cluster in Docker

Create a Docker network:

docker network create mongodb-cluster

Start first node mongo1:

docker run -d --rm -p 27017:27017 --name mongo1 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo1

Start second node mongo2:

docker run -d --rm -p 27018:27017 --name mongo2 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo2

Start third node mongo3:

docker run -d --rm -p 27019:27017 --name mongo3 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo3

Configure replica set:

docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
  {_id: 0, host: \"mongo1\"},
  {_id: 1, host: \"mongo2\"},
  {_id: 2, host: \"mongo3\"}
]
})"

You should see this output:

{ ok: 1 }

Start both the applications

In a terminal, start the change streams application:

export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet

docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams

You will see an output similar to this:

created client object
connected to mongodb
watch established
started change stream...
waiting for program exit signal

In another terminal, start the REST API application:

export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet

docker run --network mongodb-cluster -p 8080:8080 -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection mongo-rest-api

You will see an output similar to:

created mongo client object
connected to mongo
started http server...

Test the application

Create a few records in MongoDB. For this, invoke the HTTP endpoint exposed by the REST API application (from another terminal):

curl -i localhost:8080

Repeat the above process two to three times. You should get an HTTP/1.1 200 OK response in all the cases.

Navigate to the terminal and check REST API application logs. You should see the records that were created. Note that the object IDs might be different in your case:

created record ObjectID("639c092160078afff212209b")
created record ObjectID("639c097660078afff212209c")
created record ObjectID("639c097760078afff212209d")

Navigate to the terminal and check the change streams application logs. You should see the same records as above (that were created by the REST API application). These were automatically detected by the change streams listener process. Note that the object IDs might be different in your case:

change stream event{"_id": {"$oid":"639c092160078afff212209b"},"user": "user-1671170337"}
change stream event{"_id": {"$oid":"639c097660078afff212209c"},"user": "user-1671170422"}
change stream event{"_id": {"$oid":"639c097760078afff212209d"},"user": "user-1671170423"}

Use Change Streams Resume Token

First, shut down the change stream application - press Control+C on the respective terminal where the application is running.

You should see logs similar to this. Note that the token might differ in your case.

resume token  {"_data": "82639C09A4000000012B0229296E04"}
use this token in the next run with following command - export RESUME_TOKEN=82639C09A4000000012B0229296E04

The log message highlights the command that should be used if you want to leverage the Resume token. Make a note of the command.

Add a few records to MongoDB by invoking the HTTP endpoint exposed by the REST API application:

curl -i localhost:8080

Repeat this a few times.

Navigate to the terminal and check REST API application logs. You should see the records that were created. Note that the object IDs might be different in your case:

created record ObjectID("639c09ed60078afff212209e")
created record ObjectID("639c09ee60078afff212209f")

Restart the change streams application. This time, use the Resume token by passing it as an environment variable. You can use the command in the log output above:

export RESUME_TOKEN=82639C09A4000000012B0229296E04

export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet

docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams

You should see logs similar to this. Note that the token and object IDs might differ in your case.

token passed in as enviroment variable 82639C09A4000000012B0229296E04
set token to watch client option
watch established
started change stream...
change stream event{"_id": {"$oid":"639c09ed60078afff212209e"},"user": "user-1671170541"}
change stream event{"_id": {"$oid":"639c09ee60078afff212209f"},"user": "user-1671170542"}

Verify that you received the same records which were added when the change streams application was shut down.

You can continue to add records to MongoDB by invoking the HTTP endpoint exposed by the REST API application:

curl -i localhost:8080

As expected, these will be detected and logged by the change streams application in real-time:

You should see logs similar to this:

change stream event{"_id": {"$oid":"639c0a5c60078afff21220a0"},"user": "user-1671170652"}
change stream event{"_id": {"$oid":"639c0a5d60078afff21220a1"},"user": "user-1671170653"}

Clean up

Finally, to stop both applications, press Control+C in their respective terminals. Delete the MongoDB cluster instances and the Docker network as well:

docker rm -f mongo1 mongo2 mongo3
docker network rm mongodb-cluster

Conclusion

In this article, you get an overview of MongoDB and change streams. You set up a MongoDB replica set in Docker, deployed client applications, and explored change streams and resume tokens with an end-to-end example.

You can also learn more in the following documentation: