Parallelize a wide df in Pandas

Sometimes you end up with a very wide pandas dataframe and you are interested in doing the same types of operations (data processing, building a model etc.) but focused on subsets of the columns.

For example, if we had a wide df with different time series kpis represented as columns, then we might want to look at one kpi at a time, apply some pre-processing, and build something like an ARIMA time series model.

This is the situation i found myself in recently and it took me the best part of an afternoon to figure out. Usually when i find myself in that situation i try and squeeze out a blog post in case it might be useful for someone else or future me.

Note: the repository with all the code is here. p.s. thanks to this post and this one, which i built off of.

For this example i’m afraid i’m going to use the Iris dataset :0. This example is as minimal and easy as i could throw together; basically the aim of the code is to:

  1. Build some function to take in a df, do some processing and spit out a new df.
  2. Have that function be parameterized in some way as might be needed (e.g. if you wanted to do slightly different work for one subset of columns).
  3. Apply that function in parallel across the different subsets of your df that you want to process.

There are two main functions of interest here, parallelize_dataframe() and do_work(), both of which live in their own file called my_functions.py which can be imported into your jupyter notebook.

from multiprocessing import Pool
from functools import partial

import numpy as np
import pandas as pd


def parallelize_dataframe(df, func, n_pool=4, col_subsets=None, join_how='outer', **kwargs):
    '''
    Function to take a df, a function with args, and a list of column subsets to apply the function to.
    The resulting list of df's is then joined back together based on the df index.
    '''
    # create empty list of df's
    df_list = []
    # if col subsets not defined then just use all cols
    if col_subsets is None:
        col_subsets = [[col for col in df.columns]]
    # loop over each subset of cols, make a subset df, and append it to df_list
    for col_subset in col_subsets:
        df_list.append(df[col_subset])
    # define pool params
    pool = Pool(n_pool)
    # wrap the func in a partial and pass in the kwargs
    p_func = partial(func, **kwargs)
    # apply func via map to each of the df's in df_list
    map_return_list = pool.map(p_func, df_list)
    # join all the resulting df's back into one df based on the df index
    for i in range(len(map_return_list)):
        if i == 0:
            df_out = map_return_list[i]
        else:
            df_out = df_out.merge(map_return_list[i], how=join_how, left_index=True, right_index=True)
    # multiprocessing clean up
    pool.close()
    pool.join()
    return df_out


def do_work(df, kwarg_dict=None):
    '''
    Function (which we want to parallelize across different cols of the df) to do some operations on a df.
    '''
    # pass any args in as a dict to be parsed out as needed (not used in this example)
    if kwarg_dict:
        print(kwarg_dict)
    # for every numeric col in the subsetted df we want to do some operations
    for col in df._get_numeric_data().columns:
        df[f'{col}_squared'] = df[col] ** 2
        df[f'{col}_sqrt'] = np.sqrt(df[col])
    return df

parallelize_dataframe() does the below things:

  1. Break out the df into a list of df’s based on the col_subsets list passed in as a parameter.
  2. Wrap the function that was passed in into a partial along with the kwargs (this is how your parameters make it into the do_work() function).
  3. Use Pool.map() from multiprocessing to apply the func (along with the args you want to send it) to each subset of columns from your df in parallel.
  4. Reduce all this back into one final df by joining all the resulting df’s from the map() output into one wide df again (note the assumption here of joining back on the df indexes – they need to be stable and meaningful).

The do_work() function in this example is just a simple function to add some new columns as examples of the types of pandas (or any other) goodness you might want to do. In reality, in my case, it would be more like an apply_model() type function that would take each subset of columns, do some feature extraction, train a model and then also score the data as needed.
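For example, calling it from a notebook might look something like the sketch below (just a sketch rather than the exact notebook code – i’m loading iris via seaborn purely for convenience, and the kwarg_dict contents are placeholders):

import seaborn as sns

from my_functions import parallelize_dataframe, do_work

# load iris (cols: sepal_length, sepal_width, petal_length, petal_width, species)
df = sns.load_dataset('iris')

# process the sepal cols and the petal cols as two separate subsets, in parallel
df_out = parallelize_dataframe(
    df,
    do_work,
    n_pool=2,
    col_subsets=[
        ['sepal_length', 'sepal_width'],
        ['petal_length', 'petal_width'],
    ],
    kwarg_dict={'note': 'params for do_work go here'},
)

# each subset comes back with its extra _squared and _sqrt cols, joined back on the index
print(df_out.shape)
df_out.head()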

Having the ability to do this for multiple subsets of columns in your wide df can really free up your time to focus on the more important things like dickying around with model parameters and different pre-processing steps 🙂

That’s pretty much it: a productive afternoon (in the play center with the kids, i might add) and i’m quite pleased with myself.

Update: One addition i made to this as things got more complicated when i went to implement it was the ability to apply different function params to each subset df. For example, you might want to pass in different parameters to the function for different columns. This lives in do_parallel_zip.ipynb and the corresponding my_functions_zip.py (i’m calling them “_zip” as they use zip() to “zip” up the df_list with the corresponding kwargs to go with it, to be unpacked later by do_work_zip()).

To be concrete, if we wanted to multiply the “sepal_…” cols by 100 and the “petal_…” cols by 0.5, we could use the “zip” approach like below (notebook here). parallelize_dataframe_zip() “zips” each subset df up with its own kwargs, and the zipped iterable is then unpacked as needed by the do_work_zip() function.
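I’m not pasting the repo code verbatim here, but a minimal sketch of the “_zip” idea would look something like the below (the function names follow the post; the kwargs_list argument and the multiply_by key are just placeholders for illustration, so the real my_functions_zip.py may differ in the details):

from multiprocessing import Pool


def parallelize_dataframe_zip(df, func, n_pool=4, col_subsets=None, kwargs_list=None, join_how='outer'):
    '''
    Like parallelize_dataframe() but zips a per-subset kwargs dict up with each subset df,
    so each subset of columns can get its own params.
    '''
    if col_subsets is None:
        col_subsets = [[col for col in df.columns]]
    if kwargs_list is None:
        kwargs_list = [{} for _ in col_subsets]
    # zip each subset df together with its own kwargs dict
    zipped_list = list(zip([df[col_subset] for col_subset in col_subsets], kwargs_list))
    # map the func over the zipped (df, kwargs) tuples in parallel
    pool = Pool(n_pool)
    map_return_list = pool.map(func, zipped_list)
    pool.close()
    pool.join()
    # join all the resulting df's back together on the index
    df_out = map_return_list[0]
    for df_i in map_return_list[1:]:
        df_out = df_out.merge(df_i, how=join_how, left_index=True, right_index=True)
    return df_out


def do_work_zip(df_and_kwargs):
    '''
    Unpack the (df, kwargs) tuple that was zipped up and use the per-subset params.
    '''
    df, kwargs = df_and_kwargs
    multiply_by = kwargs.get('multiply_by', 1)
    for col in df._get_numeric_data().columns:
        df[f'{col}_multiplied'] = df[col] * multiply_by
    return df

And the call from the notebook would then be along the lines of:

# df being the iris df from earlier: sepal cols x 100, petal cols x 0.5
df_out = parallelize_dataframe_zip(
    df,
    do_work_zip,
    col_subsets=[['sepal_length', 'sepal_width'], ['petal_length', 'petal_width']],
    kwargs_list=[{'multiply_by': 100}, {'multiply_by': 0.5}],
)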

( 0 – 0 ) / 0 != 0

Arrrgghh – I just wasted the best part of an afternoon chasing this one down. If i can knock out a quick post on it then at least i’ll feel i’ve gotten something out of it.

Here’s the story – somewhere in an admittedly crazy ETL type pipeline i was using pandas pct_change() as a data transformation prior to some downstream modelling. The problem was i was also using dropna() later to ensure only valid data was going into the model, and i noticed a lot of rows falling out of the data pipeline (print(df.shape) is your friend!).

After a lot of debugging, chasing this dead end down a rabbit hole, cleaning and updating my conda environment (and then dealing with the tornado issues that always seems to cause) i realized the answer was staring me in the face the whole time.

It’s the 0’s!

Turns out my data has lots of zeros and i’m an idiot who assumed (0-0) / 0 = 0, and so that the NaN’s pct_change() was giving me should really be 0.
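A tiny reproduction (a made-up series, not my actual data) shows what pct_change() actually does with runs of zeros, and how dropna() then quietly throws those rows away:

import pandas as pd

s = pd.Series([0.0, 0.0, 1.0, 0.0, 2.0])
print(s.pct_change())
# 0    NaN   <- no previous value, fair enough
# 1    NaN   <- (0 - 0) / 0, which i had assumed would come out as 0
# 2    inf   <- (1 - 0) / 0
# 3   -1.0   <- (0 - 1) / 1
# 4    inf   <- (2 - 0) / 0

# dropna() then drops row 1 along with row 0
print(s.pct_change().dropna().shape)  # (3,)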

But am i really an idiot? I checked…

I guess so.

So let that be a lesson – watch out for 0’s when using pct_change() – don’t be like andrewm4894.

p.s. I could not just fillna() them as i had valid NaN’s at the top of my dataframe from the lagged values. So i ended up in a state where some NaN’s were valid and some i wanted to be 0 instead. I’m guessing i’ll have to write a custom pct_change() type function (which seems a bit crazy). It would be great if there was a way to tell pandas pct_change() that i want (0-0)/0=0. Maybe if i get brave enough i’ll make a pull request 🙂
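For what it’s worth, a custom version could be as small as the sketch below (the function name is made up, it’s not something that exists in pandas):

import pandas as pd


def pct_change_zero_friendly(s: pd.Series) -> pd.Series:
    '''
    Like Series.pct_change() but treats a 0 -> 0 change as 0 instead of NaN,
    while leaving the genuine NaN from the first lag alone.
    '''
    prev = s.shift(1)
    pct = (s - prev) / prev
    # where both the current and previous values are 0, call the change 0
    return pct.mask((s == 0) & (prev == 0), 0)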

p.p.s. Seems like i’m not the only one: https://brilliant.org/wiki/what-is-0-0/ (though still wrong)

hmmm – maybe i’ve just wasted more time on this post. Think i need to pack it in for today.

Update

Everything is always clearer in the morning. Here is the kind of one-liner that pretty much behaves as i wanted, so i can get back to my life now.
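The one-liner itself was in a screenshot, but something along these lines (using mask() to zero out the 0 -> 0 steps while keeping the genuine leading NaN’s from the lag) captures the idea:

# df being the dataframe going through pct_change() in the pipeline
df_pct = df.pct_change().mask((df == 0) & (df.shift(1) == 0), 0)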