Are you looking to find out how to calculate the row number, rank, or dense rank of PySpark DataFrame columns in Azure Databricks, or maybe you need a solution to calculate the lag or lead value in PySpark Databricks using window functions? If you are looking for any of these solutions, you have landed on the correct page. In this article, I will show you how to use PySpark window functions on DataFrames in Azure Databricks, and I will explain each of them with practical examples. So, without wasting time, let's start with a step-by-step guide to understanding how to use window functions in a PySpark DataFrame.
Window functions are used to perform aggregate and ranking operations over a specific window frame (a group of rows) on DataFrame columns in PySpark Azure Databricks.
Contents
- 1 What is the syntax of the window functions in PySpark Azure Databricks?
- 2 Create a simple DataFrame
- 3 How to perform Window Functions in PySpark Azure Databricks?
- 4 How to rank records in PySpark Azure Databricks?
- 5 How to rank records without any gaps in PySpark Azure Databricks?
- 6 How to perform lead window function in PySpark Azure Databricks?
- 7 How to perform lag window function in PySpark Azure Databricks?
- 8 How to perform aggregation window function in PySpark Azure Databricks?
- 9 When should you use the PySpark window functions in Azure Databricks?
- 10 Real World Use Case Scenarios for PySpark DataFrame window functions in Azure Databricks?
- 11 What are the alternatives to the window functions in PySpark Azure Databricks?
- 12 Final Thoughts
What is the syntax of the window functions in PySpark Azure Databricks?
The syntax is as follows:
window_function().over(window_spec)
| Parameter Name | Required | Description |
|---|---|---|
| window_spec (Window) | Yes | It represents the window frame and the result order based on a column. |
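To make the pattern concrete, here is a minimal sketch (it assumes the df DataFrame created in the next section): you build a window specification with Window and apply a window function to it through over().

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Build a window specification: partition rows by team and order them by points
window_spec = Window.partitionBy("team").orderBy("points")

# Apply any window function to the specification through over()
df.withColumn("row_number", row_number().over(window_spec)).show()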
Create a simple DataFrame
Let’s understand the use of window functions with a variety of examples. Let’s start by creating a DataFrame.
Gentle reminder:
In Databricks,
- sparkSession made available as spark
- sparkContext made available as sc
In case you want to create it manually, use the below code.
from pyspark.sql.session import SparkSession

# Build (or reuse) a local SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()

sc = spark.sparkContext
a) Create manual PySpark DataFrame
data = [
("Lewis","McLaren",10.0),
("Nick","McLaren",2.0),
("Nico","McLaren",6.0),
("Fernando","McLaren",3.0),
("Heikki","McLaren",8.0),
("Kazuki","Ferrari",9.0),
("Sébastien","Ferrari",7.0),
("Kimi","Ferrari",6.0)
]
df = spark.createDataFrame(data, schema=["driver_name","team","points"])
df.printSchema()
df.show(truncate=False)
"""
root
|-- driver_name: string (nullable = true)
|-- team: string (nullable = true)
|-- points: double (nullable = true)
+-----------+-------+------+
|driver_name|team |points|
+-----------+-------+------+
|Lewis |McLaren|10.0 |
|Nick |McLaren|2.0 |
|Nico |McLaren|6.0 |
|Fernando |McLaren|3.0 |
|Heikki |McLaren|8.0 |
|Kazuki |Ferrari|9.0 |
|Sébastien |Ferrari|7.0 |
|Kimi |Ferrari|6.0 |
+-----------+-------+------+
"""
b) Creating a DataFrame by reading files
Download and use the below source file.
# replace the file_path with the source file location which you have downloaded.
df_2 = spark.read.format("csv").option("inferSchema", True).option("header", True).load("file_path")
df_2.printSchema()
"""
root
|-- driver_name: string (nullable = true)
|-- team: string (nullable = true)
|-- points: double (nullable = true)
"""
Note: Here, I will be using the manually created DataFrame.
How to perform Window Functions in PySpark Azure Databricks?
By providing a window specification to a ranking function, we can perform ranking operations. First, try to understand the partitionBy() and orderBy() functions of WindowSpec. The partitionBy() function defines the group of column values over which the operation is performed, and the orderBy() function orders the records within each group so they can be ranked.
Step by step procedure for window operation:
- Create a new column, for example 'row_number'.
- Pass the window specification to the over() function.
- Specify orderBy() and, optionally, partitionBy().
Example using orderBy():
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
window_spec = Window.orderBy('driver_name')
df.withColumn('row_number', row_number().over(window_spec)) \
.select("row_number", "driver_name").show()
"""
Output:
+----------+-----------+
|row_number|driver_name|
+----------+-----------+
| 1| Fernando|
| 2| Heikki|
| 3| Kazuki|
| 4| Kimi|
| 5| Lewis|
| 6| Nick|
| 7| Nico|
| 8| Sébastien|
+----------+-----------+
"""
Example using partitionBy():
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
window_spec = Window.partitionBy('team').orderBy('driver_name')
df.withColumn('row_number', row_number().over(window_spec)) \
.select("row_number", "team", "driver_name").show()
"""
Output:
+----------+-------+-----------+
|row_number| team|driver_name|
+----------+-------+-----------+
| 1|Ferrari| Kazuki|
| 2|Ferrari| Kimi|
| 3|Ferrari| Sébastien|
| 1|McLaren| Fernando|
| 2|McLaren| Heikki|
| 3|McLaren| Lewis|
| 4|McLaren| Nick|
| 5|McLaren| Nico|
+----------+-------+-----------+
"""
How to rank records in PySpark Azure Databricks?
To assign a rank to each record within a window partition, use the rank() window function. When there are ties, this function leaves gaps in the ranking. Let's try to rank drivers based on their points.
Example:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc
window_spec = Window.orderBy(desc('points'))
df.withColumn('rank', rank().over(window_spec)).show()
"""
Output:
+-----------+-------+------+----+
|driver_name| team|points|rank|
+-----------+-------+------+----+
| Lewis|McLaren| 10.0| 1|
| Kazuki|Ferrari| 9.0| 2|
| Heikki|McLaren| 8.0| 3|
| Sébastien|Ferrari| 7.0| 4|
| Nico|McLaren| 6.0| 5|
| Kimi|Ferrari| 6.0| 5|
| Fernando|McLaren| 3.0| 7|
| Nick|McLaren| 2.0| 8|
+-----------+-------+------+----+
"""
The rank was determined based on the drivers' points. As you can see, drivers Nico and Kimi both scored 6 points and are both ranked 5, so the next driver is ranked 7.
How to rank records without any gaps in PySpark Azure Databricks?
To rank rows within a window partition without any gaps, use the dense_rank() window function. The difference between this and the rank() function is that rank() leaves gaps in the ranking when there are ties, while dense_rank() does not.
Example:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, desc
window_spec = Window.orderBy(desc('points'))
df.withColumn('dense_rank', dense_rank().over(window_spec)).show()
"""
Output:
+-----------+-------+------+----------+
|driver_name| team|points|dense_rank|
+-----------+-------+------+----------+
| Lewis|McLaren| 10.0| 1|
| Kazuki|Ferrari| 9.0| 2|
| Heikki|McLaren| 8.0| 3|
| Sébastien|Ferrari| 7.0| 4|
| Nico|McLaren| 6.0| 5|
| Kimi|Ferrari| 6.0| 5|
| Fernando|McLaren| 3.0| 6|
| Nick|McLaren| 2.0| 7|
+-----------+-------+------+----------+
"""
The rank was determined based on the drivers' points. As you can see, drivers Nico and Kimi both scored 6 points and are both ranked 5, and the next driver is ranked 6, with no gap.
How to perform lead window function in PySpark Azure Databricks?
The lead() function returns the value that is offset rows after the current row, or null when there is no such row.
Example:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead
window_spec = Window.orderBy("points")
df.withColumn("lead", lead("points", offset=1).over(window_spec)).show()
"""
Output:
+-----------+-------+------+----+
|driver_name| team|points|lead|
+-----------+-------+------+----+
| Nick|McLaren| 2.0| 3.0|
| Fernando|McLaren| 3.0| 6.0|
| Nico|McLaren| 6.0| 6.0|
| Kimi|Ferrari| 6.0| 7.0|
| Sébastien|Ferrari| 7.0| 8.0|
| Heikki|McLaren| 8.0| 9.0|
| Kazuki|Ferrari| 9.0|10.0|
| Lewis|McLaren| 10.0|null|
+-----------+-------+------+----+
"""
How to perform lag window function in PySpark Azure Databricks?
The lag() function returns the value that is offset rows before the current row, or null when there is no such row.
Example:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
window_spec = Window.orderBy("points")
df.withColumn("lag", lag("points", offset=1).over(window_spec)).show()
"""
Output:
+-----------+-------+------+----+
|driver_name| team|points| lag|
+-----------+-------+------+----+
| Nick|McLaren| 2.0|null|
| Fernando|McLaren| 3.0| 2.0|
| Nico|McLaren| 6.0| 3.0|
| Kimi|Ferrari| 6.0| 6.0|
| Sébastien|Ferrari| 7.0| 6.0|
| Heikki|McLaren| 8.0| 7.0|
| Kazuki|Ferrari| 9.0| 8.0|
| Lewis|McLaren| 10.0| 9.0|
+-----------+-------+------+----+
"""
How to perform aggregation window function in PySpark Azure Databricks?
In this section, I'll describe how to use WindowSpec and the PySpark SQL aggregate window functions to determine the minimum and maximum points for each team. Aggregate window functions do not require an orderBy() clause; note that when one is included, as in the example below, the default frame becomes a running window (from the start of the partition up to the current row), which is why max_points appears as a running maximum in the output.
Example:
from pyspark.sql.window import Window
from pyspark.sql.functions import min, max
window_spec = Window.partitionBy("team").orderBy("team", "points")
df\
.withColumn("min_points", min("points").over(window_spec))\
.withColumn("max_points", max("points").over(window_spec))\
.show()
"""
Output:
+-----------+-------+------+----------+----------+
|driver_name| team|points|min_points|max_points|
+-----------+-------+------+----------+----------+
| Kimi|Ferrari| 6.0| 6.0| 6.0|
| Sébastien|Ferrari| 7.0| 6.0| 7.0|
| Kazuki|Ferrari| 9.0| 6.0| 9.0|
| Nick|McLaren| 2.0| 2.0| 2.0|
| Fernando|McLaren| 3.0| 2.0| 3.0|
| Nico|McLaren| 6.0| 2.0| 6.0|
| Heikki|McLaren| 8.0| 2.0| 8.0|
| Lewis|McLaren| 10.0| 2.0| 10.0|
+-----------+-------+------+----------+----------+
"""
I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook into Databricks, Jupyter Notebook, etc.
When should you use the PySpark window functions in Azure Databricks?
These are some of the possible reasons:
- To number rows
- To rank records
- To rank records without gaps
- To perform an operation in a window
- To perform an operation in a window by ordering records and so on
Real World Use Case Scenarios for PySpark DataFrame window functions in Azure Databricks?
- Assume you have a dataset without an identifier column. You can use the PySpark row_number() function to add a sequential number to each record.
- Assume you have a race results dataset that includes the race ID, race name, driver name, team name, and points scored by the driver, and you have to rank each driver within a race based on the points scored. You can use the PySpark rank() function to rank each driver by their points, as sketched below.
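Here is a minimal sketch of that second scenario. The race_results DataFrame and its race_id, driver_name, team, and points column names are hypothetical and only for illustration:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

# Hypothetical race results data: race_id, driver_name, team, points
race_results = spark.createDataFrame(
    [(1, "Lewis", "McLaren", 10.0), (1, "Kimi", "Ferrari", 6.0),
     (2, "Lewis", "McLaren", 8.0), (2, "Kimi", "Ferrari", 8.0)],
    schema=["race_id", "driver_name", "team", "points"])

# Rank drivers within each race by the points they scored
window_spec = Window.partitionBy("race_id").orderBy(desc("points"))
race_results.withColumn("race_rank", rank().over(window_spec)).show()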
What are the alternatives to the window functions in PySpark Azure Databricks?
You can use PySpark User Defined Functions (UDFs) for ranking, numbering rows, and much more in a PySpark DataFrame. However, the PySpark built-in functions perform better than UDFs, are compile-time safe, and should be used instead of creating your own custom functions. Avoid custom UDFs wherever possible if the performance of your PySpark application is crucial, because Spark cannot optimize them and their performance cannot be guaranteed.
Final Thoughts
In this article, we have learned how to use the PySpark window functions to perform window aggregations on DataFrame columns in Azure Databricks, along with clearly explained examples. I have also covered different scenarios with practical examples wherever possible. I hope the information provided helped you gain knowledge.
Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.
As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.