Often when I use Spark to process data, I write the results to tables in Hive.

As I’ve begun to embrace Visual Studio Code Remote Development to code and test my Spark applications, a thought occurred to me: can I use this approach not only to test the efficacy of my data processing code, but also how I save out the results?

Back in the day, one might call this integration testing: testing how your code integrates with other systems, like your database backend. Often you could use “mock” frameworks to simulate or mock the interface to your database. You could do the same for other systems, like APIs you call. If you wanted to test how your code integrates with a frontend user interface, you could use a tool like Selenium.

But enough about that…how can I test that my Spark code properly writes my processed data to Hive? How can I test that the Hive scripts I write to support my Spark application actually work? (Instead of waiting until I go through all the work to deploy my solution to a real Hadoop cluster?) Turns out, you can also do these things in a proper remote container with Pytest.

Set up your environment

For starters, please check out my previous post on how to set up Remote Development in VS Code.

My simple Spark application

I started with my original sample client and added a function called save_dataframe that takes the dataframe, database name, and table name as parameters:

from pyspark.sql.functions import current_timestamp

def save_dataframe(df, dbname, table_name):
    """Simple function to save a dataframe to Hive"""
    # tag each record with an insert timestamp before writing
    df = df.withColumn('insert_timestamp', current_timestamp())
    df.write.mode('append').insertInto(f'{dbname}.{table_name}')

Note that I am using the Spark function insertInto to save the dataframe data to Hive, which requires that the destination table already exist (as opposed to saveAsTable, which will create the table if it does not).
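
To illustrate the difference (just a sketch, not part of my application): insertInto appends the dataframe’s columns by position into an existing table, while saveAsTable will create the table from the dataframe’s schema if it doesn’t already exist. Assuming a dataframe df and a database some_db (my_other_table is just a made-up name here):

# insertInto: the table must already exist; columns are matched by position
df.write.mode('append').insertInto('some_db.my_test_table')

# saveAsTable: creates the table from the dataframe schema if it does not exist
df.write.mode('append').saveAsTable('some_db.my_other_table')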

My Hive script

So far, I have a simple Spark application that can take in some data, process that data into a dataframe, and then try to write that dataframe to Hive. From the Hive perspective, then, I have certain expectations of how that data should be stored and I write up those expectations in the form of a “table creation script.” Here’s my simple script:

--***********************************
-- Script to create my_test_table
--***********************************

CREATE TABLE some_db.my_test_table (
    fname STRING,
    lname STRING,
    age INT,
    insert_timestamp TIMESTAMP
);
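
One note before moving on: the tests below also call build_dataframe from my original sample client. I won’t repeat that function here, but a rough sketch, assuming it simply wraps spark.createDataFrame with the column names above, looks something like this:

# hedged sketch of build_dataframe from my original sample client,
# assuming it just wraps spark.createDataFrame with the fname/lname/age columns
def build_dataframe(data):
    return spark.createDataFrame(data, ['fname', 'lname', 'age'])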

Test the combination of my Spark code and Hive script

Now we can test this workflow end-to-end with Pytest. Here’s my full test file, but allow me to point out a few important features:

Setting up your Spark session object

It took me a lot of time and research to get my Spark session object configured to support Hive testing. The critical settings are master('local[*]'), enableHiveSupport(), and the spark.sql.legacy.createHiveTableByDefault property:

# important properties to set to enable Hive testing
spark = SparkSession.builder.appName('com.dadoverflow.mypysparkclient.tests') \
    .master('local[*]') \
    .enableHiveSupport() \
    .config('spark.sql.legacy.createHiveTableByDefault', 'false') \
    .getOrCreate()
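
One optional tweak (not something this setup strictly requires): if you want to keep the warehouse files Spark creates for Hive tables out of your working directory, you can point spark.sql.warehouse.dir at a throwaway location. A minimal sketch, assuming a temp directory is acceptable for test runs:

import tempfile
from pyspark.sql import SparkSession

# hedged sketch: write Hive warehouse files to a throwaway temp directory
warehouse_dir = tempfile.mkdtemp()

spark = SparkSession.builder.appName('com.dadoverflow.mypysparkclient.tests') \
    .master('local[*]') \
    .enableHiveSupport() \
    .config('spark.sql.warehouse.dir', warehouse_dir) \
    .config('spark.sql.legacy.createHiveTableByDefault', 'false') \
    .getOrCreate()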

Convenience function to pull in the text of your Hive scripts

For convenience, I wrote the function get_script_text to pull in the text of my Hive scripts as a single string so that I could easily execute them:

def get_script_text(script_file):
    """
    Helper function to pull in the text of CREATE TABLE scripts for testing
    """
    fn = f'{myPath}/../hive_scripts/{script_file}'

    with open(fn, 'r') as f:
        script_text = f.read()

    return script_text
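
One assumption baked into that path: myPath isn’t shown in the snippet, but given the ../hive_scripts reference it should resolve to the directory containing the test file. Something like this at the top of the test module would do (a sketch; your project layout may differ):

import os

# hedged assumption: myPath is the directory containing this test module,
# so ../hive_scripts resolves to the folder holding the CREATE TABLE scripts
myPath = os.path.dirname(os.path.abspath(__file__))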

Leverage Pytest fixtures to set up and tear down my test database

I wrote a module-level Pytest fixture that my tests can use to set up and tear down a test Hive database. Note the second spark.sql call, which executes my Hive script to create the table I expect my application to write to:

@pytest.fixture(scope='module')
def setup_hive():
    # set up hive database for testing
    spark.sql(f'CREATE DATABASE {test_db}')
    spark.sql(get_script_text('create_table_my_test_table.sql'))
    yield

    # tear down database when testing is finished
    spark.sql(f'DROP TABLE {test_db}.{test_table}')
    spark.sql(f'DROP DATABASE {test_db}')
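
A small aside on the teardown: dropping the table and then the database works fine here, but Hive SQL also lets you drop a non-empty database in one statement with CASCADE, which can be handy if a test leaves extra tables behind:

    # alternative teardown: drop the test database and anything still in it
    spark.sql(f'DROP DATABASE {test_db} CASCADE')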

Finally, my unit test

In my unit test (or should I call it an integration test?), I’m probably committing a no-no: I’m exercising two different functions of my application, build_dataframe and save_dataframe, when I should probably only exercise one function per test. At any rate, the save_dataframe function should write my test data to Hive. I then turn around, read the data back from Hive, and assert that the record count in Hive matches the record count of my original test data. If I wanted to be more thorough, I could also assert on some of the individual records, data types, and so on; I sketch that idea just after the test below.

def test_write_some_data_to_hive(setup_hive):
    test_data = [('Ward', 'Cleaver', 35), ('June', 'Cleaver', 34), ('Wally', 'Cleaver', 10), ('Theodore', 'Cleaver', 7)]

    test_df = ma.build_dataframe(test_data)
    ma.save_dataframe(test_df, test_db, test_table)

    table_data = spark.sql(f'SELECT * FROM {test_db}.{test_table}')
    assert table_data.count() == len(test_data), f'Test failed: expected {len(test_data)} records written to the test table'
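
As a rough sketch of the “more thorough” checks mentioned above (these aren’t in my actual test file), you could tack a couple of extra assertions onto the end of the test to verify a column type and spot-check a record:

    # hedged sketch: verify a column type and spot-check one record
    assert dict(table_data.dtypes)['age'] == 'int', 'Test failed: expected age to be stored as an int'
    ward = table_data.filter("fname = 'Ward' AND lname = 'Cleaver'").collect()
    assert len(ward) == 1 and ward[0]['age'] == 35, 'Test failed: expected one record for Ward Cleaver, age 35'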

And, when I run my test from Test Explorer…it passes!

My complete project is here. Happy testing!