Musings of a dad with too much time on his hands and not enough to do. Wait. Reverse that.

Category: technology (Page 1 of 36)

Spark SQL vs JSON, Part 2

In a previous post, I discussed the challenge of using Spark SQL to deal with missing fields in JSON files. In that instance, the missing fields belonged to a structure column–a column of data type STRUCT. In this post, I’ll show you a way to use Spark SQL to deal with missing top-level fields.

Where did my JSON fields go?

Imagine a “people” table with three columns: first_name, last_name, and middle_name:

sql("""
CREATE TABLE default.people(
	first_name STRING, last_name STRING, middle_name STRING
)
""")

Now, let’s take some JSON data and write it to that table. In this first example, all three fields will be present:

val data1 = """[
{"first_name": "Elvis", "middle_name": "Aaron", "last_name": "Presley"},
{"first_name": "David", "middle_name": "Lee", "last_name": "Roth"},
{"first_name": "Friedrich", "middle_name": "August", "last_name": "Hayek"}
]"""
val df1 = spark.read.json( Seq(data1).toDS() )
df1.write.mode("append").saveAsTable("default.people")

Awesome! All the records inserted into the table as expected.

Now let’s see an example where one record in the JSON data has all three fields but another doesn’t:

val data2 = """[
{"first_name": "Alex", "middle_name": "P", "last_name": "Keaton"}, 
{"first_name": "Al", "last_name": "Bundy"}
]"""
val df2 = spark.read.json( Seq(data2).toDS() )
df2.write.mode("append").saveAsTable("default.people")

Interesting. Although my “Al Bundy” record was missing the “middle_name” field, Hive was smart enough to write a “null” to the column.

Now, let’s try to insert JSON data where the middle_name field is missing from all the records:

val data3 = """[
{"first_name": "Michael", "last_name": "Knight"},
{"first_name": "Mike", "last_name": "Heck"}
]"""
val df3 = spark.read.json( Seq(data3).toDS() )
df3.write.mode("append").saveAsTable("default.people")

Uh-oh! Spark did not like that. Like I found in my last post, it seems that when at least one record is present in your JSON data that completely matches the schema of your Hive table, Spark can easily deal with the other, less complete records. However, when no record fully matches the schema, trouble ensues.

Now, there are a number of ways in Scala and PySpark to deal with missing values. But what about Spark SQL? Is there a way to write Spark SQL to handle missing top-level fields? Yes there is! Spark SQL has two functions–to_json and get_json_object–that can help. We can use to_json to convert the whole record back to JSON and then use get_json_object to search for our wayward field. If get_json_object finds the field, it will return that value. Otherwise, it returns a “null” which is enough to make Spark happy. Check this out:

df3.selectExpr("first_name", "last_name", "get_json_object(to_json(struct(*)), '$.middle_name') AS middle_name").write.mode("append").saveAsTable("default.people")

And so, if you’re only option is Spark SQL, you now have at least one way to deal with missing top-level fields in your data.

JSON, Spark and Hive

I’ve been doing a fair amount of work processing JSON data files with Spark and loading the results into Hive and have made a few interesting observations I thought I’d share:

  • Spark tends to reorder your JSON fields alphabetically which can cause issues writing your dataframes to Hive and
  • Fields in JSON files can legitimately appear and disappear which, again, can cause issues writing your dataframes to Hive.

Side note: In this work, I’m using Spark 3.2.1 and Scala 2.12.15. I do not know if newer versions of Spark behave differently.

Alphabetic ordering of fields

Imagine this very simple JSON:

[
{"my_col": {"colD": "some val", "colB": "another val", "colA": "different val", "colC": "last val"}}
]

Note the decidedly non-alphabetic order of the fields under the “my_col” key. Now, let’s load this data into a dataframe and print its schema:

Note how the columns have been reordered alphabetically. Now what if you created your Hive table with the original field order in mind and then tried to write your dataframe to it? Errors abound!

Field order is clearly important to Spark. But, if we create our table with the fields already alphabetically ordered, our insert will go smoothly:

Fields appearing or disappearing from your JSON

With JSON files, fields can disappear and reappear for legitimate reasons and the JSON file still be valid. For example, consider this JSON:

[
{"person_name": {"first_name": "Alex", "middle_name": "P", "last_name": "Keaton"}}, 
{"person_name": {"first_name": "Al", "last_name": "Bundy"}}
]

In the above JSON, my first record has a middle_name value and so the field is present; however, my second record does not have a middle_name value so the field is gone altogether and the JSON is still legitimate.

Given this, let’s experiment how the appearance or disappearance of the middle_name field affects our ability to write our data to Hive. First, we’ll create our table (note that I’ve already ordered the fields alphabetically to avoid the ordering problem):

