How to use window functions in PySpark Azure Databricks?

Are you looking to calculate the row number, rank, or dense rank of PySpark DataFrame columns in Azure Databricks, or perhaps to compute lag or lead values using window functions? If so, you have landed on the correct page. In this post I will show you how to use PySpark window functions on DataFrames in Azure Databricks, explained with practical examples. So, without wasting time, let's start with a step-by-step guide to understanding window functions in a PySpark DataFrame.

A window function performs a calculation, such as ranking or aggregation, over a group of rows called a window frame, and returns a value for every row in that frame, on DataFrame columns in PySpark Azure Databricks.

What is the syntax of the window functions in PySpark Azure Databricks?

The syntax is as follows:

window_function().over(window_spec)
Parameter Name        Required  Description
window_spec (Window)  Yes       Defines the window frame: how rows are partitioned and how they are ordered within each partition.
Table 1: window methods in PySpark Databricks, parameter list with details

Create a simple DataFrame

Let’s understand the use of window functions with a variety of examples. Let’s start by creating a DataFrame.

Gentle reminder:

In Databricks,

  • The SparkSession is made available as spark
  • The SparkContext is made available as sc

In case you want to create them manually, use the code below.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()

sc = spark.sparkContext

a) Create manual PySpark DataFrame

data = [
    ("Lewis","McLaren",10.0),
    ("Nick","McLaren",2.0),
    ("Nico","McLaren",6.0),
    ("Fernando","McLaren",3.0),
    ("Heikki","McLaren",8.0),
    ("Kazuki","Ferrari",9.0),
    ("Sébastien","Ferrari",7.0),
    ("Kimi","Ferrari",6.0)
]

df = spark.createDataFrame(data, schema=["driver_name","team","points"])
df.printSchema()
df.show(truncate=False)

"""
root
 |-- driver_name: string (nullable = true)
 |-- team: string (nullable = true)
 |-- points: double (nullable = true)

+-----------+-------+------+
|driver_name|team   |points|
+-----------+-------+------+
|Lewis      |McLaren|10.0  |
|Nick       |McLaren|2.0   |
|Nico       |McLaren|6.0   |
|Fernando   |McLaren|3.0   |
|Heikki     |McLaren|8.0   |
|Kazuki     |Ferrari|9.0   |
|Sébastien  |Ferrari|7.0   |
|Kimi       |Ferrari|6.0   |
+-----------+-------+------+
"""

b) Creating a DataFrame by reading files

Download and use the below source file.

# replace the file_path with the source file location which you have downloaded.

df_2 = spark.read.format("csv").option("inferSchema", True).option("header", True).load("file_path")
df_2.printSchema()

"""
root
 |-- driver_name: string (nullable = true)
 |-- team: string (nullable = true)
 |-- points: double (nullable = true)
"""

Note: Here, I will be using the manually created DataFrame.

How to perform Window Functions in PySpark Azure Databricks?

By providing a window specification to a ranking function, we can perform ranking operations. First, let's understand the partitionBy() and orderBy() functions of WindowSpec. The partitionBy() function groups rows that share the same column values into partitions, and the orderBy() function orders the rows within each partition.

Step by step procedure for window operation:

  • Create a new column, for example 'row_number'.
  • Pass the window specification to the over() function.
  • Specify orderBy() and, optionally, partitionBy().

Example using orderBy():

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.orderBy('driver_name')

df.withColumn('row_number', row_number().over(window_spec)) \
.select("row_number", "driver_name").show()

"""
Output:

+----------+-----------+
|row_number|driver_name|
+----------+-----------+
|         1|   Fernando|
|         2|     Heikki|
|         3|     Kazuki|
|         4|       Kimi|
|         5|      Lewis|
|         6|       Nick|
|         7|       Nico|
|         8|  Sébastien|
+----------+-----------+

"""

Example using partitionBy():

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy('team').orderBy('driver_name')

df.withColumn('row_number', row_number().over(window_spec)) \
.select("row_number", "team", "driver_name").show()

"""
Output:

+----------+-------+-----------+
|row_number|   team|driver_name|
+----------+-------+-----------+
|         1|Ferrari|     Kazuki|
|         2|Ferrari|       Kimi|
|         3|Ferrari|  Sébastien|
|         1|McLaren|   Fernando|
|         2|McLaren|     Heikki|
|         3|McLaren|      Lewis|
|         4|McLaren|       Nick|
|         5|McLaren|       Nico|
+----------+-------+-----------+

"""

How to rank records in PySpark Azure Databricks?

To assign a rank to rows within a window partition, use the rank() window function. When there are ties, this function leaves gaps in the ranking. Let's try to rank the drivers based on points.

Example:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

window_spec = Window.orderBy(desc('points'))
df.withColumn('rank', rank().over(window_spec)).show()

"""
Output:

+-----------+-------+------+----+
|driver_name|   team|points|rank|
+-----------+-------+------+----+
|      Lewis|McLaren|  10.0|   1|
|     Kazuki|Ferrari|   9.0|   2|
|     Heikki|McLaren|   8.0|   3|
|  Sébastien|Ferrari|   7.0|   4|
|       Nico|McLaren|   6.0|   5|
|       Kimi|Ferrari|   6.0|   5|
|   Fernando|McLaren|   3.0|   7|
|       Nick|McLaren|   2.0|   8|
+-----------+-------+------+----+

"""

The rank was determined by the drivers' points. As you can see, Nico and Kimi both scored 6 points and share rank 5, so the next driver is ranked 7.

How to rank records without any gaps in PySpark Azure Databricks?

To rank rows within a window partition without any gaps, use the dense_rank() window function. The difference from rank() is that rank() leaves gaps in the ranking when there are ties, while dense_rank() does not.

