In this blog post, I’ll go over how to implement the merge operation in Python Polars. The first section explains what upsert and merge operations are, since the two can be confusing to tell apart. If you only want the implementation, skip to the second section of this post.
What are Upsert and Merge Operations?
They’re similar. “Upsert” refers to the combined operation of inserting and updating (i.e. insert + update). “Merge” refers to combining two datasets into one (i.e. insert + update + delete). I’d say the merge operation is essentially a superset of the upsert operation: merge is more about synchronizing tables, and it lets you make changes to the target table based on conditions.
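To make the difference concrete, here is a toy sketch in plain Python that models a table as a dict mapping a key to a value. The upsert and merge helpers are hypothetical illustrations of the semantics only; they are not part of Polars or deltalake. The keys and letters mirror the example used later in this post.

```python
# Conceptual sketch only: a "table" is a dict mapping key -> value.
# upsert() and merge() are hypothetical helpers, not library functions.


def upsert(target: dict, source: dict) -> dict:
    """Insert new keys and update existing ones; never delete."""
    result = dict(target)  # start from a copy of the target
    result.update(source)  # matched keys are updated, new keys are inserted
    return result


def merge(target: dict, source: dict) -> dict:
    """Upsert, plus delete target keys that are absent from the source."""
    result = upsert(target, source)
    for key in target.keys() - source.keys():  # "not matched by source"
        del result[key]
    return result


target = {1: "a", 2: "b", 3: "c"}
source = {2: "b", 3: "d", 4: "d"}

print(upsert(target, source))  # {1: 'a', 2: 'b', 3: 'd', 4: 'd'}
print(merge(target, source))   # {2: 'b', 3: 'd', 4: 'd'}
```

The only difference between the two helpers is the deletion step, which is why a merge can express an upsert but not the other way around.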
In this post I’ll focus on the merge operation, since it covers upsert as well.
Merge Into a Delta Lake Table in Python Polars
Python Polars utilizes the deltalake package for the merge operation. The Polars documentation already has a good example, but here’s another one that includes the delete operation on top of the upsert operation.
First, let’s create a dataframe for demonstration.
import polars as pl
df = pl.DataFrame(
    {
        'key': [1, 2, 3],
        'letter': ['a', 'b', 'c'],
        'value': [100, 200, 300]
    }
)
print(df)
'''
shape: (3, 3)
┌─────┬────────┬───────┐
│ key ┆ letter ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 │
╞═════╪════════╪═══════╡
│ 1 ┆ a ┆ 100 │
│ 2 ┆ b ┆ 200 │
│ 3 ┆ c ┆ 300 │
└─────┴────────┴───────┘
'''
Then we write it to a Delta Lake table on the local machine. This Delta Lake table is the target table we want to merge our changes into.
output_table_path = './my_delta_lake_table'
df.write_delta(output_table_path, mode='overwrite')
print('The target Delta Lake table output:\n', pl.read_delta(output_table_path))
'''
The target Delta Lake table output:
shape: (3, 3)
┌─────┬────────┬───────┐
│ key ┆ letter ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 │
╞═════╪════════╪═══════╡
│ 1 ┆ a ┆ 100 │
│ 2 ┆ b ┆ 200 │
│ 3 ┆ c ┆ 300 │
└─────┴────────┴───────┘
'''
Next, we build the source dataframe by applying three kinds of changes to the original: an update, a deletion, and an insertion:
- Updated the letter value “c” to “d”
- Deleted the row whose key is 1
- Added a new row with key 4
df_col_updated_and_row_deleted = (
    df
    .with_columns(
        pl.when(pl.col('letter') == 'c')  # update a column
        .then(pl.lit('d'))
        .otherwise(pl.col('letter'))
        .alias('letter')
    )
    .filter(pl.col('key') != 1)  # delete a row
)
df_with_changes = pl.concat(
    [
        df_col_updated_and_row_deleted,
        pl.DataFrame({'key': [4], 'letter': ['d'], 'value': [400]})  # a new one-row frame
    ],
    how='vertical'
)
print(df_with_changes)
'''
shape: (3, 3)
┌─────┬────────┬───────┐
│ key ┆ letter ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 │
╞═════╪════════╪═══════╡
│ 2 ┆ b ┆ 200 │
│ 3 ┆ d ┆ 300 │
│ 4 ┆ d ┆ 400 │
└─────┴────────┴───────┘
'''
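Finally, we merge the source dataframe into the target Delta Lake table. With mode='merge', write_delta returns a deltalake TableMerger object; its methods define what happens to matched and unmatched rows, and nothing is written until execute() is called.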
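(
    df_with_changes
    .write_delta(
        output_table_path,
        mode='merge',
        delta_merge_options={
            'predicate': 'source.key = target.key',
            'source_alias': 'source',
            'target_alias': 'target',
        },
    )
    .when_matched_update_all()  # key in both: overwrite the target row with the source row
    .when_not_matched_insert_all()  # key only in source: insert the row
    .when_not_matched_by_source_delete()  # key only in target: delete the row
    .execute()  # run the merge and return its metrics
)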
'''
{'num_source_rows': 3,
'num_target_rows_inserted': 1,
'num_target_rows_updated': 2,
'num_target_rows_deleted': 1,
'num_target_rows_copied': 0,
'num_output_rows': 3,
'num_target_files_added': 2,
'num_target_files_removed': 1,
'execution_time_ms': 24,
'scan_time_ms': 0,
'rewrite_time_ms': 2}
'''
Let’s print the table to check that the merge worked as expected. The resulting Delta Lake table should contain all the changes I made.
print('Delta lake table output:\n', pl.read_delta(output_table_path))
'''
Delta lake table output:
shape: (3, 3)
┌─────┬────────┬───────┐
│ key ┆ letter ┆ value │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 │
╞═════╪════════╪═══════╡
│ 4 ┆ d ┆ 400 │
│ 3 ┆ d ┆ 300 │
│ 2 ┆ b ┆ 200 │
└─────┴────────┴───────┘
'''
As you can see, the insertion, update, and deletion were all applied.
See the code in my GitHub repo.
Limitation
One limitation I’m aware of when implementing upsert or merge in Python Polars is that schema evolution is NOT supported, because the deltalake package had not implemented it at the time of this writing.