Flink on yarn 实时日志收集到 kafka 打造日志检索系统

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 背景在 Flink on yarn 的模式下,程序运行的日志会分散的存储在不同的 DN 上,当 Flink 任务发生异常的时候,我们需要查看日志来定位问题,一般我们会选择通过 Flink UI 上面的 logs 来查看日志,或者登录到对应的服务器上去查看,但是在任务日志量非常大的情况下,生成的日志文件就非常多,这对于我们排查问题来说,就造成了很大的不便,所以,我们需要有一种统一的日志收集,检索,展示的方案来帮忙我们快速的分析日志,定位问题.

点击上方"JasonLee实时计算",选择"设为星标"


再也不用担心错过重要文章


后台回复"监控",获取 grafana 监控 Flink 最新的模板


背景


在 Flink on yarn 的模式下,程序运行的日志会分散的存储在不同的 DN 上,当 Flink 任务发生异常的时候,我们需要查看日志来定位问题,一般我们会选择通过 Flink UI 上面的 logs 来查看日志,或者登录到对应的服务器上去查看,但是在任务日志量非常大的情况下,生成的日志文件就非常多,这对于我们排查问题来说,就造成了很大的不便,所以,我们需要有一种统一的日志收集,检索,展示的方案来帮忙我们快速的分析日志,定位问题.


那么我们很容易就能想到 ELK 分布式日志收集解决方案 ELK 是 Elasticsearch、Logstash、Kibana 的简称,通过 Logstash 把日志同步到 Elasticsearch 然后在 Kibana 上图形界面展示 ES 中日志信息,这样就可以检索日志,快速的定位问题.那么第一个问题就来了,我们如何收集分布式日志? 传统的做法是在服务器上部署 flume 或者 filebeat 组件来收集日志,但是在生产环境上,我们可能会有上千台甚至上万台服务器,如果每一台机器都部署  flume 或者 filebeat 组件的话显得笨重且麻烦,而且如果后面增加机器的话,还需要在新增的机器上部署,并且延迟也比较大,这种方案的缺点非常明显,这显然是不能接受的,那有没有更简单,更友好的实时方案来收集这些日志呢? 我们是否可以把日志直接收集到 kafka 呢? 答案是肯定的,现在大多数项目(包括 Flink)都会选择log4j、slg4j 来进行 log 记录,所以可以利用 log4j(log4j2) KafkaAppender 把日志直接打到 kafka 里.这样既简化了繁琐的配置,又降低了延迟.下面就来看看具体的配置.


在 Flink 1.11.0 之前 Flink 使用的日志是 Log4j. 在 1.11.0 之后使用的是 Log4j2. 这两者的配置稍有不同,下面就分别介绍一下.


log4j 配置 (Flink 1.11.0 之前)


log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=master:9092,storm1:9092,storm2:9092
log4j.appender.kafka.topic=flink_log_test
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.requiredNumAcks=0
log4j.appender.kafka.syncSend=true
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
# 自定义日志格式
log4j.appender.kafka.layout.ConversionPattern={"log_level":"%p",\
  "log_timestamp":"%d{ISO8601}",\
  "log_package":"%C",\
  "log_thread":"%t",\
  "log_file":"%F",\
  "log_line":"%L",\
  "log_message":"%m",\
  "log_path":"%X{log_path}",\
  "flink_job_name":"${sys:flink_job_name}"}
log4j.appender.kafka.level=INFO
# for package com.demo.kafka, log would be sent to kafka appender.
log4j.logger.kafka=INFO
# 打印源为kafka时指定log默认打印级别
log4j.logger.org.apache.kafka=WARN
# 日志的布局格式
#log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
## 添加自定义参数 k:v 格式,如果有多个 , 隔开
#log4j.appender.kafka.layout.UserFields=flink_job_name:${sys:flink_job_name},yarnContainerId:${sys:yarnContainerId}


为了简化下游的处理,我们需要把日志格式化成 JSON 格式,这里有两种方案,第一种是自己拼接一个 JSON 字符串,第二种是利用官方提供的 net.logstash.log4j.JSONEventLayoutV1 来格式化,如果这两种方案都不能满足你的需求,你可以自己定义 appender 继承 AppenderSkeleton 即可.这里还有另外一个问题,我们如何区分不同任务的日志呢?,如果运行多个 Flink 应用程序的话,多个 container 可能会运行在同一个机器上,那么就没有办法区分日志是哪个任务打的,所以我们这里利用 UserFields 添加了两个自定义的字段用来区分日志 flink_job_name 和 yarnContainerId,这样的话日志就非常清晰了.后面也可以根据 flink_job_name 来检索,所以这里还需要设置一个系统属性 yarnContainerId 让 log4j 可以解析到环境变量里的 yarnContainerId,  Flink 默认是没有加这个属性的,所以需要我们自己添加.


flink-conf.yaml 配置


添加下面两行即可,这样就可以拿到 containerId.


