【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

2.5 tunnel upload 扫描文件


t_tunnel建表参考:

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

欲将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv上传至t_tunnel,可以在上传前先做个预扫描

tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel --scan=only;

2.6 tunnel upload 行、列分隔符


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):


                   drop table if exists t_tunnel;
                   create table t_tunnel (id int, name string);

将文件E:\ODPS_DEMO\resources\02-DataTransfer\people_delimeter.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv t_tunnel -fd || -rd &;
read t_tunnel;

指定数据文件的列分隔符 –fd || , 行分隔符 –rd &


2.7 tunnel upload多线程


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

使用多个线程将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel -threads 8;
read t_tunnel;
count t_tunnel;


当数据文件较大地,可以通过-threads N 指定N个线程同时进行装载,加快速度


2.8 tunnel download 非分区表


将表 t_tunnel 下载到本地文件 t_tunnel.csv


tunnel download t_tunnel t_tunnel.csv;


数据下载当前路径下,可以指定导出文件的具体路径,指定path\t_tunnel.csv 即可 path 根据自己的机器情况指定


2.9 tunnel download 分区表


将表 t_tunnel_p 下载到本地文件 t_tunnel_p.csv


tunnel download t_tunnel_p/gender='male' t_tunnel_p.csv;


第 3 章:tunnel JAVA SDK 定制开发数


3.1 安装配置eclipse开发环境


首先,解压下载的eclipse-java-luna-SR2-win32-x86_64.zip到E:\ODPS_DEMO,然后执行以下操作:


(1) 将E:\ODPS_DEMO\InstallMedia\odps-eclipse-plugin-bundle-0.16.0.jar 拷贝至目录E:\ODPS_DEMO\eclipse\plugins

20200711134452720.png


(2) 执行E:\ODPS_DEMO\eclipse\eclipse.exe,打开 eclipse,点击 New -> Other

(3) 配置 odps


20200711134525781.png

20200711134538594.png


(4) 指定 JavaProject Name:

20200711134555362.png


3.2 单线程上传文件


此部分建表语句参考:

drop table if exists t_tunnel_sdk;

create table t_tunnel_sdk (

id    int,
name string
)
partitioned by (gender string);

本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\UploadSample.java


(1) 新增 Java 类:

(2) 类名为UploadSample,包名为 DTSample

(3) 设计该类实现功能为将单个文件上传至ODPS的表中,需要的输入参数为:


-f <file_name>
-t <table_name>
-c <config_file>
[-p ]
[-fd <field_delimiter>]

编写方法 printUsage,提示调用语法:

private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }

编写获取、解析输入参数的方法 :

private static String accessId;
  private static String accessKey;  
  private static String OdpsEndpoint;
  private static String TunnelEndpoint;
  private static String project;
  private static String table;
  private static String partition;
  private static String fieldDelimeter;
  private static String file;
  private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }
  private static void parseArgument(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if ("-f".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("source file not specified in -f");
        }
        file = args[i];
      }
      else if ("-t".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("ODPS table not specified in -t");
        }
        table = args[i];
      } 
      else if ("-c".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "ODPS configuration file not specified in -c");
        }
        try {
          InputStream is = new FileInputStream(args[i]);
          Properties props = new Properties();
          props.load(is);
          accessId = props.getProperty("access_id");
          accessKey = props.getProperty("access_key");
          project = props.getProperty("project_name");
          OdpsEndpoint = props.getProperty("end_point");
          TunnelEndpoint = props.getProperty("tunnel_endpoint");
        } catch (IOException e) {
          throw new IllegalArgumentException(
              "Error reading ODPS config file '" + args[i] + "'.");
        }
      }
      else if ("-p".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "odps table partition not specified in -p");
        }
        partition = args[i];
      } 
      else if ("-fd".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
             "odps table partition not specified in -p");
        }
        fieldDelimeter = args[i];
      } 
    } 
    if(file == null) {
      throw new IllegalArgumentException(
          "Missing argument -f file");
    }
    if (table == null) {
      throw new IllegalArgumentException(
          "Missing argument -t table");
    }  
    if (accessId == null || accessKey == null || 
        project == null || OdpsEndpoint == null || TunnelEndpoint == null) {
      throw new IllegalArgumentException(
          "ODPS conf not set, please check -c odps.conf");
      }
  }

