In the world of data engineering, when you build a data pipeline to copy data from one system to another, it's easy to accidentally insert duplicate records into your target system. For example, your pipeline might break, and you have to take steps to backfill the missing information. If your pipeline didn't break in a clear and obvious spot, you may end up reprocessing the same data more than once.

When I create tables in a conventional relational database, I normally create a primary key field to ensure uniqueness, so that I don't accidentally insert the same record twice into the table. That's great if my data pipelines write to a relational database: if I end up having to backfill a broken operation, my database can reject data that I already successfully processed the first time around.
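
To see what that safeguard buys you, here's a quick illustration using SQLite (purely for demonstration; any relational database behaves similarly):

import sqlite3

# an in-memory database with a primary key on person_id
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE people (person_id INTEGER PRIMARY KEY, fname TEXT)')
conn.execute("INSERT INTO people VALUES (1, 'Andy')")

# inserting the same key again is rejected outright
try:
    conn.execute("INSERT INTO people VALUES (1, 'Andy')")
except sqlite3.IntegrityError as err:
    print(err)  # UNIQUE constraint failed: people.person_id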

However, if my destination data repository is Apache Hive, I don't have safeguards like primary key fields. So, how can you avoid inserting duplicate records into your Hive tables? Here's an option: use an ANTI JOIN.
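
If you haven't worked with it before, a LEFT ANTI JOIN returns only the rows from the left-hand table that have no match in the right-hand table on the join key, which is exactly the filter we need. Here's a minimal sketch of the semantics, using throwaway demo tables just for illustration:

# two tiny demo tables: ids 2 and 3 overlap, id 1 does not
left = spark.createDataFrame([(1,), (2,), (3,)], ['id'])
right = spark.createDataFrame([(2,), (3,)], ['id'])
left.createOrReplaceTempView('left_rows')
right.createOrReplaceTempView('right_rows')

# only id=1 comes back: the left-hand row with no match on the right
spark.sql("""
    SELECT a.id
    FROM left_rows a LEFT ANTI JOIN right_rows b ON a.id = b.id
""").show()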

For starters, suppose I have a table called my_db.people_table (note that I’m testing my code in a PySpark shell running in a jupyter/all-spark-notebook Docker container):

create_db_qry = 'CREATE DATABASE my_db'
create_table_qry = """
CREATE TABLE my_db.people_table (
    person_id INT,
    fname STRING,
    lname STRING
);
"""

spark.sql(create_db_qry)
spark.sql(create_table_qry)

And the results:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()
+---------+-----+-----+
|person_id|fname|lname|
+---------+-----+-----+
+---------+-----+-----+

Now, let’s add some data to the table:

initial_data = [(1, 'Andy', 'Griffith'), (2, 'Bee', 'Taylor'), (3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife')]
df = spark.createDataFrame(initial_data, ['person_id', 'fname', 'lname'])
df.write.mode('append').insertInto('my_db.people_table')
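
One caveat worth knowing: insertInto matches DataFrame columns to table columns by position, not by name. If you're not certain your DataFrame's columns are in the table's declared order, it's safer to select them explicitly first. A defensive version of the write above:

# insertInto is position-based, so line the columns up with the
# table's declared order before writing
df.select('person_id', 'fname', 'lname') \
    .write.mode('append').insertInto('my_db.people_table')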

Now we have some initial data:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()     
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
+---------+------+--------+

Suppose we need to add more data to the table, but we’re not sure if the data is all original or if the new set contains records we previously processed. Here’s how we might normally do that:

more_data = [(3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife'), (5, 'Floyd', 'Lawson'), (6, 'Gomer', 'Pyle'), (7, 'Otis', 'Campbell')]
df = spark.createDataFrame(more_data, ['person_id', 'fname', 'lname'])
df.write.mode('append').insertInto('my_db.people_table')

Uh-oh: looks like that new data did contain some records we already had:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()     
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
|        4|Barney|    Fife|
|        5| Floyd|  Lawson|
|        6| Gomer|    Pyle|
|        7|  Otis|Campbell|
+---------+------+--------+
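
As an aside: if you've already landed duplicates like these, one way to clean up is to rewrite the table from a de-duplicated snapshot. A rough sketch follows; it pulls the rows through the driver with collect(), so it's only sensible for small tables like this one:

# snapshot the distinct rows first, so we aren't reading from the
# table in the same plan that overwrites it
schema = spark.table('my_db.people_table').schema
dedup_rows = spark.table('my_db.people_table').distinct().collect()

# rewrite the table from the snapshot
spark.createDataFrame(dedup_rows, schema) \
    .write.mode('overwrite').insertInto('my_db.people_table')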

We can avoid that dilemma by using an ANTI JOIN in our insert operation. (Note that I've reset the table back to its original four rows before retrying, undoing the duplicate-laden insert above.) Here's how that would look instead:

more_data = [(3, 'Opie', 'Griffith'), (4, 'Barney', 'Fife'), (5, 'Floyd', 'Lawson'), (6, 'Gomer', 'Pyle'), (7, 'Otis', 'Campbell')]
df = spark.createDataFrame(more_data, ['person_id', 'fname', 'lname'])

# write our new dataset to a temporary table
df.createOrReplaceTempView('people_table_tmp')

# now, craft our INSERT statement to "ANTI JOIN" the temp table to the destination table and only write the delta
antijoin_qry = """
    INSERT INTO my_db.people_table
    SELECT a.person_id, a.fname, a.lname
    FROM people_table_tmp a
    LEFT ANTI JOIN my_db.people_table b
        ON a.person_id = b.person_id
"""

# execute that anti join statement
spark.sql(antijoin_qry)

# cleanup by dropping the temp table
spark.catalog.dropTempView('people_table_tmp')
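
By the way, if you'd rather stay in the DataFrame API than write SQL, the same anti join can be expressed directly with how='left_anti'. A sketch equivalent to the query above:

# keep only the incoming rows whose person_id isn't already in the
# target table, then append just that delta
existing = spark.table('my_db.people_table')
delta = df.join(existing, on='person_id', how='left_anti')
delta.write.mode('append').insertInto('my_db.people_table')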

And the results:

>>> spark.sql('SELECT * FROM my_db.people_table ORDER BY person_id').show()
+---------+------+--------+
|person_id| fname|   lname|
+---------+------+--------+
|        1|  Andy|Griffith|
|        2|   Bee|  Taylor|
|        3|  Opie|Griffith|
|        4|Barney|    Fife|
|        5| Floyd|  Lawson|
|        6| Gomer|    Pyle|
|        7|  Otis|Campbell|
+---------+------+--------+

Wow, that looks so much better. So, if you suffer from duplicate data in your Hive tables, give ANTI JOIN a try!