DadOverflow.com

Musings of a dad with too much time on his hands and not enough to do. Wait. Reverse that.


Unit testing PySpark apps with Docker

I write a fair number of Spark applications that run on Hadoop platforms and do lots of data parsing, transformation, and loading to HDFS, Hive, or other data repositories. When I code my applications in Scala, I usually use Eclipse and the ScalaTest framework to test my work.

I like to write PySpark solutions, too, but I haven’t found a great way to test them in an editor like VS Code. Recently, though, it occurred to me that I could just test my code in a ready-made environment, like a Docker image for Hadoop. So, here’s a sample application I put together to unit test a PySpark application with Docker.

Setting up my virtual environment

Python has a neat way of creating virtual environments for your projects so you can keep your projects’ dependencies safely isolated from each other. Here are the general steps I followed to create a virtual environment for my PySpark project:

  1. In my WSL2 command shell, navigate to my development folder (change your path as needed): cd /mnt/c/Users/brad/dev
  2. Create a directory for my project: mkdir ./pyspark-unit-testing
  3. Enter the new project folder: cd pyspark-unit-testing
  4. Create a subdir for my tests: mkdir ./tests
  5. Create my virtual environment (note: I initially had some issues doing this in WSL2 and found this blog post helpful in overcoming them): python3 -m venv app-env
  6. Start the new virtual environment: source app-env/bin/activate
  7. Install the pytest package: pip3 install pytest --trusted-host pypi.org --trusted-host files.pythonhosted.org
  8. (Optional) Create a requirements doc of your development environment: pip3 freeze > requirements-dev.txt
  9. Leave the virtual environment: deactivate
  10. Now, fire up VS Code: code .

Download the Databricks Spark-XML binary

I’ve coded this sample application to ingest an XML file and transform it into a Spark dataframe. Much of the time, you can easily parse such XML files using the Databricks Spark-XML library. This library, though, is not native to Spark, so you must load it as you would any third-party library. So, my project will demonstrate that, too! A two-for-one! You can download the JAR file I used from its Maven repository and save it to your project directory.

Write some code

I kept my code sample extremely simple and the full project is available on GitHub. Here’s a small snippet of my function that takes an XML file, transforms the data into a dataframe, and then returns the dataframe:

def parse_xml(xml_file):
    """Simple function using the Databricks Spark-XML API to parse XML documents into dataframes"""
    df = spark.read.format('com.databricks.spark.xml').\
        option('rootTag', 'catalog').\
        option('rowTag', 'book').load(xml_file)
    return df

Yeah, basically two lines of code…thanks to the Databricks API. If your XML is simple enough, the API can tackle it, but I’ve had some rather horrible XML to parse in the past that went well beyond the capabilities of the Spark-XML library.

Write some tests to test your code

I expect my client application to take an XML file and return a valid dataframe, but I need to test those expectations with unit tests. Here are two I wrote for this sample application:

def test_parse_xml():
    test_file = './tests/test.xml'
    test_df = mpc.parse_xml(test_file)
    
    assert test_df is not None, 'Expected a dataframe of data to be returned from function'
    assert test_df.count() == 12, 'Received unexpected count from test data'
    assert len(test_df.columns) == 7, 'Expected 7 columns in the test data'

def test_something_else():
    # some other test
    assert True
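
The first test leans on a fixture file, ./tests/test.xml, holding 12 book rows that come out as 7 columns. If you need a stand-in fixture of that shape, here is a minimal sketch using only the standard library. The child element names and the id attribute are hypothetical, not taken from the real test file, and it assumes Spark-XML turns an XML attribute into its own column (so one attribute plus six child elements gives seven):

```python
import xml.etree.ElementTree as ET

# Hypothetical child element names; the real test.xml may use different ones.
FIELDS = ["author", "title", "genre", "price", "publish_date", "description"]


def build_catalog(n_books=12):
    """Build a <catalog> element holding n_books <book> rows."""
    catalog = ET.Element("catalog")
    for i in range(1, n_books + 1):
        # One attribute plus six child elements gives seven fields per row.
        book = ET.SubElement(catalog, "book", id=f"bk{i:03d}")
        for field in FIELDS:
            ET.SubElement(book, field).text = f"{field} {i}"
    return catalog


def write_catalog(path, n_books=12):
    """Write the generated catalog to path as UTF-8 XML."""
    tree = ET.ElementTree(build_catalog(n_books))
    tree.write(path, encoding="utf-8", xml_declaration=True)


catalog = build_catalog()
books = catalog.findall("book")
print(len(books), "books,", len(books[0]) + len(books[0].attrib), "fields each")
```

Calling write_catalog('./tests/test.xml') would drop the file where the test expects it. Keeping the counts in one place makes it easy to adjust the fixture and the assertions together.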

Set up your Docker container

For most of my Spark needs, I like to use the Jupyter “all-spark-notebook” image. I tend to use the spark-2 version since I usually use Spark v2 at work, so I pulled this image like so:

docker pull jupyter/all-spark-notebook:spark-2

My Dockerfile is quite simple. Its two big jobs are copying the Databricks JAR to the container and installing my Python dependencies, which, for this project, is just the pytest package:

# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:spark-2

WORKDIR /home/jovyan/work/

# from https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11/0.11.0
COPY ./spark-xml_2.11-0.11.0.jar /usr/local/spark/jars/.
COPY ./requirements-dev.txt /home/jovyan/work/.
RUN pip install -r requirements-dev.txt --trusted-host pypi.org --trusted-host files.pythonhosted.org

Run the container

When I start my Docker container, I like to include several options that are not necessary for unit testing but are helpful with other uses of the container, like its Jupyter Notebook capabilities. Here’s the command I normally use:

docker run -d -p 9000:8888 -e JUPYTER_ENABLE_LAB=yes -e GRANT_SUDO=yes -v /mnt/c/Users/brad/dev/pyspark-unit-testing:/home/jovyan/work my_spark_image:v1

One of the more important arguments here is mounting my project directory to the container so that it has access to my code and tests.

Finally, do some testing

With the container now running, from your terminal, open up a bash shell:

docker exec -it <container_id> bash

Once you have shell access to your container, you can use the pytest command line utility to run your tests:

pytest tests/ -s --disable-pytest-warnings

If all goes well, you should see a message indicating that your two tests have passed:

My two unit tests have passed!

Hope that helps with your unit testing! Grab my entire project from my GitHub page.

Parsing inconsistent file data with Python

I’ve posted a few of my data file parsing challenges in the past (here and here, for example), and here’s a slight variation on those examples.

A friend of mine was challenged with parsing data from a file where not every field was guaranteed to be present. Take this for example:

A file with not-so-consistent data

In the above file, the “Nickname” field only appears in one record. Plus, the fields in the last record are in a different order than the others. All we know for sure is that we have a string (/****start record) that indicates when a new record begins and field keys and values are separated by “: “. How would I go about parsing such a file?

Step 1: Read in the file with pandas

I could pull in the file with Python’s open and read capabilities, but it’s probably even easier to do it with the pandas read_csv function (even though this is not a true CSV file):

import pandas as pd

df_raw = pd.read_csv('./data/another_odd_format.txt', names=['col1'])

Now we have a “raw” dataframe that looks like so:

Step 2: Find where your records start and end

Finding out where your records start is easy enough: find all the rows with your start record indicator (/****start record) and add 1. After that, finding where your records end is easy, too: take the row just before each “start record” row, beginning with the second one. (In the code, I subtract 2 from each entry in start_rows, because those entries already have 1 added to the marker index.) Then, you can just add the last index of your dataframe on at the end. Here’s what I did:

new_rec_str = '/****start record'
start_rows = [i+1 for i in df_raw[df_raw.col1.str.startswith(new_rec_str)].index.tolist()]
end_rows = [s-2 for s in start_rows[1:]] + [df_raw.index[-1]]
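
To see the index arithmetic on its own, here is a small sketch that swaps the dataframe for a plain Python list. The toy rows are an assumption about what the dataframe holds (one row per non-blank line of the file); only the marker string comes from the code above:

```python
# Toy stand-in for df_raw.col1; the exact rows are an assumption.
lines = [
    "/****start record",   # row 0: marker
    "Fname: al",           # row 1
    "LName: bundy",        # row 2
    "Address: chicago",    # row 3
    "/****start record",   # row 4: marker
    "Fname: marcy",        # row 5
    "LName: darcy",        # row 6
    "Address: chicago",    # row 7
]

new_rec_str = "/****start record"

# First data row of each record: one past each marker row.
start_rows = [i + 1 for i, line in enumerate(lines) if line.startswith(new_rec_str)]

# Last data row of each record: the row just before the next marker, which is
# two below the next record's first data row. The last record ends at the
# final row of the list.
end_rows = [s - 2 for s in start_rows[1:]] + [len(lines) - 1]

print(list(zip(start_rows, end_rows)))  # [(1, 3), (5, 7)]
```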

Step 3: Loop through each start/end record pair and parse away

recs = []  # list to contain my parsed records
for start_row, end_row in zip(start_rows, end_rows):
    new_rec = {}
    for i in range(start_row, end_row + 1):
        line = df_raw.loc[i, 'col1']
        key, value = line.split(': ', 1)  # split once, in case a value contains ': '
        new_rec[key] = value
    recs.append(new_rec)

This gives you a nice list of dictionary objects:

[{'Fname': 'al', 'LName': 'bundy', 'Address': 'chicago'},
 {'Fname': 'marcy', 'LName': 'darcy', 'Address': 'chicago'},
 {'Fname': 'theodore',
  'LName': 'cleaver',
  'Address': 'mayfield',
  'Nickname': 'the beaver'},
 {'Address': 'Hill Valley', 'LName': 'mcfly', 'Fname': 'marty'}]
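
If you would rather skip the start/end bookkeeping, the same list of dictionaries can be built in a single pass over the lines. This is an alternative sketch in plain Python, not the pandas approach above; parse_records is a hypothetical helper name, and the sample text is a guess at the file layout based on the output shown:

```python
def parse_records(lines, marker="/****start record", sep=": "):
    """Collect key/value lines into one dict per record, in a single pass."""
    recs = []
    current = None
    for line in lines:
        line = line.strip()
        if line.startswith(marker):
            current = {}                     # a marker opens a new record
            recs.append(current)
        elif current is not None and sep in line:
            key, value = line.split(sep, 1)  # split once so values may contain ': '
            current[key] = value
        # anything else (blank lines, stray text before the first marker) is ignored
    return recs


# Hypothetical sample; the layout is a guess at the file described above.
sample = """\
/****start record
Fname: theodore
LName: cleaver
Address: mayfield
Nickname: the beaver

/****start record
Address: Hill Valley
LName: mcfly
Fname: marty
"""

print(parse_records(sample.splitlines()))
```

Because each record is just whatever key/value lines follow a marker, missing fields and out-of-order fields need no special handling.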

Step 4: Load your list into a new dataframe

df_clean = pd.DataFrame(recs)

This gives you a neat and tidy dataframe:

If you don’t like the NaN in records missing a “Nickname” value, you can always add a fillna function to the end of the dataframe line and replace the NaN values with something like an empty string (''). So, that’s how I would tackle this sort of challenge.
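
Another option is to fill the gaps before the records ever reach a dataframe. Here is a rough sketch in plain Python, using two of the records from the list above; it is an alternative to fillna, not what the pandas code does internally:

```python
recs = [
    {"Fname": "al", "LName": "bundy", "Address": "chicago"},
    {"Fname": "theodore", "LName": "cleaver", "Address": "mayfield", "Nickname": "the beaver"},
]

# Every key seen in any record, in first-seen order.
columns = list(dict.fromkeys(key for rec in recs for key in rec))

# Give each record every column, using '' where a field was missing.
filled = [{col: rec.get(col, "") for col in columns} for rec in recs]

print(columns)
print(filled[0])
```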

Filtering dataframes with tuples

Often when I’m working with data, I need to filter the data down into sub-groups for more detailed analysis. Usually, my needs are simple, where I only need to filter on fields independently. However, sometimes I need to filter on field combinations and that work can get complicated.

Even Mr. Hitchcock finds filtering by field combinations challenging

For example, suppose I’m working with a movie dataset and I want to compare Francis Ford Coppola dramas to Alfred Hitchcock mysteries to Martin Scorsese biography pictures. Initially, I might try something like this:

import pandas as pd


# data from: https://www.kaggle.com/mysarahmadbhat/imdb-top-1000-movies
df = pd.read_csv('./data/regex_imdb.csv').fillna(0)
# for simplicity, I'm just going to use the first, comma-delimited genre value
df['Genre'] = df.Genre.apply(lambda g: g.split(',')[0])

directors = ['Francis Ford Coppola', 'Alfred Hitchcock', 'Martin Scorsese']
genres = ['Drama', 'Mystery', 'Biography']
df[(df.Director.isin(directors)) & (df.Genre.isin(genres))]

This code allowed in Scorsese “mysteries”, for example, when I only wanted Scorsese “biographies”.

Ok. Let’s then filter the hard way:

df[((df.Director=='Francis Ford Coppola') & (df.Genre=='Drama')) | 
   ((df.Director=='Alfred Hitchcock') & (df.Genre=='Mystery')) | 
   ((df.Director=='Martin Scorsese') & (df.Genre=='Biography'))]

This code does get me the data I want, but it’s long and a little unreadable. Is there a better way? Yes: tuples!

First, let’s take the director/genre combinations we want and put them together in a single list of tuples:

dirs_genres = [('Francis Ford Coppola', 'Drama'), ('Alfred Hitchcock', 'Mystery'), ('Martin Scorsese', 'Biography')]

Now, we can use the apply function to create an on-the-fly director/genre tuple field that we can filter on:

df[df.apply(lambda row: (row.Director, row.Genre), axis=1).isin(dirs_genres)]

We get the same results as before, but with slightly more readable code. Here’s a neat alternative option using Python’s tuple function:

df[df[['Director', 'Genre']].apply(tuple, axis=1).isin(dirs_genres)]

If you anticipate that you might use this director/genre combination a lot in your work, you could consider adding this tuple as a new column to your dataframe:

df['pg'] = df[['Director', 'Genre']].apply(tuple, axis=1)
df.head()

Then, you could do all your filtering directly against that single field.
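
To see why the tuple test is stricter than two independent isin checks, here is a small sketch in plain Python, with no pandas involved. The movie rows are made-up toy data, not taken from the IMDB dataset:

```python
from itertools import product

# Made-up toy rows, not taken from the IMDB dataset.
movies = [
    {"Title": "Film A", "Director": "Martin Scorsese", "Genre": "Biography"},
    {"Title": "Film B", "Director": "Martin Scorsese", "Genre": "Mystery"},
    {"Title": "Film C", "Director": "Alfred Hitchcock", "Genre": "Mystery"},
    {"Title": "Film D", "Director": "Alfred Hitchcock", "Genre": "Drama"},
    {"Title": "Film E", "Director": "Francis Ford Coppola", "Genre": "Drama"},
]

directors = ["Francis Ford Coppola", "Alfred Hitchcock", "Martin Scorsese"]
genres = ["Drama", "Mystery", "Biography"]
dirs_genres = {
    ("Francis Ford Coppola", "Drama"),
    ("Alfred Hitchcock", "Mystery"),
    ("Martin Scorsese", "Biography"),
}

# Two independent membership tests accept any of the 3 x 3 combinations.
loose = [m["Title"] for m in movies
         if m["Director"] in directors and m["Genre"] in genres]

# One membership test on the (director, genre) pair accepts only the 3 wanted ones.
strict = [m["Title"] for m in movies
          if (m["Director"], m["Genre"]) in dirs_genres]

# Pairs the loose filter lets through that we never asked for.
unwanted = set(product(directors, genres)) - dirs_genres

print(loose)          # ['Film A', 'Film B', 'Film C', 'Film D', 'Film E']
print(strict)         # ['Film A', 'Film C', 'Film E']
print(len(unwanted))  # 6
```

The independent checks admit every pairing of a listed director with a listed genre, while the tuple check admits only the pairings you spelled out.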

Happy filtering!


© 2025 DadOverflow.com
