How to Use InfluxDB with Python

Updated on July 12, 2023
How to Use InfluxDB with Python header image

Introduction

InfluxDB is a time-series database that powers most IoT projects like home automation apps, AI security cameras, health applications, server monitoring systems, and more. Although InfluxDB is a powerful database, you can enhance its functionalities using Python modules. Python offers a great library and its popularity is rapidly growing due to its user-friendly data structures and code versatility.

This guide explains how to use the InfluxDB library with Python on a Ubuntu server. The library is a flexible self-contained driver that allows you to establish communication to an InfluxDB database using Python.

Prerequisites

Before you begin:

Install the influxdb Library and Create a Sample Database

  1. Update the server.

     $ sudo apt update
  2. Install the pip Python package manager.

     $ sudo apt install -y python3-pip
  3. Using pip install the influxdb library.

     $ pip install influxdb

    If the above command returns an error, use:

     $ apt install -y python3-influxdb
  4. Log in to the InfluxDB database console. Replace EXAMPLE_PASSWORD with a strong password.

     $ influx -username 'admin' -password 'EXAMPLE_PASSWORD'

    Output:

     Connected to http://localhost:8086 version 1.6.7~rc0
     InfluxDB shell version: 1.6.7~rc0
  5. Create a sample fitness_app_db database.

     > CREATE DATABASE fitness_app_db
  6. Switch to the new fitness_app_db database.

     > USE fitness_app_db
  7. Insert sample data into the fitness_app_db database using the line protocol syntax.

     > INSERT steps_counter,user=john_doe daily_steps=1879i
        INSERT steps_counter,user=mary_doe daily_steps=2785i
        INSERT steps_counter,user=jane_smith daily_steps=4563i

    The above commands declare steps_counter as the name of the measurement. The user tag uniquely identifies each user in the database. Then the daily_steps field logs a user's total daily steps in the database. The trailing i at the end of the daily_steps field value instructs InfluxDB to insert values as integers.

  8. Query the steps_counter measurements to verify the data.

     > SELECT "user", "daily_steps" FROM "steps_counter"

    Output:

       name: steps_counter
       time                user       daily_steps
       ----                ----       -----------
       1687426847395972699 john_doe   1879
       1687426847414045751 mary_doe   2785
       1687426847498567130 jane_smith 4563
  9. Exit the InfluxDB database console.

     > quit

Create an InfluxDB Database Gateway Class

In this section, create a separate project directory to store your Python source code. Then, within the directory, create a custom InfluxDB database gateway module to communicate with the database you created earlier as described below.

  1. Create a new project directory for your application.

     $ mkdir project
  2. Switch to the new project directory.

     $ cd project
  3. Using a text editor such as Nano, open the influx_db_gateway.py file.

     $ nano influx_db_gateway.py
  4. Add the following configurations to the file.

     from influxdb import InfluxDBClient
    
     class InfluxDbGateway:
         def db_conn(self):
             client = InfluxDBClient(
                 'localhost',
                 8086,
                 'admin',
                 'EXAMPLE_PASSWORD',
                 'fitness_app_db'
             )
             return client
    
         def query_data(self):
             inf_db_client = self.db_conn()
             result = inf_db_client.query('SELECT "user", "daily_steps" FROM "steps_counter"')
             return list(result)
    
         def insert_data(self, data_points):
             user = data_points['user']
             daily_steps = data_points['daily_steps']
             inf_db_client = self.db_conn()
             inf_db_client.write_points([
                 {
                     "measurement": "steps_counter",
                     "tags": {"user": user},
                     "fields": {"daily_steps": daily_steps}
                 }
             ])
             return ['success']

    Replace EXAMPLE_PASSWORD with the actual InfluxDB password you created earlier.

    Below is what the configuration file does:

    • from influxdb import InfluxDBClient declaration imports the InfluxDB library to the file.
    • class InfluxDbGateway: initializes a new class with three methods as below:
      • def db_conn(self): connects to the sample fitness_app_db database.
      • def query_data(self): queries the steps_counter measurement to retrieve points using the line protocol syntax SELECT "user", "daily_steps" FROM "steps_counter" and returns the data as a list (return list(result)).
      • def insert_data(self, data_points): accepts InfluxDB data points in JSON format and runs the inf_db_client.write_points() function to write the points to the database. After writing the points, the method returns a success message return ['success'].

    Save and close the file.

Create the Application Startup File