env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID
log4j2 配置(Flink 1.11.0 之后)
# kafka appender config
rootLogger.appenderRef.kafka.ref = Kafka
appender.kafka.type=Kafka
appender.kafka.name=Kafka
appender.kafka.syncSend=true
appender.kafka.ignoreExceptions=false
appender.kafka.topic=flink_log_test
appender.kafka.property.type=Property
appender.kafka.property.name=bootstrap.servers
appender.kafka.property.value=master:9092,storm1:9092,storm2:9092
appender.kafka.layout.type=JSONLayout
apender.kafka.layout.value=net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact=true
appender.kafka.layout.complete=false
appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}
appender.kafka.layout.additionalField2.type=KeyValuePair
appender.kafka.layout.additionalField2.key=flink_job_name
appender.kafka.layout.additionalField2.value=${sys:flink_job_name}
appender.kafka.layout.additionalField3.type=KeyValuePair
appender.kafka.layout.additionalField3.key=yarnContainerId
appender.kafka.layout.additionalField3.value=${sys:yarnContainerId}
# 自定义布局格式
#appender.kafka.layout.type=PatternLayout
#appender.kafka.layout.pattern={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F", "log_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink_job_name}"}%n


log4j2 同样也可以自定义 JSON 字符串或者利用 JSONEventLayoutV1 格式化日志,添加额外字段和 log4j 不太一样,需要通过 appender.kafka.layout.additionalField1 来添加,格式如下:


appender.kafka.layout.additionalField1.type=KeyValuePair
appender.kafka.layout.additionalField1.key=logdir
appender.kafka.layout.additionalField1.value=${sys:log.file}


这里同样也是添加了 flink_job_name,yarnContainerId 字段,还加了 logdir 字段,这样就可以看到完整的日志路径了.如果还需要更多的信息也可以自己添加.


提交任务


# 第一个任务
flink run -d -m yarn-cluster \
-Dyarn.application.name=test \
-Dyarn.application.queue=flink \
-Dmetrics.reporter.promgateway.groupingKey="jobname=test" \
-Dmetrics.reporter.promgateway.jobName=test \
-c flink.streaming.FlinkStreamingDemo \
-Denv.java.opts="-Dflink_job_name=test" \
/home/jason/bigdata/flink/flink-1.13.2/flink-1.13.0-1.0-SNAPSHOT.jar
# 第二个任务
flink run -d -m yarn-cluster \
-Dyarn.application.name=test1 \
-Dyarn.application.queue=spark \
-Dmetrics.reporter.promgateway.groupingKey="jobname=test1" \
-Dmetrics.reporter.promgateway.jobName=test1 \
-c flink.streaming.FlinkStreamingDemo \
-Denv.java.opts="-Dflink_job_name=test1" \
/home/jason/bigdata/flink/flink-1.13.2/flink-1.13.0-1.0-SNAPSHOT.jar


这里需要注意的是,flink_job_name 也需要通过 -Dflink_job_name=test 方式设置一下.然后来消费一下 flink_log_test 这个 topic 看看日志数据如下所示:


{
    "thread":"Checkpoint Timer",
    "level":"INFO",
    "loggerName":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
    "message":"Triggering checkpoint 7 (type=CHECKPOINT) @ 1629016409942 for job dbb2fb501566711e3ba3a0feca2bcd59.",
    "endOfBatch":false,
    "loggerFqcn":"org.apache.logging.slf4j.Log4jLogger",
    "instant":{
        "epochSecond":1629016409,
        "nanoOfSecond":948000000
    },
    "threadId":70,
    "threadPriority":5,
    "logdir":"/home/jason/bigdata/hadoop/hadoop-2.9.0/logs/userlogs/application_1629044405912_0003/container_1629044405912_0003_01_000001/jobmanager.log",
    "flink_job_name":"test",
    "yarnContainerId":"container_1629044405912_0003_01_000001"
}
{
    "thread":"jobmanager-future-thread-1",
    "level":"INFO",
    "loggerName":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator",
    "message":"Completed checkpoint 5 for job a1b2a78965da9340168ff964a92729a0 (50960 bytes in 57 ms).",
    "endOfBatch":false,
    "loggerFqcn":"org.apache.logging.slf4j.Log4jLogger",
    "instant":{
        "epochSecond":1629016456,
        "nanoOfSecond":304000000
    },
    "threadId":52,
    "threadPriority":5,
    "logdir":"/home/jason/bigdata/hadoop/hadoop-2.9.0/logs/userlogs/application_1629044405912_0004/container_1629044405912_0004_01_000001/jobmanager.log",
    "flink_job_name":"test1",
    "yarnContainerId":"container_1629044405912_0004_01_000001"
}


可以看到我们增加的 3 个字段都能正常显示.至此,我们的应用程序日志最终都保存在 Kafka 中.然后就可以接 ELK 这套框架了,今天先写到这里,后面有时间的话,会继续更新后面的部分.


相关文章
|
11天前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
30 11
|
1月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
77 2
|
27天前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
|
1月前
|
资源调度 Oracle Java
实时计算 Flink版产品使用问题之在YARN集群上运行时,如何查看每个并行度的详细处理数据情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之-s参数在yarn-session.sh命令中是否有效
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
关系型数据库 MySQL Java
为什么 Flink 无法实时写入 MySQL?
Flink 1.10 使用 flink-jdbc 连接器的方式与 MySQL 交互,读数据和写数据都能完成,但是在写数据时,发现 Flink 程序执行完毕之后,才能在 MySQL 中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?
为什么 Flink 无法实时写入 MySQL?
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
26天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
1月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。