2.5 tunnel upload: pre-scanning a data file
Reference DDL for creating t_tunnel:
drop table if exists t_tunnel; create table t_tunnel (id int, name string);
To upload the file E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv into t_tunnel, you can run a pre-scan first:
tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel --scan=only;
2.6 tunnel upload: row and column delimiters
This exercise requires that the table t_tunnel exists; if it does not, create it with the statements below (or simply recreate it):
drop table if exists t_tunnel; create table t_tunnel (id int, name string);
Upload the file E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv into t_tunnel:
truncate table t_tunnel; tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\people_delimiter.csv t_tunnel -fd "||" -rd "&"; read t_tunnel;
Here -fd "||" sets the column (field) delimiter of the data file to || and -rd "&" sets the row (record) delimiter to &.
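To see how such delimiters decompose a data stream, the self-contained sketch below splits a hypothetical payload using the same || field and & record separators. The class name and sample data are made up for this illustration; the real parsing is done by the tunnel client itself.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class DelimiterDemo {
    // Split a payload into rows by the record delimiter, then each row
    // into fields by the field delimiter. Pattern.quote() escapes regex
    // metacharacters such as '|'.
    static String[][] parse(String payload, String recordDelim, String fieldDelim) {
        String[] rows = payload.split(Pattern.quote(recordDelim));
        String[][] table = new String[rows.length][];
        for (int i = 0; i < rows.length; i++) {
            table[i] = rows[i].split(Pattern.quote(fieldDelim));
        }
        return table;
    }

    public static void main(String[] args) {
        String payload = "1||Tom&2||Jerry";   // hypothetical file content
        for (String[] row : parse(payload, "&", "||")) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```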
2.7 tunnel upload: multiple threads
This exercise requires that the table t_tunnel exists; if it does not, create it with the statements below (or simply recreate it):
drop table if exists t_tunnel; create table t_tunnel (id int, name string);
Upload the file E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv into t_tunnel using multiple threads:
truncate table t_tunnel; tunnel upload E:\ODPS_DEMO\resources\02-DataTransfer\crawd.csv t_tunnel -threads 8; read t_tunnel; count t_tunnel;
When the data file is large, -threads N loads it with N threads in parallel to speed up the upload.
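The idea behind -threads N is block-parallel loading: the input is divided among N workers, each uploading its own share. The sketch below only illustrates that division with plain JDK classes; it is not how the tunnel client is implemented, and all names in it are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadSplitDemo {
    // Divide `lines` into `n` nearly equal chunks, one per worker thread.
    static List<List<String>> chunk(List<String> lines, int n) {
        List<List<String>> chunks = new ArrayList<>();
        int size = (lines.size() + n - 1) / n;   // ceiling division
        for (int i = 0; i < lines.size(); i += size) {
            chunks.add(lines.subList(i, Math.min(i + size, lines.size())));
        }
        return chunks;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = List.of("r1", "r2", "r3", "r4", "r5");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Integer>> results = new ArrayList<>();
        for (List<String> part : chunk(lines, 2)) {
            // Each task stands in for one tunnel writer uploading a block.
            results.add(pool.submit(part::size));
        }
        int total = 0;
        for (Future<Integer> f : results) total += f.get();
        pool.shutdown();
        System.out.println("uploaded " + total + " records");
    }
}
```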
2.8 tunnel download: non-partitioned table
Download the table t_tunnel to the local file t_tunnel.csv:
tunnel download t_tunnel t_tunnel.csv;
The data is downloaded to the current directory by default; to export somewhere else, give an explicit path such as path\t_tunnel.csv, with path adjusted to your own machine.
2.9 tunnel download: partitioned table
Download the partition gender='male' of table t_tunnel_p to the local file t_tunnel_p.csv:
tunnel download t_tunnel_p/gender='male' t_tunnel_p.csv;
Chapter 3: Custom Data-Transfer Development with the Tunnel Java SDK
3.1 Installing and configuring the Eclipse development environment
First, unzip the downloaded eclipse-java-luna-SR2-win32-x86_64.zip to E:\ODPS_DEMO, then:
(1) Copy E:\ODPS_DEMO\InstallMedia\odps-eclipse-plugin-bundle-0.16.0.jar to the directory E:\ODPS_DEMO\eclipse\plugins
(2) Run E:\ODPS_DEMO\eclipse\eclipse.exe to open Eclipse, then click New -> Other
(3) Configure ODPS
(4) Specify the Java project name:
3.2 Uploading a file with a single thread
Reference DDL for this part:
drop table if exists t_tunnel_sdk;
create table t_tunnel_sdk (
    id int,
    name string
) partitioned by (gender string);
This exercise can follow the reference script: E:\ODPS_DEMO\resources\02-DataTransfer\UploadSample.java
(1) Add a new Java class:
(2) Name the class UploadSample and put it in the package DTSample
(3) The class uploads a single file into an ODPS table; its input parameters are:
-f <file_name> -t <table_name> -c <config_file> [-p <partition>] [-fd <field_delimiter>]
Write the method printUsage, which prints the calling syntax:
private static void printUsage(String msg) {
    System.out.println(
        "Usage: UploadSample -f file \\\n"
        + "                   -t table \\\n"
        + "                   -c config_file \\\n"
        + "                   [-p partition] \\\n"
        + "                   [-fd field_delimiter] \\\n");
    if (msg != null) {
        System.out.println(msg);
    }
}
Write the method that reads and parses the input arguments:
private static String accessId;
private static String accessKey;
private static String OdpsEndpoint;
private static String TunnelEndpoint;
private static String project;
private static String table;
private static String partition;
private static String fieldDelimeter;
private static String file;

private static void parseArgument(String[] args) {
    for (int i = 0; i < args.length; i++) {
        if ("-f".equals(args[i])) {
            if (++i == args.length) {
                throw new IllegalArgumentException("source file not specified in -f");
            }
            file = args[i];
        } else if ("-t".equals(args[i])) {
            if (++i == args.length) {
                throw new IllegalArgumentException("ODPS table not specified in -t");
            }
            table = args[i];
        } else if ("-c".equals(args[i])) {
            if (++i == args.length) {
                throw new IllegalArgumentException("ODPS configuration file not specified in -c");
            }
            try {
                InputStream is = new FileInputStream(args[i]);
                Properties props = new Properties();
                props.load(is);
                accessId = props.getProperty("access_id");
                accessKey = props.getProperty("access_key");
                project = props.getProperty("project_name");
                OdpsEndpoint = props.getProperty("end_point");
                TunnelEndpoint = props.getProperty("tunnel_endpoint");
            } catch (IOException e) {
                throw new IllegalArgumentException("Error reading ODPS config file '" + args[i] + "'.");
            }
        } else if ("-p".equals(args[i])) {
            if (++i == args.length) {
                throw new IllegalArgumentException("odps table partition not specified in -p");
            }
            partition = args[i];
        } else if ("-fd".equals(args[i])) {
            if (++i == args.length) {
                throw new IllegalArgumentException("field delimiter not specified in -fd");
            }
            fieldDelimeter = args[i];
        }
    }
    if (file == null) {
        throw new IllegalArgumentException("Missing argument -f file");
    }
    if (table == null) {
        throw new IllegalArgumentException("Missing argument -t table");
    }
    if (accessId == null || accessKey == null || project == null
            || OdpsEndpoint == null || TunnelEndpoint == null) {
        throw new IllegalArgumentException("ODPS conf not set, please check -c odps.conf");
    }
}
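The -c file is read with java.util.Properties, so it is a plain key=value properties file. The self-contained snippet below exercises the same keys the parser expects; the class name and all values are placeholders, not real credentials or endpoints.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ConfigDemo {
    // Load a properties text the same way parseArgument loads the -c file.
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values; a real odps_config.ini carries your own
        // account and endpoint settings.
        String conf = "access_id=YOUR_ID\n"
                + "access_key=YOUR_KEY\n"
                + "project_name=my_project\n"
                + "end_point=http://service.example.com/api\n"
                + "tunnel_endpoint=http://tunnel.example.com\n";
        Properties p = load(conf);
        System.out.println(p.getProperty("project_name"));
    }
}
```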
(4) Write the code that reads records from the file and formats them
Read the records and process them column by column:

import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;

class RecordConverter {
    private TableSchema schema;
    private String nullTag;
    private SimpleDateFormat dateFormater;
    private DecimalFormat doubleFormat;
    private String DEFAULT_DATE_FORMAT_PATTERN = "yyyyMMddHHmmss";

    public RecordConverter(TableSchema schema, String nullTag, String dateFormat, String tz) {
        this.schema = schema;
        this.nullTag = nullTag;
        if (dateFormat == null) {
            this.dateFormater = new SimpleDateFormat(DEFAULT_DATE_FORMAT_PATTERN);
        } else {
            dateFormater = new SimpleDateFormat(dateFormat);
        }
        dateFormater.setLenient(false);
        dateFormater.setTimeZone(TimeZone.getTimeZone(tz == null ? "GMT" : tz));
        doubleFormat = new DecimalFormat();
        doubleFormat.setMinimumFractionDigits(0);
        doubleFormat.setMaximumFractionDigits(20);
    }

    /** Record to String array. */
    public String[] format(Record r) {
        int cols = schema.getColumns().size();
        String[] line = new String[cols];
        String colValue = null;
        for (int i = 0; i < cols; i++) {
            Column column = schema.getColumn(i);
            OdpsType t = column.getType();
            switch (t) {
            case BIGINT: {
                Long v = r.getBigint(i);
                colValue = v == null ? null : v.toString();
                break;
            }
            case DOUBLE: {
                Double v = r.getDouble(i);
                colValue = v == null ? null : doubleFormat.format(v).replaceAll(",", "");
                break;
            }
            case DATETIME: {
                Date v = r.getDatetime(i);
                colValue = v == null ? null : dateFormater.format(v);
                break;
            }
            case BOOLEAN: {
                Boolean v = r.getBoolean(i);
                colValue = v == null ? null : v.toString();
                break;
            }
            case STRING: {
                colValue = r.getString(i);
                break;
            }
            default:
                throw new RuntimeException("Unknown column type: " + t);
            }
            line[i] = colValue == null ? nullTag : colValue;
        }
        return line;
    }

    /** String array to record. */
    public Record parse(String[] line) {
        if (line == null) {
            return null;
        }
        int columnCnt = schema.getColumns().size();
        Column[] cols = new Column[columnCnt];
        for (int i = 0; i < columnCnt; ++i) {
            cols[i] = new Column(schema.getColumn(i).getName(), schema.getColumn(i).getType());
        }
        ArrayRecord r = new ArrayRecord(cols);
        int i = 0;
        for (String v : line) {
            if (v.equals(nullTag)) {
                i++;
                continue;
            }
            if (i >= columnCnt) {
                break;
            }
            OdpsType type = schema.getColumn(i).getType();
            switch (type) {
            case BIGINT:
                r.setBigint(i, Long.valueOf(v));
                break;
            case DOUBLE:
                r.setDouble(i, Double.valueOf(v));
                break;
            case DATETIME:
                try {
                    r.setDatetime(i, dateFormater.parse(v));
                } catch (ParseException e) {
                    throw new RuntimeException(e.getMessage());
                }
                break;
            case BOOLEAN:
                v = v.trim().toLowerCase();
                if (v.equals("true") || v.equals("false")) {
                    r.setBoolean(i, v.equals("true"));
                } else if (v.equals("0") || v.equals("1")) {
                    r.setBoolean(i, v.equals("1"));
                } else {
                    throw new RuntimeException("Invalid boolean type, expect: true|false|0|1");
                }
                break;
            case STRING:
                r.setString(i, v);
                break;
            default:
                throw new RuntimeException("Unknown column type");
            }
            i++;
        }
        return r;
    }
}
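The converter's date and boolean conventions can be checked in isolation with plain JDK classes. The class below is invented for this sketch; the "yyyyMMddHHmmss" pattern, strict (non-lenient) parsing, GMT default, and true|false|0|1 boolean handling mirror the converter code above.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class ConvertDemo {
    // Mirrors RecordConverter's default: strict "yyyyMMddHHmmss" in GMT.
    static SimpleDateFormat formatter() {
        SimpleDateFormat f = new SimpleDateFormat("yyyyMMddHHmmss");
        f.setLenient(false);
        f.setTimeZone(TimeZone.getTimeZone("GMT"));
        return f;
    }

    // Mirrors the BOOLEAN branch of parse(): accepts true|false|0|1.
    static boolean parseBoolean(String v) {
        v = v.trim().toLowerCase();
        if (v.equals("true") || v.equals("false")) return v.equals("true");
        if (v.equals("0") || v.equals("1")) return v.equals("1");
        throw new RuntimeException("Invalid boolean type, expect: true|false|0|1");
    }

    public static void main(String[] args) throws Exception {
        Date d = formatter().parse("20150401123000");
        System.out.println(formatter().format(d));   // round-trips unchanged
        System.out.println(parseBoolean("1"));
    }
}
```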
(5) Write the main method that reads the file and uploads it to the ODPS table:

public static void main(String args[]) {
    try {
        parseArgument(args);
    } catch (IllegalArgumentException e) {
        printUsage(e.getMessage());
        System.exit(2);
    }
    if (fieldDelimeter == null) {
        fieldDelimeter = ",";   // -fd is optional; fall back to a comma
    }
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setDefaultProject(project);
    odps.setEndpoint(OdpsEndpoint);
    BufferedReader br = null;
    try {
        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(TunnelEndpoint);
        TableTunnel.UploadSession uploadSession = null;
        if (partition != null) {
            PartitionSpec spec = new PartitionSpec(partition);
            System.out.println(spec.toString());
            uploadSession = tunnel.createUploadSession(project, table, spec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());
        } else {
            uploadSession = tunnel.createUploadSession(project, table);
        }
        Long blockid = (long) 0;
        RecordWriter recordWriter = uploadSession.openRecordWriter(blockid, true);
        Record record = uploadSession.newRecord();
        TableSchema schema = uploadSession.getSchema();
        RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
        br = new BufferedReader(new FileReader(file));
        Pattern pattern = Pattern.compile(fieldDelimeter);
        String line = null;
        while ((line = br.readLine()) != null) {
            String[] items = pattern.split(line, 0);
            record = converter.parse(items);
            recordWriter.write(record);
        }
        recordWriter.close();
        Long[] blocks = { blockid };
        uploadSession.commit(blocks);
        System.out.println("Upload succeed!");
    } catch (TunnelException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
(6) Create the table in ODPS:
odpscmd -f E:\ODPS_DEMO\resources\02-DataTransfer\crt_tbl.sql
(7) Set the run configuration in Eclipse:
Fill the following into Program arguments:
-f E:\ODPS_DEMO\resources\02-DataTransfer\uploadDataSet.csv -t t_tunnel_sdk -p "gender='Male'" -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini
Check the output in the console:
Check the upload result in the ODPS project:
read t_tunnel_sdk;
3.3 Downloading a file with a single thread
This exercise can follow the reference script: E:\ODPS_DEMO\resources\02-DataTransfer\DownloadSample.java
(1) In the existing Java project DataTransfer, add a new Java class under the DTSample package:
(2) Write the method printUsage, which prints the program's input parameters;
(3) Write the method parseArgument, which reads and parses the input arguments;
(4) Write the class RecordConverter to format the data and build Record objects;
(5) Write the main method, implementing the single-threaded download:
// Configure the Aliyun account from the configuration file given in the input arguments
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(OdpsEndpoint);

// Create the tunnel service entry point on top of that account
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(TunnelEndpoint);

// Create a download session on the tunnel, handling partitioned and
// non-partitioned tables separately:
TableTunnel.DownloadSession session;
if (partition != null) {
    PartitionSpec spec = new PartitionSpec(partition);
    session = tunnel.createDownloadSession(project, table, spec);
} else {
    session = tunnel.createDownloadSession(project, table);
}

// Read records from the ODPS table, format them, and write them to the local
// file. (out is the output stream for the target file and lineDelimiter the
// record separator; both are initialized elsewhere in the class, not shown here.)
RecordReader reader = session.openRecordReader(0L, session.getRecordCount(), true);
TableSchema schema = session.getSchema();
Record record;
RecordConverter converter = new RecordConverter(schema, "NULL", null, null);
String[] items = new String[schema.getColumns().size()];
while ((record = reader.read()) != null) {
    items = converter.format(record);
    for (int i = 0; i < items.length; ++i) {
        if (i > 0) {
            out.write(fieldDelimeter.getBytes());
        }
        out.write(items[i].getBytes());
    }
    out.write(lineDelimiter.getBytes());
}
reader.close();
out.close();
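The write loop simply joins each record's formatted fields with the field delimiter and terminates the record with the line delimiter. That logic in isolation, with an invented class name for this sketch:

```java
public class JoinDemo {
    // Join one formatted record the same way the download loop writes it.
    static String joinRow(String[] items, String fieldDelim, String lineDelim) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < items.length; i++) {
            if (i > 0) sb.append(fieldDelim);
            sb.append(items[i]);
        }
        return sb.append(lineDelim).toString();
    }

    public static void main(String[] args) {
        // "NULL" is the null tag substituted by RecordConverter.format
        System.out.print(joinRow(new String[] {"1", "Tom", "NULL"}, ",", "\n"));
    }
}
```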
(6) Set the run configuration in Eclipse:
Fill the following into Program arguments:
-f E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv -fd , -c E:\ODPS_DEMO\odpscmd_public\conf\odps_config.ini -t t_tunnel_sdk -p "gender='Male'"
Open the downloaded file E:\ODPS_DEMO\resources\02-DataTransfer\downloadMaleDataSet.csv and compare it with the data in the ODPS table t_tunnel_sdk.
Chapter 4: Lab Summary
4.1 Lab Summary
The data-transfer features of MaxCompute, exposed through the tunnel command set, make it easy to upload local data into regular and partitioned tables, with support for custom column and row delimiters and for error tolerance during upload; when the data volume is large, a thread count can also be specified to speed things up. This lab walked through the functions most commonly used in day-to-day work.
In addition, MaxCompute provides the Tunnel Java SDK so that the same data-transfer features can be used from your own programs.