Time series clustering with tslearn

I’ve recently been playing around with some time series clustering tasks and came across the tslearn library. I was interested in seeing how easy it would be to get some of the clustering functionality built into tslearn up and running. Turns out it was quite easy and straightforward, perfect blog post fodder 🙂

tl;dr here is a Google Colab notebook you can just copy, run and play with for yourself.

First let's import the libraries we will need:

import pandas as pd
import numpy as np
from tslearn.clustering import TimeSeriesKMeans, KShape, KernelKMeans
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from netdata_pandas.data import get_data, get_chart_list
from am4894plots.plots import plot_lines, plot_lines_grid

netdata_pandas is a helper library i created to pull some time series data from servers running Netdata (where i work) into a pandas dataframe. In this example we will use a demo server, http://london.my-netdata.io/, so as to have somewhat realistic (messy) time series data.

am4894plots is another library i made for myself to collect common plotting functionality i find myself returning to time and time again (because i never really ‘learned’ matplotlib and at this stage refuse to!).

Let's define our inputs; basically anything we might want to play with and change is worth defining as an input at the top of the notebook:

# inputs
host = 'london.my-netdata.io' # host running netdata that we want to pull data from
n = 60*5 # how many seconds of most recent data to pull
n_charts = None # If None then pull data for all charts otherwise sample n_charts randomly
n_clusters = 50 # number of clusters to fit
diff = False # take diffs of the data or not
preprocessing_meanvar = False # True to use TimeSeriesScalerMeanVariance preprocessing
smooth_n = 15 # n observations to smooth over
smooth_func = 'mean' # one of ['mean','min','max','sum']
norm = True # normalize the data to 0-1 range
model = 'kmeans' # one of ['kmeans','kshape','kernelkmeans','dtw']

Next we will get our data and do some fairly standard pre-processing:

# get charts
if n_charts:
    charts = np.random.choice(get_chart_list(host), n_charts).tolist()
else:
    charts = get_chart_list(host)
# get data
df = get_data(host, charts, after=-n, before=0)
# remove duplicate columns that we might get from get_data()
df = df.loc[:,~df.columns.duplicated()]
# drop any empty columns (it can happen)
df = df.dropna(axis=1, how='all')
# forward fill and backward fill to try remove any N/A values
df = df.ffill().bfill()
# take differences if specified
if diff:
    df = df.diff()
# do any smoothing as specified
if smooth_n > 0:
    if smooth_func == 'mean':
        df = df.rolling(smooth_n).mean().dropna(how='all')
    elif smooth_func == 'max':
        df = df.rolling(smooth_n).max().dropna(how='all')
    elif smooth_func == 'min':
        df = df.rolling(smooth_n).min().dropna(how='all')
    elif smooth_func == 'sum':
        df = df.rolling(smooth_n).sum().dropna(how='all')
# normalize the data if specified
if norm:
    df = (df-df.min())/(df.max()-df.min())
# drop any empty columns that may remain
df = df.dropna(axis=1, how='all')
# set index to be a datetime for better plotting later
df = df.set_index(pd.to_datetime(df.index, unit='s'))

Now we can take a quick look at our data.

Now time to build our clustering model using tslearn (there are a few more parameters here we probably should have added as separate inputs, but not to worry):

# get values to cluster on
X = df.transpose().values
if preprocessing_meanvar:
    X = TimeSeriesScalerMeanVariance().fit_transform(X)
    df = pd.DataFrame(X.reshape(df.shape), columns=df.columns, index=df.index)
if model == 'kshape':
    model = KShape(n_clusters=n_clusters, max_iter=10, n_init=2).fit(X)
elif model == 'kmeans':
    model = TimeSeriesKMeans(n_clusters=n_clusters, metric="euclidean", max_iter=10, n_init=2).fit(X)
elif model == 'dtw':
    model = TimeSeriesKMeans(n_clusters=n_clusters, metric="dtw", max_iter=5, n_init=2).fit(X)
elif model == 'kernelkmeans':
    model = KernelKMeans(n_clusters=n_clusters, kernel="gak", max_iter=5, n_init=2).fit(X)

Once we have our clusters we can make some helper objects to use later:

# build helper df to map metrics to their cluster labels
df_cluster = pd.DataFrame(list(zip(df.columns, model.labels_)), columns=['metric', 'cluster'])

# make some helper dictionaries and lists
cluster_metrics_dict = df_cluster.groupby(['cluster'])['metric'].apply(list).to_dict()
cluster_len_dict = df_cluster['cluster'].value_counts().to_dict()
clusters_dropped = [cluster for cluster in cluster_len_dict if cluster_len_dict[cluster]==1]
clusters_final = [cluster for cluster in cluster_len_dict if cluster_len_dict[cluster]>1]


Finally, the fun part: let's plot each cluster separately and see what we have:

for cluster_number in clusters_final:
    # get a rough quality score based on the correlation between metrics in the cluster
    x_corr = df[cluster_metrics_dict[cluster_number]].corr().abs().values
    x_corr_mean = round(x_corr[np.triu_indices(x_corr.shape[0],1)].mean(),2)
    # plot each cluster
    plot_title = f'cluster {cluster_number} (quality={x_corr_mean}, n={cluster_len_dict[cluster_number]})'
    plot_lines(df, cols=cluster_metrics_dict[cluster_number], renderer='colab', theme=None, title=plot_title)
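As a quick intuition check on that correlation-based 'quality' score, here is a tiny self-contained example on made-up data (just numpy and pandas, no tslearn needed): a tight cluster of near-identical series should score close to 1 and a cluster of unrelated noise should score close to 0.

```python
import numpy as np
import pandas as pd

np.random.seed(42)
t = np.linspace(0, 10, 200)

# a "good" cluster: the same sine wave plus a little noise
df_good = pd.DataFrame({f'm{i}': np.sin(t) + np.random.normal(0, 0.1, len(t)) for i in range(5)})
# a "bad" cluster: unrelated white noise series
df_bad = pd.DataFrame({f'm{i}': np.random.normal(0, 1, len(t)) for i in range(5)})

def cluster_quality(df):
    # mean absolute pairwise correlation, as in the plotting loop above
    x_corr = df.corr().abs().values
    return round(x_corr[np.triu_indices(x_corr.shape[0], 1)].mean(), 2)

print(cluster_quality(df_good))  # close to 1
print(cluster_quality(df_bad))   # close to 0
```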

Here are some good examples:

And some not so good ones:

As is typical with clustering, you are always going to get some pretty bad, random-looking clusters, especially since i really just picked most of the parameters above off the top of my head. Most importantly k, the number of clusters, which i set to 50 given the high number of metrics we had (over 700).

All in all, i found the tslearn library very useful: it saved me quite a bit of time in getting a quick working prototype up and running, so i'm looking forward to playing with some of the other time series functionality it offers.

Premature Optimization

I’ve been doing some work that necessitated running the same statistical test from scipy lots of times on a fairly wide pandas dataframe with lots of columns. I spent a bit too much time googling around for the most efficient way to do this, and even more time re-writing things various ways, before realizing i should have RTFM a bit more in the first place. Yep, i'd gone about a week down a path of premature optimization, but hey, *blog post* 🙂

The Set Up

I have a wide pandas dataframe of lots of time series metrics, one per column, and i have a ‘focus’ window of time during which i am interested to know which metrics look like they may have changed in some way relative to a ‘baseline’ window just before the focus window.

A rough first idea (before getting too fancy and building models, not there yet for various reasons) is to break out our old friend the KS test: a statistical test of whether, for each metric, the ‘focus’ distribution looks statistically significantly different from the ‘baseline’ distribution. The idea being that metrics that look to have ‘changed’ in this sense between the two windows might be worth looking at first.

So a pretty simple set up and application. The tricky part was doing this as quickly as possible on a dataframe with around 500-1000 columns and anywhere between 1000-10000 rows of data as a rough typical usage scenario.
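To make the set-up concrete, here is a minimal sketch (on synthetic data, not my real metrics) of applying the KS test to one metric's baseline vs focus windows:

```python
import numpy as np
from scipy.stats import ks_2samp

np.random.seed(0)
# baseline window: metric behaving normally
base = np.random.normal(loc=0, scale=1, size=1000)
# focus window: same metric after some 'change' (a level shift here)
focus = np.random.normal(loc=3, scale=1, size=500)

ks_stat, p_value = ks_2samp(base, focus)
# a tiny p-value suggests the focus distribution really does look different
print(ks_stat, p_value)
```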

Dumb Approach

So my first approach, as usual, is to do the dumbest thing i can and just get something that works and go from there. So here is my ks_df_dumb() function.

from scipy.stats import ks_2samp

def ks_df_dumb(df, ks_mode):
    """Take in a df, loop over each column, split into base and focus, and apply the test."""
    results = []
    for col in df._get_numeric_data():
        base = df[df['window'] == 'base'][col].values
        focus = df[df['window'] == 'focus'][col].values
        ks_stat, p_value = ks_2samp(base, focus, mode=ks_mode)
        results.append((ks_stat, p_value))
    return results

If i run this on my test dataframe of 500 columns * 1000 rows i see the below timings.

%%timeit -n 5 -r 5
results = ks_df_dumb(df, ks_mode)
# 3.77 s ± 57.4 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

start_time = time.time()
results = ks_df_dumb(df, ks_mode)
end_time = time.time()
print(f'{round(end_time-start_time,2)} seconds')
# ks_df_dumb
# 3.55 seconds

So about 3-4 seconds, which is not great for what i need (it may end up being something a user clicks to trigger, so i want them to wait as little as possible for the results).

Vectorize it?

So now i start messing around with super cool tricks to try and be a hero. I know better than to be looping over stuff in python and pandas, so i'll try to vectorize it!

def ks_df_vec(df, ks_mode):
    """Take in a df, and use np.vectorize to avoid the pandas loop."""
    def my_ks_2samp(a, b):
        """Wrapper function to pass args to the vectorized function."""
        return ks_2samp(a, b, mode=ks_mode)
    base = df[df['window'] == 'base']._get_numeric_data().transpose().values
    focus = df[df['window'] == 'focus']._get_numeric_data().transpose().values
    ks_2samp_vec = np.vectorize(my_ks_2samp, signature='(n),(m)->(),()')
    results = ks_2samp_vec(base, focus)
    results = list(zip(results[0], results[1]))
    return results

Now i see:

%%timeit -n 5 -r 5
results = ks_df_vec(df, ks_mode)
# 2.22 s ± 35.5 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

start_time = time.time()
results = ks_df_vec(df, ks_mode)
end_time = time.time()
print(f'{round(end_time-start_time,2)} seconds')
# ks_df_vec
# 2.16 seconds

So a bit better at just over 2 seconds, but still not great given this is only 1000 rows of data.


Time to break out numpy! (Confession: i never really learned numpy properly and find it very painful to work with and reason about the data and shapes as i do stuff to them. It all just feels unnatural to me in some way, and i find it hard to keep track of things without any indexes or keys, i just don't trust myself with it. I know i'm not supposed to say this out loud, but hey.)

So my approach now will be to just get the data into two separate numpy arrays and work solely with them.

def ks_np_dumb(arr_a, arr_b, ks_mode):
    results = []
    for n in range(arr_a.shape[1]):
        ks_stat, p_value = ks_2samp(arr_a[:,n], arr_b[:,n], mode=ks_mode)
        results.append((ks_stat, p_value))
    return results

%%timeit -n 5 -r 5
results = ks_np_dumb(arr_base, arr_focus, ks_mode)
# 2.43 s ± 200 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

start_time = time.time()
results = ks_np_dumb(arr_base, arr_focus, ks_mode)
end_time = time.time()
print(f'{round(end_time-start_time,2)} seconds')
# ks_np_dumb
# 2.22 seconds

def ks_np_vec(arr_a, arr_b, ks_mode):
    def my_ks_2samp(a, b):
        return ks_2samp(a, b, mode=ks_mode)
    ks_2samp_vec = np.vectorize(my_ks_2samp, signature='(n),(m)->(),()')
    results = ks_2samp_vec(arr_a.T, arr_b.T)
    results = list(zip(results[0], results[1]))
    return results

%%timeit -n 5 -r 5
results = ks_np_vec(arr_base, arr_focus, ks_mode)
# 2.2 s ± 38.7 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

start_time = time.time()
results = ks_np_vec(arr_base, arr_focus, ks_mode)
end_time = time.time()
print(f'{round(end_time-start_time,2)} seconds')
# ks_np_vec
# 2.29 seconds

Hmm, that did not seem to help too much, which i guess is kind of reassuring: it makes sense that the dumb numpy approach would be a little bit faster than the dumb pandas one, and it's comforting that they are not orders of magnitude apart.

And it makes sense that the numpy dumb and numpy vectorized versions are not that different, as the np.vectorize docs state that it is really still just a loop under the hood (so to properly vectorize this i'd probably have to do much more work to figure it out).

Feck this time for Cython!!!

Hell yeah, i'm going to cythonize the shit out of this! Let's be honest, this is what i've wanted to do the whole time: do something with cython so i can boast to all my friends about the awesome speedup i got, even just by adding some typing information.

Let's go.


import numpy as np
cimport numpy as np
cimport cython
from scipy.stats import ks_2samp

DTYPE = np.double

cpdef cy_ks_np(double[:, :] arr_a, double[:, :] arr_b, str ks_mode):

    cdef double k, p
    cdef Py_ssize_t i
    cdef Py_ssize_t m = arr_a.shape[1]
    result = np.zeros((m, 2), dtype=DTYPE)
    cdef double[:, :] result_view = result

    for i in range(m):
        k, p = ks_2samp(arr_a[:,i], arr_b[:,i], mode=ks_mode)
        result_view[i,0] = k
        result_view[i,1] = p

    return result

Ahhh, look at it. Very pleased with it if i do say so myself. I managed to wrangle this tutorial to fit my needs and went to bed that night very pleased with myself.

But… what's this…

%%timeit -n 5 -r 5
results = cy_ks_np(arr_base, arr_focus, ks_mode)
# 2.28 s ± 54 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

start_time = time.time()
results = cy_ks_np(arr_base, arr_focus, ks_mode)
end_time = time.time()
print(f'{round(end_time-start_time,2)} seconds')
# cy_ks_np
# 2.1 seconds

2.2 seconds!!! What the heck, i was expecting some magical voodoo that would speed me up at least 10x. Come on cython, don't do this to me, i was going to be a hero, they were going to chant my name in the office.

So i did what was the logical next step – made a reproducible example and asked StackOverflow to do it for me 🙂

Bro, do you even Profile!

So while i waited on SO to do its thing i asked a few real engineers in my company what they thought. And their first response was – did you profile your code?

I began to panic (i've been found out, oh no it's happening) and quickly looked up the jupyter cell magic for profiling my functions.
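For anyone outside a notebook: the `%%prun` cell magic is just a thin wrapper around the stdlib cProfile module, so the same check looks something like the below (with a toy function standing in for my KS code; the names here are made up for illustration):

```python
import cProfile
import io
import pstats

def expensive_part():
    # stand-in for the scipy call that was eating all the time
    return sum(i * i for i in range(100_000))

def my_pipeline():
    return [expensive_part() for _ in range(5)]

profiler = cProfile.Profile()
profiler.enable()
my_pipeline()
profiler.disable()

# print the top functions by cumulative time, which is what gave the game away
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats('cumulative').print_stats(10)
report = stream.getvalue()
print(report)
```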

Well, would ya look at that: 500 calls to stats.py:5187(_compute_prob_inside_method) taking up ~1.8 seconds of my ~2 seconds.

Turns out this whole exercise has been a bit of a waste of time so far. So i went back and dug into the ks_2samp() docs and the code on github to see if anything could be done.

Wait, what's this mode parameter? Maybe i can play with that a bit. One option is "'asymp': use asymptotic distribution of test statistic", which sounds like it could be faster than 'exact'.

So with ks_mode='asymp' i ran things again and held my breath.

Lo and behold, the solution was staring me in the face all along: obviously someone had already provided some knobs and parameters to get a faster implementation under the hood.
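One reassuring detail: the mode only changes how the p-value is computed, the test statistic itself comes out the same, so switching to 'asymp' is an easy win. A quick sketch on synthetic data:

```python
import numpy as np
from scipy.stats import ks_2samp

np.random.seed(1)
a = np.random.normal(size=1000)
b = np.random.normal(size=1000)

stat_exact, p_exact = ks_2samp(a, b, mode='exact')
stat_asymp, p_asymp = ks_2samp(a, b, mode='asymp')

# same statistic either way, only the p-value calculation differs
print(stat_exact, stat_asymp, p_exact, p_asymp)
```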

As per usual, i should have stuck to my number 1 rule of writing as little code as possible and using other people's good work to make myself look better 🙂

p.s. all the code is in a notebook here.

Ireland Covid19 Data

I was looking around a bit and could not really find any datasets behind the daily updates from the Irish government that get posted here. In particular i was thinking the break-out tables of numbers by different dimensions might be of use to anyone looking to analyse the data.

So here is a python script to grab all press release links from the updates page, pull the html tables into pandas dataframes, do some ugly/gnarly data wrangling and save the results into csv files here.
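As a flavour of the 'ugly/gnarly' wrangling involved: the numbers in the scraped html tables (via pd.read_html()) come back as strings with thousands separators, so there is a fair bit of cleaning like the below (the column names and values here are made up for illustration, not the exact ones from the press releases):

```python
import pandas as pd

# toy table resembling what pd.read_html() gives back from a press release page
df = pd.DataFrame({
    'Age Group': ['0-14', '15-24', '25-34'],
    'Number of Cases': ['1,204', '3,017', '4,511'],
})

# strip the thousands separators and coerce to integers before saving to csv
df['Number of Cases'] = (
    df['Number of Cases'].str.replace(',', '', regex=False).astype(int)
)
print(df)
```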

As an example i’ve stuck some of the headline figures and stats in a Tableau dashboard here.

Update1: This looks like a nice dashboard using similar data for Ireland.

Update2: IrishDataViz is a great twitter account with some analysis of the irish numbers.

A little brainteaser (or i’m an idiot)

This took me waaay too long to work out today and i was thinking it could make a nice little interview coding type question (which i’d probably fail).

Suppose you have 10,000 rows of data and need to continually train and retrain a model, training on at most 1,000 rows at a time and retraining every 500 rows. Can you tell me how many “batches” of data this will create, and the start and end index of each batch?

So thats:

n = 10000
train_max = 1000
train_every = 500

And we want a dictionary like this:

{
  1: {"start": 1, "end": 1000},
  2: {"start": 500, "end": 1500},
  ?: {"start": ?, "end": ?},
}

After doing some crazy loops in python for a while i decided to go back to basics and do it Jeremy Howard style in excel (well, gsheets, i'm not a savage): gsheet.

And here is my Python solution:

def calc_batches(train_max: int, train_every: int, n: int) -> dict:
    batches = dict()
    # loop over up to as many records as you have
    for batch in range(n):
        # work out the start of the batch, with a max() to handle the first batch
        start = max(train_every * batch, 1)
        # work out the end of the batch, with a min() to handle the last batch
        end = min(train_max + (train_every * batch), n)
        # add batch info to the dictionary
        batches[batch + 1] = {"start": start, "end": end}
        # break out once you have assigned all rows to a batch
        if end == n:
            return batches

calc_batches(train_max=1000, train_every=500, n=10000)
{1: {'start': 1, 'end': 1000},
2: {'start': 500, 'end': 1500},
3: {'start': 1000, 'end': 2000},
4: {'start': 1500, 'end': 2500},
5: {'start': 2000, 'end': 3000},
6: {'start': 2500, 'end': 3500},
7: {'start': 3000, 'end': 4000},
8: {'start': 3500, 'end': 4500},
9: {'start': 4000, 'end': 5000},
10: {'start': 4500, 'end': 5500},
11: {'start': 5000, 'end': 6000},
12: {'start': 5500, 'end': 6500},
13: {'start': 6000, 'end': 7000},
14: {'start': 6500, 'end': 7500},
15: {'start': 7000, 'end': 8000},
16: {'start': 7500, 'end': 8500},
17: {'start': 8000, 'end': 9000},
18: {'start': 8500, 'end': 9500},
19: {'start': 9000, 'end': 10000}}

…I’m pretty sure someone will come along with a super pythonic one liner that shows maybe i am an idiot after all.
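In that spirit, here is roughly what that might look like as a dict comprehension (it assumes, as in this example, that `(n - train_max)` divides evenly by `train_every`):

```python
n, train_max, train_every = 10000, 1000, 500

# same outputs as the calc_batches() loop above, in one expression
batches = {
    i + 1: {"start": max(i * train_every, 1), "end": min(train_max + i * train_every, n)}
    for i in range((n - train_max) // train_every + 1)
}

print(len(batches))  # 19 batches, matching the loop version
```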

Ok now back to work.

Update: Actually i think what i want is more like the below, where you can define a minimum and maximum size for your training data and then roll that over your data.

def calc_batches(train_min: int, train_max: int, train_every: int, n: int) -> dict:
    batches = dict()
    batch = 0
    for row in range(1, n + 1):
        if row < train_min:
            # not enough data yet for the first batch
            continue
        elif row == train_min:
            batches[batch] = dict(start=0, end=row)
        elif row % train_every == 0:
            batch += 1
            batches[batch] = dict(start=max(0, row - train_max), end=row)
    return batches

calc_batches(train_min=1000, train_max=5000, train_every=500, n=10000)
{0: {'start': 0, 'end': 1000},
1: {'start': 0, 'end': 1500},
2: {'start': 0, 'end': 2000},
3: {'start': 0, 'end': 2500},
4: {'start': 0, 'end': 3000},
5: {'start': 0, 'end': 3500},
6: {'start': 0, 'end': 4000},
7: {'start': 0, 'end': 4500},
8: {'start': 0, 'end': 5000},
9: {'start': 500, 'end': 5500},
10: {'start': 1000, 'end': 6000},
11: {'start': 1500, 'end': 6500},
12: {'start': 2000, 'end': 7000},
13: {'start': 2500, 'end': 7500},
14: {'start': 3000, 'end': 8000},
15: {'start': 3500, 'end': 8500},
16: {'start': 4000, 'end': 9000},
17: {'start': 4500, 'end': 9500},
18: {'start': 5000, 'end': 10000}}

Github Webhook -> Cloud Function -> BigQuery

I have recently needed to watch and track various activities on specific github repos i’m working on, however the rest api from Github can sometimes be a bit limited (for example, best i could tell, if you want to get the most recent list of people who began watching your repo, you need to make a lot of paginated api calls and do battle with rate limiting 💩).

This is where Github Webhooks can be a very useful alternative way to trigger certain events of interest to some endpoint where you can then handle the data as you need. The use case i was interested in was triggering an event any time someone starred, unstarred, watched or forked a specific repository. I wanted to then store that info in a table in Google BigQuery where it can be used to track repository activity over time for various reasons you might want (outreach to the community around the repository, or just tracking growth over time).

After the usual few hours of googling around i landed upon the idea of having the webhook for Github send events to a Google Cloud Function, from there my cloud function can process and append the data onto a BigQuery table. To make developing and maintaining the cloud function easy i used Serverless and built on this example in particular.

p.s. i also found this repository very useful as well as this one from Bloomberg. Also i think you could maybe get something similar done without any code using something like Zapier (although i don’t think they have all the Github Webhook events available).

p.p.s all the code is in this repo.

Step 1 – Serverless

We start by leveraging this Serverless example to create the bare bones structure for our cloud function.

In a folder where we want the code to live we run the below to install Serverless if needed, and pull down the google-python-simple-http-endpoint template and save it into a new Serverless project called handle-github-events.

npm install serverless -g
serverless install -u https://github.com/serverless/examples/tree/master/google-python-simple-http-endpoint -n handle-github-events

The approach i am taking also depends on using a .env file to handle secrets and environment variables, so we also need to install the serverless-dotenv-plugin and run npm install for everything else we need.

cd handle-github-events
npm i -D serverless-dotenv-plugin
npm install

Step 2 – Cloud Function

Once we have the bare bones serverless template in place we can build on it to create the function we want for handling incoming requests from the Github webhook. All the code is in this repository and i’ll walk through the main points below.

The core of what we want to do in our Cloud function is in main.py. What it tries to do is:

  1. Validate that the request is coming from a known Github ip address.
  2. Validate that the hashed secret key stored in Github when you create your webhook matches what is expected by the cloud function as pulled from the GITHUB_WEBHOOK_SECRET environment variable.
  3. Parse the json received from the Github request and append it to a table somewhere in BigQuery.
  4. Return as the response to Github some info about the event.
# -*- coding: utf-8 -*-
"""GCP HTTP Cloud Function to handle github webhook events.
Some code stolen from here: https://github.com/carlos-jenkins/python-github-webhooks/blob/master/webhooks.py
"""
import hashlib
import hmac
import json
import datetime
import logging
import os
from ipaddress import ip_address, ip_network

import pandas as pd
import requests


def validate_request_ip(request):
    """Function to validate that request comes from a known github ip"""
    # get ip of request
    request_ip_address = ip_address(u'{}'.format(request.access_route[0]))
    # get whitelist of valid ip's from github
    github_ip_whitelist = requests.get('https://api.github.com/meta').json()['hooks']
    # check if ip is a valid one from github
    for valid_ip in github_ip_whitelist:
        if request_ip_address in ip_network(valid_ip):
            break
    else:
        error_msg = 'IP {} not allowed.'.format(request_ip_address)
        raise ValueError(error_msg)


def validate_request_signature(request):
    """Validate that request signature and function signature match"""
    # get signature from header
    sha_name, request_signature = request.headers.get('X-Hub-Signature').split('=')
    # create matching signature
    function_signature = hmac.new(
        str.encode(os.environ.get('GITHUB_WEBHOOK_SECRET', 'Specified environment variable is not set.')),
        msg=request.data,
        digestmod=hashlib.sha1
    ).hexdigest()
    # check if signatures match
    if str(request_signature) != str(function_signature):
        error_msg = 'Signatures do not match.'
        raise ValueError(error_msg)


def validate_event_type(event_type):
    """Function to error out if event type is of a type not yet implemented for handling by this function"""
    if event_type not in ['star', 'watch', 'fork']:
        error_msg = f"Event Type '{event_type}' not yet implemented by this function."
        raise NotImplementedError(error_msg)


def github_event(request):
    """Function to handle incoming event from github webhook and save event data to BigQuery."""
    # validate request ip
    validate_request_ip(request)
    # validate request signature
    validate_request_signature(request)
    # request_timestamp
    request_timestamp = str(datetime.datetime.now())
    # github_request_type
    github_event_type = request.headers.get('X-GitHub-Event')
    # validate event type
    validate_event_type(github_event_type)
    # get relevant env vars
    gcp_project_id = os.environ.get('GCP_PROJECT_NAME')
    bq_dataset_name = os.environ.get('BQ_DATASET_NAME')
    bq_table_name = os.environ.get('BQ_TABLE_NAME')
    bq_if_exists = os.environ.get('BQ_IF_EXISTS')
    bq_table_suffix = request_timestamp.replace('-', '')[0:8]
    # get json from request
    request_json = request.get_json()
    # create response body
    response_body = {
        "request_method": str(request.method),
        "timestamp": request_timestamp,
        "event_type": github_event_type,
        "action": request_json.get("action", github_event_type),
        "starred_at": request_json.get("starred_at", ""),
        "repository_full_name": request_json.get("repository")["full_name"],
        "sender_username": request_json.get("sender")["login"]
    }
    # build response
    response = {
        "statusCode": 200,
        "body": response_body
    }
    # logging response
    logging.info(response)
    # make pandas df
    data = [response_body['timestamp'], response_body['repository_full_name'], response_body['event_type'],
            response_body['action'], response_body['sender_username']]
    columns = ['timestamp', 'repo', 'event_type', 'action', 'username']
    df = pd.DataFrame(data=[data], columns=columns)
    # display df.head() in logs
    logging.info(df.head())
    # save to big query (daily table via the date suffix)
    df.to_gbq(f'{bq_dataset_name}.{bq_table_name}_{bq_table_suffix}',
              project_id=gcp_project_id, if_exists=bq_if_exists)

    return json.dumps(response, indent=4)

Our serverless.yml file looks like the below. Note that it pulls the environment variables required for serverless to deploy from a .env file you need to create yourself (here is an example in the repo).

service: handle-github-events

frameworkVersion: ">=1.2.0 <2.0.0"

plugins:
  - serverless-dotenv-plugin

provider:
  name: google
  runtime: python37
  project: ${env:GCP_PROJECT_NAME}
  region: ${env:GCP_REGION_NAME}
  credentials: ${env:GCP_KEY_FILE}

functions:
  github_event:
    handler: github_event
    events:
      - http: path

Step 3 – Deploy

Once we are ready we run `serverless deploy` and, if all goes well, see output like the below:

>serverless deploy -v
Serverless: DOTENV: Loading environment variables from .env:
Serverless:      - GITHUB_WEBHOOK_SECRET
Serverless:      - GCP_KEY_FILE
Serverless:      - GCP_PROJECT_NAME
Serverless:      - GCP_REGION_NAME
Serverless:      - BQ_DATASET_NAME
Serverless:      - BQ_TABLE_NAME
Serverless:      - BQ_IF_EXISTS
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Compiling function "github_event"...
Serverless: Uploading artifacts...
Serverless: Artifacts successfully uploaded...
Serverless: Updating deployment...
Serverless: Checking deployment update progress...
Serverless: Done...
Service Information
service: handle-github-events
project: <your project name will be here>
stage: dev
region: <your region will be here>

Deployed functions

Serverless: Removing old artifacts...

Now you should have a cloud function alive at some url like https://your-region-your-project-name.cloudfunctions.net/github_event.

Step 4 – Github Webhook

Once your function is deployed (or in reality you might make the Github webhook first and then iterate on the function to get it doing what you want) you can create and test the Github webhook you want to send events from.

In my case, and for this post, i'm going to add the webhook to my andrewm4894/random repository for illustration. The Payload URL is the url of the cloud function we created, and the Secret should be the same string you are storing in your .env file as “GITHUB_WEBHOOK_SECRET”.

Check whatever events you want to trigger on. In my case it was star, watch and fork events (note: the function might not work if you were to send all events or different events; you would just need to adapt it accordingly).
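If you want to sanity check the signature side of things locally, you can reproduce what Github does: it computes an HMAC-SHA1 of the raw request body using your webhook secret and sends it in the X-Hub-Signature header. A quick sketch with made-up values:

```python
import hashlib
import hmac

# made-up values for illustration; the real secret is whatever you stored
# as GITHUB_WEBHOOK_SECRET, and the payload is the raw bytes github sends
secret = b'my-webhook-secret'
payload = b'{"action": "started"}'

signature = 'sha1=' + hmac.new(secret, msg=payload, digestmod=hashlib.sha1).hexdigest()
print(signature)  # this is what should match the X-Hub-Signature header
```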

Fingers Crossed

Now we can try to see if it works by triggering some events. In this example i logged in as a second username i have and pressed some star, watch, and fork buttons to see what happened.

You can see recent triggers of the webhook in Github and this can be very useful for debugging things and while developing.

An example request sent to the cloud function.

And you can also see the response received from the cloud function. In this case showing that “andrewm4894netdata” (my other user) deleted a star from the “andrewm4894/random” repository 😔.

Example response back from our cloud function.

And then finally we can see the stored events in our table in BigQuery:

We have the data!!

And that’s it! We have our Github Webhook sending events to our Google Cloud Function which is in turn appending them onto a daily table in BigQuery. Go Webhooks!

My First PyPI Package

I’ve been threatening to myself to do this for a long time and recently got around to it, so as usual i'm going to try to milk it for a blog post (note: i'm not talking about getting into a box like the picture below, it's something much less impressive).

Confession – I don’t know matplotlib

I have a confession to make that's been eating away at me and i need to get it off my chest: i'm pretty useless at plotting anything in Python. I never really had the time/need to sit down and ‘learn’ matplotlib from first principles (does anyone?). I’ve usually had tools like Tableau or Looker to sit on top of whatever database i am using and make visualizations pretty painlessly.

When i've needed to do something custom or more complicated, it usually goes like this: i spend a day or two randomly googling around for something that looks close enough to what i need and start playing around with the code (copy paste), then i find some other example i like a little bit more that uses a different library (seaborn, bokeh, plotly etc.) and start the whole painful process over again!

Eventually i settle on some Frankenstein solution that gets me over the line until the next time. After living this cycle many times i decided to some day build my own plotting library that would short circuit this shitshow and, over time, become the answer to all my plotting needs. I was also hoping it would be a nice excuse to learn about Python packaging and deploying to PyPI.

Cookiecutter to the rescue

Turns out, like most other things, there are already great tools out there to make this much easier than i expected, the main one being cookiecutter and in particular this cookiecutter template for pypi packages (i also found this TalkPython course and these talks really useful starting points).


So after a bit of dicking around with cookiecutter i had the basis for my plotting package (see my minimal example ‘hello world’ type package on PyPI here) and just needed to build out my functionality (am4894plots on PyPI).

I’ve mostly been working with time series data recently so decided to start there with some common typical plots i might often reach for when looking at such data. My main principles in the package are:

  • Usually my data is in a pandas dataframe, and that's what i want to pass into my plotting function, along with a list of the cols i want to plot and as little else as possible.
  • I don’t care what library i use under the hood, and where possible i might even want to implement the same plot in multiple underlying libraries for whatever reason (at the moment it's mainly just Plotly or Bokeh being used, but i can easily see myself adding more over time as needs arise).
  • This package is just for me to use, you are not allowed to use it 🙂

Moving parts

The great thing about leveraging something like cookiecutter is you can plug into as many best practice tools as possible with as little sweat as possible on your end. Below are some notable examples of tools or components you get pretty much out of the box that i expected to have to work much harder for.


I’ll finish with some quick examples to illustrate what the package actually does and some ways i’m planning to use it.



plot_hists(), plot_boxes()


That’s it

That’s it, now that i (technically) have a package on PyPI i feel just a little bit less of an impostor 🙂

KubeFlow Custom Jupyter Image (+ github for notebook source control)

I’ve been playing around with KubeFlow a bit lately and found that a lot of the tutorials and examples of Jupyter notebooks on KubeFlow do a lot of the pip install and other sort of setup and config stuff in the notebook itself, which feels icky.

But, in reality, if you were working in Jupyter notebooks on KubeFlow for real you’d want to build a lot of this into the image used to build the notebook server. Luckily, as with most of KubeFlow, it’s pretty flexible to customize and extend as you want, in this case by adding custom jupyter images.

Two main example use cases for this are ensuring that some custom python package you have built (e.g. my_utils) is readily available in all your notebooks, and that other external libraries you use all the time – e.g. kubeflow pipelines – are also available.

To that end, here is a Dockerfile that illustrates this (and here is corresponding image on docker hub).

ARG BASE_CONTAINER=gcr.io/kubeflow-images-public/tensorflow-1.13.1-notebook-cpu:v0.5.0
FROM $BASE_CONTAINER
LABEL maintainer="andrewm4894@gmail.com"
LABEL version="01"
RUN pip3 install git+https://github.com/andrewm4894/my_utils.git#egg=my_utils
RUN pip3 install kfp --upgrade
CMD ["sh","-c", "jupyter notebook --notebook-dir=/home/jovyan --ip=0.0.0.0 --no-browser --allow-root --port=8888 --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*' --NotebookApp.base_url=${NB_PREFIX}"]

Once you have such a custom image building fine it’s pretty easy to just point KubeFlow at it when creating a Jupyter notebook server.

Just specify your custom image

Now when you create a new notebook on that jupyter server you have all your custom goodness ready to go.

Github for notebooks

As i was looking around it seems like there are currently plans to implement some git functionality into the notebooks on KubeFlow in a bit more of a native way (see this issue).

For now i decided to just create a ssh key (help docs) for the persistent workspace volume connected to the notebook server (see step 10 here).

Then once you want to git push from your notebook server you can just hack together a little notebook like this that you can use as a poor man’s git ui 🙂
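A cell in such a notebook might look something like the sketch below. The helper names and the add/commit/push sequence are mine, not from the post, so treat this as one plausible shape for a poor man's git ui:

```python
import subprocess

# hypothetical helper: build the git commands first so they can be
# inspected in the notebook before actually running anything
def git_push_commands(message, remote='origin', branch='master'):
    return [
        ['git', 'add', '-A'],
        ['git', 'commit', '-m', message],
        ['git', 'push', remote, branch],
    ]

def run_commands(commands):
    # run each command in turn, printing the command and its return code
    for cmd in commands:
        result = subprocess.run(cmd, capture_output=True, text=True)
        print(' '.join(cmd), '->', result.returncode)

# in the notebook you would then just do something like:
# run_commands(git_push_commands('update notebooks'))
```

Keeping the command list separate from the execution means you can print it first and sanity check what is about to happen to your repo.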

Multi-Variate, Multi-Step, LSTM for Anomaly Detection

This post will walk through a synthetic example illustrating one way to use a multi-variate, multi-step LSTM for anomaly detection.

Imagine you have a matrix of k time series data coming at you at regular intervals and you look at the last n observations for each metric.

A matrix of 5 metrics from period t to t-n

One approach to doing anomaly detection in such a setting is to build a model to predict each metric over each time step in your forecast horizon and when you notice your prediction errors start to change significantly this can be a sign of some anomalies in your incoming data.

This is essentially an unsupervised problem that can be converted into a supervised one. You train the model to predict its own training data. Then once it gets good at this (assuming your training data is relatively typical of normal behavior of your data), if you see some new data for which your prediction error is much higher than expected, that can be a sign that your new data is anomalous in some way.

Note: This example is adapted and built off of this tutorial which i found a very useful starting point. All the code for this post is in this notebook. The rest of this post will essentially walk through the code.

Imports & Parameters

Below are the imports and all the parameters for this example; you should be able to play with them and see what different results you get.

Note: There is a Pipfile here that shows the Python libraries needed. If you are not familiar, you should really check out pipenv, it’s really useful once you play with it a bit.

import numpy as np
import pandas as pd
from numpy import concatenate
from matplotlib import pyplot
from keras.models import Sequential
from keras.callbacks import Callback
from keras.layers import LSTM, Dense, Activation
import matplotlib.pyplot as plt
%matplotlib inline
N_DATA_ORIG = 3000 # length of data to generate
N_FEATURES = 5 # number of synthetic features to create
N_ROLLING = 1000 # length of window over which to smooth the random data to make it look realistic
N_TIMESTEPS = 5 # number of timesteps you want to both train on and predict out to
N_DATA = N_DATA_ORIG - N_ROLLING # length of final data after smoothing
N_TRAIN_ITERATIONS = 5 # number of times to iterate training of the model
N_EPOCHS = 5 # within each train call how many epochs to run for
BATCH_SIZE = 100 # batch size to train on
N_LAYERS = 3 # number of layers for the LSTM
N_LSTM_UNITS = 2 # number of hidden unit in each LSTM layer
BREAK_LEN = 1000 # length of the break in the data we will create
random_break_point = np.random.choice(N_DATA) # pick a random point in the data to swap in the broken data into

Fake Data!

We will generate some random data, and then smooth it out to look realistic. This will be our ‘normal’ data that we will use to train the model.

I couldn’t help myself.

Then we will make a copy of this normal data and inject in some random noise at a certain point and for a period of time. This will be our ‘broken’ data.

So this ‘broken’ data is the data that we should see the model struggle with in terms of prediction error. It’s this error (aggregated and summarized in some way, e.g. turned into a z-score) that you could then use to drive an anomaly score (you could also use loss from the continually re-training on new data whereby the training loss should initially spike once the broken data comes into the system but over time the training would then adapt the model to the new data).
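For example, one simple way to turn the raw errors into something like an anomaly score is a rolling z-score of the absolute error. This is not from the original notebook, just a minimal sketch of the idea (the window size here is an arbitrary choice):

```python
import numpy as np
import pandas as pd

def error_zscore(errors, window=100):
    # how unusual is each absolute error relative to the recent past?
    s = pd.Series(errors).abs()
    mu = s.rolling(window, min_periods=10).mean()
    sd = s.rolling(window, min_periods=10).std()
    return ((s - mu) / sd).fillna(0)

# toy demo: small stable errors, then a sudden jump once 'broken' data arrives
errors = np.concatenate([0.1 + 0.01 * np.sin(np.arange(200)), np.full(20, 2.0)])
z = error_zscore(errors)
```

A thresholded version of `z` (e.g. flag anything above some cutoff) would be one crude way to drive an anomaly score from the model's errors.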

# make some noisy but smooth looking data
data = np.sqrt(np.random.rand(N_DATA_ORIG,N_FEATURES))
df_data = pd.DataFrame(data)
df_data = df_data.rolling(window=N_ROLLING).mean()
df_data = df_data.dropna()
df_data = df_data.head(N_DATA)
data = df_data.values
# plot the normal healthy data
fig, ax = plt.subplots(num=None, figsize=(14, 6), dpi=80, facecolor='w', edgecolor='k')
size = len(data)
for x in range(data.shape[1]):
    ax.plot(range(0,size), data[:,x], '-', linewidth=1)

This gives us our normal-ish real word looking data that we will use to train the model.

5 random time series that have been smoothed a bit to look realistic.

To make our ‘broken’ data (called data_new in the code) i lazily just copy the ‘normal’ data but mess up a segment of it with some random noise.

# make some random data
data_rand = np.random.rand(N_DATA,N_FEATURES)
data_new = np.copy(data)
# at a random point for a certain number of steps, swap out the smooth data with some random data
data_new[random_break_point:(random_break_point+BREAK_LEN)] = data_rand[random_break_point:(random_break_point+BREAK_LEN)]
# plot the new data
fig, ax = plt.subplots(num=None, figsize=(14, 6), dpi=80, facecolor='w', edgecolor='k')
size = len(data_new)
for x in range(data_new.shape[1]):
    ax.plot(range(0,size), data_new[:,x], '-', linewidth=0.5)

And so below we can see our ‘broken’ data. I’ve set the broken segment to be quite wide here and it’s very obvious the broken data is totally different. The hope is that in reality the model once trained would be good at picking up much more nuanced changes in the data that are less obvious to the human eye.

For example, if all metrics were to suddenly become more or less correlated than normal, but each still moved by a typical amount individually, then this is the sort of change you’d like the model to highlight (this is probably something i should have tried to do when making the ‘broken’ data to make the whole example more realistic, feel free to try this yourself and let me know how you get on).

Same as the “normal” data but i’ve messed up a huge chunk of it.
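If you did want to build a subtler ‘broken’ segment along those lines, you could keep each metric's individual scale the same but change the correlation between them. A rough sketch of that idea (again, not from the original notebook):

```python
import numpy as np

np.random.seed(42)
n, k = 1000, 5
# 'normal' regime: the k metrics move together (high pairwise correlation)
cov_normal = np.full((k, k), 0.9)
np.fill_diagonal(cov_normal, 1.0)
normal = np.random.multivariate_normal(np.zeros(k), cov_normal, size=n)
# 'broken' regime: same per-metric variance, but the metrics decouple
broken = np.random.multivariate_normal(np.zeros(k), np.eye(k), size=n)

def avg_offdiag_corr(x):
    # average pairwise correlation across the k metrics
    c = np.corrcoef(x.T)
    return (c.sum() - len(c)) / (len(c) ** 2 - len(c))
```

Each series looks individually normal in both regimes; only the relationship between them changes, which is exactly the kind of break a multivariate model should catch but a per-metric one would miss.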

Some Helper Functions

I’ve built some helper functions to make life easier in the example notebook. I’ll share the code below and talk a little about each.

  • data_reshape_for_model() : This function basically takes in a typical dataframe type array, loops through that data and reshapes it all into a numpy array of the shape expected by the keras LSTM model for both training and prediction. Figuring out how to reshape the data based on the N_TIMESTEPS, N_FEATURES and length of the data was actually probably the trickiest part of this whole example. I’ve noticed that many tutorials online just reshape the data but do so in an incomplete way by essentially just pairing off rows. But what you really want to do is step through all the rows to make sure you roll your N_TIMESTEPS window properly over the data, so as to include all possible windows in your training.
  • train() : This is just a simple wrapper for the keras train function. There is no real need for it.
  • predict() : Similar to train() is just a wrapper function that does not really do much.
  • model_data_to_df_long() : This function takes in a data array as used by the keras model and unrolls it into one big long pandas dataframe (numpy arrays freak me out a bit sometimes so i always try to fall back to pandas when i can get away with it 😉).
  • model_df_long_to_wide() : This function then takes the long format dataframe created by model_data_to_df_long() and converts it into a wide format that is closer to the original dataset of one row per observation and one column for each input feature (plus lots more columns for predictions for each feature for each timestep).
  • df_out_add_errors() : This function adds errors and error aggregation columns to the main df_out dataframe which stores all the predictions and errors for each original row of data.
  • yhat_to_df_out() : This function takes in the model formatted training data and model formatted prediction outputs and wraps all the above functions to make a nice little “df_out” dataframe that has everything we want in it, with one row per observation, so it lines up more naturally with the original data.
def data_reshape_for_model(data_in,n_timesteps,n_features,print_info=True):
    ''' Function to reshape the data into model ready format, either for training or prediction. '''
    # get original data shape
    data_in_shape = data_in.shape
    # create a dummy row with desired shape and one empty observation
    data_out = np.zeros((1,n_timesteps,n_features))
    # loop though each row of data and reshape accordingly
    for row in range(len(data_in)):
        # for each row look ahead as many timesteps as needed and then transpose the data to give shape keras wants
        tmp_array = np.array([data_in[row:(row+n_timesteps),].transpose()])
        # if your reshaped data is as expected then concat the new observation into data_out
        if tmp_array.shape == (1,n_timesteps,n_features):
            data_out = np.concatenate((data_out,tmp_array))
    # drop first dummy row of data_out
    data_out = data_out[1:]
    # get output data shape
    data_out_shape = data_out.shape
    if print_info: print(f'{data_in_shape} -> {data_out_shape}')
    return data_out
def train(model,data,n_epochs=10,batch_size=50,print_info=False,callbacks=None,shuffle=False,verbose=1):
    ''' Function to take in model and data and train the model using defined params. '''
    # fit the model to the data
    model.fit(data, data, epochs=n_epochs, batch_size=batch_size,
              validation_data=(data, data), verbose=verbose, shuffle=shuffle,
              callbacks=callbacks)
    return model
def predict(model,data,print_info=True):
    ''' Function to take in model and data and return predictions in model data format. '''
    # get prediction from model
    yhat = model.predict(data)
    if print_info: print(yhat.shape)
    return yhat
def model_data_to_df_long(data,n_timesteps,n_features):
    ''' Function to take model data numpy array and translate it into a long format dataframe. '''
    # define empty list to collect data into
    data_tmp = []
    # for each row in the data
    for r in range(len(data)):
        row = data[r]
        # for each feature in each row
        for f in range(n_features):
            # for each timestep of each feature in each row
            for t in range(n_timesteps):
                # add an element to the list decoding what it represents
                tmp = [r,f'f{f}',f't{t}',row[f,t]]
                # append that element to the data
                data_tmp.append(tmp)
    # now use the collected data to create a pandas df
    df_long = pd.DataFrame(data_tmp,columns=['row','feature','timestep','value'])
    # add a label col that can be used to go from long format to wide
    df_long['label'] = df_long['feature'] + '_' + df_long['timestep']
    return df_long
def model_df_long_to_wide(df_long,key_col='label'):
    ''' Function that can translate a long format model data df into a wide version of it. '''
    # use pivot to go from long to wide
    df_wide = df_long[['row','label','value']].pivot(index='row',columns=key_col,values='value')
    return df_wide
def df_out_add_errors(df_out,n_timesteps,n_features):
    ''' Function to take in a df_out type df and add in error columns. '''
    # loop through to get errors
    f_cols = [f'f{f}' for f in range(n_features)]
    t_cols = [f't{t}' for t in range(n_timesteps)]
    for f_col in f_cols:
        for t_col in t_cols:
            lag = int(t_col.replace('t','')) + 1
            df_out[f'{f_col}_{t_col}_error'] = df_out[f_col].shift(lag*-1) - df_out[f'{f_col}_{t_col}_yhat']
    # get summary error metrics by timestep across all features
    # (build the list of raw error cols first so the aggregate cols don't pick each other up)
    for t_col in t_cols:
        error_cols = [f'{f_col}_{t_col}_error' for f_col in f_cols]
        df_out[f'{t_col}_error_avg'] = df_out[error_cols].mean(axis=1)
        df_out[f'{t_col}_error_med'] = df_out[error_cols].median(axis=1)
        df_out[f'{t_col}_error_min'] = df_out[error_cols].min(axis=1)
        df_out[f'{t_col}_error_max'] = df_out[error_cols].max(axis=1)
        df_out[f'{t_col}_error_rng'] = df_out[error_cols].max(axis=1) - df_out[error_cols].min(axis=1)
    return df_out
def yhat_to_df_out(data_train,yhat,n_timesteps,n_features):
    ''' Function to take data train and yhat prediction output array from model and turn it into final form of df_out. '''
    # helper df's
    df_train_long = model_data_to_df_long(data_train,n_timesteps,n_features)
    df_yhat_long = model_data_to_df_long(yhat,n_timesteps,n_features)
    df_train_wide = model_df_long_to_wide(df_train_long)
    df_yhat_wide = model_df_long_to_wide(df_yhat_long)
    df_yhat_wide.columns = [f'{col}_yhat' for col in df_yhat_wide.columns]
    # begin process to collect final data frame
    # make df_out from the latest timestep of the training data
    train_cols_latest = [col for col in df_train_wide.columns if f't{n_timesteps-1}' in col]
    df_out = df_train_wide[train_cols_latest]
    # clean up col names
    df_out.columns = [col.split('_')[0] for col in df_out.columns]
    # now concat train cols and cols from df_yhat_wide
    df_out = pd.concat([df_out,df_yhat_wide],axis=1)
    # add in error cols
    df_out = df_out_add_errors(df_out,n_timesteps,n_features)
    return df_out
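As a quick sanity check on the windowing logic: stepping through every row (rather than pairing rows off) should give len(data) - n_timesteps + 1 windows. Here is a tiny self-contained version of the idea, which skips the transpose the notebook's function does, just to keep the counting clear:

```python
import numpy as np

# every possible rolling window of n_timesteps rows becomes one observation
def rolling_windows(data, n_timesteps):
    return np.array([data[i:i + n_timesteps]
                     for i in range(len(data) - n_timesteps + 1)])

demo = np.arange(20).reshape(10, 2)  # 10 rows, 2 features
windows = rolling_windows(demo, n_timesteps=5)
print(windows.shape)  # (6, 5, 2): 10 - 5 + 1 = 6 windows of 5 rows each
```

Pairing rows off instead would give only 2 windows from the same 10 rows, throwing away most of the training signal.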

Build & Train The Model

The below code builds the model, trains it, and also calls predict on all the training data to be able to get errors on the original ‘normal’ training data.

# build network
model = Sequential()
# add number of layers specified
# (note: the exact model.add() lines were lost from the original post, so the
# architecture below is a plausible reconstruction, not necessarily the original)
for layer in range(N_LAYERS):
    model.add(LSTM(N_LSTM_UNITS, input_shape=(N_TIMESTEPS, N_FEATURES), return_sequences=True))
# map back to one output per feature at each timestep so output shape matches input
model.add(Dense(N_FEATURES))
model.compile(loss='mae', optimizer='adam')
# print model summary
print(model.summary())

# minimal loss-recording callback (a stand-in for the notebook's LossHistory
# class, which is not shown in the post)
class LossHistory(Callback):
    def on_train_begin(self, logs=None):
        self.losses = []
    def on_batch_end(self, batch, logs=None):
        self.losses.append(logs.get('loss'))

# reshape data for training
print(f'… reshaping data for training …')
data_train = data_reshape_for_model(data,N_TIMESTEPS,N_FEATURES)
# begin training iterations
for i in range(N_TRAIN_ITERATIONS):
    print(f'… training iteration {i} …')
    model = train(model,data_train,callbacks=[LossHistory()])
# get predictions on healthy data using final trained model
yhat = predict(model,data_train)

We then call our “do everything” yhat_to_df_out() function on the training data and the predictions from the model.

df_out = yhat_to_df_out(data_train,yhat,N_TIMESTEPS,N_FEATURES)

Now we can plot lots of things from df_out. For example, here are the errors averaged across all five features at each timestep prediction horizon.

plot_cols = [col for col in df_out.columns if 'error_avg' in col]
# plot the new data
fig, ax = plt.subplots(num=None, figsize=(14, 6), dpi=80, facecolor='w', edgecolor='k')
size = len(df_out)
for col in plot_cols:
    ax.plot(range(0,size), df_out[col], '-', linewidth=0.5)

In the above plot we can see the averaged error of the model on its training data. Each line represents a different forecast horizon. We can see that the lines are sort of ‘stacked’ on top of each other which makes sense as you’d generally expect the error 5 timesteps out (red line “t4_error_avg”) to be higher than the one step ahead forecast (greeny/orangy line “t0_error_avg”).

If we look at the standard deviation of our errors in a similar way, we can see how the standard deviation of our errors generally tends to increase at times when our 5 original features are diverging from each other as you can imagine these are the hardest parts of our time series for this model to predict.

Lets Break It

So now that we have our model trained on our ‘normal’ data we can use it to see how well it does at predicting our new ‘broken’ data.

# now train on new data
print(f'… reshaping data for new data training …')
data_train_new = data_reshape_for_model(data_new,N_TIMESTEPS,N_FEATURES)
print("… begin training on new data …")
model = train(model,data_train_new,n_epochs=1)
yhat_new = predict(model,data_train_new)
df_out_new = yhat_to_df_out(data_train_new,yhat_new,N_TIMESTEPS,N_FEATURES)
plot_cols = [col for col in df_out_new.columns if 'error_avg' in col]
# plot the new data
fig, ax = plt.subplots(num=None, figsize=(14, 6), dpi=80, facecolor='w', edgecolor='k')
size = len(df_out_new)
for col in plot_cols:
    ax.plot(range(0,size), df_out_new[col], '-', linewidth=0.5)

Here we can see that as soon as we hit the broken data the prediction errors go through the roof.

From the above we can see that as soon as the random broken data comes into the time series the model prediction errors explode.

As mentioned, this is a very obvious and synthetic use case just for learning on, but the main idea is that if your data changed in a more complicated and harder to spot way then your error rates would reflect this change. These error rates could then be used as input into a more global anomaly score for your system.

That’s it, thanks for reading and feel free to add any comments or questions below. I may add some more complicated or real world examples building on this approach at a later stage.

UPDATE: Here is a Google Colab notebook that’s a bit better, as i’ve worked a bit more on this since the original blog post.

Custom Python Packages in AWS Lambda

It’s True.

I’m pretty sure i’ll be looking this up again at some stage so that passed one of my main thresholds for a blog post.

I’ve recently been porting some data and model development pipelines over to AWS Lambda and was mildly horrified to see how clunky the whole process for adding custom python packages to your Lambda was (see docs here).

This was probably the best post i found but it still did not quite cover custom python packages you might need to include beyond just the more typical pypi ones like numpy, pandas, etc. (p.s. this video was really useful if you are working in Cloud9).

So i set out to hack together a process that would automate 90% of the work in packaging up any python packages you might want to make available to your AWS Lambda including local custom python packages you might have built yourself.

The result involves a Docker container to build your packages in (i have to use this because a local package install done on windows does not work in Lambda, as the install contains some windows-specific files Lambda won’t like), and a jupyter notebook (of course there is some jupyter 🙂 ) to take some inputs (what packages you want, what to call the AWS Layer, etc), build local installs of the packages, add them to a zip file, load the zip file to S3 and then finally use awscli to make a new layer from said S3 zip file.


The first place to start is with the below Dockerfile that creates a basic conda ready docker container with jupyter installed. Note it also includes conda-build and copies over the packages/ folder into the container (required as i wanted to install my “my_utils” package and have it available to the jupyter notebook).

ARG BASE_CONTAINER=jupyter/scipy-notebook
FROM $BASE_CONTAINER
LABEL maintainer="myemail@email.com"
LABEL version="01"
# install specific package versions i want to use here
RUN conda install --quiet --yes \
    pandas \
    matplotlib \
    boto3 && \
    conda remove --quiet --yes --force qt pyqt && \
    conda clean -tipsy && \
    fix-permissions $CONDA_DIR && \
    fix-permissions /home/$NB_USER
# install conda build
RUN conda install --quiet --yes conda-build
# copy over local files for my package
ADD packages/ /home/$NB_USER/packages/
# add my_utils package to conda
RUN conda develop /home/$NB_USER/packages/my_utils
# some additional conda installs
RUN conda install -y awscli
# run as root user
USER root
# run jupyter lab
ENTRYPOINT ["jupyter", "lab", "--ip=0.0.0.0", "--allow-root"]

Build this with:

$ docker build -t my-aws-python-packages -f ./Dockerfile ./

And then run it with:

$ docker run -it --name my-aws-python-packages \
    -p 8888:8888 \
    --mount type=bind,source="$(pwd)/work",target=/home/jovyan/work \
    --mount type=bind,source="$(pwd)/packages",target=/home/jovyan/packages \
    -e AWS_ACCESS_KEY_ID=$(aws --profile default configure get aws_access_key_id) \
    -e AWS_SECRET_ACCESS_KEY=$(aws --profile default configure get aws_secret_access_key) \
    my-aws-python-packages

The above runs the container, port forwards 8888 (for jupyter), mounts both the /packages and /work folders (as for these files we want changes from outside docker or inside to be reflected and vice versa), and passes in my AWS credentials as environment variables to the container (needed for the aws cli commands we will run inside the container). Its last step is to then launch jupyter lab, which you should then be able to get to at http://localhost:8888/lab using the token provided by jupyter.

Notebook time – make_layer.ipynb

Once the docker container is running and you are in jupyter the make_layer notebook automates the local installation of a list of python packages, zipping them to /work/python.zip folder as expected by AWS Layers (when unzipped your root folder needs to be /python/…), loading it to an S3 location, and then using awscli to add a new layer or version (if the layer already exists).
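The key detail is that AWS expects a layer zip to unpack into a top-level python/ directory. The zipping step can be sketched roughly like below (the function name and paths are mine, not from the notebook):

```python
import os
import zipfile

def zip_packages_for_layer(src_dir, zip_path):
    # zip everything under src_dir so it lands under 'python/' in the archive,
    # which is the layout AWS Lambda layers expect for python packages
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(src_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # archive path: python/<path relative to src_dir>
                arcname = os.path.join('python', os.path.relpath(full_path, src_dir))
                zf.write(full_path, arcname)
```

After that it's just a matter of copying the zip to S3 and publishing the layer via awscli (the notebook uses `aws lambda publish-layer-version` for that step).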

The notebook itself is not that big so i’ve included it below.

For this example i’ve included two custom packages along with pandas into my AWS Layer. The custom packages are just two little basic hello_world() type packages (one actually creates the subprocess_execute() function used in the make_layer notebook). I’ve included pandas then as well to illustrate how to include a pypi package.

Serverless Deploy!

To round off the example we then also need to create a little AWS Lambda function to validate that the packages installed in our layer can actually be used by Lambda.

To that end, i’ve adapted the serverless example cron lamdba from here into my own little lambda using both my custom packages and pandas.

Here is the handler.py that uses my packages:

import json
from my_utils.os_utils import subprocess_execute
from my_dev.dev import hello_world
import pandas as pd

def run(event=dict(), context=dict()):
    ''' Function to be called by serverless lambda. '''
    # make a dummy df to ensure pandas available to the lambda function
    df = pd.DataFrame(data=['this is a dataframe'],columns=['message'])
    print(df)
    # call something from my_dev package
    hello_world()
    # print out results of ls
    print(subprocess_execute('ls -la'))
    # run shell command to print out results of pip freeze
    print(subprocess_execute('pip freeze'))

And the serverless.yml used to configure and deploy the lambda:

service: serverless-learn-lambda

provider:
  name: aws
  runtime: python3.6
  region: us-west-2
  stage: dev
  role: arn:aws:iam::XXX:role/serverless-lambda

functions:
  run:
    handler: handler.run
    events:
      - schedule: rate(1 hour)

We then deploy this function (from here) with:

$ serverless deploy

And we can then go into the AWS console to the Lambda function we just created. We can test it in the UI and see the expected output whereby our custom functions work as expected, as does Pandas:


That’s it for this one, i’m hoping someone might find this useful as i was really surprised by how painful it was to get a simple custom package or even pypi packages for that matter available to your AWS Lambda functions.

If you wanted you could convert the ipynb notebook into a python script and automate the whole thing. Although i’m pretty sure Amazon will continue to make the whole experience a bit more seamless and easier over time.

Parallelize a wide df in Pandas

I was going to make a pretty picture.

Sometimes you end up with a very wide pandas dataframe and you are interested in doing the same types of operations (data processing, building a model etc.) but focused on subsets of the columns.

For example if we had a wide df with different time series kpi’s represented as columns then we might want to do something like look at each kpi at a time, apply some pre-processing and build something like an ARIMA time series model perhaps.

This is the situation i found myself in recently and it took me the best part of an afternoon to figure out. Usually when i find myself in that situation i try and squeeze out a blog post in case it might be useful for someone else or future me.

All the code in one glorious screenshot!

Note: repository with all code is here. p.s. thanks to this and this post that i built off of.

For this example i’m afraid i’m going to use the Iris dataset :0 . This example is as minimal and easy as i could throw together, basically the aim of the code is to:

  1. Build some function to take in a df, do some processing and spit out a new df.
  2. Have that function be parameterized in some way as might be needed (e.g if you wanted to do slightly different work for one subset of columns).
  3. Apply that function in parallel across the different subsets of your df that you want to process.

There are two main functions of interest here: parallelize_dataframe() and do_work(), both of which live in their own file called my_functions.py which can be imported into your jupyter notebook.

from multiprocessing import Pool
from functools import partial
import numpy as np
import pandas as pd

def parallelize_dataframe(df, func, n_pool=4, col_subsets=None, join_how='outer', **kwargs):
    '''
    Function to take a df, a function with args, and a list of column subsets to apply function to.
    Resulting list of df's are then joined back together based on the df index.
    '''
    # create empty list of df's
    df_list = []
    # if col subsets not defined then just use all cols
    if col_subsets == None:
        col_subsets = [[col for col in df.columns]]
    # loop over each subset of cols, make a subset df, and append it to df_list
    for col_subset in col_subsets:
        df_list.append(df[col_subset])
    # define pool params
    pool = Pool(n_pool)
    # wrap the func in a partial and pass in the kwargs
    p_func = partial(func, **kwargs)
    # apply func via map to each of the df's in df_list
    map_return_list = pool.map(p_func, df_list)
    # join back all the resulting df's into one df based on joining back together based on index
    for i in range(len(map_return_list)):
        if i == 0:
            df_out = map_return_list[i]
        else:
            df_out = df_out.merge(map_return_list[i], join_how, left_index=True, right_index=True)
    # multiprocessing clean up
    pool.close()
    pool.join()
    return df_out

def do_work(df, kwarg_dict=None):
    ''' Function (which we want to parallelize across different cols of the df) to do some operations on a df. '''
    # pass any args in as a dict to be parsed out as needed (not used in this example)
    if kwarg_dict:
        pass
    # for every col in the subsetted df we want to do some operations
    for col in df._get_numeric_data().columns:
        df[f'{col}_squared'] = df[col]**2
        df[f'{col}_sqrt'] = np.sqrt(df[col])
    return df

parallelize_dataframe() does the below things:

  1. Break out df into a list of df’s based on the col_subsets list passed in as a parameter.
  2. Wrap the function that was passed in into a partial along with the kwargs (this is how your parameters make it into the do_work() function).
  3. Use map() from multiprocessing to apply the func (along with the args you want to send it) to each subset of columns from your df in parallel.
  4. Reduce all this back into one final df by joining all the resulting df’s from the map() output into one wide df again (note the assumption here of joining back on the df indexes – they need to be stable and meaningful).

The do_work() function in this example is just a simple function to add some new columns as examples of the types of pandas (or any other) goodness you might want to do. In reality in my case it would be more like an apply_model() type function that would take each subset of columns, do some feature extraction, train a model and then also score the data as needed.
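To make the pattern concrete, here is a stripped-down, self-contained toy version of the whole flow. Plain map stands in for pool.map so the snippet runs anywhere, and the worker function is my own simplified stand-in; the subset/map/merge logic is the same:

```python
from functools import partial
import pandas as pd

def square_cols(df, power=2):
    # toy stand-in for the real per-subset do_work() processing
    out = df.copy()
    for col in list(out.columns):
        out[f'{col}_pow'] = out[col] ** power
    return out

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
col_subsets = [['a', 'b'], ['c']]
# break the wide df into one df per column subset
df_list = [df[cols] for cols in col_subsets]
# plain map here; swap in multiprocessing.Pool(n).map for real parallelism
results = list(map(partial(square_cols, power=2), df_list))
# join the processed subsets back into one wide df on the (stable) index
df_out = pd.concat(results, axis=1)
```

The partial() is what carries your parameters into the worker, exactly as in parallelize_dataframe() above.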

Having the ability to do this for multiple subsets of columns in your wide df can really free up your time to focus on the more important things like dickying around with model parameters and different pre-processing steps 🙂

That’s pretty much it, a productive afternoon (in the play center with kids i might add) and am quite pleased with myself.

Update: One addition i made, as things got more complicated when i went to implement this, was the ability to apply different function params to each subset df (for example if you wanted to pass in different parameters to the function for different columns). This is handled in do_parallel_zip.ipynb and the corresponding my_functions_zip.py (i’m calling them “_zip” as they use zip() to “zip” up both the df_list and the corresponding kwargs to go with it, to be unpacked later by do_work_zip()).

To be concrete, if we wanted to multiply the “sepal_…” cols by 100 and the “petal_…” cols by 0.5, we could use the “zip” approach like below (notebook here):

Which is using the “zip” approach in parallelize_dataframe_zip()

Where the zipped iterable is then unpacked as needed by the do_work_zip() function:
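In that spirit, the unpacking can be sketched as a self-contained toy version like this (again with plain map standing in for pool.map, and a simplified stand-in for the real do_work_zip()):

```python
import pandas as pd

def do_work_zip(df_and_kwargs):
    # unpack the zipped (df, kwarg_dict) tuple and apply per-subset params
    df, kwargs = df_and_kwargs
    return df * kwargs.get('multiplier', 1)

df = pd.DataFrame({
    'sepal_length': [1.0, 2.0], 'sepal_width': [3.0, 4.0],
    'petal_length': [5.0, 6.0], 'petal_width': [7.0, 8.0],
})
col_subsets = [['sepal_length', 'sepal_width'], ['petal_length', 'petal_width']]
kwargs_list = [{'multiplier': 100}, {'multiplier': 0.5}]
df_list = [df[cols] for cols in col_subsets]
# zip each subset df with its own params; pool.map would consume the same iterable
results = list(map(do_work_zip, zip(df_list, kwargs_list)))
df_out = pd.concat(results, axis=1)
```

Each subset df travels through the map paired with its own kwarg dict, so every subset can get different parameters without changing the worker's signature.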