package rizhichuli.sparkstreaming
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.util._
//import org.apache.spark.examples.streaming._
import org.apache.kafka.serializer.StringDecoder
import org.apache.spark.streaming.util._
import kafka.utils.VerifiableProperties //红色的为后加上的,并且有错,无法找到kafka下的utils等包
/**
* @author ${穆金星}
*/
object App {
def main(args : Array[String]) {
//if(args.length<4){
// System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
// System.exit(1)
// }
System.setProperty("hadoop.home.dir", "D:\\appbase\\hadoop-2.7.4\\hadoop-2.7.4")
val topic=Set("linuxSysteminfos")
val brokers="10.218.7.232:9092"
val sparkConf=new SparkConf().setMaster("local[2]").setAppName("App")
val sc=new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(10))
//ssc.sparkContext.setLogLevel("ERROR")
val kafkaParams=Map[String,String]("metadata.broker.list"->brokers,"serializer.class"->"kafka.serializer.StringEnvoder")
val kafkaStream=KafkaUtils.createDirectStream(ssc,kafkaParams,topic)
kafkaStream.print()
ssc.start()
ssc.awaitTermination()
}
}
运行后出现的错误是
17/09/20 22:35:10 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/09/20 22:35:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodException: scala.runtime.Nothing$.<init>(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:3074)
at java.lang.Class.getConstructor(Class.java:1817)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
添加kafka.utils.VerifiableProperties类后发现
object utils is not a member of package org.apache.kafka
但是确定kafka包已经包含在了maven中且其中有此类,忘各位大神能够帮忙解答
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。