UpdateStateByKey、Tranform 算子_2|学习笔记

简介: 快速学习 UpdateStateByKey、Tranform 算子_2

开发者学堂课程【大数据实时计算框架 Spark 快速入门:UpdateStateByKey、Tranform 算子_2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1725


UpdateStateByKey、Tranform 算子_2


内容简介:

一、UpdateStateByKey 相关代码

二、Transform Operation变换操作介绍

三、TraratormOperation 相关代码

 

一、UpdateStateByKey 相关代码

1 package com.snsxt.stuay.streaming;

2

3 import java.util.Arrays;

19

20 public class UpdateStateByKeyWordcount{

21

22 public static void main(string[] args){

23SparkConfconf=newSparkConf().setAppName("UpdateStateByKeyWordcount").setMaster("local[2]");

24JavaStreamingContextjss= new JavaStreamingContext(conf,Durations.seconds(5));

25jssd.checkpoint(".);

26

27JavaReceiverInputDstream<String> linesmissc.socketTextStream("node24",8888);

28JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,string>()(

29

30private static final long serialVersionUID = 1L;

31

32@Override

33 public Iterable<String> call(string line) throws Exception{

34 return Arrays.asList(line.split(""));

35

36});

37 JavaPairDStream<String, Integer> pairs =words.mapToPair(new PairFunction<String, string, Integer>()(

38

39private static final long serialversionUID=1L;

40

41@Override

42public Tuple2<String, Integer> call(string word)throws Exception{

43return new Tuple2<String, Integer>(word,1);

44}

45);

46 JavaPairDStream<String,Integer>wordcountspairs.updateStateByKey(new Function2<List<Integer>, Optionale

47

48private static final long serialVersionUID= 1L;

49

50   //实际上,对于每个单词,每次 batch 计算的时候,都会调用这个函数,第一个参values 相当于这个 batch 中

51   // 这个 key 对应的新的一组值,可能有多个,可能 2 个 1,(xuruyun,1)(xuruyun,1),那么这个 values 就是(1,1)

52//那么第二个参数表示的是这个 key 之前的状态,我们看类型 Integer 也就知道了,这里是泛型自己指定的

 

二、Transform Operation(变换操作)

The transform operation(along with is variations like transformwith)allows arbitraryRDO-to-RDD functions to be applyed on a DStream. it can be used to apply any ROD operation that is not expdsed in the DStream API. For example,the functionality of joining every batch in a data stream wth another dataset is not directly exposed in the DStream API.However, you can easily use transform to do this. This enables very powerful possibilities.For example,one can do real-time data cleaning by joining the input data stream with precomputed spam information (maybe generated with Spark as well) and then filtering based on it.

译文:变换操作(类似于 transforn with 的变体)允许将任意的 RDO 到 RDD 函数应用于 DStream 。它可以用于应用任何在 DStream API 中没有展开的 ROD 操作。例如,在数据流中加入每个批处理的功能另一个数据集不会直接暴露在 DS Stream API.不过,您可以轻松地使用 transform 来完成此操作。这非常具有可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能使用 Spark 生成)连接起来,然后根据这些信息进行匹配,从而实现实时数据清理。

 

三、TraratormOperation 相关代码

1  package com.shsxt.study.streaming;

2

3  import java.util.ArrayList;

20

21 public class TransformOperation{

22

23  public static void main(String[] args)(

24 SparkConf conf=newSparkConf().setAppName("TransformOperation").setMaster("local[2]");

25JavaStreamingContex jssc=new JavaStreamingContext(conf,Durations.seconds(20));

26

27 //用户对于网上的广告可以进行点击!点击之后可以进行实时计算,但是有些用户就是刷广告!

28//所以说我们要有一个黑名单机制!只要是黑名单中的用户点击的广告,我们就给过掉!

29

30 // 先来模拟一个名单数据 RDDtrue 代表启用,false 代表不启用!

31 List<Tuple2<String,Boolean>> blacklist =new ArrayList<Tuple2<String, Boolean>>();

32blacklist.add(new Tuple2<String,Boolean>("yasaka", true));

33 blacklist.add(new Tuple2<String,Boolean>("xuruyun",false));

34

35finalJavaPairRDO<String,Boolean>blacklistRDD =jssc.se().parallelizePairs(blacklist);

36

37    // time adId name

38  JavaReceiverInputDStream<String>adsClickLogDStream =jssc.socketTextStream("node24",8888);

39

40  JavaPairDStream<String,String>adsClickLogPairDStream = adsClickLogDStream.mapToPair(new PairFunction<String, String, String>()

41

42  private static final long serialVersionuID=1L;

43

44  @Override

45 public Tuple2<String, String> call(string line)throws Exception(

46 return new Tuple2<String,String>(line.split("")[2],line);

47

48));

49

50JavaDStream<String>normalLogs=adsClickLogPairDStream.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String() {

51

52  private static final long serialVersionuID = 1L;

53

54 @Override

55 public JavaRDD<String> call(JavaPairRDD<String, String> userLogBatchrDo)

56  throws Exception{

57

58 JavaPairRDD<String,Tuple2<String,Optional<Boolean>>> joinedRDD=userLogBatchRDD.leftOuterJoin(blacklistRDD);

59

60

61 JavaPairRDD<String,Tuple2<String, Optional<Boolean>>> filteredRDD =

62 joinedRDD.filter(new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>,Boolean>()

63

64 private static final long serialVersionUID = 1L;

65

66 @Override

67publicBoolean call(Tuple2<String,Tuple2<String,Optional<Boolean>>> tuple)

68 throws Exception{

69 

70 if(tuple.2.2.isPresent()&&tuple.2.2.get()){

71 return false;

72}

73

74return true;

75}

相关文章
|
2天前
|
数据采集 人工智能 安全
|
11天前
|
云安全 监控 安全
|
3天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
1017 151
|
3天前
|
编解码 人工智能 机器人
通义万相2.6,模型使用指南
智能分镜 | 多镜头叙事 | 支持15秒视频生成 | 高品质声音生成 | 多人稳定对话
|
16天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1711 9
|
8天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
652 152
|
10天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
620 12
|
10天前
|
人工智能 自然语言处理 API
Next AI Draw.io:当AI遇见Draw.io图表绘制
Next AI Draw.io 是一款融合AI与图表绘制的开源工具,基于Next.js实现,支持自然语言生成架构图、流程图等专业图表。集成多款主流大模型,提供智能绘图、图像识别优化、版本管理等功能,部署简单,安全可控,助力技术文档与系统设计高效创作。
690 151