I have been working with Apache Spark for a while now and would like to share some UDF tips and tricks I have learned over the past year.
Below is the sample data (i.e. people.json) used to demonstrate example of UDF in Apache Spark.
{"name":"Michael","age":15,"sal":1.00} {"name":"Andy", "age":30,"sal":1.00} {"name":"Justin", "age":19,"sal":1.00}
The code below displays various way to declare and use UDF with Apache Spark.I have extracted and explained each of them in the section below it.The examples have been tested with Apache Spark version 1.5.2 and 1.6.2.
package com.udf.exampe import org.apache.spark.sql.SQLContext import org.apache.spark.sql.functions._ import org.apache.spark.{SparkConf, SparkContext} object UDFTest { def main(args: Array[String]) { val sparkConf = new SparkConf() sparkConf.setMaster("local") sparkConf.setSparkHome(System.getenv("SPARK_HOME")) sparkConf.setAppName("UDFTest") val sc = new SparkContext(sparkConf) implicit val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val addOne = (i: Int) => i + 1 val df = sqlContext.read.json("/path/to/json/people.json") // Works only with DSL API val addOneByFunction = udf(addOne) //DSL df.select(addOneByFunction($"age") as "test1D").show // Works With Sql Expr and DSL API val addOneByRegister = sqlContext.udf.register("addOneByRegister", addOne) //SQL df.selectExpr("addOneByRegister(age) as test2S").show //DSL df.select(addOneByRegister($"age") as "test2D").show //DSL by callUDF df.select(callUDF("addOneByRegister", $"age") as "test2CallUDF").show // How to pass literal values to UDF ? val addByLit = udf((i: Int, x: Int) => x + i) //DSL `lit` function df.select(addByLit(lit(3), $"age") as "testLitF").show def addByCurryFunction(i: Int) = udf((x: Int) => x + i) //DSL literal value via CURRYING df.select(addByCurryFunction(2)($"age") as "testLitC").show def addByCurry(i: Int) = (x: Int) => x + i val addByCurryRegister = sqlContext.udf.register("addByCurryRegister", addByCurry(2)) //SQL literal value via CURRYING df.selectExpr("addByCurryRegister(age) as testLitC1").show //DSL literal value via CURRYING df.select(addByCurryRegister($"age") as "testLitC2").show //More than 22 UDF Argument val udfWith23Arg = udf((array: Seq[Double]) => array.sum) //DSL Example df.select(udfWith23Arg(array($"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal")) as "total").show sc.stop() } }
- Example 1 : The example below wraps simple Scala function addOne as Spark UDF via call to higher order function org.apache.spark.sql.functions.udf. You can only use the returned function via DSL API.
val addOne = (i: Int) => i + 1 val addOneByFunction = udf(addOne) //DSL : Usage df.select(addOneByFunction($"age") as "test1D").show
- Example 2 : The example below wraps simple Scala function as Spark UDF via call to higher order function sqlContext.udf.register.You can use both registered function (with SQL) and returned function (with DSL).
val addOne = (i: Int) => i + 1 // Works With Sql Expr and DSL API val addOneByRegister = sqlContext.udf.register("addOneByRegister", addOne) //SQL : Usage df.selectExpr("addOneByRegister(age) as test2S").show //DSL : Usage df.select(addOneByRegister($"age") as "test2D").show //DSL way to invoke UDF via callUDF function in org.apache.sql.functions package df.select(callUDF("addOneByRegister", $"age") as "test2CallUDF").show
- Example 3 : The example below wraps simple Scala function literal which takes two parameters as input and returns the sum of the two parameters as Spark UDF via call to higher order function org.apache.sql.functions.udf. You can only use the returned function via DSL API.
- We will pass the first parameter as literal value via lit function in org.apache.spark.sql.functions package
- We will pass second parameter age column from DataFrame.
val addByLit = udf((i: Int, x: Int) => x + i) //DSL : Usage : `lit` function to pass literal value 3 ==> will return 3 + age df.select(addByLit(lit(3), $"age") as "testLitF").show
- Example 4 :The example below wraps simple Scala function addByCurryFunction as Spark UDF via call to higher order function org.apache.sql.functions.udf. You can only use the returned function via DSL API.
def addByCurryFunction(i: Int) = udf((x: Int) => x + i) /DSL : Usage : pass literal value 2 via CURRYING df.select(addByCurryFunction(2)($"age") as "testLitC").show
- Example 5 : The example below wraps simple Scala function addByCurry as Spark UDF via call to higher order function sqlContext.udf.register. You can use both registered function (with SQL) and returned function (with DSL).
def addByCurry(i: Int) = (x: Int) => x + i val addByCurryRegister = sqlContext.udf.register("addByCurryRegister", addByCurry(2)) //SQL : Usage : literal value 2 via CURRYING ==> will return 2 + age df.selectExpr("addByCurryRegister(age) as testLitC1").show //DSL : Usage : literal value 2 via CURRYING ==> will return 2 + age df.select(addByCurryRegister($"age") as "testLitC2").show
The last example is important because org.apache.spark.sql.functions.udf function (Scala Doc) will allow you to create udf with max 10 parameters and sqlContext.udf.register function (Scala Doc) allow you to create udf with max 22 parameters.
If you have a situation where you need to pass more than 22 parameters to UDF.It’s not very hard to write your own implementation via extending the Spark library.The challenge is maintaining that extended library because spark is evolving everyday we really never wanted to deal with the breaking changes in future so we ended passing those parameter via org.apache.spark.sql.functions.array function and it just worked absolutely perfectly for us. Similarly you can also use org.apache.spark.sql.functions.struct function instead if you like.
//More than 22 UDF Argument val udfWith23Arg = udf((array: Seq[Double]) => array.sum) //DSL Example df.select(udfWith23Arg(array($"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal", $"sal")) as "total").show
Hopefully the information provided here is helpful.
Good Luck!!!