Software Theory and Practice

Apache Spark Analytical Window Functions

16 May 2017, by Alvin Henrick

It’s been a while since I wrote a post; here is an interesting one that will help you do some cool things with Spark window functions. I would also like to thank my colleague Suresh for helping me learn this awesome SQL functionality.

Window functions let us compare the current row with other rows in the same DataFrame: calculating running totals, sequencing events, sessionizing transactions, and so on.

I will cover a couple of examples that demonstrate the usage of window functions. Let’s create a simple employee DataFrame to work with the various analytical and ranking functions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create Spark Session
val sparkSession = SparkSession.builder.master("local").appName("Window Function").getOrCreate()
import sparkSession.implicits._

// Create Sample Dataframe
val empDF = sparkSession.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")

First of all, we need to define the window we will be working on: we partition by department (deptno) and order by salary (sal) descending. Below is the code to do it via the Spark DataFrame API.

val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
  • Rank salary within each department
//SQL
SELECT empno,deptno,sal,RANK() OVER (PARTITION BY deptno ORDER BY sal desc) as rank FROM emp;
//DF API
val rankTest = rank().over(partitionWindow)
empDF.select($"*", rankTest as "rank").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | rank |
|-------|--------|-----------|------|-----------|------|------|--------|------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 1    |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 2    |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 3    |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 1    |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 2    |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 3    |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 1    |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 2    |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 3    |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 4    |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 4    |
  • Dense Rank salary within each department
//SQL
SELECT empno,deptno,sal,DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal desc) as dense_rank FROM emp;
//DF API
val rankTest = dense_rank().over(partitionWindow)
empDF.select($"*", rankTest as "dense_rank").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | dense_rank |
|-------|--------|-----------|------|-----------|------|------|--------|------------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 1          |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 2          |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 3          |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 1          |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 2          |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 3          |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 1          |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 2          |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 3          |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 4          |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 4          |
  • Row Number within each department
//SQL
SELECT empno,deptno,sal,ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal desc) as row_num FROM emp;
//DF API
val rowNumberTest = row_number().over(partitionWindow)
empDF.select($"*", rowNumberTest as "row_number").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | row_number |
|-------|--------|-----------|------|-----------|------|------|--------|------------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 1          |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 2          |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 3          |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 1          |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 2          |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 3          |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 1          |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 2          |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 3          |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 4          |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 5          |
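Spark aside, the difference between these three numbering functions is easy to see in plain Scala. The sketch below (made-up salary values, not the Spark API) mimics their semantics on a column already sorted descending:

```scala
// row_number: simple 1-based position; ties are broken arbitrarily
def rowNumber(sorted: Seq[Int]): Seq[Int] =
  sorted.indices.map(_ + 1)

// rank: tied values share the rank of their first occurrence, leaving a gap after the tie
def rank(sorted: Seq[Int]): Seq[Int] =
  sorted.map(s => sorted.indexOf(s) + 1)

// dense_rank: tied values share a rank, and no gap is left afterwards
def denseRank(sorted: Seq[Int]): Seq[Int] = {
  val distinct = sorted.distinct
  sorted.map(s => distinct.indexOf(s) + 1)
}

val sals = Seq(3000, 1500, 1500, 1000) // hypothetical salaries, sorted descending
println(rowNumber(sals)) // Vector(1, 2, 3, 4)
println(rank(sals))      // List(1, 2, 2, 4)
println(denseRank(sals)) // List(1, 2, 2, 3)
```

With a tie in the middle, rank jumps from 2 straight to 4 while dense_rank continues with 3; row_number simply numbers the rows and breaks the tie arbitrarily.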
  • Running Total (Salary) within each department
//SQL
SELECT empno,deptno,sal,sum(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as running_total FROM emp;
//DF API
val sumTest = sum($"sal").over(partitionWindow)
empDF.select($"*", sumTest as "running_total").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | running_total |
|-------|--------|-----------|------|-----------|------|------|--------|---------------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 3000          |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 5975          |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 7075          |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 5000          |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 7450          |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 8250          |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 2850          |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 4450          |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 5950          |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 8450          |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 8450          |
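Conceptually, the running total is just a cumulative sum down the ordered partition, which a plain-Scala sketch (hypothetical values, not the Spark API) makes clear:

```scala
// Cumulative sum over an ordered partition: scanLeft accumulates the
// running sum, and tail drops the initial 0 seed.
def runningTotal(sorted: Seq[Int]): Seq[Int] =
  sorted.scanLeft(0)(_ + _).tail

println(runningTotal(Seq(3000, 2975, 1100))) // List(3000, 5975, 7075)
```

One subtlety in the Spark output above: WARD and MARTIN both show 8450 because the default RANGE frame includes peer rows with an equal sal; a strict row-by-row cumulative sum like this sketch corresponds to a ROWS frame instead.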
  • The lead function lets us compare the current row with subsequent rows within each partition. The second argument (offset), which defaults to 1, controls how far ahead to look: 1 means the next row, 2 the row after that, and so on. The third argument is the default value to return when no subsequent row exists (otherwise null).
//SQL
SELECT empno,deptno,sal,lead(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as next_val FROM emp;
//DF API
val leadTest = lead($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", leadTest as "next_val").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | next_val |
|-------|--------|-----------|------|-----------|------|------|--------|----------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 2975     |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 1100     |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 0        |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 2450     |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 800      |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 0        |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 1600     |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 1500     |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 1250     |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 1250     |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 0        |
  • The lag function lets us compare the current row with preceding rows within each partition. The second argument (offset), which defaults to 1, controls how far back to look: 1 means the previous row, 2 the row before that, and so on. The third argument is the default value to return when no preceding row exists (otherwise null).
//SQL
SELECT empno,deptno,sal,lag(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as pre_val FROM emp;
//DF API
val lagTest = lag($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", lagTest as "prev_val").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | prev_val |
|-------|--------|-----------|------|-----------|------|------|--------|----------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 0        |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 3000     |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 2975     |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 0        |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 5000     |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 2450     |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 0        |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 2850     |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 1600     |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 1500     |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 1250     |
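Both functions are simply shifts of the ordered column. A plain-Scala sketch of the semantics (illustrative values, not the Spark API), including the offset and default arguments:

```scala
// lead: shift the ordered column up by `offset`, padding the end with `default`
def lead(xs: Seq[Int], offset: Int, default: Int): Seq[Int] =
  xs.drop(offset) ++ Seq.fill(math.min(offset, xs.length))(default)

// lag: shift the ordered column down by `offset`, padding the start with `default`
def lag(xs: Seq[Int], offset: Int, default: Int): Seq[Int] =
  Seq.fill(math.min(offset, xs.length))(default) ++ xs.dropRight(offset)

val sals = Seq(3000, 2975, 1100)
println(lead(sals, 1, 0)) // List(2975, 1100, 0)
println(lag(sals, 1, 0))  // List(0, 3000, 2975)
println(lead(sals, 2, 0)) // List(1100, 0, 0) -- offset 2 looks two rows ahead
```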
  • The first value within each partition, i.e. the highest salary (since we order descending), so every row in a department can be compared against it.
//SQL
SELECT empno,deptno,sal,first_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as first_val FROM emp;
//DF API
val firstValTest = first($"sal").over(partitionWindow)
empDF.select($"*", firstValTest as "first_val").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | first_val |
|-------|--------|-----------|------|-----------|------|------|--------|-----------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 3000      |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 3000      |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 3000      |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 5000      |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 5000      |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 5000      |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 2850      |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 2850      |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 2850      |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 2850      |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 2850      |
  • The last value within each partition, i.e. the lowest salary (since we order descending), so every row in a department can be compared against it.
//SQL
SELECT empno,deptno,sal,last_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc) as last_val FROM emp;
//DF API
val lastValTest = last($"sal").over(partitionWindow)
empDF.select($"*", lastValTest as "last_val").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | last_val |
|-------|--------|-----------|------|-----------|------|------|--------|----------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 3000     |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 2975     |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 1100     |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 5000     |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 2450     |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 800      |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 2850     |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 1600     |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 1500     |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 1250     |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 1250     |

Oops, what happened here? The last_val column has the same values as the sal column, but we were expecting the lowest salary within each department. To see why, we need to understand how the window frame operates. There are two types of frames, ROW and RANGE; the details are explained in this post from Databricks.

This happens because the default window frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so last_value() never looks beyond the current row unless you change the frame.
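Ignoring peer rows for a moment, the effect of the two frames on last_value can be sketched in plain Scala (illustrative values, not the Spark API): with the default frame, row i only ever sees rows 0..i, so its "last" value is itself; extending the frame to unbounded following lets every row see the true end of the partition.

```scala
// Default frame: row i sees rows 0..i, so last_value is the current row itself
def lastValDefaultFrame(sorted: Seq[Int]): Seq[Int] =
  sorted.indices.map(i => sorted.slice(0, i + 1).last)

// Frame extended to unbounded following: every row sees the partition's last row
def lastValUnboundedFollowing(sorted: Seq[Int]): Seq[Int] =
  sorted.map(_ => sorted.last)

val sals = Seq(3000, 2975, 1100)
println(lastValDefaultFrame(sals))       // Vector(3000, 2975, 1100)
println(lastValUnboundedFollowing(sals)) // List(1100, 1100, 1100)
```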

  • Last value, fixed by supplying an explicit window frame for last_value() to operate on: the frame starts at the current row and ends at unbounded following, so the last value is the lowest salary in the department.
// Define a new window that operates on a ROWS frame
val partitionWindowWithUnboundedFollowing = Window.partitionBy($"deptno")
  .orderBy($"sal".desc)
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)
//SQL
SELECT empno,deptno,sal,last_value(sal) OVER (PARTITION BY deptno ORDER BY sal desc ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as last_val FROM emp;
//DF API
val lastValTest2 = last($"sal").over(partitionWindowWithUnboundedFollowing)
empDF.select($"*", lastValTest2 as "last_val").show

Results:

| empno | ename  | job       | mgr  | hiredate  | sal  | comm | deptno | last_val |
|-------|--------|-----------|------|-----------|------|------|--------|----------|
| 7788  | SCOTT  | ANALYST   | 7566 | 19-Apr-87 | 3000 | 0    | 20     | 1100     |
| 7566  | JONES  | MANAGER   | 7839 | 2-Apr-81  | 2975 | 0    | 20     | 1100     |
| 7876  | ADAMS  | CLERK     | 7788 | 23-May-87 | 1100 | 0    | 20     | 1100     |
| 7839  | KING   | PRESIDENT | 0    | 17-Nov-81 | 5000 | 0    | 10     | 800      |
| 7782  | CLARK  | MANAGER   | 7839 | 9-Jun-81  | 2450 | 0    | 10     | 800      |
| 7369  | SMITH  | CLERK     | 7902 | 17-Dec-80 | 800  | 20   | 10     | 800      |
| 7698  | BLAKE  | MANAGER   | 7839 | 1-May-81  | 2850 | 0    | 30     | 1250     |
| 7499  | ALLEN  | SALESMAN  | 7698 | 20-Feb-81 | 1600 | 300  | 30     | 1250     |
| 7844  | TURNER | SALESMAN  | 7698 | 8-Sep-81  | 1500 | 0    | 30     | 1250     |
| 7521  | WARD   | SALESMAN  | 7698 | 22-Feb-81 | 1250 | 500  | 30     | 1250     |
| 7654  | MARTIN | SALESMAN  | 7698 | 28-Sep-81 | 1250 | 1400 | 30     | 1250     |

As the table above shows, the issue is resolved. To understand the details, please read through the Databricks post.

I really enjoy using window functions: they are very powerful and sometimes solve complex problems with a single line of SQL.

Thank You.
