【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

本文涉及的产品
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
数据传输服务 DTS,数据同步 1个月
简介: 【实验】阿里云大数据助理工程师认证(ACA)- ACA认证配套实验-06-MaxCompute 数据传输(下)

2.5 tunnel upload 扫描文件


t_tunnel建表参考:

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

欲将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv上传至t_tunnel,可以在上传前先做个预扫描

tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel --scan=only;

2.6 tunnel upload 行、列分隔符


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):


                   drop table if exists t_tunnel;
                   create table t_tunnel (id int, name string);

将文件E:\ODPS_DEMO\resources\02-DataTransfer\people_delimeter.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv t_tunnel -fd || -rd &;
read t_tunnel;

指定数据文件的列分隔符 –fd || , 行分隔符 –rd &


2.7 tunnel upload多线程


本节实验首先保证表t_tunnel存在,若不存在,请参考下面语句处理(或直接重新创建表t_tunnel):

drop table if exists t_tunnel;
create table t_tunnel (id int, name string);

使用多个线程将文件E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv文件上传到 t_tunnel 中去:


truncate table t_tunnel;
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel -threads 8;
read t_tunnel;
count t_tunnel;


当数据文件较大地,可以通过-threads N 指定N个线程同时进行装载,加快速度


2.8 tunnel download 非分区表


将表 t_tunnel 下载到本地文件 t_tunnel.csv


tunnel download t_tunnel t_tunnel.csv;


数据下载当前路径下,可以指定导出文件的具体路径,指定path\t_tunnel.csv 即可 path 根据自己的机器情况指定


2.9 tunnel download 分区表


将表 t_tunnel_p 下载到本地文件 t_tunnel_p.csv


tunnel download t_tunnel_p/gender='male' t_tunnel_p.csv;


第 3 章:tunnel JAVA SDK 定制开发数


3.1 安装配置eclipse开发环境


首先,解压下载的eclipse-java-luna-SR2-win32-x86_64.zip到E:\ODPS_DEMO,然后执行以下操作:


(1) 将E:\ODPS_DEMO\InstallMedia\odps-eclipse-plugin-bundle-0.16.0.jar 拷贝至目录E:\ODPS_DEMO\eclipse\plugins

20200711134452720.png


(2) 执行E:\ODPS_DEMO\eclipse\eclipse.exe,打开 eclipse,点击 New -> Other

(3) 配置 odps


20200711134525781.png

20200711134538594.png


(4) 指定 JavaProject Name:

20200711134555362.png


3.2 单线程上传文件


此部分建表语句参考:

drop table if exists t_tunnel_sdk;

create table t_tunnel_sdk (

id    int,
name string
)
partitioned by (gender string);

本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\UploadSample.java


(1) 新增 Java 类:

(2) 类名为UploadSample,包名为 DTSample

(3) 设计该类实现功能为将单个文件上传至ODPS的表中,需要的输入参数为:


-f <file_name>
-t <table_name>
-c <config_file>
[-p ]
[-fd <field_delimiter>]

编写方法 printUsage,提示调用语法:

private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }

编写获取、解析输入参数的方法 :