Example:

from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, desc

window_spec = Window.orderBy(desc('points'))
df.withColumn('dense_rank', dense_rank().over(window_spec)).show()

"""
Output:

+-----------+-------+------+----------+
|driver_name|   team|points|dense_rank|
+-----------+-------+------+----------+
|      Lewis|McLaren|  10.0|         1|
|     Kazuki|Ferrari|   9.0|         2|
|     Heikki|McLaren|   8.0|         3|
|  Sébastien|Ferrari|   7.0|         4|
|       Nico|McLaren|   6.0|         5|
|       Kimi|Ferrari|   6.0|         5|
|   Fernando|McLaren|   3.0|         6|
|       Nick|McLaren|   2.0|         7|
+-----------+-------+------+----------+

"""

The rank was determined by the drivers' points. As you can see, Nico and Kimi both scored 6 points and share rank 5, and the next driver is ranked 6 with no gap.

How to perform lead window function in PySpark Azure Databricks?

The lead() function returns the value that is offset rows after the current row. For rows near the end of the window, where no such row exists, it returns null.

Example:

from pyspark.sql.window import Window
from pyspark.sql.functions import lead

window_spec = Window.orderBy("points")
df.withColumn("lead", lead("points", offset=1).over(window_spec)).show()

"""
Output:

+-----------+-------+------+----+
|driver_name|   team|points|lead|
+-----------+-------+------+----+
|       Nick|McLaren|   2.0| 3.0|
|   Fernando|McLaren|   3.0| 6.0|
|       Nico|McLaren|   6.0| 6.0|
|       Kimi|Ferrari|   6.0| 7.0|
|  Sébastien|Ferrari|   7.0| 8.0|
|     Heikki|McLaren|   8.0| 9.0|
|     Kazuki|Ferrari|   9.0|10.0|
|      Lewis|McLaren|  10.0|null|
+-----------+-------+------+----+

"""

How to perform lag window function in PySpark Azure Databricks?

The lag() function returns the value that is offset rows before the current row. For rows near the start of the window, where no such row exists, it returns null.

Example:

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

window_spec = Window.orderBy("points")
df.withColumn("lag", lag("points", offset=1).over(window_spec)).show()

"""
Output:

+-----------+-------+------+----+
|driver_name|   team|points| lag|
+-----------+-------+------+----+
|       Nick|McLaren|   2.0|null|
|   Fernando|McLaren|   3.0| 2.0|
|       Nico|McLaren|   6.0| 3.0|
|       Kimi|Ferrari|   6.0| 6.0|
|  Sébastien|Ferrari|   7.0| 6.0|
|     Heikki|McLaren|   8.0| 7.0|
|     Kazuki|Ferrari|   9.0| 8.0|
|      Lewis|McLaren|  10.0| 9.0|
+-----------+-------+------+----+

"""

How to perform aggregation window function in PySpark Azure Databricks?

In this section, I'll describe how to use WindowSpec and the PySpark SQL aggregate window functions to determine the minimum and maximum points for each team. Note that when the window specification includes an orderBy() clause, the default frame runs from the start of the partition to the current row, so the aggregates become running (cumulative) values, as the output below shows.

Example:

from pyspark.sql.window import Window
from pyspark.sql.functions import min, max

window_spec = Window.partitionBy("team").orderBy("team", "points")

df\
.withColumn("min_points", min("points").over(window_spec))\
.withColumn("max_points", max("points").over(window_spec))\
.show()

"""
Output:

+-----------+-------+------+----------+----------+
|driver_name|   team|points|min_points|max_points|
+-----------+-------+------+----------+----------+
|       Kimi|Ferrari|   6.0|       6.0|       6.0|
|  Sébastien|Ferrari|   7.0|       6.0|       7.0|
|     Kazuki|Ferrari|   9.0|       6.0|       9.0|
|       Nick|McLaren|   2.0|       2.0|       2.0|
|   Fernando|McLaren|   3.0|       2.0|       3.0|
|       Nico|McLaren|   6.0|       2.0|       6.0|
|     Heikki|McLaren|   8.0|       2.0|       8.0|
|      Lewis|McLaren|  10.0|       2.0|      10.0|
+-----------+-------+------+----------+----------+

"""

I have attached the complete code used in this blog, in notebook format, to this GitHub link. You can download the notebook and import it into Databricks, Jupyter Notebook, etc.

When should you use the PySpark window functions in Azure Databricks?

These are some of the possible reasons:

  1. To number rows
  2. To rank records
  3. To rank records without gaps
  4. To perform an operation in a window
  5. To perform an operation in a window by ordering records and so on

What are some real-world use case scenarios for PySpark DataFrame window functions in Azure Databricks?

  • Assume you have a dataset without an identifier column. You can use the PySpark row_number function to number each record.
  • Assume you have a race results dataset that includes the race's ID, name, driver name, team name, and points scored by each driver, and you have to rank the drivers by their points. You can use the PySpark rank function for this.

What are the alternatives to the window functions in PySpark Azure Databricks?

You can use PySpark User Defined Functions (UDFs) for ranking, numbering rows, and much more in a PySpark DataFrame. However, the built-in PySpark functions perform better than UDFs because Spark can optimize them, and they are compile-time safe, so prefer them over writing your own custom functions. If the performance of your PySpark application is crucial, avoid custom UDFs wherever possible, because their performance cannot be guaranteed.

Final Thoughts

In this article, we have learned how to use the PySpark window functions to perform window aggregations on DataFrame columns in Azure Databricks, with clearly explained examples. I have also covered different practical scenarios. I hope the information provided helped you gain a better understanding.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.