How to Implement the Apache Cassandra Driver for Python
Introduction
Apache Cassandra is a fault-tolerant, highly available NoSQL database. It uses a distributed system model to handle large amounts of data across multiple nodes and implements a peer-to-peer architecture in which any node can serve client requests. This architecture differs from the traditional primary-secondary architecture, which is prone to a single point of failure.
One of the key advantages of Cassandra is scalability. Depending on your application needs, you can expand the cluster by adding nodes or shrink it by removing nodes. These features make Cassandra a strong database choice for Python applications that store large volumes of data.
This guide explains how to use the cassandra-driver module to connect a Python application to an Apache Cassandra database and serve its data through a simple HTTP API.
Prerequisites
Before you start:
- Deploy an Ubuntu 22.04 server on Vultr.
- Using SSH, access the server and create a non-root user with sudo privileges.
- Install the Apache Cassandra database server.
- Update the server.
Set Up a Sample Cassandra Database
In this section, set up the Cassandra database that the sample application uses. The database contains a single products table that stores product information.
Log in to the Apache Cassandra database server.
$ cqlsh
Create a sample online_shop keyspace.
cqlsh> CREATE KEYSPACE online_shop WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor' : 1 };
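In Cassandra, a keyspace is a data container similar to a database in a traditional relational database management system. The above command creates a new keyspace named online_shop. SimpleStrategy with a replication_factor of 1 stores a single copy of each row, which suits a single-node test server.
Switch to the new online_shop keyspace.
cqlsh> USE online_shop;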
Create a sample products table with three columns.
cqlsh:online_shop> CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    product_name TEXT,
    retail_price DOUBLE
);
In the above table, the product_id column is a PRIMARY KEY that uniquely identifies each product. Because it is the only primary key column, it is also the partition key that determines where Cassandra stores the row. The product_name column stores the product names, and the retail_price column stores the final price that customers pay for the products.
Populate the products table with sample data.
cqlsh:online_shop> INSERT INTO products (product_id, product_name, retail_price) VALUES (1, '4G ROUTER', 58.55);
INSERT INTO products (product_id, product_name, retail_price) VALUES (2, 'WIRELESS MOUSE', 25.80);
INSERT INTO products (product_id, product_name, retail_price) VALUES (3, 'SMART WATCH', 189.50);
View the products table data.
cqlsh:online_shop> SELECT product_id, product_name, retail_price FROM products;
Output:
 product_id | product_name   | retail_price
------------+----------------+--------------
          2 | WIRELESS MOUSE |         25.8
          3 |    SMART WATCH |        189.5
          1 |      4G ROUTER |        58.55
(3 rows)
The rows appear in partition token order rather than in the order you inserted them.
Exit the Cassandra shell.
cqlsh:online_shop> EXIT;
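If you would rather script this setup than type it into cqlsh, the same CQL can be sent through the Python driver that you install in the next section. The following is a minimal sketch, not part of the sample application: create_schema() and seed_products() are hypothetical helper names, IF NOT EXISTS is added so the script can run more than once, and the helpers accept any object with execute() and prepare() methods, such as a session returned by Cluster().connect().

```python
# Hypothetical helpers that replay the cqlsh setup through a driver session.
# Any object with execute(query, parameters=None) and prepare(query) works,
# for example a cassandra-driver Session returned by Cluster().connect().

SCHEMA_STATEMENTS = [
    "CREATE KEYSPACE IF NOT EXISTS online_shop WITH REPLICATION = "
    "{ 'class': 'SimpleStrategy', 'replication_factor' : 1 }",
    "CREATE TABLE IF NOT EXISTS online_shop.products ("
    " product_id BIGINT PRIMARY KEY, product_name TEXT, retail_price DOUBLE )",
]

SAMPLE_PRODUCTS = [
    (1, "4G ROUTER", 58.55),
    (2, "WIRELESS MOUSE", 25.80),
    (3, "SMART WATCH", 189.50),
]


def create_schema(session):
    """Create the keyspace and table; safe to run repeatedly."""
    for statement in SCHEMA_STATEMENTS:
        session.execute(statement)


def seed_products(session, products=SAMPLE_PRODUCTS):
    """Insert the sample rows through one prepared (parameterized) statement."""
    insert = session.prepare(
        "INSERT INTO online_shop.products (product_id, product_name, retail_price) "
        "VALUES (?, ?, ?)"
    )
    for row in products:
        session.execute(insert, row)  # the driver binds the tuple to the ? markers
    return len(products)
```

For example, after installing the driver you could run session = Cluster().connect(), then create_schema(session) and seed_products(session).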
Install Project Dependencies
To keep your application organized, store the Python source code files in a separate project directory. For the Python application to connect to the Apache Cassandra database you created earlier, install the cassandra-driver module as described in the following steps.
Create a new project directory.
$ mkdir project
Switch to the directory.
$ cd project
Install the pip Python package manager.
$ sudo apt install -y python3-pip
Using pip, install the Cassandra database driver.
$ pip install cassandra-driver
The above command installs cassandra-driver, a feature-rich and highly customizable Python driver for the Apache Cassandra database.
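If a system has more than one Python installation, pip may install into a different interpreter than the one that later runs python3 index.py. The following standard-library sketch reports which driver version the current interpreter sees; installed_version() is an illustrative helper, and it relies on the cassandra-driver distribution providing an importable package named cassandra.

```python
import importlib.util
from importlib import metadata


def installed_version(module_name, distribution_name):
    """Return the distribution's version, None if the module is missing,
    or "unknown" if the module imports but has no matching pip metadata."""
    if importlib.util.find_spec(module_name) is None:
        return None
    try:
        return metadata.version(distribution_name)
    except metadata.PackageNotFoundError:
        return "unknown"


# cassandra-driver installs an importable package named "cassandra".
version = installed_version("cassandra", "cassandra-driver")
print(f"cassandra-driver {version}" if version else "cassandra-driver is not installed")
```

Save it as any file name in the project directory and run it with python3; a None result means the application will fail at its first import.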
Create a Database Connection Module
To promote code reusability in your Python application, develop a central module that connects to the Apache Cassandra database to execute queries. The module works as a gateway to the Apache Cassandra database server. Later, you can import and reuse the module in other Python source code files. To create the module, follow the steps described below.
Using a text editor such as Nano, create a new cassandra_gateway.py file.
$ nano cassandra_gateway.py
Add the following contents to the file.
from cassandra.cluster import Cluster
from cassandra.query import dict_factory


class CassandraGateway:

    def db_session(self):
        # Connect to the local Cassandra node (127.0.0.1:9042 by default)
        # and use the online_shop keyspace.
        clstr = Cluster()
        session = clstr.connect('online_shop')
        return session

    def execute(self, json_data):
        db_session = self.db_session()
        # Parameterized INSERT: the driver binds values to the ? placeholders.
        query_string = "insert into products (product_id, product_name, retail_price) values (?, ?, ?);"
        stmt = db_session.prepare(query_string)
        product_id = int(json_data["product_id"])
        product_name = json_data["product_name"]
        retail_price = float(json_data["retail_price"])
        prepared_query = stmt.bind([product_id, product_name, retail_price])
        db_session.execute(prepared_query)
        # Return the stored row so the client can confirm the insert.
        return self.query(product_id)

    def query(self, product_id = 0):
        db_session = self.db_session()
        # Return each row as a dictionary keyed by column name.
        db_session.row_factory = dict_factory
        if product_id == 0:
            # No ID supplied: list every product.
            query_string = "select product_id, product_name, retail_price from products;"
            stmt = db_session.prepare(query_string)
            prepared_query = stmt.bind([])
        else:
            # Fetch a single product by its primary key.
            query_string = "select product_id, product_name, retail_price from products where product_id = ?;"
            stmt = db_session.prepare(query_string)
            prepared_query = stmt.bind([int(product_id)])
        rows = db_session.execute(prepared_query)
        return list(rows)
Save and close the file.
Below is what each part of the file does:
The declaration below imports the Cluster class, which the Cassandra driver for Python uses to connect to a Cassandra cluster.
from cassandra.cluster import Cluster
...
The dict_factory row factory makes the driver return each row of the products table as a dictionary keyed by column name instead of the default named tuple. Dictionaries serialize directly into readable JSON responses.
...
from cassandra.query import dict_factory
...
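A row factory is a function that the driver calls with the column names and the list of row tuples it receives from Cassandra. The stand-in below is written to behave like dict_factory so you can see the data shape that reaches json.dumps() in the application; dict_rows() is an illustrative name, and the real function lives in cassandra.query.

```python
import json


def dict_rows(colnames, rows):
    """Illustrative stand-in for dict_factory: one dict per row, keyed by column."""
    return [dict(zip(colnames, row)) for row in rows]


columns = ["product_id", "product_name", "retail_price"]
raw_rows = [(2, "WIRELESS MOUSE", 25.8), (3, "SMART WATCH", 189.5)]

records = dict_rows(columns, raw_rows)
print(json.dumps(records[0]))
# {"product_id": 2, "product_name": "WIRELESS MOUSE", "retail_price": 25.8}
```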
The CassandraGateway class hosts three methods:
- db_session(self): Runs clstr = Cluster() and session = clstr.connect('online_shop') to connect to the online_shop keyspace you created earlier. With no arguments, Cluster() connects to a node on 127.0.0.1 at port 9042.
- execute(self, json_data): Reuses the db_session() method and runs db_session.prepare(query_string), stmt.bind([product_id, product_name, retail_price]) and db_session.execute(prepared_query) to insert data into the products table using a parameterized query (insert into products (product_id, product_name, retail_price) values (?, ?, ?);). It then returns the new row by calling self.query(product_id).
- query(self, product_id = 0): Runs a SELECT statement against the products table and returns the output as a list using the return list(rows) declaration. The if product_id == 0: ... else ... block returns either the full list of products or a single product, depending on whether the HTTP client specifies a product_id value in the GET request URL.
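As written, every call to db_session() creates a new Cluster object and prepares the statements again, and none of those clusters is ever shut down, so a long-running server can accumulate open connections. The driver is designed for an application to create one Cluster and Session and reuse them. The sketch below shows one way to do that under these assumptions: SharedSession is a hypothetical helper, and connect is any zero-argument callable that returns a driver session, for example lambda: Cluster().connect('online_shop').

```python
import threading


class SharedSession:
    """Hypothetical helper: one lazily created session plus a prepared-statement cache."""

    def __init__(self, connect):
        self._connect = connect  # zero-argument callable that returns a session
        self._session = None
        self._prepared = {}  # query string -> prepared statement
        self._lock = threading.Lock()

    def session(self):
        # Connect on first use only; later calls return the same session.
        with self._lock:
            if self._session is None:
                self._session = self._connect()
            return self._session

    def prepare(self, query):
        # Prepare each distinct query string once and reuse the result.
        session = self.session()
        with self._lock:
            if query not in self._prepared:
                self._prepared[query] = session.prepare(query)
            return self._prepared[query]

    def execute(self, query, parameters=None):
        # Run a query through its cached prepared statement.
        return self.session().execute(self.prepare(query), parameters)
```

A gateway could keep one module-level instance, such as shared = SharedSession(lambda: Cluster().connect('online_shop')), and call shared.execute(query_string, [product_id]). Each prepare() call is a round trip to the cluster, so caching avoids repeating it on every request. When the server stops, shared.session().cluster.shutdown() closes the connections.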
Create the Application's Entry Point
Every Python application needs an entry point: a main file that runs when the application starts. In this section, create a new index.py file that reuses the custom cassandra_gateway module you created in the previous section, as described in the steps below.
Create a new index.py file.
$ nano index.py
Add the following contents to the file.
import http.server
from http import HTTPStatus
import socketserver
import json

import cassandra_gateway


class HttpHandler(http.server.SimpleHTTPRequestHandler):

    def do_POST(self):
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        # Read exactly Content-Length bytes of the JSON request body.
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        json_data = json.loads(post_data)
        db_gateway = cassandra_gateway.CassandraGateway()
        db_resp = db_gateway.execute(json_data)
        resp = {"data": db_resp}
        self.wfile.write(bytes(json.dumps(resp, indent=2) + "\r\n", "utf8"))

    def do_GET(self):
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        # /products lists all products; /products/<id> returns one product.
        product_id = 0
        path_parts = self.path.split("/")
        if len(path_parts) >= 3 and path_parts[2]:
            product_id = path_parts[2]
        db_gateway = cassandra_gateway.CassandraGateway()
        db_resp = db_gateway.query(product_id)
        resp = {"data": db_resp}
        self.wfile.write(bytes(json.dumps(resp, indent=2) + "\r\n", "utf8"))


httpd = socketserver.TCPServer(('', 8080), HttpHandler)
print("HTTP server started at port 8080...")

try:
    httpd.serve_forever()
except KeyboardInterrupt:
    httpd.server_close()
    print("You've stopped the HTTP server.")
Save and close the file.
Below is what each part of the file does:
The code below imports the HTTP server modules from the Python standard library.
import http.server
from http import HTTPStatus
import socketserver
...
The following lines import the json module, which encodes and decodes JSON data, and the custom cassandra_gateway module that runs the database queries.
...
import json
import cassandra_gateway
...
The HttpHandler(http.server.SimpleHTTPRequestHandler) class responds to HTTP POST and GET requests, inserting and retrieving data through the custom cassandra_gateway.py module.
The following lines run an HTTP server that listens for incoming connections on port 8080 on all network interfaces.
...
httpd = socketserver.TCPServer(('', 8080), HttpHandler)
print("HTTP server started at port 8080...")
try:
    httpd.serve_forever()
...
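The handler trusts its input: a path such as /products/abc or a POST body without a retail_price field raises an exception after the 200 OK status has already been sent. One option is to validate the request first. The helpers below are a hypothetical, standard-library-only sketch of that check; they raise ValueError for bad input so the handler can respond with self.send_error(HTTPStatus.BAD_REQUEST, str(exc)) before it calls send_response().

```python
import json

REQUIRED_FIELDS = ("product_id", "product_name", "retail_price")


def parse_product_id(path):
    """Return the ID from /products/<id>, or 0 when the path has no ID.

    Raises ValueError if the ID segment is not an integer.
    """
    # Drop any query string, then ignore empty segments from leading/trailing slashes.
    segments = [s for s in path.split("?", 1)[0].split("/") if s]
    if len(segments) >= 2:
        return int(segments[1])
    return 0


def parse_product(body):
    """Decode a POST body and coerce the fields that execute() reads.

    Raises ValueError for malformed JSON, missing fields, or bad values.
    """
    data = json.loads(body)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    try:
        return {
            "product_id": int(data["product_id"]),
            "product_name": str(data["product_name"]),
            "retail_price": float(data["retail_price"]),
        }
    except (TypeError, ValueError) as exc:
        raise ValueError(f"invalid field value: {exc}") from exc
```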
Test the Application
After creating all the Python source files, run the application and test its logic as described in the following steps.
Run the application.
$ python3 index.py
The output should be similar to the following:
HTTP server started at port 8080...
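In a new terminal session, establish another SSH connection to the server as your non-root sudo user. Replace USER with that username and SERVER-IP with the server's IP address.
$ ssh USER@SERVER-IP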
Run the following HTTP GET request to list the products in the database.
$ curl -X GET http://localhost:8080/products
Output:
{
  "data": [
    {
      "product_id": 2,
      "product_name": "WIRELESS MOUSE",
      "retail_price": 25.8
    },
    {
      "product_id": 3,
      "product_name": "SMART WATCH",
      "retail_price": 189.5
    },
    {
      "product_id": 1,
      "product_name": "4G ROUTER",
      "retail_price": 58.55
    }
  ]
}
Specify a product_id at the end of the URL to retrieve a specific product.
$ curl -X GET http://localhost:8080/products/2
Output:
{
  "data": [
    {
      "product_id": 2,
      "product_name": "WIRELESS MOUSE",
      "retail_price": 25.8
    }
  ]
}
Insert a new product into the Cassandra database by running the following HTTP POST request.
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"product_id": 4, "product_name": "TRENCH COAT", "retail_price": 456.28}'
Output:
{
  "data": [
    {
      "product_id": 4,
      "product_name": "TRENCH COAT",
      "retail_price": 456.28
    }
  ]
}
If all commands succeed, your application is working as expected.
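If you prefer to script these checks in Python rather than curl, the standard library's urllib.request can send the same requests. The sketch assumes the server from index.py is running on localhost:8080; BASE_URL and the helper names are illustrative, not part of the application.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumes index.py is running locally


def get_products_request(product_id=None):
    """Build a GET request for all products, or for one product by ID."""
    url = f"{BASE_URL}/products"
    if product_id is not None:
        url += f"/{product_id}"
    return urllib.request.Request(url, method="GET")


def add_product_request(product):
    """Build a POST request equivalent to the curl insert command."""
    return urllib.request.Request(
        f"{BASE_URL}/",
        data=json.dumps(product).encode("utf-8"),  # urllib adds Content-Length
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send a request and decode the JSON response body."""
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read())
```

With the server running, send(get_products_request(2)) should return a dictionary whose "data" list holds the same product as the second curl command.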
Conclusion
In this guide, you used the Apache Cassandra driver for Python to perform basic database operations on a custom keyspace. You created a test database using the Cassandra shell (cqlsh), set up a custom Python database gateway module, and reused it in a new source code file to execute queries on the Cassandra database.
Next Steps
To implement more Python database modules, visit the following resources.