private static String accessId;
  private static String accessKey;  
  private static String OdpsEndpoint;
  private static String TunnelEndpoint;
  private static String project;
  private static String table;
  private static String partition;
  private static String fieldDelimeter;
  private static String file;
  private static void printUsage(String msg) {
    System.out.println(
      "Usage: UploadSample -f file \\\n"
        + "                  -t table\\\n"
        + "                  -c config_file \\\n"
        + "                  [-p partition] \\\n"
        + "                  [-fd field_delimiter] \\\n" 
        );
    if (msg != null) {
    System.out.println(msg);
    }
  }
  private static void parseArgument(String[] args) {
    for (int i = 0; i < args.length; i++) {
      if ("-f".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("source file not specified in -f");
        }
        file = args[i];
      }
      else if ("-t".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException("ODPS table not specified in -t");
        }
        table = args[i];
      } 
      else if ("-c".equals(args[i])) {
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "ODPS configuration file not specified in -c");
        }
        try {
          InputStream is = new FileInputStream(args[i]);
          Properties props = new Properties();
          props.load(is);
          accessId = props.getProperty("access_id");
          accessKey = props.getProperty("access_key");
          project = props.getProperty("project_name");
          OdpsEndpoint = props.getProperty("end_point");
          TunnelEndpoint = props.getProperty("tunnel_endpoint");
        } catch (IOException e) {
          throw new IllegalArgumentException(
              "Error reading ODPS config file '" + args[i] + "'.");
        }
      }
      else if ("-p".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
              "odps table partition not specified in -p");
        }
        partition = args[i];
      } 
      else if ("-fd".equals(args[i])){
        if (++i ==  args.length) {
          throw new IllegalArgumentException(
             "odps table partition not specified in -p");
        }
        fieldDelimeter = args[i];
      } 
    } 
    if(file == null) {
      throw new IllegalArgumentException(
          "Missing argument -f file");
    }
    if (table == null) {
      throw new IllegalArgumentException(
          "Missing argument -t table");
    }  
    if (accessId == null || accessKey == null || 
        project == null || OdpsEndpoint == null || TunnelEndpoint == null) {
      throw new IllegalArgumentException(
          "ODPS conf not set, please check -c odps.conf");
      }
  }

(4) 编写方法,从文件中读出记录,同时将这些记录格式化

 读出记录,逐列处理
  import java.text.DecimalFormat;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.TimeZone;
  import com.aliyun.odps.Column;
  import com.aliyun.odps.OdpsType;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.data.ArrayRecord;
  import com.aliyun.odps.data.Record;
