Data type conversion exception when Spark reads from MaxCompute concurrently

Fetching data with df = spark.sql(" select * from table_name limit 100 ") works, but reading from MaxCompute in parallel partitions with spark.read.jdbc(url=url, table="table_name", predicates=predicates, properties=properties) throws a type conversion exception.

Python code:

from pyspark.sql import SparkSession

predicates = []
date_list = {"2020-05-03 13:13:00": "2020-05-03 13:17:00",
             "2020-05-03 15:28:00": "2020-05-03 15:32:00",
             "2020-05-03 15:43:00": "2020-05-03 15:47:00"}

for start_date, end_date in date_list.items():
    predicates.append(" dt >= '" + start_date + "' and dt < '" + end_date + "'")

properties = {"access_id": "*************",
              "access_key": "******************",
              "project_name": "************"}

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()

url = "jdbc:odps:http://service.odps.aliyun.com/api"

df = spark.read.jdbc(url=url, table="ods_cp_analogval", predicates=predicates, properties=properties)

print(df.count())  # works: returns the row count
print(df.take(10)) # raises the exception
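
For reference, the predicate list in the code above can also be built from explicit (start, end) pairs rather than a dict. This is only a minimal sketch: build_predicates and its column parameter are hypothetical names, not part of the original post. Spark creates one JDBC partition per predicate, so the windows should not overlap if duplicate rows are to be avoided.

```python
from typing import Iterable, List, Tuple


def build_predicates(windows: Iterable[Tuple[str, str]], column: str = "dt") -> List[str]:
    """Return one half-open range predicate per (start, end) window."""
    return [f"{column} >= '{start}' and {column} < '{end}'" for start, end in windows]


# Same three time windows as in the question.
windows = [
    ("2020-05-03 13:13:00", "2020-05-03 13:17:00"),
    ("2020-05-03 15:28:00", "2020-05-03 15:32:00"),
    ("2020-05-03 15:43:00", "2020-05-03 15:47:00"),
]
predicates = build_predicates(windows)
```

The resulting list can be passed as the predicates argument of spark.read.jdbc exactly as in the question.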

Exception:

"D:\Program Files (x86)\Python378\python.exe" D:\IdeaProjects\MySpark-Python\yakyang\spark\aliyun\odps_spark_parallel.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/07/14 06:46:33 WARN CupidConf: load odps default configs failed.
20/07/14 06:46:33 WARN CupidConf: load odps default configs failed.
3
[Stage 0:> (0 + 3) / 3]20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9759
20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9906
[Stage 0:=======================================> (2 + 1) / 3]20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9762
29427
[Stage 2:> (0 + 1) / 1]20/07/14 06:47:03 WARN OdpsConnection: open read record, start=0, cnt=9759
20/07/14 06:47:03 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
    at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79)
    at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/07/14 06:47:03 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
    at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79)
    at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

20/07/14 06:47:04 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "D:\IdeaProjects\MySpark-Python\yakyang\spark\aliyun\odps_spark_parallel.py", line 35, in <module>
    print(df.take(10))
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\dataframe.py", line 504, in take
    return self.limit(num).collect()
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "D:\Program Files (x86)\Python378\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "D:\Program Files (x86)\Python378\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
    at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79)
    at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:368)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
    at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79)
    at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535)
    at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Process finished with exit code 1
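
A note on reading the message: the token [B@4ed7a22e is the default toString() output of a Java byte[], so the driver was handed raw bytes where it expected a timestamp value. count() most likely succeeds because Spark does not need to fetch column values to count rows, so getTimestamp is never called, whereas take(10) has to materialize every column. The helper below is hypothetical and for illustration only: it pulls such a token out of an error message and names the Java array type.

```python
import re
from typing import Optional

# JVM descriptors for primitive arrays, as printed by Object.toString().
_PRIMITIVE_ARRAYS = {
    "[B": "byte[]", "[C": "char[]", "[I": "int[]", "[J": "long[]",
    "[D": "double[]", "[F": "float[]", "[S": "short[]", "[Z": "boolean[]",
}


def java_array_type(message: str) -> Optional[str]:
    """Return the Java array type named by a '[X@hash' token, or None."""
    match = re.search(r"(\[[BCIJDFSZ])@[0-9a-f]+", message)
    return _PRIMITIVE_ARRAYS[match.group(1)] if match else None


msg = "Error happened when transforming [B@4ed7a22e into java.sql.Timestamp"
print(java_array_type(msg))  # byte[]
```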

1259988323058484 2020-07-14 08:32:50 5947 0
1 answer
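    Hello, this column appears to be a String, but Spark reads it with getTimestamp, which is why the conversion fails. Try changing the column's data type to Timestamp. If you have further questions, you can join the MaxCompute developer community DingTalk group.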
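
One possible way to act on this suggestion without altering the table, sketched under assumptions: Spark's JDBC reader accepts a customSchema option (available since Spark 2.3.0) that overrides the inferred type of the named columns, so the column could be read as a string and parsed afterwards with to_timestamp. The column name dt is only a guess at which column fails, read_with_string_columns is a hypothetical helper, and whether the MaxCompute JDBC driver cooperates with this option has not been verified here.

```python
from typing import Dict, List


def custom_schema(columns: List[str]) -> str:
    """Build a customSchema value that reads each named column as STRING."""
    return ", ".join(f"{name} STRING" for name in columns)


def read_with_string_columns(spark, url: str, table: str, predicates: List[str],
                             properties: Dict[str, str], columns: List[str]):
    """Read via JDBC with `columns` forced to STRING, then parse them as timestamps."""
    from pyspark.sql import functions as F  # imported lazily; requires pyspark

    df = (spark.read
          .option("customSchema", custom_schema(columns))
          .jdbc(url=url, table=table, predicates=predicates, properties=properties))
    for name in columns:
        # Assumes the stored strings use a format to_timestamp parses by default.
        df = df.withColumn(name, F.to_timestamp(F.col(name)))
    return df


print(custom_schema(["dt"]))  # dt STRING
```

With the variables from the question, the call would look like df = read_with_string_columns(spark, url, "ods_cp_analogval", predicates, properties, ["dt"]).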

    2020-07-24 17:22:08