开发者社区> 问答> 正文

filebeat+kafka+ELK分布式日志组件有哪些注意点?

filebeat+kafka+ELK分布式日志组件有哪些注意点?

展开
收起
kun坤 2020-04-23 19:28:09 949 0
1 条回答
写回答
取消 提交回答
  • #filebeat - 打包说明: 镜像在打包时,要添加上filebeat可执行文件(可在官网下载),可以使用supervisor管理服务。 - filebeat配置文件可参考以下例子:

    filebeat.prospectors:
    - input_type: log
      paths: /var/log/nginx.log
      document_type: nginx_log
      fields:
        cluster_name: ${CLUSTER_NAME}
        host: ${HOST}
        log_topics: app1_nginx  # nginx日志
    
    - input_type: log
      paths: /var/applog/*.log
      document_type: applog
      fields:
        cluster_name: ${CLUSTER_NAME}
        host: ${HOST}
        log_topics: app1_log  # 应用日志
    
    output.kafka:
      hosts: ["kafka1:9092","kafka2:9092",...,"kafkaN:9092"] # 取决你的集群节点数
      topic: '%{[fields][log_topics]}'
      partition.round_robin:
        reachable_only: false
      required_acks: 1
      compression: gzip
    

    #logstash - 机器数量: 可以找几台虚机(4c+8G)启动,尽量个数和kafka的节点数一致。 - input和output: kafka(filebeat日志流向的kafka)和es集群 - 配置注意 pipeline.batch.size: 2000 # 达到多少个events后向目标地址输送数据 pipeline.batch.delay: 10 # 等待多少秒向目标地址输送数据

    两个配置不冲突,哪个满足了就触发向目标输送数据,我们的目标地就是es集群。
    至于批量和延迟向目标输送数据应该好理解,避免频繁请求目标地址,导致目标地址高负载。
    
    • 关于filter
    
    一般会用到的有grok(正则切割日志)、json(json解析)、mutate(组合命令remove_field(去除无用字段)等等)
    很多这里不一一介绍了,推荐一个可以在线测试grok语法是否正确的工具:http://grokdebug.herokuapp.com/
    
    
    • 配置样例
    
    input {
        kafka{
            bootstrap_servers => "kafka1:9092,kafka2:9092,...,kafkaN:9092" # 前边的kafka
            auto_offset_reset => "latest"
            group_id => "app1" 
            consumer_threads => 1
            decorate_events => true
            codec => "json"
            topics => ["app1_log"]
        }
    }
    filter {
        if [fields][log_topics] == "app1_log" {
            grok {
                match => {"message" => '(?<time_local>[^\|]*)\|(?<code_line>[^\|]*)\|(?<level>[^\|]*)\|(?<log_json>.*)'}
            }
            mutate {
                gsub => ["log_json", "[\|]", "_"]  # 替换|为_
            }
            json {
                source => "log_json"
                remove_field=>["log_json"]
            }
        }
        # 可以有多个if
        # remove not care field
        mutate
        {
            remove_field => ["field1", "field2"]
        }
    }
    output {
        if [fields][log_topics] == "app1_log" {
            elasticsearch {
                 hosts => ["es1:9200", "es2:9200",..,"esN:9200"]
                index => "app1_log-%{+YYYY.MM.dd}"
            }
        }
        # 可以有多个if
    }
    
    

    关于es

    注意点: 注意修改number_of_shards数量等于节点数,es的number_of_shards默认为5

     跳过一次坑,没有修改number_of_shards,虽然机器多,但是日志散落不均匀导致总有es的某几个
    节点负载比较高,其他的却很清闲。
    
    2020-04-23 19:31:01
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载
基于ELK的大数据平台实践分享 立即下载