How to Implement PostgreSQL Database Transactions with Python on Ubuntu 20.04
Introduction
A database transaction is a sequence of SQL commands that together fulfill a piece of business logic. For example, in an e-commerce application, the SQL commands required to fill a customer's order may affect the sales_orders, sales_order_products, and sales_payments tables. Database transactions address the principle of atomicity, which states that a transaction should have an all-or-nothing effect on a database. If any of the SQL commands in the transaction fails, the database should undo (roll back) the entire transaction. PostgreSQL is one of the most popular database servers that supports transactions to eliminate the possibility of partial database updates.
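To make the all-or-nothing behavior concrete, here is a minimal sketch. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, an assumption made only so the example runs without a database server. The place_order and demo helpers and the column layouts are hypothetical; only the table names come from the e-commerce example above.

```python
import sqlite3


def place_order(conn, customer, amount):
    """Insert an order and its payment as one unit of work."""
    cur = conn.cursor()
    try:
        cur.execute("INSERT INTO sales_orders (customer) VALUES (?)", (customer,))
        order_id = cur.lastrowid
        # amount is declared NOT NULL, so passing None makes this statement fail.
        cur.execute(
            "INSERT INTO sales_payments (order_id, amount) VALUES (?, ?)",
            (order_id, amount),
        )
        conn.commit()
        return True
    except sqlite3.Error:
        # Roll back the order row too, so no partial update remains.
        conn.rollback()
        return False
    finally:
        cur.close()


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sales_orders ("
        "order_id INTEGER PRIMARY KEY, customer TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE sales_payments ("
        "payment_id INTEGER PRIMARY KEY, order_id INTEGER NOT NULL, "
        "amount REAL NOT NULL)"
    )
    conn.commit()
    first = place_order(conn, "JOHN DOE", 25.0)   # both inserts succeed
    second = place_order(conn, "JANE DOE", None)  # payment insert fails
    orders = conn.execute("SELECT COUNT(*) FROM sales_orders").fetchone()[0]
    payments = conn.execute("SELECT COUNT(*) FROM sales_payments").fetchone()[0]
    conn.close()
    return first, second, orders, payments


if __name__ == "__main__":
    print(demo())
```

The second call inserts an order row and then fails on the payment row; after the rollback, the order row from that call is gone as well.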
This guide shows you how to implement PostgreSQL transactions with psycopg2, an advanced Python library used to connect to a PostgreSQL server.
Prerequisites
To complete this guide:
Locate the Connection Details for the PostgreSQL database cluster, located under the Overview tab. This guide uses the following sample connection details:
- username: vultradmin
- password: EXAMPLE_POSTGRESQL_PASSWORD
- host: SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com
- port: 16751
1. Set Up a Sample Database
This sample database is the back end of a bank application that stores customers and their loan balances.
This application uses two tables to complete a database transaction. The customers table stores the names of the customers. Then, the loans table stores the customers' loan balances. Later, this guide shows you how to use the Linux curl command to send sample transactions to the application. The application must complete transactions as a single unit of work to fulfill the business logic. Otherwise, the database should reject partially complete transactions.
To set up this sample application, you require the postgresql-client package to connect to the managed PostgreSQL database cluster and create a database. Follow the steps below to install the package and initialize the database:
Update the server's package information index.
$ sudo apt update
Use the apt tool to install the postgresql-client package.
$ sudo apt install -y postgresql-client
Log in to your managed PostgreSQL database cluster. Replace SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com with the correct host for your database.
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
Output:
Password for user vultradmin:
Enter the password for your managed PostgreSQL database cluster and press Enter to proceed.
Output:
defaultdb=>
Issue the following SQL command to create a sample bank_db database.
defaultdb=> CREATE DATABASE bank_db;
Output:
CREATE DATABASE
Connect to the new bank_db database.
defaultdb=> \c bank_db;
Output:
... You are now connected to database "bank_db" as user "vultradmin".
Create a sample customers table. This table stores the customer_ids, first_names, and last_names. The SERIAL keyword instructs the PostgreSQL server to generate new customer_ids automatically.
bank_db=> CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50)
);
Output:
CREATE TABLE
Create a loans table. This table stores loan account balances held by customers. The customer_id column in this table links back to the same column in the customers table.
bank_db=> CREATE TABLE loans (
    loan_id SERIAL PRIMARY KEY,
    customer_id BIGINT,
    amount DECIMAL(17, 4)
);
Output:
CREATE TABLE
Log out from the managed PostgreSQL database cluster.
bank_db=> \q
Follow the next step to create a database class to access your sample PostgreSQL database.
2. Create a Custom PostgreSQL Database Class
With your sample database in place, you now require a central class that connects to the database to store data in the tables. Follow the steps below to create the class:
Begin by creating a new project directory to separate your source code from system files.
$ mkdir project
Navigate to the new project directory.
$ cd project
Open a new postgresql_db.py file in a text editor.
$ nano postgresql_db.py
Enter the following information into the postgresql_db.py file. Remember to replace the database credentials (db_host, db_user, db_pass, and db_port) with the correct values for your PostgreSQL database cluster.
import psycopg2

class PostgresqlDb:
    def __init__(self):
        db_host = 'SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com'
        db_name = 'bank_db'
        db_user = 'vultradmin'
        db_pass = 'EXAMPLE_POSTGRESQL_PASSWORD'
        db_port = 16751
        self.db_conn = psycopg2.connect(host = db_host, database = db_name, user = db_user, password = db_pass, port = db_port)

    def execute_db(self, json_payload):
        try:
            print("Starting new database transaction...")
            self.db_conn.autocommit = False
            self.cur = self.db_conn.cursor()
            print("Inserting new customer to database...")
            sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'
            self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
            customer_id = self.cur.fetchone()[0]
            print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
            print("Inserting customer's loan record...")
            sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'
            self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
            loan_id = self.cur.fetchone()[0]
            print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
            self.db_conn.commit()
            print("Database transaction completed successfully.")
            return "Success"
        except (Exception, psycopg2.DatabaseError) as error:
            print("Database transaction failed, rolling back database changes...")
            self.db_conn.rollback()
            return str(error)
        finally:
            if self.db_conn:
                self.cur.close()
                self.db_conn.close()
                print("Database connection closed successfully.")
Save and close the postgresql_db.py file.
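The class above keeps the database credentials in the source file. As a hedged alternative, the sketch below reads them from environment variables instead. The load_db_settings helper and the BANK_DB_* variable names are hypothetical and not part of this guide's code; the defaults mirror the sample connection details.

```python
import os


def load_db_settings(env=None):
    """Build keyword arguments for psycopg2.connect() from environment variables.

    The BANK_DB_* names are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    return {
        "host": env["BANK_DB_HOST"],
        "database": env.get("BANK_DB_NAME", "bank_db"),
        "user": env["BANK_DB_USER"],
        "password": env["BANK_DB_PASSWORD"],
        # Environment values are strings, so convert the port explicitly.
        "port": int(env.get("BANK_DB_PORT", "16751")),
    }
```

Inside __init__, the connection could then be opened with psycopg2.connect(**load_db_settings()), which passes the same keyword arguments the class already uses.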
The postgresql_db.py file explained:
The import psycopg2 statement loads the psycopg2 adapter that connects your Python code to the PostgreSQL database cluster.
The postgresql_db.py file contains one PostgresqlDb class with two methods.
import psycopg2

class PostgresqlDb:
    def __init__(self):
        ...
    def execute_db(self, json_payload):
        ...
The __init__(...) method is a constructor that fires every time you create a new object from the PostgresqlDb class.
The execute_db(self, json_payload) method takes a JSON payload from an HTTP POST request containing the customer's names and the loan balance and forwards the request to the PostgreSQL database.
Under the execute_db(...) method, you're setting the connection's autocommit attribute to False. This setting lets you use the commit() method to permanently save successful transactions or the rollback() method to discard partial transactions.
...
try:
    print("Starting new database transaction...")
    self.db_conn.autocommit = False
    self.cur = self.db_conn.cursor()
...
The following code block runs to completion only when there are no errors in the database transaction. Under the transaction, the application creates a new record in the customers table and another record in the loans table.
...
print("Inserting new customer to database...")
sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'
self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
customer_id = self.cur.fetchone()[0]
print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
print("Inserting customer's loan record...")
sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'
self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
loan_id = self.cur.fetchone()[0]
print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
self.db_conn.commit()
print("Database transaction completed successfully.")
return "Success"
...
The except(...) block fires when a transaction fails with an exception. Then, the finally block executes in every case to close the cursor and the database connection.
...
except (Exception, psycopg2.DatabaseError) as error:
    print("Database transaction failed, rolling back database changes...")
    self.db_conn.rollback()
    return str(error)
finally:
    if self.db_conn:
        self.cur.close()
        self.db_conn.close()
        print("Database connection closed successfully.")
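The explicit commit() and rollback() calls described above can also be expressed with context managers. The following is a sketch of an alternative, not the class used in this guide: psycopg2 (version 2.5 and later) lets you use a connection in a with statement, which commits when the block finishes and rolls back when it raises. The save_customer_loan function name is hypothetical, and conn is assumed to be an already open psycopg2 connection.

```python
def save_customer_loan(conn, payload):
    """Insert a customer and a loan in one transaction.

    conn is assumed to be an open psycopg2 connection.
    """
    # "with conn" commits if the block completes and rolls back if it raises.
    # It does not close the connection; the caller still has to do that.
    with conn:
        # "with conn.cursor()" closes the cursor when the block ends.
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO customers (first_name, last_name) "
                "VALUES (%s, %s) RETURNING customer_id",
                (payload["first_name"], payload["last_name"]),
            )
            customer_id = cur.fetchone()[0]
            cur.execute(
                "INSERT INTO loans (customer_id, amount) "
                "VALUES (%s, %s) RETURNING loan_id",
                (customer_id, payload["loan_amount"]),
            )
            loan_id = cur.fetchone()[0]
    return customer_id, loan_id
```

Unlike execute_db(...), this version lets the exception propagate to the caller after the rollback, so the caller decides how to report the failure.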
The PostgresqlDb class is now ready. Use the following syntax to include it in other Python source code files.
import postgresql_db
pg = postgresql_db.PostgresqlDb()
resp = pg.execute_db(...)
Follow the next step to create the main.py file for your Python application.
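The execute_db(...) method passes the payload values to the database unchanged, so a malformed value is only detected when PostgreSQL rejects it. As an optional extra, a caller could check the payload first. The validate_payload helper below is a hypothetical sketch and is not part of this guide's code; it assumes the three keys used by execute_db(...).

```python
from decimal import Decimal, InvalidOperation


def validate_payload(payload):
    """Return a cleaned copy of the payload or raise ValueError."""
    cleaned = {}
    for key in ("first_name", "last_name"):
        value = str(payload.get(key, "")).strip()
        if not value:
            raise ValueError(f"{key} is required")
        cleaned[key] = value
    try:
        # Decimal avoids float rounding and maps onto the DECIMAL(17, 4) column.
        amount = Decimal(str(payload.get("loan_amount")))
    except InvalidOperation:
        raise ValueError("loan_amount must be numeric") from None
    if not amount.is_finite():
        raise ValueError("loan_amount must be a finite number")
    cleaned["loan_amount"] = amount
    return cleaned
```

Validation of this kind complements the transaction rather than replacing it: the rollback still protects against failures that no up-front check can predict.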
3. Create the Application's Entry Point
To complete this sample application, you need an HTTP server that accepts incoming POST requests on port 8080. Python has some built-in libraries that you can use to carry out the task. Follow the steps below to create the HTTP server:
Open a new main.py file in a text editor.
$ nano main.py
Enter the following information into the main.py file.
import http.server
from http import HTTPStatus
import socketserver
import json
import postgresql_db

class httpHandler(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        json_payload = json.loads(post_data)
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        pg = postgresql_db.PostgresqlDb()
        resp = pg.execute_db(json_payload)
        self.wfile.write(bytes( resp + '\r\n', "utf8"))

httpServer = socketserver.TCPServer(('', 8080), httpHandler)
print("Web server started at port 8080")
try:
    httpServer.serve_forever()
except KeyboardInterrupt:
    httpServer.server_close()
    print("The HTTP server is stopped.")
Save and close the main.py file.
The main.py file explained:
The import section loads all the Python libraries required by the sample application. The http.server, HTTPStatus, and socketserver libraries load HTTP functionalities. The json module allows you to work with JSON data, while the postgresql_db import loads your custom PostgreSQL database class.
import http.server
from http import HTTPStatus
import socketserver
import json
import postgresql_db
...
The httpHandler is a handler class for the HTTP server. This class accepts a JSON payload from HTTP clients. Then under this class, the pg = postgresql_db.PostgresqlDb() and pg.execute_db(json_payload) statements call your custom PostgresqlDb class to save data to the database and return a response using the self.wfile.write(bytes( resp + '\r\n', "utf8")) statement.
...
class httpHandler(http.server.SimpleHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        json_payload = json.loads(post_data)
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        pg = postgresql_db.PostgresqlDb()
        resp = pg.execute_db(json_payload)
        self.wfile.write(bytes( resp + '\r\n', "utf8"))
...
The following declarations at the end of the file create a web server that listens for HTTP requests and dispatches the request to the httpHandler class.
...
httpServer = socketserver.TCPServer(('', 8080), httpHandler)
print("Web server started at port 8080")
try:
    httpServer.serve_forever()
except KeyboardInterrupt:
    httpServer.server_close()
    print("The HTTP server is stopped.")
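In the handler above, send_response(HTTPStatus.OK) runs before the transaction starts, so clients receive a 200 status even when the transaction rolls back. One possible refinement, sketched below, is to run the transaction first and derive the status from its result. The build_response helper is hypothetical, and mapping every failure to 400 Bad Request is an assumption made for this sketch; some failures may be better reported as a server error.

```python
from http import HTTPStatus


def build_response(result):
    """Map the string returned by execute_db(...) to an HTTP status and body."""
    # execute_db(...) returns "Success" after a commit and the error text otherwise.
    status = HTTPStatus.OK if result == "Success" else HTTPStatus.BAD_REQUEST
    body = (result + "\r\n").encode("utf8")
    return status, body


# Possible use inside do_POST, after reading json_payload:
#     resp = postgresql_db.PostgresqlDb().execute_db(json_payload)
#     status, body = build_response(resp)
#     self.send_response(status)
#     self.send_header('Content-type', 'application/json')
#     self.end_headers()
#     self.wfile.write(body)
```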
You now have all the necessary source code files required by your application. Proceed to the next step to test the application.
4. Test the Application
After coding all the Python files, the final step is installing the Python pip package, downloading the psycopg2 library, and testing the application. Follow the steps below to complete the application:
Install the Python pip package.
$ sudo apt install -y python3-pip
Use the pip package to install the psycopg2-binary library for the PostgreSQL server.
$ pip install psycopg2-binary
Output:
...
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.5
Use the python3 command to run the application.
$ python3 main.py
Output:
Web server started at port 8080
Establish another SSH connection to your server and run the following Linux curl command to send a sample JSON payload to the application.
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}'
Output:
"Success"
Check out the output below from the first terminal window where the web server is running. The transaction succeeds without any errors.
Web server started at port 8080
Starting new database transaction...
Inserting new customer to database...
Customer successfully inserted to database, new customer_id is 1
Inserting customer's loan record...
Customer loan record inserted successfully, new loan_id is 1
Database transaction completed successfully.
Database connection closed successfully.
Try sending the following invalid transaction, which has the wrong loan amount: PP instead of a numeric value.
$ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "PP"}'
Output:
"invalid input syntax for type numeric: \"PP\"..."
Examine the output from the first terminal window. This time, the transaction fails without making any changes to the database. Although the application inserts the customer's details into the database and obtains a new customer_id (2), the entire transaction rolls back, as the following output shows.
...
Starting new database transaction...
Inserting new customer to database...
Customer successfully inserted to database, new customer_id is 2
Inserting customer's loan record...
Database transaction failed, rolling back database changes...
Database connection closed successfully.
To verify the changes, log in to your PostgreSQL database cluster.
$ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb
Output:
Password for user vultradmin:
Enter your password and press Enter to proceed.
Output:
defaultdb=>
Switch to the bank_db database.
defaultdb=> \c bank_db;
Output:
You are now connected to database "bank_db" as user "vultradmin".
Query the customers table.
bank_db=> SELECT customer_id, first_name, last_name FROM customers;
Output:
 customer_id | first_name | last_name
-------------+------------+-----------
           1 | JOHN       | DOE
(1 row)
Query the loans table.
bank_db=> SELECT loan_id, customer_id, amount FROM loans;
Output:
 loan_id | customer_id |  amount
---------+-------------+-----------
       1 |           1 | 4560.0000
(1 row)
The above outputs confirm that the application's logic is working as expected. Without the PostgreSQL transaction logic, you would now have an orphaned customer record without a matching loan record.
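The manual check above can also be scripted. The sketch below looks for customers that have no row in the loans table by using a LEFT JOIN. The find_orphaned_customers helper is hypothetical; it accepts any Python DB-API connection, so an open psycopg2 connection to bank_db should work as well. The demo function uses the built-in sqlite3 module as a stand-in for PostgreSQL, an assumption made only so the example runs without a server.

```python
import sqlite3

ORPHAN_QUERY = """
    SELECT c.customer_id
    FROM customers AS c
    LEFT JOIN loans AS l ON l.customer_id = c.customer_id
    WHERE l.loan_id IS NULL
    ORDER BY c.customer_id
"""


def find_orphaned_customers(conn):
    """Return the ids of customers that have no row in the loans table."""
    cur = conn.cursor()
    try:
        cur.execute(ORPHAN_QUERY)
        return [row[0] for row in cur.fetchall()]
    finally:
        cur.close()


def demo():
    # sqlite3 stands in for PostgreSQL here; the column types are simplified.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE customers ("
        "customer_id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)"
    )
    conn.execute(
        "CREATE TABLE loans ("
        "loan_id INTEGER PRIMARY KEY, customer_id INTEGER, amount NUMERIC)"
    )
    conn.execute("INSERT INTO customers VALUES (1, 'JOHN', 'DOE')")
    conn.execute("INSERT INTO loans VALUES (1, 1, 4560)")
    consistent = find_orphaned_customers(conn)
    # Simulate what a partial update would leave behind: a customer with no loan.
    conn.execute("INSERT INTO customers VALUES (2, 'JANE', 'DOE')")
    with_orphan = find_orphaned_customers(conn)
    conn.close()
    return consistent, with_orphan


if __name__ == "__main__":
    print(demo())
```

An empty list means every customer has at least one loan record, which is the state the transaction logic is meant to preserve.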
Conclusion
This guide shows you how to implement PostgreSQL database transactions with Python on an Ubuntu 20.04 server. Use the source code in this guide to create applications that treat a unit of database work as a whole. Transactions ensure database consistency and prevent orphaned records.