
This One Spark SQL Trick Will Instantly Upgrade Your Data Analysis Game

Apache Spark comes with a robust set of window functions, each designed for a different purpose. In this article, let's explore some of the window functions provided by Spark SQL, which fall into the ranking, analytical, and aggregate categories. Specifically, we'll dive deep into the following functions:

  • ROW_NUMBER
  • RANK
  • DENSE_RANK
  • LAG
  • LEAD
  • CUME_DIST
  • PERCENT_RANK
  • NTILE

If you’re looking to enhance your Spark SQL knowledge and use cases, these functions will come in handy for various types of data analysis. Let’s start with a quick introduction to each of these functions.

The following is the sample dataset that we’ll use for testing these functions:

The emp_dept_tbl table looks like this:

+----+----------+-------------+-------------------+----------+------+
|  ID|FIRST_NAME|    LAST_NAME|        DESIGNATION|DEPARTMENT|SALARY|
+----+----------+-------------+-------------------+----------+------+
|1001|       Ram|   Ghadiyaram|  Director of Sales|     Sales| 30000|
|1002|      Ravi|    Rangasamy|  Marketing Manager|     Sales| 25000|
|1003|    Ramesh|    Rangasamy|  Assistant Manager|     Sales| 25000|
|1004|      Prem|         Sure|Account Coordinator|   Account| 15000|
|1005|     Phani|            G|      Accountant II|   Account| 20000|
|1006|   Krishna|            G|Account Coordinator|   Account| 15000|
|1007|    Rakesh|Krishnamurthy|  Assistant Manager|     Sales| 25000|
|1008|     Gally|      Johnson|            Manager|   Account| 28000|
|1009|   Richard|        Grill|Account Coordinator|   Account| 12000|
|1010|     Sofia|        Ketty|  Sales Coordinator|     Sales| 20000|
+----+----------+-------------+-------------------+----------+------+

1. ROW_NUMBER

This window function assigns a unique sequential integer to rows within a partition of a result set. The numbering starts at 1 in each partition and increases by one for each subsequent row.

Query:

SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num FROM emp_dept_tbl

The ROW_NUMBER function generates a unique number for each row, ordered by salary within each department.

For more details, refer to the ROW_NUMBER() – Spark SQL Documentation.

Result:

+----------+------+-------+
|department|salary|row_num|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      3|
|     Sales| 25000|      4|
|     Sales| 20000|      5|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      4|
|   Account| 12000|      5|
+----------+------+-------+
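
If you prefer the DataFrame API over SQL, the same result can be produced with a window specification. Below is a minimal sketch, assuming the source DataFrame is named frame, as in the full code listing at the end of this article:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// one window per department, rows ordered by salary (highest first)
val byDeptSalaryDesc = Window.partitionBy("department").orderBy(col("salary").desc)

frame
  .withColumn("row_num", row_number().over(byDeptSalaryDesc))
  .select("department", "salary", "row_num")
  .show()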

2. RANK

The RANK function is similar to ROW_NUMBER, but it assigns the same rank to rows with the same value in the ordered column. When duplicates occur, the ranks that follow skip ahead by the number of tied rows.

Query:

SELECT department, salary,
RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk
FROM emp_dept_tbl

This ensures that rows with the same salary within a department share the same rank. However, the rank after duplicates will skip a value.

Check out more about RANK() – Spark SQL Documentation

Result:

+----------+------+---+
|department|salary|rnk|
+----------+------+---+
|     Sales| 30000|  1|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 25000|  2|
|     Sales| 20000|  5|
|   Account| 28000|  1|
|   Account| 20000|  2|
|   Account| 15000|  3|
|   Account| 15000|  3|
|   Account| 12000|  5|
+----------+------+---+

3. DENSE_RANK

The DENSE_RANK function works like RANK, but it does not leave gaps in ranking when duplicate values exist. If two rows share the same rank, the next rank will be consecutive.

Query:

SELECT department, salary,
DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk
FROM emp_dept_tbl

Here, unlike RANK, the ranks are assigned sequentially without gaps.

Learn more about DENSE_RANK() – Spark SQL Documentation.

Result:

+----------+------+-------+
|department|salary|dns_rnk|
+----------+------+-------+
|     Sales| 30000|      1|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 25000|      2|
|     Sales| 20000|      3|
|   Account| 28000|      1|
|   Account| 20000|      2|
|   Account| 15000|      3|
|   Account| 15000|      3|
|   Account| 12000|      4|
+----------+------+-------+

All three ranking functions (ROW_NUMBER, RANK, and DENSE_RANK) side by side:
+-------+-----+-------+---+-------+
|   dept|  sal|row_num|rnk|dns_rnk|
+-------+-----+-------+---+-------+
|  Sales|30000|      1|  1|      1|
|  Sales|25000|      2|  2|      2|
|  Sales|25000|      3|  2|      2|
|  Sales|25000|      4|  2|      2|
|  Sales|20000|      5|  5|      3|
|Account|28000|      1|  1|      1|
|Account|20000|      2|  2|      2|
|Account|15000|      3|  3|      3|
|Account|15000|      4|  3|      3|
|Account|12000|      5|  5|      4|
+-------+-----+-------+---+-------+ 
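
Reading across the Sales partition: the three rows tied at 25000 all get RANK 2 and DENSE_RANK 2, while ROW_NUMBER still assigns them distinct values 2, 3, and 4. The next salary, 20000, then gets RANK 5 (the gap after the ties) but DENSE_RANK 3 (no gap).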

4. LAG

The lag() function provides access to a value from a previous row in the same result set, allowing you to compare the current row’s value with the previous row’s value in an ordered partition. It is often used to calculate differences between consecutive rows, like the difference in sales from one month to the next.

Syntax:

LAG(expression, offset, default_value) OVER (PARTITION BY partition_column ORDER BY order_column)
  • expression: The column or expression to reference.
  • offset: The number of rows to look back (default is 1).
  • default_value: The value to return when the lag is not available (e.g., for the first row, where there’s no prior value).

Query :

SELECT department, salary, LAG(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS prev_salary
FROM emp_dept_tbl

This lets you compare the current salary with the previous one in the descending order; if there's no previous salary (e.g., for the first record in each partition), it returns 0.

Result :

+----------+------+-----------+
|department|salary|prev_salary|
+----------+------+-----------+
|     Sales| 30000|          0|
|     Sales| 25000|      30000|
|     Sales| 25000|      25000|
|     Sales| 25000|      25000|
|     Sales| 20000|      25000|
|   Account| 28000|          0|
|   Account| 20000|      28000|
|   Account| 15000|      20000|
|   Account| 15000|      15000|
|   Account| 12000|      15000|
+----------+------+-----------+

5. LEAD

The lead() function is the opposite of lag(). It provides access to a value from the next row in the result set, enabling comparisons between the current row and the next one.

Syntax:

LEAD(expression, offset, default_value) OVER (PARTITION BY partition_column ORDER BY order_column)
  • expression: The column or expression to reference.
  • offset: The number of rows to look ahead (default is 1).
  • default_value: The value to return when the lead is not available (e.g., for the last row, where there’s no next value).

Query :

SELECT department, salary, LEAD(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS next_salary
FROM emp_dept_tbl

Here, the lead() function compares the current salary with the next salary. If there’s no next row, it defaults to 0.

Result:

+----------+------+-----------+
|department|salary|next_salary|
+----------+------+-----------+
|     Sales| 30000|      25000|
|     Sales| 25000|      25000|
|     Sales| 25000|      25000|
|     Sales| 25000|      20000|
|     Sales| 20000|          0|
|   Account| 28000|      20000|
|   Account| 20000|      15000|
|   Account| 15000|      15000|
|   Account| 15000|      12000|
|   Account| 12000|          0|
+----------+------+-----------+

For more details, see the Apache Spark SQL Documentation – Window Functions: LAG and LEAD.

Explanation of LAG and LEAD:

  1. LAG: This function allows you to reference a value from the previous row in a partitioned set. In the query above, for each department, it will return the salary from the previous row (ordered by salary descending). The third argument 0 is the default value when there is no previous row (e.g., for the first row in each partition).
  2. LEAD: Similar to LAG, but it looks at the next row’s value instead. In the query, it compares the current row’s salary with the next row’s salary. If there is no next row (e.g., for the last row in each partition), the default value 0 will be used.
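
If you work in the DataFrame API, both comparisons can be expressed with the lag and lead functions. The sketch below (again assuming the DataFrame frame from the full listing) also derives the consecutive-row difference mentioned earlier:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

val byDeptSalaryDesc = Window.partitionBy("department").orderBy(col("salary").desc)

frame
  .withColumn("prev_salary", lag(col("salary"), 1, 0).over(byDeptSalaryDesc))
  .withColumn("next_salary", lead(col("salary"), 1, 0).over(byDeptSalaryDesc))
  // consecutive-row difference; for the first row in each partition the
  // default of 0 makes the difference equal to the salary itself
  .withColumn("diff_from_prev", col("salary") - col("prev_salary"))
  .show()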

6. CUME_DIST (Cumulative Distribution)

The CUME_DIST function computes the cumulative distribution of a value in a dataset. It calculates the relative position of a column value within its group.

Formula:

CUME_DIST = (number of rows ordered at or before the current row) / (total rows in the partition)

With ORDER BY salary DESC, a row's cume_dist is therefore the fraction of rows in its department whose salary is greater than or equal to its own.

Query:

SELECT department, salary,
CUME_DIST() OVER (PARTITION BY department ORDER BY salary DESC) AS cum_dist
FROM emp_dept_tbl

For a deeper dive into cumulative distribution, see CUME_DIST() – Spark SQL Documentation.

Result :

+----------+------+--------+
|department|salary|cum_dist|
+----------+------+--------+
|     Sales| 30000|     0.2|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 25000|     0.8|
|     Sales| 20000|     1.0|
|   Account| 28000|     0.2|
|   Account| 20000|     0.4|
|   Account| 15000|     0.8|
|   Account| 15000|     0.8|
|   Account| 12000|     1.0|
+----------+------+--------+
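
For example, in the Sales department four of the five rows have a salary of at least 25000, so each of the three rows at 25000 gets 4/5 = 0.8, while the single top salary of 30000 gets 1/5 = 0.2.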

7. PERCENT_RANK

PERCENT_RANK works similarly to CUME_DIST, but instead of a cumulative distribution it returns the relative rank as a percentage. The first row in each partition always has a percent rank of 0, and the value is a double.

Formula:

PERCENT_RANK = (Rank of current row - 1) / (Total rows in partition - 1)

Query:

SELECT department, salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk
FROM emp_dept_tbl

Find out more about PERCENT_RANK() – Spark SQL Documentation.

Result:

+----------+------+---+--------+
|department|salary|rnk|perc_rnk|
+----------+------+---+--------+
|     Sales| 30000|  1|     0.0|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 25000|  2|    0.25|
|     Sales| 20000|  5|     1.0|
|   Account| 28000|  1|     0.0|
|   Account| 20000|  2|    0.25|
|   Account| 15000|  3|     0.5|
|   Account| 15000|  3|     0.5|
|   Account| 12000|  5|     1.0|
+----------+------+---+--------+
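
For example, the Sales rows with salary 25000 have rank 2 in a five-row partition, so PERCENT_RANK = (2 - 1) / (5 - 1) = 0.25; the lowest salary, 20000, has rank 5, giving (5 - 1) / (5 - 1) = 1.0.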

8. NTILE

The NTILE function divides the number of rows in a partition into a specified number of ranked groups or “buckets”. Each row in the partition is assigned to a specific bucket based on the sorting order.

Query:

SELECT department, salary,
NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile
FROM emp_dept_tbl

This query divides each department's rows into four ranked buckets based on salary.

Check out more about NTILE() – Spark SQL Documentation.

Result:

+----------+------+-----+
|department|salary|ntile|
+----------+------+-----+
|     Sales| 30000|    1|
|     Sales| 25000|    1|
|     Sales| 25000|    2|
|     Sales| 25000|    3|
|     Sales| 20000|    4|
|   Account| 28000|    1|
|   Account| 20000|    1|
|   Account| 15000|    2|
|   Account| 15000|    3|
|   Account| 12000|    4|
+----------+------+-----+
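
Note that five rows do not divide evenly into four buckets: NTILE hands the extra row to the earliest bucket, which is why bucket 1 holds two rows (bucket sizes 2, 1, 1, 1) in each department above.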

Complete Apache Spark (Scala) code snippet to execute on a local system:

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * @author : Ram Ghadiyaram
 */
object AnalyticalFunctionTest extends App {
  Logger.getLogger("org").setLevel(Level.WARN)

  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName(this.getClass.getName)
    .config("spark.sql.warehouse.dir", new java.io.File("spark-warehouse").getAbsolutePath)
    .getOrCreate()

  import spark.implicits._

  val csvData: Dataset[String] = spark.sparkContext.parallelize(
    Seq(
      "ID,FIRST_NAME,LAST_NAME,DESIGNATION,DEPARTMENT,SALARY",
      "1001,Ram,Ghadiyaram,Director of Sales,Sales,30000",
      "1002,Ravi,Rangasamy,Marketing Manager,Sales,25000",
      "1003,Ramesh,Rangasamy,Assistant Manager,Sales,25000",
      "1004,Prem,Sure,Account Coordinator,Account,15000",
      "1005,Phani ,G,Accountant II,Account,20000",
      "1006,Krishna,G,Account Coordinator,Account,15000",
      "1007,Rakesh,Krishnamurthy,Assistant Manager,Sales,25000",
      "1008,Gally,Johnson,Manager,Account,28000",
      "1009,Richard,Grill,Account Coordinator,Account,12000",
      "1010,Sofia,Ketty,Sales Coordinator,Sales,20000"
    )
  ).toDS()

  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
  frame.show()
  frame.printSchema()
  frame.createOrReplaceTempView("emp_dept_tbl")

  println("1) Row number :")
  spark.sql(
    """
      |SELECT department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC)
      |AS row_num FROM emp_dept_tbl
      |""".stripMargin).show()

  println("2) Rank  :")
  spark.sql(
    """
      |SELECT department, salary, RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show()

  println("3) DENSE_RANK  :")
  spark.sql(
    """
      |SELECT department, salary, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show()
  println("All 3 Row number, Rank, and Dense Rank together ")
  spark.sql(
    """
      |SELECT department AS dept, salary AS sal,
      |ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num,
      |RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
      |DENSE_RANK() OVER(PARTITION BY department ORDER BY salary DESC) AS dns_rnk
      |FROM emp_dept_tbl
      |""".stripMargin
  ).show()
  println("4) LAG : Comparing current salary with previous row's salary")
  spark.sql(
    """
      |SELECT department, salary, LAG(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS prev_salary
      |FROM emp_dept_tbl
      |""".stripMargin
  ).show()

  println("5) LEAD : Comparing current salary with next row's salary")
  spark.sql(
    """
      |SELECT department, salary, LEAD(salary, 1, 0) OVER (PARTITION BY department ORDER BY salary DESC) AS next_salary
      |FROM emp_dept_tbl
      |""".stripMargin
  ).show()


  println("6) CUME_DIST :")
  spark.sql(
    """
      |SELECT department, salary, CUME_DIST() OVER (PARTITION BY department  ORDER BY salary DESC) AS cum_dist FROM emp_dept_tbl
      |""".stripMargin
  ).show()

  println("7) PERCENT_RANK :")
  spark.sql(
    """
      |SELECT department, salary, RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rnk,
      |PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS perc_rnk FROM emp_dept_tbl
      |""".stripMargin
  ).show()

  println("8) NTILE :")
  spark.sql(
    """
      |SELECT department, salary, NTILE(4) OVER (PARTITION BY department ORDER BY salary DESC) AS ntile FROM emp_dept_tbl
      |""".stripMargin
  ).show()
}

Software used for testing: Spark 3.0.0, IntelliJ IDEA, and Scala 2.12 on Windows.
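
If you want to reproduce this setup with sbt, a minimal build.sbt along these lines should work (the exact Scala patch version below is an assumption; any 2.12.x release compatible with Spark 3.0.0 will do):

// build.sbt -- minimal sketch for running the snippet above locally
scalaVersion := "2.12.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.0.0"
)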

Conclusion

Window functions in Apache Spark SQL offer an incredibly powerful way to perform complex calculations across different partitions of your data. By using ROW_NUMBER, RANK, DENSE_RANK, LAG, LEAD, CUME_DIST, PERCENT_RANK, and NTILE, you can unlock a wealth of analytic capabilities for your Spark applications.

For a full list of window functions and their use cases, check the Apache Spark SQL Functions Documentation.

Happy coding 😃, and may your queries run faster and more efficiently with Spark!