In this section, you will combine the sample databae, and gateway module to create a startup file for the application as described below.

  1. Create a new index.py file.

     $ nano index.py
  2. Add the following configurations to the file.

     import http.server
     from http import HTTPStatus
     import socketserver
     import json
     import influx_db_gateway
    
     class HttpHandler(http.server.SimpleHTTPRequestHandler):
         def do_GET(self):
             self.send_response(HTTPStatus.OK)
             self.send_header('Content-type', 'application/json')
             self.end_headers()
    
             db_gateway = influx_db_gateway.InfluxDbGateway()
             result = db_gateway.query_data()
    
             self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))
    
         def do_POST(self):
             content_length = int(self.headers['Content-Length'])
             post_data = self.rfile.read(content_length)
    
             data_points = json.loads(post_data)
    
             self.send_response(HTTPStatus.OK)
             self.send_header('Content-type', 'application/json')
             self.end_headers()
    
             db_gateway = influx_db_gateway.InfluxDbGateway()
             result = db_gateway.insert_data(data_points)
    
             self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8"))
    
     httpd = socketserver.TCPServer(('', 8082), HttpHandler)
     print("Web server is now running on port 8082...")
    
     try:
         httpd.serve_forever()
     except KeyboardInterrupt:
         httpd.server_close()
         print("The HTTP server has stopped running.")

    Below is what the configuration file does:

    • import...: imports the Python HTTP modules http.server, HTTPStatus, socketserver, JSON module json, and the custom influx_db_gateway module you created earlier.

    • HttpHandler(http.server.SimpleHTTPRequestHandler): handles the following HTTP methods:

      • GET: retrieves points from the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.query_data() statements.
      • POST: inserts new data points in the database using the db_gateway = influx_db_gateway.InfluxDbGateway() and result = db_gateway.insert_data(data_points) statements.
    • The following code returns the correct status codes and headers to HTTP clients.

        ...
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
    • The code block below starts a web server that accepts incoming HTTP requests on port 8082.

        ...
        httpd = socketserver.TCPServer(('', 8082), HttpHandler)
        print("Web server is now running on port 8082...")
      
        try:
            httpd.serve_forever()
        except KeyboardInterrupt: 
            httpd.server_close()
            print("The HTTP server has stopped running.")

    Save and close the file.

Test the Application

  1. Run the application in the background.

     $ python3 index.py

    Output:

     Web server is now running on port 8082...     
  2. Using curl, retrieve all points from the InfluxDB database.

     $ curl -X GET http://localhost:8082/

    Output:

     [
        [
          {
            "time": "2023-06-23T09:19:20.963877Z",
            "user": "john_doe",
            "daily_steps": 1879
          },
          {
            "time": "2023-06-23T09:19:20.978350Z",
            "user": "mary_doe",
            "daily_steps": 2785
          },
          {
            "time": "2023-06-23T09:19:21.020994Z",
            "user": "jane_smith",
            "daily_steps": 4563
          }
        ]
     ]
  3. Run the following commands to write new data points to the database.

     $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "james_gates", "daily_steps": 2948}'
    
     $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "eva_dukes", "daily_steps": 2759}'

    Output:

     ...
     [
       "success"
     ]
  4. Fetch the application again to verify if the new data is added.

     $ curl -X GET http://localhost:8082/

    Output:

     [
        [
          {
            "time": "2023-06-23T09:19:20.963877Z",
            "user": "john_doe",
            "daily_steps": 1879
          },
          {
            "time": "2023-06-23T09:19:20.978350Z",
            "user": "mary_doe",
            "daily_steps": 2785
          },
          {
            "time": "2023-06-23T09:19:21.020994Z",
            "user": "jane_smith",
            "daily_steps": 4563
          },
          {
            "time": "2023-06-23T09:24:13.425451Z",
            "user": "james_gates",
            "daily_steps": 2948
          },
          {
            "time": "2023-06-23T09:24:24.832866Z",
            "user": "eva_dukes",
            "daily_steps": 2759
          }
        ]
     ]
  5. To stop the application. View its Job ID.

     $ jobs

    Output:

     [1]+  Running                 python3 index.py &
  6. Kill the job ID to stop the application.

     $ kill %1

Conclusion

In this guide, you have used the InfluxDB database with Python on a Ubuntu 20.04 server. You created a sample InfluxDB database, queried, and submitted data via Python source code using the curl utility.

For more information about InfluxDB, visit the following resources.