Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS AI 助手,专业版
简介: Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中

思路



两种方式,一种可优化(foreachRDD后,直接创建连接Mysql),一种在(foreachRDD后通过foreachPartition,通过分区获取)



代码实现


import java.sql.DriverManager
import Spark.UpdateStateByKey.workds
import Spark.WordCount.ssc
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object MysqlByKey extends App{
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
  val ssc = new StreamingContext(sparkConf,Seconds(10))
  // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  // 内存数据丢失的时候,可以从checkpoint中恢复数据
  // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  ssc.checkpoint("E:/test")
  // 实现基础的wordcount逻辑
  val lines = ssc.socketTextStream("hadoop2", 9999)
  //val lines = ssc.textFileStream("E:/test")
  val words = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
  //将结果写入MySql
  words.foreachRDD(rdd => rdd.foreachPartition(line => {
    Class.forName("com.mysql.jdbc.Driver")
    //获取mysql连接
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.57.101:3306/test", "root", "1234")
    //把数据写入mysql
    try {
      for (row <- line) {
        val sql = "insert into wordcount(word,wordcount)values('" + row._1 + "','" + row._2 + "')"
        conn.prepareStatement(sql).executeUpdate()
      }
    } finally {
      conn.close()
    }
  }))
   /*方法二
words.foreachRDD(rdd=>{
     rdd.foreachPartition(partionOfRecords=>{
       if(partionOfRecords.size>0){
         val connection = createConnection()
         partionOfRecords.foreach(record=>{
           val sql = "insert into wordcount(word,wordcount) values("+record._1+","+record._2+")"
           connection.createStatement().execute(sql)
         })
         connection.close()
       }
     })
   })
  //获取通过jdbc连接数据库
  def createConnection()={
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://hadoop2:3306/test","root","1234")
  }*/
  words.print()
  ssc.start()
  ssc.awaitTermination()
}



文章知识点与官方知识档案匹配,可进一步学


相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
消息中间件 canal 缓存
项目实战:一步步实现高效缓存与数据库的数据一致性方案
Hello,大家好!我是热爱分享技术的小米。今天探讨在个人项目中如何保证数据一致性,尤其是在缓存与数据库同步时面临的挑战。文中介绍了常见的CacheAside模式,以及结合消息队列和请求串行化的方法,确保数据一致性。通过不同方案的分析,希望能给大家带来启发。如果你对这些技术感兴趣,欢迎关注我的微信公众号“软件求生”,获取更多技术干货!
718 6
项目实战:一步步实现高效缓存与数据库的数据一致性方案
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
191 3
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
314 0
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
375 0
|
人工智能 JavaScript 关系型数据库
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
495 14
【02】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-ui设计图figmaUI设计准备-figma汉化插件-mysql数据库设计-优雅草卓伊凡商业项目实战
|
人工智能 JavaScript 安全
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
685 13
【01】Java+若依+vue.js技术栈实现钱包积分管理系统项目-商业级电玩城积分系统商业项目实战-需求改为思维导图-设计数据库-确定基础架构和设计-优雅草卓伊凡商业项目实战
|
SQL NoSQL 数据库
Cassandra数据库与Cql实战笔记
Cassandra数据库与Cql实战笔记
262 1
Cassandra数据库与Cql实战笔记
|
SQL 关系型数据库 MySQL
学成在线笔记+踩坑(3)——【内容模块】课程分类查询、课程增改删、课程计划增删改查,统一异常处理+JSR303校验
课程分类查询、课程新增、统一异常处理、统一封装结果类、JSR303校验、修改课程、查询课程计划、新增/修改课程计划
学成在线笔记+踩坑(3)——【内容模块】课程分类查询、课程增改删、课程计划增删改查,统一异常处理+JSR303校验
|
前端开发 应用服务中间件 API
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
359 0

推荐镜像

更多