Now, let’s create three different datasets:

  • One where every record has first, last, and middle names (df1)
  • One where there’s a mix of records, some with all three fields and some with only two (df2), and
  • One where none of the records have a middle_name field (df3)

So…place your bets: which dataframes, if any, will write successfully to our Hive table? If your answer was dataframes 1 and 2, you win!

But what happens when we try to write the third dataframe to the table?

Errors! Spark finally gets upset about the missing middle_name field. I think Spark was ok with our second dataframe because at least one of the records had all three fields and Spark inferred a schema based on that full example.

So, how might we fix this third dataframe problem? I can think of at least two solutions:

  1. Pre-process the JSON before loading the data into the dataframe to ensure all the fields are present or
  2. Force your dataframe to use the schema you require.

I’m just going to focus on the second solution. There are probably a few ways to force your dataframe into a particular schema, but one option is to use the selectExpr function in combination with the to_json and from_json functions. Basically, I’ll select the person_name column back into JSON with the to_json function and then use the from_json function to force each record into the schema I provide as the second argument. Spark does all the magic of aligning the fields appropriately and creating a NULL value field when no field is present. Here’s my expression:

from_json(to_json(person_name), 'STRUCT<first_name: STRING, last_name: STRING, middle_name: STRING>') AS person_name

…and my results:

Slick, eh? So now if you ever run into a situation where some of your JSON fields are missing, don’t fret: you can always force your dataframe to comply with the schema Hive is expecting.

Interesting way to use Pandas to_datetime

I do a lot of work with timestamps in Pandas dataframes and tend to use the to_datetime function quite a lot to cast my timestamps–usually read as strings–into proper datetime objects. Here’s a simple scenario where I cast my string timestamp column into a datetime:

import pandas as pd

d = [{'ts': '2023-01-01 12:05:00'}, {'ts': '2023-01-09 13:23:00'}, {'ts': '2023-01-11 08:37:00'}, {'ts': '2023-01-13 15:45:00'}]
df = pd.DataFrame(d)
pd.to_datetime(df.ts)

In the above, I explicitly pass the column “ts” to the function. However, I recently discovered another way to use to_datetime where you don’t have to be so explicit:

d1 = [{'year':2023,'month':1, 'day': 1},{'year':2023,'month':2, 'day': 9},{'year':2023,'month':3, 'day': 11},{'year':2023,'month':4, 'day': 13}]
df1 = pd.DataFrame(d1)
pd.to_datetime(df1)

Passing a dataframe to the function with columns named as year, month, and day seems to be enough to get the function to do its thing. That’s pretty cool!

The Assign function: at long last

I knew there had to be a way to add new columns inline (in a chain of commands) to a Pandas dataframe. The assign function is a way to do that.

Suppose I only have year and month columns in my dataframe. I can use assign to add a day column and perform my datetime conversion:

d2 = [{'year':2023,'month':1},{'year':2023,'month':2},{'year':2023,'month':3},{'year':2023,'month':4}]
df2 = pd.DataFrame(d2)
pd.to_datetime(df2.assign(day=1))

to_period: another useful datetime related function

Recently I was working with a dataset where the events only had year/month values. The to_period function is there to help with such situations:

d3 = [{'year':2023,'month':1},{'year':2023,'month':2},{'year':2023,'month':3},{'year':2023,'month':4}]
df3 = pd.DataFrame(d3)
pd.to_datetime(df3.assign(day=1)).dt.to_period('M')

How intuitive is to_datetime?

Just how intuitive is this less explicit way of using to_datetime? Can it read and cast month names? The answer is: it depends. Pandas version 1.3.5 doesn’t like month names:

d4 = [{'year':2023,'month':'January'},{'year':2023,'month':'February'},{'year':2023,'month':'March'},{'year':2023,'month':'April'}]
df4 = pd.DataFrame(d4)
pd.to_datetime(df4.assign(day=1)).dt.to_period('M')

However, I’ve found that earlier versions of Pandas will successfully parse month names. To resolve these value errors, you’ll have to add a line of code to convert your month names to their associated numeric values:

from datetime import datetime

d4 = [{'year':2023,'month':'January'},{'year':2023,'month':'February'},{'year':2023,'month':'March'},{'year':2023,'month':'April'}]
df4 = pd.DataFrame(d4)
df4['month'] = df4.month.apply(lambda m: datetime.strptime(m, '%B').month)
pd.to_datetime(df4.assign(day=1)).dt.to_period('M')

So, here are yet more ways to leverage Pandas with your timestamps!

« Older posts

© 2024 DadOverflow.com

Theme by Anders NorenUp ↑