
Unit testing PySpark and Hive

Often when I use Spark to process data, I write the results to tables in Hive.

As I’ve begun to embrace Visual Studio Code Remote Development to code and test my Spark applications, the thought occurs to me: can I use this approach to test not only the efficacy of my data processing code but also how I save out the results?

Back in the day, one might call this integration testing–testing how your code integrates with other systems like your database backend. Often you could use “mock” frameworks to simulate or mock the interface to your database. You could do the same for other systems like APIs you call. If you wanted to test how your code integrates with a frontend user interface, you could use a tool like Selenium.

But enough about that…how can I test that my Spark code properly writes my processed data to Hive? How can I test that the Hive scripts I write to support my Spark application actually work? (Instead of waiting until I go through all the work to deploy my solution to a real Hadoop cluster?) Turns out, you can also do these things in a proper remote container with Pytest.

Set up your environment

For starters, please check out my previous post on how to set up Remote Development in VS Code.

My simple Spark application

I started with my original sample client and added a function called save_dataframe that takes the dataframe, database name, and table name as parameters:

from pyspark.sql.functions import current_timestamp

def save_dataframe(df, dbname, table_name):
    """Simple function to save a dataframe to Hive"""
    df = df.withColumn('insert_timestamp', current_timestamp())
    df.write.mode('append').insertInto(f'{dbname}.{table_name}')

Note that I am using the Spark function insertInto to save the dataframe data to Hive, which does require that the destination table already exist (as opposed to saveAsTable, which will create the table for you).
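
For contrast, here’s a rough sketch of the saveAsTable alternative, which I’m not using because I want to manage the table definition myself in a Hive script:

# alternative I'm not using: saveAsTable will create the table from the
# dataframe's schema if it doesn't already exist
df.write.mode('append').saveAsTable(f'{dbname}.{table_name}')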

My Hive script

So far, I have a simple Spark application that can take in some data, process that data into a dataframe, and then try to write that dataframe to Hive. From the Hive perspective, then, I have certain expectations of how that data should be stored and I write up those expectations in the form of a “table creation script.” Here’s my simple script:

--***********************************
-- Script to create my_test_table
--***********************************

CREATE TABLE some_db.my_test_table (
    fname STRING,
    lname STRING,
    age INT,
    insert_timestamp TIMESTAMP
);

Test the combination of my Spark code and Hive script

Now, we can test this work end-to-end with Pytest. Here’s my full test file, but allow me to point out a few important features:

Setting up your Spark session object

It took me a lot of time and research to get my Spark session object configured to support Hive testing. The critical settings are the local master, enableHiveSupport, and the spark.sql.legacy.createHiveTableByDefault property:

# important properties to set to enable Hive testing
spark = SparkSession.builder.appName('com.dadoverflow.mypysparkclient.tests') \
    .master('local[*]') \
    .enableHiveSupport() \
    .config('spark.sql.legacy.createHiveTableByDefault', 'false') \
    .getOrCreate()

Convenience function to pull in the text of your Hive scripts

For convenience, I wrote the function get_script_text to read the text of my Hive scripts into a single string so that I could easily execute them:

def get_script_text(script_file):
    """
    Helper function to pull in text of CREATE TABLE scripts for testing
    """
    # myPath: directory of this test module (defined elsewhere in the test file)
    fn = '{0}/../hive_scripts/{1}'.format(myPath, script_file)

    with open(fn, 'r') as f:
        script_text = f.readlines()

    return ' '.join(script_text)

Leverage Pytest fixtures to set up and tear down my test database

I wrote a module-scoped pytest fixture that my tests could use to set up and tear down a test Hive database. Note the line that runs get_script_text to execute my Hive script and create the table I expect my application to write to:

@pytest.fixture(scope='module')
def setup_hive():
    #setup hive database for testing
    spark.sql(f'CREATE DATABASE {test_db}')
    spark.sql(get_script_text('create_table_my_test_table.sql'))
    yield

    #tear down database when testing is finished
    spark.sql(f'DROP TABLE {test_db}.{test_table}')
    spark.sql(f'DROP DATABASE {test_db}')

Finally, my unit test

In my unit test (or should I call it integration test?), I’m probably committing a no-no: I’m exercising two different functions of my application, build_dataframe and save_dataframe, when I should probably only be exercising one function per test. At any rate, the save_dataframe function should write my test data to Hive. I then turn around and read the data from Hive and then assert that the record count in Hive matches the record count of my original test data. If I wanted to be more thorough, I could probably assert on some of the individual records, data types, etc.

def test_write_some_data_to_hive(setup_hive):
    test_data = [('Ward', 'Cleaver', 35), ('June', 'Cleaver', 34), ('Wally', 'Cleaver', 10), ('Theodore', 'Cleaver', 7)]

    test_df = ma.build_dataframe(test_data)
    ma.save_dataframe(test_df, test_db, test_table)

    table_data = spark.sql(f'SELECT * FROM {test_db}.{test_table}')
    assert table_data.count() == len(test_data), 'Test failed: expected {0} records written to the test table'.format(len(test_data))
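
If I did want to get more thorough, a couple of additional checks might look something like the lines below; these are hypothetical and not part of my actual test file:

    # hypothetical extra assertions (not in my actual test)
    oldest = table_data.orderBy('age', ascending=False).first()
    assert (oldest.fname, oldest.lname, oldest.age) == ('Ward', 'Cleaver', 35)
    assert dict(table_data.dtypes)['insert_timestamp'] == 'timestamp'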

And, when I run my test from Test Explorer…it passes!

My complete project is here. Happy testing!

A better way to develop PySpark apps

A while back, I wrote about how to unit test your PySpark application using Docker. Since then, I’ve finally embraced a better way to develop and test PySpark applications when you don’t want to install Spark and all its dependencies on your workstation: using the Remote Development extension in VS Code. Here’s a simple tutorial to get you started with remote development, assuming you already have Docker and VS Code installed on your system.

Step 1: Install the Remote Development extension

In the Extensions panel in VS Code, type “remote” in the search textbox. The Remote Development extension should be one of the first to pop up. Click the “install” button to install it.

Step 2: Write your Dockerfile

The Remote Development extension lets you code in a Docker container, but you have to let the extension know what Docker image you want to use. In my case, I’ll stick with the Jupyter organization’s all-spark-notebook. Here’s my simple Dockerfile:

# Dockerfile to build docker image used to test my PySpark client

# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:latest
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$SPARK_HOME/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH

WORKDIR /home/jovyan/work/

RUN conda install -c anaconda pytest

Step 3: Set up your devcontainer.json file

In order to enjoy a full development experience in your remote container, you’ll want the Remote Development extension to install all your favorite and useful VS Code extensions in that container. You do that by building out a devcontainer.json configuration file.

Back in the Extensions panel, start looking up your extensions. Since I’ll be doing Python development in Spark, I’ll look up the Python extension:

See the handy “Add to devcontainer.json” option?

If you click on the gear icon in the page of your extension, you should see an "Add to devcontainer.json" option in the popup menu. Select that option to add the extension to the Remote Development configuration file. If this is your first time creating the configuration file for your project, you’ll get a dialog that says "no container configuration file found:"

Click the “Add Files…” button to create your configuration file

Click the “Add Files…” button to create that configuration file. Before the configuration can be created, though, Remote Development needs to know how you want to configure the core components of your Docker container. At this point, we can simply tell it to use the Dockerfile we created in Step #2:

Tell the extension to use the Dockerfile we already created

Finally, our configuration file will get built with these references. Rinse and repeat for whatever other extensions you might need, especially Python Test Explorer.
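
For reference, the resulting devcontainer.json ends up looking something like the sketch below. Treat it as an approximation: the exact properties vary by extension version, and the extension identifiers shown are my assumption of the ids for the Python and Python Test Explorer extensions:

{
    // build the dev container from the Dockerfile created in Step 2
    "build": {
        "dockerfile": "Dockerfile"
    },
    // extensions to install inside the container (these ids are my assumption)
    "extensions": [
        "ms-python.python",
        "littlefoxteam.vscode-python-test-adapter"
    ]
}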

Step 4: Fire up your remote container

Did you notice the little, green Remote Window icon in the bottom left corner of your VS Code window?

That’s a handy shortcut I use to activate some of the features of Remote Development. Click that icon, then you’ll see a variety of tasks the extension offers. Select the “Reopen in Container” task to launch your development container:

If all goes well, your container will launch and VS Code will connect to it. Check out my screenshot below and allow me to highlight three items:

  1. If your container fired up successfully, you’ll notice that the Remote Window icon in the bottom left hand corner of VS Code now says “Dev Container”.
  2. One very thoughtful feature of the Remote Development extension is that, when your container launches, the extension automatically opens an interactive window for you so that you now have a command shell in your container.
  3. I’ve noticed that, even though my devcontainer.json file tells the Remote Development extension to load a variety of extensions into my container, those extensions don’t always load properly. You might have to install some of these manually.

Step 5: Start developing

You can find my sample client and unit test script in my Github project.
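
If you just want a quick feel for what’s in that project, here’s a minimal sketch of a client along those lines; the real code differs in its details, and the schema simply mirrors the test data I use in my tests:

# minimal sketch of a PySpark client to develop against in the container
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('com.dadoverflow.mypysparkclient') \
    .master('local[*]') \
    .getOrCreate()

def build_dataframe(data):
    """Build a Spark dataframe from a list of (fname, lname, age) tuples."""
    return spark.createDataFrame(data, schema=['fname', 'lname', 'age'])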

Step 6: Start testing

It can still be a little tricky to enable easy unit testing in VS Code. One extra thing I found I had to do was, in settings, set the Pytest Path value to the full path to my pytest module: /opt/conda/bin/pytest.
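
In settings.json terms, that amounts to something like the snippet below; I’m assuming the python.testing.pytestPath setting name used by recent versions of the Python extension:

{
    // full path to pytest inside the container
    // (assumption: your Python extension uses the python.testing.pytestPath setting)
    "python.testing.pytestPath": "/opt/conda/bin/pytest"
}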

Take a look at this screenshot and allow me to highlight a few other interesting observations:

  1. When you install the VS Code Test Explorer, a little lab beaker icon appears at the left of the IDE. Click it to open up the Test Explorer UI.
  2. There’s a core Test Explorer UI in VS Code and then extensions on top of the core package for specific development languages. In my experience, the core Test Explorer is pretty finicky and I don’t think I’ve ever gotten it to work with my Python code.
  3. Instead of using the core Test Explorer, I install the Test Explorer extension built specifically for Python. Here, we can see that this extension did find my one unit test and will allow me to run the unit test from the UI.
  4. This observation is strange: for some reason, Pylance is throwing a warning that it cannot resolve my import of pytest. Nevertheless, my tests still run and pass without issue.

So, I’m finding decent productivity with my PySpark applications when I take advantage of the Remote Development extension and a good Docker image representative of my Production Spark environment. Hope you find this helpful!

Finding sub-ranges in my dataset

File this under: there-has-to-be-a-simpler-way-to-do-this-in-pandas-but-I-haven’t-found-what-that-is

Recently, I’ve been playing with some financial data to get a better understanding of the yield curve. Related to yield and inverted yield curves are the periods of recession in the US economy. In my work, I wanted to first build a chart that indicated the periods of recession and ultimately overlay that with yield curve data. Little did I realize the challenge of just coding that first part.

I downloaded a dataset of recession data, which contains a record for every calendar quarter from the 1960s to present day and a 0 or 1 to indicate whether the economy was in recession for that quarter–“1” indicating that it was. What I needed to do was pull all the records with a “1” indicator and find the start and end dates for each of those ranges so that I could paint them onto a chart.

I’ve heard it said before that any time you have to write a loop over your pandas dataframe, you’re probably doing it wrong. I’m certainly doing a loop here and I have a nagging suspicion there’s probably a more elegant way to achieve the solution. Nevertheless, here’s what I came up with to solve my recession chart problem:

Step 1: Bring in the necessary packages

from datetime import date  # needed for the chart axis limits in Step 5

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# for easy chart display in jupyter notebook
%matplotlib inline

Step 2: Load in my downloaded recession dataset and take a peek

# recession dates: https://fred.stlouisfed.org/series/JHDUSRGDPBR
df_recessions = pd.read_csv('./data/JHDUSRGDPBR_20220327.csv')

df_recessions['DATE'] = pd.to_datetime(df_recessions.DATE)
df_recessions.head()

The first records of the Recession dataset

df_recessions[df_recessions.JHDUSRGDPBR==1.0].head()

The first records in the dataset where the economy was in recession

Step 3: Mark the start of every period of recession in the dataset

So, now I’m asking myself, “how do I extract the start and stop dates for every period of recession identified in the dataset? Let’s start with just finding the start dates of recessions.” That shouldn’t be too difficult. If I filter down to just the recession quarters and calculate the date difference from one row to the next, then any difference greater than three months (I estimated 93 days as three months) means there was a gap in quarters before the current record, indicating that the current record is the start of a new recession. Here’s what I came up with [one further note: my yield curve data only starts in 1990, so I filtered the recession data for 1990 to present]:

df_spans = df_recessions[(df_recessions.DATE.dt.year>=1990) & (df_recessions.JHDUSRGDPBR==1.0)].copy()
df_spans['days_elapsed'] = df_spans.DATE - df_spans.shift(1).DATE
df_spans['ind'] = df_spans.days_elapsed.dt.days.apply(lambda d: 's' if d > 93 else '')
df_spans.iloc[0, 3] = 's'  # mark first row as a recession start
df_spans
“s” indicates the start of a new recession

Step 4: Find the end date of each recession

Here’s where my approach starts to go off the rails a little. The only way I could think to find the end dates of each recession is to:

  1. Loop through a list of the start dates
  2. In each loop, get the next start date and then grab the date of the record immediately before that one
  3. When I hit the last loop, just consider the last record to be the end date of the most recent recession
  4. With every stop date, add three months since the stop date is only the first day of the quarter and, presumably, the recession more or less lasts the entire quarter

Confusing? Here’s my code:

start_stop_dates = []
start_dates = df_spans.loc[df_spans.ind=='s', ].DATE.tolist()

for i, start_date in enumerate(start_dates):
    if i < len(start_dates)-1:
        stop_date = df_spans.loc[df_spans.DATE < start_dates[i+1]].iloc[-1].DATE
    else:
        stop_date = df_spans.iloc[-1].DATE
        
    # add 3 months to the end of each stop date to stretch the value to the full quarter
    start_stop_dates.append((start_date, stop_date + np.timedelta64(3,'M')))
    
start_stop_dates
Recessions from 1990 to the beginning of 2022

Step 5: Build my chart

With that start/stop list, I can build my underlying recession chart:

fig, ax = plt.subplots(figsize=(12,6))

_ = ax.plot()
_ = ax.set_xlim([date(1990, 1, 1), date(2022, 4, 1)])
_ = ax.set_ylim([0, 10])

for st, sp in start_stop_dates:
    _ = ax.axvspan(st, sp, alpha=0.2, color='gray')
US Recessions: 1990 – 2021

Phew. All that work and I’m only at the starting point of my yield curve exploration, but that will have to wait for a future post. However, if you can think of a more elegant way to identify these date ranges without having to resort to looping, I’d love to hear it!
