Java程序员清洗数据的小故事

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 阿里云SLS YYDS,世上无难事,只要肯攀登,办法总比困难多

背景

公司在已有IM项目中开发新业务功能,后端没埋点,团队没有BI,领导提了几个数据指标,统计每天A聊天功能使用人数,B功能功能使用人数,整体功能使用人数,还有就是最离谱的,要统计一次消息发送在服务器内消耗的时间。

了解到该项目线上有8台服务器,服务器日志存储7天,没有权限直接下载日志;接了阿里云SLS,SLS上会存120天的日志,超过天数的会放在阿里云oss 上进行冷备。

跟业务开发的同事明确日志格式以及相关指标数据提取对应的日志特征。关于消息耗时,就针对业务相关的消息,取了一百条消息的相关日志,进行取平均值,实在是不能每条都监控到。


开搞

计划

统计人数的任务

方案1,通过阿里云的sls 编写查询语句进行查询统计,下载少量日志编写代码进行验证sls 数据是否准确;

方案2,通过运维下载相关周期的日志,编写相关清洗代码,进行统计

统计耗时的任务

方案,通过上面查到的发消息人可确定消息内容,基于消息内容检索全部日志,整理贯穿服务的日志记录来计算消息转发耗时。

统计方案1 实践

阿里云sls 功能是采集服务日志,构建相关索引,提供日志聚合,搜索服务。当然其强大的支持sql 进行查询的功能,属实是强,但是也存在一定的学习成本。话不多说,来活儿了,就先想想怎么干。

image.png

既然有sls 这种神器(对我而言确实是),就直接用sls 进行搜索,这样能节省一定采集数据的时间,也不用劳烦运维同学。没有明确要哪一天的日志,我就先写一个能查出数据的语句出来。

那先基于A聊天功能实现一个搜索语句

__tag__:__path__: "/home/logs/im/MsgRcvMoblie.log" and "RCV from\:" and "groupchat" 
| select  
approx_distinct(cast(split(cast(split(content, ';') as array(varchar))[1],'from:') as array(varchar))[2]) AS num 
where content like '%hjbp%'

解释一下上面的语句:

第一行是明确日志文件以及相关日志格式,相当于明确数据源;

第二行的’|‘ 相当于linux 中的管道,这样就可以通过sql 对前面的数据源进行格式化;

第三行使用到了sls 支持的几个函数,具体可以通过阿里云日志服务帮助文档 搜索查看;

第四行使用了支持的like 模糊查询。

以上语句按一天的时间范围执行下来查询结果是不精确的。

image.png

为了能精确一些,想着一天我分三次查询缩小时间范围是可以做到精确查询的,那么这个语句就是可用的了,接下来进行验证逻辑的编写。

直接通过sls 是可以下载少量日志的,而且是csv 格式的,通过Java 让我从心底里抵触,就选择了5年前学了点儿皮毛的python 进行处理。大概的处理跟上面语句很类似,只是数据源比较多,通过pandas 库很快的读取csv 文件中的日志,并且方便处理成文件。

很快啊,验证的结果也是类似的贴一下我low low的代码

# 获取有关发消息各个指标数据
def get_data():
    # 读取日志
    f = open('D:/work/log/downloaded_data.txt',encoding='UTF-8')
    # 设置一个set 存用户id
    hjbp = set()
    for line in f:
        jSon = json.loads(line)
        content = jSon['content']
        # 获取符合标准的数据,有了上面sls 的准备写起来也就轻松了
        if content.__contains__('RCV from') and content.__contains__('groupchat') and  content.__contains__('hjbp'):
            hjbp.add(str.split(str.split(content,";")[0],"from:")[1])
    print("hjbp:", len(hjbp))
#main 函数执行
if __name__ == '__main__':
    get_data()

跟运维要了一天的数据,然后执行对比sls 查到的结果,一比一完美复刻,嗯,阿里云sls yyds。

上面这代码,Java 也是可以实现的啊,怎么就非得用python呢?实际上,我写完sls 的查询语句后,直接实现第二个任务,就是统计耗时的任务了,耗时任务实现中详解。

