Lesson 6: Azure Databricks Spark Tutorial – DataFrame Columns

In this Lesson 6 of our Azure Databricks Spark tutorial series, I will take you through Spark DataFrame columns, the various operations you can perform on them, and how they work internally. I will also show you how and where to access the Azure Databricks functionality you need for your day-to-day big data analytics processing. In case you haven't gone through Lesson 1 of the Azure Databricks tutorial yet, I highly recommend starting there to understand Azure Databricks from scratch. To create your first Azure Databricks free trial account, follow this link: Create Azure Databricks Account. Let's dive into the tutorial now.

Columns in Databricks Spark, pyspark Dataframe

Assume that we have a DataFrame created as follows (the sample data here is inferred from the outputs shown later in this lesson):

data = [("Mike", "Wellington", 2000), ("Sam", "New York", 232432)]
schema1 = "name STRING, address STRING, salary INT"
emp_df = spark.createDataFrame(data, schema1)

Now let's perform the following operations on the columns.

How to get the list of columns in Dataframe using Spark, pyspark

//Scala Code
emp_df.columns
#python code
emp_df.columns

How to get the column object from Dataframe using Spark, pyspark

//Scala code
emp_df.col("Salary")
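In pyspark, the usual ways to get a column object are indexing, attribute access, or the col() helper; a minimal sketch, assuming the emp_df created above:

#python
from pyspark.sql.functions import col
emp_df["salary"]      # index into the DataFrame
emp_df.salary         # attribute access
col("salary")         # column expression from pyspark.sql.functions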

How to use column with expression function in Databricks spark and pyspark

expr() is available in the org.apache.spark.sql.functions package for Scala and in the pyspark.sql.functions module for pyspark. Hence we need to import that package before using expr().

Example of expr() function:

//SCALA
import org.apache.spark.sql.functions._
emp_df.select(expr("Salary * 10")).show()
#python
from pyspark.sql.functions import *
emp_df.select(expr("Salary * 10")).show()

How to use $ column shorthand operator in Dataframe using Databricks Spark

import spark.implicits._   // required for the $ column shorthand
emp_df.select($"salary").show()
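The $ shorthand is Scala-only syntax; pyspark has no $ operator. A rough equivalent, sketched against the same emp_df, uses col() or attribute access:

#python
from pyspark.sql.functions import col
emp_df.select(col("salary")).show()   # col() expression
emp_df.select(emp_df.salary).show()   # attribute access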

Transformations and actions in Databricks Spark and pySpark

Projections and Filters:

A projection, in relational terms, returns a subset of the columns, while a filter returns only the rows matching a given condition.

Here projections can be done using the select() method, while filters can be expressed using the filter() or where() method.

Point to be noted: select() won't print the result unless you use either the show() or take() functions of Spark, or the display() function of Databricks.
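To make this lazy behaviour concrete, here is a minimal sketch using the emp_df from above: select() only builds a query plan, and nothing is executed until an action such as show() is called.

#python
salary_df = emp_df.select("salary")   # transformation only: builds the plan, nothing runs yet
salary_df.show()                      # action: triggers execution and prints the result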

For example, if we want to pull the salary of every employee earning more than 2000, we use select and where as follows:

Scala code:

emp_df.select("salary").where(col("salary")>2000).show()
Output:

+------+
|salary|
+------+
|232432|
+------+

pyspark Python code:

emp_df.select("salary").where(col("salary")>2000).show()

Aggregate Functions:

Scala code:

emp_df.agg(countDistinct('name) as 'UniqueNames).show()
Output:

+-----------+
|UniqueNames|
+-----------+
|          2|
+-----------+

pyspark Python code:

emp_df.agg(countDistinct("name").alias("UniqueNames")).show()
Output:
+-----------+
|UniqueNames|
+-----------+
|          2|
+-----------+
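countDistinct() is just one of the available aggregate functions. As a small sketch (the aliases are only illustrative), other aggregates such as avg(), max(), and sum() from pyspark.sql.functions can be combined in a single agg() call on the same emp_df:

#python
from pyspark.sql import functions as F
emp_df.agg(
    F.avg("salary").alias("AvgSalary"),
    F.max("salary").alias("MaxSalary"),
    F.sum("salary").alias("TotalSalary")
).show()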

Distinct Function:

To get distinct records, you can use the distinct() function.

emp_df.distinct().show()
Output:
+----+----------+------+
|name|   address|salary|
+----+----------+------+
|Mike|Wellington|  2000|
| Sam|  New York|232432|
+----+----------+------+
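Closely related to distinct() is dropDuplicates(), which lets you de-duplicate on a subset of columns; a minimal sketch on the same emp_df:

#python
emp_df.dropDuplicates(["name"]).show()   # keeps one row per distinct name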

Add, Rename and Drop columns in dataframe in Databricks Spark, pyspark

Add a new column in the Dataframe:

emp_df.withColumn("Bonus", col("salary")*2).show()
Output:
+----+----------+------+------+
|name|   address|salary| Bonus|
+----+----------+------+------+
|Mike|Wellington|  2000|  4000|
| Sam|  New York|232432|464864|
+----+----------+------+------+
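withColumn() can also add a constant column using lit() from pyspark.sql.functions; a small sketch (the column name Country and its value are only placeholders for illustration):

#python
from pyspark.sql.functions import lit
emp_df.withColumn("Country", lit("Unknown")).show()   # adds a constant-valued column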

Rename a column in Dataframe:

emp_df.withColumnRenamed("salary", "Salary_New").show()
Output:
+----+----------+----------+
|name|   address|Salary_New|
+----+----------+----------+
|Mike|Wellington|      2000|
| Sam|  New York|    232432|
+----+----------+----------+

DataFrames are immutable: when we rename a column using withColumnRenamed(), we get a new DataFrame with the new column name, while the original DataFrame is retained with the old column name.
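A quick sketch of what this immutability means in practice, using the emp_df from above: the rename returns a new DataFrame, and the original keeps its old schema.

#python
renamed_df = emp_df.withColumnRenamed("salary", "Salary_New")
print(renamed_df.columns)   # ['name', 'address', 'Salary_New']
print(emp_df.columns)       # ['name', 'address', 'salary'] -- the original is unchanged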

Drop a column in the Dataframe:

emp_df.drop("salary").show()
Output:

+----+----------+
|name|   address|
+----+----------+
|Mike|Wellington|
| Sam|  New York|
+----+----------+

List of Action Functions in Azure Databricks Spark

collect(): Returns an array that contains all rows in this Dataset. Example: df.collect()
count(): Returns the number of rows in the Dataset. Example: df.count()
describe(cols: String*): Computes basic statistics for numeric and string columns, including count, mean, stddev, min, and max. If no columns are given, statistics are computed for all numerical or string columns. Example: df.describe("col1", "col2")
first(): Returns the first row. Alias for head(). Example: df.first()
head(): Returns the first row. Example: df.head()
head(n: Int): Returns the first n rows. Example: df.head(10)
show(numRows: Int, truncate: Int, vertical: Boolean): Displays the Dataset in tabular form. numRows is the number of rows to show; if truncate is set to more than 0, strings are truncated to that many characters and all cells are aligned right; if vertical is true, output rows are printed vertically (one line per column value). Example: df.show(10, 1, true)
show(numRows: Int, truncate: Int): numRows is the number of rows to show; if truncate is set to more than 0, strings are truncated to that many characters and all cells are aligned right. Example: df.show(10, 1)
show(numRows: Int, truncate: Boolean): numRows is the number of rows to show; if truncate is true, strings longer than 20 characters are truncated and all cells are aligned right. Example: df.show(10, true)
show(truncate: Boolean): If truncate is true, strings longer than 20 characters are truncated and all cells are aligned right. Example: df.show(true)
show(): Displays the top 20 rows of the Dataset in tabular form. Strings longer than 20 characters are truncated, and all cells are aligned right. Example: df.show()
show(numRows: Int): Displays numRows rows of the Dataset in tabular form. Strings longer than 20 characters are truncated, and all cells are aligned right. Example: df.show(10)
summary(statistics: String*): Computes the specified statistics for numeric and string columns. Available statistics are count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g. 75%). If no statistics are given, it computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max. Example: df.summary()
tail(n: Int): Returns the last n rows in the Dataset. Running tail requires moving data into the application's driver process, so a very large n can crash the driver with an OutOfMemoryError. Example: df.tail(10)
take(n: Int): Returns the first n rows in the Dataset. Running take requires moving data into the application's driver process, so a very large n can crash the driver with an OutOfMemoryError. Example: df.take(10)
Table 1: List of Action Functions in Apache Spark
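As a short illustration, here is a minimal sketch applying a few of the actions above to the emp_df built earlier in this lesson:

#python
emp_df.count()                     # number of rows
emp_df.first()                     # first Row object
emp_df.take(2)                     # list with the first 2 Row objects
emp_df.collect()                   # list with all rows; avoid on large DataFrames, everything comes to the driver
emp_df.describe("salary").show()   # basic statistics for the salary column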

List of Transformation Functions in Azure Databricks Spark

cache(): Persist this Dataset with the default storage level (MEMORY_AND_DISK).
columns: Returns all column names as an array.
createGlobalTempView(viewName: String): Creates a global temporary view using the given name.
createOrReplaceGlobalTempView(viewName: String): Creates or replaces a global temporary view using the given name.
createOrReplaceTempView(viewName: String): Creates or replaces a local temporary view using the given name.
createTempView(viewName: String): Creates a local temporary view using the given name.
explain(): Prints the physical plan to the console for debugging purposes.
persist(newLevel: StorageLevel): Persist this Dataset with the given storage level. newLevel is one of MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
persist(): Persist this Dataset with the default storage level (MEMORY_AND_DISK).
printSchema(level: Int): Prints the schema up to the given level to the console in a nice tree format.
printSchema(): Prints the schema to the console in a nice tree format.
unpersist(): Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
writeTo(): Create a write configuration builder for v2 sources.
coalesce(numPartitions: Int): Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.
distinct(): Returns a new Dataset that contains only the unique rows from this Dataset.
dropDuplicates(colNames: Array[String]): Returns a new Dataset with duplicate rows removed, considering only the given subset of columns.
dropDuplicates(): Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias for distinct. For a static batch Dataset, it just drops duplicate rows. For a streaming Dataset, it will keep all data across triggers as intermediate state to drop duplicate rows. You can use withWatermark to limit how late the duplicate data can be, and the system will accordingly limit the state. In addition, data older than the watermark will be dropped to avoid any possibility of duplicates.
except(other: Dataset[T]): Returns a new Dataset containing rows in this Dataset but not in another Dataset. This is equivalent to EXCEPT DISTINCT in SQL.
exceptAll(other: Dataset[T]): Returns a new Dataset containing rows in this Dataset but not in another Dataset while preserving the duplicates. This is equivalent to EXCEPT ALL in SQL.
filter(conditionExpr: String): Filters rows using the given SQL expression. Example: peopleDs.filter("age > 15")
filter(condition: Column): Filters rows using the given Column condition. Example: peopleDs.filter($"age" > 15)
intersect(other: Dataset[T]): Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is equivalent to INTERSECT in SQL.
intersectAll(other: Dataset[T]): Returns a new Dataset containing rows only in both this Dataset and another Dataset while preserving the duplicates. This is equivalent to INTERSECT ALL in SQL.
limit(n: Int): Returns a new Dataset by taking the first n rows.
orderBy(sortExprs: Column*): Returns a new Dataset sorted by the given expressions. This is an alias of the sort function.
repartition(partitionExprs: Column*): Returns a new Dataset partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions. The resulting Dataset is hash partitioned. This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
repartition(numPartitions: Int): Returns a new Dataset that has exactly numPartitions partitions.
sort(sortExprs: Column*): Returns a new Dataset sorted by the given expressions. Example: ds.sort($"col1", $"col2".desc)
union(other: Dataset[T]): Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
unionAll(other: Dataset[T]): Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is an alias for union.
Table 2: List of Transformation Functions in Databricks
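And a short sketch chaining a few of the transformations from the table, finished with an action so the work actually runs (again using the emp_df from earlier; the view name emp_view is just an example):

#python
from pyspark.sql.functions import col
(emp_df
    .filter(col("salary") > 2000)      # transformation
    .orderBy(col("salary").desc())     # transformation
    .limit(10)                         # transformation
    .show())                           # action: triggers execution
emp_df.createOrReplaceTempView("emp_view")   # register a temporary view for Spark SQL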

Apache Spark Official Page Link

Final Thoughts

In this lesson of the Azure Databricks Spark tutorial series we have covered the Apache Spark DataFrame column functions. We have learned:

how to list columns and reference column objects in a DataFrame,
how to use expr() and the $ column shorthand,
how to apply projections, filters, aggregate functions, and distinct,
how to add, rename, and drop columns in a DataFrame, and
the most commonly used action and transformation functions in Spark.

In the next lesson we will look at Spark SQL.

Deepak Goyal

Deepak Goyal is a certified Azure Cloud Solution Architect. He has around a decade and a half of experience in designing, developing, and managing enterprise cloud solutions. He is also a certified Big Data professional and a passionate cloud advocate.