go| go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
日志服务 SLS,月写入数据量 50GB 1个月
全局流量管理 GTM,标准版 1个月
简介: go| go并发实战: 搭配 influxdb + grafana 高性能实时日志监控系统
Go并发编程案例解析

继续好玩的并发编程实战, 上一篇 go| 感受并发编程的乐趣 前篇.

实战内容: 实时处理读取/解析日志文件, 搭配 influxdb(时序数据库) 存储, grafana 展示, 并提供系统的简单监控.

0x00: 初始化, 面向过程编程

用面向过程的方式, 对问题进行简单的梳理, 代码如下:

package main

func main() {
    // read log file

    // process log

    // write data
}

这里并没有写具体的实现, 因为到这里, 我们就可以开始考虑 封装

0x01: 过程封装, 使用 LogPorcess 结构体

引入 LogProcess 结构体, 将整个任务 面向对象 化, 伪代码如下:

package main

import (
    "fmt"
    "strings"
)

type LogProcess struct {
    path string // 日志文件路径
    dsn string // influxdb dsn
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)
}

func (lp *LogProcess) Process() {
    log := "hello world"
    fmt.Println(strings.ToUpper(log))
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)
}

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    lp.Read()

    // process log
    lp.Process()

    // write data
    lp.Write()
}

0x02: 加上 go 和 chan, 并发就是如此简单

加上 go 关键字, 轻松实现协程:

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    go lp.Read()

    // process log
    go lp.Process()

    // write data
    go lp.Write()

    time.Sleep(time.Second) // 新手必知: 保证程序退出前, 协程可以执行完
}

加上 chan, 轻松实现协程间通信:

type LogProcess struct {
    path string // 日志文件路径
    dsn string // influxdb dsn
    rc chan string // read chan
    wc chan string // write chan
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)

    lp.rc <- "test data"
}

func (lp *LogProcess) Process() {
    log := <- lp.rc
    lp.wc <- strings.ToUpper(log)
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)

    data := <- lp.wc
    fmt.Println(data)
}

0x03: 引入 interface, 方便以后扩展

现在是从 文件 读取, 如果以后要从 其他数据源 读取呢? 这个时候就可以用上接口:

type Reader interface {
    Read(rc chan string)
}

type ReadFromFile struct {
    path string
}

func (r *ReadFromFile) Read(rc chan string) {
    // read from file
}

同理, 数据写入到 influxdb 也可以加入接口, 方便以后扩展.

0x04: 读取文件的细节

实时读取日志文件要怎么实现呢? 直接上代码, 细节有很多, 注意 注释:

  • 实时 读取怎么实现: 从文件末尾开始读取
  • 怎么一行一行的读取日志: buf.ReadBytes('\n')
  • 输出怎么多了换行呢: 截取掉最后的换行符 line[:len(line)-1]
func (r *ReadFromFile) Read(rc chan []byte) {
    f, err := os.Open(r.path)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    f.Seek(0, 2) // 文件末尾
    buf := bufio.NewReader(f) // []byte 数据类型, rc chan 的类型也相应进行了修改

    for {
        line, err := buf.ReadBytes('\n')
        // todo: 处理日志切割, inode 变化的情况
        if err == io.EOF {
            time.Sleep(500 * time.Millisecond)
        } else if err != nil {
            panic(err)
        } else { // 需要写到这里
            rc <- line[:len(line)-1]
        }
    }
}

还有一个需要优化的地方, 一般日志文件都会采取 轮转 策略(详见上篇blog devops| 日志服务实践), 文件可能更新了, 所以读取文件时, 还需要加一个判断.

0x05: 日志解析, 又见正则

日志的解析比较简单, 按照日志的格式正则匹配即可:

