Parallelize a wide df in Pandas

Sometimes you end up with a very wide pandas dataframe and you are interested in doing the same types of operations (data processing, building a model etc.) but focused on subsets of the columns.

For example, if we had a wide df with different time series KPIs represented as columns, we might want to look at one KPI at a time, apply some pre-processing and build something like an ARIMA time series model.

This is the situation I found myself in recently, and it took me the best part of an afternoon to figure out. Usually when that happens I try to squeeze out a blog post in case it might be useful for someone else or for future me.

Note: repository with all code is here. p.s. thanks to this and this post that i built off of.

For this example i’m afraid i’m going to use the Iris dataset :0 . This example is as minimal and easy as i could throw together, basically the aim of the code is to:

  1. Build some function to take in a df, do some processing and spit out a new df.
  2. Have that function be parameterized in some way as might be needed (e.g. if you wanted to do slightly different work for one subset of columns).
  3. Apply that function in parallel across the different subsets of your df that you want to process.

There are two main functions of interest here, parallelize_dataframe() and do_work(), both of which live in their own file called my_functions.py, which can be imported into your Jupyter notebook.

from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def parallelize_dataframe(df, func, n_pool=4, col_subsets=None, join_how='outer', **kwargs):
    '''
    Function to take a df, a function with args, and a list of column subsets to apply function to.
    Resulting list of df's are then joined back together based on the df index.
    '''
    # if col subsets not defined then just use all cols
    if col_subsets is None:
        col_subsets = [list(df.columns)]
    # make one subset df for each subset of cols
    df_list = [df[col_subset] for col_subset in col_subsets]
    # wrap the func in a partial and pass in the kwargs
    p_func = partial(func, **kwargs)
    # apply func via map to each of the df's in df_list
    # (the with block takes care of the multiprocessing clean up)
    with Pool(n_pool) as pool:
        map_return_list = pool.map(p_func, df_list)
    # join all the resulting df's back into one df based on index
    df_out = map_return_list[0]
    for df_next in map_return_list[1:]:
        df_out = df_out.merge(df_next, how=join_how, left_index=True, right_index=True)
    return df_out


def do_work(df, kwarg_dict=None):
    ''' Function (which we want to parallelize across different cols of the df) to do some operations on a df.
    '''
    # pass any args in as a dict to be parsed out as needed (not used in this example)
    if kwarg_dict:
        print(kwarg_dict)
    # work on a copy so the df that was passed in is left untouched
    df = df.copy()
    # for every numeric col in the subsetted df we want to do some operations
    for col in df.select_dtypes(include='number').columns:
        df[f'{col}_squared'] = df[col]**2
        df[f'{col}_sqrt'] = np.sqrt(df[col])
    return df

parallelize_dataframe() does the below things:

  1. Break out df into a list of df’s based on the col_subsets list passed in as a parameter.
  2. Wrap the function that was passed in into a partial along with the kwargs (this is how your parameters make it into the do_work() function).
  3. Use map() from multiprocessing to apply the func (along with the args you want to send it) to each subset of columns from your df in parallel.
  4. Reduce all this back into one final df by joining all the resulting df’s from the map() output into one wide df again (note the assumption here of joining back on the df indexes, which need to be stable and meaningful).
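As a rough sketch of those four steps on a toy frame: the add_squares() worker, the made-up values and the "_sq" suffix are all hypothetical, and multiprocessing.dummy (a thread-backed pool with the same API as multiprocessing.Pool) is swapped in only so the snippet runs anywhere without pickling concerns.

```python
from functools import partial, reduce
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool

import pandas as pd


def add_squares(df, suffix="_squared"):
    """Toy worker: return a copy of df with a squared version of each numeric column."""
    out = df.copy()
    for col in df.select_dtypes(include="number").columns:
        out[f"{col}{suffix}"] = df[col] ** 2
    return out


# toy stand-in for the Iris data (values are made up)
toy = pd.DataFrame({
    "sepal_length": [5.0, 4.0],
    "sepal_width": [3.5, 3.0],
    "petal_length": [1.0, 2.0],
    "petal_width": [0.5, 1.0],
})
col_subsets = [["sepal_length", "sepal_width"], ["petal_length", "petal_width"]]

# 1. split into one df per column subset
pieces = [toy[cols] for cols in col_subsets]
# 2. wrap the worker and its parameters in a partial
worker = partial(add_squares, suffix="_sq")
# 3. map the worker over the pieces
with Pool(2) as pool:
    results = pool.map(worker, pieces)
# 4. reduce: join the results back together on the index
wide = reduce(lambda left, right: left.join(right, how="outer"), results)
print(wide.columns.tolist())
```

Swapping the import back to `from multiprocessing import Pool` should give real processes, provided the worker lives in an importable module such as my_functions.py.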

The do_work() function in this example is just a simple function to add some new columns as examples of the types of pandas (or any other) goodness you might want to do. In my case it would really be more like an apply_model() type function that would take each subset of columns, do some feature extraction, train a model and then also score the data as needed.

Having the ability to do this for multiple subsets of columns in your wide df can really free up your time to focus on the more important things like fiddling around with model parameters and different pre-processing steps 🙂

That’s pretty much it, a productive afternoon (in the play center with kids i might add) and am quite pleased with myself.

Update: One addition I made, as things got more complicated when I went to implement this, was the ability to apply different function params to each subset df, for example to pass different parameters to the function for different columns. This lives in do_parallel_zip.ipynb and the corresponding my_functions_zip.py (I’m calling them “_zip” as they use zip() to “zip” up both the df_list and the corresponding kwargs to go with it, to be unpacked later by do_work_zip()).

To be concrete, say we wanted to multiply the “sepal_…” cols by 100 and the “petal_…” cols by 0.5. We could use the “zip” approach (notebook here): parallelize_dataframe_zip() zips each subset df together with its own kwargs, and do_work_zip() then unpacks each zipped pair as needed.
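A minimal sketch of the zip idea, not the code from the notebook: do_work_zip_sketch(), the "multiplier" key and the toy values are hypothetical names and data chosen for illustration, and multiprocessing.dummy is again used as a thread-backed stand-in for multiprocessing.Pool.

```python
from multiprocessing.dummy import Pool  # thread-backed stand-in for multiprocessing.Pool

import pandas as pd


def do_work_zip_sketch(zipped):
    """Hypothetical worker: unpack a (df, kwargs) pair and scale every numeric column."""
    df, kwargs = zipped
    out = df.copy()
    for col in out.select_dtypes(include="number").columns:
        out[col] = out[col] * kwargs["multiplier"]
    return out


# toy stand-in for the Iris data (values are made up)
toy = pd.DataFrame({
    "sepal_length": [5.0, 4.0],
    "sepal_width": [3.5, 3.0],
    "petal_length": [1.0, 2.0],
    "petal_width": [0.5, 1.0],
})
col_subsets = [["sepal_length", "sepal_width"], ["petal_length", "petal_width"]]
# one kwargs dict per column subset, in the same order
kwargs_list = [{"multiplier": 100}, {"multiplier": 0.5}]

df_list = [toy[cols] for cols in col_subsets]
# zip pairs each subset df with the kwargs meant for it
zipped_inputs = list(zip(df_list, kwargs_list))
with Pool(2) as pool:
    results = pool.map(do_work_zip_sketch, zipped_inputs)
# put the scaled subsets back side by side on the shared index
scaled = pd.concat(results, axis=1)
print(scaled)
```

Because map() only passes a single argument to the worker, bundling the df and its kwargs into one tuple is what lets each subset carry its own parameters.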

Parallel Jupyter Notebooks

I have become master of the notebooks, they bend at my will and exist to serve my data science needs!

Ok i might be getting a bit carried away, but i recently discovered papermill and have been finding it very useful in conjunction with Python multiprocessing to speed up a lot of data science experimental type work. So useful in fact, i was motivated to write a post on a Saturday night!

Note: All the code for this post is here.

One problem with notebooks

I’m generally (having swayed back and forth) a fan of notebooks, but am wary of some of the downsides or costs they can impose. When doing experimental type work, if you’re not careful, you can end up with lots of duplicated code or what I think of as “notebook instances”, where you have run your notebook many times on different (but similar) datasets and with different (but similar) parameters.

Aside: Great talk and deck from @joelgrus (who is great, and whose meme game is very strong) on some drawbacks of notebooks.

Having the executed notebooks themselves become self documenting artifacts relating to the experiment is really useful – the code you ran and its outputs in one place. But when you start building new features on top of these “notebook instances” as you iterate on the research, things can quickly get messy.

Where I’ve found papermill to be very useful is in basically template-ing up your notebooks in one single place and parameterizing them, such that the actual living notebook code and the executed “notebook instances” have a much cleaner separation.

I’ll try to make this clearer with an example.

data_explorer Notebook

Let’s suppose you have a notebook that you often use on new datasets (in reality it’s more likely to be some more complicated ML pipeline type notebook for quickly experimenting on updated datasets while maintaining some common structure in how you go about things).

In this example it’s a simple notebook to download a dataset and just do some descriptive stats and plotting.

The main idea here is to parameterize the whole notebook as much as possible. This is done with a JSON-style dictionary called “config”, so that pretty much everything the notebook needs is defined in the first cell.

config = {
    "data_url": "https://raw.githubusercontent.com/andrewm4894/papermill_dev/master/data/titanic.csv"
}

In this case, the data_explorer notebook just takes in one parameter called “data_url”. It then downloads from this URL into a pandas dataframe and does some basic plotting. In reality this “config” dict can contain all the input parameters you need to define and execute your notebook. For example, it could define the type of models to build against your data, what data to use, model parameters, where to store outputs etc. Anything and everything really.
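A rough sketch of how such a notebook could be laid out, assuming the first cell carries the "parameters" tag that papermill looks for when injecting overrides. The explore() helper and the tiny inline CSV are hypothetical stand-ins, used so the snippet runs without network access.

```python
import io

import pandas as pd

# --- first cell, tagged "parameters" so papermill can inject overrides after it ---
config = {
    "data_url": "https://raw.githubusercontent.com/andrewm4894/papermill_dev/master/data/titanic.csv"
}


# --- later cell: everything reads from config, nothing is hard coded ---
def explore(source):
    """Hypothetical helper: load a CSV from a URL, path or file-like object and summarise it."""
    df = pd.read_csv(source)
    return df.describe(include="all")


# in the notebook this would be explore(config["data_url"]);
# a tiny made-up CSV is used here instead of downloading anything
summary = explore(io.StringIO("age,fare\n22,7.25\n38,71.28\n"))
print(summary)
```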

Enter Papermill

So let’s say you now have three different datasets that you want to run through your data_explorer notebook. You could manually update the config and then just rerun the notebook 3 times (making sure to restart the kernel and clear all outputs each time), maybe saving outputs into specific locations. Or worse, you could make 3 copies of your notebook and just run them each individually (don’t do this, future you will hate it).

Much better is to let papermill kick off the execution of the notebooks so you have a clear separation between the notebooks your code lives in (in this case, the notebooks folder of the repo) and the outputs or “notebook instances” of running the same notebooks multiple times against different data or the same data but with slightly different parameters (in this case the papermill_outputs folder according to a convention you can control).

Two things let us do this: a Python script (run_nb_batch.py) that uses papermill and multiprocessing to kick off parallel notebook executions, and a JSON file (configs.json) that defines the notebooks to be run and the configs to run them with.

run_nb_batch.py:

import argparse
import json
import multiprocessing
import os

import papermill as pm


def run_papermill(config):
    ''' Function to run a notebook using papermill.
    '''
    # get some variables from the config being run
    notebook = config['notebook']
    output_label = config['output_label']
    # get name of notebook (file name without the .ipynb extension)
    notebook_name = os.path.splitext(os.path.basename(notebook))[0]
    output_dir = f'papermill_outputs/{notebook_name}/{output_label}'
    # print config to be run
    print("-"*50)
    print(config)
    print("-"*50)
    # make output dir if need to
    os.makedirs(output_dir, exist_ok=True)
    output_path = f'{output_dir}/{notebook_name}_{output_label}.ipynb'
    output_path_backup = output_path.replace('.ipynb', '_backup.ipynb')
    # rename existing output file if need to
    if os.path.exists(output_path):
        # remove existing backup file if there is one
        if os.path.exists(output_path_backup):
            os.remove(output_path_backup)
        # rename existing output file
        os.rename(output_path, output_path_backup)
    # run notebook using papermill
    pm.execute_notebook(
        notebook,
        output_path,
        parameters=dict(config=config)
    )


if __name__ == '__main__':
    # add args
    parser = argparse.ArgumentParser(description='Batch run some notebooks.')
    parser.add_argument(
        '--config_file',
        type=str,
        default='configs.json',
        help='point to the config file you want to use.'
    )
    parser.add_argument(
        '--run_mode',
        type=str,
        default='parallel',
        help="If set to 'parallel', then run using multiprocessing, just sequential for any other value."
    )
    # parse args
    args = parser.parse_args()
    config_file = args.config_file
    run_mode = args.run_mode
    # read in config_file
    with open(config_file) as json_file:
        configs = json.load(json_file)
    # loop over each config
    processes = []
    for config_name in configs:
        config = configs[config_name]
        if run_mode == 'parallel':
            # one process per config; args must be a tuple, hence the trailing comma
            p = multiprocessing.Process(
                target=run_papermill,
                args=(config,)
            )
            p.start()
            processes.append(p)
        else:
            run_papermill(config)
    # wait for any parallel runs to finish
    for p in processes:
        p.join()

configs.json:

{
    "config_bank": {
        "notebook": "notebooks/data_explorer.ipynb",
        "data_url": "https://raw.githubusercontent.com/andrewm4894/papermill_dev/master/data/bank-full.csv",
        "output_label": "bank"
    },
    "config_adult": {
        "notebook": "notebooks/data_explorer.ipynb",
        "data_url": "https://raw.githubusercontent.com/andrewm4894/papermill_dev/master/data/adult.csv",
        "output_label": "adult"
    },
    "config_titanic": {
        "notebook": "notebooks/data_explorer.ipynb",
        "data_url": "https://raw.githubusercontent.com/andrewm4894/papermill_dev/master/data/titanic.csv",
        "output_label": "titanic"
    }
}

The idea is to loop through each config in the configs.json file and execute the specified notebook with the specified configuration. Executed notebooks then go to a predefined output file such as papermill_outputs/data_explorer/adult/data_explorer_adult.ipynb.

In this case I’ve chosen the naming convention of /papermill_outputs/<notebook_name>/<output_label>/<notebook_name>_<output_label>.ipynb but obviously you can choose whatever you want.
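To make the convention concrete, here is a small sketch that derives the output path for each config. build_output_path() is a hypothetical helper written for illustration (it mirrors the path logic inside run_papermill()), and only two configs are inlined, with data_url left out for brevity.

```python
import json
import os

# two entries in the shape of configs.json, inlined so the sketch is self-contained
configs = json.loads("""
{
  "config_bank": {"notebook": "notebooks/data_explorer.ipynb", "output_label": "bank"},
  "config_adult": {"notebook": "notebooks/data_explorer.ipynb", "output_label": "adult"}
}
""")


def build_output_path(config, root="papermill_outputs"):
    """Hypothetical helper: <root>/<notebook_name>/<output_label>/<notebook_name>_<output_label>.ipynb"""
    # file name of the notebook without its directory or .ipynb extension
    notebook_name = os.path.splitext(os.path.basename(config["notebook"]))[0]
    label = config["output_label"]
    return f"{root}/{notebook_name}/{label}/{notebook_name}_{label}.ipynb"


output_paths = {name: build_output_path(cfg) for name, cfg in configs.items()}
for name, path in output_paths.items():
    print(name, "->", path)
```

Keeping the convention in one small function like this means a different layout only needs a change in one place.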

That’s pretty much it for this one. Feel free to clone the repo and play around with it or add improvements as you like.

I’ve been finding that template-ing up the core notebooks you use a lot (albeit with slightly different params etc.), along with a standardized approach using something like mlflow to further instrument and store artifacts of your notebook runs, can make running multiple ‘experiments’ on your data in parallel much easier and help make you a bit more productive overall.

Update: I decided to make a quick video, as sometimes it’s easier to just see what we are doing. (Sorry, the audio quality is a bit bad and the resolution is poor, first time :))