当前位置: 首页 > news >正文

HIVE自定义UDAF函数

GenericUDAFEvaluator的方法:

文章目录

    • GenericUDAFEvaluator的方法:
      • 一、介绍
      • 二、UDAF编写步骤
        • 步骤1:
        • 步骤2:
          • init()方法:
          • iterate()方法:
          • merge()方法:
          • terminate()方法:
          • getNewAggregationBuffer()方法:
          • reset()方法:
        • 步骤3:

HIVE提供了丰富的内置函数,但是对于一些复杂逻辑还是需要自定义函数来实现,对此,HIVE也提供了一些自定义的接口和类。

UDF:一进一出,一对一的关系数据

UDTF:一进多处,一对多的关系数据

UDAF:多进一出,多对一的关系数据

一、介绍

// 确定各个阶段输入输出参数的数据格式ObjectInspectors  
public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;  
  
// 保存数据聚集结果的类  
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  
  
// 重置聚集结果  
public void reset(AggregationBuffer agg) throws HiveException;  
  
// map阶段,迭代处理输入sql传过来的列数据  
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  
  
// map与combiner结束返回结果,得到部分数据聚集结果  
public Object terminatePartial(AggregationBuffer agg) throws HiveException;  
  
// combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。  
public void merge(AggregationBuffer agg, Object partial) throws HiveException;  
  
// reducer阶段,输出最终结果  
public Object terminate(AggregationBuffer agg) throws HiveException; 

自定义的UDAF的执行逻辑如图:图片信息来自于:https://blog.csdn.net/zyz_home/article/details/79889519
请添加图片描述
请添加图片描述

二、UDAF编写步骤

模拟max()函数

步骤1:

自定义缓冲类MaxBuffer,继承类GenericUDAFEvaluator.AbstractAggregationBuffer

public class MaxBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {
    // 用于接收结果
    private int ans;
    public MaxBuffer(){}
    public MaxBuffer(int ans){this.ans = ans;}
    public int getAns(){
        return ans;
    }
    public void setAns(int ans){
        this.ans = ans;
    }
    public void add(int next){
        ans = Math.max(this.ans, next);
    }

}

步骤2:

自定义处理类MaxEvaluator,继承GenericUDAFEvaluator,重写方法

public class MaxEvaluator extends GenericUDAFEvaluator {}

创建三个变量,输入、输出、缓冲区

private PrimitiveObjectInspector in;  // 输入的参数
private ObjectInspector out;  // 输出的参数
private PrimitiveObjectInspector buffer;  // 缓冲区
init()方法:

根据不同的阶段,处理参数

@Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      super.init(m,parameters);
      if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m) ){
        // map阶段,处理每一行数据
        in = (PrimitiveObjectInspector) parameters[0];
      }else{
        // combine和reduce阶段,处理与聚合结果
        buffer = (PrimitiveObjectInspector) parameters[0];
      }

      // 输出的类型为int
      out = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
      return out;
    }

Mode共4个模式

iterate()方法:

每行数据调用一次

@Override
// 每行执行一次,对于map阶段
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  // 调用缓冲区数据
  ((MaxBuffer)agg).add((Integer) parameters[0]);
}

terminatePartial()方法:

Partial2阶段会调用,类似于map端的combine,预聚合

@Override
// 预聚合,对于combine阶段
public Object terminatePartial(AggregationBuffer agg) throws HiveException {

  return terminate(agg);
}
merge()方法:

Partial2阶段和final阶段都会调用,聚合buffer中的数据

@Override
// 合并缓冲区,对于combine、reduce阶段
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  int in = (int) buffer.getPrimitiveJavaObject(partial);
  int ans = ((MaxBuffer) agg).getAns();
  ((MaxBuffer)agg).add(in);
}
terminate()方法:

final阶段调用,会聚合最终结果

@Override// 最终聚合public Object terminate(AggregationBuffer agg) throws HiveException {return ((MaxBuffer)agg).getAns();}
getNewAggregationBuffer()方法:

得到一个新的缓冲区,会对这一组数据做处理

 @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { return new MaxBuffer(); }
reset()方法:

初始化缓冲区,可置空缓冲区

@Overridepublic void reset(AggregationBuffer agg) throws HiveException {    ((MaxBuffer)agg).setAns(0);}

步骤3:

自定义类MaxFunc,继承类AbstractGenericUDAFResolver,重写getEvaluator方法

