Lesson 5: Azure Databricks Spark Tutorial – DataFrame API

In Lesson 5 of our Azure Spark tutorial series, I will take you through Spark DataFrames, RDDs, schemas, and related operations, and how they work internally. I will also show you how and where to access the Azure Databricks functionality you need for day-to-day big data analytics. If you haven’t gone through Lesson 1 of this Azure Databricks tutorial, I highly recommend starting there to understand Azure Databricks from scratch. To create your first Azure Databricks free trial account, follow this link: Create Azure Databricks Account. Let’s dive into the tutorial now.

What is an RDD, and what is the problem with RDDs?

The RDD (Resilient Distributed Dataset) is the most basic abstraction in Spark. Whenever we read or write data in Spark or Databricks, it is represented under the hood as an RDD. Internally, the data is divided into partitions (chunks), and together these partitions are represented by an RDD.

Problem with RDD

An RDD has a compute function that produces an Iterator[T] for the data that will be stored in the RDD. This compute function (or computation) is opaque to Spark: Spark does not know what you are doing inside it. Whether you are performing a join, filter, select, or aggregation, Spark sees only a lambda expression. In addition, the Iterator[T] data type is opaque for Python RDDs; Spark knows only that it is a generic Python object.

Because Spark understands neither your compute function nor the data type of the objects, it cannot apply its own optimizations to turn your code into an efficient query plan. This can result in slower performance.

DataFrame

The DataFrame addresses this problem with RDDs. It is a structured API that enables better performance and space efficiency across Spark components. Spark DataFrames are like distributed in-memory tables with named columns and schemas, where each column has a specific data type: integer, string, array, map, real, date, timestamp, etc.

DataFrames are immutable, and Spark keeps a lineage of all transformations. This means that whenever you change a DataFrame, for example by adding a column, renaming a column, or filtering rows, Spark creates a new DataFrame and leaves the original untouched.

What is the schema of a DataFrame?

The schema of a DataFrame defines its column names and the data type of each column. For example, the schema for an employee DataFrame could be:

employee_name STRING, employee_address STRING, salary INT

Databricks Spark Data Types

| Data Type | Value assigned in Scala | Value assigned in Python | API to instantiate (Scala) |
|---|---|---|---|
| ByteType | Byte | int | DataTypes.ByteType |
| ShortType | Short | int | DataTypes.ShortType |
| IntegerType | Int | int | DataTypes.IntegerType |
| LongType | Long | int | DataTypes.LongType |
| FloatType | Float | float | DataTypes.FloatType |
| DoubleType | Double | float | DataTypes.DoubleType |
| StringType | String | str | DataTypes.StringType |
| BooleanType | Boolean | bool | DataTypes.BooleanType |
| DecimalType | java.math.BigDecimal | decimal.Decimal | DecimalType |

Table 1: Basic data types in Spark

| Data Type | Value assigned in Scala | Value assigned in Python | Scala API to instantiate | Python API to instantiate |
|---|---|---|---|---|
| BinaryType | Array[Byte] | bytearray | DataTypes.BinaryType | BinaryType() |
| TimestampType | java.sql.Timestamp | datetime.datetime | DataTypes.TimestampType | TimestampType() |
| DateType | java.sql.Date | datetime.date | DataTypes.DateType | DateType() |
| ArrayType | scala.collection.Seq | List, tuple, or array | DataTypes.createArrayType(ElementType) | ArrayType(dataType, [nullable]) |
| MapType | scala.collection.Map | dict | DataTypes.createMapType(keyType, valueType) | MapType(keyType, valueType, [nullable]) |
| StructType | org.apache.spark.sql.Row | List or tuple | StructType(ArrayType[fieldTypes]) | StructType([fields]) |
| StructField | | | StructField(name, dataType, [nullable]) | StructField(name, dataType, [nullable]) |

Table 2: Complex data types in Spark

How to create DataFrame in Spark with Schema

There are multiple ways to create a DataFrame, and the right one also depends on the type of the data file. Let’s start with the most basic way:

Scala Example:

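import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Define the schema programmatically: column name, data type, nullable flag
val schema = StructType(Array(
  StructField("name", StringType, false),
  StructField("address", StringType, false),
  StructField("salary", IntegerType, false)))

// Sample rows matching the schema
val data = Seq(Row("Mike", "Wellington", 2000), Row("Sam", "New York", 232432))

// schema is already a StructType, so it can be passed directly
val someDF = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
someDF.printSchema()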

Python Example:

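from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema programmatically (fields are nullable by default)
schema = StructType([
    StructField("name", StringType()),
    StructField("address", StringType()),
    StructField("salary", IntegerType())])

# Sample rows matching the schema
data = [["Mike", "Wellington", 2000], ["Sam", "New York", 232432]]
emp_df = spark.createDataFrame(data, schema)
emp_df.printSchema()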

How to create DataFrame from CSV file in Scala?

Simple DataFrame creation from a CSV file

val df = spark.read.csv("file_path.csv")

Create a DataFrame with an automatically inferred schema

val df = spark.read.format("csv").option("inferSchema", "true").load("file_path.csv")

Create a DataFrame from a file with a header row

val df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("file_path.csv")

How to create DataFrame from CSV file in PySpark?

Simple DataFrame creation from a CSV file

df = spark.read.csv("file_path.csv")

Create a DataFrame with an automatically inferred schema

df = spark.read.format("csv").option("inferSchema", "true").load("file_path.csv")

Create a DataFrame from a file with a header row

df = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("file_path.csv")

How to create DataFrame from JSON file in Scala?

Simple DataFrame creation from a JSON file

val df = spark.read.json("file_path.json")

Create a DataFrame with the format/load syntax

val df = spark.read.format("json").load("file_path.json")

Note that the JSON reader infers the schema automatically when you do not supply one, and it takes the column names from the keys of each JSON object. The inferSchema and header options belong to the CSV reader and have no effect when reading JSON.

How to create DataFrame from JSON file in PySpark?

Simple DataFrame creation from a JSON file

df = spark.read.json("file_path.json")

Create a DataFrame with the format/load syntax

df = spark.read.format("json").load("file_path.json")

Note that the JSON reader infers the schema automatically when you do not supply one, and it takes the column names from the keys of each JSON object. The inferSchema and header options belong to the CSV reader and have no effect when reading JSON.

What are the different ways to define schema in Spark?

A schema can be defined in two different ways in Spark:

  1. Programmatically
  2. Declaratively

Above we have seen the programmatic way. In the declarative way, we use a Data Definition Language (DDL) string, which is much simpler and easier to read.

Defining the DataFrame using a DDL schema, Scala example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema declared as a DDL string
val schema = "name STRING, address STRING, salary INT"
val data = Seq(Row("Mike", "Wellington", 2000), Row("Sam", "New York", 232432))

// StructType.fromDDL converts the DDL string into a StructType
val empDf = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType.fromDDL(schema)
)
empDf.printSchema()

Defining the DataFrame using a DDL schema, PySpark example:

# No type imports are needed: createDataFrame accepts the DDL string directly
data = [["Mike", "Wellington", 2000], ["Sam", "New York", 232432]]
schema1 = "name STRING, address STRING, salary INT"
emp_df = spark.createDataFrame(data, schema1)
emp_df.printSchema()

Why should you define the schema up front instead of using a schema-on-read approach?

Doing so has three benefits:
• You relieve Spark from the onus of inferring data types.
• You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
• You can detect errors early if data doesn’t match the schema.

For more details, see the official Apache Spark documentation.

Final Thoughts

In this lesson of the Azure Databricks Spark tutorial series, we covered how to create Apache Spark DataFrames. We have learned:

| Topic | Details |
|---|---|
| DataFrame | How to create a DataFrame in Scala and PySpark |
| Schema | How to define a schema programmatically and using DDL |
| Load | How to load files in formats such as CSV and JSON |

In the next lesson, we will see how to apply operations on DataFrames and columns.
