How to collect unique records of a column in PySpark Azure Databricks?

Are you looking to find out how to collect the entire column into a list without duplicates of PySpark DataFrame using Azure Databricks cloud or maybe you are looking for a solution, to collect column data by grouping without duplicate values in PySpark Databricks using the collect_set() function? If you are looking for any of these problem solutions, you have landed on the correct page. I will also help you how to use PySpark collect_set() function with multiple examples in Azure Databricks. I will explain it by taking a practical example. So please don’t waste time let’s start with a step-by-step guide to understand how to use the collect_set() function in PySpark.

In this blog, I will teach you the following with practical examples:

  • Syntax of collect_set() function
  • Collect column values without duplication
  • Using collect_set() as an aggregation function

The Pyspark collect_set() function is used to return a list of objects without duplicates.

Syntax:

collect_set()

What is the syntax of the collect_set() function in PySpark Azure Databricks?

The syntax is as follows:

collect_set(column)
Parameter NameRequiredDescription
column (str, Column)YesIt represents the column value to be collected together.
Table 1: collect_set() Method in PySpark Databricks Parameter list with Details

Apache Spark Official Documentation Link: collect_set()

Create a simple DataFrame

Let’s understand the use of the collect_set() function with various examples. Let’s start by creating a DataFrame.

Gentle reminder:

In Databricks,

  • sparkSession made available as spark
  • sparkContext made available as sc

In case, you want to create it manually, use the below code.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder 
    .master("local[*]") 
    .appName("azurelib.com") 
    .getOrCreate()

sc = spark.sparkContext

a) Create manual PySpark DataFrame

data = [
    (1,"Humberto","Human Resources",None),
    (2,"Nada","Engineering",500),
    (3,"Letta","Support",700),
    (4,"Garry","Engineering",500),
    (5,"Jeanne","Support",600)
]

columns = ["id","name","dept","salary"]
df = spark.createDataFrame(data, schema=columns)
df.printSchema()
df.show(truncate=False)

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: long (nullable = true)

+---+--------+---------------+------+
|id |name    |dept           |salary|
+---+--------+---------------+------+
|1  |Humberto|Human Resources|null  |
|2  |Nada    |Engineering    |500   |
|3  |Letta   |Support        |700   |
|4  |Garry   |Engineering    |500   |
|5  |Jeanne  |Support        |600   |
+---+--------+---------------+------+
"""

b) Creating a DataFrame by reading files

Download and use the below source file.

# replace the file_path with the source file location which you have downloaded.

df_2 = spark.read.format("csv").option("header", True).load(file_path)
df_2.printSchema()

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: long (nullable = true)
"""

Note: Here, I will be using the manually created DataFrame.

How to collect unique column values in PySpark Azure Databricks?

In this example, let’s see how to use the collect_set() function in different ways to collect column values with duplication in PySpark Azure Databricks.

Example:

In this example, let’s try to collect the “salary” column values without duplication. Also, we have a null value. Let’s write a code and see the result.

from pyspark.sql.functions import collect_set

df.select(collect_set("salary").alias("salaries")).show(truncate=False)

"""
Output:

+---------------+
|salaries       |
+---------------+
|[700, 500, 600]|
+---------------+

"""

You can see that we have all the values, excluding the duplicates, and there are no null values. The collect_set() function omits the null values.

How to use the collect_set() function as an aggregation function in PySpark Azure Databricks?

In this example, let’s see how to use the collect_set() function as an aggregation function in PySpark Azure Databricks.

Example:

In this example, let’s try to collect the “salary” column values department-wise without duplication. Also, we have a null value. Let’s write a code and see the result.

from pyspark.sql.functions import collect_set

df.groupBy("dept").agg(collect_set("salary").alias("salaries")).show(truncate=False)

"""
Output:

+---------------+----------+
|dept           |salaries  |
+---------------+----------+
|Human Resources|[]        |
|Engineering    |[500]     |
|Support        |[700, 600]|
+---------------+----------+

"""

You can see that we have all the values grouped by department, excluding the duplicate values, and there are no null values. The collect_set() function omits the null values.

I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook in databricks, jupyter notebook, etc.

When should you use the PySpark collect_set() in Azure Databricks?

When you want to get all records into a list without any duplication, you can use the PySpark collect_set() function. This function returns unique records in a non-deterministic manner after shuffling.

Real World Use Case Scenarios for PySpark DataFrame collect_set() in Azure Databricks?

Let’s assume you have a student dataset across different states. For example Students A and B are from “Tamil Nadu” and Students C and D are from “Kerala”. You want to fetch only the unique records that are “Tamil Nadu” and “Kerala” into a list [“Tamil Nadu”, “Kerala”] without any duplication. You can use the PySpark collect_set() function to achieve the above-mentioned requirement.

What are the alternatives to the collect_set() function in PySpark Azure Databricks?

There are alternatives to the collect_set() function, which are as follows:

  • collect_list(): used for getting records into a single list with duplication. In order to get the unique records you can chain this collect_list() function with a distinct() function.

Final Thoughts

In this article, we have learned about the PySpark collect_set() method of DataFrame in Azure Databricks along with the examples explained clearly. I have also covered different scenarios with practical examples that could be possible. I hope the information that was provided helped in gaining knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.