class RecordConverter {
  private TableSchema schema;
  private String nullTag;
  private SimpleDateFormat dateFormater;
  private DecimalFormat doubleFormat;
  private String DEFAULT_DATE_FORMAT_PATTERN = "yyyyMMddHHmmss";
  public RecordConverter(TableSchema schema, String nullTag, String dateFormat,
      String tz) {
    this.schema = schema;
    this.nullTag = nullTag;
    if (dateFormat == null) {
      this.dateFormater = new SimpleDateFormat(DEFAULT_DATE_FORMAT_PATTERN);
    } else {
      dateFormater = new SimpleDateFormat(dateFormat);
    }
    dateFormater.setLenient(false);
    dateFormater.setTimeZone(TimeZone.getTimeZone(tz == null ? "GMT" : tz));
    doubleFormat = new DecimalFormat();
    doubleFormat.setMinimumFractionDigits(0);
    doubleFormat.setMaximumFractionDigits(20);
  }
  /**
   * record to String array
   * */
  public String[] format(Record r) {
    int cols = schema.getColumns().size();
    String[] line = new String[cols];
    String colValue = null;
    for (int i = 0; i < cols; i++) {
      Column column = schema.getColumn(i);
      OdpsType t = column.getType();
      switch (t) {
      case BIGINT: {
        Long v = r.getBigint(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case DOUBLE: {
        Double v = r.getDouble(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = doubleFormat.format(v).replaceAll(",", "");
        }
        break;
      }
      case DATETIME: {
        Date v = r.getDatetime(i);
        if (v == null) {
          colValue = null;
        } else {
          colValue = dateFormater.format(v);
        }
        break;
      }
      case BOOLEAN: {
        Boolean v = r.getBoolean(i);
        colValue = v == null ? null : v.toString();
        break;
      }
      case STRING: {
        String v = r.getString(i);
        colValue = (v == null ? null : v.toString());
        break;
      }
      default:
        throw new RuntimeException("Unknown column type: " + t);
      }
      if (colValue == null) {
        line[i] = nullTag;
      } else {
        line[i] = colValue;
      }
    }
    return line;
  }
  /**
   * String array to record
   * @throws ParseException 
   * */
  public Record parse(String[] line){
    if (line == null) {
      return null;
    }
    int columnCnt = schema.getColumns().size();
    Column[] cols = new Column[columnCnt];
    for (int i = 0; i < columnCnt; ++i) {
      Column c = new Column(schema.getColumn(i).getName(), 
          schema.getColumn(i).getType());          
      cols[i] = c;
    }
    ArrayRecord r = new ArrayRecord(cols);
    int i = 0;
    for (String v : line) {
      if (v.equals(nullTag)) {
        i++;
        continue;
      }
      if (i >= columnCnt) {
        break;
      }
      OdpsType type = schema.getColumn(i).getType();
      switch (type) {
      case BIGINT:
        r.setBigint(i, Long.valueOf(v));
        break;
      case DOUBLE:
        r.setDouble(i, Double.valueOf(v));
        break;
      case DATETIME:
        try {
          r.setDatetime(i, dateFormater.parse(v));
        } catch (ParseException e) {
          throw new RuntimeException(e.getMessage());
        }
        break;
      case BOOLEAN:
        v = v.trim().toLowerCase();
        if (v.equals("true") || v.equals("false")) {
          r.setBoolean(i, v.equals("true") ? true : false);
        } else if (v.equals("0") || v.equals("1")) {
          r.setBoolean(i, v.equals("1") ? true : false);
        } else {
          throw new RuntimeException(
              "Invalid boolean type, expect: true|false|0|1");
        }
        break;
      case STRING:
        r.setString(i, v);
        break;
      default:
        throw new RuntimeException("Unknown column type");
      }
      i++;
    }
    return r;
  }
}

(5) 编写主方法,读取文件,上传到odps表中去:

public static void main(String args[]) {
    try {
       parseArgument(args);
     } catch (IllegalArgumentException e) {
       printUsage(e.getMessage());
       System.exit(2);
     }    
     Account account = new AliyunAccount(accessId, accessKey);
     Odps odps = new Odps(account);
     odps.setDefaultProject(project);
     odps.setEndpoint(OdpsEndpoint);  
     BufferedReader br = null;
     try {      
       TableTunnel tunnel = new TableTunnel(odps);
       tunnel.setEndpoint(TunnelEndpoint);
       TableTunnel.UploadSession uploadSession = null;
       if(partition != null) {
         PartitionSpec spec = new PartitionSpec(partition);
         System.out.println(spec.toString());
         uploadSession=tunnel.createUploadSession(project, table,spec);
         System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }
       else
       {
         uploadSession= tunnel.createUploadSession(project, table);
         //System.out.println("Session Status is : " + uploadSession.getStatus().toString());
       }        
       Long blockid = (long) 0;
       RecordWriter recordWriter = uploadSession.openRecordWriter(blockid, true);
       Record record = uploadSession.newRecord();
       TableSchema schema = uploadSession.getSchema();      
       RecordConverter converter = new RecordConverter(schema, "NULL", null, null);      
       br = new BufferedReader(new FileReader(file));
       Pattern pattern = Pattern.compile(fieldDelimeter);
       String line = null;
       while ((line = br.readLine()) != null) {
         String[] items=pattern.split(line,0);
         record = converter.parse(items);
         recordWriter.write(record);
       }       
       recordWriter.close();      
       Long[] blocks = {blockid};
       uploadSession.commit(blocks);   
     System.out.println("Upload succeed!");
     } catch (TunnelException e) {
       e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }finally {
       try {
         if (br != null)
           br.close();
       } catch (IOException ex) {
         ex.printStackTrace();
       }
     }
  } 

(6) 在 ODPS 中建表:

odpscmd -f E:\ODPS_DEMO\resources\02-DataTransfer\crt_tbl.sql

(7) 在 eclipse 中设置测试运行参数:

2020071113481976.png

将下列参数填入program parameter:


-f E:\ODPS_DEMO\resources\02-DataTransfer\uploadDataSet.csv -t t_tunnel_sdk -p ‘gender=“Male”’ -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini


20200711134839315.png


通过console查看输出信息:

去ODPS project里检查上传结果:

read t_tunnel_sdk;


20200711134903229.png


3.3 单线程下载文件


本实验可参考脚本:E:\ODPS_DEMO\resources\02-DataTransfer\DownloadSample.java


(1) 在已有的名称为 DataTransfer 的 Java project中的 DTSample 包下新增 Java 类:

(2) 编写方法 printUsage,提示调用该程序的输入参数;


(3) 编写方法 parseArgument,获取并解析输入参数;


(4) 编写类RecordConverter用来格式化数据,生成 Record 记录;


(5) 编写方法 main 方法,实现单线程数据下载:


//根据输入参数中的配置文件,配置阿里云账号
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setDefaultProject(project);
  odps.setEndpoint(OdpsEndpoint);
//基于上述云账号,创建服务入口类
  TableTunnel tunnel = new TableTunnel(odps);
  tunnel.setEndpoint(TunnelEndpoint);
//创建从上述odps服务通道中下载数据的会话,分为分区的表和非分区表两种:
  TableTunnel.DownloadSession session;
  if(partition != null) {
    PartitionSpec spec = new PartitionSpec(partition);
    session= tunnel.createDownloadSession(project, table, spec);
  }else{
    session= tunnel.createDownloadSession(project, table);
  }
//从odps表中读出记录,格式化后,写入到本地文件:
  RecordReader reader = session.openRecordReader(0L, session.getRecordCount(),
 true);
  TableSchema schema = session.getSchema();
  Record record;
  RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
  String[] items = new String[schema.getColumns().size()];
  while ((record = reader.read()) != null) {
    items = converter.format(record);
    for(int i=0; i<items.length; ++i) {
      if(i>0) out.write(fieldDelimeter.getBytes());
      out.write(items[i].getBytes());
    }
    out.write(lineDelimiter.getBytes());
  }
  reader.close();
  out.close();

(6) 在 eclipse 中设置测试运行参数:

将下列参数填入 program parameter


-f E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini -t t_tunnel_sdk -p ‘gender=“Male”’

20200711135003353.png


去查看下载文件E:\ODPS_DEMO\resources\02-DataTransfer\downloadMale

DataSet.csv,并和ODPS表 t_tunnel_sdk 中的数据对比。


第 4 章:实验总结


4.1 实验总结


MaxCompute提供的数据传输功能,tunnel命令集方便我们上传本地数据到单表、分区表,并支持上传时自定义设计列分隔符、行分隔符、及数据容错等能力,数据量较大,还可以指定线程数据来加速数据的上传,实验详细介绍了日常工作中常用的功能。


另外,MaxCompute还提供了tunnel JAVA SDK 方便我们进行程序开发时使用数据传输功能

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
6月前
|
大数据 Linux 网络安全
大数据开发工程师基本功修炼之史上最全Linux学习笔记(建议)
大数据开发工程师基本功修炼之史上最全Linux学习笔记(建议)
183 0
|
4月前
|
分布式计算 安全 大数据
HAS插件式Kerberos认证框架:构建安全可靠的大数据生态系统
在教育和科研领域,研究人员需要共享大量数据以促进合作。HAS框架可以提供一个安全的数据共享平台,确保数据的安全性和合规性。
|
3月前
|
SQL 开发框架 大数据
【数据挖掘】顺丰科技2022年秋招大数据挖掘与分析工程师笔试题
顺丰科技2022年秋招大数据挖掘与分析工程师笔试题解析,涵盖了多领域选择题和编程题,包括动态规划、数据库封锁协议、概率论、SQL、排序算法等知识点。
84 0
|
6月前
|
分布式计算 监控 大数据
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
106 1
《吊打面试官》- 大数据工程师50道中大厂面试真题保姆级详解
|
6月前
|
监控 关系型数据库 MySQL
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
数据传输DTS腾讯云上的mysql同步到阿里云上的mysql可以操作吗?
343 0
|
6月前
|
SQL 分布式计算 算法
程序员必备的面试技巧——大数据工程师面试必备技能
程序员必备的面试技巧——大数据工程师面试必备技能
114 0
|
6月前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用合集之数据传输完成后发现了脏数据字段如何解决
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
NoSQL 关系型数据库 数据库
数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
【2月更文挑战第29天】数据传输服务DTS(Data Transmission Service)是阿里云提供的实时数据流服务
69 5
|
6月前
|
关系型数据库 MySQL 数据库
使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
【2月更文挑战第29天】使用阿里云的数据传输服务DTS(Data Transmission Service)进行MySQL 5.6到MySQL 8.0的迁移
325 2
|
6月前
|
存储 弹性计算 监控
【数据传输服务用户测评】阿里云DTS和MongoShake的性能对比
本文聚焦DTS MongoDB->MongoDB 和 MongoShake 数据同步的性能,分别针对副本集/分片集群架构、单表/多表、全量/增量同步进行性能的对比。
86255 9

热门文章

最新文章