

Spark SQL vs JSON, Part 2

In a previous post, I discussed the challenge of using Spark SQL to deal with missing fields in JSON files. In that instance, the missing fields belonged to a struct column (a column of data type STRUCT). In this post, I'll show you a way to use Spark SQL to handle missing top-level fields.

Where did my JSON fields go?

Imagine a “people” table with three columns: first_name, last_name, and middle_name:

sql("""
CREATE TABLE default.people(
	first_name STRING, last_name STRING, middle_name STRING
)
""")

Now, let’s take some JSON data and write it to that table. In this first example, all three fields will be present:

val data1 = """[
{"first_name": "Elvis", "middle_name": "Aaron", "last_name": "Presley"},
{"first_name": "David", "middle_name": "Lee", "last_name": "Roth"},
{"first_name": "Friedrich", "middle_name": "August", "last_name": "Hayek"}
]"""
val df1 = spark.read.json( Seq(data1).toDS() )
df1.write.mode("append").saveAsTable("default.people")

Awesome! All the records were inserted into the table as expected.

Now let’s see an example where one record in the JSON data has all three fields but another doesn’t:

val data2 = """[
{"first_name": "Alex", "middle_name": "P", "last_name": "Keaton"}, 
{"first_name": "Al", "last_name": "Bundy"}
]"""
val df2 = spark.read.json( Seq(data2).toDS() )
df2.write.mode("append").saveAsTable("default.people")

Interesting. Although my “Al Bundy” record was missing the “middle_name” field, Hive was smart enough to write a “null” to the column.

Now, let’s try to insert JSON data where the middle_name field is missing from all the records:

val data3 = """[
{"first_name": "Michael", "last_name": "Knight"},
{"first_name": "Mike", "last_name": "Heck"}
]"""
val df3 = spark.read.json( Seq(data3).toDS() )
df3.write.mode("append").saveAsTable("default.people")

Uh-oh! Spark did not like that. As I found in my last post, when at least one record in your JSON data completely matches the schema of your Hive table, Spark can easily deal with the other, less complete records. However, when no record fully matches the schema, trouble ensues.

Now, there are a number of ways in Scala and PySpark to deal with missing values. But what about Spark SQL? Is there a way to write Spark SQL to handle missing top-level fields? Yes there is! Spark SQL has two functions, to_json and get_json_object, that can help. We can use to_json to convert the whole record back to JSON and then use get_json_object to search for our wayward field. If get_json_object finds the field, it returns that value. Otherwise, it returns a null, which is enough to make Spark happy. Check this out:

df3.selectExpr(
    "first_name",
    "last_name",
    "get_json_object(to_json(struct(*)), '$.middle_name') AS middle_name"
).write.mode("append").saveAsTable("default.people")
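
If you want a quick sanity check (purely optional, and just a sketch), you can query the table afterward and confirm the new rows landed with null middle names:

sql("SELECT first_name, middle_name, last_name FROM default.people WHERE last_name IN ('Knight', 'Heck')").show()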

And so, if you’re only option is Spark SQL, you now have at least one way to deal with missing top-level fields in your data.

JSON, Spark and Hive

I’ve been doing a fair amount of work processing JSON data files with Spark and loading the results into Hive and have made a few interesting observations I thought I’d share:

  • Spark tends to reorder your JSON fields alphabetically, which can cause issues when writing your dataframes to Hive, and
  • Fields in JSON files can legitimately appear and disappear, which, again, can cause issues when writing your dataframes to Hive.

Side note: In this work, I’m using Spark 3.2.1 and Scala 2.12.15. I do not know if newer versions of Spark behave differently.

Alphabetic ordering of fields

Imagine this very simple JSON:

[
{"my_col": {"colD": "some val", "colB": "another val", "colA": "different val", "colC": "last val"}}
]

Note the decidedly non-alphabetic order of the fields under the “my_col” key. Now, let’s load this data into a dataframe and print its schema:
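
A rough sketch of that step, holding the JSON above in a string:

val json1 = """[
{"my_col": {"colD": "some val", "colB": "another val", "colA": "different val", "colC": "last val"}}
]"""
val df = spark.read.json( Seq(json1).toDS() )
df.printSchema()
// the schema comes back with my_col's fields alphabetized: colA, colB, colC, colD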

Note how the columns have been reordered alphabetically. Now what if you created your Hive table with the original field order in mind and then tried to write your dataframe to it? Errors abound!
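
For instance, a table defined with the original field order (the table name here is just for illustration) hits exactly that problem:

sql("""
CREATE TABLE default.my_data(
	my_col STRUCT<colD: STRING, colB: STRING, colA: STRING, colC: STRING>
)
""")
// the append fails: the dataframe's alphabetized struct no longer lines up with the table's field order
df.write.mode("append").saveAsTable("default.my_data")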

Field order is clearly important to Spark. But, if we create our table with the fields already alphabetically ordered, our insert will go smoothly:
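
Something like this sketch (again, an illustrative table name):

sql("""
CREATE TABLE default.my_data_sorted(
	my_col STRUCT<colA: STRING, colB: STRING, colC: STRING, colD: STRING>
)
""")
df.write.mode("append").saveAsTable("default.my_data_sorted")  // no complaints this time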

Fields appearing or disappearing from your JSON

With JSON files, fields can disappear and reappear for legitimate reasons, and the file remains valid JSON. For example, consider this JSON:

[
{"person_name": {"first_name": "Alex", "middle_name": "P", "last_name": "Keaton"}}, 
{"person_name": {"first_name": "Al", "last_name": "Bundy"}}
]

In the above JSON, my first record has a middle_name value, so the field is present; my second record does not, so the field is gone altogether, yet the JSON is still legitimate.

Given this, let’s experiment how the appearance or disappearance of the middle_name field affects our ability to write our data to Hive. First, we’ll create our table (note that I’ve already ordered the fields alphabetically to avoid the ordering problem):

Now, let’s create three different datasets:

  • One where every record has first, last, and middle names (df1)
  • One where there’s a mix of records, some with all three fields and some with only two (df2), and
  • One where none of the records have a middle_name field (df3)
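
In code, the three dataframes might be built like so (a sketch that borrows sample names from the examples above):

val data1 = """[
{"person_name": {"first_name": "Elvis", "middle_name": "Aaron", "last_name": "Presley"}},
{"person_name": {"first_name": "David", "middle_name": "Lee", "last_name": "Roth"}}
]"""
val data2 = """[
{"person_name": {"first_name": "Alex", "middle_name": "P", "last_name": "Keaton"}},
{"person_name": {"first_name": "Al", "last_name": "Bundy"}}
]"""
val data3 = """[
{"person_name": {"first_name": "Michael", "last_name": "Knight"}},
{"person_name": {"first_name": "Mike", "last_name": "Heck"}}
]"""
val df1 = spark.read.json( Seq(data1).toDS() )
val df2 = spark.read.json( Seq(data2).toDS() )
val df3 = spark.read.json( Seq(data3).toDS() )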

So…place your bets: which dataframes, if any, will write successfully to our Hive table? If your answer was dataframes 1 and 2, you win!
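
For reference, those first two appends are just (sketch, using the illustrative table name from above):

df1.write.mode("append").saveAsTable("default.people_nested")
df2.write.mode("append").saveAsTable("default.people_nested")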

But what happens when we try to write the third dataframe to the table?
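
In other words, the same kind of append:

df3.write.mode("append").saveAsTable("default.people_nested")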

Errors! Spark finally gets upset about the missing middle_name field. I think Spark was ok with our second dataframe because at least one of the records had all three fields and Spark inferred a schema based on that full example.

So, how might we fix this third dataframe problem? I can think of at least two solutions:

  1. Pre-process the JSON before loading the data into the dataframe to ensure all the fields are present or
  2. Force your dataframe to use the schema you require.

I’m just going to focus on the second solution. There are probably a few ways to force your dataframe into a particular schema, but one option is to use the selectExpr function in combination with the to_json and from_json functions. Basically, I’ll select the person_name column back into JSON with the to_json function and then use the from_json function to force each record into the schema I provide as the second argument. Spark does all the magic of aligning the fields appropriately and creating a NULL value field when no field is present. Here’s my expression:

from_json(to_json(person_name), 'STRUCT<first_name: STRING, last_name: STRING, middle_name: STRING>') AS person_name
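
Dropped into a full statement, the fix might look something like this (a sketch, still using my illustrative table name):

df3.selectExpr(
    "from_json(to_json(person_name), 'STRUCT<first_name: STRING, last_name: STRING, middle_name: STRING>') AS person_name"
).write.mode("append").saveAsTable("default.people_nested")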

…and my results: every record lands in the table, with a NULL middle_name wherever the field was missing.

Slick, eh? So now if you ever run into a situation where some of your JSON fields are missing, don’t fret: you can always force your dataframe to comply with the schema Hive is expecting.

Unit testing PySpark apps with Docker

I write a fair amount of Spark applications that run on Hadoop platforms and do lots of data parsing, transformation, and loading to HDFS, Hive, or other data repositories. When I code my applications in Scala, I usually use Eclipse and the ScalaTest framework to test my work.

However, I like to write PySpark solutions, too, but haven't found a great way to test them in an editor like VS Code. Recently, though, it occurred to me that maybe I could just test my code in a ready-made Hadoop environment, like a Docker image for Hadoop. So, here's a sample application I put together to unit test a PySpark application with Docker.

Setting up my virtual environment

Python has a neat way of creating virtual environments for your projects so you can keep your projects’ dependencies safely isolated from each other. Here are the general steps I followed to create a virtual environment for my PySpark project:

  1. In my WSL2 command shell, navigate to my development folder (change your path as needed): cd /mnt/c/Users/brad/dev
  2. Create a directory for my project: mkdir ./pyspark-unit-testing
  3. Enter the new project folder: cd pyspark-unit-testing
  4. Create a subdir for my tests: mkdir ./tests
  5. Create my virtual environment (note: I initially had some issues doing this in WSL2 and found this blog post helpful in overcoming them): python3 -m venv app-env
  6. Start the new virtual environment: source app-env/bin/activate
  7. Install the pytest package: pip3 install pytest --trusted-host pypi.org --trusted-host files.pythonhosted.org
  8. (Optional) Create a requirements doc of your development environment: pip3 freeze > requirements-dev.txt
  9. Leave the virtual environment: deactivate
  10. Now, fire up VS Code: code .

Download the Databricks Spark-XML binary

I’ve coded this sample application to ingest an XML file and transform it into a Spark dataframe. Much of the time, you can easily parse such XML files using the Databricks Spark-XML library. This library, though, is not native to Spark, so you must load it as you would a third party library. So, my project will demonstrate that, too! A two-for-one! You can download the JAR file I used from its Maven repository and save it to your project directory.

Write some code

I kept my code sample extremely simple and the full project is available on Github. Here’s a small snippet of my function that takes an XML file, transforms the data into a dataframe, and then returns the dataframe:

def parse_xml(xml_file):
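    # note: relies on a SparkSession named "spark" created elsewhere in the app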
    """Simple function using the Databricks Spark-XML API to parse XML documents into dataframes"""
    df = spark.read.format('com.databricks.spark.xml').\
        option('rootTag', 'catalog').\
        option('rowTag', 'book').load(xml_file)
    return df

Yeah, basically two lines of code…thanks to the Databricks API. If your XML is simple enough, the API can tackle it, but I’ve had some rather horrible XML to parse in the past that went well beyond the capabilities of the Spark-XML library.

Write some tests to test your code

I expect my client application to take an XML file and return a valid dataframe, but I need to test those expectations with unit tests. Here are two I wrote for this sample application:

def test_parse_xml():
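    # "mpc" refers to the application module that provides parse_xml (see the full project on Github)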
    test_file = './tests/test.xml'
    test_df = mpc.parse_xml(test_file)
    
    assert test_df is not None, 'Expected a dataframe of data to be returned from function'
    assert test_df.count() == 12, 'Received unexpected count from test data'
    assert len(test_df.columns) == 7, 'Expected 7 columns in the test data'

def test_something_else():
    # some other test
    assert True

Set up your Docker container

For most of my Spark needs, I like to use the Jupyter “all-spark-notebook” image. I tend to use the spark-2 version since I usually use Spark v2 at work, so I pulled this image like so:

docker pull jupyter/all-spark-notebook:spark-2

My Dockerfile is quite simple. Its two big jobs are copying the Databricks JAR to the container and installing my Python dependencies, which, for this project, amount to just the pytest package:

# from https://hub.docker.com/r/jupyter/all-spark-notebook
FROM jupyter/all-spark-notebook:spark-2

WORKDIR /home/jovyan/work/

# from https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11/0.11.0
COPY ./spark-xml_2.11-0.11.0.jar /usr/local/spark/jars/.
COPY ./requirements-dev.txt /home/jovyan/work/.
RUN pip install -r requirements-dev.txt --trusted-host pypi.org --trusted-host files.pythonhosted.org

Run the container

After building the image from this Dockerfile (for example, docker build -t my_spark_image:v1 .), I start the container. I like to include several options that aren't necessary for unit testing but are helpful for other uses of the container, like its Jupyter Notebook capabilities. Here's the command I normally use:

docker run -d -p 9000:8888 -e JUPYTER_ENABLE_LAB=yes -e GRANT_SUDO=yes -v /mnt/c/Users/brad/dev/pyspark-unit-testing:/home/jovyan/work my_spark_image:v1

One of the more important arguments here is mounting my project directory to the container so that it has access to my code and tests.

Finally, do some testing

With the container now running, from your terminal, open up a bash shell:

docker exec -it <container_id> bash

Once you have shell access to your container, you can use the pytest command line utility to run your tests:

pytest tests/ -s --disable-pytest-warnings

If all goes well, you should see a message indicating that your two tests have passed.

Hope that helps with your unit testing! Grab my entire project from my Github page.