可天不遂人愿,产品需求是要30天的数据,sls 上述语句,5个指标分别执行,一天一个指标要执行3次,30天,我好崩溃,这5*3*30 查询次数谁顶得住啊,sls 咋这么废物啊。赶紧想解决方案,好在写了验证脚本,方案一其实已经实现了大半的方案二功能。在上述脚本中增加mysql 中的group by 功能即可实现,哈哈哈。不禁佩服起自己的严谨。

# 获取有关发消息各个指标数据
def get_data():
    # 读取日志
    f = open('D:/work/log/downloaded_data.txt',encoding='UTF-8')
    # 设置一个set 存用户id
    hjbp = set()
    # 声明两个list
    date = []
    bplist = []
    t = "2022-03-23"
    for line in f:
        jSon = json.loads(line)
        content = jSon['content']
        nt = time.strftime('%Y-%m-%d', time.localtime(int(jSon['__time__'])))
        if not(t.__eq__(nt)):
            date.append(t)
            bplist.append(len(hjbp))
            t = nt
            hjbp.clear()
        # 获取符合标准的数据,有了上面sls 的准备写起来也就轻松了
        if content.__contains__('RCV from') and content.__contains__('groupchat') and  content.__contains__('hjbp'):
            hjbp.add(str.split(str.split(content,";")[0],"from:")[1])
    # 字典中的key值即为csv中列名 输出到csv
    dataframe = pd.DataFrame({'date': date, 'bpcount': bplist, 'sjcount': sjlist, 'tmcount': tmlist, 'pccount': plist, 'cyjcount': cyjlist})
    dataframe.to_csv("D:/work/log/test.csv", index=False, sep=',')
#main 函数执行
if __name__ == '__main__':
    get_data()

好在第一次跟运维要数据,给的是跨天的24小时数据,还能基于sls 验证一下这个脚本的实用性,完美。

可数据来源又成了问题,一天的日志100多m,30天的日志数据下载,无疑也是对运维工作难度的强迫,谁让咱心善呢,那sls 肯定开放api 吧,我写脚本基于写好的语句进行调用,不禁对自己更加佩服了。

api 调用的权限走工单,开好了发现不能用,运维那边还有事儿,我先准备好调用的代码,可没权限无法验证写的脚本是否能用。巧妇难为无米之炊,实在无奈,情况急转直下,眼看不能如期交付任务,一个比较好的运维弟弟给我带来了曙光。帮我提了一个阿里云的工单,跟工作人员反馈之后,建议点开sql 增强就行了。

赶紧实验,打开下图的sql 增强,点开之后会提示有费用,果然,阿里云就是这么会挣钱。实验下来,确实精准查30天你的数据也是可以的。

image.png

__tag__:__path__: "/home/logs/im/MsgRcvMoblie.log" and "RCV from\:" and "groupchat" 
| select 
  approx_distinct(cast(split(cast(split(content, ';') as array(varchar))[1],'from:') as array(varchar))[2]) AS bpnum ,date_format(__time__,'%Y-%m-%d') as d 
  where content like '%hjbp%' group by d  order by d 

优化最开始的查询语句,加上按天归集的group by 语句,搜索范围改成一个月,非常不错,sls yyds 哈哈哈哈。至此,圆满完成相关指标的统计,这要自己一个劲儿的搜,不得累死啊。

耗时任务

当写完sls 语句的时候,我觉得已经完成了大半的任务,接下来,搞这个耗时的吧,本来想着搜到相关数据之后,在sls 上直接看,肉眼找到接收时间跟发送时间,然后作差。当我搜了10条日志之后,我发现不对劲儿,这tm也很累,一个多小时才搞了这么点儿。

那时候还没想着通过api 进行查询数据,先下载好日志样本,也就是那一百条消息。然后到sls 上搜,基本上能搜到相关的10几条日志,然后下载,下载下来的是csv 格式的数据。用Java 读是不是很头疼吧。我认为是比较麻烦的,就选择了用python 处理。

写脚本,下数据。

# 获取有关发消息之间差值方法
def make_data():
    ave = 0
    for info in os.listdir('D:/downloads/csv'):
        domain = os.path.abspath('D:/downloads/csv')
        fn = os.path.join(domain, info)
        content = pd.read_csv(fn,encoding='ISO-8859-1')
        max = 1
        min = 1
        for i in range(len(content)):
            stime = str(content['content'][i]).split(' ')[0].replace('_',' ')
            if 'nan' == stime:
                continue
            time = dt.datetime.strptime(stime, '%Y-%m-%d %H:%M:%S.%f').timestamp() * 1000
            if(i == 0):
                max = time
                min = time
                continue
            if(max < time):
                max = time
                continue
            if(time < min):
                min = time
                continue
        ave += max - min
    print(ave / 100)

