在使用sparksql的时候,除了系统内置的一些函数之外,我们还可以自定义UDF函数,然后在查询的时候使用这个UDF函数即可。整个UDF函数的流程如下:
这个UDF其实就是一个方法,在使用的时候传入一个值然后返回一个新值,也就是传入一个参数,返回一个参数。下面我们用代码来演示一下UDF函数的使用。
package org.example import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.udf object Demo { case class UserPoJo(name: String, age: Long, sex: String) def main(args: Array[String]): Unit = { //初始化sparksession,这里暂时不允许到集群中,所以简单的编写一个,如果需要运行在集群中,根据之前的文章修改成对应的sparkconf即可。 val session = SparkSession.builder().config(new SparkConf()).appName("demo").master("local[*]").getOrCreate(); //使用spark sql的时候我们都需要在业务代码里面引入下面的依赖,在引入implicits的时候,spark会把对象隐式的转成dataset对象,所以一般这些对象我们可以直接使用即可 import session.implicits._ var user = Seq(UserPoJo("张三", 12, "男"), UserPoJo("李四", 17, "男"), UserPoJo("王五", 20, "男")) val userDF = user.toDF() userDF.createOrReplaceTempView("user") //注册udf函数 session.udf.register("converToAge", converToAge(_: Int): String) //使用udf函数 session.sql("select name,converToAge(age) as age1 from user").show() } //编写udf方法 def converToAge(age: Int): String = { age match { case `age` if age > 18 => "成年" case _ => "未成年" } } }
备注:
1、在上面我们自定义了一个udf函数:converToAge,在编程的时候其实这就是一个方法,具体的逻辑我们可以自由控制。
2、在使用udf之前,必须要先把这个udf函数在session里面注册后才能使用,不注册的话是使用不了的。
运行下,看下效果
可以看到udf函数生效了。
以上就是UDF函数的使用,非常的简单,就是一进一出,在后面我们还会介绍UDAF,UDTF等函数,到时候再说。
还没有评论,来说两句吧...