
Avoiding duplicates in Hive with Anti Join

In the world of data engineering, when you build a pipeline to copy data from one system to another, it becomes easy to accidentally insert duplicate records into your target system. For example, your pipeline might break and you have to take steps to backfill the missing information. If your pipeline didn’t break in a clear and obvious spot, you may end up reprocessing the same data more than once.

When I create tables in a conventional relational database, I normally create a primary key field to ensure uniqueness, so that I don’t accidentally insert the same record twice into the table. That’s great if my data pipelines write to a relational database: if I end up having to backfill a broken operation, my database can reject data that I already successfully processed the first time around.
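As a quick illustration of that safeguard, here’s a minimal sketch using Python’s built-in sqlite3 module. It’s purely illustrative and not part of the pipeline code in this post:

import sqlite3

# illustration only: a primary key makes the database reject a duplicate insert
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE people (person_id INTEGER PRIMARY KEY, fname TEXT, lname TEXT)')
conn.execute("INSERT INTO people VALUES (1, 'Andy', 'Griffith')")

try:
    conn.execute("INSERT INTO people VALUES (1, 'Andy', 'Griffith')")  # same primary key again
except sqlite3.IntegrityError as e:
    print(f'Duplicate rejected: {e}')  # UNIQUE constraint failed: people.person_id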

However, if my destination data repository is Apache Hive, I don’t have those same safeguards, like primary key fields. So, how can you avoid inserting duplicate records into your Hive tables? Here’s an option: use ANTI JOIN.

For starters, suppose I have a table called my_db.people_table (note that I’m testing my code in a PySpark shell running in a jupyter/all-spark-notebook Docker container):

create_db_qry = 'CREATE DATABASE my_db'
create_table_qry = """
CREATE TABLE my_db.people_table (
    person_id INT,
    fname STRING,
    lname STRING
);
"""

spark.sql(create_db_qry)
spark.sql(create_table_qry)

And the results:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()
+---------+-----+-----+
|person_id|fname|lname|
+---------+-----+-----+
+---------+-----+-----+

Now, let’s add some data to the table:

initial_data = [(1, 'Andy', 'Griffith'), (2, 'Bee', 'Taylor'), (3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife')]
df = spark.createDataFrame(initial_data, ['person_id', 'fname', 'lname'])
df.write.mode('append').insertInto('my_db.people_table')

Now we have some initial data:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()     
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
+---------+------+--------+

Suppose we need to add more data to the table, but we’re not sure if the data is all original or if the new set contains records we previously processed. Here’s how we might normally do that:

more_data = [(3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife'), (5, 'Floyd', 'Lawson'), (6, 'Gomer', 'Pyle'), (7, 'Otis', 'Campbell')]
df = spark.createDataFrame(more_data, ['person_id', 'fname', 'lname'])
df.write.mode('append').insertInto('my_db.people_table')

Uh-oh: looks like that new data did contain some records we already had:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()     
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
|        4|Barney|    Fife|
|        5| Floyd|  Lawson|
|        6| Gomer|    Pyle|
|        7|  Otis|Campbell|
+---------+------+--------+

We can avoid that dilemma by using an ANTI JOIN statement in our insert operation. Here’s how that would look instead:

more_data = [(3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife'), (5, 'Floyd', 'Lawson'), (6, 'Gomer', 'Pyle'), (7, 'Otis', 'Campbell')]
df = spark.createDataFrame(more_data, ['person_id', 'fname', 'lname'])

# write our new dataset to a temporary table
df.createOrReplaceTempView('people_table_tmp')

# now, craft our INSERT statement to "ANTI JOIN" the temp table to the destination table and only write the delta
antijoin_qry = """INSERT INTO my_db.people_table 
    SELECT t.person_id, t.fname, t.lname 
    FROM (SELECT person_id, fname, lname FROM people_table_tmp a LEFT ANTI JOIN my_db.people_table b ON (a.person_id=b.person_id)) t"""

# execute that anti join statement
spark.sql(antijoin_qry)

# cleanup by dropping the temp table
spark.catalog.dropTempView('people_table_tmp')

And the results:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
|        5| Floyd|  Lawson|
|        6| Gomer|    Pyle|
|        7|  Otis|Campbell|
+---------+------+--------+

Wow! Looks so much better. So, if you suffer from duplicate data in your Hive tables, give ANTI JOIN a try!
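As an aside, if you prefer the DataFrame API over SQL, the same idea can be expressed with a left anti join directly between dataframes. Here’s a rough sketch of that alternative, using the same table and column names as above (not the approach I used in this post):

# sketch: anti join the new dataframe against what's already in the table,
# then append only the rows that don't exist yet
existing = spark.table('my_db.people_table')
new_only = df.join(existing, on='person_id', how='left_anti')
new_only.write.mode('append').insertInto('my_db.people_table')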

Unit testing PySpark and Hive

Often when I use Spark to process data, I write the results to tables in Hive.

As I’ve begun to embrace Visual Studio Code Remote Development to code and test my Spark applications, the thought occurred to me: can I use this approach to test not only the efficacy of my data processing code but also how I save out the results?

Back in the day, one might call this integration testing–testing how your code integrates with other systems like your database backend. Often you could use “mock” frameworks to simulate or mock the interface to your database. You could do the same for other systems like APIs you call. If you wanted to test how your code integrates with a frontend user interface, you could use a tool like Selenium.

But enough about that…how can I test that my Spark code properly writes my processed data to Hive? How can I test that the Hive scripts I write to support my Spark application actually work? (Instead of waiting until I go through all the work to deploy my solution to a real Hadoop cluster?) Turns out, you can also do these things in a proper remote container with Pytest.

Set up your environment

For starters, please check out my previous post on how to set up Remote Development in VS Code.

My simple Spark application

I started with my original sample client and added a function called save_dataframe that takes the dataframe, database name, and table name as parameters:

from pyspark.sql.functions import current_timestamp

def save_dataframe(df, dbname, table_name):
    """Simple function to save a dataframe to Hive"""
    # stamp each record with an insert timestamp, then append to the existing Hive table
    df = df.withColumn('insert_timestamp', current_timestamp())
    df.write.mode('append').insertInto(f'{dbname}.{table_name}')

Note that I am using the Spark function insertInto to save the dataframe data to Hive, which requires that the destination table already exist (as opposed to saveAsTable).
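For contrast, a saveAsTable call, sketched below with the same parameters, would create the table for you and derive the schema from the dataframe. I’m sticking with insertInto here because I want the table definition to live in its own Hive script, which is exactly what I test next:

# sketch of the alternative: saveAsTable creates the table if it doesn't already exist
df.write.mode('append').saveAsTable(f'{dbname}.{table_name}')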

My Hive script

So far, I have a simple Spark application that can take in some data, process that data into a dataframe, and then try to write that dataframe to Hive. From the Hive perspective, then, I have certain expectations of how that data should be stored and I write up those expectations in the form of a “table creation script.” Here’s my simple script:

--***********************************
-- Script to create my_test_table
--***********************************

CREATE TABLE some_db.my_test_table (
    fname STRING,
    lname STRING,
    age INT,
    insert_timestamp TIMESTAMP
);

Test the combination of my Spark code and Hive script

Now, we can test this work end-to-end with Pytest. Here’s my full test file, but allow me to point out a few important features:

Setting up your Spark session object

It took me a lot of time and research to get my Spark session object configured to support Hive testing. The critical settings are master('local[*]'), enableHiveSupport(), and the spark.sql.legacy.createHiveTableByDefault property:

# important properties to set to enable Hive testing
spark = SparkSession.builder.appName('com.dadoverflow.mypysparkclient.tests') \
    .master('local[*]') \
    .enableHiveSupport() \
    .config('spark.sql.legacy.createHiveTableByDefault', 'false') \
    .getOrCreate()

Convenience function to pull in the text of your Hive scripts

For convenience, I wrote the function get_script_text to pull the text of my Hive scripts into a single string so that I could easily execute them:

def get_script_text(script_file):
    """
    Helper function to pull in text of CREATE TABLE scripts for testing
    """
    script_text = ''
    fn = '{0}/../hive_scripts/{1}'.format(myPath, script_file)  # myPath is defined elsewhere in the test module

    with open(fn, 'r') as f:
        script_text = f.readlines()

    return ' '.join(script_text)

Leverage Pytest fixtures to set up and tear down my test database

I wrote a module-level pytest fixture that my tests could use to set up and tear down a test Hive database. Note the second spark.sql call, which actually executes my Hive script to create the table I expect my application to write to:

@pytest.fixture(scope='module')
def setup_hive():
    #setup hive database for testing
    spark.sql(f'CREATE DATABASE {test_db}')
    spark.sql(get_script_text('create_table_my_test_table.sql'))
    yield

    #tear down database when testing is finished
    spark.sql(f'DROP TABLE {test_db}.{test_table}')
    spark.sql(f'DROP DATABASE {test_db}')

Finally, my unit test

In my unit test (or should I call it integration test?), I’m probably committing a no-no: I’m exercising two different functions of my application, build_dataframe and save_dataframe, when I should probably only exercise one function per test. At any rate, the save_dataframe function should write my test data to Hive. I then turn around, read the data back from Hive, and assert that the record count in Hive matches the record count of my original test data. If I wanted to be more thorough, I could also assert on some of the individual records, data types, etc.

def test_write_some_data_to_hive(setup_hive):
    test_data = [('Ward', 'Cleaver', 35), ('June', 'Cleaver', 34), ('Wally', 'Cleaver', 10), ('Theodore', 'Cleaver', 7)]

    test_df = ma.build_dataframe(test_data)
    ma.save_dataframe(test_df, test_db, test_table)

    table_data = spark.sql(f'SELECT * FROM {test_db}.{test_table}')
    assert table_data.count() == len(test_data), 'Test failed: expected {0} records written to the test table'.format(len(test_data))
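As mentioned, if I wanted to assert on individual records, a few extra checks appended to the end of that test might look something like this (illustrative only, building on the same test data and the insert_timestamp column that save_dataframe adds):

    # e.g., verify a specific record round-tripped correctly
    ward = spark.sql(f"SELECT * FROM {test_db}.{test_table} WHERE fname = 'Ward'").collect()
    assert len(ward) == 1, 'Test failed: expected exactly one record for Ward'
    assert ward[0].lname == 'Cleaver' and ward[0].age == 35
    assert ward[0].insert_timestamp is not None  # column added by save_dataframe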

And, when I run my test from Test Explorer…it passes!

My complete project is here. Happy testing!

A better way to develop PySpark apps

A while back, I wrote about how to unit test your PySpark application using Docker. Since then, I’ve finally embraced a better way to develop and test PySpark applications when you don’t want to install Spark and all its dependencies on your workstation: using the Remote Development extension in VS Code. Here’s a simple tutorial to get you started with remote development, assuming you already have Docker and VS Code installed on your system.

Step 1: Install the Remote Development extension

In the Extensions panel in VS Code, type “remote” in the search textbox. The Remote Development extension should be one of the first to pop up. Click the “install” button to install it.

Step 2: Write your Dockerfile

The Remote Development extension lets you code in a Docker container, but you have to let the extension know what Docker image you want to use. In my case, I’ll stick with the Jupyter organization’s all-spark-notebook. Here’s my simple Dockerfile:

# Dockerfile to build docker image used to test my PySpark client

# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:latest
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$SPARK_HOME/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH

WORKDIR /home/jovyan/work/

RUN conda install -c anaconda pytest

Step 3: Set up your devcontainer.json file

In order to enjoy a full development experience in your remote container, you’ll want the Remote Development extension to install all your favorite and useful VS Code extensions in that container. You do that by building out a devcontainer.json configuration file.

Back in the Extensions panel, start looking up your extensions. Since I’ll be doing Python development in Spark, I’ll lookup the Python extension:

See the handy “Add to devcontainer.json” option?

If you click on the gear icon on the extension’s page, you should see an “Add to devcontainer.json” option in the popup menu. Select that option to add the extension to the Remote Development configuration file. If this is your first time creating the configuration file for your project, you’ll get a dialog that says “no container configuration file found:”

Click the “Add Files…” button to create your configuration file

Click the “Add Files…” button to create that configuration file. Before the configuration can be created, though, Remote Development needs to know how you want to configure the core components of your Docker container. At this point, we can simply tell it to use the Dockerfile we created in Step #2:

Tell the extension to use the Dockerfile we already created

Finally, our configuration file will get built with these references. Rinse and repeat for whatever other extensions you might need, especially Python Test Explorer.
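For reference, the generated devcontainer.json ends up looking roughly like this. This is a sketch: the exact property names and extension IDs the tooling writes can vary by version, so treat it as illustrative.

{
    "name": "pyspark-client",
    "build": {
        "dockerfile": "Dockerfile"
    },
    "customizations": {
        "vscode": {
            "extensions": [
                "ms-python.python",
                "littlefoxteam.vscode-python-test-adapter"
            ]
        }
    }
}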

Step 4: Fire up your remote container

Did you notice the little, green Remote Window icon in the bottom left corner of your VS Code window?

That’s a handy shortcut I use to activate some of the features of Remote Development. Click that icon, then you’ll see a variety of tasks the extension offers. Select the “Reopen in Container” task to launch your development container:

If all goes well, your container will launch and VS Code will connect to it. Check out my screenshot below and allow me to highlight three items:

  1. If your container fired up successfully, you’ll notice that the Remote Window icon in the bottom left hand corner of VS Code now says “Dev Container”.
  2. One very thoughtful feature of the Remote Development extension is that, when your container launches, the extension automatically opens an interactive window for you so that you now have a command shell in your container.
  3. I’ve noticed that, even though my devcontainer.json file tells the Remote Development extension to load a variety of extensions into my container, those extensions don’t always load properly. You might have to install some of these manually.

Step 5: Start developing

You can find my sample client and unit test script in my Github project.

Step 6: Start testing

It can still be a little tricky to enable easy unit testing in VS Code. One extra thing I found I had to do in settings was set the Pytest Path value to the full path to my pytest module: /opt/conda/bin/pytest.
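In settings.json terms, that amounts to something like the following (the pytest path being specific to this particular container image):

{
    "python.testing.pytestEnabled": true,
    "python.testing.pytestPath": "/opt/conda/bin/pytest"
}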

Take a look at this screenshot and allow me to highlight a few other interesting observations:

  1. When you install the VS Code Test Explorer, a little lab beaker icon appears at the left of the IDE. Click it to open up the Test Explorer UI.
  2. There’s a core Test Explorer UI in VS Code and then extensions on top of the core package for specific development languages. In my experience, the core Test Explorer is pretty finicky and I don’t think I’ve ever gotten it to work with my Python code.
  3. Instead of using the core Test Explorer, I install the Test Explorer extension built special for Python. Here, we can see that this extension did find my one unit test and will allow me to run the unit test from the UI.
  4. This observation is strange: for some reason, Pylance is throwing a warning that it cannot resolve my import of pytest. Nevertheless, my tests still run and pass without issue.

So, I’m finding decent productivity with my PySpark applications when I take advantage of the Remote Development extension and a good Docker image representative of my Production Spark environment. Hope you find this helpful!
