Develop database driven applications in Python with Peewee

Python PeeWee ORM

It is not uncommon that most of the applications these days are interacting with database these days. Specially with RDBMS based engines( DB engines that support SQL). Like any other languages Python also provides native and 3rd party libraries to interact with database. Normally you have to write SQL queries for CRUD operations. That’s OK but at times it happens that things get messy:

  • The Big Boss decided to move from MySQL to.. MSSQL and you have no choice but to nod at him and making changes in your queries compatible with other Db Engine.
  • You gotta make multiple queries just to retrieve a single piece of info from the other table.
  • umm.. many others, might be?

To deal such kind of mess ORM comes to rescue.

Enter ORM

ORM is an acronym for Object Relational Mapping. Alright, but what does an ORM do?

From Wikipedia:

Object-relational mapping (ORM, O/RM, and O/R mapping tool) in computer science is a programming technique for converting data between incompatible type systems in object-oriented programming languages. This creates, in effect, a “virtual object database” that can be used from within the programming language.

Sounds awesome, right?

What is Peewee?

Peewee is a small and expressive ORM that currently supports postgresql, mysql and sqlite. It is not the first and only ORM for Python developers. Django offers it’s own ORM library and then there’s SqlAlchemy. The best thing about Peewee that it has small footprint and learning curve and within a few minutes you can start using it in your applications.

OK enough praises, let’s get into the code.

Getting Started

Like many other Python libraries you can install Peewee as well by using pip:

pip install peewee

Setting up Database

As I told that Peewee supports multiple database engines, for this tutorial I am using MySQL.

user = 'root'
password = 'root'
db_name = 'peewee_demo'

dbhandle = MySQLDatabase(
    db_name, user=user,
    password=password,
    host='localhost'
)

An object of  MySQLDatabase is created.

Creating Models

I am now going to create models. For this post I am using two tables or models; Category and Product. A category can have multiple products.

class BaseModel(Model):
    class Meta:
        database = dbhandle


class Category(BaseModel):
    id = PrimaryKeyField(null=False)
    name = CharField(max_length=100)

    created_at = DateTimeField(default=datetime.datetime.now())
    updated_at = DateTimeField(default=datetime.datetime.now())

    class Meta:
        db_table = "categories"
        order_by = ('created_at',)

First I created a BaseModel. The reason to do is that Peewee asks you to pass dbhandle in each Model class. To avoid this redundancy I just created a base class and extended it.

Alright our BaseModel and Category is created. Category model consists of four fields:

  1. id which is an auto increment field.
  2. name holds the name of the category.
  3. updated_at and created_at are timestamps fields that have been default with current time.

In Meta class I am passing table name in db_table property. It is not necessary if table and model name are same. The order_by property dictates which column should be used to sort the data while retrieving. You can overwrite it by passing your own sort by field.

Before I move further, I am creating another file, called operations.py in which I will be utilizing these models.

import peewee
from models import *

if __name__ == '__main__':
    try:
        dbhandle.connect()
        Category.create_table()
    except peewee.InternalError as px:
        print(str(px))

After required imports I am connecting with the database. The exception peewee.OperationalError takes care of all exceptions related to Peewee. For instance, if you give wrong credentials it will give something like:

(1045, "Access denied for user 'root1'@'localhost' (using password: YES)")

Then we called Category.create_table() which creates the table with the properties defined above. If you pass safe=True as a parameter, it will silently overwrite the existing table. Something which is quite dangerous in real world scenario.

Next up, Product model:

class Product(BaseModel):
    id = PrimaryKeyField(null=False)
    name = CharField(max_length=100)
    price = FloatField(default=None)
    category = ForeignKeyField(Category, related_name='fk_cat_prod', to_field='id', on_delete='cascade',
                               on_update='cascade')
    created_at = DateTimeField(default=datetime.datetime.now())
    updated_at = DateTimeField(default=datetime.datetime.now())

    class Meta:
        db_table = "products"
        order_by = ('created_at',)

