How to join multiple DataFrames in PySpark Azure Databricks?

Do you want to join multiple DataFrames in PySpark on Azure Databricks, or remove the duplicate join columns that appear after a join? If so, you have landed on the right page. I will also show you how to perform a multiple-column join using both the PySpark DataFrame API and Spark SQL, with practical examples. Let’s start with a step-by-step guide to joining multiple DataFrames in PySpark Azure Databricks.

In this blog, I will teach you the following with practical examples:

  • Syntax of join()
  • Joining two DataFrames
  • Joining multiple DataFrames
  • Multiple joining conditions
  • Multiple joining conditions using where and filter functions
  • Multiple DataFrame joining using SQL expression

The join() method joins two DataFrames based on a specified condition in PySpark Azure Databricks.

Syntax: dataframe_name.join()

What is the syntax of the join() function in PySpark Azure Databricks?

The syntax is as follows:

dataframe_name.join(other, on, how)

Parameter Name | Required | Description
other (DataFrame) | Yes | The second DataFrame to be joined.
on (str, list, or Column) | Optional | A string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides.
how (str) | Optional | The join type. By default, how="inner".

Table 1: join() method in PySpark Databricks, parameter list with details

Apache Spark Official documentation link: join()

Create a simple DataFrame

Gentle reminder:

In Databricks,

  • sparkSession made available as spark
  • sparkContext made available as sc

If you want to create them manually, use the code below.

from pyspark.sql import SparkSession

# Parentheses let the builder chain span several lines without backslashes.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("azurelib.com")
    .getOrCreate()
)

sc = spark.sparkContext

a) Create manual PySpark DataFrame

# 1. Student Dataset

student_data = [
    (1,"Karee",1,"A"),
    (2,"Jobie",2,"B"),
    (3,"Kyle",3,"A"),
    (4,"Georges",1,"B"),
    (5,"Tracey",2,"A")
]

std_df = spark.createDataFrame(student_data, schema=["id","name","dept_id","section"])
std_df.printSchema()
std_df.show(truncate=False)

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept_id: long (nullable = true)
 |-- section: string (nullable = true)

+---+-------+-------+-------+
|id |name   |dept_id|section|
+---+-------+-------+-------+
|1  |Karee  |1      |A      |
|2  |Jobie  |2      |B      |
|3  |Kyle   |3      |A      |
|4  |Georges|1      |B      |
|5  |Tracey |2      |A      |
+---+-------+-------+-------+
"""
# 2. Department Dataset

dept_data = [
    (1,"civil","A",301),
    (1,"civil","B",302),
    (2,"mech","A",401),
    (2,"mech","B",402),
    (3,"ece","A",501),
    (3,"ece","B",502)
]

columns = ["id","name","section","hall_no"]
dept_df = spark.createDataFrame(data = dept_data, schema=columns)
dept_df.printSchema()
dept_df.show(truncate=False)

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- section: string (nullable = true)
 |-- hall_no: long (nullable = true)

+---+-----+-------+-------+
|id |name |section|hall_no|
+---+-----+-------+-------+
|1  |civil|A      |301    |
|1  |civil|B      |302    |
|2  |mech |A      |401    |
|2  |mech |B      |402    |
|3  |ece  |A      |501    |
|3  |ece  |B      |502    |
+---+-----+-------+-------+
"""
# 3. Hall Dataset

hall_data = [  
    (301,1),
    (302,1),
    (401,2),
    (402,2),
    (501,3),
    (502,3)
]

hall_df = spark.createDataFrame(data=hall_data, schema=["hall_no","floor_no"])
hall_df.printSchema()
hall_df.show(truncate=False)

"""
root
 |-- hall_no: long (nullable = true)
 |-- floor_no: long (nullable = true)
"""

b) Creating a DataFrame by reading files

Download the source files and use them as shown below.

# Replace the file paths with the locations of the source files you have downloaded.
# inferSchema asks Spark to detect numeric columns; without it, every CSV column is read as a string.

std_df_2 = spark.read.format("csv").option("header", True).option("inferSchema", True).load(student_file_path)
std_df_2.printSchema()

dept_df_2 = spark.read.format("csv").option("header", True).option("inferSchema", True).load(department_file_path)
dept_df_2.printSchema()

hall_df_2 = spark.read.format("csv").option("header", True).option("inferSchema", True).load(hall_file_path)
hall_df_2.printSchema()

"""
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dept_id: integer (nullable = true)
 |-- section: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- section: string (nullable = true)
 |-- hall_no: integer (nullable = true)

root
 |-- hall_no: integer (nullable = true)
 |-- floor_no: integer (nullable = true)
"""

Note: From here on, I will be using the manually created DataFrames.

How to join two DataFrames in PySpark Azure Databricks?

In this section, let’s see how to join two DataFrames in PySpark Azure Databricks with an example using an inner join.

Example:

dept_df.join(hall_df, dept_df.hall_no == hall_df.hall_no, "inner").show()

"""
Output:

+---+-----+-------+-------+-------+--------+
| id| name|section|hall_no|hall_no|floor_no|
+---+-----+-------+-------+-------+--------+
|  1|civil|      A|    301|    301|       1|
|  1|civil|      B|    302|    302|       1|
|  2| mech|      A|    401|    401|       2|
|  2| mech|      B|    402|    402|       2|
|  3|  ece|      A|    501|    501|       3|
|  3|  ece|      B|    502|    502|       3|
+---+-----+-------+-------+-------+--------+

"""

In this example, we joined the two DataFrames on the “hall_no” column using an inner join, which returns only the records that have a match in both DataFrames.

Note: Joins usually shuffle data across the cluster, which can be expensive, so always keep an eye on how much data is being shuffled.

How to remove duplicate columns while joining DataFrames in PySpark Azure Databricks?

In the above example, you can see that we have two “hall_no” columns. Let’s see how to remove duplicate columns while joining DataFrames in PySpark Azure Databricks with an example.

Example:

# Method 1:
dept_df.join(hall_df, "hall_no", "inner").show()

# Method 2:
dept_df.join(hall_df, ["hall_no"], "inner").show()

# Both methods generate the following output

"""
Output:

+-------+---+-----+-------+--------+
|hall_no| id| name|section|floor_no|
+-------+---+-----+-------+--------+
|    301|  1|civil|      A|       1|
|    302|  1|civil|      B|       1|
|    401|  2| mech|      A|       2|
|    402|  2| mech|      B|       2|
|    501|  3|  ece|      A|       3|
|    502|  3|  ece|      B|       3|
+-------+---+-----+-------+--------+

"""

In this example, the output DataFrame has no duplicate columns. Passing the join column name as a string or a list of strings, instead of a Column expression, keeps a single copy of the join column. Use a string when there is only one join column; otherwise use a list.

How to join multiple DataFrames in PySpark Azure Databricks?

In this section, let’s see how to join multiple DataFrames in PySpark Azure Databricks using the chaining method, that is, calling join() on the result of another join(). Let’s understand this with an example.

Example:

(std_df
 .join(dept_df, std_df.dept_id == dept_df.id, "inner")  # First chain
 .join(hall_df, "hall_no", "inner")  # Second chain
 .show(5))

"""
Output:

+-------+---+-------+-------+-------+---+-----+-------+--------+
|hall_no| id|   name|dept_id|section| id| name|section|floor_no|
+-------+---+-------+-------+-------+---+-----+-------+--------+
|    301|  4|Georges|      1|      B|  1|civil|      A|       1|
|    301|  1|  Karee|      1|      A|  1|civil|      A|       1|
|    302|  4|Georges|      1|      B|  1|civil|      B|       1|
|    302|  1|  Karee|      1|      A|  1|civil|      B|       1|
|    401|  5| Tracey|      2|      A|  2| mech|      A|       2|
+-------+---+-------+-------+-------+---+-----+-------+--------+

"""

As mentioned before, to join multiple DataFrames we chain the calls: each join() is applied to the result of the previous join().

How to join on multiple columns in PySpark Azure Databricks?

In this section, let’s see how to join on multiple columns in PySpark Azure Databricks with an example.

Example:

multiple_join_condition = (std_df.dept_id == dept_df.id) & (std_df.section == dept_df.section)
std_dept_df = std_df.join(dept_df, multiple_join_condition, how="inner")
std_dept_df.show()

"""
Output:

+---+-------+-------+-------+---+-----+-------+-------+
| id|   name|dept_id|section| id| name|section|hall_no|
+---+-------+-------+-------+---+-----+-------+-------+
|  1|  Karee|      1|      A|  1|civil|      A|    301|
|  4|Georges|      1|      B|  1|civil|      B|    302|
|  5| Tracey|      2|      A|  2| mech|      A|    401|
|  2|  Jobie|      2|      B|  2| mech|      B|    402|
|  3|   Kyle|      3|      A|  3|  ece|      A|    501|
+---+-------+-------+-------+---+-----+-------+-------+


"""

Remember to always wrap each condition in parentheses ‘()’, because in Python ‘==’ has lower precedence than the bitwise AND (&) and OR (|) operators.
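The precedence rule can be checked in plain Python without Spark. This small sketch uses ordinary integers, so the variable names are only for illustration.

```python
x, y = 1, 2

# With parentheses: each comparison is evaluated first, then combined with &.
with_parens = (x == 1) & (y == 2)

# Without parentheses: & binds tighter than ==, so Python reads this as the
# chained comparison  x == (1 & y) == 2,  which is not what was intended.
without_parens = x == 1 & y == 2

print(with_parens, without_parens)
```

With PySpark Columns, the version without parentheses typically raises an error instead of returning a wrong result, because the chained comparison tries to convert a Column into a Python boolean.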

Since the std_dept_df DataFrame contains duplicate column names (id, name, and section), referring to one of them by name alone is ambiguous and leads to an error. Therefore, refer to such a column through its source DataFrame, for example dept_df.name.

How to join multiple DataFrames in PySpark Azure Databricks using where() and filter() functions?

In this section, let’s see how to join multiple DataFrames in PySpark Azure Databricks using where() and filter() functions with multiple examples.

Example:

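Note: To join DataFrames on multiple conditions here, we define each condition separately, pass the first one to join(), and apply the second one with where() or filter(). where() is an alias for filter(), so both produce the same result.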

first_condition = std_df.dept_id == dept_df.id
second_condition = std_df.section == dept_df.section

# 1. Multiple join using where
std_dept_df_2 = std_df.join(dept_df, first_condition, how="inner") # <- first condition in join()
std_dept_df_2 = std_dept_df_2.where(second_condition) # <- Second condition in where()
std_dept_df_2.show()

# 2. Multiple join using filter
std_dept_df_3 = std_df.join(dept_df, first_condition, how="inner") # <- first condition in join()
std_dept_df_3 = std_dept_df_3.filter(second_condition) # <- Second condition in filter()
std_dept_df_3.show()

# The above code generated the following output.

"""
Output:

+---+-------+-------+-------+---+-----+-------+-------+
| id|   name|dept_id|section| id| name|section|hall_no|
+---+-------+-------+-------+---+-----+-------+-------+
|  1|  Karee|      1|      A|  1|civil|      A|    301|
|  5| Tracey|      2|      A|  2| mech|      A|    401|
|  3|   Kyle|      3|      A|  3|  ece|      A|    501|
|  4|Georges|      1|      B|  1|civil|      B|    302|
|  2|  Jobie|      2|      B|  2| mech|      B|    402|
+---+-------+-------+-------+---+-----+-------+-------+

"""

How to join multiple DataFrames in PySpark Azure Databricks using SQL expression?

In this section, let’s join the student, department, and hall datasets using a SQL expression. To use a raw SQL expression, we first have to register each DataFrame as a temporary view.

std_df.createOrReplaceTempView("student")
dept_df.createOrReplaceTempView("department")
hall_df.createOrReplaceTempView("hall")

Example:

spark.sql('''
    SELECT *
    FROM student AS std
    JOIN department AS dept
    ON std.dept_id = dept.id AND std.section = dept.section
    JOIN hall ON hall.hall_no = dept.hall_no
''').show()

"""
Output:

+---+-------+-------+-------+---+-----+-------+-------+-------+--------+
| id|   name|dept_id|section| id| name|section|hall_no|hall_no|floor_no|
+---+-------+-------+-------+---+-----+-------+-------+-------+--------+
|  1|  Karee|      1|      A|  1|civil|      A|    301|    301|       1|
|  4|Georges|      1|      B|  1|civil|      B|    302|    302|       1|
|  5| Tracey|      2|      A|  2| mech|      A|    401|    401|       2|
|  2|  Jobie|      2|      B|  2| mech|      B|    402|    402|       2|
|  3|   Kyle|      3|      A|  3|  ece|      A|    501|    501|       3|
+---+-------+-------+-------+---+-----+-------+-------+-------+--------+

"""

The complete code used in this blog is available in notebook format at this GitHub link. You can download the notebook and import it into Databricks, Jupyter Notebook, etc.

When should you join multiple DataFrames in PySpark using Azure Databricks?

These are the typical reasons:

  1. To combine two DataFrames. The join() function is one of the most commonly used PySpark transformations.
  2. To combine more than two DataFrames, by chaining join() calls.

Real World Use Case Scenarios for joining multiple DataFrames in PySpark Azure Databricks?

Assume that you have a student dataset and a department dataset. The student dataset has the student id, name, and department id, and the department dataset has the department id and the department name. You can join the two datasets on the department id column to see each student’s department.

What are the alternatives for joining multiple DataFrames in PySpark using Azure Databricks?

There are multiple alternatives for joining multiple DataFrames in PySpark, which are as follows:

  • DataFrame.join()
  • Using PySpark SQL expression

Final Thoughts

In this article, we have learned how to join multiple DataFrames in PySpark Azure Databricks, with each approach explained through an example. I have also covered the different scenarios you are likely to meet in practice. I hope you found this information helpful.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.