Are you looking to find out how to aggregate the columns of a PySpark DataFrame in Azure Databricks, or do you need to perform calculations on a group of records using PySpark's built-in methods? If so, you have landed on the correct page. I will show you how to use PySpark to aggregate both single and multiple column values in DataFrames in Azure Databricks, and I will explain it with practical examples. So, without wasting any time, let's start with a step-by-step guide to aggregating columns in a PySpark DataFrame.
In this blog, I will teach you the following with practical examples:
- Single column aggregation
- Multiple column aggregation
- Group by column aggregation
- Multiple aggregations on a group of records using the agg() function
- Renaming aggregated columns using the alias() function
Contents
- 1 Create a simple DataFrame
- 2 How to do aggregation on a column in PySpark Azure Databricks?
- 3 How to do aggregation on multiple columns in PySpark Azure Databricks?
- 4 How to do aggregation on a group of records in PySpark Azure Databricks?
- 5 How to do multiple aggregation on a group of records in PySpark Azure Databricks?
- 6 How to rename aggregated columns in PySpark Azure Databricks?
- 7 What are the various inbuilt aggregation functions used commonly in PySpark Azure Databricks?
- 8 When should you use aggregation functions in PySpark Azure Databricks?
- 9 Real World Use Case Scenarios for using aggregation functions in PySpark Azure Databricks?
- 10 What are the alternatives to PySpark’s inbuilt aggregation function in PySpark Azure Databricks?
- 11 Final Thoughts
Create a simple DataFrame
Let’s understand how to use PySpark’s in-built aggregation functions with a variety of examples. Let’s start by creating a DataFrame.
Gentle reminder:
In Databricks,
- the SparkSession is made available as spark
- the SparkContext is made available as sc
In case you want to create them manually, use the code below.
from pyspark.sql.session import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("azurelib.com")
    .getOrCreate()
)
sc = spark.sparkContext
a) Create a PySpark DataFrame manually
data = [
("chevrolet vega 2300","USA",15.5,90,28.0,"1970-01-01"),
("chevrolet vega 2300","USA",15.5,90,28.0,"1970-01-01"),
("toyota corona","Japan",14.0,95,25.0,"1970-01-01"),
("ford pinto","USA",19.0,75,25.0,"1971-01-01"),
("amc gremlin","USA",13.0,100,19.0,"1971-01-01"),
("plymouth satellite custom","USA",15.5,105,16.0,"1971-01-01"),
("datsun 510 (sw)","Japan",17.0,92,28.0,"1972-01-01"),
("toyouta corona mark ii (sw)","Japan",14.5,97,23.0,"1972-01-01"),
("dodge colt (sw)","USA",15.0,80,28.0,"1972-01-01"),
("toyota corolla 1600 (sw)","Japan",16.5,88,27.0,"1972-01-01")
]
columns = ["name","origin","acceleration","horse_power","miles_per_gallon","year"]
df = spark.createDataFrame(data, schema=columns)
df.printSchema()
df.show(truncate=False)
"""
root
|-- name: string (nullable = true)
|-- origin: string (nullable = true)
|-- acceleration: double (nullable = true)
|-- horse_power: long (nullable = true)
|-- miles_per_gallon: double (nullable = true)
|-- year: string (nullable = true)
+---------------------------+------+------------+-----------+----------------+----------+
|name |origin|acceleration|horse_power|miles_per_gallon|year |
+---------------------------+------+------------+-----------+----------------+----------+
|chevrolet vega 2300 |USA |15.5 |90 |28.0 |1970-01-01|
|chevrolet vega 2300 |USA |15.5 |90 |28.0 |1970-01-01|
|toyota corona |Japan |14.0 |95 |25.0 |1970-01-01|
|ford pinto |USA |19.0 |75 |25.0 |1971-01-01|
|amc gremlin |USA |13.0 |100 |19.0 |1971-01-01|
|plymouth satellite custom |USA |15.5 |105 |16.0 |1971-01-01|
|datsun 510 (sw) |Japan |17.0 |92 |28.0 |1972-01-01|
|toyouta corona mark ii (sw)|Japan |14.5 |97 |23.0 |1972-01-01|
|dodge colt (sw) |USA |15.0 |80 |28.0 |1972-01-01|
|toyota corolla 1600 (sw) |Japan |16.5 |88 |27.0 |1972-01-01|
+---------------------------+------+------------+-----------+----------------+----------+
"""
b) Creating a DataFrame by reading files
Download the source file below and use it.
# replace the file_path with the source file location which you have downloaded.
df_2 = spark.read.format("csv").option("inferSchema", True).option("header", True).load(file_path)
df_2.printSchema()
"""
root
|-- name: string (nullable = true)
|-- origin: string (nullable = true)
|-- acceleration: double (nullable = true)
|-- horse_power: integer (nullable = true)
|-- miles_per_gallon: double (nullable = true)
|-- year: timestamp (nullable = true)
"""
Note: Here, I will be using the manually created DataFrame.
How to do aggregation on a column in PySpark Azure Databricks?
In this section, you will learn how to perform aggregation on a single column of a PySpark DataFrame. So, let’s try to find out how many records there are in our DataFrame.
Example:
from pyspark.sql.functions import count
df.select(count("name")).show()
"""
Output:
+-----------+
|count(name)|
+-----------+
| 10|
+-----------+
"""
How to do aggregation on multiple columns in PySpark Azure Databricks?
In this section, you will learn how to perform aggregation on multiple columns of a PySpark DataFrame. So, let’s try to find out the minimum and maximum horsepower in our DataFrame.
Example:
from pyspark.sql.functions import min, max
df.select(min("horse_power"), max("horse_power")).show()
"""
Output:
+----------------+----------------+
|min(horse_power)|max(horse_power)|
+----------------+----------------+
| 75| 105|
+----------------+----------------+
"""
How to do aggregation on a group of records in PySpark Azure Databricks?
In this section, you will learn how to perform aggregation on a group of records in a PySpark DataFrame. So, let’s dig in.
- Find out the number of cars per region
- Find out the highest horsepower per region for each year
Solution 1:
from pyspark.sql.functions import count
df.groupBy("origin").count().show()
"""
Output:
+------+-----+
|origin|count|
+------+-----+
| USA| 6|
| Japan| 4|
+------+-----+
"""
Solution 2:
from pyspark.sql.functions import max
df.groupBy("year", "origin").max("horse_power").show()
"""
Output:
+----------+------+----------------+
| year|origin|max(horse_power)|
+----------+------+----------------+
|1970-01-01| USA| 90|
|1970-01-01| Japan| 95|
|1971-01-01| USA| 105|
|1972-01-01| Japan| 97|
|1972-01-01| USA| 80|
+----------+------+----------------+
"""
How to do multiple aggregation on a group of records in PySpark Azure Databricks?
In this section, you will learn how to perform multiple aggregations on a group of records in a PySpark DataFrame using the agg() function. So, let’s try to find out the minimum, average, and maximum horsepower per region in our DataFrame.
Example:
from pyspark.sql.functions import min, avg, max
df.groupBy("origin").agg(
min("horse_power"),
avg("horse_power"),
max("horse_power")
).show()
"""
Output:
+------+----------------+----------------+----------------+
|origin|min(horse_power)|avg(horse_power)|max(horse_power)|
+------+----------------+----------------+----------------+
| USA| 75| 90.0| 105|
| Japan| 88| 93.0| 97|
+------+----------------+----------------+----------------+
"""
How to rename aggregated columns in PySpark Azure Databricks?
In this section, you will learn how to rename aggregated columns in a PySpark DataFrame using the alias() function.
Example:
from pyspark.sql.functions import min, avg, max
df.groupBy("origin").agg(
min("horse_power").alias("min_hp"),
avg("horse_power").alias("avg_hp"),
max("horse_power").alias("max_hp")
).show()
"""
Output:
+------+------+------+------+
|origin|min_hp|avg_hp|max_hp|
+------+------+------+------+
| USA| 75| 90.0| 105|
| Japan| 88| 93.0| 97|
+------+------+------+------+
"""
What are the various inbuilt aggregation functions used commonly in PySpark Azure Databricks?
In this section, you will learn about the various standard aggregation functions available in PySpark.
1. count() and countDistinct()
- count() returns the number of non-null values in a column.
- countDistinct() returns the number of distinct values in a column, or the number of distinct combinations when multiple columns are passed.
from pyspark.sql.functions import count, countDistinct
df.select(count("name"), countDistinct("year", "acceleration")).show()
"""
Output:
+-----------+----------------------------------+
|count(name)|count(DISTINCT year, acceleration)|
+-----------+----------------------------------+
| 10| 9|
+-----------+----------------------------------+
"""
2. sum() and sumDistinct()
- sum() returns the sum of the values in a column.
- sumDistinct() returns the sum of the distinct values in a column.
from pyspark.sql.functions import sum, sumDistinct
df.select(sum("horse_power"), sumDistinct("horse_power")).show()
"""
Output:
+----------------+-------------------------+
|sum(horse_power)|sum(DISTINCT horse_power)|
+----------------+-------------------------+
| 912| 822|
+----------------+-------------------------+
"""
3. stddev(), stddev_samp(), and stddev_pop()
- stddev() is an alias for stddev_samp().
- stddev_samp() returns the sample standard deviation of values in a column.
- stddev_pop() returns the population standard deviation of the values in a column.
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop
df.select(stddev("horse_power"), stddev_samp("horse_power"), stddev_pop("horse_power")).show()
"""
Output:
+------------------------+------------------------+-----------------------+
|stddev_samp(horse_power)|stddev_samp(horse_power)|stddev_pop(horse_power)|
+------------------------+------------------------+-----------------------+
| 8.929352346801718| 8.929352346801718| 8.471127433818948|
+------------------------+------------------------+-----------------------+
"""
4. avg() and mean()
- avg() returns the average of the values in a column.
- mean() is an alias of the avg() function and works exactly like avg().
from pyspark.sql.functions import avg, mean
df.select(avg("acceleration"), mean("acceleration")).show()
"""
Output:
+-----------------+-----------------+
|avg(acceleration)|avg(acceleration)|
+-----------------+-----------------+
| 15.55| 15.55|
+-----------------+-----------------+
"""
5. first() and last()
- first() returns the first value in a column.
- last() returns the last value in a column.
(Both depend on the row order, which is not deterministic unless the data is sorted.)
from pyspark.sql.functions import first, last
df.select(first("horse_power"), last("horse_power")).show()
"""
Output:
+------------------+-----------------+
|first(horse_power)|last(horse_power)|
+------------------+-----------------+
| 90| 88|
+------------------+-----------------+
"""
6. min() and max()
- min() returns the minimum value in a column.
- max() returns the maximum value in a column.
from pyspark.sql.functions import min, max
df.select(min("horse_power"), max("horse_power")).show()
"""
Output:
+----------------+----------------+
|min(horse_power)|max(horse_power)|
+----------------+----------------+
| 75| 105|
+----------------+----------------+
"""
7. variance(), var_samp(), and var_pop()
- variance() is an alias for var_samp().
- var_samp() returns the unbiased variance of the values in a column.
- var_pop() returns the population variance of the values in a column.
from pyspark.sql.functions import variance, var_samp, var_pop
df.select(variance("horse_power"), var_samp("horse_power"), var_pop("horse_power")).show()
"""
Output:
+---------------------+---------------------+--------------------+
|var_samp(horse_power)|var_samp(horse_power)|var_pop(horse_power)|
+---------------------+---------------------+--------------------+
| 79.73333333333333| 79.73333333333333| 71.76|
+---------------------+---------------------+--------------------+
"""
8. collect_list() and collect_set()
- collect_list() returns all values from an input column with duplicates.
- collect_set() returns all values from an input column with duplicate values eliminated.
from pyspark.sql.functions import collect_list, collect_set
df.select(collect_list("horse_power"), collect_set("horse_power")).show(truncate=False)
"""
Output:
+------------------------------------------+--------------------------------------+
|collect_list(horse_power) |collect_set(horse_power) |
+------------------------------------------+--------------------------------------+
|[90, 90, 95, 75, 100, 105, 92, 97, 80, 88]|[88, 100, 75, 90, 105, 97, 80, 95, 92]|
+------------------------------------------+--------------------------------------+
"""
I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook into Databricks, Jupyter Notebook, etc.
When should you use aggregation functions in PySpark Azure Databricks?
These are some of the possible reasons:
- To count the number of records
- To count the distinct records
- To sum the values of a numeric column
- To sum the distinct values of a numeric column
- To get an average of a numeric column
- To find the standard deviation
- To find variance
- To find the first and last value
- To find the minimum and maximum values
- To collect column values into a list
- To collect distinct column values into a list
Real World Use Case Scenarios for using aggregation functions in PySpark Azure Databricks?
Assume that you were given a dataset to analyze. For example (a combined sketch follows this list):
- To find out the number of records in the dataset by grouping records, use the count() function along with groupBy().
- To find out the maximum and minimum values in a numeric column, use the min() and max() aggregation functions.
- To find out the average value in a numeric column, use the avg() or mean() function.
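Putting these together, a single grouped summary per region might look like the sketch below (the column aliases are just illustrative):

from pyspark.sql.functions import count, min, max, avg

df.groupBy("origin").agg(
    count("name").alias("cars"),
    min("horse_power").alias("min_hp"),
    max("horse_power").alias("max_hp"),
    avg("miles_per_gallon").alias("avg_mpg")
).show()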
The commonly used PySpark aggregation functions are explained in detail with practical examples in the sections above, so have a look at them.
What are the alternatives to PySpark’s inbuilt aggregation function in PySpark Azure Databricks?
You can use PySpark User Defined Functions (UDFs) to aggregate values in a PySpark DataFrame. However, PySpark's built-in functions perform better than UDFs and should be used instead of writing your own custom functions. Avoid custom UDFs if the performance of your PySpark application is critical, because Spark cannot optimize them the way it optimizes its built-in functions.
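For completeness, here is a rough sketch of what a custom aggregation could look like using a grouped-aggregate pandas UDF (hp_mean is a hypothetical name, and pyarrow must be available, as it is on Databricks). Prefer the built-in avg() unless you genuinely need custom logic:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def hp_mean(hp: pd.Series) -> float:
    # custom aggregation logic would go here; this one simply mirrors avg()
    return float(hp.mean())

df.groupBy("origin").agg(hp_mean("horse_power").alias("avg_hp")).show()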
Final Thoughts
In this article, we have learned how to use PySpark's built-in aggregation functions to aggregate the columns of a DataFrame in Azure Databricks, with clearly explained examples. I have also covered the different scenarios that could come up, with practical examples. I hope the information provided helped you gain knowledge.
Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.
- For Azure Study material Join Telegram group : Telegram group link:
- Azure Jobs and other updates Follow me on LinkedIn: Azure Updates on LinkedIn
- Azure Tutorial Videos: Videos Link
- Azure Databricks Lesson 1
- Azure Databricks Lesson 2
- Azure Databricks Lesson 3
- Azure Databricks Lesson 4
- Azure Databricks Lesson 5
- Azure Databricks Lesson 6
- Azure Databricks Lesson 7
As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.