How to read CSV files in PySpark Azure Databricks?

Are you looking to find out how to read CSV files into a PySpark DataFrame in Azure Databricks, or maybe you are looking for a solution to read multiple CSV files into a PySpark DataFrame in Azure Databricks using the read method? If you are looking for a solution to any of these problems, you have landed on the correct page. I will show you how to use PySpark to read CSV files into DataFrames in Azure Databricks, and I will explain it with practical examples. So, without wasting time, let's start with a step-by-step guide to understanding how to read CSV files into a PySpark DataFrame.

In this blog, I will teach you the following with practical examples:

  • Different methods of reading a single file
  • Read from multiple files
  • Read from multiple files using a wildcard
  • Read from directory
  • Common CSV options
  • Write CSV file

In PySpark Azure Databricks, the read method is used to load files from an external source into a DataFrame.

Apache Spark Official Documentation Link: DataFrameReader()
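
As a quick orientation, here is a minimal sketch of the general DataFrameReader pattern, using the spark session that Databricks makes available (more on that below). The path here is only a placeholder; the concrete examples later in this post use the lap times files described in the folder structure section.

# General pattern: choose a format, set options, then load the path.
# "dbfs:/path/to/file.csv" is a placeholder for your own file location.
df = spark.read \
    .format("csv") \
    .option("header", True) \
    .load("dbfs:/path/to/file.csv")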

Set up the SparkSession

Let's understand the use of the read method and its options with various examples. First, let's make sure a SparkSession is available.

Gentle reminder:

In Databricks,

  • SparkSession is made available as spark
  • SparkContext is made available as sc

In case you want to create them manually, use the code below.

from pyspark.sql.session import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("azurelib.com") \
    .getOrCreate()

sc = spark.sparkContext

Folder Structure:

Before we start, let's have a quick look at my folder structure and the files inside it.

The folder “read_write_csv” has two files and one folder in it, and the folder “read_directory” has three files in it.

File structure:

/mnt/practice/read_write_csv/ <- base location
|-- lap_times_1.csv
|-- lap_times_2.csv
|-- read_directory
    |-- lap_3.csv
    |-- lap_times_1.csv
    |-- lap_times_2.csv

Download the files and place them in the appropriate folder, as mentioned above. Each file has 50 records, excluding the header.
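
If you want to confirm that the files landed in the right place, you can list the base folder from a Databricks notebook using dbutils, which is available out of the box. This sketch assumes the mount point shown above exists in your workspace.

# List the base folder to confirm the files and the nested directory are in place.
# Assumes the /mnt/practice/read_write_csv mount shown above exists in your workspace.
base_path = "/mnt/practice/read_write_csv"

for file_info in dbutils.fs.ls(base_path):
    print(file_info.name, file_info.size)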

How to read a single CSV file in multiple ways into PySpark DataFrame in Azure Databricks?

To read a CSV file into a PySpark DataFrame, use the csv(“path”) method provided by DataFrameReader. In this section, I will teach you how to read a single CSV file using various practical methods with examples.

Examples:

# Various methods of reading a file

# Replace base_path with your own source folder location.
base_path = "/mnt/practice/read_write_csv"

# Method 1:
df = spark.read.format("csv").load(f"{base_path}/lap_times_1.csv")
# Method 2:
df = spark.read.load(format="csv", path=f"{base_path}/lap_times_1.csv")
# Method 3:
df = spark.read.format("csv").option("path", f"{base_path}/lap_times_1.csv").load()
# Method 4:
df = spark.read.csv(f"{base_path}/lap_times_1.csv")

df.printSchema()
df.show(3)

# All four methods above give the following output.

"""
Output:

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)

+------+--------+---+--------+--------+------------+
|   _c0|     _c1|_c2|     _c3|     _c4|         _c5|
+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
+------+--------+---+--------+--------+------------+

"""

How to read multiple CSV files into PySpark DataFrame in Azure Databricks?

To read multiple CSV files into a PySpark DataFrame, pass a list of paths to the csv() method provided by DataFrameReader. In this section, I will teach you how to read multiple CSV files using practical methods with examples.

Examples:

df = spark.read.option("header", True).csv(f"{base_path}/lap_times_1.csv")
print(f"First file count: {df.count()}")

df = spark.read.option("header", True).csv(f"{base_path}/lap_times_2.csv")
print(f"Second file count: {df.count()}")

# Reading multiple files appends the second file to the first.

multiple_files = [f"{base_path}/lap_times_1.csv", f"{base_path}/lap_times_2.csv"]

df = spark.read.option("header", True).csv(multiple_files)
print(f"Mulitple file count: {df.count()}")

"""
Output:

First file count: 50
Second file count: 50
Multiple file count: 100

"""

As you know, we have two files, each of which has 50 records, so 2 * 50 = 100 records excluding the headers.

How to read multiple CSV files using wildcard into PySpark DataFrame in Azure Databricks?

To read multiple CSV files that follow a naming pattern, use a wildcard (*) in the path passed to the csv() method provided by DataFrameReader. With practical examples, I will teach you how to read multiple CSV files using wildcards.

Examples:

df = spark.read.option("header", True).csv(f"{base_path}/lap_times_*.csv")
print(f"Multiple file count using wildcard(*): {df.count()}")

"""
Output:

Multiple file count using wildcard(*): 100

"""

As you know, we have two files, each of which has 50 records, so 2 * 50 = 100 records excluding the headers.

How to read CSV files from a directory into PySpark DataFrame in Azure Databricks?

To read all the CSV files in a directory into a PySpark DataFrame, pass the directory path to the csv() method provided by DataFrameReader. With examples, I will teach you how to read CSV files from a directory using the read method.

Examples:

df = spark.read.option("header", True).csv(f"{base_path}/read_directory/")
print(f"Multiple file count using wildcard(*): {df.count()}")

"""
Output:

File count from directory: 150

"""

As you know, the directory has three files, each of which has 50 records, so 3 * 50 = 150 records excluding the headers.
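
If the directory also contains files you do not want to pick up, or nested sub-folders, Spark's generic file source options can help. This is a minimal sketch assuming Spark 3.0 or later, where pathGlobFilter and recursiveFileLookup are available.

# Only pick up files matching the glob pattern, and also look inside sub-folders.
# pathGlobFilter and recursiveFileLookup are generic file source options in Spark 3.0+.
df = spark.read \
    .option("header", True) \
    .option("pathGlobFilter", "*.csv") \
    .option("recursiveFileLookup", True) \
    .csv(f"{base_path}/read_directory/")

print(f"Filtered directory count: {df.count()}")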

Commonly used CSV options while reading files into PySpark DataFrame in Azure Databricks

The PySpark CSV data source supports numerous options. Some of the most significant ones are discussed with examples in the sections below.

Option 1: header

The “header” option tells Spark to treat the first record of the file as the column names. Let's see with an example.

# 1. Set header = False
spark.read.option("header", False).csv(f"{base_path}/lap_times_1.csv").show(3)

"""
Output:

+------+--------+---+--------+--------+------------+
|   _c0|     _c1|_c2|     _c3|     _c4|         _c5|
+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
+------+--------+---+--------+--------+------------+

"""

# 2. Set header = True
spark.read.option("header", True).csv(f"{base_path}/lap_times_1.csv").show(3)

"""
Output:

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
+------+--------+---+--------+--------+------------+
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
|   841|      20|  3|       1|1:32.713|       92713|
+------+--------+---+--------+--------+------------+

"""

Option 2: delimiter

The “delimiter” option specifies the character that separates the values within each record. Let's see with an example.

spark.read.option("delimiter", "|").csv(f"{base_path}/lap_times_1.csv").show(3, truncate=False)

"""
Output:

+----------------------------------------------+
|_c0                                           |
+----------------------------------------------+
|raceId,driverId,lap,position,time,milliseconds|
|841,20,1,1,"1:38.109",98109                   |
|841,20,2,1,"1:33.006",93006                   |
+----------------------------------------------+

"""

spark.read.option("delimiter", ",").csv(f"{base_path}/lap_times_1.csv").show(3)

"""
Output:

+------+--------+---+--------+--------+------------+
|   _c0|     _c1|_c2|     _c3|     _c4|         _c5|
+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
+------+--------+---+--------+--------+------------+

"""

Option 3: inferSchema

The “inferSchema” option makes Spark scan the data and infer each column's datatype automatically. Let's see with an example.

spark.read.option("header", True) \
.option("inferSchema", False).csv(f"{base_path}/lap_times_1.csv").printSchema()

"""
Output:

root
 |-- raceId: string (nullable = true)
 |-- driverId: string (nullable = true)
 |-- lap: string (nullable = true)
 |-- position: string (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: string (nullable = true)

"""

spark.read.option("header", True) \
.option("inferSchema", True).csv(f"{base_path}/lap_times_1.csv").printSchema()

"""
Output:

root
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- lap: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: integer (nullable = true)

"""

Option 4: nullValue

The “nullValue” option replaces the specified value with null while reading. Let's see with an example.

spark.read.option("nullValue", "841").csv(f"{base_path}/lap_times_1.csv").show(3)

"""
Output:

+------+--------+---+--------+--------+------------+
|   _c0|     _c1|_c2|     _c3|     _c4|         _c5|
+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
|  null|      20|  1|       1|1:38.109|       98109|
|  null|      20|  2|       1|1:33.006|       93006|
+------+--------+---+--------+--------+------------+

"""

Option 5: timestampFormat

The “timestampFormat” option parses string values into timestamps using the given pattern, but it only takes effect when the column is declared as a timestamp in the schema. Let's see with an example.

print("a) Without schema:")

spark.read \
.option("header", True) \
.option("timestampFormat", "m:ss.SSS") \
.csv(f"{base_path}/lap_times_1.csv") \
.select("time").printSchema()

"""
Output:

a) Without schema:
root
 |-- time: string (nullable = true)

"""
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType

lap_schema = StructType([
    StructField('raceId', IntegerType()),
    StructField('driverId', IntegerType()),
    StructField('lap', IntegerType()),
    StructField('position', DoubleType()),
    StructField('time', TimestampType()), # parsed using the timestampFormat option below
    StructField('milliseconds', IntegerType())
])

print("b) With schema:")
spark.read \
.schema(lap_schema) \
.option("header", True) \
.option("timestampFormat", "m:ss.SSS") \
.csv(f"{base_path}/lap_times_1.csv") \
.select("time").printSchema()

"""
Output:

b) With schema:
root
 |-- time: timestamp (nullable = true)

"""

Datetime Patterns for Formatting and Parsing: link

Similarly, we have dateFormat and many other options, which you can refer to by clicking here.
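
For example, here is a minimal sketch of dateFormat. The file name race_dates.csv and its race_date column are hypothetical, since the lap times files used in this post do not contain a date column; they only illustrate how the option is applied.

from pyspark.sql.types import StructType, StructField, IntegerType, DateType

# Hypothetical file and column, used only to illustrate the dateFormat option.
date_schema = StructType([
    StructField("raceId", IntegerType()),
    StructField("race_date", DateType())
])

df = spark.read \
    .schema(date_schema) \
    .option("header", True) \
    .option("dateFormat", "dd-MM-yyyy") \
    .csv(f"{base_path}/race_dates.csv")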

How to set multiple options while reading files into PySpark DataFrame in Azure Databricks?

Examples:

df = spark.read \
.options(header=True, inferSchema=True) \
.csv(f"{base_path}/lap_times_1.csv")

df.printSchema()
df.show(3)

"""
Output:

root
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- lap: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: integer (nullable = true)

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
+------+--------+---+--------+--------+------------+
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
|   841|      20|  3|       1|1:32.713|       92713|
+------+--------+---+--------+--------+------------+

"""

How to write CSV files using DataFrameWriter method in Azure Databricks?

To write a PySpark DataFrame out as CSV files, use the save(“path”) method provided by DataFrameWriter together with format("csv"). In this section, I will teach you how to write CSV files using various practical methods with examples.

Examples:

df.write.format("csv").save("target_location")

1. Make use of options while writing CSV files to the target location.

df.write.format("csv").options(header=True).save("target_location")

2. Use mode() while writing files. There are multiple modes available:

  • overwrite – mode is used to overwrite the existing file.
  • append – To add the data to the existing file.
  • ignore – Ignores write operation when the file already exists.
  • error (default) – When the file already exists, it returns an error.

df.write.format("csv").mode("overwrite").save("target_location")
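
Putting these together, here is a minimal sketch that writes the DataFrame as CSV with a header and overwrites any previous output; the target path is a placeholder for your own location. Note that Spark writes a folder of part files rather than a single CSV file; if you need exactly one output file, call coalesce(1) before writing, at the cost of parallelism.

# Minimal sketch: write df as CSV with a header, overwriting earlier output.
# "/mnt/practice/output/lap_times_csv" is a placeholder target location.
df.write \
    .format("csv") \
    .option("header", True) \
    .mode("overwrite") \
    .save("/mnt/practice/output/lap_times_csv")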

I have attached the complete code used in this blog in notebook format at this GitHub link. You can download and import this notebook into Databricks, Jupyter Notebook, etc.

When should you use the PySpark read in Azure Databricks?

These could be the possible reasons:

  1. To read a single CSV file
  2. To read multiple CSV files
  3. To read CSV files using a wildcard
  4. To read CSV files from a directory

Real World Use Case Scenarios for PySpark read function in Azure Databricks

Assume you were given the location of a CSV dataset and asked to read the files using PySpark; you can use spark.read to fetch the CSV files and convert them into a DataFrame.

What are the alternatives to the read function in PySpark Azure Databricks?

The DataFrameReader exposed as spark.read is the primary way to load files from external locations into a PySpark DataFrame, so there is no direct alternative to it for this task.

Final Thoughts

In this article, we have learned about the PySpark read and write methods and used them to read and write CSV files with a PySpark DataFrame in Azure Databricks, along with clearly explained examples. I have also covered different scenarios with practical examples where possible. I hope the information provided helped you gain knowledge.

Please share your comments and suggestions in the comment section below and I will try to answer all your queries as time permits.

PySpark in Azure Databricks, as explained by Arud Seka Berne S on azurelib.com.

As a big data engineer, I design and build scalable data processing systems and integrate them with various data sources and databases. I have a strong background in Python and am proficient in big data technologies such as Hadoop, Hive, Spark, Databricks, and Azure. My interest lies in working with large datasets and deriving actionable insights to support informed business decisions.