Apache Spark User Defined Functions

10 July 2016 | Alvin Henrick

I have been working with Apache Spark for a while now and would like to share some UDF tips and tricks I have learned over the past year.

Below is the sample data (people.json) used to demonstrate the UDF examples in Apache Spark.

{"name":"Michael","age":15,"sal":1.00}
{"name":"Andy", "age":30,"sal":1.00}
{"name":"Justin", "age":19,"sal":1.00}

The code below demonstrates various ways to declare and use UDFs with Apache Spark. I have extracted and explained each of them in the sections below. The examples have been tested with Apache Spark versions 1.5.2 and 1.6.2.

package com.udf.example

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}


object UDFTest {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf()
    sparkConf.setMaster("local")
    sparkConf.setSparkHome(System.getenv("SPARK_HOME"))
    sparkConf.setAppName("UDFTest")

    val sc = new SparkContext(sparkConf)

    implicit val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val addOne = (i: Int) => i + 1

    val df = sqlContext.read.json("/path/to/json/people.json")

    // Works only with DSL API
    val addOneByFunction = udf(addOne)

    //DSL
    df.select(addOneByFunction($"age") as "test1D").show

    // Works With Sql Expr and DSL API
    val addOneByRegister = sqlContext.udf.register("addOneByRegister", addOne)

    //SQL
    df.selectExpr("addOneByRegister(age) as test2S").show

    //DSL
    df.select(addOneByRegister($"age") as "test2D").show

    //DSL by callUDF
    df.select(callUDF("addOneByRegister", $"age") as "test2CallUDF").show

    // How to pass literal values to UDF ?

    val addByLit = udf((i: Int, x: Int) => x + i)

    //DSL `lit` function
    df.select(addByLit(lit(3), $"age") as "testLitF").show

    def addByCurryFunction(i: Int) = udf((x: Int) => x + i)

    //DSL literal value via CURRYING
    df.select(addByCurryFunction(2)($"age") as "testLitC").show

    def addByCurry(i: Int) = (x: Int) => x + i

    val addByCurryRegister = sqlContext.udf.register("addByCurryRegister", addByCurry(2))

    //SQL literal value via CURRYING
    df.selectExpr("addByCurryRegister(age) as testLitC1").show

    //DSL literal value via CURRYING
    df.select(addByCurryRegister($"age") as "testLitC2").show

    //More than 22 UDF Argument
    val udfWith23Arg = udf((array: Seq[Double]) => array.sum)

    //DSL Example
    df.select(udfWith23Arg(array($"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", 
      $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", 
      $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal")) as "total").show

    sc.stop()
  }

}
  • Example 1: The example below wraps the simple Scala function addOne as a Spark UDF via a call to the higher-order function org.apache.spark.sql.functions.udf. The returned function can only be used via the DSL API.
val addOne = (i: Int) => i + 1

val addOneByFunction = udf(addOne)
//DSL : Usage 
df.select(addOneByFunction($"age") as "test1D").show
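
For reference, the same DSL-only UDF also works with withColumn; this is a minimal sketch, and the agePlusOne column name is purely illustrative.

//DSL : Usage via withColumn (illustrative column name)
df.withColumn("agePlusOne", addOneByFunction($"age")).show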
  • Example 2: The example below registers the same Scala function as a Spark UDF via a call to the higher-order function sqlContext.udf.register. You can use both the registered function (with SQL expressions) and the returned function (with the DSL).
val addOne = (i: Int) => i + 1

// Works With Sql Expr and DSL API
val addOneByRegister = sqlContext.udf.register("addOneByRegister", addOne)

//SQL : Usage
df.selectExpr("addOneByRegister(age) as test2S").show

//DSL : Usage
df.select(addOneByRegister($"age") as "test2D").show

//DSL way to invoke the UDF via the callUDF function in the org.apache.spark.sql.functions package
df.select(callUDF("addOneByRegister", $"age") as "test2CallUDF").show
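
Because the function is registered with the SQLContext, it can also be used in a plain SQL query once the DataFrame is exposed as a temporary table; below is a minimal sketch, where the people table name is purely illustrative.

//SQL : Usage via a temporary table (illustrative table name)
df.registerTempTable("people")
sqlContext.sql("SELECT addOneByRegister(age) AS test2Sql FROM people").show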
  • Example 3: The example below wraps a Scala function literal that takes two parameters and returns their sum as a Spark UDF, via a call to the higher-order function org.apache.spark.sql.functions.udf. The returned function can only be used via the DSL API.
  1. We pass the first parameter as a literal value via the lit function in the org.apache.spark.sql.functions package.
  2. We pass the second parameter as the age column from the DataFrame.
val addByLit = udf((i: Int, x: Int) => x + i)

//DSL : Usage : `lit` function to pass literal value 3 ==> will return 3 + age 
df.select(addByLit(lit(3), $"age") as "testLitF").show
  • Example 4: The example below defines the curried Scala function addByCurryFunction, which builds a Spark UDF via a call to the higher-order function org.apache.spark.sql.functions.udf, so the literal value is baked in when the function is applied. The returned function can only be used via the DSL API.
def addByCurryFunction(i: Int) = udf((x: Int) => x + i)

//DSL : Usage : pass literal value 2 via CURRYING
df.select(addByCurryFunction(2)($"age") as "testLitC").show
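
Since addByCurryFunction builds a fresh UDF each time it is applied, the same definition can be reused with different literal values; a minimal sketch follows, where the testLitC5 alias is purely illustrative.

//DSL : Usage : reuse the same curried definition with a different literal value 5
df.select(addByCurryFunction(5)($"age") as "testLitC5").show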
  • Example 5: The example below registers the curried Scala function addByCurry, applied to the literal value 2, as a Spark UDF via a call to sqlContext.udf.register. You can use both the registered function (with SQL expressions) and the returned function (with the DSL).
 def addByCurry(i: Int) = (x: Int) => x + i

 val addByCurryRegister = sqlContext.udf.register("addByCurryRegister", addByCurry(2))

 //SQL : Usage : literal value 2 via CURRYING ==> will return 2 + age
 df.selectExpr("addByCurryRegister(age) as testLitC1").show

 //DSL : Usage :  literal value 2 via CURRYING ==> will return 2 + age
 df.select(addByCurryRegister($"age") as "testLitC2").show

The last example is important because the org.apache.spark.sql.functions.udf function (Scala Doc) lets you create a UDF with at most 10 parameters, while the sqlContext.udf.register function (Scala Doc) supports at most 22 parameters.
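
As a minimal sketch of that difference (the addEleven name and the repeated age columns are purely illustrative), a function with eleven parameters has no matching udf overload but registers without trouble:

//Hypothetical sketch: eleven parameters exceed the 10-argument limit of
//org.apache.spark.sql.functions.udf but are accepted by sqlContext.udf.register
val addEleven = sqlContext.udf.register("addEleven",
  (a: Int, b: Int, c: Int, d: Int, e: Int, f: Int,
   g: Int, h: Int, i: Int, j: Int, k: Int) => a + b + c + d + e + f + g + h + i + j + k)

//SQL : Usage
df.selectExpr("addEleven(age, age, age, age, age, age, age, age, age, age, age) as elevenAges").show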

If you have a situation where you need to pass more than 22 parameters to a UDF, it is not very hard to write your own implementation by extending the Spark library. The challenge is maintaining that extension: Spark evolves quickly, and we never wanted to deal with breaking changes down the road. So we ended up passing those parameters via the org.apache.spark.sql.functions.array function instead, and it worked perfectly for us. Similarly, you can use the org.apache.spark.sql.functions.struct function if you prefer; a rough sketch of that variant follows the example below.

 //More than 22 UDF Argument
 val udfWith23Arg = udf((array: Seq[Double]) => array.sum)

 //DSL Example
 df.select(udfWith23Arg(array($"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal",
 $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal",
 $"sal", $"sal", $"sal", $"sal")) as "total").show

Hopefully the information provided here is helpful.

Good Luck!!!
