JulianHysi, 31 October 2022
#postgres #Django #Python
Bulk upsert with Django and Postgres
This short article covers what upserting is and how it works with Python, Django, and Postgres.
The problem
There are many cases where we want to create a database record if it doesn't exist, or update it if it does (in this context, the presence or absence of a record is defined by the presence or absence of an identifier field or primary key value). To meet this need, the Django ORM offers us an update_or_create() method that you can call on a model's default manager. See the documentation for more details on its use.
While this works fine for a single record, the problem is that it issues two queries to the database: one to find out whether the record exists, and another to insert or update depending on the result of the first query. As you can imagine, this duplication of queries doesn't scale when you're working with large batches of data. For example, if we wanted to create or update a batch of 1000 records, we would have to hit the database 2000 times, which is far from ideal.
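To make the query count concrete, here is roughly what the naive per-record approach looks like; the Product model, its unique sku field, and the shape of the rows are hypothetical stand-ins for your own data:

```python
from myapp.models import Product  # hypothetical model with a unique "sku" field

rows = [{"sku": "ABC-001", "name": "Widget", "price": 9.99}]  # imagine 1000 of these

for row in rows:
    # Each call issues a SELECT plus an INSERT or UPDATE: 2 queries per record.
    Product.objects.update_or_create(
        sku=row["sku"],
        defaults={"name": row["name"], "price": row["price"]},
    )
```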
We could of course go from 2N operations to N + 2 by using bulk_create() and bulk_update() instead, as shown in the documentation. In our previous example with 1000 records, we would still have to hit the database 1000 times just to find out whether each record exists, grouping the records into two smaller batches: one to be inserted and one to be updated. Then, for each of the smaller batches, we could call the respective bulk method, which issues a single query, resulting in a total of 1002 queries.
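Here is a rough sketch of that N + 2 flow, again using the hypothetical Product model from above; it is only meant to illustrate the bookkeeping involved, not to be the definitive way to write it:

```python
from myapp.models import Product  # hypothetical model with a unique "sku" field


def naive_bulk_upsert(rows):
    """rows: list of dicts like {"sku": ..., "name": ..., "price": ...}."""
    to_create, to_update = [], []

    for row in rows:
        # One existence check per record: N queries in total.
        obj = Product.objects.filter(sku=row["sku"]).first()
        if obj is None:
            to_create.append(Product(**row))
        else:
            obj.name, obj.price = row["name"], row["price"]
            to_update.append(obj)

    # One query per batch: N + 2 queries overall.
    Product.objects.bulk_create(to_create)
    Product.objects.bulk_update(to_update, fields=["name", "price"])
```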
While this may seem like a huge improvement, database queries are mostly I/O-bound operations, especially in our case, since the select, insert, and update statements themselves would be fairly simple. This means that neither approach is an acceptable solution, and as N gets larger, both would introduce more or less the same significant bottleneck into our system.
Bulk upsert
What if the example above could be reduced to just a few queries? This is where bulk upsert comes to the rescue. Upsert, a term derived from blending "insert" and "update", is a SQL technique that allows us to create or update a record in the database in a single statement. Best of all, you can also run it in bulk, which drastically reduces the number of queries issued against the database.
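To give a flavor of what that looks like in Postgres (the product table, its columns, and the unique sku constraint are made up for illustration), the core of the technique is the ON CONFLICT clause of INSERT:

```python
from django.db import connection

# Single-row example; the bulk version feeds many rows through the same clause.
sql = """
    INSERT INTO product (sku, name, price)
    VALUES (%s, %s, %s)
    ON CONFLICT (sku) DO UPDATE
    SET name = EXCLUDED.name,
        price = EXCLUDED.price
"""

with connection.cursor() as cursor:
    cursor.execute(sql, ["ABC-001", "Widget", 9.99])
```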
I say a few queries, and not exactly one, because while you could technically bulk upsert in a single query, the count can go up depending on some implementation details. For example, using a temporary table adds an extra query. Automatic handling of timestamp fields like created_at and updated_at (which we'll get to later) adds another. If the initial batch is too large, splitting it adds one more query per chunk. The end result still leaves you with significantly fewer queries, though, and you would probably need to split your batch when using the regular built-in bulk methods as well. Some of the other steps, such as creating the temporary table or finding out whether the model is tracked, always cost a single additional query regardless of the batch size (a constant cost), so you don't have to worry about them.
However, unlike SQLAlchemy, Django's ORM does not provide us with built-in bulk upsert constructs. For this reason, we'll write a short, simple utility class in Python that does just that. The solution is not Django-centric; with a few small tweaks, it can easily be used with pure Python or any other framework. But since we bypass the ORM entirely, the SQL statements are written and tested against the PL/pgSQL dialect. This means that the syntax may change slightly, or may not be supported at all, on another DBMS depending on what you are using.
In order not to overload this blog post with snippets, you can find the complete code in this gist. Let me briefly walk through how the class is used and what steps execution goes through from a bird's-eye view (a bare-bones sketch of this flow follows the list):
- First, you need to create a BulkUpserter instance with the appropriate attributes, which are pretty much self-explanatory.
- To pull the trigger, you call my_instance.upsert(). This method is the entry point and the only one you need to call directly. The rest of the code in the class exists to serve this method, and we only walk through it for explanation purposes.
- Within the body of the upsert() method, we first create a temporary table inside a context manager. One of the benefits of the temporary table is early data validation, since its definition is copied from the original table we want to write to.
- We then populate the newly created temporary table with records derived from the instance's data attribute, which contains a list of dicts representing records.
- Finally, the data is written to the main table using the ON CONFLICT clause to perform the upsert.
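Since the full implementation lives in the gist, the following is only a bare-bones sketch of that flow. The attribute names beyond data and on_conflict_col (model, update_cols), and every implementation detail, are assumptions for illustration rather than the actual gist code:

```python
from django.db import connection, transaction
from contextlib import contextmanager


class BulkUpserter:
    """Bare-bones sketch of the flow described above, not the real class."""

    def __init__(self, model, data, on_conflict_col, update_cols):
        self.model = model                      # Django model class to write to
        self.data = data                        # list of dicts, one per record
        self.on_conflict_col = on_conflict_col  # column with a UNIQUE constraint
        self.update_cols = update_cols          # columns to overwrite on conflict
        self.table = model._meta.db_table
        self.cols = list(data[0].keys())

    @contextmanager
    def _temp_table(self, cursor):
        # The definition is copied from the target table, giving us early
        # validation of the incoming data; the table is dropped on commit.
        cursor.execute(
            f"CREATE TEMPORARY TABLE tmp_upsert "
            f"(LIKE {self.table} INCLUDING DEFAULTS) ON COMMIT DROP"
        )
        yield "tmp_upsert"

    def upsert(self):
        col_list = ", ".join(self.cols)
        placeholders = ", ".join(["%s"] * len(self.cols))
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in self.update_cols)

        with transaction.atomic(), connection.cursor() as cursor:
            with self._temp_table(cursor) as tmp:
                # Populate the temporary table from the data attribute
                # with a single multi-row insert.
                values_sql = ", ".join(f"({placeholders})" for _ in self.data)
                params = [row[c] for row in self.data for c in self.cols]
                cursor.execute(
                    f"INSERT INTO {tmp} ({col_list}) VALUES {values_sql}", params
                )
                # Write to the main table, turning conflicts into updates.
                cursor.execute(
                    f"INSERT INTO {self.table} ({col_list}) "
                    f"SELECT {col_list} FROM {tmp} "
                    f"ON CONFLICT ({self.on_conflict_col}) DO UPDATE SET {updates}"
                )
```

In this simplified form the whole operation costs roughly three queries (create the temporary table, fill it, and the final INSERT ... ON CONFLICT), which is where the "few queries" from earlier come from; the real class in the gist adds the tracked-model handling discussed below.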
Other considerations
First of all, this is a basic solution to the problem. It should work fine for simple use cases, but you may need to tweak and extend it for your own situation. The goal is to provide a basic structure that you can build on. The following is a list of concerns or potential improvements to consider before using the code.
Invalid input. Note that if the raw data contains two records with the same value for on_conflict_col, the upsert will fail, which means nothing is written to the database. This applies regardless of whether that value already exists in the database. You would have to discard the duplicates before attempting to insert them. There are situations where such an operation makes sense and you really don't lose much by discarding duplicate records; consider the implications for your own case. Also, on_conflict_col must refer to a field with a UNIQUE constraint, otherwise the upsert will fail as well. This makes sense when you think about it, since this field is used to resolve conflicts and identify records.
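If discarding duplicates is acceptable in your case, a small helper along these lines (the names are placeholders) can run over the raw data before the upsert; here the last occurrence of each key wins:

```python
raw_rows = [
    {"sku": "ABC-001", "name": "Widget", "price": 9.99},
    {"sku": "ABC-001", "name": "Widget v2", "price": 10.99},  # duplicate key
]


def dedupe_by_key(rows, key):
    """Keep only the last row seen for each value of `key`."""
    unique = {}
    for row in rows:
        unique[row[key]] = row
    return list(unique.values())


clean_rows = dedupe_by_key(raw_rows, "sku")  # "sku" stands in for your on_conflict_col
```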
Benchmarks. In terms of performance, I was able to insert 3000 records in less than a second on my local machine. And although this isn't a real benchmark, since it depends on many factors specific to my computer's hardware (CPU clock speed and number of cores, RAM frequency and latency, SSD read/write speed), software versions (Linux, Python, Django, Postgres, psycopg2, etc.), database state (indexes, query plan cache) and other things I may be forgetting, it's usually pretty fast. It is recommended to run your own benchmarks to get a better picture.
Chunking. You probably also want to add some chunking logic to your code when working with big data. As mentioned above, there doesn't seem to be any trouble with a few thousand raw records, and there is no significant spike in memory or processor usage on my local machine. However, as the number of records grows, running them all at once is not the best idea. The current structure only hits the database four times, so don't feel guilty about taking a few more round trips, especially considering what you're getting compared to the traditional approach. However, if you can't resist running the batches in parallel, keep in mind the class of problems that parallel computing introduces.
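A minimal way to do that, reusing the hypothetical names from the sketches above, is to slice the data before handing it to the upserter; the chunk size of 1000 is an arbitrary starting point you would tune with your own benchmarks:

```python
def chunked(rows, size=1000):
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


for batch in chunked(clean_rows, size=1000):
    BulkUpserter(
        model=Product,
        data=batch,
        on_conflict_col="sku",
        update_cols=["name", "price"],
    ).upsert()
```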
Auto-managed timestamp fields. Database tables more often than not include created_at and updated_at timestamp fields. While we usually don't need to think about them at the ORM level (models would likely inherit from an abstract TrackedModel or something similar), when writing plain SQL we need to fill in these fields explicitly, since such defaults are not propagated down to the database level (Postgres cannot invoke Python callables). To avoid having to write a custom migration that sets these database defaults for every model handled by BulkUpserter, this has been addressed in the code. We first determine whether the model is tracked or not, and then fill in created_at or updated_at depending on the action applied to the record. The process works as expected even if the model is not tracked. If for some reason you don't want this, removing this piece of code should be fairly easy and would leave you with a much leaner class.
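For a tracked model, the final statement of the sketch above could fill in the timestamps explicitly, for example like this (table and column names are again made up); created_at is only written on insert, while updated_at is refreshed on conflict:

```python
upsert_sql = """
    INSERT INTO product (sku, name, price, created_at, updated_at)
    SELECT sku, name, price, now(), now()
    FROM tmp_upsert
    ON CONFLICT (sku) DO UPDATE
    SET name = EXCLUDED.name,
        price = EXCLUDED.price,
        updated_at = now()  -- created_at keeps its original value on updates
"""
```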