How to cache RDD and DataFrame in PySpark Azure Databricks?

Are you looking to find out how to cache RDD in PySpark on the Azure Databricks cloud or maybe you are looking for a solution, to cache DataFrame in PySpark on Azure Databricks? If you are looking for any of these problem solutions, you have landed on the correct page. I will also show you how to use caching by solution a problem in PySpark Azure Databricks. I will explain it with a practical example. So please don’t waste time let’s start with a step-by-step guide to understand how to cache RDD and DataFrame in PySpark Azure Databricks.

In this blog, I will teach you the following with practical examples:

  • Problem Statement
  • Caching RDD
  • Caching DataFrame

The PySpark’s cache() function is used for storing intermediate results of transformation. The cache() function will not store intermediate results unitil you call an action.

Syntax:

dataframe_name.cache()

Apache Spark Official documentation link: cache()

Gentle reminder:

In Databricks,

  • sparkSession made available as spark
  • sparkContext made available as sc

In case, you want to create it manually, use the below code.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder 
    .master("local[*]") 
    .appName("azurelib.com") 
    .getOrCreate()

sc = spark.sparkContext

Why do we need to cache intermediate transformation results in PySpark Azure Databrciks?

In this section, let’s try to understand why we need to cache the intermediate results of transformation in PySpark Azure Databricks with an example.

Example:

from pyspark.sql.functions import col

data = [
    ("Mamie","Treharne"),
    ("Erv","Colam"),
    ("Daren","Salliss"),
    ("Vania","Laundon"),
    ("Jay","Kees")
]

df1 = spark.createDataFrame(data, schema=["f_name","l_name"]) # <- (1)

df2 = df1.filter(col("l_name").endswith("s")) # <- (2)
df2.show()

df3 = df2.filter(col("f_name").startswith("D")) # <- (3)
df3.show()

"""
Output:

+------+-------+
|f_name| l_name|
+------+-------+
| Daren|Salliss|
|   Jay|   Kees|
+------+-------+

+------+-------+
|f_name| l_name|
+------+-------+
| Daren|Salliss|
+------+-------+

"""

In the above, we can see that we have three transformations and two actions. Let’s consider we have n records in our DataFrame. As we all know unless we have an action, the code will not get executed because of lazy evaluation.

In the above code, the df2.show() is the first action and this triggers the (1) -> spark.createDataFrame and (2) -> df1.filter transformations.

We also have another transformation df3.show(), which triggers the (1) -> spark.createDataFrame and df1.filter again.

Therefore, in this cache we are triggering the (1) -> spark.createDataFrame and (2) -> df1.filter twice. Whenever the dataset is huge, this leads to performance issues. This can be easily solved by caching the intermediate result of these transformations.

How to cache DataFrame in PySpark Azure Databricks?

In this section, let’s see how to cache DataFrame in PySpark Azure Databricks by improving the above mentioned example.

Example:

from pyspark.sql.functions import col

data = [
    ("Mamie","Treharne"),
    ("Erv","Colam"),
    ("Daren","Salliss"),
    ("Vania","Laundon"),
    ("Jay","Kees")
]

df1 = spark.createDataFrame(data, schema=["f_name","l_name"]).cache() # <- (1)

df2 = df1.filter(col("l_name").endswith("s")).cache() # <- (2)
df2.show()

df3 = df2.filter(col("f_name").startswith("D"))
df3.show()

"""
Output:

+------+-------+
|f_name| l_name|
+------+-------+
| Daren|Salliss|
|   Jay|   Kees|
+------+-------+

+------+-------+
|f_name| l_name|
+------+-------+
| Daren|Salliss|
+------+-------+

"""

Since the (1) and (2) transformation was cached, the df2.filter() will not run the (1) and (2) transformation again. It runs the transformation on top of cached transformation results.

How to cache RDD in PySpark Azure Databricks?

In this section, let’s see how to cache RDD in PySpark Azure Databricks with an example.

Example:

rdd1 = df.rdd.cache() # <- (1)

rdd2 = rdd1.filter(lambda tup: tup[1][-1] == 's') # <- (2)
rdd2.collect()

rdd3 = rdd2.filter(lambda tup: tup[0][0] == 'D') # <- (3)
rdd3.collect()

"""
Output:

[Row(f_name='Daren', l_name='Salliss'), Row(f_name='Jay', l_name='Kees')]

[Row(f_name='Daren', l_name='Salliss')]

"""

Since the (1) and (2) transformation was cached, the df2.filter() will not run the (1) and (2) transformation again. It runs the transformation on top of cached transformation results.

I have attached the complete code used in this blog in notebook format to this GitHub link. You can download and import this notebook in databricks, jupyter notebook, etc.

When should you cache transformation results in PySpark using Azure Databricks?

Think about a situation where we use the caching technique to perform multiple PySpark transformations in a lineage. The performance of subsequent transformations that employ the outcomes of earlier transformations is considerably improved by caching the intermediate results.

Real World Use Case Scenarios for caching intermediate results in PySpark Azure Databricks?

Assume that you perform a lot of transformations and actions. Also, each transformation is dependent on the other. In this scenario, each action triggers the first transformation because the second transformation depends on the first one. Therefore, it becomes inefficient. In order to overcome this, we use the cache, which stores the intermediate results of transformation in memory.

What are the alternatives for caching intermediate results in PySpark using Azure Databricks?

There are multiple alternatives for caching intermediate results in a PySpark DataFrame, which are as follows:

  • persist(): used for caching RDD and DataFrame in different Storage Levels.

Final Thoughts

In this article, we have learned about how to cache RDD and DataFrame in PySpark Azure Databricks along with the examples explained clearly. I have also covered different scenarios with practical examples that could be possible. I hope the information that was provided helped in gaining knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

Arud Seka Berne S

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.