How to use foreach() function in PySpark Azure Databricks?

Are you looking to learn how to use the foreach() function on a PySpark RDD in Azure Databricks, or for a way to loop through each element or record of an RDD using the foreach() method? If so, you have landed on the right page. I will show you what the PySpark foreach() function is and how to use it in Azure Databricks, and I will explain it with a practical example. So without wasting time, let’s start with a step-by-step guide to understanding how to use foreach() on a PySpark RDD.

In this blog, I will teach you the following with practical examples:

  • Syntax of foreach()
  • Using foreach() on RDD

foreach() is an action that applies a function to every record of an RDD and returns nothing.

Syntax: rdd_name.foreach(f)

What is the syntax of the foreach() function in PySpark Azure Databricks?

The syntax is as follows:

foreach(f)
| Parameter Name | Required | Description |
|---|---|---|
| f (function) | Yes | It represents the function to be applied. |

Table 1: Parameter list of the foreach() method in PySpark Databricks

Official documentation link: foreach()
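To make the signature concrete, here is a minimal sketch of a call to foreach(f). The helper name apply_foreach and the inner function shout are hypothetical and exist only for illustration; the helper accepts any RDD of strings. It shows that f receives one element at a time and that foreach() itself gives nothing back.

```python
def apply_foreach(rdd):
    """Run a function over every element of `rdd` and return what foreach() gives back."""

    def shout(line):
        # Spark calls this once per element, on the workers.
        # Whatever it returns is discarded by foreach().
        return line.upper()

    result = rdd.foreach(shout)
    return result  # None: foreach() is run only for its side effects
```

In a notebook, apply_foreach(rdd) is None should evaluate to True, which is why foreach() is only useful when f has a side effect such as logging or updating an accumulator.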

Create a simple RDD

Gentle reminder:

In Databricks,

  • the SparkSession is made available as spark
  • the SparkContext is made available as sc

If you want to create them manually, use the code below.

from pyspark.sql import SparkSession

# Parentheses let the builder chain span several lines
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("azurelib.com")
    .getOrCreate()
)

sc = spark.sparkContext

a) Create manual PySpark RDD

data = [
    "This is a sample line",
    "This is another sample line",
    "Again a sample line"
]

rdd = spark.sparkContext.parallelize(data)
rdd.collect()

"""
['This is a sample line', 'This is another sample line', 'Again a sample line']
"""

b) Creating an RDD by reading files

Download and use the below source file.

# replace the file_path with the source file location which you have downloaded.

rdd_2 = sc.textFile(file_path)
rdd_2.collect()

"""
['This is a sample line', 'This is another sample line', 'Again a sample line']
"""

Note: Here, I will be using the manually created RDD.

How to use foreach() function in PySpark Azure Databricks?

The PySpark foreach() is an action that applies a function to each record of an RDD and returns nothing. The function runs on the cluster, on whichever workers hold the records, so anything it prints goes to the Spark workers' stdout, not to the driver or your shell session.

Example:

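# foreach() prints each line only in the workers' logs, not in the notebook
rdd.foreach(lambda line: print(line))

# collect() returns the records to the driver, so they are visible here
rdd.collect()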
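Because foreach() returns nothing and its printed output ends up in the worker logs, one common way to see its effect from the driver is an accumulator. The following is a minimal sketch, assuming the sc and rdd objects created earlier; the function names count_words and add_words are hypothetical.

```python
def count_words(sc, rdd):
    """Count the words in an RDD of text lines using foreach() and an accumulator."""
    # Workers can only add to an accumulator; the driver reads its value.
    word_count = sc.accumulator(0)

    def add_words(line):
        # Side effect executed on the workers for each record.
        word_count.add(len(line.split()))

    rdd.foreach(add_words)   # action: triggers the computation, returns None
    return word_count.value  # read on the driver after the action finishes
```

For the three sample lines used in this post, count_words(sc, rdd) should return 14 (5 + 5 + 4 words).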

Alternative method:

Use the toLocalIterator() method to bring the results back to the driver node.

for line in rdd.toLocalIterator():
    print(line)

"""
Output:

This is a sample line
This is another sample line
Again a sample line

"""

I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook in Databricks, Jupyter Notebook, etc.

When should you use the foreach() function in Azure Databricks?

Use the PySpark foreach() method when you want to run a function on every element of an RDD for its side effects and do not need a result back on the driver.

Real World Use Case Scenarios for foreach() function in Azure Databricks?

Assume that you want to check each record in an RDD by printing it. You can use the foreach() function for this, but note that foreach() prints the result in the worker logs, so it will not be visible in the Databricks notebook. To print on the driver node, and therefore in the notebook, use the toLocalIterator() function instead.

What are the alternatives of the foreach() function in PySpark Azure Databricks?

There are multiple alternatives to the foreach() function, which are as follows:

  • collect() and use Python for loop
  • take() and use Python for loop
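Both alternatives can be sketched as follows. The helper names print_all and print_first are hypothetical, and the rdd argument is assumed to be an RDD of strings like the one created earlier. Since the loop runs on the driver, the output appears in the notebook.

```python
def print_all(rdd):
    """Print every record on the driver. Only safe when the RDD fits in driver memory."""
    lines = rdd.collect()  # pulls all records to the driver as a Python list
    for line in lines:
        print(line)
    return lines


def print_first(rdd, n=2):
    """Print only the first n records, which avoids loading the whole RDD."""
    lines = rdd.take(n)  # returns a list with at most n records
    for line in lines:
        print(line)
    return lines
```

collect() is convenient for small RDDs such as the sample one here, while take() is the safer choice when you only need a quick look at a large dataset.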

Final Thought

In this article, we have learned about the PySpark foreach() action in Azure Databricks, along with clearly explained examples. I have also covered the scenarios where it applies and the alternatives to it. I hope this information helped you.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

Arud Seka Berne S

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.