How to use PySpark Accumulator in Azure Databricks?

Are you looking to find out how to use PySpark Accumulator in Azure Databricks cloud, or maybe you are looking for a solution to perform aggregation using a PySpark accumulator in Azure Databricks? If you are looking for any of these problem solutions, you have landed on the correct page. I will also show you how to perform count and sum operations using accumulators in Azure Databricks, and I will explain it with a practical example. So, without wasting time, let's start with a step-by-step guide to understanding how to use PySpark Accumulator in Azure Databricks.

In this blog, I will teach you the following with practical examples:

  • What is an accumulator
  • Create an accumulator variable
  • Access accumulator variable
  • Using an accumulator variable with RDD

PySpark's accumulators are shared variables that are write-only from the workers' point of view: you initialize a numeric variable once on the driver, tasks can then update it many times, but only the driver can read its value. Accumulators help in performing sum and counter operations, similar to MapReduce counters.

Syntax:

sc.accumulator()

What is the syntax of the accumulator() function in PySpark Azure Databricks?

The syntax is as follows:

SparkContext.accumulator(numeric_value)
Parameter Name | Required | Description
numeric_value  | Yes      | The initial numeric value (int or float) of the accumulator.

Table 1: accumulator() method in PySpark Databricks, parameter list with details

Apache Spark Official Documentation Link: accumulator()
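
To make the syntax concrete, here is a minimal sketch of the full accumulator lifecycle: create it once on the driver, add to it from tasks, and read it back on the driver. The numbers used here are placeholders; the detailed, step-by-step examples follow later in this post.

# Minimal accumulator lifecycle sketch (assumes `sc` is an active SparkContext)
counter = sc.accumulator(0)                  # 1. create once on the driver

numbers = sc.parallelize([1, 2, 3, 4, 5])    # placeholder RDD
numbers.foreach(lambda n: counter.add(n))    # 2. tasks can only add()

print(counter.value)                         # 3. only the driver reads .value -> 15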

Gentle reminder:

In Databricks,

  • the SparkSession is made available as spark
  • the SparkContext is made available as sc

In case you want to create them manually, use the code below.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()

sc = spark.sparkContext

Create a simple RDD

a) Create a PySpark RDD manually

data = [
    (1,"Nils","M",74),
    (2,"Alena","F",93),
    (3,"Ammamaria","F",60),
    (4,"Yardley","M",46),
    (5,"Peadar","M",60)
]

rdd = sc.parallelize(data)
rdd.collect()

"""
[
(1, 'Nils', 'M', 74),
 (2, 'Alena', 'F', 93),
 (3, 'Ammamaria', 'F', 60),
 (4, 'Yardley', 'M', 46),
 (5, 'Peadar', 'M', 60)
]
"""

b) Create an RDD from a DataFrame by reading files

Download and use the source file below.

# Replace file_path with the location of the source file you downloaded.

df = spark.read.format("csv").option("header", True).load(file_path)
rdd = df.rdd
rdd.collect()

"""
[
(1, 'Nils', 'M', 74),
 (2, 'Alena', 'F', 93),
 (3, 'Ammamaria', 'F', 60),
 (4, 'Yardley', 'M', 46),
 (5, 'Peadar', 'M', 60)
]
"""

Note: Here, I will be using the manually created RDD.

How to create an accumulator variable in PySpark Azure Databricks?

PySpark’s accumulator() function helps in creating an accumulator variable. Let’s try to create one with a practical example.

Example:

# Accumulators are initialized only once

# Integer
int_acc = sc.accumulator(0)

# Float
float_acc = sc.accumulator(0.0)

Note: The accumulator() function only accepts a numeric initial value, because accumulators are meant for count and sum operations.

In the above example, we have used both integer and float initial values. If you want the accumulator to hold decimal values, pass a float number to the accumulator() function.
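
As a small illustration of the float case, here is a sketch that sums fractional values; the scores below are made-up placeholders.

# Float accumulator sketch: summing fractional values (placeholder data)
gpa_total = sc.accumulator(0.0)

gpas = sc.parallelize([3.5, 2.8, 3.9, 3.1])
gpas.foreach(lambda g: gpa_total.add(g))

print(gpa_total.value)   # 13.3 (subject to the usual floating-point rounding)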

How to access an accumulator variable in PySpark Azure Databricks?

In this example, let’s see how to access an accumulator variable with a practical example.

Example:

print(f"Integer value: {int_acc.value}")
print(f"Float value: {float_acc.value}")

"""
Output:

Integer value: 0
Float value: 0.0

"""

In the above example, the value attribute fetches the accumulator's current value. Note that only the driver program reads an accumulator through this attribute; tasks running on the executors can only add to it.

How to use accumulator variables with PySpark RDD in Azure Databricks?

In this example, let’s see how to use an accumulator with PySpark RDD using some practical examples.

Example 1:

In this example, let's try to get the total marks of the students using an accumulator.

# Sum operation

total_marks = sc.accumulator(0)
rdd.foreach(lambda t: total_marks.add(t[3]))
print(f"The class total marks: {total_marks.value} out of {rdd.count() * 100}.")

"""
Output:

The class total marks: 333 out of 500.

"""

Example 2:

In this example, let's try to get the number of students using an accumulator.

# Count operation

student_count = sc.accumulator(0)
rdd.foreach(lambda _: student_count.add(1))
print(f"Total number of sudents: {student_count.value}")

"""
Output:

Total number of students: 5

"""

In the above two examples, we have used the accumulator's add() function, which performs the count and sum operations. The add() calls run inside the tasks on the worker nodes, while the final value is read back on the driver.
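
Building on the two examples above, the sum and the count can also be collected in a single pass over the RDD and then combined on the driver, for instance to report the class average. A sketch using the same sample data:

# Single-pass sketch: update both accumulators in one foreach over the RDD
sum_acc = sc.accumulator(0)
count_acc = sc.accumulator(0)

def track(student):
    # student is a tuple: (id, name, gender, marks)
    sum_acc.add(student[3])
    count_acc.add(1)

rdd.foreach(track)

average = sum_acc.value / count_acc.value   # read on the driver only
print(f"Class average: {average}")          # 333 / 5 = 66.6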

I have attached the complete code used in this blog in notebook format at this GitHub link. You can download and import the notebook into Databricks, Jupyter Notebook, etc.

When should you use the accumulator variables in Azure Databricks?

These could be the possible reasons:

  1. When you don't want to shuffle the data between executors and nodes for simple count and sum operations, as shown in the sketch after this list.
  2. When you want the updates to be performed on the executors and the final value to be read by the driver.
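
For the first point, here is a sketch that counts failing students while the data is being processed, without triggering a shuffle or a separate counting job. The pass mark of 60 is an arbitrary threshold chosen for illustration.

# Counter sketch: count students below an arbitrary pass mark in one pass
below_60 = sc.accumulator(0)

rdd.foreach(lambda student: below_60.add(1) if student[3] < 60 else None)

print(f"Students scoring below 60: {below_60.value}")   # 1 (Yardley, 46)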

Real World Use Case Scenarios for PySpark accumulator variables in Azure Databricks

Assume you were given students and their marks, and you want to calculate the total marks obtained without shuffling data across the cluster. You can go for SparkContext accumulators. The PySpark accumulator is a shared variable that is used with RDDs and DataFrames to perform sum and counter operations.

What are the alternatives to the accumulator variables function in PySpark Azure Databricks?

The PySpark SparkContext.accumulator() function is the only built-in way to create an accumulator.
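
That said, if all you need is a one-off total or count rather than a shared counter updated as a side effect, plain RDD actions produce the same numbers. A sketch for comparison, using the same sample RDD:

# Alternative without accumulators: compute the same totals with RDD actions
total_marks = rdd.map(lambda student: student[3]).sum()   # 333
student_count = rdd.count()                               # 5

print(f"Total marks: {total_marks}, students: {student_count}")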

Final Thoughts

In this article, we have learned how to create and access PySpark's accumulator variables in Azure Databricks, along with clearly explained examples. I have also covered the different scenarios where they can be useful, with practical examples. I hope the information provided helped you gain some knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.