CreateDirectStream 消费数据|学习笔记

简介: 快速学习 CreateDirectStream 消费数据

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建)CreateDirectStream 消费数据】学习笔记与课程紧密联系,让用户快速学习知识

课程地址https://developer.aliyun.com/learning/course/670/detail/11627


CreateDirectStream 消费数据

 

内容介绍:

一、CreateDirectStream 消费数据的步骤

二、CreateDirectStream 的代码实现

 

一、CreateDirectStream 消费数据的步骤

目标:掌握CreateDirectStream 消费数据的步骤

1、创建Spark conf

2、创建SparkContext

3、创建Streaming Context

4、读取kafka 内的数据ssc,kafkaParams,topics)

5、消费数据

6、开启 Streaming 任务+开启循环

 

二、CreateDirectStream 的代码实现

来到开发环境中,打开 ispider 并将其中的 main 关掉,找到test ,右键点击 scala 后,将复制出的CreateDirectStream 新建到Scala 的Object ,输入TestCreateDirectStream。

接下来处理消费数据的整个流程,用CreateDirectStream 的方法来读取并消费。

如下:

1、程序的入口

首先看代码逻辑,要执行该操作,就要创建出一个main 方法。

object TestCreateDirectStream {

//程序的入口

def main(args: Array[string]): Unit = {

//1创建 spark conf

valconf=newSparkConf().setMaster("local[2]").setAppName ("TestCreateDirectstream")

//2、创建 SparkContext

val sc=new SparkContext(conf)

//3、创建 streaming Context

val ssc=new StreamingContext(sc,Seconds(2))

//4、读取 kafka 内的数据 ssc,kafkaParams,topics)

KafkaUtils.createDirectstream()

//其中 createDirectstream() 会爆红,因为createDirectstream() 中需要很多参数,但实际里面没有参数。

查看缺的参数需要按ctrl ,会出现很多用法。需要用到(其中Class 参数是无用的):

def createDirectStream [K, V, KD <: Decoder[K], VD <: Decoder[V]](

jssc: JavaStreamingContext,

keyClass: Class[K],

valueClass: Class[V],

keyDecoderClass: Class[KD],

valueDecoderClass: Class[VD],

kafkaParams: JMap[String, String],

topics: JSet[String]

//实例kafkaParams

val kafkaParams = Map ("bootstrap.servers "->" 192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9092")

//实例 topics

val topics=Set("test01")

//接收数据

val kafkaDatas = KafkaUtils.createDirectStream [String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)

//只获取value 数据

val kafkaValue=kafkaDatas.map(_._2)

//5、消费数据

kafkaValue.foreachRDD(rdd=>rdd.foreach(println))

//6、开启 streaming 任务+开启循环

ssc.start()

ssc.awaitTermination()

相关文章
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
1215 0
|
资源调度 分布式计算 算法
【揭秘Yarn调度秘籍】打破资源分配的枷锁,Hadoop Yarn权重调度全攻略!
【8月更文挑战第24天】在大数据处理领域,Hadoop Yarn 是一种关键的作业调度与集群资源管理工具。它支持多种调度器以适应不同需求,默认采用FIFO调度器,但可通过引入基于权重的调度算法来提高资源利用率。该算法根据作业或用户的权重值决定资源分配比例,权重高的可获得更多计算资源,特别适合多用户共享环境。管理员需在Yarn配置文件中启用特定调度器(如CapacityScheduler),并通过设置队列权重来实现资源的动态调整。合理配置权重有助于避免资源浪费,确保集群高效运行,满足不同用户需求。
276 3
|
消息中间件 Kafka Python
异步Producer的实现与优势
【8月更文第29天】在分布式系统中,消息队列是处理大规模并发任务的核心组件之一。其中,Kafka 是一种广泛使用的分布式流处理平台,提供了高吞吐量、低延迟的消息传递能力。在设计生产者(Producer)时,选择同步还是异步模式会直接影响到系统的性能和可扩展性。
188 2
|
消息中间件 资源调度 Kafka
实时计算 Flink版操作报错合集之提交任务后,如何解决报错:UnavailableDispatcherOperationException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
数据管理 Linux Shell
export命令详解
export命令详解
|
算法 API Apache
Flink CDC:新一代实时数据集成框架
本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
4421 3
Flink CDC:新一代实时数据集成框架
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
817 2
|
消息中间件 存储 Kafka
【Kafka大揭秘】掌握这些秘籍,让你的消息状态跟踪稳如老狗,再也不怕数据丢失的尴尬时刻!
【8月更文挑战第24天】Kafka作为一个领先的分布式流数据平台,凭借其出色的性能和扩展性广受青睐。为了保障消息的可靠传输与处理,Kafka提供了一系列核心机制:生产者确认确保消息成功到达;消费者位移管理支持消息追踪与恢复;事务性消息保证数据一致性;Kafka Streams的状态存储则适用于复杂的流处理任务。本文将详细解析这些机制并附带示例代码,帮助开发者构建高效稳定的消息处理系统。
184 5
|
Java 数据库连接 API
Flink报错问题之用Tumble窗口函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。