Musings of a dad with too much time on his hands and not enough to do. Wait. Reverse that.

Tag: tools (Page 6 of 35)

Functions in Pandas groupby

I’ve written about the pandas groupby function a few times in the past; it’s a valuable command that I use frequently. You typically want to pipe your “group by” operations to a calculation function like count, sum, mean, etc. This blog post has a great write-up on groupby and the calculations you can do with it.

Most examples of groupby depict grouping your dataframes by referencing the literal names of your various columns. For example, working with this movie dataset, suppose I wanted to know how many movies are in the data per year. Typically, I’d code something like the following:

import pandas as pd


df = pd.read_csv('./data/regex_imdb.csv').fillna(0)
df[['Year', 'Name']].groupby('Year').count()

Getting fancier, suppose I wanted to group by both year and genre. I could do this (note that in this dataset, a multi-genre movie has the multiple genres comma-delimited):

df[['Year', 'Genre', 'Name']].groupby(['Year', 'Genre']).count()

But what if I wanted to do something slightly trickier, like grouping by year and whether or not a film was a comedy? You could add a new boolean column and use that in your grouping:

df['is_comedy'] = df.Genre.str.contains('Comedy')
df[['Year', 'is_comedy', 'Name']].groupby(['Year', 'is_comedy']).count()

However, instead of taking the extra step of adding a new column to your dataframe, you could do that work inline with the pandas map function, especially if you don’t think you’ll use that new column elsewhere:

df[['Year', 'Genre']].groupby(['Year', df.Genre.map(lambda g: 'Comedy' if 'Comedy' in g else 'Some other genre')]).count()

I have especially found this approach helpful grouping with timestamps. Suppose you want to group your dataframe by date and hour. That now becomes pretty simple:

from datetime import datetime


d = {'dt':[datetime(2021,10,30,3,0,0),datetime(2021,10,30,3,0,0),datetime(2021,10,30,3,0,0),datetime(2021,10,30,4,0,0),
           datetime(2021,10,30,5,0,0),datetime(2021,10,30,5,0,0),datetime(2021,10,31,3,0,0),datetime(2021,10,31,3,0,0)],
     'desc':['some event','some other event','big event','small event','medium sized event',
             'nothing to see here','event A','event B']}

df_events = pd.DataFrame(d)
df_events.groupby([df_events.dt.map(lambda d: datetime.date(d)), df_events.dt.map(lambda d: d.hour)]).count()

One important note: initially, I assumed my datetime values had a date property that I could use as I use their hour properties:

df_events.groupby([df_events.dt.map(lambda d: d.date)]).count()

Unfortunately, that command will throw a strange error. Instead, you’ll have to cast your values to a date using the datetime package.

The pandas “transform” function

I had a challenge where I need to group a large dataset by a particular feature and then calculate a variety of statistics on those feature groups including a standard score for each record. My inclination was to loop through each group and run these calculations in each iteration–sorta outside my main dataframe–but then I thought, could there be an easier way in pandas to do this work?

Yes there is: transform.

As an example, take the movies dataset that I’ve used in the past:

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt


# data from: https://www.kaggle.com/mysarahmadbhat/imdb-top-1000-movies
df = pd.read_csv('./data/regex_imdb.csv').fillna(0)
# filter out movies with no reported Gross
df = df[df.Gross != 0.0]

Suppose you wanted to know how each movie grossed against the average gross for their respective release years. To find this out, my inclination would be to loop through each year, calculate the mean for each year, then merge that value back into my main dataframe so that I could find the mean difference for each movie:

yr_means = []
for yr in df.Year.unique().tolist():
    yr_means.append({'Year': yr, 'year_mean': df[df.Year==yr].Gross.mean()})
    
# put my year mean calculations into a new dataframe
df_year_means = pd.DataFrame(yr_means)

df = df.merge(df_year_means, on='Year')

# and now I can calculate my difference from the mean
df['diff_from_year_mean'] = df.Gross - df.year_mean

But with the transform function, I can do all this work in a single line:

df['year_mean'] = df.groupby('Year').Gross.transform(np.mean)

# and now I can calculate my difference from the mean
df['diff_from_year_mean'] = df.Gross - df.year_mean

And from there you can do interesting work like diverging line charts:

fig, ax = plt.subplots(figsize=(8, 8))

year_to_plot = 2010
plot_data = df[df.Year==year_to_plot][['Name', 'diff_from_year_mean']].sort_values('diff_from_year_mean')
plot_data['color'] = plot_data.diff_from_year_mean.apply(lambda d: 'red' if d < 0 else 'green')

_ = ax.hlines(data=plot_data, y='Name', xmin=0, xmax=plot_data.diff_from_year_mean, color=plot_data.color)
_ = ax.table(cellText=[['${0:.2f} Million'.format(df[df.Year==year_to_plot].year_mean.values[0])]], 
             colLabels=['Avg Gross'], colWidths=[0.25], loc='center right')
_ = ax.set_xlabel('Gross earnings from the average (millions of dollars)')
_ = ax.set_title('Movie gross earnings from the average: top rated movies of {0}'.format(year_to_plot))

The transform function takes a variety of functions, both in the conventional function signature and, sometimes, as a string alias. For example, you can use:

  • np.min or ‘min’ to get the minimum value of the distribution
  • np.max or ‘max’ to get the maximum value of the distribution
  • np.std or ‘std’ to get the standard deviation of the distribution
  • len or ‘count’ to get a record count of your distribution
  • np.var or ‘var’ to get the variance of the distribution