(4) 编写方法,从文件中读出记录,同时将这些记录格式化

 读出记录,逐列处理
  import java.text.DecimalFormat;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.TimeZone;
  import com.aliyun.odps.Column;
  import com.aliyun.odps.OdpsType;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.data.ArrayRecord;
  import com.aliyun.odps.data.Record;
class RecordConverter {
  private TableSchema schema;
  private String nullTag;
  private SimpleDateFormat dateFormater;
  private DecimalFormat doubleFormat;
  private String DEFAULT_DATE_FORMAT_PATTERN = "yyyyMMddHHmmss";
  public RecordConverter(TableSchema schema, String nullTag, String dateFormat,
      String tz) {
    this.schema = schema;
    this.nullTag = nullTag;
    if (dateFormat == null) {
      this.dateFormater = new SimpleDateFormat(DEFAULT_DATE_FORMAT_PATTERN);
    } else {
      dateFormater = new SimpleDateFormat(dateFormat);
    }
    dateFormater.setLenient(false);
    dateFormater.setTimeZone(TimeZone.getTimeZone(tz == null ? "GMT" : tz));
    doubleFormat = new DecimalFormat();
    doubleFormat.setMinimumFractionDigits(0);
    doubleFormat.setMaximumFractionDigits(20);
  }
  /**
   * record to String array
   * */
  public String[] format(Record r) {
    int cols = schema.getColumns().size();
    String[] line = new String[cols];
    String colValue = null;
    for (int i = 0; i < cols; i++) {
      Column column = schema.getColumn(i);
      OdpsType t = column.getType();
      switch (t) {
      case BIGINT: {
        Long v = r.getBigint(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case DOUBLE: {
        Double v = r.getDouble(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = doubleFormat.format(v).replaceAll(",", "");
        }
        break;
      }
      case DATETIME: {
        Date v = r.getDatetime(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = dateFormater.format(v);
        }
        break;
      }
      case BOOLEAN: {
        Boolean v = r.getBoolean(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case STRING: {
        String v = r.getString(i);
        colValue = (v == null ? null : v.toString());
        break;
      }
      default:
        throw new RuntimeException("Unknown column type: " + t);
      }
      if (colValue == null) {
        line[i] = nullTag;
      } else {
        line[i] = colValue;
      }
    }
    return line;
  }
  /**
   * String array to record
   * @throws ParseException 
   * */
  public Record parse(String[] line){
    if (line == null) {
      return null;
    }
    int columnCnt = schema.getColumns().size();
    Column[] cols = new Column[columnCnt];
    for (int i = 0; i < columnCnt; ++i) {
      Column c = new Column(schema.getColumn(i).getName(), 
          schema.getColumn(i).getType());          
      cols[i] = c;
    }
    ArrayRecord r = new ArrayRecord(cols);
    int i = 0;
    for (String v : line) {
      if (v.equals(nullTag)) {
        i++;
        continue;
      }
      if (i >= columnCnt) {
        break;
      }
      OdpsType type = schema.getColumn(i).getType();
      switch (type) {
      case BIGINT:
        r.setBigint(i, Long.valueOf(v));
        break;
      case DOUBLE:
        r.setDouble(i, Double.valueOf(v));
        break;
      case DATETIME:
        try {
          r.setDatetime(i, dateFormater.parse(v));
        } catch (ParseException e) {
          throw new RuntimeException(e.getMessage());
        }
        break;
      case BOOLEAN:
        v = v.trim().toLowerCase();
        if (v.equals("true") || v.equals("false")) {
          r.setBoolean(i, v.equals("true") ? true : false);
        } else if (v.equals("0") || v.equals("1")) {
          r.setBoolean(i, v.equals("1") ? true : false);
        } else {
          throw new RuntimeException(
              "Invalid boolean type, expect: true|false|0|1");
        }
        break;
      case STRING:
        r.setString(i, v);
        break;
      default:
        throw new RuntimeException("Unknown column type");
      }
      i++;
    }
    return r;
  }
}

(5) 编写主方法,读取文件,上传到odps表中去:

public static void main(String args[]) {
    try {
       parseArgument(args);
     } catch (IllegalArgumentException e) {
       printUsage(e.getMessage());
       System.exit(2);
     }    
     Account account = new AliyunAccount(accessId, accessKey);
     Odps odps = new Odps(account);
     odps.setDefaultProject(project);
     odps.setEndpoint(OdpsEndpoint);  
     BufferedReader br = null;
     try {      
       TableTunnel tunnel = new TableTunnel(odps);
       tunnel.setEndpoint(TunnelEndpoint);
       TableTunnel.UploadSession uploadSession = null;
       if(partition != null) {
         PartitionSpec spec = new PartitionSpec(partition);
         System.out.println(spec.toString());
         uploadSession=tunnel.createUploadSession(project, table,spec);
         System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }
       else
       {
         uploadSession= tunnel.createUploadSession(project, table);
         //System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }        
       Long blockid = (long) 0;
       RecordWriter recordWriter = uploadSession.openRecordWriter(blockid, true);
       Record record = uploadSession.newRecord();
       TableSchema schema = uploadSession.getSchema();      
       RecordConverter converter = new RecordConverter(schema, "NULL", null, null);      
       br = new BufferedReader(new FileReader(file));
       Pattern pattern = Pattern.compile(fieldDelimeter);
       String line = null;
       while ((line = br.readLine()) != null) {
         String[] items=pattern.split(line,0);
         record = converter.parse(items);
         recordWriter.write(record);
       }       
       recordWriter.close();      
       Long[] blocks = {blockid};
       uploadSession.commit(blocks);   
     System.out.println("Upload succeed!");
     } catch (TunnelException e) {
       e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }finally {
       try {
         if (br != null)
           br.close();
       } catch (IOException ex) {
         ex.printStackTrace();
       }
     }
  } 

(6) 在 ODPS 中建表:

odpscmd -f E:\ODPS_DEMO\resources\02-DataTransfer\crt_tbl.sql

(7) 在 eclipse 中设置测试运行参数:

2020071113481976.png

将下列参数填入program parameter:


-f E:\ODPS_DEMO\resources\02-DataTransfer\uploadDataSet.csv -t t_tunnel_sdk -p ‘gender=“Male”’ -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini


20200711134839315.png


通过console查看输出信息:

去ODPS project里检查上传结果:

read t_tunnel_sdk;


20200711134903229.png


3.3 单线程下载文件


本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\DownloadSample.java


(1) 在已有的名称为 DataTransfer 的 Java project中的 DTSample 包下新增 Java 类:

(2) 编写方法 printUsage,提示调用该程序的输入参数;


(3) 编写方法 parseArgument,获取并解析输入参数;


(4) 编写类RecordConverter用来格式化数据,生成 Record 记录;


(5) 编写方法 main 方法,实现单线程数据下载:


//根据输入参数中的配置文件,配置阿里云账号
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setDefaultProject(project);
  odps.setEndpoint(OdpsEndpoint);
//基于上述云账号,创建服务入口类
  TableTunnel tunnel = new TableTunnel(odps);
  tunnel.setEndpoint(TunnelEndpoint);
//创建从上述odps服务通道中下载数据的会话,分为分区的表和非分区表两种:
  TableTunnel.DownloadSession session;
  if(partition != null) {
    PartitionSpec spec = new PartitionSpec(partition);
    session= tunnel.createDownloadSession(project, table, spec);
  }else{
    session= tunnel.createDownloadSession(project, table);
  }
//从odps表中读出记录,格式化后,写入到本地文件:
  RecordReader reader = session.openRecordReader(0L, session.getRecordCount(),
 true);
  TableSchema schema = session.getSchema();
  Record record;
  RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
  String[] items = new String[schema.getColumns().size()];
  while ((record = reader.read()) != null) {
    items = converter.format(record);
    for(int i=0; i<items.length; ++i) {
      if(i>0) out.write(fieldDelimeter.getBytes());
      out.write(items[i].getBytes());
    }
    out.write(lineDelimiter.getBytes());
  }
  reader.close();
  out.close();

(6) 在 eclipse 中设置测试运行参数:

将下列参数填入 program parameter


-f E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini -t t_tunnel_sdk -p ‘gender=“Male”’

20200711135003353.png


去查看下载文件E:\ODPS_DEMO\resources\02-DataTransfer\downloadMale

DataSet.csv,并和ODPS表 t_tunnel_sdk 中的数据对比。


第 4 章:实验总结


4.1 实验总结


MaxCompute提供的数据传输功能,tunnel命令集方便我们上传本地数据到单表、分区表,并支持上传时自定义设计列分隔符、行分隔符、及数据容错等能力,数据量较大,还可以指定线程数据来加速数据的上传,实验详细介绍了日常工作中常用的功能。


另外,MaxCompute还提供了tunnel JAVA SDK 方便我们进行程序开发时使用数据传输功能

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
120 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
人工智能 分布式计算 DataWorks
连续四年!阿里云领跑中国公有云大数据平台
近日,国际数据公司(IDC)发布《中国大数据平台市场份额,2023:数智融合时代的真正到来》报告——2023年中国大数据平台公有云服务市场规模达72.2亿元人民币,其中阿里巴巴市场份额保持领先,占比达40.2%,连续四年排名第一。
239 12
|
3月前
|
人工智能 Cloud Native 数据管理
重磅升级,阿里云发布首个“Data+AI”驱动的一站式多模数据平台
阿里云发布首个AI多模数据管理平台DMS,助力业务决策提效10倍
394 17
|
3月前
|
SQL 人工智能 大数据
阿里云牵头起草!首个大数据批流融合国家标准发布
近日,国家市场监督管理总局、国家标准化管理委员会正式发布大数据领域首个批流融合国家标准GB/T 44216-2024《信息技术 大数据 批流融合计算技术要求》,该标准由阿里云牵头起草,并将于2025年2月1日起正式实施。
97 7
|
3月前
|
SQL 人工智能 大数据
首个大数据批流融合国家标准正式发布,阿里云为牵头起草单位!
近日,国家市场监督管理总局、国家标准化管理委员会正式发布大数据领域首个批流融合国家标准 GB/T 44216-2024《信息技术 大数据 批流融合计算技术要求》,该标准由阿里云牵头起草,并将于2025年2月1日起正式实施。
|
4月前
|
Oracle 关系型数据库 数据库
阿里云数据库 ACP 问题之阿里云数据库ACP认证与ACA认证有什么区别
阿里云数据库 ACP 问题之阿里云数据库ACP认证与ACA认证有什么区别
173 1
阿里云数据库 ACP 问题之阿里云数据库ACP认证与ACA认证有什么区别
|
3月前
|
存储 SQL 分布式计算
Java连接阿里云MaxCompute例
要使用Java连接阿里云MaxCompute数据库,首先需在项目中添加MaxCompute JDBC驱动依赖,推荐通过Maven管理。避免在代码中直接写入AccessKey,应使用环境变量或配置文件安全存储。示例代码展示了如何注册驱动、建立连接及执行SQL查询。建议使用RAM用户提升安全性,并根据需要配置时区和公网访问权限。具体步骤和注意事项请参考阿里云官方文档。
365 10
|
3月前
|
机器学习/深度学习 数据可视化 大数据
阿里云大数据的应用示例
阿里云大数据应用平台为企业提供高效数据处理与业务洞察工具,涵盖Quick BI、DataV及PAI等核心产品。DT203课程通过实践教学,帮助学员掌握数据可视化、报表设计及机器学习分析技能,提升数据驱动决策能力。Quick BI简化复杂数据分析,DataV打造震撼可视化大屏,PAI支持全面的数据挖掘与算法应用。课程面向CSP、ISV及数据工程师等专业人士,为期两天,结合面授与实验,助力企业加速数字化转型。完成课程后,学员将熟练使用阿里云工具进行数据处理与分析。[了解更多](https://edu.aliyun.com/training/DT203)
|
4月前
|
机器学习/深度学习 分布式计算 BI
MaxCompute 与阿里云其他服务的协同工作
【8月更文第31天】在当今的数据驱动时代,企业需要处理和分析海量数据以获得有价值的洞察。阿里云提供了一系列的服务来满足不同层次的需求,从数据存储到高级分析。MaxCompute(原名 ODPS)作为阿里云的大规模数据处理平台,提供了强大的计算能力和丰富的功能,可以与阿里云的其他服务无缝集成,形成完整的大数据解决方案。本文将探讨 MaxCompute 如何与其他阿里云服务协同工作,包括存储服务 OSS、数据分析服务 Quick BI 以及机器学习平台 PAI。
58 1

热门文章

最新文章