创建SparkConf和SparkSession对象,然后导入隐式转换,接着读取json文件生成DataFrame,并将其注册为临时视图"people"。之后使用spark.udf.register方法注册名为addName的UDF,该UDF接收一个字符串参数并在前面添加"Name:"。最后通过spark.sql执行包含自定义函数的SQL语句,并展示结果,最后停止SparkSession。
Spark使用UserDefinedAggregateFunction和Aggregator。
以计算平均工资为例,展示了三种实现方式:
RDD实现:创建SparkConf和SparkContext,通过makeRDD生成数据,使用map和reduce方法计算总薪资和人数,最后求平均值并输出。
弱类型UDAF:定义MyAverageUDAF类继承UserDefinedAggregateFunction,实现inputSchema、bufferSchema、dataType、deterministic等方法,以及initialize、update、merge、evaluate等函数。在Spark中使用udf注册并在SQL中调用
强类型UDAF:定义Buff case类和MyAverageUDAF类继承Aggregator,实现zero、reduce、merge、finish等方法,以及bufferEncoder和outputEncoder。同样在Spark中注册并在SQL中调用。
如何利用IDEA开发Spark - SQL
创建子模块Spark - SQL并添加依赖:在pom.xml中添加spark - sql_2.12版本3.0.0的依赖。
创建Spark - SQL的测试代码:
创建SparkConf和SparkSession对象。
读取json文件生成DataFrame并展示。
进行SQL风格和DSL风格的语法操作,包括创建临时视图、执行SQL查询、选择特定列等。