How to partition records in PySpark Azure Databricks?

Are you looking to find out how to partition a PySpark DataFrame in the Azure Databricks cloud, or maybe you are looking for a solution to split records based on identical column values in PySpark Databricks using the partitionBy() method? If you are looking for a solution to any of these problems, you have landed on the correct page. I will show you how to use the partitionBy() function on DataFrames in Azure Databricks and explain it with practical examples. So, without wasting time, let’s start with a step-by-step guide to understanding how to split records by column values in a PySpark DataFrame.

In this blog, I will teach you the following with practical examples:

  • Syntax of partitionBy()
  • Partition data by a single column
  • Partition data by multiple columns
  • Fetch records from a specific partition folder
  • Control the number of partition files
  • Control the number of records per partition file

The partitionBy() method is used to split a large dataset into smaller ones on disk, based on one or more columns.

Syntax:

dataframe_name.write.partitionBy()

What is the syntax of the partitionBy() function in PySpark Azure Databricks?

The syntax is as follows:

dataframe_name.write.partitionBy(columns)
Parameter Name        | Required | Description
columns (str or list) | Yes      | The columns to be considered for partitioning

Table 1: partitionBy() method in PySpark Databricks, parameter list with details

Official documentation link: partitionBy()

Create a simple DataFrame

Let’s understand the use of the partitionBy() function with a variety of examples. Let’s start by creating a DataFrame.

Gentle reminder:

In Databricks,

  • the SparkSession is made available as spark
  • the SparkContext is made available as sc

In case you want to create them manually, use the code below.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()

sc = spark.sparkContext

a) Create a PySpark DataFrame manually

data = [
    ("Olga Glatskikh","Russia",2004,"Rhythmic Gymnastics",1,0,0,1),
    ("Anna Pavlova","Russia",2004,"Swimming",0,0,2,2),
    ("Dan Izotov","Russia",2008,"Swimming",0,1,0,1),
    ("Katie Ledecky","United States",2012,"Swimming",1,0,0,1),
    ("Kyla Ross","United States",2008,"Gymnastics",1,0,0,1),
    ("Tasha Schwikert-Warren","United States",2000,"Gymnastics",0,0,1,1),
    ("Yang Yun","China",2000,"Gymnastics",0,0,1,1),
    ("Yang Yilin","China",2008,"Diving",1,0,2,3),
    ("Chen Ruolin","China",2008,"Diving",2,0,0,2)
]

df = spark.createDataFrame(data, schema=["Athlete","Country","Year","Sport","Gold","Silver","Bronze","Total"])
df.printSchema()
df.show(truncate=False)

"""
root
 |-- Athlete: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Gold: long (nullable = true)
 |-- Silver: long (nullable = true)
 |-- Bronze: long (nullable = true)
 |-- Total: long (nullable = true)

+----------------------+-------------+----+-------------------+----+------+------+-----+
|Athlete               |Country      |Year|Sport              |Gold|Silver|Bronze|Total|
+----------------------+-------------+----+-------------------+----+------+------+-----+
|Olga Glatskikh        |Russia       |2004|Rhythmic Gymnastics|1   |0     |0     |1    |
|Anna Pavlova          |Russia       |2004|Swimming           |0   |0     |2     |2    |
|Dan Izotov            |Russia       |2008|Swimming           |0   |1     |0     |1    |
|Katie Ledecky         |United States|2012|Swimming           |1   |0     |0     |1    |
|Kyla Ross             |United States|2008|Gymnastics         |1   |0     |0     |1    |
|Tasha Schwikert-Warren|United States|2000|Gymnastics         |0   |0     |1     |1    |
|Yang Yun              |China        |2000|Gymnastics         |0   |0     |1     |1    |
|Yang Yilin            |China        |2008|Diving             |1   |0     |2     |3    |
|Chen Ruolin           |China        |2008|Diving             |2   |0     |0     |2    |
+----------------------+-------------+----+-------------------+----+------+------+-----+
"""

b) Creating a DataFrame by reading files

Download and use the below source file.

# Replace file_path with the location of the source file you downloaded.

df_2 = spark.read.format("csv").option("inferSchema", True).option("header", True).load("file_path")
df_2.printSchema()

"""
root
 |-- Athlete: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Gold: integer (nullable = true)
 |-- Silver: integer (nullable = true)
 |-- Bronze: integer (nullable = true)
 |-- Total: integer (nullable = true)
"""

Note: Here, I will be using the manually created DataFrame.

In this blog, I have used Azure Data Lake Storage Gen2 (ADLS) for writing the partitioned records, so that I can show you how the files are stored in ADLS using the dbutils.fs.ls() function. To use ADLS in Azure Databricks, mount your container to Databricks first.
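If your container is not mounted yet, the following is a minimal sketch of how an ADLS Gen2 container can be mounted using OAuth; the storage account, container, secret scope, and key names below are placeholders that you must replace with your own values.

# Hypothetical mount example: replace every <placeholder> with your own values.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope>", key="<secret-key>"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"
}

# Mount the container so it is reachable under /mnt/azurelibadls.
dbutils.fs.mount(
    source="abfss://<container>@<storage-account>.dfs.core.windows.net/",
    mount_point="/mnt/azurelibadls",
    extra_configs=configs
)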

How to choose partitioning columns?

Always go for columns that can be categorized, i.e. columns with a limited number of distinct values, for example: ‘state’, ‘country’, ‘year’, ‘department’, etc.

Never go for an uncategorized, high-cardinality column, for example: ‘address’, ‘email’, etc., because every distinct value creates its own partition folder.
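A quick way to sanity-check a candidate column is to count its distinct values. The sketch below uses the DataFrame created earlier; the expected counts in the comments come from that sample data.

# Low distinct counts make good partition columns; near-unique columns do not,
# because each distinct value becomes its own folder on disk.
print(df.select("Country").distinct().count())  # 3 -> good candidate
print(df.select("Year").distinct().count())     # 4 -> good candidate
print(df.select("Athlete").distinct().count())  # 9 -> one folder per row, avoid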

How to partition single column records in PySpark Azure Databricks?

The PySpark function partitionBy() is used for dividing large datasets into smaller datasets based on the values of a single column. Here is a practical example that can be used to understand partitioning further.

Example:

sink_path = "/mnt/azurelibadls/practice/partitionBy/partitionByYear/"

df.write.option("header", True) \
.partitionBy("Year") \
.format("csv").mode("overwrite") \
.save(sink_path)

for each_folder in dbutils.fs.ls(sink_path):
    print(each_folder.path)

"""
Output:

dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYear/Year=2000/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYear/Year=2004/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYear/Year=2008/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYear/Year=2012/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYear/_SUCCESS

"""

As you can see, the dataset was divided into four partitions based on the ‘Year’ column.

Note: Azure Databricks provides notebook functionality. Here we have used dbutils.fs.ls(), which is part of the Databricks file system utilities (dbutils.fs). With its help, we can list the partition folders shown above.

How to partition multiple column records in PySpark Azure Databricks?

The partitionBy() function in PySpark is used to divide large datasets into smaller datasets based on multiple-column values. Here is a practical example that can be used to understand partitioning further.

Example:

sink_path = "/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/"

df.write.option("header", True) \
.partitionBy(["Year", "Country"]) \
.format("csv").mode("overwrite") \
.save(sink_path)

# First layer of partitioning
for each_folder in dbutils.fs.ls(sink_path):
    print(each_folder.path)

print("----------------")

# Second layer of partitioning
for each_folder in dbutils.fs.ls(f"{sink_path}/Year=2008/"):
    print(each_folder.path)

"""
Output:

dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2000/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2004/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2008/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2012/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/_SUCCESS
----------------
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2008/Country=China/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2008/Country=Russia/
dbfs:/mnt/azurelibadls/practice/partitionBy/partitionByYearAndCountry/Year=2008/Country=United States/
"""

As you can see, the above code partitions the data on both the ‘Year’ and ‘Country’ columns, with the ‘Country’ folders nested inside each ‘Year’ folder.

How to read partitioned records in PySpark Azure Databricks?

Example:

year_2012_df = spark.read.option("header", True) \
.csv("/mnt/azurelibadls/practice/partitionBy/partitionByYear/Year=2012")

year_2012_df.show()

"""
Output:

+-------------+-------------+--------+----+------+------+-----+
|      Athlete|      Country|   Sport|Gold|Silver|Bronze|Total|
+-------------+-------------+--------+----+------+------+-----+
|Katie Ledecky|United States|Swimming|   1|     0|     0|    1|
+-------------+-------------+--------+----+------+------+-----+

"""

How to control the number of partition files in PySpark Azure Databricks?

In this section, I will teach you how to control the number of partition files in PySpark Azure Databricks.

Example:

sink_path = "/mnt/azurelibadls/practice/partitionBy/partitionByCountry"

df.repartition(1) \
.write.partitionBy("Country") \
.option("header", True) \
.format("csv").mode("overwrite") \
.save(sink_path)

for each_folder in dbutils.fs.ls(f"{sink_path}/Country=Russia"):
    if each_folder.name[0] == "p":
        print(each_folder.name)

"""
Output:

part-00000-tid-3189273343550774082-4fd9cdb9-e15a-41b0-9b7e-f65cc2288d3a-2267-2.c000.csv


"""

As you can see, the above code prints only one file. This happens because repartition(1) collapses the data into a single in-memory partition before writing, so each ‘Country’ folder gets at most one part file.
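If you want to allow, say, up to two files per folder instead, the same pattern works with a different repartition value. The sketch below writes to a hypothetical sink path.

# Hypothetical path; with repartition(2) each 'Country' folder gets at most
# two part files, since the data is shuffled into two in-memory partitions first.
df.repartition(2) \
.write.partitionBy("Country") \
.option("header", True) \
.format("csv").mode("overwrite") \
.save("/mnt/azurelibadls/practice/partitionBy/partitionByCountry_twoFiles")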

How to control the number of records per partition file in PySpark Azure Databricks?

In this section, I will teach you how to control the number of records per partition file in PySpark Azure Databricks.

Example:

sink_path = "mnt/azurelibadls/practice/partitionBy/partitionByCountry_2"

# Write file with maximum of two records per partition
df.write.option("header", True) \
.option("maxRecordsPerFile", 2) \
.partitionBy("Country") \
.format("csv").mode("overwrite") \
.save(f"/{sink_path}")

# Check number of records in each partition
for each_folder in dbutils.fs.ls(f"/{sink_path}/Country=Russia"):
    if each_folder.name[0] == "p":
        with open(f"/dbfs/{sink_path}/Country=Russia/{each_folder.name}") as file:
            for line in file:
                print(line.strip())
        print("-----------")

"""
Output:

Athlete,Year,Sport,Gold,Silver,Bronze,Total
Olga Glatskikh,2004,Rhythmic Gymnastics,1,0,0,1
Anna Pavlova,2004,Swimming,0,0,2,2
-----------
Athlete,Year,Sport,Gold,Silver,Bronze,Total
Dan Izotov,2008,Swimming,0,1,0,1
-----------

"""

I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook into Databricks, Jupyter Notebook, etc.

When should you use the PySpark partitionBy() function in Azure Databricks?

Using the partitionBy() function enhances data processing efficiency and facilitates data analysis. Depending on our needs, data can be partitioned on disk at write time with partitionBy(), or in memory with repartition() or coalesce().
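For reference, this is roughly what the two options look like side by side; the sink path below is a hypothetical placeholder.

# On-disk partitioning: a folder per 'Country' is created at write time.
df.write.partitionBy("Country") \
.option("header", True) \
.format("csv").mode("overwrite") \
.save("/mnt/azurelibadls/practice/partitionBy/diskPartitionDemo")

# In-memory partitioning: only the distribution of rows across Spark partitions
# changes; nothing is written to disk until an action or write is triggered.
df_in_memory = df.repartition(4, "Country")
print(df_in_memory.rdd.getNumPartitions())  # 4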

Real World Use Case Scenarios for PySpark DataFrame partitionBy() function in Azure Databricks?

Assume that you have 100 million records of student data. The student dataset has an id, name, school, and city for each student. Only 10 students study in Chennai, and you got a requirement to fetch all the students who study in Chennai. You can easily achieve this by filtering the records, but the time consumption is high because the execution has to scan every record to check the city column. To avoid this, we can partition and write the student records on the city column, as sketched below.
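As a hedged sketch of that scenario, the students_df DataFrame, its column names, and the paths below are hypothetical.

# Write once, partitioned by city: each city gets its own folder on disk.
students_df.write.partitionBy("city") \
.option("header", True) \
.format("csv").mode("overwrite") \
.save("/mnt/azurelibadls/practice/partitionBy/students")

# Later reads that filter on city scan only the city=Chennai folder
# (partition pruning) instead of all 100 million records.
chennai_df = spark.read.option("header", True) \
.csv("/mnt/azurelibadls/practice/partitionBy/students") \
.filter("city = 'Chennai'")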

What are the alternatives of the partitionBy() function in PySpark Azure Databricks?

When you write your DataFrame into a Hive table, you can use the bucketBy() function, which distributes records into a fixed number of buckets using a hash of the bucketing column(s).
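For example, here is a minimal bucketBy() sketch; the table name and bucket count are assumptions, and note that bucketBy() only works together with saveAsTable(), not save().

# Distribute records into 4 buckets by hashing the 'Country' column and
# save the result as a managed table.
df.write.format("parquet") \
.bucketBy(4, "Country") \
.sortBy("Year") \
.mode("overwrite") \
.saveAsTable("olympics_bucketed_by_country")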

Final Thoughts

In this article, we have learned how to use the PySpark partitionBy() method to split the records of a DataFrame by column values in Azure Databricks, along with clearly explained examples. I have also covered the different scenarios that are possible with practical examples. I hope the information provided helped you gain knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.