How to perform self-join in PySpark Azure Databricks?

Are you looking for a way to perform a self-join in PySpark on Azure Databricks? If so, you have landed on the right page. In this guide, I will show you how to do a self-join in two ways, with the PySpark DataFrame API and with Spark SQL, using a practical example for each. Let’s start with a step-by-step walkthrough.

In this blog, I will teach you the following with practical examples:

  • Syntax of join()
  • Self-join using PySpark join() function
  • Self-join using SQL expression

The join() method combines two DataFrames based on a specified join condition. A self-join is simply a join() in which both sides are the same DataFrame.

Syntax: dataframe_name.join()

What is the syntax of the join() function in PySpark Azure Databricks?

The syntax is as follows:

dataframe_name.join(other, on, how)
Parameter Name | Required | Description
other (DataFrame) | Yes | The second DataFrame to join with.
on (str, list, or Column) | Optional | A string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides. The signature defaults it to None, but a self-join needs a join condition, so supply it.
how (str) | Optional | The join type; by default how="inner".
Table 1: Parameters of the join() method in PySpark Databricks

Apache Spark Official documentation link: join()

Create a simple DataFrame

Gentle reminder:

In Databricks,

  • the SparkSession is available as spark
  • the SparkContext is available as sc

If you want to create them manually, use the code below.

from pyspark.sql.session import SparkSession

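# Each continued line needs a trailing backslash, otherwise Python raises a syntax error.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()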
    .getOrCreate()

sc = spark.sparkContext

a) Create manual PySpark DataFrame

data = [
    (1,"Hilton",None),
    (2,"Mei",None),
    (3,"Renaldo",2),
    (4,"Marlo",2),
    (5,"Daniela",1)
]

df = spark.createDataFrame(data, schema=["emp_id", "emp_name", "mng_id"])
df.printSchema()
df.show()

"""
root
 |-- emp_id: long (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- mng_id: long (nullable = true)

+------+--------+------+
|emp_id|emp_name|mng_id|
+------+--------+------+
|     1|  Hilton|  null|
|     2|     Mei|  null|
|     3| Renaldo|     2|
|     4|   Marlo|     2|
|     5| Daniela|     1|
+------+--------+------+
"""

b) Creating a DataFrame by reading files

Download and use the below source file.

# replace the file_path with the source file location which you have downloaded.

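# Without an explicit schema, every CSV column is read as a string,
# so the schema is declared here to get the numeric types shown below.
df_2 = spark.read.format("csv") \
    .option("header", True) \
    .schema("emp_id BIGINT, emp_name STRING, mng_id BIGINT") \
    .load(file_path)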
df_2.printSchema()

"""
root
 |-- emp_id: long (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- mng_id: long (nullable = true)
"""

Note: Here, I will be using the manually created DataFrame.

How to perform self-join in PySpark Azure Databricks using the join() function?

PySpark has no dedicated self-join function, so we join the DataFrame with itself using the regular join() method and give each side its own alias. Our employee DataFrame has an id, a name, and a manager id, where the manager id points to another row's emp_id. Let’s fetch the manager name of each employee using a self-join.

Example:

from pyspark.sql.functions import col

# Joining df with df

df.alias("a") \
.join(df.alias("b"), col("a.mng_id") == col("b.emp_id"), "left") \
.selectExpr("a.emp_id", "a.emp_name", "b.emp_name as mng_name").show()

"""
Output:

+------+--------+--------+
|emp_id|emp_name|mng_name|
+------+--------+--------+
|     1|  Hilton|    null|
|     2|     Mei|    null|
|     5| Daniela|  Hilton|
|     3| Renaldo|     Mei|
|     4|   Marlo|     Mei|
+------+--------+--------+
"""

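In the preceding example, we used a left join so that employees without a manager (Hilton and Mei) stay in the result with a null mng_name; an inner join would drop them. We also gave each side of the join its own alias, because both sides have identical column names. Without aliases, a reference such as emp_name is ambiguous: Spark cannot tell which side it belongs to and will typically fail with an ambiguous-reference error. Note that show() does not guarantee row order, which is why employee 5 appears before employee 3 here.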

How to perform self-join in PySpark Azure Databricks using SQL expression?

In this section, let’s solve the same problem using a SQL expression. To query a DataFrame with raw SQL, we first have to register it as a temporary view.

Example:

df.createOrReplaceTempView("employee")

spark.sql('''
    SELECT a.emp_id, a.emp_name, b.emp_name AS mng_name
    FROM employee AS `a`
    LEFT JOIN employee AS `b` ON a.mng_id = b.emp_id
''').show()

"""
Output:

+------+--------+--------+
|emp_id|emp_name|mng_name|
+------+--------+--------+
|     1|  Hilton|    null|
|     2|     Mei|    null|
|     5| Daniela|  Hilton|
|     3| Renaldo|     Mei|
|     4|   Marlo|     Mei|
+------+--------+--------+
"""

I have attached the complete code used in this blog in notebook format to this GitHub link. You can download the notebook and import it into Databricks, Jupyter Notebook, and similar tools.

When should you use self-join in PySpark using Azure Databricks?

Use a self-join whenever the rows of a DataFrame refer to other rows of the same DataFrame, for example when one column holds the id of a related record stored in the same table.

Real-world use case scenarios for self-join in PySpark Azure Databricks

Assume that you have an employee dataset with each employee’s id, name, and manager id. Employees at the top of the hierarchy have no manager id. You get a requirement to show each employee’s id and name together with the corresponding manager’s name. Because the manager is just another row in the same dataset, you can solve this with a self-join.

What are the alternatives for performing self-join in PySpark using Azure Databricks?

PySpark has no separate self-join API, so the choice is between the two approaches covered in this article:

  • DataFrame.join(): join the DataFrame with itself using aliases
  • PySpark SQL expressions: register a temporary view and join it with itself in a SQL query

Final Thoughts

In this article, we have learned how to perform a self-join in PySpark on Azure Databricks, using both the join() function and a SQL expression, with a practical example for each. I hope this information helps you apply self-joins in your own work.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

Arud Seka Berne S

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.