public class MaxBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {    // 用于接收结果    private int ans;    public MaxBuffer(){}    public MaxBuffer(int ans){this.ans = ans;}    public int getAns(){        return ans;    }    public void setAns(int ans){        this.ans = ans;    }    public void add(int next){        ans = Math.max(this.ans, next);    }}
    // 用于接收结果
    private int ans;
    public MaxBuffer(){}
    public MaxBuffer(int ans){this.ans = ans;}
    public int getAns(){
        return ans;
    }
    public void setAns(int ans){
        this.ans = ans;
    }
    public void add(int next){
        ans = Math.max(this.ans, next);
    }
}
public class MaxEvaluator extends GenericUDAFEvaluator {

    private PrimitiveObjectInspector in;  // 输入的参数
    private ObjectInspector out;  // 输出的参数
    private PrimitiveObjectInspector buffer;  // 缓冲区

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new MaxBuffer();
    }

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m,parameters);
        if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m) ){
            in = (PrimitiveObjectInspector) parameters[0];
        }else{
            buffer = (PrimitiveObjectInspector) parameters[0];
        }

        // 输出的类型
        out = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        return out;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((MaxBuffer)agg).setAns(0);
    }

    @Override
    // 每行执行一次
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        // 调用缓冲区数据
        ((MaxBuffer)agg).add((Integer) parameters[0]);
    }

    @Override
    // 预聚合
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {

        return terminate(agg);
    }

    @Override
    // 合并缓冲区
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        int in = (int) buffer.getPrimitiveJavaObject(partial);
        int ans = ((MaxBuffer) agg).getAns();
        ((MaxBuffer)agg).add(in);
    }

    @Override
    // 最终聚合
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((MaxBuffer)agg).getAns();
    }
}
public class MaxFunc extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        return new MaxEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new MaxEvaluator();
    }
}

打包上传集群,测试

create TEMPORARY FUNCTION self_max as 'com.lnnu.udaf.MaxFunc'using jar 'udafmaxv1.jar';with d as (  select 1 as num, 'key' as k  union all   select 2 as num, 'key' as k)select   k, self_max(num)from d group by k 

相关文章:

  • java计算机毕业设计基于安卓Android的校园助手APP
  • 计算机网络笔记(王道考研) 第四章:网络层
  • MySQL必知必会---检索数据
  • ORACLE认证课程
  • emplace()
  • 【代码随想录】回溯算法刷题
  • Pyhon——datetime、calendar模块
  • 爆刷leetcode——链表(二)
  • 【翻译】Style-Aware Normalized Loss for Improving Arbitrary Style Transfer
  • Spring的component-scan XML配置和@ComponentScan注解配置
  • Java 基础知识回顾(一)
  • shell脚本之函数的引入
  • JDK的下载与安装详细教程
  • 刷题记录(NC231128 Steadily Growing Steam,NC21467 [NOIP2018]货币系统,NC235950 多重背包)
  • 详解红黑树【C++实现】
  • 0基础学习移动端适配
  • 30秒的PHP代码片段(1)数组 - Array
  • echarts花样作死的坑
  • Fabric架构演变之路
  • IDEA常用插件整理
  • Java程序员幽默爆笑锦集
  • Java深入 - 深入理解Java集合
  • Service Worker
  • Spring框架之我见(三)——IOC、AOP
  • yii2中session跨域名的问题
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 扫描识别控件Dynamic Web TWAIN v12.2发布,改进SSL证书
  • 王永庆:技术创新改变教育未来
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • Android开发者必备:推荐一款助力开发的开源APP
  • NLPIR智能语义技术让大数据挖掘更简单
  • ​Z时代时尚SUV新宠:起亚赛图斯值不值得年轻人买?
  • ​批处理文件中的errorlevel用法
  • !$boo在php中什么意思,php前戏
  • #define 用法
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • (2021|NIPS,扩散,无条件分数估计,条件分数估计)无分类器引导扩散
  • (4)logging(日志模块)
  • (NSDate) 时间 (time )比较
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (附源码)spring boot球鞋文化交流论坛 毕业设计 141436
  • (汇总)os模块以及shutil模块对文件的操作
  • (生成器)yield与(迭代器)generator
  • (十三)Java springcloud B2B2C o2o多用户商城 springcloud架构 - SSO单点登录之OAuth2.0 根据token获取用户信息(4)...
  • (转)四层和七层负载均衡的区别
  • .apk 成为历史!
  • .describe() python_Python-Win32com-Excel
  • .NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务
  • .net core 调用c dll_用C++生成一个简单的DLL文件VS2008
  • .Net Core 中间件验签
  • .NET 材料检测系统崩溃分析
  • .NET高级面试指南专题十一【 设计模式介绍,为什么要用设计模式】
  • .NET性能优化(文摘)
  • .secret勒索病毒数据恢复|金蝶、用友、管家婆、OA、速达、ERP等软件数据库恢复
  • ::前边啥也没有