It's been a while since I wrote a post. Here is an interesting one that will help you do some cool stuff with Spark and window functions. I would also like to thank my colleague Suresh for helping me learn this awesome SQL functionality.
Window functions let us compare the current row with other rows in the same DataFrame, which makes it easy to calculate running totals, sequence events, sessionize transactions, and so on.
I will cover a couple of examples that demonstrate the usage of window functions. Let's create a simple employee DataFrame to try out the various analytical and ranking functions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create Spark Session
val sparkSession = SparkSession.builder.master("local").appName("Window Function").getOrCreate()
import sparkSession.implicits._

// Create Sample Dataframe
val empDF = sparkSession.createDataFrame(Seq(
  (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
  (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
  (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
  (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
  (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
  (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
  (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
  (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
  (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
  (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
  (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
)).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
First, we need to define the window we will be working on: we partition by department (deptno) and order by salary (sal) in descending order. Below is the code to do it via the Spark DataFrame API.
val partitionWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)
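The SQL variants in this post query a table called emp, so the DataFrame has to be registered under that name before they can run. The following is a minimal sketch of that step, assuming Spark SQL is on the classpath; the names `spark`, `emp`, `byDeptSalDesc` and `sal_rank` are illustrative, and the data is only a three-row stand-in for the employee DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("WindowSqlSketch").getOrCreate()
import spark.implicits._

// Small stand-in for the employee data (department 20 from the sample)
val emp = Seq(
  (7788, "SCOTT", 3000, 20),
  (7566, "JONES", 2975, 20),
  (7876, "ADAMS", 1100, 20)
).toDF("empno", "ename", "sal", "deptno")

// Make the DataFrame visible to SQL under the table name used in the queries
emp.createOrReplaceTempView("emp")

// SQL version of a ranking query
val viaSql = spark.sql(
  """SELECT empno, deptno, sal,
    |       RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS sal_rank
    |FROM emp""".stripMargin)

// Same query through the DataFrame API
val byDeptSalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)
val viaApi = emp.select($"empno", $"deptno", $"sal", rank().over(byDeptSalDesc).as("sal_rank"))

viaSql.show()
viaApi.show()
```

Both DataFrames should contain the same rows, so either style can be used for the remaining examples.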
- Rank salary within each department
// SQL
SELECT empno, deptno, sal, RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp;
// DF API
val rankTest = rank().over(partitionWindow)
empDF.select($"*", rankTest as "rank").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | rank |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 1 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 2 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 3 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 1 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 2 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 3 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 1 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 2 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 3 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 4 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 4 |
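A common use of a per-department rank is picking the top N rows of each partition. Below is a minimal sketch of that idea, assuming Spark SQL is on the classpath; the names `spark`, `emp`, `byDeptSalDesc` and `topTwo` are illustrative, and the data is a subset of the sample (departments 20 and 30 without the tied salaries).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("TopNSketch").getOrCreate()
import spark.implicits._

// Subset of the sample data: departments 20 and 30
val emp = Seq(
  (7788, "SCOTT", 3000, 20),
  (7566, "JONES", 2975, 20),
  (7876, "ADAMS", 1100, 20),
  (7698, "BLAKE", 2850, 30),
  (7499, "ALLEN", 1600, 30),
  (7844, "TURNER", 1500, 30)
).toDF("empno", "ename", "sal", "deptno")

val byDeptSalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

// A window function cannot be used directly inside a filter,
// so add the rank as a column first and filter on it afterwards.
val topTwo = emp
  .withColumn("rank", rank().over(byDeptSalDesc))
  .filter($"rank" <= 2)

topTwo.show()
```

Because rank() gives tied rows the same value, a filter like this can return more than two rows per department when salaries tie at the cut-off.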
- Dense Rank salary within each department
// SQL
SELECT empno, deptno, sal, DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS dense_rank FROM emp;
// DF API (a new name, so it does not clash with rankTest above)
val denseRankTest = dense_rank().over(partitionWindow)
empDF.select($"*", denseRankTest as "dense_rank").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | dense_rank |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 1 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 2 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 3 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 1 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 2 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 3 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 1 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 2 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 3 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 4 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 4 |
- Row Number within each department
// SQL
SELECT empno, deptno, sal, ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC) AS row_number FROM emp;
// DF API
val rowNumberTest = row_number().over(partitionWindow)
empDF.select($"*", rowNumberTest as "row_number").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | row_number |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 1 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 2 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 3 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 1 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 2 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 3 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 1 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 2 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 3 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 4 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 5 |
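In the sample data the only tie sits at the bottom of department 30, so rank and dense_rank look identical there. The sketch below puts the three functions side by side on department 30 plus one extra row (JAMES, 950) that is not part of the sample and is added purely so that something ranks below the tie. It assumes Spark SQL is on the classpath; `spark`, `dept30`, `w` and `compared` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("RankingComparisonSketch").getOrCreate()
import spark.implicits._

// Department 30 from the sample, plus a hypothetical extra row (JAMES)
val dept30 = Seq(
  ("BLAKE", 2850, 30),
  ("ALLEN", 1600, 30),
  ("TURNER", 1500, 30),
  ("WARD", 1250, 30),
  ("MARTIN", 1250, 30),
  ("JAMES", 950, 30) // not in the original sample
).toDF("ename", "sal", "deptno")

val w = Window.partitionBy($"deptno").orderBy($"sal".desc)

val compared = dept30.select(
  $"ename", $"sal",
  rank().over(w).as("rank"),             // ties share a rank, then a gap follows
  dense_rank().over(w).as("dense_rank"), // ties share a rank, no gap
  row_number().over(w).as("row_number")  // always unique; order among ties is arbitrary
)

compared.orderBy($"row_number").show()
```

After the two tied salaries, rank jumps to 6 while dense_rank continues with 5; row_number simply counts from 1 to 6, and which of WARD or MARTIN gets 4 is not guaranteed.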
- Running Total (Salary) within each department
// SQL
SELECT empno, deptno, sal, SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) AS running_total FROM emp;
// DF API
val sumTest = sum($"sal").over(partitionWindow)
empDF.select($"*", sumTest as "running_total").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | running_total |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 3000 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 5975 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 7075 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 5000 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 7450 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 8250 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 2850 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 4450 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 5950 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 8450 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 8450 |
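Note that WARD and MARTIN both show 8450 in the table above: with an ORDER BY and no explicit frame, rows with the same salary are summed together. If a strictly row-by-row running total is wanted, one option is a ROWS frame with a tie-breaking sort column. The following is a sketch of that comparison on department 30, assuming Spark SQL is on the classpath; `spark`, `dept30`, `rangeWindow`, `rowsWindow` and the use of empno as tie-breaker are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("RunningTotalSketch").getOrCreate()
import spark.implicits._

// Department 30 from the sample data
val dept30 = Seq(
  (7698, "BLAKE", 2850, 30),
  (7499, "ALLEN", 1600, 30),
  (7844, "TURNER", 1500, 30),
  (7521, "WARD", 1250, 30),
  (7654, "MARTIN", 1250, 30)
).toDF("empno", "ename", "sal", "deptno")

// Default frame when ORDER BY is present: RANGE up to the current row,
// which also includes rows tied with the current row
val rangeWindow = Window.partitionBy($"deptno").orderBy($"sal".desc)

// ROWS frame: one physical row at a time; empno breaks the salary tie
val rowsWindow = Window.partitionBy($"deptno")
  .orderBy($"sal".desc, $"empno")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val totals = dept30.select(
  $"ename", $"sal",
  sum($"sal").over(rangeWindow).as("range_total"),
  sum($"sal").over(rowsWindow).as("rows_total")
).orderBy($"sal".desc, $"rows_total")

totals.show()
```

With the ROWS frame, WARD shows 7200 and MARTIN 8450, whereas the RANGE frame gives 8450 for both.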
- Lead function: lets us compare the current row with subsequent rows within each partition. The second argument (offset) defaults to 1, i.e. the next row; set it to 2 to look at the row after that, and so on. The third argument is the default value returned when no subsequent row exists; if it is omitted, null is returned.
// SQL
SELECT empno, deptno, sal, LEAD(sal, 1, 0) OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp;
// DF API
val leadTest = lead($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", leadTest as "next_val").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | next_val |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 2975 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 1100 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 0 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 2450 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 800 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 0 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 1600 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 1500 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 1250 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 1250 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 0 |
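A typical follow-up is to compute the difference between a row and the next one. In that case it can be safer to leave out the default value, because a default of 0 would make the last row's gap equal to its own salary. Here is a minimal sketch on department 20, assuming Spark SQL is on the classpath; `spark`, `dept20`, `w`, `nextSal` and `gap_to_next` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("LeadGapSketch").getOrCreate()
import spark.implicits._

// Department 20 from the sample data
val dept20 = Seq(
  (7788, "SCOTT", 3000, 20),
  (7566, "JONES", 2975, 20),
  (7876, "ADAMS", 1100, 20)
).toDF("empno", "ename", "sal", "deptno")

val w = Window.partitionBy($"deptno").orderBy($"sal".desc)

// No default value: the last row of each partition gets null
val nextSal = lead($"sal", 1).over(w)

val gaps = dept20.select(
  $"ename", $"sal",
  nextSal.as("next_sal"),
  ($"sal" - nextSal).as("gap_to_next") // null propagates through the subtraction
).orderBy($"sal".desc)

gaps.show()
```

The lowest-paid employee ends up with a null gap rather than a misleading number.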
- Lag function: lets us compare the current row with preceding rows within each partition. The second argument (offset) defaults to 1, i.e. the previous row; set it to 2 to look at the row before that, and so on. The third argument is the default value returned when no preceding row exists; if it is omitted, null is returned.
// SQL
SELECT empno, deptno, sal, LAG(sal, 1, 0) OVER (PARTITION BY deptno ORDER BY sal DESC) AS prev_val FROM emp;
// DF API
val lagTest = lag($"sal", 1, 0).over(partitionWindow)
empDF.select($"*", lagTest as "prev_val").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | prev_val |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 0 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 3000 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 2975 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 0 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 5000 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 2450 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 0 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 2850 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 1600 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 1500 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 1250 |
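To make the offset argument concrete, the sketch below computes lag with offsets 1 and 2 side by side. It assumes Spark SQL is on the classpath and uses four rows of department 30 (MARTIN is left out so there is no tie); `spark`, `dept30`, `w`, `prev_1` and `prev_2` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("LagOffsetSketch").getOrCreate()
import spark.implicits._

// Four rows of department 30 from the sample data (no tied salaries)
val dept30 = Seq(
  (7698, "BLAKE", 2850, 30),
  (7499, "ALLEN", 1600, 30),
  (7844, "TURNER", 1500, 30),
  (7521, "WARD", 1250, 30)
).toDF("empno", "ename", "sal", "deptno")

val w = Window.partitionBy($"deptno").orderBy($"sal".desc)

val lagged = dept30.select(
  $"ename", $"sal",
  lag($"sal", 1, 0).over(w).as("prev_1"), // salary one row back, 0 if there is none
  lag($"sal", 2, 0).over(w).as("prev_2")  // salary two rows back, 0 if there is none
).orderBy($"sal".desc)

lagged.show()
```

With offset 2, the first two rows of the partition fall back to the default because there is no row two positions before them.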
- First value within each partition, i.e. the highest salary within each department (we are ordering by salary descending), which can then be compared against every member of that department.
// SQL
SELECT empno, deptno, sal, FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) AS first_val FROM emp;
// DF API
val firstValTest = first($"sal").over(partitionWindow)
empDF.select($"*", firstValTest as "first_val").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | first_val |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 3000 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 3000 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 3000 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 5000 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 5000 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 5000 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 2850 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 2850 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 2850 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 2850 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 2850 |
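As an illustration of comparing every member against the first value, the sketch below computes how far each salary in department 20 is from the department's top salary. It assumes Spark SQL is on the classpath; `spark`, `dept20`, `w`, `topSal` and `behind_top` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("FirstValueSketch").getOrCreate()
import spark.implicits._

// Department 20 from the sample data
val dept20 = Seq(
  (7788, "SCOTT", 3000, 20),
  (7566, "JONES", 2975, 20),
  (7876, "ADAMS", 1100, 20)
).toDF("empno", "ename", "sal", "deptno")

val w = Window.partitionBy($"deptno").orderBy($"sal".desc)

// Highest salary in the department, repeated on every row
val topSal = first($"sal").over(w)

val behindTop = dept20.select(
  $"ename", $"sal",
  topSal.as("top_sal"),
  (topSal - $"sal").as("behind_top") // distance from the department's top salary
).orderBy($"sal".desc)

behindTop.show()
```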
- Last value within each partition, i.e. the lowest salary within each department (we are ordering by salary descending), which can then be compared against every member of that department.
// SQL
SELECT empno, deptno, sal, LAST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) AS last_val FROM emp;
// DF API
val lastValTest = last($"sal").over(partitionWindow)
empDF.select($"*", lastValTest as "last_val").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | last_val |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 3000 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 2975 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 1100 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 5000 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 2450 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 800 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 2850 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 1600 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 1500 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 1250 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 1250 |
Oops, what happened here? The last_val column has the same value as the sal column, but we were expecting the lowest salary within the department. To see why, we need to understand how the window frame works. There are two types of frames, ROWS and RANGE. The details are explained in this post from Databricks.
This happens because, when the window has an ORDER BY, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so last_value() never looks beyond the current row (and any rows tied with it) unless you change the frame.
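One way to convince yourself of this is to spell the default frame out and check that the result does not change. The sketch below does that on department 30, assuming Spark SQL is on the classpath; `spark`, `dept30`, `implicitFrame` and `explicitFrame` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("DefaultFrameSketch").getOrCreate()
import spark.implicits._

// Department 30 from the sample data (includes a salary tie)
val dept30 = Seq(
  (7698, "BLAKE", 2850, 30),
  (7499, "ALLEN", 1600, 30),
  (7844, "TURNER", 1500, 30),
  (7521, "WARD", 1250, 30),
  (7654, "MARTIN", 1250, 30)
).toDF("empno", "ename", "sal", "deptno")

// Ordered window with no frame given
val implicitFrame = Window.partitionBy($"deptno").orderBy($"sal".desc)

// Same window with the frame written out explicitly
val explicitFrame = implicitFrame.rangeBetween(Window.unboundedPreceding, Window.currentRow)

val frames = dept30.select(
  $"ename", $"sal",
  last($"sal").over(implicitFrame).as("last_implicit"),
  last($"sal").over(explicitFrame).as("last_explicit")
).orderBy($"sal".desc)

frames.show()
```

Both columns match sal on every row, which is the behaviour seen in the table above.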
- Last value fixed by supplying the window frame for last_value() to operate on. We use the current row as the frame start and unbounded following as the frame end to get the last value.
// Define a new window specification that operates on a row frame
val partitionWindowWithUnboundedFollowing = Window.partitionBy($"deptno").orderBy($"sal".desc).rowsBetween(Window.currentRow, Window.unboundedFollowing)
// SQL
SELECT empno, deptno, sal, LAST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS last_val FROM emp;
// DF API
val lastValTest2 = last($"sal").over(partitionWindowWithUnboundedFollowing)
empDF.select($"*", lastValTest2 as "last_val").show
Results:
empno | ename | job | mgr | hiredate | sal | comm | deptno | last_val |
---|---|---|---|---|---|---|---|---|
7788 | SCOTT | ANALYST | 7566 | 19-Apr-87 | 3000 | 0 | 20 | 1100 |
7566 | JONES | MANAGER | 7839 | 2-Apr-81 | 2975 | 0 | 20 | 1100 |
7876 | ADAMS | CLERK | 7788 | 23-May-87 | 1100 | 0 | 20 | 1100 |
7839 | KING | PRESIDENT | 0 | 17-Nov-81 | 5000 | 0 | 10 | 800 |
7782 | CLARK | MANAGER | 7839 | 9-Jun-81 | 2450 | 0 | 10 | 800 |
7369 | SMITH | CLERK | 7902 | 17-Dec-80 | 800 | 20 | 10 | 800 |
7698 | BLAKE | MANAGER | 7839 | 1-May-81 | 2850 | 0 | 30 | 1250 |
7499 | ALLEN | SALESMAN | 7698 | 20-Feb-81 | 1600 | 300 | 30 | 1250 |
7844 | TURNER | SALESMAN | 7698 | 8-Sep-81 | 1500 | 0 | 30 | 1250 |
7521 | WARD | SALESMAN | 7698 | 22-Feb-81 | 1250 | 500 | 30 | 1250 |
7654 | MARTIN | SALESMAN | 7698 | 28-Sep-81 | 1250 | 1400 | 30 | 1250 |
As the table above shows, the issue is resolved. To understand this in more detail, please read through the Databricks post.
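The frame from the current row to unbounded following is not the only way to get the lowest salary per department. Two alternatives are sketched below, assuming Spark SQL is on the classpath: a frame that spans the whole partition, and a plain min over an unordered window. The names `spark`, `dept20`, `wholePartition` and `unordered` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("LowestSalarySketch").getOrCreate()
import spark.implicits._

// Department 20 from the sample data
val dept20 = Seq(
  (7788, "SCOTT", 3000, 20),
  (7566, "JONES", 2975, 20),
  (7876, "ADAMS", 1100, 20)
).toDF("empno", "ename", "sal", "deptno")

// Option 1: ordered window whose frame covers the whole partition
val wholePartition = Window.partitionBy($"deptno")
  .orderBy($"sal".desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Option 2: no ordering at all; without ORDER BY the frame is the whole partition
val unordered = Window.partitionBy($"deptno")

val lowest = dept20.select(
  $"ename", $"sal",
  last($"sal").over(wholePartition).as("last_val"),
  min($"sal").over(unordered).as("min_sal")
)

lowest.show()
```

When only the minimum or maximum is needed, the unordered window with min or max states the intent more directly and does not depend on the sort direction.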
I really enjoy using window functions; they are very powerful and can sometimes solve complex problems with a single line of SQL.
Thank You.