It is similar to Cateogory. The difference is ForeignKeyField which is telling how Product should be linked up with Category. Updating the main will now look like this:

if __name__ == '__main__':
    try:
        dbhandle.connect()
        Category.create_table()
    except peewee.InternalError as px:
        print(str(px))
    try:
        Product.create_table()
    except peewee.InternalError as px:
        print(str(px))

On running this it would create the products table along with creation relationship with categories table. See the screenshot of my sql client:

Inserting a record

Now we are going to add a category first and then products. Since we have models ready, it is not simply to add/update records.

def add_category(name):
    row = Category(
        name=name.lower().strip(),
    )
    row.save()

I added a function called add_category() with a parameter, name in it. A Catregory object is created and the table fields which are actually properties of this class object is passed. In our case it’s name field.

The row.save() actually storing the object information into db.

Kewl, no? No more writing ugly looking INSERTs.

Now let’s add a product.

def add_product(name, price, category_name):
    cat_exist = True
    try:
        category = Category.select().where(Category.name == category_name.strip()).get()
    except DoesNotExist as de:
        cat_exist = False

    if cat_exist:
        row = Product(
            name=name.lower().strip(),
            price=price,
            category=category
        )
        row.save()

add_product takes name,price and category_id as input. First I am checking whether the category exist, if yes then it’s object being stored. In ORMs you deal with object, this is why you are passing category info as an object since we already defined the relation above.

In the main I am then calling it:

add_category('Books')
add_category('Electronic Appliances')

Two categories added.

Now let’s add products:

# Adding Products
 add_product('C++ Premier', 24.5, 'books')
 add_product('Juicer', 224.25, 'Electronic Appliances')

I am passing category name, once it’s object found then assigned to Product class object. If you are going to SQL Way, you will run a SELECT first to fetch category_id and then assign the id to the product being inserted. Since ORM means dealing with objects, we store objects instead of scalar values. One might consider it over-engineering, in our scenario, yes but think of use case when you have no idea which DB gonna be used in future. Your code is DB agonistic so either it’s running MySQL or MSSQL or even MongoDb (hypothetically), it will work.

Selecting multiple records

First select categories:

def find_all_categories():
    return Category.select()

Category.select() returns ALL the record here which will be sorted by created_at as we defined in the Meta class.

Now fetch all products:

# Retrieve Products
        products = find_all_products()
        product_data = []
        for product in products:
            product_data.append({'title': product.name, 'price': product.price, 'category': product.category.name})

        print(product_data)

Here I am iterating products and pushing the record in product_data which is of type List. Notice how product’s category being access. No more SELECTs for fetching category Id and then finding the name of it. A single loop covered all. On running it will display product info as:

[{'price': 24.5, 'title': 'c++ premier', 'category': 'books'}, {'price': 224.25, 'title': 'juicer', 'category': 'electronic appliances'}]

Selecting Single Record

For selecting a single record you will use get method:

def find_product(name):
    return Product.get(Product.name == name.lower().strip())

Now this is called as:

# Find single Product
  p = find_product('c++ premier')
  print(p.category.name)

Product Name is passed, if record exists it will return the Product instance. Here I am printing category associated with that product.

Updating Records

Updating records is as simple as insertion. You get the instance of the object and then update it.

def update_category(id, new_name):
    category = Category.get(Category.id == id)
    category.name = new_name
    category.save()

And then call it as:

# Update Single Category
  update_category(2, 'Kindle Books')

Deleting Records

Deleting a record is not different than updating one:

def delete_category(name):
    category = Category.get(Category.name == name.lower().strip())
    category.delete_instance()

And then called as:

# Delete Category
  delete_category('Kindle Books')

Notice when you delete it, it will also delete the associated products since products is connected with category and we have already defined ON_DELETE constraint.

Conclusion

Alright so you learnt the basics of Peewee and how you can use this small and efficient ORM in your next Python based scripts. To learn more about it visit official documentation.

Like always, the code is available on Github.

 

If you like this post then you should subscribe to my blog for future updates.

* indicates required