RequestId=201607081345229bb89a0abce6ddd4, ErrorCode=MalformedDataStream, ErrorMessage=The data stream you provided was not well-formed or did not validate against schema.
at com.aliyun.odps.datahub.DatahubWriter.write(DatahubWriter.java:158)
at com.aliyun.odps.datahub.DatahubWriter.write(DatahubWriter.java:109)
at com.moloong.web.collector.ODPSDataCollection_bank$1.run(ODPSDataCollection_bank.java:300)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我往odps写数据的时候报错,搜了一下报错是因为多线程要新建一个DatahubWriter,我已经新建了还是未能解决,代码如下
for (String key : packMap.keySet()) {
String partSpec = "pt='" + key + "'";
synchronized (partSpeclock) {
if (!partitionSet.contains(odpsTableName + partSpec)) {
ODPSUtil.preparePartition(odpsTableName, partSpec);
partitionSet.add(odpsTableName + partSpec);
}
}
Thread.sleep(2);
DatahubWriter newWriter = client.openDatahubWriter();
String packId = newWriter.write(new PartitionSpec(partSpec), packMap.get(key)).getPackId();
}
麻烦大神们给点意见- -!
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。