// 使用结构体来记录匹配到的日志数据
type Log struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
    // 正则
    re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(
\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([d\.-]+)`)

    loc, _ := time.LoadLocation("PRC")
    for v := range l.rc {
        str := string(v)
        ret := re.FindStringSubmatch(str)
        if len(ret) != 14 {
            log.Println(str)
            continue
        }

        msg := &Log{}
        t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
        if err != nil {
            log.Println(ret[4])
        }
        msg.TimeLocal = t

        byteSent, _ := strconv.Atoi(ret[8])
        msg.BytesSent = byteSent

        // Get /for?query=t HTTP/1.0
        reqSli := strings.Split(ret[6], " ")
        if len(reqSli) != 3 {
            log.Println(ret[6])
            continue
        }
        msg.Method = reqSli[0]
        msg.Scheme = reqSli[2]
        // url parse
        u, err := url.Parse(reqSli[1])
        if err != nil {
            log.Println(reqSli[1])
            continue
        }
        msg.Path = u.Path
        msg.Status = ret[7]
        upTime, _ := strconv.ParseFloat(ret[12], 64)
        reqTime, _ := strconv.ParseFloat(ret[13], 64)
        msg.UpstreamTime = upTime
        msg.RequestTime = reqTime

        l.wc <- msg
    }
}

0x06: 上手 influxdb

influxdb 是时序数据库的一种, 包含如下基础概念:

  • database: 数据库
  • measurement: 数据库中的表
  • points: 表里的一行数据

其中 points 包含以下内容:

  • tags: 有索引的属性
  • fields: 值
  • time: 时间戳, 也是自动生成的主索引

使用 docker 快速开启 InfluxDb Server:

    influxdb:
        image: influxdb:1.4.3-alpine
        ports:
            - "8086:8086"
        #     - "8083:8083" # admin
        #     - "2003:2003" # graphite
        environment:
            INFLUXDB_DB: log
            INFLUXDB_USER: log
            INFLUXDB_USER_PASSWORD: logpass
        #     INFLUXDB_GRAPHITE_ENABLED: 1
        #     INFLUXDB_ADMIN_ENABLED: 1
        # volumes:
        #     - ./data/influxdb:/var/lib/influxdb

influxdb 使用 go 语言实现, 稍微修改一下官方文档中示例, 就可以使用 client:

InfluxDB Client: https://github.com/influxdata/influxdb/tree/master/client
// 写入也使用接口
type Writer interface {
    Write(wc chan *Log)
}

type WriteToInfluxdb struct {
    dsn string
}

// 只在官方示例代码上做了一点修改
func (w *WriteToInfluxdb) Write(wc chan *Log) {
    // dsn 示例: http://localhost:8086@log@logpass@log@s
    dsnSli := strings.Split(w.dsn, "@")

    // Create a new HTTPClient
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     dsnSli[0],
        Username: dsnSli[1],
        Password: dsnSli[2],
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Create a new point batch
    bp, err := client.NewBatchPoints(client.BatchPointsConfig{
        Database:  dsnSli[3],
        Precision: dsnSli[4],
    })
    if err != nil {
        log.Fatal(err)
    }

    for v := range wc {
        // Create a point and add to batch
        tags := map[string]string{
            "Path": v.Path,
            "Method": v.Method,
            "Scheme": v.Scheme,
            "Status": v.Status,
        }
        fields := map[string]interface{}{
            "bytesSent":   v.BytesSent,
            "upstreamTime": v.UpstreamTime,
            "RequestTime":   v.RequestTime,
        }

        pt, err := client.NewPoint("log", tags, fields, v.TimeLocal)
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)

        // Write the batch
        if err := c.Write(bp); err != nil {
            log.Fatal(err)
        }

        // Close client resources
        if err := c.Close(); err != nil {
            log.Fatal(err)
        }
    }
}

0x07: 使用 Grafana 接入 InfluxDB 数据源

Grafana 使用 docker 也可以轻松部署:

    grafana:
        image: grafana/grafana:5.1.0-beta1
        ports:
            - "3000:3000"
        environment:
            GF_SERVER_ROOT_URL: http://grafana.server.name
            GF_SECURITY_ADMIN_PASSWORD: secret

官网效果图:

Grafana

0x08: 简单监控系统实现

作为一个 实时 系统, 需要后台常驻运行, 怎么查看系统的运行状态的呢?

加入一个简单的监控系统, 通过 http 请求查看系统实时运行状态:

// 需要监控的系统状态
type SystemInfo struct {
    LogLine int `json:"logline"` // 总日志处理数
    Tps float64 `json:"tps"`
    ReadChanLen int `json:"readchanlen"` // read chan 长度
    WriteChanLen int `json:"writechanlen"` // write chan 长度
    RunTime string `json:"runtime"` // 运行总时间
    ErrNum int `json:"errnum"` // 错误数
}

// 监控类
type Monitor struct {
    startTime time.Time
    data SystemInfo
}

// 启动监控, 其实就是一个简单的 http server
func (m *Monitor) start(lp *LogProcess) {
    http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
        m.data.RunTime = time.Now().Sub(m.startTime).String()
        m.data.ReadChanLen = len(lp.rc)
        m.data.WriteChanLen = len(lp.wc)

        ret, _ := json.MarshalIndent(m.data, "", "\t")

        io.WriteString(writer, string(ret))
    })

    http.ListenAndServe(":9091", nil)
}

func main() {
    ...

    // 运行监控
    m := &Monitor{
        startTime: time.Now(),
        data: SystemInfo{},
    }
    m.start(l)
}

监控数据中的 TPS 稍微有点难处理:

  • 启动一个定时器, 比如 5s
  • 记录下时间间隔内的 LogLine(日志处理行数)

这样我们就可以用 LogLine 来估算系统的 TPS 了

0x09: 写在最后

并发编程实战, 总会给人带来 又完成了了不起的任务 的感觉, 特别是会了解更多的细节.

能够相遇, 也是一种快乐吧.
目录
相关文章
|
11天前
|
存储 前端开发 数据可视化
Grafana Loki,轻量级日志系统
本文介绍了基于Grafana、Loki和Alloy构建的轻量级日志系统。Loki是一个由Grafana Labs开发的日志聚合系统,具备高可用性和多租户支持,专注于日志而非指标,通过标签索引而非内容索引实现高效存储。Alloy则是用于收集和转发日志至Loki的强大工具。文章详细描述了系统的架构、组件及其工作流程,并提供了快速搭建指南,包括准备步骤、部署命令及验证方法。此外,还展示了如何使用Grafana查看日志,以及一些基本的LogQL查询示例。最后,作者探讨了Loki架构的独特之处,提出了“巨型单体模块化”的概念,即一个应用既可单体部署也可分布式部署,整体协同实现全部功能。
230 69
Grafana Loki,轻量级日志系统
|
3月前
|
安全 Go
用 Zap 轻松搞定 Go 语言中的结构化日志
在现代应用程序开发中,日志记录至关重要。Go 语言中有许多日志库,而 Zap 因其高性能和灵活性脱颖而出。本文详细介绍如何在 Go 项目中使用 Zap 进行结构化日志记录,并展示如何定制日志输出,满足生产环境需求。通过基础示例、SugaredLogger 的便捷使用以及自定义日志配置,帮助你在实际开发中高效管理日志。
95 1
|
3月前
|
Prometheus 运维 监控
智能运维实战:Prometheus与Grafana的监控与告警体系
【10月更文挑战第26天】Prometheus与Grafana是智能运维中的强大组合,前者是开源的系统监控和警报工具,后者是数据可视化平台。Prometheus具备时间序列数据库、多维数据模型、PromQL查询语言等特性,而Grafana支持多数据源、丰富的可视化选项和告警功能。两者结合可实现实时监控、灵活告警和高度定制化的仪表板,广泛应用于服务器、应用和数据库的监控。
408 3
|
20天前
|
存储 监控 算法
内网监控系统之 Go 语言布隆过滤器算法深度剖析
在数字化时代,内网监控系统对企业和组织的信息安全至关重要。布隆过滤器(Bloom Filter)作为一种高效的数据结构,能够快速判断元素是否存在于集合中,适用于内网监控中的恶意IP和违规域名筛选。本文介绍其原理、优势及Go语言实现,提升系统性能与响应速度,保障信息安全。
27 5
|
5天前
|
监控 关系型数据库 MySQL
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
15 0
|
2月前
|
存储 数据采集 Prometheus
Grafana Prometheus Altermanager 监控系统
Grafana、Prometheus 和 Alertmanager 是一套强大的开源监控系统组合。Prometheus 负责数据采集与存储,Alertmanager 处理告警通知,Grafana 提供可视化界面。本文简要介绍了这套系统的安装配置流程,包括各组件的下载、安装、服务配置及开机自启设置,并提供了访问地址和重启命令。适用于希望快速搭建高效监控平台的用户。
141 20
|
2月前
|
Prometheus Cloud Native Linux
Prometheus+Grafana新手友好教程:从零开始搭建轻松掌握强大的警报系统
本文介绍了使用 Prometheus 和 Grafana 实现邮件报警的方案,包括三种主要方法:1) 使用 Prometheus 的 Alertmanager 组件;2) 使用 Grafana 的内置告警通知功能;3) 使用第三方告警组件如 OneAlert。同时,详细描述了环境准备、Grafana 安装配置及预警设置的步骤,确保用户能够成功搭建并测试邮件报警功能。通过这些配置,用户可以在系统或应用出现异常时及时收到邮件通知,保障系统的稳定运行。
151 1
|
3月前
|
缓存 监控 前端开发
在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统
本文深入探讨了在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统。
195 1
|
3月前
|
存储 负载均衡 监控
如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
在数字化时代,构建高可靠性服务架构至关重要。本文探讨了如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
78 1
|
3月前
|
数据库连接 Go 数据库
Go语言中的错误注入与防御编程。错误注入通过模拟网络故障、数据库错误等,测试系统稳定性
本文探讨了Go语言中的错误注入与防御编程。错误注入通过模拟网络故障、数据库错误等,测试系统稳定性;防御编程则强调在编码时考虑各种错误情况,确保程序健壮性。文章详细介绍了这两种技术在Go语言中的实现方法及其重要性,旨在提升软件质量和可靠性。
55 1

热门文章

最新文章