开发者社区> 问答> 正文

reds上传数据到odps上报错ErrorCode=MalformedDataStream

RequestId=201607081345229bb89a0abce6ddd4, ErrorCode=MalformedDataStream, ErrorMessage=The data stream you provided was not well-formed or did not validate against schema.

at com.aliyun.odps.datahub.DatahubWriter.write(DatahubWriter.java:158)
at com.aliyun.odps.datahub.DatahubWriter.write(DatahubWriter.java:109)
at com.moloong.web.collector.ODPSDataCollection_bank$1.run(ODPSDataCollection_bank.java:300)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

我往odps写数据的时候报错,搜了一下报错是因为多线程要新建一个DatahubWriter,我已经新建了还是未能解决,代码如下
for (String key : packMap.keySet()) {

                                            String partSpec = "pt='" + key + "'";
                                            synchronized (partSpeclock) {
                                                if (!partitionSet.contains(odpsTableName + partSpec)) {
                                                    ODPSUtil.preparePartition(odpsTableName, partSpec);
                                                    partitionSet.add(odpsTableName + partSpec);
                                                }
                                            }
                                            Thread.sleep(2);
                                            DatahubWriter newWriter = client.openDatahubWriter();
                                            String packId = newWriter.write(new PartitionSpec(partSpec), packMap.get(key)).getPackId();
                                        }

麻烦大神们给点意见- -!

展开
收起
乌龟男 2016-07-08 14:20:58 3489 0
2 条回答
写回答
取消 提交回答
  • writer不是线程安全的,需要一个线程起一个。

    2019-07-17 19:53:20
    赞同 展开评论 打赏
  • MaxCompute 生态

    看起来是上传的数据和表 schema 不匹配?

    2019-07-17 19:53:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载