数据处理-链路统计Redis到Mysql1|学习笔记

简介: 快速学习数据处理-链路统计Redis到Mysql1

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):数据处理-链路统计Redis到Mysql1】学习笔记与课程紧密联系,让用户快速学习知识

课程地址https://developer.aliyun.com/learning/course/670/detail/11636


数据处理-链路统计Redis到Mysql1

 

内容介绍

一、回顾

二、Redis 是如何同步到 mysql 中去的

三、从 redis 读取数据同步到 mysql 的具体操作

 

一.回顾

数据都写到 redis 里面,然后这个数据界面里的数据的结果自动的就展示出来,这里面还差几步,redis 数据有一部分同步到 mysql 里面去,那为什么要同步 musql里面去?是怎么同步过去的?

这是第一步还有一部分就是Mysql里面的数据是怎么展现到前端的?

以及redis链路当前活跃的用户连接数是怎么从redis到我的界面的,这个代码属于前端在数据库中读取数据展现的一个过程,是前端工程师的工作

 

二、Redis 是如何同步到 mysql 中去的

Redis 同步到 mysql 主要是由 web 界面触发的,如何触发的?找到web工程,找到以后找到项目代码,项目代码中有一个定时的任务,双击定时任务

*每隔两个小时定时获取 redis 上的链路数据,存到 mysqL 对应的表中( datacoLLect ),*然后删掉redis 上已经备份到 mysqL 的数据。

默认的情况下系统自带的是按照上述代码每隔两小时执行的

但是我们不能等两小时,所以让它五秒钟发一次

 

三、从 redis 读取数据同步到 mysql 的具体操作

1. 具体代码

第一个代码是整个数据同步任务,cron用的是links中的crontime定时触发,每隔五秒钟运行一下代码。代码是如何运行的呢,进入到代码里面去看一下,选中redis读取数据同步到 mysql 的代码块 saveDataCollectData ();点击此代码块按住 Ctrl alt,这样就进入到了实际实现从 redis 读取数据同步到 mysql 的代码块,

@ Autowired

private IRealTimecomputDataservice realTimeComputDataservice ;

/**

*每隔两个小时定时获取 redis 上的链路数据,存到 mysqL 对应的表中( datacollect ,

*然后删掉 redis 上己经备份 mysqL 的数据。

*

*/

//@ Scheduled ( cron =”**0/2**?”)

@Scheduled ( cron ="0/5****?")

@ Transactional

public void BackupRedisLinkData () {

realTimecomputDataservice . saveDataCollectData ();

 

2. 下面看一下是如何实现的

首先创建了一个redis集群,前面我们在运行时有一段代码未显示出来,调整了环境配置文件之后就能读取数据了,此时读取的数据就是通过配置文件读取获得到我的19268100.160这个redis的集群。redis 集群拿到后继续向后走

@ Override

public void saveDatacollectData ()

JedisCluster jedisCluster = JedisConnctionUtil . getJedisCLusterO );

//记录当天的链璐总数

Map <192.168.2.141,100>

Map < String , Integer > linkCount = Trafficutil . trafficLinkInfo ( Constants . cSANTI _ MOITOR _ LP );

//循环存储所有的 IP

Set < String > keySet = linkCount . keySet ();

//遍历每一个 IP serverAddr 其中的一个 IP

for ( String serverAddr : keySet ){

//根据 serveraddr 查而链路历史数据

String hql =“ from Datacollect where server _ name =: serverName ";

Map く String ,0bject> params = new HashMap く~> O );

params . put (" serverName ", serverAddr );//192.168.2.111

//读取出数据库中 server _ name 与 serverAddr 相同的数据

Datacollect datacollect = datacollectDao . get ( hql , params );

// mysql 历史数据不存在,新存入数据

if ( datacollect ==null){

datacollect = new Datacollect ();

datacollect . setId ( UUID . randomUUID (). tostring ();

datacollect . setserverName ( serverAddr );

datacollect . setLastThreeDay sNum ( linkCount . get ( serverAddr ));

datacollect . setBeforeYesterdayNum (0);

Trafficutil . trafficLinkInfo 里面传入的是一个参数,Constants . cSANTI _ MOITOR _ LP );这个参数是流量数据的键值表示,这个参数在数据写入时redis就是key,这个key以此参数为前缀,再加一个时间戳,就是key的一个格式。

写入数据时 redis 里面的 key 就是写入的链路统计link前缀,按照这个作为前缀加一个时间戳向里面写,在读取时将 Constants . cSANTI _ MOITOR _ LP );作为前缀塞到 trafficLinkInfo 方法中。

前缀传递过去以后又弄了一个 redis 集群,

redis 由 cluster 传递进来,strflage 是刚传递过来的前缀 strflage+“*”拼成了一个字符串传递到

jedisConnectionUtil.keys 里面去

