maxCompute的UDAF demo,实现累加功能

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: maxCompute的UDAF demo,实现累加功能

需求:将表中数据按照name聚合,并且count进行累加

name count
Jan 1
Jan 2
Feb 3
Feb 1
Mar 1
Mar 5

预期结果:

name count
Jan 3
Feb 7
Mar 13

使用idea的maxCompute studio新增UDAF

_
_
然后自动生成未实现的方法,我的字段name是string,count是bigint
所以@Resolve("string,bigint->bigint")

新增一个class用来存储字段

        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }

这样newBuffer就可以这样写了

    public Writable newBuffer() {
        return new MyBuffer();
    }

还需要一个Map来存储key(name)和value(count),一个long类型的参数存储累加的值

    Long old_count = 0L;//存储累加值
    private LongWritable ret = new LongWritable();//存储输出值

完整代码参考:

public class UDAFTest extends Aggregator {
    private static class MyBuffer implements Writable {
        private String  name;
        private Long count;
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeLong(count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            name =  in.readUTF();
            count = in.readLong();
        }
    }



    @Override
    public Writable newBuffer() {
        return new MyBuffer();
    }
    Map<String,Long> map = new LinkedHashMap<>();
    Long old_count = 0L;
    @Override
    public void iterate(Writable buffer, Writable[] args) throws UDFException {
        String arg = String.valueOf(args[0]);
        Long cnt = Long.parseLong(String.valueOf(args[1]));
        MyBuffer buf = (MyBuffer) buffer;

        if (arg != null) {
            if(map.containsKey(arg)){
                Long newcnt = map.get(arg);
                old_count = cnt+newcnt;
                map.put(arg,old_count);
            }else {
                map.put(arg,old_count+cnt);
            }
        }
        buf.name = arg;
        buf.count = map.get(arg);

    }
    private LongWritable ret = new LongWritable();
    @Override
    public Writable terminate(Writable arg0) throws UDFException {
        MyBuffer buffer = (MyBuffer) arg0;
        ret.set(buffer.count);
        return ret;
    }

    @Override
    public void merge(Writable buffer, Writable partial) throws UDFException {
        MyBuffer buf = (MyBuffer) buffer;
        MyBuffer p = (MyBuffer) partial;
        buf.name = p.name;
        buf.count = p.count;
    }


}

然后通过maxCompute studio发布下
_

发布名为test20191119,这样就可以在Dataworks中调用了。

其中原表数据:
_

_

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
缓存 分布式计算 NoSQL
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
大数据-43 Redis 功能扩展 Lua 脚本 对Redis扩展 eval redis.call redis.pcall
31 2
|
2月前
|
SQL 分布式计算 NoSQL
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
大数据-42 Redis 功能扩展 发布/订阅模式 事务相关的内容 Redis弱事务
25 2
|
4月前
|
机器学习/深度学习 搜索推荐 算法
飞天大数据平台产品问题之AIRec在阿里巴巴飞天大数据平台中的功能如何解决
飞天大数据平台产品问题之AIRec在阿里巴巴飞天大数据平台中的功能如何解决
|
5月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用合集之如何实现类似mysql实例中的数据库功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
JSON 分布式计算 大数据
MaxCompute产品使用合集之使用数据服务功能,但发现ODPS数据源不支持,该如何解决
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL 分布式计算 DataWorks
MaxCompute产品使用合集之整库离线同步至MC的配置中,是否可以清除原表所有分区数据的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
分布式计算 大数据 MaxCompute
MaxCompute产品使用合集之如何实现根据商品维度统计每件商品的断货时长的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用合集之如何实现类似for循环的功能
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
121 3
|
5月前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之使用ODPS Tunnel Upload功能时,遇到报错:Java 堆内存不足,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
存储 分布式计算 资源调度
ODPS SQL问题之ODPS逻辑层包含哪些角色,它们各自的功能是什么
ODPS SQL 问题之ODPS逻辑层包含哪些角色,它们各自的功能是什么