Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase

简介: 环境为CDH5.8,开发工具为IDEA,大数据目前最新的API,送给大家避免踩坑!!

亲自摸索,送给大家,原创文章,转载注明哦。



import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.OutputFormat
/**
  * Created by sunyulong on 16/9/19.
  */
object OBDSQL extends App{
  //kafka topic
  val topics = List(("aaa",1)).toMap
  //zookeeper
  val zk = "10.1.11.71,10.1.11.72,10.1.11.73"
  val conf = new SparkConf() setMaster "yarn-cluster" setAppName "SparkStreamingETL"
  //create streaming context
  val ssc = new StreamingContext(conf , Seconds(1))
  //get every lines from kafka
  val lines = KafkaUtils.createStream(ssc,zk,"sparkStreaming",topics).map(_._2)
  //get spark context
  val sc = ssc.sparkContext
  //get sql context
  val sqlContext = new SQLContext(sc)
  //process every rdd AND save as HTable
  lines.foreachRDD(rdd => {
    //case class implicits
    import sqlContext.implicits._
    //filter empty rdd
    if (!rdd.isEmpty) {
      //register a temp table
      rdd.map(_.split(",")).map(p => Persion(p(0), p(1).trim.toDouble, p(2).trim.toInt, p(3).trim.toDouble)).toDF.registerTempTable("oldDriver")
      //use spark SQL
      val rs = sqlContext.sql("select count(1) from oldDriver")
      //create hbase conf
      val hconf = HBaseConfiguration.create()
      hconf.set("hbase.zookeeper.quorum",zk)
      hconf.set("hbase.zookeeper.property.clientPort", "2181")
      hconf.set("hbase.defaults.for.version.skip", "true")
      hconf.set(TableOutputFormat.OUTPUT_TABLE, "obd_pv")
      hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])
      val jobConf = new JobConf(hconf)
      //convert every line to hbase lines
      rs.rdd.map(line => (System.currentTimeMillis(),line(0))).map(line =>{
        //create hbase put
        val put = new Put(Bytes.toBytes(line._1))
        //add column
        put.addColumn(Bytes.toBytes("pv"),Bytes.toBytes("pv"),Bytes.toBytes(line._2.toString))
        //retuen type
        (new ImmutableBytesWritable,put)
      }).saveAsNewAPIHadoopDataset(jobConf)     //save as HTable
    }
  })
  //streaming start
  ssc start()
  ssc awaitTermination()
}

//the entity of persion for SparkSQL
case class Persion(gender: String, tall: Double, age: Int, driverAge: Double)


相关文章
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
43 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
98 0
|
7月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
104 5
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
57 0
|
6月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
102 0
|
6月前
|
消息中间件 分布式计算 Kafka
利用Spark将Kafka数据流写入HDFS
利用Spark将Kafka数据流写入HDFS
101 0
|
7月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
139 0
|
7月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
98 0
|
7月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
7月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成