public static Map く String , Integer > trafficLinkInfo ( string strFlage ) {// strFlage CSANTI MONITOR LR

//实例化redis集群

 JedisCluster jedsCluster = JedisConnectionUtil . getJedisCluster ();

//从 redis 中拿出所有以 CSANTI_MONITOR_LP 作为前缀的 key

 set < String > keySet = JedisConnectionUtil . keys ( jedisCluster , pattern : strFlage +"*”); 

3. keys

keys 是干什么的,点击进去

public static Treeset < String > keys ( Jediscluster jc , string pattern ){

TreeSet < String > keys = new TreeSet く>();

Map く String , JedisPool > clusterNodes = jc · getclusterNodes (); for ( String k : clusterNodes . keySet (){

JedisPool jp = clusterNodes · get ( k );

Jedis jedis = jp .getResourceO3

try {

keys .addA1l( jedis . keys ( pattern ));

catch ( Exception e ){

e . printstackTrace ();

finally{

//用完一定要 close 这个链接

jedis . close ();

一定要 cLose .这个链接!!!

return keys 

4.JC

Jc是一个 redis 集群,而 pattern 是一个前缀通过 getclusterNodes 获得了 redis 里集群的一个节点,通过节点遍历节点找到 key,key 再往后初始化 clusterNodes 获得一个节点,然后再根据getResource属性来实践化一个 redis ,这个 redis 也就是 jedis,jedis.keys 就是拿到了以 pattern 为前缀的所有的 key,keys 是 string 中的一个 keyset,获得的所有数据写入到 treeset 中。

写入进去以后关闭集群,然后在返回keys通过这步就拿到了redis里面所有的以CSANTI_MONITOR_LP为前缀的属性,遍历所有的key拿到其中的某一个key,拿到key

以后,key通过定义的redis集群.getkey拿到的结果就是key对应的所有value值,这里面的value就是redis里选中的所有结果

try {

//循坏所有 key

for ( String str : keySet )// str CSANTIMONITOR _LP1553088805099

LinkJsonVO j5onVO= neW LinkJ5onvo()3

//获得某一个个 key 对应 value (" serversCountNap ":【"192.168.2.141“:30)," activeNumMap ":【"192.168.2.141":

String value = jedisCluster . get ( str );

{" serversCountMap ":"192.168.2.141”;30)," activeNumNap ":{"192.168.2.141”:"5"}转换 inkJsonvo

jsonVO =JsonResolveUtil.resolveLinkJ5on( value );

ServersCountN

Map < String , Integer > serversCountMap = jsonVO . getserversCountMap ();

//获取每个serversCountMap中的 IP

Set < String >keySet2= serversCountMap keySet ();

//遍历每个 IP string 就是每个 IP

for ( String string :keySet2){

//先从 LnkCount 中我取这个 IP 的值,

// Integer orDefault = linkCount . get ( string );

//若没有获得到,就赋值0 ,若能获得到,将其取出(第一次是空的,后面每次都有值)

orDefault = orDefault == null ?0:orDefault;

//获取{"192.168.2.141":30)中虹 P 对应的值一﹣>30 Integer orDefault2= serversCountMap . get ( string );

//将总统z计 inkCount 内的值和这个 IP 的值求和,并推送到总统计变量中

4. jsonVO

JsonVO是一个bean,bean的主要用途是从redis缓存数据库中读取数据,读取过来以后转化成bean,这就是前面将计算出来的数据写入到redis时初始化两个小的map,小map将它转化为大map,大map的key就是这里来的,redis里的数据转化为JsonVO

通过JsonResolveUtil.resolveLinkJ5on( value )返回JsonVO,

5. LinkJsonvo

JonResolveUtil.resolveLinkJ5on( value 是将string类型的结果转化成bean,传入的参数为字符串如果等于null或者等于空就返回return null,如果不等于就将这个string类型的值转化成LinkJsonvo的class,也就转化成了bean,这个bean就是LinkJsonvo jsonV0,传入过来的字符串已经通过这几行代码转化成了LinkJsonvo jsonV0,字符串返回的是一个jsonvo的bean,所以前面恰好是用LinkJsonvo接收的

public class JsonResolveUtil {

public static Jsonvo resolveJson ( String value ){

if ( null == valuell value =="")

return null ;

JsonVo jsonvO= new Jsonvo ();

jsonvo = JSON . parseobject ( value , Jsonvo . class );

return jsonVO ;

public static LinkJsonvo resolveLinkJson ( String value ){

if (nul1= valuell value ==""){

return null ;

LinkJsonvo jsonV0= new LinkJsonvo ();

jsonVO = JSON . parseobject ( value , LinkJsonvo . class );

return jsonVO ;

6. 如何获取结果

JsonVo的结果是从 bean 中 getserverscountMap,这个获取到的结果就是获取到了 redis 中的 serverscountMap,

192.168.100.160:12IP+数值,返回的结果是一个String , Integer,就变成了Ip+数值,转化完以后调用map中的keyset,拿到的是一个集合,keyset是string类型的key有多个值,有可能是多个,在我们的代码中只有一个服务器可能只有一个,如果是多台服务器就是多个值。keyset中拿到的有可能是多个值,多个值再遍历每个值,遍历每个值做什么?

string就是其中的一个IP,这个IP先到linkcount中去获取,刚刚我们看到了linkcount,但是并没有仔细研究,linkcount就是用来记录当前链路的总数,现在第一次运行程序的时候他是空的,空的时候从空的map里面去获取key肯定获取不到,获取不到就是空的。

在向下看如果Default等于null?0orDefault这是一个三元运算符,如果这个值等于null就返回0,如果不等于null那是什么就返回什么。

第一次肯定是0,那orDefault就是0在从serversCountMap . get ( string ),serversCountMap指的是String ,和 Integer,是一个IP值,serversCountMap 在. get key就获取了map中的key对应的值value,这个值是12,所以这里面get的值拿到的Default2就是12,就是012,前面的string是一个ip,ip拿到以后一个循环就做完了。

前面在redis中拿到了所有的key,在这里面第一个key刚遍历完,在遍历第二个key,是一模一样的流程,拿到key转化成jsonvo在获取到下一个serverscountMap,拿到服务器ip以后,再去linkcount里面将ip和数值得到,循环的目的是为了得到所有的数据的serverscountMap的ip对应的总和,对应的总和计算出来以后,把linkcount进行返回,最终将一个ip对应的这批数据的所有总和算出来返回了

try

//循环所有的key

 for ( String str : keySet )// str CSANTIMONITOR _LP1553088805099

LinkJsonvo jsonV0= new LinkJsonVo ();

//获得某一个 key 对应 vaLue ({" serverscountMap ":{"192.168.2.141”:30)," activeNunap ":{"192.168.2.141”:

String value = jediscluster . get ( str );

//将【" serversCountMap ":{"192.168.2.141”:30)," activeNumMap ":{"192.168.2.141":"5"转换成L inkJsonVO类的

jsonVo = JsonResolveUtil . resolveLinkJson ( value );

//获取jsonVO中的包含serveraddress ->"192.168.2.141":35)

Map < String , Integer > serversCountMap = jsonVo . getserversCountMap (); Set < String >keySet2= serversCountMap . keySet く);

//获取每个 serversCountMap 中的 IP

for ( String string :keySet2){

//先从 LinkCount 中获取这个 IP 的值,

 Integer orDefault = linkCount . get ( string );

//若没有获得到,就赋值0,若能获得到,将其取出(第一次是空的,后面每次都有值)

orDefault = orDefault ==null?0:orDefault;

//获取 {"192.168.2.141":30}中IP对应的值﹣->30

Integer orDefault2= serversCountMap . get ( string );

//}将总统讯 linkCount 内的值和这个 IP 的值求和,并推送到总统计变量中

linkCount . put ( string , orDefault +orDefault2);

相关文章
|
2月前
|
缓存 NoSQL 关系型数据库
MySQL 与 Redis 如何保证双写一致性?
我是小假 期待与你的下一次相遇 ~
407 7
|
5月前
|
关系型数据库 应用服务中间件 nginx
Docker一键安装中间件(RocketMq、Nginx、MySql、Minio、Jenkins、Redis)
本系列脚本提供RocketMQ、Nginx、MySQL、MinIO、Jenkins和Redis的Docker一键安装与配置方案,适用于快速部署微服务基础环境。
|
7月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
2月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
504 5
|
6月前
|
NoSQL Linux Redis
每天百万访问也不怕,Redis帮你搞定UV统计
本文介绍了使用Redis实现高性能UV统计系统的方法。Redis凭借其内存数据库特性,支持毫秒级响应和自动去重,非常适合高并发场景下的访客统计。核心思路是利用Redis的Set数据结构作为"每日签到墙",通过记录用户访问ID实现自动去重,并设置24小时过期时间。文章提供了Python代码示例,展示如何记录用户访问和获取当日UV统计数据,还可扩展实现多页面UV统计。相比传统数据库方案,Redis方案更加轻量高效,是中小型网站实现流量统计的理想选择。
530 0
|
11月前
|
存储 关系型数据库 MySQL
MySQL索引学习笔记
本文深入探讨了MySQL数据库中慢查询分析的关键概念和技术手段。
735 81
|
9月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,
|
9月前
|
消息中间件 缓存 NoSQL
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
|
11月前
|
缓存 NoSQL 关系型数据库
Redis与MySQL的数据一致性
在高并发环境下,保持 Redis 和 MySQL 的数据一致性是一个复杂但重要的问题。通过采用读写穿透、写穿透、分布式锁、双写一致性保障和延时双删策略,可以有效地减少数据不一致的风险,确保系统的稳定性和可靠性。通过合理的缓存策略和数据同步机制,可以显著提升系统的性能和用户体验。
528 22
|
12月前
|
NoSQL 关系型数据库 Redis
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
341 14

推荐镜像

更多