当我把所有日志数据下载下来之后,通过这样一个函数,卡卡卡,就得到了100条日志的耗时一相加一平均,害。齐活儿。

其实上面数据是不准的,希望有机会能在现有的业务中加点儿埋点,记录好消息在服务端的耗时。

总结

世上无难事,只要肯攀登,办法总比困难多;虽然上面代码比较简陋,而且简单。为什么鄙人还好意思拿出来讲呢,实在是这种锲而不舍的精神,解决了一个又一个的难题,让我精神大好,荣光换发,而且也体会到了这种结构性的数据日志,通过代码处理起来是最轻松的。而且也不禁感叹阿里云的sls 功能强大,但是使用起来入门还是有一定的成本的,比如说帮助文档里的那么多函数。希望有一天能出现识别人语言并实现需求的AI,大家都能轻松应对工作。

大家加油!!!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
6天前
|
关系型数据库 MySQL Java
【MySQL+java+jpa】MySQL数据返回项目的感悟
【MySQL+java+jpa】MySQL数据返回项目的感悟
23 1
|
2月前
|
XML 数据采集 存储
使用Java和XPath在XML文档中精准定位数据
在数据驱动的时代,从复杂结构中精确提取信息至关重要。XML被广泛用于数据存储与传输,而XPath则能高效地在这些文档中导航和提取数据。本文深入探讨如何使用Java和XPath精准定位XML文档中的数据,并通过小红书的实际案例进行分析。首先介绍了XML及其挑战,接着阐述了XPath的优势。然后,提出从大型XML文档中自动提取特定产品信息的需求,并通过代理IP技术、设置Cookie和User-Agent以及多线程技术来解决实际网络环境下的数据抓取问题。最后,提供了一个Java示例代码,演示如何集成这些技术以高效地从XML源中抓取数据。
使用Java和XPath在XML文档中精准定位数据
|
2月前
|
存储 算法 Java
惊!Java程序员必看:JVM调优揭秘,堆溢出、栈溢出如何巧妙化解?
【8月更文挑战第29天】在Java领域,JVM是代码运行的基础,但需适当调优以发挥最佳性能。本文探讨了JVM中常见的堆溢出和栈溢出问题及其解决方法。堆溢出发生在堆空间不足时,可通过增加堆空间、优化代码及释放对象解决;栈溢出则因递归调用过深或线程过多引起,调整栈大小、优化算法和使用线程池可有效应对。通过合理配置和调优JVM,可确保Java应用稳定高效运行。
126 4
|
2月前
|
算法 Java 程序员
在Java的编程世界里,多态不仅仅是一种代码层面的技术,它是思想的碰撞,是程序员对现实世界复杂性的抽象映射,是对软件设计哲学的深刻领悟。
在Java的编程世界里,多态不仅仅是一种代码层面的技术,它是思想的碰撞,是程序员对现实世界复杂性的抽象映射,是对软件设计哲学的深刻领悟。
60 9
|
2月前
|
Java 程序员
Java数据类型:为什么程序员都爱它?
Java数据类型:为什么程序员都爱它?
43 1
|
8天前
|
Java
jvm复习,深入理解java虚拟机一:运行时数据区域
这篇文章深入探讨了Java虚拟机的运行时数据区域,包括程序计数器、Java虚拟机栈、本地方法栈、Java堆、方法区、元空间和运行时常量池,并讨论了它们的作用、特点以及与垃圾回收的关系。
39 19
jvm复习,深入理解java虚拟机一:运行时数据区域
|
9天前
|
JSON 前端开发 Java
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
文章介绍了Java后端如何使用Spring Boot框架响应不同格式的数据给前端,包括返回静态页面、数据、HTML代码片段、JSON对象、设置状态码和响应的Header。
51 1
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
|
4天前
|
存储 Java API
如何使用 Java 记录简化 Spring Data 中的数据实体
如何使用 Java 记录简化 Spring Data 中的数据实体
26 9
|
4天前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
13 1
|
5天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
11 1