You can even throw other functions/aliases at it like ‘first’ to get the first value of your distribution. However, you may need to do some sorting first or you may not get the values you were expecting.

Transform is yet one more way to do cool, pandas one-liner operations on your dataframes. Give it a whirl!

Unit testing PySpark apps with Docker

I write a fair amount of Spark applications that run on Hadoop platforms and do lots of data parsing, transformation, and loading to HDFS, Hive, or other data repositories. When I code my applications in Scala, I usually use Eclipse and the ScalaTest framework to test my work.

However, I like to write PySpark solutions, too, but haven’t found a great way to test my solutions in an editor like VS Code. Recently, though, it occurred to me that maybe I could just test my code in a ready-made Hadoop environment, like a Docker image for Hadoop. So, here’s a sample application I put together to unit test a PySpark application with Docker.

Setting up my virtual environment

Python has a neat way of creating virtual environments for your projects so you can keep your projects’ dependencies safely isolated from each other. Here are the general steps I followed to create a virtual environment for my PySpark project:

  1. In my WSL2 command shell, navigate to my development folder (change your path as needed): cd /mnt/c/Users/brad/dev
  2. Create a directory for my project: mkdir ./pyspark-unit-testing
  3. Enter the new project folder: cd pyspark-unit-testing
  4. Create a subdir for my tests: mkdir ./tests
  5. Create my virtual environment (note: I initially had some issues doing this in WSL2 and found this blog post helpful in overcoming them): python3 -m venv app-env
  6. Start the new virtual environment: source app-env/bin/activate
  7. Install the pytest package: pip3 install pytest –trusted-host pypi.org –trusted-host files.pythonhosted.org
  8. (Optional) Create a requirements doc of your development environment: pip3 freeze -> requirements-dev.txt
  9. Leave the virtual environment: deactivate
  10. Now, fire up VS Code: code .

Download the Databricks Spark-XML binary

I’ve coded this sample application to ingest an XML file and transform it into a Spark dataframe. Much of the time, you can easily parse such XML files using the Databricks Spark-XML library. This library, though, is not native to Spark, so you must load it as you would a third party library. So, my project will demonstrate that, too! A two-for-one! You can download the JAR file I used from its Maven repository and save it to your project directory.

Write some code

I kept my code sample extremely simple and the full project is available on Github. Here’s a small snippet of my function that takes an XML file, transforms the data into a dataframe, and then returns the dataframe:

def parse_xml(xml_file):
    """Simple function using the Databricks Spark-XML API to parse XML documents into dataframes"""
    df = spark.read.format('com.databricks.spark.xml').\
        option('rootTag', 'catalog').\
        option('rowTag', 'book').load(xml_file)
    return df

Yeah, basically two lines of code…thanks to the Databricks API. If your XML is simple enough, the API can tackle it, but I’ve had some rather horrible XML to parse in the past that went well beyond the capabilities of the Spark-XML library.

Write some tests to test your code

I expect my client application to take an XML file and return a valid dataframe, but I need to test those expectations with unit tests. Here’s two I wrote for this sample application:

def test_parse_xml():
    test_file = './tests/test.xml'
    test_df = mpc.parse_xml(test_file)
    
    assert test_df is not None, 'Expected a dataframe of data to be returned from function'
    assert test_df.count() == 12, 'Received unexpected count from test data'
    assert len(test_df.columns) == 7, 'Expected 7 columns in the test data'

def test_something_else():
    # some other test
    assert True

Set up your Docker container

For most of my Spark needs, I like to use the Jupyter “all-spark-notebook” image. I tend to use the spark-2 version since I usually use Spark v2 at work, so I pulled this image like so:

docker pull /jupyter/all-spark-notebook:spark-2

My Dockerfile is quite simple. It’s two big jobs are copying the Databricks JAR to the container and installing my Python dependencies, which, for this project, is just the pytest package:

# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:spark-2

WORKDIR /home/jovyan/work/

# from https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11/0.11.0
COPY ./spark-xml_2.11-0.11.0.jar /usr/local/spark/jars/.
COPY ./requirements-dev.txt /home/jovyan/work/.
RUN pip install -r requirements-dev.txt --trusted-host pypi.org --trusted-host files.pythonhosted.org

Run the container

When I start my Docker container, I like to include several commands that are not necessary for unit testing but helpful with other uses of the container like its Jupyter Notebook capabilities. Here’s the command I normally use:

docker run -d -p 9000:8888 -e JUPYTER_ENABLE_LAB=yes -e GRANT_SUDO=yes -v /mnt/c/Users/brad/dev/pyspark-unit-testing:/home/jovyan/work my_spark_image:v1

One of the more important arguments here is mounting my project directory to the container so that it has access to my code and tests.

Finally, do some testing

With the container now running, from your terminal, open up a bash shell:

docker exec -it <container_pid> bash

Once you have shell access to your container, you can use the pytest command line utility to run your tests:

pytest tests/ -s --disable-pytest-warnings

If all goes well, you should see a message indicating that your two tests have passed:

My two unit tests have passed!

Hope that helps with your unit testing! Grab my entire project from my Github page.

« Older posts Newer posts »

© 2024 DadOverflow.com

Theme by Anders NorenUp ↑