How to use broadcast variables of PySpark in Azure Databricks?

Are you looking to find out how to cache a variable in PySpark using Azure Databricks cloud or maybe you are looking for a solution, to use cached variables of PySpark in Azure Databricks? If you are looking for any of these problem solutions, you have landed on the correct page. I will also show you how to use cached broadcast variables on both PySpark’s DataFrame and RDD in Azure Databricks. I will explain it by taking a practical example. So please don’t waste time let’s start with a step-by-step guide to understanding how to create and use broadcast variables in PySpark using Azure Databricks.

In this blog, I will teach you the following with practical examples:

  • What is broadcast variable?
  • Create a broadcast variable
  • Access broadcast variable
  • Using a broadcast variable with RDD
  • Using a broadcast variable with DataFrame

The PySpark’s broadcasts are read-only variables, which cache the data in a cluster and make sure it is available in all nodes.

Syntax:

sc.broadcast()

What is the syntax of the broadcast() function in PySpark Azure Databricks?

The syntax is as follows:

SparkContext.broadcast(variable)
Parameter NameRequiredDescription
variable (T)YesIt represents any DataType that can be broadcasted.
Table 1: broadcast() Method in PySpark Databricks Parameter list with Details

Apache Spark Official Documentation Link: broadcast()

Create a simple DataFrame

Let’s understand the use of the lit() function with various examples. Let’s start by creating a DataFrame.

Gentle reminder:

In Databricks,

  • sparkSession made available as spark
  • sparkContext made available as sc

In case, you want to create it manually, use the below code.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder 
    .master("local[*]") 
    .appName("azurelib.com") 
    .getOrCreate()

sc = spark.sparkContext

a) Create manual PySpark DataFrame

data = [
    (1,"Nils","M",74),
    (2,"Alena","F",93),
    (3,"Ammamaria","F",60),
    (4,"Yardley","M",46),
    (5,"Peadar","M",60),
    (6,"Horace","M",80)
]

df = spark.createDataFrame(data, schema=["id","name","gender","marks"])
df.printSchema()
df.show(5, truncate=False)

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- marks: long (nullable = true)

+---+---------+------+-----+
|id |name     |gender|marks|
+---+---------+------+-----+
|1  |Nils     |M     |74   |
|2  |Alena    |F     |93   |
|3  |Ammamaria|F     |60   |
|4  |Yardley  |M     |46   |
|5  |Peadar   |M     |60   |
+---+---------+------+-----+
"""

b) Creating a DataFrame by reading files

Download and use the below source file.

# replace the file_path with the source file location which you have downloaded.

df_2 = spark.read.format("csv").option("header", True).load(file_path)
df_2.printSchema()

"""
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- marks: long (nullable = true)
"""

Note: Here, I will be using the manually created dataframe.

How to create a broadcast variable of PySpark DataFrame?

PySpark’s broadcast() function helps cache variables in all nodes. Let’s try to create on with a practical example.

Example:

gender_map = {"M": "Male", "F": "Female"}
broadcast_gender = sc.broadcast(gender_map)

How to access a broadcast variable of PySpark DataFrame?

In this example, let’s see how to access a cached variable with a practical example.

Example:

broadcast_gender.value["M"]
broadcast_gender.value["F"]

"""
Output:

'Male'
'Female'

"""

The value attributes help in accessing the broadcast variable. In the upcoming section, we will see how to use the broadcast variable with PySpark DataFrame and RDD.

How to use broadcast variables with PySpark RDD in Azure Databricks?

In this example, let’s see how to use a cached variable with PySpark RDD using a practical example.

Example:

In this example, let’s try to replace the gender value from ‘M’, and ‘F’ with ‘Male’ and ‘Female’ respectively.

rdd = df.rdd
rdd = rdd.map(lambda t: (t[0],t[1],broadcast_gender.value[t[2]],t[3]))
rdd.collect()

"""
Output:

[(1, 'Nils', 'Male', 74),
 (2, 'Alena', 'Female', 93),
 (3, 'Ammamaria', 'Female', 60),
 (4, 'Yardley', 'Male', 46),
 (5, 'Peadar', 'Male', 60),
 (6, 'Horace', 'Male', 80)]

"""

How to use broadcast variables with PySpark DataFrame in Azure Databricks?

In this example, let’s see how to use a cached variable with PySpark DataFrame step-by-step using a practical example.

Example:

In this example, let’s try to replace the gender value from ‘M’, and ‘F’ with ‘Male’ and ‘Female’ respectively using UDF.

Things to remember:

from pyspark.sql.functions import col, udf

# 1. User defined function
def convert_gender(char):
    return broadcast_gender.value[char]

# 2. Registering UDF
convert_gender = udf(convert_gender)

# 3. Use it accordingly
df_2 = df.withColumn("gender", convert_gender(col("gender")))
df_2.show()
"""
Output:

+---+---------+------+-----+
| id|     name|gender|marks|
+---+---------+------+-----+
|  1|     Nils|  Male|   74|
|  2|    Alena|Female|   93|
|  3|Ammamaria|Female|   60|
|  4|  Yardley|  Male|   46|
|  5|   Peadar|  Male|   60|
|  6|   Horace|  Male|   80|
+---+---------+------+-----+

"""

I have attached the complete code used in this blog in a notebook format in this GitHub link. You can download and import this notebook in databricks, jupyter notebook, etc.

When should you use the broadcast variables in Azure Databricks?

These could be the possible reasons:

  1. To avoid expenses while joining a large dataset.
  2. To avoid a lot of data getting transferred over the network
  3. To avoid shuffling for decreasing the execution time of the job.

Real World Use Case Scenarios for PySpark broadcast variables in Azure Databricks?

Assume that you were given a student dataset and you were asked to convert the gender column values from ‘M’ and ‘FM’ into ‘Male’ and ‘Female’ respectively. We might use a small DataFrame and join both the DataFrame to replace it, but this results in shuffling while resulting in higher execution time. To solve this we might use the sc.broadcast(). Broadcast variables allow you to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

What are the alternatives to the broadcast variables function in PySpark Azure Databricks?

The PySpark SparkContext.broadcast() is the only way for creating a broadcast variable.

Final Thoughts

In this article, we have learned about creating and accessing PySpark’s broadcast variables in Azure Databricks along with the examples explained clearly. I have also covered different scenarios with practical examples that could be possible. I hope the information that was provided helped in gaining knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

Arud Seka Berne S

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.