How to Use InfluxDB with Python
Introduction
InfluxDB is a time-series database that powers most IoT projects like home automation apps, AI security cameras, health applications, server monitoring systems, and more. Although InfluxDB is a powerful database, you can enhance its functionalities using Python modules. Python offers a great library and its popularity is rapidly growing due to its user-friendly data structures and code versatility.
This guide explains how to use the InfluxDB library with Python on a Ubuntu server. The library is a flexible self-contained driver that allows you to establish communication to an InfluxDB database using Python.
Prerequisites
Before you begin:
- Deploy a Ubuntu 20.04 server.
- Using SSH, access the server.
- Create a non-root sudo user and switch to the user account.
- Install InfluxDB and create an administrator account.
Install the influxdb
Library and Create a Sample Database
Update the server.
$ sudo apt update
Install the
pip
Python package manager.$ sudo apt install -y python3-pip
Using
pip
install theinfluxdb
library.$ pip install influxdb
If the above command returns an error, use:
$ apt install -y python3-influxdb
Log in to the InfluxDB database console. Replace
EXAMPLE_PASSWORD
with a strong password.$ influx -username 'admin' -password 'EXAMPLE_PASSWORD'
Output:
Connected to http://localhost:8086 version 1.6.7~rc0 InfluxDB shell version: 1.6.7~rc0
Create a sample
fitness_app_db
database.> CREATE DATABASE fitness_app_db
Switch to the new
fitness_app_db
database.> USE fitness_app_db
Insert sample data into the
fitness_app_db
database using the line protocol syntax.> INSERT steps_counter,user=john_doe daily_steps=1879i INSERT steps_counter,user=mary_doe daily_steps=2785i INSERT steps_counter,user=jane_smith daily_steps=4563i
The above commands declare
steps_counter
as the name of themeasurement
. Theuser
tag uniquely identifies each user in the database. Then thedaily_steps
field logs a user's total daily steps in the database. The trailingi
at the end of thedaily_steps
field value instructs InfluxDB to insert values as integers.Query the
steps_counter
measurements to verify the data.> SELECT "user", "daily_steps" FROM "steps_counter"
Output:
name: steps_counter time user daily_steps ---- ---- ----------- 1687426847395972699 john_doe 1879 1687426847414045751 mary_doe 2785 1687426847498567130 jane_smith 4563
Exit the InfluxDB database console.
> quit
Create an InfluxDB Database Gateway Class
In this section, create a separate project
directory to store your Python source code. Then, within the directory, create a custom InfluxDB database gateway module to communicate with the database you created earlier as described below.
Create a new
project
directory for your application.$ mkdir project
Switch to the new
project
directory.$ cd project
Using a text editor such as
Nano
, open theinflux_db_gateway.py
file.$ nano influx_db_gateway.py
Add the following configurations to the file.
from influxdb import InfluxDBClient class InfluxDbGateway: def db_conn(self): client = InfluxDBClient( 'localhost', 8086, 'admin', 'EXAMPLE_PASSWORD', 'fitness_app_db' ) return client def query_data(self): inf_db_client = self.db_conn() result = inf_db_client.query('SELECT "user", "daily_steps" FROM "steps_counter"') return list(result) def insert_data(self, data_points): user = data_points['user'] daily_steps = data_points['daily_steps'] inf_db_client = self.db_conn() inf_db_client.write_points([ { "measurement": "steps_counter", "tags": {"user": user}, "fields": {"daily_steps": daily_steps} } ]) return ['success']
Replace
EXAMPLE_PASSWORD
with the actual InfluxDB password you created earlier.Below is what the configuration file does:
from influxdb import InfluxDBClient
declaration imports the InfluxDB library to the file.class InfluxDbGateway:
initializes a new class with three methods as below:def db_conn(self):
connects to the samplefitness_app_db
database.def query_data(self):
queries thesteps_counter
measurement to retrieve points using the line protocol syntaxSELECT "user", "daily_steps" FROM "steps_counter"
and returns the data as a list (return list(result)
).def insert_data(self, data_points):
accepts InfluxDB data points in JSON format and runs theinf_db_client.write_points()
function to write the points to the database. After writing the points, the method returns a success messagereturn ['success']
.
Save and close the file.
Create the Application Startup File
In this section, you will combine the sample databae, and gateway module to create a startup file for the application as described below.
Create a new
index.py
file.$ nano index.py
Add the following configurations to the file.
import http.server from http import HTTPStatus import socketserver import json import influx_db_gateway class HttpHandler(http.server.SimpleHTTPRequestHandler): def do_GET(self): self.send_response(HTTPStatus.OK) self.send_header('Content-type', 'application/json') self.end_headers() db_gateway = influx_db_gateway.InfluxDbGateway() result = db_gateway.query_data() self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8")) def do_POST(self): content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) data_points = json.loads(post_data) self.send_response(HTTPStatus.OK) self.send_header('Content-type', 'application/json') self.end_headers() db_gateway = influx_db_gateway.InfluxDbGateway() result = db_gateway.insert_data(data_points) self.wfile.write(bytes(json.dumps(result, indent=2) + "\n", "utf8")) httpd = socketserver.TCPServer(('', 8082), HttpHandler) print("Web server is now running on port 8082...") try: httpd.serve_forever() except KeyboardInterrupt: httpd.server_close() print("The HTTP server has stopped running.")
Below is what the configuration file does:
import...
: imports the Python HTTP moduleshttp.server
,HTTPStatus
,socketserver
, JSON modulejson
, and the custominflux_db_gateway
module you created earlier.HttpHandler(http.server.SimpleHTTPRequestHandler):
handles the following HTTP methods:GET
: retrieves points from the database using thedb_gateway = influx_db_gateway.InfluxDbGateway()
andresult = db_gateway.query_data()
statements.POST
: inserts new data points in the database using thedb_gateway = influx_db_gateway.InfluxDbGateway()
andresult = db_gateway.insert_data(data_points)
statements.
The following code returns the correct status codes and headers to HTTP clients.
... self.send_response(HTTPStatus.OK) self.send_header('Content-type', 'application/json') self.end_headers()
The code block below starts a web server that accepts incoming HTTP requests on port
8082
.... httpd = socketserver.TCPServer(('', 8082), HttpHandler) print("Web server is now running on port 8082...") try: httpd.serve_forever() except KeyboardInterrupt: httpd.server_close() print("The HTTP server has stopped running.")
Save and close the file.
Test the Application
Run the application in the background.
$ python3 index.py
Output:
Web server is now running on port 8082...
Using
curl
, retrieve all points from the InfluxDB database.$ curl -X GET http://localhost:8082/
Output:
[ [ { "time": "2023-06-23T09:19:20.963877Z", "user": "john_doe", "daily_steps": 1879 }, { "time": "2023-06-23T09:19:20.978350Z", "user": "mary_doe", "daily_steps": 2785 }, { "time": "2023-06-23T09:19:21.020994Z", "user": "jane_smith", "daily_steps": 4563 } ] ]
Run the following commands to write new data points to the database.
$ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "james_gates", "daily_steps": 2948}' $ curl -X POST http://localhost:8082/ -H 'Content-Type: application/json' -d '{"user": "eva_dukes", "daily_steps": 2759}'
Output:
... [ "success" ]
Fetch the application again to verify if the new data is added.
$ curl -X GET http://localhost:8082/
Output:
[ [ { "time": "2023-06-23T09:19:20.963877Z", "user": "john_doe", "daily_steps": 1879 }, { "time": "2023-06-23T09:19:20.978350Z", "user": "mary_doe", "daily_steps": 2785 }, { "time": "2023-06-23T09:19:21.020994Z", "user": "jane_smith", "daily_steps": 4563 }, { "time": "2023-06-23T09:24:13.425451Z", "user": "james_gates", "daily_steps": 2948 }, { "time": "2023-06-23T09:24:24.832866Z", "user": "eva_dukes", "daily_steps": 2759 } ] ]
To stop the application. View its Job ID.
$ jobs
Output:
[1]+ Running python3 index.py &
Kill the job ID to stop the application.
$ kill %1
Conclusion
In this guide, you have used the InfluxDB database with Python on a Ubuntu 20.04 server. You created a sample InfluxDB database, queried, and submitted data via Python source code using the curl
utility.
For more information about InfluxDB, visit the following resources.