This One Spark SQL Trick Will Instantly Upgrade Your Data Analysis Game
Apache Spark comes with a robust set of window functions, each designed for a different purpose. In this article, let's explore some of the window functions provided by Spark SQL: ranking functions, analytical functions, and aggregate functions. Specifically, we'll dive deep into the following functions:
- ROW_NUMBER
- RANK
- DENSE_RANK
- LAG
- LEAD
- CUME_DIST
- PERCENT_RANK
- NTILE
If you’re looking to enhance your Spark SQL knowledge and use cases, these functions will come in handy for various types of data analysis. Let’s start with a quick introduction to each of these functions.
The following is the sample dataset that we’ll use for testing these functions:
The table emp_dept_tbl looks like this:

| ID | FIRST_NAME | LAST_NAME | DESIGNATION | DEPARTMENT | SALARY |
|------|------------|---------------|---------------------|------------|--------|
| 1001 | Ram | Ghadiyaram | Director of Sales | Sales | 30000 |
| 1002 | Ravi | Rangasamy | Marketing Manager | Sales | 25000 |
| 1003 | Ramesh | Rangasamy | Assistant Manager | Sales | 25000 |
| 1004 | Prem | Sure | Account Coordinator | Account | 15000 |
| 1005 | Phani | G | Accountant II | Account | 20000 |
| 1006 | Krishna | G | Account Coordinator | Account | 15000 |
| 1007 | Rakesh | Krishnamurthy | Assistant Manager | Sales | 25000 |
| 1008 | Gally | Johnson | Manager | Account | 28000 |
| 1009 | Richard | Grill | Account Coordinator | Account | 12000 |
| 1010 | Sofia | Ketty | Sales Coordinator | Sales | 20000 |
1. ROW_NUMBER
This window function assigns a unique sequential integer to rows within a partition of a result set. The number starts at 1 and increases for each subsequent row.
Query:
SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num FROM emp_dept_tbl
The ROW_NUMBER function generates a unique number for each row, ordered by salary within each department.
For more details, refer to the ROW_NUMBER() – Spark SQL Documentation.
Result:
+----------+------+-------+
|department|salary|row_num|
+----------+------+-------+
| Sales| 30000| 1|
| Sales| 25000| 2|
| Sales| 25000| 3|
| Sales| 25000| 4|
| Sales| 20000| 5|
| Account| 28000| 1|
| Account| 20000| 2|
| Account| 15000| 3|
| Account| 15000| 4|
| Account| 12000| 5|
+----------+------+-------+
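The semantics are easy to reproduce outside Spark. Below is a plain-Python sketch (an illustration, not Spark code) that numbers the sample rows the same way the query does: partition by department, order by salary descending, then count 1, 2, 3, … within each partition.

```python
# Plain-Python sketch of ROW_NUMBER semantics.
from itertools import groupby

# (department, salary) pairs copied from the sample table above.
rows = [("Sales", 30000), ("Sales", 25000), ("Sales", 25000),
        ("Sales", 25000), ("Sales", 20000),
        ("Account", 28000), ("Account", 20000), ("Account", 15000),
        ("Account", 15000), ("Account", 12000)]

def row_number(rows):
    out = []
    # Partition by department, order by salary descending.
    keyed = sorted(rows, key=lambda r: (r[0], -r[1]))
    for dept, grp in groupby(keyed, key=lambda r: r[0]):
        # Assign a unique sequential number within each partition.
        for i, (d, sal) in enumerate(grp, start=1):
            out.append((d, sal, i))
    return out

for r in row_number(rows):
    print(r)
```

Each partition restarts the counter at 1, which is why both Sales and Account have a row numbered 1 through 5.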
2. RANK
The RANK function is similar to ROW_NUMBER, but it assigns the same rank to rows with the same value in the ordered column. When duplicates occur, the next rank is skipped.
Query:
SELECT department, salary,
RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk
FROM emp_dept_tbl
This ensures that rows with the same salary within a department share the same rank. However, the rank after duplicates will skip a value.
Check out more about RANK() – Spark SQL Documentation
Result:
+----------+------+---+
|department|salary|rnk|
+----------+------+---+
| Sales| 30000| 1|
| Sales| 25000| 2|
| Sales| 25000| 2|
| Sales| 25000| 2|
| Sales| 20000| 5|
| Account| 28000| 1|
| Account| 20000| 2|
| Account| 15000| 3|
| Account| 15000| 3|
| Account| 12000| 5|
+----------+------+---+
3. DENSE_RANK
The DENSE_RANK function works like RANK, but it does not leave gaps in the ranking when duplicate values exist. If two rows share the same rank, the next distinct value gets the next consecutive rank.
Query:
SELECT department, salary,
DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk
FROM emp_dept_tbl
Here, unlike RANK, the ranks are assigned sequentially without gaps.
Learn more about DENSE_RANK() – Spark SQL Documentation.
Result:
+----------+------+-------+
|department|salary|dns_rnk|
+----------+------+-------+
| Sales| 30000| 1|
| Sales| 25000| 2|
| Sales| 25000| 2|
| Sales| 25000| 2|
| Sales| 20000| 3|
| Account| 28000| 1|
| Account| 20000| 2|
| Account| 15000| 3|
| Account| 15000| 3|
| Account| 12000| 4|
+----------+------+-------+
All three ranking functions together: ROW_NUMBER, RANK, and DENSE_RANK
+-------+-----+-------+---+-------+
| dept| sal|row_num|rnk|dns_rnk|
+-------+-----+-------+---+-------+
| Sales|30000| 1| 1| 1|
| Sales|25000| 2| 2| 2|
| Sales|25000| 3| 2| 2|
| Sales|25000| 4| 2| 2|
| Sales|20000| 5| 5| 3|
|Account|28000| 1| 1| 1|
|Account|20000| 2| 2| 2|
|Account|15000| 3| 3| 3|
|Account|15000| 4| 3| 3|
|Account|12000| 5| 5| 4|
+-------+-----+-------+---+-------+
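The difference between RANK and DENSE_RANK comes down to whether ties create gaps. Here is a plain-Python sketch (not Spark code) that computes both over one ordered partition, using the Sales salaries from the table above.

```python
# Plain-Python sketch contrasting RANK and DENSE_RANK on one partition.
salaries = [30000, 25000, 25000, 25000, 20000]  # Sales, ordered DESC

def rank(vals):
    # RANK: 1 + number of strictly greater values; ties share a rank,
    # and the value after a tie skips ahead.
    return [1 + sum(1 for w in vals if w > v) for v in vals]

def dense_rank(vals):
    # DENSE_RANK: position among the distinct values; no gaps after ties.
    distinct = sorted(set(vals), reverse=True)
    return [1 + distinct.index(v) for v in vals]

print(rank(salaries))        # [1, 2, 2, 2, 5]
print(dense_rank(salaries))  # [1, 2, 2, 2, 3]
```

The three 25000 salaries all get rank 2; RANK then jumps to 5 for the next row, while DENSE_RANK continues with 3, matching the combined table above.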
4. LAG
The LAG function provides access to a value from a previous row in the same result set, allowing you to compare the current row's value with the previous row's value in an ordered partition. It is often used to calculate differences between consecutive rows, such as the change in sales from one month to the next.
Syntax:
LAG(expression, offset, default_value) OVER (PARTITION BY partition_column ORDER BY order_column)
- expression: The column or expression to reference.
- offset: The number of rows to look back (default is 1).
- default_value: The value to return when the lag is not available (e.g., for the first row, where there’s no prior value).
Query :
SELECT department, salary, LAG(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS prev_salary
FROM emp_dept_tbl
This lets you compare the current salary with the previous one; when there is no previous salary (e.g., for the first record in each partition), it returns 0.
Result :
+----------+------+-----------+
|department|salary|prev_salary|
+----------+------+-----------+
| Sales| 30000| 0|
| Sales| 25000| 30000|
| Sales| 25000| 25000|
| Sales| 25000| 25000|
| Sales| 20000| 25000|
| Account| 28000| 0|
| Account| 20000| 28000|
| Account| 15000| 20000|
| Account| 15000| 15000|
| Account| 12000| 15000|
+----------+------+-----------+
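LAG's behavior can be sketched in a few lines of plain Python (an illustration, not Spark code): shift the ordered partition down by `offset` positions and fill the top with the default.

```python
# Plain-Python sketch of LAG(value, offset, default) over one partition.
salaries = [30000, 25000, 25000, 25000, 20000]  # Sales, ordered DESC

def lag(vals, offset=1, default=0):
    # Each row sees the value `offset` rows earlier, or the default
    # when no such row exists (the first rows of the partition).
    return [vals[i - offset] if i - offset >= 0 else default
            for i in range(len(vals))]

print(lag(salaries))  # [0, 30000, 25000, 25000, 25000]
```

The output matches the prev_salary column for Sales above: the first row has no predecessor, so it gets the default 0.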
5. LEAD
The LEAD function is the opposite of LAG. It provides access to a value from the next row in the result set, enabling comparisons between the current row and the next one.
Syntax:
LEAD(expression, offset, default_value) OVER (PARTITION BY partition_column ORDER BY order_column)
- expression: The column or expression to reference.
- offset: The number of rows to look ahead (default is 1).
- default_value: The value to return when the lead is not available (e.g., for the last row, where there’s no next value).
Query :
SELECT department, salary, LEAD(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS next_salary
FROM emp_dept_tbl
FROM emp_dept_tbl
Here, the LEAD function compares the current salary with the next salary. If there is no next row, it defaults to 0.
Result:
+----------+------+-----------+
|department|salary|next_salary|
+----------+------+-----------+
| Sales| 30000| 25000|
| Sales| 25000| 25000|
| Sales| 25000| 25000|
| Sales| 25000| 20000|
| Sales| 20000| 0|
| Account| 28000| 20000|
| Account| 20000| 15000|
| Account| 15000| 15000|
| Account| 15000| 12000|
| Account| 12000| 0|
+----------+------+-----------+
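Mirroring the LAG sketch, LEAD shifts the partition the other way. Again a plain-Python illustration, not Spark code:

```python
# Plain-Python sketch of LEAD(value, offset, default) over one partition.
salaries = [30000, 25000, 25000, 25000, 20000]  # Sales, ordered DESC

def lead(vals, offset=1, default=0):
    # Each row sees the value `offset` rows later, or the default
    # when no such row exists (the last rows of the partition).
    return [vals[i + offset] if i + offset < len(vals) else default
            for i in range(len(vals))]

print(lead(salaries))  # [25000, 25000, 25000, 20000, 0]
```

This matches the next_salary column for Sales above: the last row has no successor, so it gets the default 0.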
Apache Spark SQL Documentation – Window Functions: LAG and LEAD
Explanation of LAG and LEAD:
- LAG: references a value from the previous row within the partition. In the query above, for each department it returns the salary from the previous row (ordered by salary descending). The third argument, 0, is the default value used when there is no previous row (e.g., the first row in each partition).
- LEAD: similar to LAG, but it looks at the next row's value instead. In the query, it compares the current row's salary with the next row's salary. If there is no next row (e.g., the last row in each partition), the default value 0 is used.
6. CUME_DIST (Cumulative Distribution)
The CUME_DIST function computes the cumulative distribution of a value within its partition: the relative position of a column value within its group.
Formula:
CUME_DIST = Number of rows at or before the current row in the sort order / Total rows in the partition
With ORDER BY salary DESC, the numerator is the count of rows whose salary is greater than or equal to the current row's salary.
Query:
SELECT department, salary,
CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist
FROM emp_dept_tbl
For a deeper dive into cumulative distribution, see CUME_DIST() – Spark SQL Documentation.
Result :
+----------+------+--------+
|department|salary|cum_dist|
+----------+------+--------+
| Sales| 30000| 0.2|
| Sales| 25000| 0.8|
| Sales| 25000| 0.8|
| Sales| 25000| 0.8|
| Sales| 20000| 1.0|
| Account| 28000| 0.2|
| Account| 20000| 0.4|
| Account| 15000| 0.8|
| Account| 15000| 0.8|
| Account| 12000| 1.0|
+----------+------+--------+
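The formula can be checked by hand. Here is a plain-Python sketch (not Spark code) that applies it to the Sales partition under the descending sort used by the query:

```python
# Plain-Python check of the CUME_DIST formula under ORDER BY salary DESC.
salaries = [30000, 25000, 25000, 25000, 20000]  # Sales partition

def cume_dist(vals):
    n = len(vals)
    # Fraction of partition rows at or before each row in the sort
    # order, i.e. with salary >= the current row's salary.
    return [sum(1 for w in vals if w >= v) / n for v in vals]

print(cume_dist(salaries))  # [0.2, 0.8, 0.8, 0.8, 1.0]
```

For 30000 only one of the five rows qualifies (1/5 = 0.2); for each 25000 row, four rows qualify (4/5 = 0.8), matching the result table above.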
7. PERCENT_RANK
PERCENT_RANK works similarly to CUME_DIST, but instead of a cumulative distribution it returns the relative rank as a percentage. The first row in each partition always gets 0, and the value is a double.
Formula:
PERCENT_RANK = (Rank of current row - 1) / (Total rows in partition - 1)
Query:
SELECT department, salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk
FROM emp_dept_tbl
Find out more about PERCENT_RANK() – Spark SQL Documentation.
Result:
+----------+------+---+--------+
|department|salary|rnk|perc_rnk|
+----------+------+---+--------+
| Sales| 30000| 1| 0.0|
| Sales| 25000| 2| 0.25|
| Sales| 25000| 2| 0.25|
| Sales| 25000| 2| 0.25|
| Sales| 20000| 5| 1.0|
| Account| 28000| 1| 0.0|
| Account| 20000| 2| 0.25|
| Account| 15000| 3| 0.5|
| Account| 15000| 3| 0.5|
| Account| 12000| 5| 1.0|
+----------+------+---+--------+
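Since PERCENT_RANK is defined directly in terms of RANK, the formula is easy to verify in a plain-Python sketch (not Spark code):

```python
# Plain-Python check of PERCENT_RANK = (rank - 1) / (rows - 1).
salaries = [30000, 25000, 25000, 25000, 20000]  # Sales partition

def percent_rank(vals):
    n = len(vals)
    # RANK: 1 + number of strictly greater values (ties share a rank).
    ranks = [1 + sum(1 for w in vals if w > v) for v in vals]
    return [(r - 1) / (n - 1) for r in ranks]

print(percent_rank(salaries))  # [0.0, 0.25, 0.25, 0.25, 1.0]
```

The 25000 rows share rank 2, so each gets (2 - 1) / (5 - 1) = 0.25, and the last row (rank 5) gets (5 - 1) / 4 = 1.0, matching the result table above.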
8. NTILE
The NTILE function divides the rows in a partition into a specified number of ranked groups, or "buckets". Each row is assigned a bucket number based on the sort order; when the rows do not divide evenly, the leading buckets receive one extra row each.
Query:
SELECT department, salary,
NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile
FROM emp_dept_tbl
This query divides each department's rows into four buckets based on salary.
Check out more about NTILE() – Spark SQL Documentation.
Result:
+----------+------+-----+
|department|salary|ntile|
+----------+------+-----+
| Sales| 30000| 1|
| Sales| 25000| 1|
| Sales| 25000| 2|
| Sales| 25000| 3|
| Sales| 20000| 4|
| Account| 28000| 1|
| Account| 20000| 1|
| Account| 15000| 2|
| Account| 15000| 3|
| Account| 12000| 4|
+----------+------+-----+
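The bucket assignment follows from simple integer division. Here is a plain-Python sketch (not Spark code) of how five rows fall into four buckets, with the leading buckets absorbing the remainder:

```python
# Plain-Python sketch of NTILE(n) bucket assignment for a partition.
def ntile(num_rows, n):
    base, extra = divmod(num_rows, n)
    buckets = []
    for b in range(1, n + 1):
        # The first `extra` buckets get one additional row each.
        size = base + (1 if b <= extra else 0)
        buckets.extend([b] * size)
    return buckets

print(ntile(5, 4))  # [1, 1, 2, 3, 4]
```

With 5 rows and 4 buckets, 5 = 4 * 1 + 1, so bucket 1 holds two rows and the rest hold one, which is why each department's top two salaries share bucket 1 in the result above.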
Complete Apache Spark (Scala) code snippet to execute on a local system:
package com.examples
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}
/**
* @author : Ram Ghadiyaram
*/
object AnalyticalFunctionTest extends App {
Logger.getLogger("org").setLevel(Level.WARN)
val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .config("spark.sql.warehouse.dir", new java.io.File("spark-warehouse").getAbsolutePath)
    .getOrCreate()
import spark.implicits._
val csvData: Dataset[String] = spark.sparkContext.parallelize(
Seq(
"ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY",
"1001,Ram,Ghadiyaram,Director of Sales,Sales,30000",
"1002,Ravi,Rangasamy,Marketing Manager,Sales,25000",
"1003,Ramesh,Rangasamy,Assistant Manager,Sales,25000",
"1004,Prem,Sure,Account Coordinator,Account,15000",
"1005,Phani ,G,Accountant II,Account,20000",
"1006,Krishna,G,Account Coordinator,Account,15000",
"1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000",
"1008,Gally,Johnson,Manager,Account,28000",
"1009,Richard,Grill,Account Coordinator,Account,12000",
"1010,Sofia,Ketty,Sales Coordinator,Sales,20000"
)
).toDS()
val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()
frame.createOrReplaceTempView("emp_dept_tbl")
println("1) Row number :")
spark.sql(
"""
|SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC)
|AS row_num FROM emp_dept_tbl
|""".stripMargin).show()
println("2) Rank :")
spark.sql(
"""
|SELECT department, salary, RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk FROM emp_dept_tbl
|""".stripMargin
).show()
println("3) DENSE_RANK :")
spark.sql(
"""
|SELECT department, salary, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk FROM emp_dept_tbl
|""".stripMargin
).show()
println("All 3 Row number, Rank, and Dense Rank together ")
spark.sql(
"""
|SELECT department AS dept, salary AS sal,
|ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
|RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
|DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk
|FROM emp_dept_tbl
|""".stripMargin
).show()
println("4) LAG : Comparing current salary with previous row's salary")
spark.sql(
"""
|SELECT department, salary, LAG(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS prev_salary
|FROM emp_dept_tbl
|""".stripMargin
).show()
println("5) LEAD : Comparing current salary with next row's salary")
spark.sql(
"""
|SELECT department, salary, LEAD(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS next_salary
|FROM emp_dept_tbl
|""".stripMargin
).show()
println("6) CUME_DIST :")
spark.sql(
"""
|SELECT department, salary, CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist FROM emp_dept_tbl
|""".stripMargin
).show()
println("7) PERCENT_RANK :")
spark.sql(
"""
|SELECT department, salary, RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
|PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk FROM emp_dept_tbl
|""".stripMargin
).show()
println("8) NTILE :")
spark.sql(
"""
|SELECT department, salary, NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile FROM emp_dept_tbl
|""".stripMargin
).show()
}
Software used to test: Spark 3.0.0, Scala 2.12, and IntelliJ IDEA on Windows.
Conclusion
Window functions in Apache Spark SQL offer an incredibly powerful way to perform complex calculations across different partitions of your data. By using ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, CUME_DIST, PERCENT_RANK, and NTILE, you can unlock a wealth of analytic capabilities for your Spark applications.
For a full list of window functions and their use cases, check the Apache Spark SQL Functions Documentation.
Happy coding 😃, and may your queries run faster and more efficiently with Spark!