I’ve written about the pandas groupby function a few times in the past; it’s a valuable command that I use frequently. You typically want to pipe your “group by” operations into a calculation function like count, sum, mean, etc. This blog post has a great write-up on groupby and the calculations you can do with it.
Most examples of groupby depict grouping your dataframes by referencing the literal names of your various columns. For example, working with this movie dataset, suppose I wanted to know how many movies are in the data per year. Typically, I’d code something like the following:
import pandas as pd
df = pd.read_csv('./data/regex_imdb.csv').fillna(0)
df[['Year', 'Name']].groupby('Year').count()
Getting fancier, suppose I wanted to group by both year and genre. I could do this (note that in this dataset, a multi-genre movie has its multiple genres comma-delimited in a single Genre value):
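A sketch of that grouping, assuming the same Year, Genre, and Name columns used above:
# group by Year and the raw Genre string; a multi-genre movie keeps its comma-delimited value as one key
df[['Year', 'Genre', 'Name']].groupby(['Year', 'Genre']).count()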
But what if I wanted to do something slightly trickier, like grouping by year and whether or not a film was a comedy? You could add a new boolean column and use that in your grouping:
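A sketch of that approach, with is_comedy being a column name I made up for the example:
# flag comedies in a new boolean column, then group on Year plus that flag
df['is_comedy'] = df.Genre.map(lambda g: 'Comedy' in g)
df[['Year', 'is_comedy', 'Name']].groupby(['Year', 'is_comedy']).count()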
However, instead of taking the extra step of adding a new column to your dataframe, you could do that work inline with the pandas map function, especially if you don’t think you’ll use that new column elsewhere:
df[['Year', 'Genre']].groupby(['Year', df.Genre.map(lambda g: 'Comedy' if 'Comedy' in g else 'Some other genre')]).count()
I have found this approach especially helpful when grouping with timestamps. Suppose you want to group your dataframe by date and hour. That now becomes pretty simple:
from datetime import datetime
d = {'dt': [datetime(2021,10,30,3,0,0), datetime(2021,10,30,3,0,0), datetime(2021,10,30,3,0,0),
            datetime(2021,10,30,4,0,0), datetime(2021,10,30,5,0,0), datetime(2021,10,30,5,0,0),
            datetime(2021,10,31,3,0,0), datetime(2021,10,31,3,0,0)],
     'desc': ['some event', 'some other event', 'big event', 'small event', 'medium sized event',
              'nothing to see here', 'event A', 'event B']}
df_events = pd.DataFrame(d)
df_events.groupby([df_events.dt.map(lambda d: datetime.date(d)), df_events.dt.map(lambda d: d.hour)]).count()
One important note: I initially assumed my datetime values had a date property that I could use the same way I use their hour property, but date is a method on datetime objects, not an attribute, so it has to be called.
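A quick illustration of the difference, using the df_events frame from above:
# hour is an attribute, so it can be referenced directly
df_events.dt.map(lambda d: d.hour)
# date is a method and must be called: d.date() (equivalent to the datetime.date(d) form used earlier)
df_events.dt.map(lambda d: d.date())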
I had a challenge where I needed to group a large dataset by a particular feature and then calculate a variety of statistics on those feature groups, including a standard score for each record. My inclination was to loop through each group and run these calculations in each iteration, sorta outside my main dataframe, but then I thought: could there be an easier way in pandas to do this work?
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
# data from: https://www.kaggle.com/mysarahmadbhat/imdb-top-1000-movies
df = pd.read_csv('./data/regex_imdb.csv').fillna(0)
# filter out movies with no reported Gross
df = df[df.Gross != 0.0]
Suppose you wanted to know how each movie grossed against the average gross for its respective release year. To find this out, my inclination would be to loop through each year, calculate the mean gross for that year, then merge those values back into my main dataframe so that I could find each movie’s difference from the mean:
yr_means = []
for yr in df.Year.unique().tolist():
    yr_means.append({'Year': yr, 'year_mean': df[df.Year==yr].Gross.mean()})
# put my year mean calculations into a new dataframe
df_year_means = pd.DataFrame(yr_means)
df = df.merge(df_year_means, on='Year')
# and now I can calculate my difference from the mean
df['diff_from_year_mean'] = df.Gross - df.year_mean
But with the transform function, I can do all this work in a single line:
df['year_mean'] = df.groupby('Year').Gross.transform(np.mean)
# and now I can calculate my difference from the mean
df['diff_from_year_mean'] = df.Gross - df.year_mean
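And if you don’t need to keep the year_mean column around for anything else, the whole calculation really can collapse into a single line, something like:
df['diff_from_year_mean'] = df.Gross - df.groupby('Year').Gross.transform(np.mean)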
And from there you can do interesting work like diverging line charts:
fig, ax = plt.subplots(figsize=(8, 8))
year_to_plot = 2010
plot_data = df[df.Year==year_to_plot][['Name', 'diff_from_year_mean']].sort_values('diff_from_year_mean')
plot_data['color'] = plot_data.diff_from_year_mean.apply(lambda d: 'red' if d < 0 else 'green')
_ = ax.hlines(data=plot_data, y='Name', xmin=0, xmax=plot_data.diff_from_year_mean, color=plot_data.color)
_ = ax.table(cellText=[['${0:.2f} Million'.format(df[df.Year==year_to_plot].year_mean.values[0])]],
             colLabels=['Avg Gross'], colWidths=[0.25], loc='center right')
_ = ax.set_xlabel('Gross earnings from the average (millions of dollars)')
_ = ax.set_title('Movie gross earnings from the average: top rated movies of {0}'.format(year_to_plot))
The transform function accepts a variety of functions, passed either as a conventional function reference or, in many cases, as a string alias. For example, you can use:
np.min or ‘min’ to get the minimum value of the distribution
np.max or ‘max’ to get the maximum value of the distribution
np.std or ‘std’ to get the standard deviation of the distribution
len or ‘count’ to get a record count of your distribution
np.var or ‘var’ to get the variance of the distribution
You can even throw other functions/aliases at it like ‘first’ to get the first value of your distribution. However, you may need to do some sorting first or you may not get the values you were expecting.
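For instance, here is a sketch of using ‘first’ to tag every movie with its year’s top gross; the sort by Gross beforehand is what makes ‘first’ line up with the largest value (year_top_gross is just a column name I picked):
# sort so the highest-grossing movie comes first within each year,
# then broadcast that first value to every row in the group
df = df.sort_values('Gross', ascending=False)
df['year_top_gross'] = df.groupby('Year').Gross.transform('first')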
Transform is yet one more way to do cool pandas one-liner operations on your dataframes. Give it a whirl!
I write a fair amount of Spark applications that run on Hadoop platforms and do lots of data parsing, transformation, and loading to HDFS, Hive, or other data repositories. When I code my applications in Scala, I usually use Eclipse and the ScalaTest framework to test my work.
However, I like to write PySpark solutions, too, but haven’t found a great way to test my solutions in an editor like VS Code. Recently, though, it occurred to me that maybe I could just test my code in a ready-made Hadoop environment, like a Docker image for Hadoop. So, here’s a sample application I put together to unit test a PySpark application with Docker.
Setting up my virtual environment
Python has a neat way of creating virtual environments for your projects so you can keep your projects’ dependencies safely isolated from each other. Here are the general steps I followed to create a virtual environment for my PySpark project:
In my WSL2 command shell, navigate to my development folder (change your path as needed): cd /mnt/c/Users/brad/dev
Create a directory for my project: mkdir ./pyspark-unit-testing
Enter the new project folder: cd pyspark-unit-testing
Create a subdir for my tests: mkdir ./tests
Create my virtual environment (note: I initially had some issues doing this in WSL2 and found this blog post helpful in overcoming them): python3 -m venv app-env
Start the new virtual environment: source app-env/bin/activate
(Optional) Create a requirements doc of your development environment (see the note after this list): pip3 freeze > requirements-dev.txt
Leave the virtual environment: deactivate
Now, fire up VS Code: code .
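One note on that optional requirements step: the Dockerfile later in this post installs from requirements-dev.txt, and for this project that really just means the pytest package, so it needs to be installed in the virtual environment before freezing. A sketch of that, run while the virtual environment is active:
# install the only dev dependency this project needs, then capture it
pip3 install pytest
pip3 freeze > requirements-dev.txt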
Download the Databricks Spark-XML binary
I’ve coded this sample application to ingest an XML file and transform it into a Spark dataframe. Much of the time, you can easily parse such XML files using the Databricks Spark-XML library. This library, though, is not native to Spark, so you must load it as you would any third-party library. So, my project will demonstrate that, too! A two-for-one! You can download the JAR file I used from its Maven repository and save it to your project directory.
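For reference, one way to make a JAR like this visible to a SparkSession outside the container is the spark.jars config; this is just a sketch, with the app name being arbitrary and the path assuming the JAR sits in the project directory:
from pyspark.sql import SparkSession

# point spark.jars at the downloaded Spark-XML JAR so the
# 'com.databricks.spark.xml' format is available to this session
spark = SparkSession.builder \
    .appName('pyspark-unit-testing') \
    .config('spark.jars', './spark-xml_2.11-0.11.0.jar') \
    .getOrCreate()
In the Docker image described below, the same thing happens by copying the JAR into Spark’s jars directory, so no extra configuration is needed there.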
Write some code
I kept my code sample extremely simple, and the full project is available on GitHub. Here’s a small snippet of my function that takes an XML file, transforms the data into a dataframe, and then returns the dataframe:
def parse_xml(xml_file):
    """Simple function using the Databricks Spark-XML API to parse XML documents into dataframes"""
    # 'spark' is the SparkSession created elsewhere in the module
    df = spark.read.format('com.databricks.spark.xml').\
        option('rootTag', 'catalog').\
        option('rowTag', 'book').load(xml_file)
    return df
Yeah, basically two lines of code…thanks to the Databricks API. If your XML is simple enough, the API can tackle it, but I’ve had some rather horrible XML to parse in the past that went well beyond the capabilities of the Spark-XML library.
Write some tests to test your code
I expect my client application to take an XML file and return a valid dataframe, but I need to test those expectations with unit tests. Here are two I wrote for this sample application:
def test_parse_xml():
    # 'mpc' is the module containing parse_xml, imported at the top of the test file
    test_file = './tests/test.xml'
    test_df = mpc.parse_xml(test_file)
    assert test_df is not None, 'Expected a dataframe of data to be returned from function'
    assert test_df.count() == 12, 'Received unexpected count from test data'
    assert len(test_df.columns) == 7, 'Expected 7 columns in the test data'

def test_something_else():
    # some other test
    assert True
Set up your Docker container
For most of my Spark needs, I like to use the Jupyter “all-spark-notebook” image. I tend to use the spark-2 version since I usually use Spark v2 at work, so I pulled this image like so:
docker pull jupyter/all-spark-notebook:spark-2
My Dockerfile is quite simple. Its two big jobs are copying the Databricks JAR to the container and installing my Python dependencies, which, for this project, is just the pytest package:
# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:spark-2
WORKDIR /home/jovyan/work/
# from https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11/0.11.0
COPY ./spark-xml_2.11-0.11.0.jar /usr/local/spark/jars/.
COPY ./requirements-dev.txt /home/jovyan/work/.
RUN pip install -r requirements-dev.txt --trusted-host pypi.org --trusted-host files.pythonhosted.org
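With the Dockerfile in place, building the image is a standard docker build (the tag is my own choice; it just needs to match whatever you use in the run command):
docker build -t pyspark-unit-testing .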
Run the container
When I start my Docker container, I like to include several options that are not necessary for unit testing but are helpful with other uses of the container, like its Jupyter Notebook capabilities. The command I use looks roughly like the sketch below; the specifics will vary with your setup.
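# ports, mount path, container name, and image tag are illustrative; adjust to your setup
docker run -d --rm --name pyspark-testing \
    -p 8888:8888 -p 4040:4040 \
    -v /mnt/c/Users/brad/dev/pyspark-unit-testing:/home/jovyan/work \
    pyspark-unit-testing
With the container up, the unit tests can then be run inside it:
docker exec -it pyspark-testing pytest /home/jovyan/work/tests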