Using the Scala plugin in IDEA, I created a Scala Maven project that runs successfully, but the packaged jar throws errors when submitted to the cluster
Code:
package com.chanct.idap.ssm.spark

import com.chanct.idap.ssm.common.{HbaseUtils, PlatformConfig}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by lichao on 16-4-4.
 */
object KafKa2SparkStreaming2Hbase {
  def main(args: Array[String]) {
    val zkQuorum = "c1d8:2181,c1d9:2181,c1d10:2181"
    val group = "1"
    val topics = "scala_api_topic"
    val numThreads = 2
    // val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").setMaster("local[2]").set("spark.eventLog.overwrite", "true")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").set("spark.eventLog.overwrite", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val conf = PlatformConfig.loadHbaseConf() // load the HBase configuration
        val conn = HbaseUtils.getConnection(conf) // open one HBase connection per partition
        val userTable = TableName.valueOf("WCTest")
        val table = conn.getTable(userTable)
        partition.foreach { w =>
          try {
            val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("value"), Bytes.toBytes(w._1.toString))
            put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("count"), Bytes.toBytes(w._2.toString))
            table.put(put)
          } catch {
            case _: Exception => println("raw error!")
          }
        }
        table.close()
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.chanct.idap</groupId>
  <artifactId>ssm</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2016</inceptionYear>

  <properties>
    <scala.version>2.10.4</scala.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>2.6.0-mr1-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet.jsp</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>1.0.0-cdh5.4.5</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.0-cdh5.4.5</version>
      <exclusions>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>servlet-api-2.5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-assembly_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jettison</groupId>
      <artifactId>jettison</artifactId>
      <version>1.3.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-yarn_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0-cdh5.4.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.7</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Maven packaging:
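The packaging step itself is just the standard Maven lifecycle: since the shade plugin above is bound to the package phase, a plain package build should produce the fat jar (a sketch, assuming Maven is on the PATH and is run from the project root):

# Build the project; the shade plugin runs during "package" and produces the fat jar
mvn clean package

The shaded jar then lands in target/ssm-1.0-SNAPSHOT.jar, which is what gets copied to the cluster and submitted below.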
Spark submit command:
spark-submit --class com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase \
  --master yarn-cluster \
  --num-executors 3 --driver-memory 1g --executor-memory 512m --executor-cores 1 \
  /home/lc/ssm-1.0-SNAPSHOT.jar
The final error:
Exception in thread "Thread-63" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:107)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Asking each executor to shut down
16/04/12 10:33:55 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
16/04/12 10:33:55 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
16/04/12 10:33:55 INFO storage.MemoryStore: MemoryStore cleared
16/04/12 10:33:55 INFO storage.BlockManager: BlockManager stopped
16/04/12 10:33:55 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/12 10:33:55 WARN scheduler.EventLoggingListener: Event log hdfs://nameservice1/user/spark/applicationHistory/application_1460352776638_0056 already exists. Overwriting...
16/04/12 10:33:55 INFO Remoting: Remoting shut down
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/04/12 10:33:55 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: Job aborted due to stage failure: Task 2 in stage 7.0 failed 4 times, most recent failure: Lost task 2.3 in stage 7.0 (TID 84, c1d3): java.lang.NullPointerException
    at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:47)
    at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:42)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:)
16/04/12 10:33:55 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1460352776638_0056
Problem solved. Where Spark and HBase meet, version differences can leave a required jar missing from the executor classpath, which is what surfaced here as the NullPointerException inside foreachPartition. The fix is to add the htrace jar to the Spark configuration: spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
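One way to make this permanent is to put the property into spark-defaults.conf (a sketch; the path below is the CDH parcel layout on this cluster, so adjust it for your installation):

# spark-defaults.conf: append the HBase htrace jar to every executor's classpath
spark.executor.extraClassPath /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

Alternatively it can be passed per job with spark-submit's --conf flag, as in the combined command further below.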
And when submitting the job, also put the HBase configuration directory and the htrace jar on the driver classpath: spark-submit --master yarn-cluster --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar ....
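Putting the two fixes together with the original submit command, the full invocation looks roughly like this (a sketch assembled from the commands above; memory sizes and paths are specific to this cluster):

spark-submit --class com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase \
  --master yarn-cluster \
  --num-executors 3 --driver-memory 1g --executor-memory 512m --executor-cores 1 \
  --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  --conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar \
  /home/lc/ssm-1.0-SNAPSHOT.jar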
Reference: http://community.cloudera.com/t5/Apache-Hadoop-Concepts-and/Class-not-found-running-Spark-sample-hbase-importformat-py-in/td-p/27084
Finally done. It took a whole day, but I learned a lot from it.