How to Implement PostgreSQL Database Transactions with Python on Ubuntu 20.04

Updated on June 23, 2024

Introduction

A database transaction is a sequence of SQL commands that together fulfill a single piece of business logic. For example, in an e-commerce application, the SQL commands required to fill a customer's order may affect the sales_orders, sales_order_products, and sales_payments tables. Database transactions implement the principle of atomicity, which states that a transaction should have an all-or-nothing effect on a database. If any of the SQL commands in the transaction fails, the database should undo (roll back) the entire transaction. PostgreSQL is one of the most popular database servers that support transactions to eliminate the possibility of partial database updates.
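
The all-or-nothing behavior can be sketched in a few lines of Python. The example below is only an illustration and makes several assumptions: it uses the sqlite3 module from the standard library with an in-memory database as a stand-in for PostgreSQL, simplified versions of the customers and loans tables, a hypothetical add_customer_with_loan helper, and a CHECK constraint added solely to force the second insert to fail.

```python
import sqlite3

# Assumption: in-memory SQLite stands in for PostgreSQL in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, first_name TEXT)"
)
# The CHECK constraint exists only so that a negative amount makes the insert fail.
conn.execute(
    "CREATE TABLE loans (loan_id INTEGER PRIMARY KEY, customer_id INTEGER, "
    "amount NUMERIC CHECK (amount > 0))"
)
conn.commit()


def add_customer_with_loan(conn, first_name, amount):
    """Hypothetical helper: insert a customer and a loan as one unit of work."""
    try:
        cur = conn.execute(
            "INSERT INTO customers (first_name) VALUES (?)", (first_name,)
        )
        conn.execute(
            "INSERT INTO loans (customer_id, amount) VALUES (?, ?)",
            (cur.lastrowid, amount),
        )
        conn.commit()      # both inserts become permanent together
        return True
    except sqlite3.Error:
        conn.rollback()    # undo the customer insert as well
        return False
```

Under these assumptions, calling add_customer_with_loan(conn, "JANE", -100) returns False and leaves both tables unchanged, because the rollback also undoes the customer insert that had already succeeded.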

This guide shows you how to implement PostgreSQL transactions with psycopg2, an advanced Python library used to connect to a PostgreSQL server.

Prerequisites

To complete this guide:

1. Set Up a Sample Database

This sample database is the back end of a bank application that stores customers and their loan balances.

This application uses two tables to complete a database transaction. The customers table stores the names of the customers. Then, the loans table stores the customers' loan balances. Later, this guide shows you how to use the Linux curl command to send sample transactions to the application. The application must complete transactions as a single unit of work to fulfill the business logic. Otherwise, the database should reject partially complete transactions.

To set up this sample application, you require the postgresql-client package to connect to the managed PostgreSQL database cluster and create a database. Follow the steps below to install the package and initialize the database:

  1. Update the server's package information index.

     $ sudo apt update 
  2. Use the apt tool to install the postgresql-client package.

     $ sudo apt install -y postgresql-client
  3. Log in to your managed PostgreSQL database cluster. Replace SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com with the correct host for your database.

     $ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb

    Output:

     Password for user vultradmin:
  4. Enter the password for your managed PostgreSQL database cluster and press Enter to proceed.

    Output:

     defaultdb=>
  5. Issue the following SQL command to create a sample bank_db database.

     defaultdb=> CREATE DATABASE bank_db;

    Output:

     CREATE DATABASE
  6. Connect to the new bank_db database.

     defaultdb=> \c bank_db;

    Output:

     ...
     You are now connected to database "bank_db" as user "vultradmin".
  7. Create a sample customers table. This table stores the customer_ids, first_names, and last_names. The SERIAL keyword instructs the PostgreSQL server to generate new customer_ids automatically.

     bank_db=> CREATE TABLE customers (
                   customer_id SERIAL PRIMARY KEY,
                   first_name VARCHAR(50),
                   last_name VARCHAR(50)        
               );

    Output:

     CREATE TABLE
  8. Create a loans table. This table stores loan account balances held by customers. The customer_id column in this table links back to the same column in the customers table.

     bank_db=> CREATE TABLE loans (
                   loan_id SERIAL PRIMARY KEY,
                   customer_id BIGINT,
                   amount DECIMAL(17, 4)  
               );

    Output:

     CREATE TABLE
  9. Log out from the managed PostgreSQL database cluster.

     bank_db=> \q
  10. Follow the next step to create a database class to access your sample PostgreSQL database.

2. Create a Custom PostgreSQL Database Class

With your sample database in place, you now require a central class that connects to the database to store data in the tables. Follow the steps below to create the class:

  1. Begin by creating a new project directory to separate your source code from system files.

     $ mkdir project
  2. Navigate to the new project directory.

     $ cd project
  3. Open a new postgresql_db.py file in a text editor.

     $ nano postgresql_db.py
  4. Enter the following information into the postgresql_db.py file. Remember to replace the database credentials (db_host, db_user, db_pass, and db_port) with the correct values for your PostgreSQL database cluster.

     import psycopg2
    
     class PostgresqlDb:
    
         def __init__(self):
    
             db_host = 'SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com' 
             db_name = 'bank_db'  
             db_user = 'vultradmin'
             db_pass = 'EXAMPLE_POSTGRESQL_PASSWORD'   
             db_port = 16751
    
             self.db_conn = psycopg2.connect(host = db_host, database = db_name, user = db_user, password = db_pass, port = db_port)
    
         def execute_db(self, json_payload):
    
             try: 
    
                 print("Starting new database transaction...")
    
                 self.db_conn.autocommit = False
    
                 self.cur = self.db_conn.cursor()   
    
                 print("Inserting new customer to database...")                 
    
                 sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'     
                 self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
                 customer_id = self.cur.fetchone()[0]
    
                 print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
    
                 print("Inserting customer's loan record...")    
    
                 sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'     
                 self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
                 loan_id = self.cur.fetchone()[0]
    
                 print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
    
                 self.db_conn.commit()
    
                 print("Database transaction completed successfully.")    
    
                 return "Success"
    
             except (Exception, psycopg2.DatabaseError) as error:
    
                 print("Database transaction failed, rolling back database changes...")
    
                 self.db_conn.rollback()
    
                 return str(error)
    
             finally:
    
                 if self.db_conn:
    
                     self.cur.close()
                     self.db_conn.close()
                     print("Database connection closed successfully.")
  5. Save and close the postgresql_db.py file.

The postgresql_db.py file explained:

  1. The import psycopg2 statement loads the psycopg2 adapter that connects your Python code to the PostgreSQL database cluster.

  2. The postgresql_db.py file contains one PostgresqlDb class with two methods.

     import psycopg2
    
     class PostgresqlDb:
    
         def __init__(self):
    
             ...
    
         def execute_db(self, json_payload):
    
             ...
  3. The __init__(...) method is a constructor that runs every time you create a new object from the PostgresqlDb class. It opens the connection to the database.

  4. The execute_db(self, json_payload) method takes the JSON payload from an HTTP POST request, which contains the customer's names and the loan balance, and saves the data to the PostgreSQL database.

  5. Under the execute_db(...) method, you're setting the connection's autocommit attribute to False, which is also the psycopg2 default. With autocommit off, psycopg2 opens a transaction when the first SQL command runs, and the changes only become permanent when you call commit(). Calling rollback() instead discards them, which prevents partial transactions.

             ...
             try: 
    
                 print("Starting new database transaction...")
    
                 self.db_conn.autocommit = False
    
                 self.cur = self.db_conn.cursor()  
             ...
  6. The following code block runs the transaction. The application creates a new record in the customers table and another record in the loans table. The commit() call and the "Success" return value are only reached when neither insert raises an error.

                 ...
    
                 print("Inserting new customer to database...")                 
    
                 sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'     
                 self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
                 customer_id = self.cur.fetchone()[0]
    
                 print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
    
                 print("Inserting customer's loan record...")    
    
                 sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'     
                 self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
                 loan_id = self.cur.fetchone()[0]
    
                 print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
    
                 self.db_conn.commit()
    
                 print("Database transaction completed successfully.")    
    
                 return "Success"
                 ...
  7. The except block runs when any statement in the try block raises an exception, and it rolls back the transaction. Then, the finally block executes in every case to close the cursor and the database connection.

             ...
             except (Exception, psycopg2.DatabaseError) as error:
    
                 print("Database transaction failed, rolling back database changes...")
    
                 self.db_conn.rollback()
    
                 return str(error)
    
             finally:
    
                 if self.db_conn:
    
                     self.cur.close()
                     self.db_conn.close()
                     print("Database connection closed successfully.")
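
As an alternative to calling commit() and rollback() by hand, psycopg2 connections and cursors can also be used as context managers. The sketch below is not part of this guide's application: save_customer_and_loan is a hypothetical helper, and it assumes db_conn is an open connection such as the one returned by psycopg2.connect(...).

```python
def save_customer_and_loan(db_conn, json_payload):
    """Hypothetical helper: insert a customer and a loan in one transaction.

    Returns (customer_id, loan_id). Any exception propagates to the caller
    after the transaction has been rolled back.
    """
    # `with db_conn` commits if the block finishes and rolls back if it raises.
    # In psycopg2 it does not close the connection; the caller still does that.
    with db_conn:
        # `with db_conn.cursor()` closes the cursor when the block exits.
        with db_conn.cursor() as cur:
            cur.execute(
                "insert into customers(first_name, last_name) "
                "values(%s, %s) RETURNING customer_id",
                (json_payload["first_name"], json_payload["last_name"]),
            )
            customer_id = cur.fetchone()[0]

            cur.execute(
                "insert into loans(customer_id, amount) "
                "values(%s, %s) RETURNING loan_id",
                (customer_id, json_payload["loan_amount"]),
            )
            loan_id = cur.fetchone()[0]

    return customer_id, loan_id
```

With this shape, the cursor is only closed if it was actually created, so a failure while opening the cursor cannot trigger a second error during cleanup.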

The PostgresqlDb class is now ready. Use the following syntax to include it in other Python source code files.

import postgresql_db
pg = postgresql_db.PostgresqlDb() 

resp = pg.execute_db(...)

Follow the next step to create the main.py file for your Python application.

3. Create the Application's Entry Point

To complete this sample application, you need an HTTP server that accepts incoming POST requests on port 8080. Python has built-in libraries that you can use to carry out the task. Follow the steps below to create the HTTP server:

  1. Open a new main.py file in a text editor.

     $ nano main.py
  2. Enter the following information into the main.py file.

     import http.server
     from http import HTTPStatus        
     import socketserver
    
     import json 
     import postgresql_db
    
     class httpHandler(http.server.SimpleHTTPRequestHandler):
    
         def do_POST(self):
    
             content_length = int(self.headers['Content-Length'])
             post_data = self.rfile.read(content_length)
             json_payload = json.loads(post_data)
    
             self.send_response(HTTPStatus.OK)
             self.send_header('Content-type', 'application/json')
             self.end_headers()
    
             pg = postgresql_db.PostgresqlDb()
    
             resp = pg.execute_db(json_payload)
    
             self.wfile.write(bytes( resp + '\r\n', "utf8"))
    
     httpServer = socketserver.TCPServer(('', 8080), httpHandler)
    
     print("Web server started at port 8080")
    
     try:
    
         httpServer.serve_forever()
    
     except KeyboardInterrupt:
    
         httpServer.server_close()
         print("The HTTP server is stopped.")
  3. Save and close the main.py file.

The main.py file explained:

  1. The import section loads all the Python modules required by the sample application. The http.server and socketserver modules and the HTTPStatus class provide the HTTP functionality. The json module allows you to work with JSON data, while the postgresql_db module loads your custom PostgreSQL database class.

     import http.server
     from http import HTTPStatus        
     import socketserver
    
     import json 
    
     import postgresql_db
     ...
  2. The httpHandler is a handler class for the HTTP server. This class accepts a JSON payload from HTTP clients. Then under this class, the pg = postgresql_db.PostgresqlDb() and pg.execute_db(json_payload) statements call your custom PostgresqlDb class to save data to the database and return a response using the self.wfile.write(bytes( resp + '\r\n', "utf8")) statement.

     ...
     class httpHandler(http.server.SimpleHTTPRequestHandler):
    
         def do_POST(self):
    
             content_length = int(self.headers['Content-Length'])
             post_data = self.rfile.read(content_length)
             json_payload = json.loads(post_data)
    
             self.send_response(HTTPStatus.OK)
             self.send_header('Content-type', 'application/json')
             self.end_headers()
    
             pg = postgresql_db.PostgresqlDb()
    
             resp = pg.execute_db(json_payload)
    
             self.wfile.write(bytes( resp + '\r\n', "utf8"))
     ...
  3. The following declarations at the end of the file create a web server that listens for HTTP requests and dispatches the request to the httpHandler class.

     ...
     httpServer = socketserver.TCPServer(('', 8080), httpHandler)
    
     print("Web server started at port 8080")
    
     try:
    
         httpServer.serve_forever()
    
     except KeyboardInterrupt:
    
         httpServer.server_close()
         print("The HTTP server is stopped.")
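
The handler above passes the JSON payload to the database class without checking it first. One possible refinement is to validate the payload before opening a database connection. The sketch below assumes a hypothetical validate_payload helper that is not part of main.py, and it assumes the payload has already been parsed into a Python dictionary with json.loads.

```python
from decimal import Decimal, InvalidOperation


def validate_payload(json_payload):
    """Hypothetical helper: return a list of problems found in the payload.

    An empty list means the payload looks valid. Assumes json_payload is a dict.
    """
    errors = []

    # Both name fields must be present and contain visible characters.
    for field in ("first_name", "last_name"):
        value = json_payload.get(field)
        if not isinstance(value, str) or not value.strip():
            errors.append(field + " must be a non-empty string")

    # The loan amount must parse as a finite decimal number.
    try:
        amount = Decimal(str(json_payload.get("loan_amount")))
        if not amount.is_finite():
            errors.append("loan_amount must be a finite number")
    except InvalidOperation:
        errors.append("loan_amount must be numeric")

    return errors
```

A handler could call validate_payload(json_payload) at the top of do_POST and return the error list to the client when it is not empty. The transaction logic in the database class remains the safety net for anything the validation misses.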

You now have all the necessary source code files required by your application. Proceed to the next step to test the application.

4. Test the Application

After coding all the Python files, the final step is installing the Python pip package, downloading the psycopg2 library, and testing the application. Follow the steps below to complete the application:

  1. Install the Python pip package.

     $ sudo apt install -y python3-pip
  2. Use pip to install the psycopg2-binary library, which provides the psycopg2 adapter for PostgreSQL.

     $ pip install psycopg2-binary

    Output:

     ...
     Installing collected packages: psycopg2-binary
     Successfully installed psycopg2-binary-2.9.5
  3. Use the python3 command to run the application.

     $ python3 main.py

    Output:

     Web server started at port 8080
  4. Establish another SSH connection to your server and run the following Linux curl command to send a sample JSON payload to the application.

     $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}'

    Output:

     Success
  5. Check out the output below from the first terminal window where the web server is running. The transaction succeeds without any errors.

     Web server started at port 8080
    
    
     Starting new database transaction...
     Inserting new customer to database...
     Customer successfully inserted to database, new customer_id is 1
     Inserting customer's loan record...
     Customer loan record inserted successfully, new loan_id is 1
     Database transaction completed successfully.
     Database connection closed successfully.
  6. Try sending the following invalid transaction, which sets the loan amount to PP instead of a numeric value.

     $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "PP"}'

    Output:

     invalid input syntax for type numeric: "PP"...
  7. Examine the output from the first terminal window. This time, the transaction fails without making any changes to the database. Although the application inserts the customer's details into the database and obtains a new customer_id (2), the entire transaction rolls back per the following output.

     ...
     Starting new database transaction...
     Inserting new customer to database...
     Customer successfully inserted to database, new customer_id is 2
     Inserting customer's loan record...
     Database transaction failed, rolling back database changes...
     Database connection closed successfully.
  8. To verify the changes, log in to your PostgreSQL database cluster.

     $ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb

    Output:

     Password for user vultradmin:
  9. Enter your password and press Enter to proceed.

    Output:

     defaultdb=>
  10. Switch to the bank_db database.

     defaultdb=> \c bank_db;

    Output:

     You are now connected to database "bank_db" as user "vultradmin".
  11. Query the customers table.

     bank_db=> SELECT
                   customer_id,
                   first_name,
                   last_name
               FROM customers;

    Output:

      customer_id | first_name | last_name
     -------------+------------+-----------
                1 | JOHN       | DOE
     (1 row)
  12. Query the loans table.

     bank_db=> SELECT
                   loan_id,
                   customer_id,
                   amount
               FROM loans;

    Output:

      loan_id | customer_id |  amount
     ---------+-------------+-----------
            1 |           1 | 4560.0000
     (1 row)

The above outputs confirm that the application's logic is working as expected: the failed request left no rows behind. Without the transaction logic, you would now have an orphaned customer record (customer_id 2) without a matching loan record.
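
The same check can be scripted. The sketch below is one possible approach, not part of the application: it assumes a hypothetical find_orphaned_customers helper and an already open DB-API connection, such as the one psycopg2.connect(...) returns for the bank_db database. It uses a LEFT JOIN to list customers that have no row in the loans table.

```python
def find_orphaned_customers(db_conn):
    """Hypothetical helper: return customer_ids that have no loan record."""
    sql_string = (
        "SELECT c.customer_id "
        "FROM customers c "
        "LEFT JOIN loans l ON l.customer_id = c.customer_id "
        "WHERE l.loan_id IS NULL "
        "ORDER BY c.customer_id"
    )

    cur = db_conn.cursor()
    try:
        cur.execute(sql_string)
        # Each row holds a single column, the customer_id.
        return [row[0] for row in cur.fetchall()]
    finally:
        # Close the cursor whether or not the query succeeded.
        cur.close()
```

An empty list means every customer has at least one loan record, which is the state the two queries above show.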

Conclusion

This guide shows you how to implement PostgreSQL database transactions with Python on an Ubuntu 20.04 server. Use the source code in this guide to create applications that treat a group of related database changes as a single unit of work. Transactions ensure database consistency and prevent possible cases of orphaned records.

Check out the links below to learn more about the managed PostgreSQL database cluster: