正文
一、传统日志收集的弊端
我们大多数时候都是通过日志来判断程序哪里报错了,有了日志才能对症下药。但在集群环境中有成百上千台服务器,一旦报错,我们如何查找日志呢?一个一个日志文件去排查么?那可就为难死我们了。
二、ELK收集系统过程
基于Elasticsearch、Logstash、Kibana(简称ELK)可以实现分布式日志收集系统,再借助Kibana的可视化界面对数据进行分析,嗯真香。
在请求过程中通过AOP切面拦截请求,在切面方法中把日志交给异步线程,由异步线程将消息发送到Kafka(单机或者集群);Logstash从Kafka接收日志,经过消息过滤后发送到Elasticsearch,最后通过Kibana可视化界面对日志进行搜索、分析等。
三、搭建ELK系统
Zookeeper搭建
Kafka搭建
ElasticSearch搭建
Kibana搭建
Logstash搭建
本文演示基于docker-compose,所有组件均为单机部署。
1、搭建docker-compose
# Download the docker-compose 1.29.2 binary matching this machine's OS and CPU architecture.
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
# Make the downloaded binary executable.
sudo chmod +x /usr/local/bin/docker-compose
2、创建目录
mkdir -p /usr/local/docker-compose/elk
3、在上面目录创建docker-compose.yml文件
# Single-node Zookeeper + Kafka + Elasticsearch + Kibana + Logstash stack (demo only).
version: '2'
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:latest
    container_name: kafka
    volumes:
      - /etc/localtime:/etc/localtime
    ports:
      - "9092:9092"
    environment:
      # NOTE(review): presumably the Docker host's IP; replace with your own host address.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.139.160
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
    # Kafka connects to zookeeper:2181 on startup, so start Zookeeper first.
    depends_on:
      - zookeeper

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.2
    restart: always
    container_name: elasticsearch
    environment:
      # Single-node startup; not allowed in real production.
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.2
    restart: always
    container_name: kibana
    ports:
      - "5601:5601"
    environment:
      # Kibana 7.x reads ELASTICSEARCH_HOSTS; the lowercase "elasticsearch_url" variable is not picked up.
      - ELASTICSEARCH_HOSTS=http://192.168.139.160:9200
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.2
    volumes:
      - /data/logstash/pipeline/:/usr/share/logstash/pipeline/
      - /data/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - /data/logstash/config/pipelines.yml:/usr/share/logstash/config/pipelines.yml
    restart: always
    container_name: logstash
    ports:
      - "9600:9600"
    depends_on:
      - elasticsearch
4、启动.
# 进入docker-compose.yml所在的目录执行
[root@localhost elk]# docker-compose up
四、代码
切面类
package com.xiaojie.elk.aop;

import com.alibaba.fastjson.JSONObject;
import com.xiaojie.elk.pojo.RequestPojo;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Logging aspect: serializes a {@link RequestPojo} describing each intercepted service call to
 * JSON and hands it to {@link LogContainer}.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/12/5 16:51
 */
@Aspect
@Component
public class AopLogAspect {

    private static final Logger LOGGER = Logger.getLogger(AopLogAspect.class.getName());

    /** Timestamp layout written into {@code requestTime}. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Placeholder value treated the same as a missing IP header. */
    private static final String UNKNOWN = "unknown";

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private LogContainer logContainer;

    /** Pointcut matching every method of every class directly in {@code com.xiaojie.elk.service}. */
    @Pointcut("execution(* com.xiaojie.elk.service.*.*(..))")
    private void serviceAspect() {
    }

    /**
     * Before advice: records the incoming call before the service method runs.
     *
     * @param joinPoint the intercepted service invocation
     */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        RequestPojo requestPojo = buildRequestPojo(joinPoint);
        logContainer.put(JSONObject.toJSONString(requestPojo));
    }

    // Optional after-returning advice that records the returned content (left disabled).
    /*
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        respJSONObject.put("response", jsonObject);
        logContainer.put(respJSONObject.toJSONString());
    }
    */

    /**
     * After-throwing advice: records the call together with the exception it raised.
     *
     * @param joinPoint the intercepted service invocation
     * @param e         the exception thrown by the service method
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint joinPoint, Exception e) {
        RequestPojo requestPojo = buildRequestPojo(joinPoint);
        requestPojo.setError(e.toString());
        logContainer.put(JSONObject.toJSONString(requestPojo));
    }

    /**
     * Builds the log record shared by both advices.
     *
     * <p>The invocation fields (time, signature, args) are always filled. The HTTP fields (url,
     * method, address) are filled only when a request is bound to the current thread, so that a
     * service call made outside a web request is still logged instead of failing with a
     * {@link NullPointerException} inside the advice.
     */
    private RequestPojo buildRequestPojo(JoinPoint joinPoint) {
        RequestPojo requestPojo = new RequestPojo();
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        requestPojo.setRequestTime(new SimpleDateFormat(TIMESTAMP_PATTERN).format(new Date()));
        requestPojo.setSignature(joinPoint.getSignature().toString());
        requestPojo.setArgs(Arrays.toString(joinPoint.getArgs()));

        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (requestAttributes != null) {
            HttpServletRequest request = requestAttributes.getRequest();
            requestPojo.setUrl(request.getRequestURL().toString());
            requestPojo.setMethod(request.getMethod());
            // Client IP plus this server's port.
            requestPojo.setAddress(getIpAddr(request) + ":" + serverPort);
        }
        return requestPojo;
    }

    /**
     * Resolves the client IP of a request.
     *
     * <p>Checks the {@code x-forwarded-for}, {@code Proxy-Client-IP} and
     * {@code WL-Proxy-Client-IP} headers in that order, then falls back to
     * {@link HttpServletRequest#getRemoteAddr()}. A loopback remote address is replaced by the
     * local host's address when that can be resolved. If the result is a comma-separated list,
     * only the first entry is returned.
     *
     * <p>NOTE(review): these headers are supplied by the client or proxies and can be forged;
     * do not use the result for access control.
     *
     * @param request the current HTTP request
     * @return the resolved IP, or {@code null} if no source provided one
     */
    public static String getIpAddr(HttpServletRequest request) {
        String ipAddress = request.getHeader("x-forwarded-for");
        if (isUnknown(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (isUnknown(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (isUnknown(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if ("127.0.0.1".equals(ipAddress) || "0:0:0:0:0:0:0:1".equals(ipAddress)) {
                // Loopback caller: report the IP configured on this machine instead.
                try {
                    ipAddress = InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException ex) {
                    // Keep the loopback address rather than failing the whole log record.
                    LOGGER.log(Level.WARNING, "Unable to resolve local host address", ex);
                }
            }
        }
        // With several proxies the value is a comma-separated list; the first entry is used.
        // No minimum-length guard: "1.1.1.1,2.2.2.2" is only 15 characters long.
        if (ipAddress != null) {
            int comma = ipAddress.indexOf(',');
            if (comma > 0) {
                ipAddress = ipAddress.substring(0, comma).trim();
            }
        }
        return ipAddress;
    }

    /** Returns true when a header value is absent, empty, or the literal "unknown". */
    private static boolean isUnknown(String value) {
        return value == null || value.length() == 0 || UNKNOWN.equalsIgnoreCase(value);
    }
}
异步线程
package com.xiaojie.elk.aop;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Buffers log messages in an in-memory queue and sends them to Kafka from a background thread,
 * so callers of {@link #put(String)} never wait on Kafka.
 *
 * @author xiaojie
 * @version 1.0
 * @date 2021/12/5 16:50
 */
@Component
public class LogContainer {

    private static final Logger LOGGER = Logger.getLogger(LogContainer.class.getName());

    /** Pending log messages, shared between producers and the sender thread. */
    private static final BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Starts the sender thread.
     *
     * <p>The thread blocks on the empty queue and only reads {@code kafkaTemplate} once a message
     * has been queued via {@link #put(String)}.
     * NOTE(review): this assumes field injection has completed before the first {@code put} call.
     */
    public LogContainer() {
        Thread sender = new LogThreadKafka();
        sender.setName("kafka-log-sender");
        // Daemon, so an endlessly waiting sender does not keep the JVM alive at shutdown.
        sender.setDaemon(true);
        sender.start();
    }

    /**
     * Queues a log message for asynchronous delivery.
     *
     * @param log the message to send; {@code null} or empty values are ignored
     */
    public void put(String log) {
        if (StringUtils.isEmpty(log)) {
            // The deque rejects null with an exception, and empty messages carry nothing.
            return;
        }
        logDeque.offer(log);
    }

    /** Background thread that drains the queue and publishes each message to Kafka. */
    class LogThreadKafka extends Thread {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // take() blocks while the queue is empty instead of spinning on poll().
                    String log = logDeque.take();
                    // Publish the message to the "kafka-log" topic.
                    kafkaTemplate.send("kafka-log", log);
                } catch (InterruptedException ex) {
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                } catch (RuntimeException ex) {
                    // One failed send must not kill the sender, or the queue would grow unbounded.
                    LOGGER.log(Level.WARNING, "Failed to send log message to Kafka", ex);
                }
            }
        }
    }
}
五、验证效果
完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 elk模块