Using df = spark.sql("select * from table_name limit 100") fetches data without problems, but reading from MaxCompute in parallel with spark.read.jdbc(url=url, table="table_name", predicates=predicates, properties=properties) throws a type-conversion exception.

Python code:

from pyspark.sql import SparkSession

predicates = []
date_list = {"2020-05-03 13:13:00": "2020-05-03 13:17:00",
             "2020-05-03 15:28:00": "2020-05-03 15:32:00",
             "2020-05-03 15:43:00": "2020-05-03 15:47:00"}
# Build one non-overlapping time-range predicate per Spark partition.
for start_date, end_date in date_list.items():
    predicates.append("dt >= '" + start_date + "' and dt < '" + end_date + "'")

properties = {"access_id": "*************",
              "access_key": "******************",
              "project_name": "************"}

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .getOrCreate()
url = "jdbc:odps:http://service.odps.aliyun.com/api"
df = spark.read.jdbc(url=url, table="ods_cp_analogval", predicates=predicates, properties=properties)
print(df.count())   # returns the row count normally
print(df.take(10))  # raises the exception below
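To narrow down which column fails, here is a minimal diagnostic sketch (no column names are assumed; the TIMESTAMP columns are derived from the DataFrame schema). count() likely succeeds because column pruning means no timestamp values are ever fetched, while take() materializes every column, forcing the JDBC getTimestamp getter, which is where ToJdbcTimestampTransformer throws:

df.printSchema()  # shows which columns Spark mapped to TimestampType

# If selecting only the non-timestamp columns works, the ODPS JDBC
# timestamp transformer is the culprit rather than the predicates.
ts_cols = [f.name for f in df.schema.fields
           if f.dataType.typeName() == "timestamp"]
print(df.select([c for c in df.columns if c not in ts_cols]).take(10))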
Exception:
"D:\Program Files (x86)\Python378\python.exe" D:\IdeaProjects\MySpark-Python\yakyang\spark\aliyun\odps_spark_parallel.py Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 20/07/14 06:46:33 WARN CupidConf: load odps default configs failed. 20/07/14 06:46:33 WARN CupidConf: load odps default configs failed. 3 [Stage 0:> (0 + 3) / 3]20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9759 20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9906 [Stage 0:=======================================> (2 + 1) / 3]20/07/14 06:46:57 WARN OdpsConnection: open read record, start=0, cnt=9762 29427 [Stage 2:> (0 + 1) / 1]20/07/14 06:47:03 WARN OdpsConnection: open read record, start=0, cnt=9759 20/07/14 06:47:03 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4) java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79) at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656) at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535) at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at 
java.lang.Thread.run(Thread.java:748) 20/07/14 06:47:03 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp at com.aliyun.odps.jdbc.utils.transformer.to.jdbc.ToJdbcTimestampTransformer.transform(ToJdbcTimestampTransformer.java:79) at com.aliyun.odps.jdbc.OdpsResultSet.transformToJdbcType(OdpsResultSet.java:656) at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:535) at com.aliyun.odps.jdbc.OdpsResultSet.getTimestamp(OdpsResultSet.java:523) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:439) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$11.apply(JdbcUtils.scala:438) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:347) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:329) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
20/07/14 06:47:04 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "D:\IdeaProjects\MySpark-Python\yakyang\spark\aliyun\odps_spark_parallel.py", line 35, in <module>
    print(df.take(10))
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\dataframe.py", line 504, in take
    return self.limit(num).collect()
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "D:\Program Files (x86)\Python378\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\Aliyun\spark-2.3.0-odps0.32.2\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "D:\Program Files (x86)\Python378\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 4, localhost, executor driver): java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
	(stack trace identical to the executor exception above)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:368)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Error happened when transforming [B@4ed7a22e into java.sql.Timestamp
	(stack trace identical to the executor exception above)
	... 1 more
Process finished with exit code 1
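A possible workaround sketch, not verified against the odps-jdbc driver: Spark's customSchema JDBC read option (available since Spark 2.3) tells Spark to fetch the listed columns with getString instead of getTimestamp, sidestepping ToJdbcTimestampTransformer entirely; the string is then parsed back into a timestamp on the Spark side. The column name ts_col below is hypothetical; substitute the TIMESTAMP column(s) shown by df.printSchema().

from pyspark.sql import functions as F

# ts_col is a hypothetical TIMESTAMP column name -- replace it with the real one.
raw = (spark.read
       .option("customSchema", "ts_col STRING")  # fetched via getString, not getTimestamp
       .jdbc(url=url, table="ods_cp_analogval",
             predicates=predicates, properties=properties))

# Re-parse the string into a proper timestamp on the Spark side.
fixed = raw.withColumn("ts_col", F.to_timestamp("ts_col", "yyyy-MM-dd HH:mm:ss"))
print(fixed.take(10))

Failing that, since the spark.sql(...) path already works, an alternative is to run one spark.sql query per time range with a WHERE clause and union the results, at the cost of the JDBC predicate parallelism.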