How to Implement Apache Cassandra Driver for Python

Updated on August 10, 2023

Introduction

Apache Cassandra is a fault-tolerant, highly available NoSQL database. The database server uses a distributed system model to handle large amounts of data across multiple nodes, and implements a peer-to-peer architecture. This architecture differs from the traditional primary-secondary architecture, which is prone to a single point of failure.

Among the key advantages of the Cassandra database server is scalability. Depending on your application needs, you can scale your database out by adding new nodes or back in by removing nodes. This flexibility makes Cassandra a good fit for Python applications that need to grow with demand.

This guide explains how to implement the Apache Cassandra database with Python.

Prerequisites

Before you start:

Set Up a Sample Cassandra Database

In this section, set up the Cassandra database used by the sample application. The database contains a single products table that stores product information. Follow the steps below.

  1. Log in to the Apache Cassandra database server.

     $ cqlsh
  2. Create a sample online_shop keyspace.

     cqlsh> CREATE KEYSPACE online_shop 
          WITH REPLICATION = { 
              'class': 'SimpleStrategy', 
              'replication_factor' : 1 
          };

    In Cassandra, a keyspace is a data container similar to a database in traditional relational database management systems. The above command creates a new keyspace named online_shop.

  3. Switch to the new online_shop keyspace.

     cqlsh> USE online_shop;
  4. Create a sample products table with three columns.

     cqlsh:online_shop> CREATE TABLE products (
                          product_id BIGINT PRIMARY KEY,                     
                          product_name TEXT,
                          retail_price DOUBLE           
                      );

    In the above table, the product_id column is a PRIMARY KEY that uniquely identifies the products. The product_name column stores the product names and the retail_price column stores the final price that customers pay for the products.

  5. Populate the products table with sample data.

     cqlsh:online_shop> INSERT INTO products (product_id, product_name, retail_price) VALUES (1, '4G ROUTER', 58.55);
                        INSERT INTO products (product_id, product_name, retail_price) VALUES (2, 'WIRELESS MOUSE', 25.80);
                        INSERT INTO products (product_id, product_name, retail_price) VALUES (3, 'SMART WATCH', 189.50);
  6. View the products table data.

     cqlsh:online_shop> SELECT
                            product_id,
                            product_name,
                            retail_price
                        FROM products;

    Output:

     product_id | product_name   | retail_price
     ------------+----------------+--------------
              2 | WIRELESS MOUSE |         25.8
              3 |    SMART WATCH |        189.5
              1 |      4G ROUTER |        58.55
    
     (3 rows)
  7. Exit the database server.

     EXIT;
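
The cqlsh steps above could also be scripted through the Python driver once it is installed. The following is a minimal sketch under stated assumptions: SCHEMA_STATEMENTS, SAMPLE_PRODUCTS, apply_schema, and main are illustrative names, and the IF NOT EXISTS clauses are additions (not in the original statements) so that the script can be run more than once.

```python
# Sketch only: replays the cqlsh steps through a driver session.
# All names below are illustrative; they are not part of cassandra-driver.

SCHEMA_STATEMENTS = [
    # IF NOT EXISTS is an assumption added here so the script is re-runnable.
    """
    CREATE KEYSPACE IF NOT EXISTS online_shop
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """,
    """
    CREATE TABLE IF NOT EXISTS online_shop.products (
        product_id BIGINT PRIMARY KEY,
        product_name TEXT,
        retail_price DOUBLE
    )
    """,
]

# The same three sample rows inserted with cqlsh.
SAMPLE_PRODUCTS = [
    (1, "4G ROUTER", 58.55),
    (2, "WIRELESS MOUSE", 25.80),
    (3, "SMART WATCH", 189.50),
]

# Non-prepared statements in the driver use %s placeholders.
INSERT_CQL = (
    "INSERT INTO online_shop.products (product_id, product_name, retail_price) "
    "VALUES (%s, %s, %s)"
)


def apply_schema(session):
    """Run every statement on the given session and return how many were sent."""
    for statement in SCHEMA_STATEMENTS:
        session.execute(statement)
    for product in SAMPLE_PRODUCTS:
        session.execute(INSERT_CQL, product)
    return len(SCHEMA_STATEMENTS) + len(SAMPLE_PRODUCTS)


def main():
    # Imported lazily so this file can be loaded without the driver installed.
    from cassandra.cluster import Cluster

    cluster = Cluster()          # assumes a local node on the default port
    session = cluster.connect()
    print(apply_schema(session), "statements executed")
    cluster.shutdown()
```

Calling main() against a running local node should leave the keyspace in the same state as the manual steps. Because INSERT in Cassandra overwrites rows with the same primary key, re-running the script does not duplicate the sample rows.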

Install Project Dependencies

To keep your application organized, store the Python source code files in a dedicated directory. For the Python application to connect to the Apache Cassandra database you created earlier, install the cassandra-driver module as described in the following steps.

  1. Create a new project directory.

     $ mkdir project
  2. Switch to the directory.

     $ cd project
  3. Install the pip Python package manager.

     $ sudo apt install -y python3-pip
  4. Using pip, install the Cassandra database driver.

     $ pip install cassandra-driver

    The above command installs cassandra-driver, a feature-rich and highly-customizable Python module for the Apache Cassandra database.
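
To confirm that the installation succeeded before writing any application code, a short check like the one below may help. It is a sketch that relies only on the standard library (importlib.metadata, available in Python 3.8 and later); driver_version is an illustrative helper name, not part of the driver.

```python
from importlib import metadata


def driver_version(package="cassandra-driver"):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints the version when the driver is installed, otherwise a hint.
print(driver_version() or "cassandra-driver is not installed")
```

If the message reports that the driver is not installed, check that pip and python3 refer to the same Python installation.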

Create a Database Connection Module

To promote code reusability in your Python application, develop a central module that connects to the Apache Cassandra database to execute queries. The module works as a gateway to the Apache Cassandra database server. Later, you can import and reuse the module in other Python source code files. To create the module, follow the steps described below.

  1. Using a text editor such as Nano, create a new cassandra_gateway.py file.

     $ nano cassandra_gateway.py
  2. Add the following contents to the file.

     from cassandra.cluster import Cluster
     from cassandra.query import dict_factory
    
     class CassandraGateway:
    
       def db_session(self):
    
           clstr   = Cluster()
           session = clstr.connect('online_shop')
    
           return session
    
       def execute(self, json_data):
    
           db_session   = self.db_session()
           query_string = "insert into products (product_id, product_name, retail_price) values (?, ?, ?);"
    
           stmt = db_session.prepare(query_string)
    
           product_id   = int(json_data["product_id"])
           product_name = json_data["product_name"]
           retail_price = json_data["retail_price"]
    
           prepared_query = stmt.bind([product_id, product_name, retail_price])
    
           db_session.execute(prepared_query)
    
           return self.query(product_id)
    
       def query(self, product_id = 0):
    
           db_session   = self.db_session()
           db_session.row_factory = dict_factory
    
           if product_id == 0:
    
               query_string = "select product_id, product_name, retail_price from products;"
               stmt = db_session.prepare(query_string)
               prepared_query = stmt.bind([])
    
           else:
    
               query_string = "select product_id, product_name, retail_price from products where product_id = ?;"
               stmt = db_session.prepare(query_string)         
               prepared_query = stmt.bind([int(product_id)])          
    
           rows = db_session.execute(prepared_query)
    
           return list(rows)

    Save and close the file.

    Below is what the file declarations represent:

    • The declaration below imports the Apache Cassandra database driver for Python.

        from cassandra.cluster import Cluster
        ...
    • The dict_factory function returns data from the products table in a dictionary format that lists the column names. The dictionary format displays meaningful data for JSON responses.

        ...
        from cassandra.query import dict_factory
        ...
    • The CassandraGateway class hosts three methods as below:

      • db_session(self): Creates a Cluster() object and calls clstr.connect('online_shop') to open a session on the Apache Cassandra keyspace you created earlier.
      • execute(self, json_data): Reuses the db_session() method, then calls db_session.prepare(query_string), stmt.bind([product_id, product_name, retail_price]), and db_session.execute(prepared_query) to insert data into the products table using a parameterized query (insert into products (product_id, product_name, retail_price) values (?, ?, ?);). It then returns the inserted row by calling self.query(product_id).
      • query(self, product_id = 0): Runs a SELECT statement against the products table and returns the output as a list using the return list(rows) declaration. The if product_id == 0: ... else ... block returns either the full list of products or a single product, depending on whether the HTTP client specifies a product_id value in the GET request.
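
The db_session() method described above opens a new cluster connection on every call. One possible refinement, sketched below, is to connect once and reuse the session. SharedSessionGateway and its session_factory parameter are hypothetical names introduced for illustration; only Cluster, connect, and dict_factory come from the driver.

```python
class SharedSessionGateway:
    """Illustrative variant of the gateway that connects once and reuses the session."""

    _session = None  # cached on the class, so every instance shares it

    def __init__(self, session_factory=None):
        # session_factory is a hypothetical hook: any callable returning a session.
        self._session_factory = session_factory or self._default_factory

    @staticmethod
    def _default_factory():
        # Imported lazily so the class can be loaded without the driver installed.
        from cassandra.cluster import Cluster
        from cassandra.query import dict_factory

        session = Cluster().connect('online_shop')
        session.row_factory = dict_factory
        return session

    def db_session(self):
        cls = type(self)
        if cls._session is None:
            # First call only: open the connection and remember it.
            cls._session = self._session_factory()
        return cls._session
```

With this approach, repeated requests skip the connection handshake. The sketch has no locking, which is acceptable for the single-threaded server in this guide but would need revisiting if requests were handled in parallel.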

Create the Application's Entry Point

A Python application needs an entry-point file that runs when the application starts. In this section, create a new index.py file that reuses the custom cassandra_gateway module you created in the previous section, as described in the steps below.

  1. Create a new index.py file.

     $ nano index.py
  2. Add the following contents to the file.

     import http.server
     from http import HTTPStatus
     import socketserver
    
     import json
     import cassandra_gateway
    
     class HttpHandler(http.server.SimpleHTTPRequestHandler): 
    
        def do_POST(self):
    
            self.send_response(HTTPStatus.OK)
            self.send_header('Content-type','application/json')
            self.end_headers()
    
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            json_data = json.loads(post_data)
    
            db_gateway =  cassandra_gateway.CassandraGateway()
    
            db_resp = db_gateway.execute(json_data)  
    
            resp = {                   
                "data": db_resp
            }           
    
            self.wfile.write(bytes(json.dumps(resp, indent = 2) + "\r\n", "utf8")) 
    
        def do_GET(self):
    
            self.send_response(HTTPStatus.OK)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
    
            product_id = 0
    
            if len(self.path.split("/")) >= 3:
                product_id = self.path.split("/")[2]
    
            db_gateway = cassandra_gateway.CassandraGateway()
    
            db_resp = db_gateway.query(product_id)  
    
            resp = {                   
                 "data": db_resp
            }           
    
            self.wfile.write(bytes(json.dumps(resp, indent = 2) + "\r\n", "utf8")) 
    
     httpd = socketserver.TCPServer(('', 8080), HttpHandler)
     print("HTTP server started at port 8080...")
    
     try:
    
        httpd.serve_forever()
    
     except KeyboardInterrupt: 
    
        httpd.server_close()
    
        print("You've stopped the HTTP server.")

    Save and close the file.

    Below is what the file declarations represent:

    • The code below imports HTTP functionalities.

        import http.server
        from http import HTTPStatus
        import socketserver
        ...
    • The following lines enable JSON formatting and Cassandra database functions.

        ...
        import json
        import cassandra_gateway
        ...
    • The HttpHandler(http.server.SimpleHTTPRequestHandler) class responds to HTTP POST and GET requests to insert and retrieve data using the custom cassandra_gateway.py module.

    • The following lines run an HTTP server that listens for incoming connections on port 8080.

        ...
        httpd = socketserver.TCPServer(('', 8080), HttpHandler)
        print("HTTP server started at port 8080...")
      
        try:
      
          httpd.serve_forever()     
        ...
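
The handler above trusts the request path and body as they arrive. As a hedged sketch of how that input could be checked before it reaches the gateway, the two helpers below extract the product ID from the URL and validate the POST body. parse_product_id and parse_product_payload are illustrative names that do not appear in index.py.

```python
import json

REQUIRED_FIELDS = ("product_id", "product_name", "retail_price")


def parse_product_id(path):
    """Return the ID from paths such as '/products/2', or 0 when absent or invalid."""
    # Drop any query string, then ignore empty segments from leading/trailing slashes.
    parts = [p for p in path.split("?")[0].split("/") if p]
    if len(parts) < 2:
        return 0
    try:
        product_id = int(parts[1])
    except ValueError:
        return 0
    return product_id if product_id > 0 else 0


def parse_product_payload(raw_body):
    """Decode a JSON body and coerce the three product fields.

    Raises ValueError when the body is not a JSON object, a field is
    missing, or a value cannot be converted to the column's type.
    """
    try:
        data = json.loads(raw_body)
    except ValueError as exc:  # includes json.JSONDecodeError
        raise ValueError("body is not valid JSON") from exc
    if not isinstance(data, dict):
        raise ValueError("body must be a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in data]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    try:
        return {
            "product_id": int(data["product_id"]),        # BIGINT column
            "product_name": str(data["product_name"]),    # TEXT column
            "retail_price": float(data["retail_price"]),  # DOUBLE column
        }
    except (TypeError, ValueError) as exc:
        raise ValueError("a field has the wrong type") from exc
```

In do_GET, parse_product_id(self.path) could replace the manual split, and in do_POST a ValueError from parse_product_payload could be turned into an HTTP 400 response instead of an unhandled exception.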

Test the Application

When all Python source code files are created, run and test your application's logic as described in the following steps.

  1. Run the application.

     $ python3 index.py

    Your output should look like the one below:

     HTTP server started at port 8080...
  2. In a new terminal session, establish another SSH connection to the server.

     $ ssh root@SERVER-IP
  3. Run the following HTTP GET request to list products from the database.

     $ curl -X GET http://localhost:8080/products

    Output:

     {
      "data": [
        {
          "product_id": 2,
          "product_name": "WIRELESS MOUSE",
          "retail_price": 25.8
        },
        {
          "product_id": 3,
          "product_name": "SMART WATCH",
          "retail_price": 189.5
        },
        {
          "product_id": 1,
          "product_name": "4G ROUTER",
          "retail_price": 58.55
        }
      ]
     }
  4. Specify a product_id at the end of the URL to retrieve a specific product.

     $ curl -X GET http://localhost:8080/products/2

    Output:

     {
       "data": [
         {
           "product_id": 2,
           "product_name": "WIRELESS MOUSE",
           "retail_price": 25.8
         }
       ]
     }
  5. Insert a new product into the Cassandra database by running the following HTTP POST request.

     $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"product_id": 4, "product_name": "TRENCH COAT", "retail_price": 456.28}'

    Output:

     {
       "data": [
         {
           "product_id": 4,
           "product_name": "TRENCH COAT",
           "retail_price": 456.28
         }
       ]
     }

    When all commands complete successfully, your application is working as expected.
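
The same requests can be sent from Python instead of curl. The sketch below uses only the standard library and assumes the server from this guide is listening on localhost port 8080; BASE_URL, build_get, build_post, and send are illustrative names.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: the server started by index.py


def build_get(product_id=None, base_url=BASE_URL):
    """Build a GET request for all products, or for one product when an ID is given."""
    url = base_url + "/products"
    if product_id is not None:
        url += f"/{product_id}"
    return urllib.request.Request(url, method="GET")


def build_post(product, base_url=BASE_URL):
    """Build a POST request that carries the product dictionary as a JSON body."""
    body = json.dumps(product).encode("utf-8")
    return urllib.request.Request(
        base_url + "/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=10):
    """Send the request and decode the JSON response from the server."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
```

For example, with the server running, send(build_get(2)) retrieves a single product, and send(build_post({"product_id": 4, "product_name": "TRENCH COAT", "retail_price": 456.28})) inserts a new one.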

Conclusion

In this guide, you implemented the Apache Cassandra Python module to perform basic database operations on a custom keyspace. You created a test database using the Cassandra CLI (cqlsh), set up a custom Python database gateway module, and reused it in a new source code file to execute queries on the Cassandra database.

Next Steps

To implement more Python database modules, visit the following resources.