logstash整合kafka

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: logstash整合kafka

今天要搞一搞的,是把logstash和kafka整合起来,由于我们使用的是logstash1.5.0+版本,此版本下官方已经提拱了plugin用来整合kafka,这篇文章的目的就是简单的搭建这么一个环境。

kafka的安装

kafka是基于scala实现的,scala是一种jvm语言,也就是说你得先装jdk,我就不从jdk开始介绍如何安装了,我们直接开始安装kafka,其实这玩意儿官方提供了编译好的版本,你只需要下载,解压,运行即可~

当然,如果想搭建集群,还是需要了解一下kafka的配置的,这不是我们关注的重点,so,我们就简单地先跑起来它吧:

  1. 下载
  2. 解压,我们解压在/usr/local
  3. 运行,为了测试方便,我们需要一共开启四个终端窗口:

首先,我们要运行zookeeper,kafka自带了zookeeper,所以我们无需下载,只需要在/usr/local/kafka目录下执行:

bin/zookeeper-server-start.sh config/zookeeper.properties

然后,再开启一个终端,执行:

bin/kafka-server-start.sh config/server.properties

这样,我们的kafka就已经运行起来了,不过还不是集群环境,只有一个borker哟~但是,我们测试足够了。

再然后,开启一个新的终端,执行:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

这样我们就创建了一个用于测试的topic,接下来继续执行:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

该命令执行完毕后会阻塞终端,你可以随便输入一些数据,每一行都相当于一个消息,会发送给kafka。

最后,再再开启一个新终端,执行:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

你会看到你之前输入的那些消息都会显示在终端中,这就完成了kafka的测试环境搭建。

值得注意的是,上面消息生产者的命令,需要参数broker-list,也就是说我们的kafka生产者必须自己知道所有的kafka broker的位置,而其它命令则只需要填写zookeeper的位置即可,我不清楚这样做的用意是什么,我只是隐约感到有些问题(如果broker出现扩容,如何更新应用代码中的borker信息??),但这不是我们本次的关注的重点。

logstash-->kafka

我们现在想要干的,是从logstash的Shipper中收集到的数据发送给kafka,所以我们需要安装logstash-output-kafka插件。

但是由于未知原因,我试图安装插件时却碰到了报错:

[root@kazaff logstash-1.5.0]# bin/plugin install logstash-output-kafka
Validating logstash-output-kafka
Plugin logstash-output-kafka does not exist
ERROR: Installation aborted, verification failed for logstash-output-kafka 

不像是被墙了的味道,因为提醒的是不存在,而不是网络连接超时。本来还想搭建一个翻墙环境,后来执行了一下这个命令:

bin/plugin list  

竟然发现kafka插件已经预装好了,我也是醉了。OK,我们可以继续了,接下来就是配置一下logstash:

input {
    stdin{}
}


output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy"
    }
}

不多做解释了,在终端运行logstash后,就可以直接输入“helloworld”测试一下了,如果没有问题的话,你将会在之前的kafka消费者终端中看到输出:

{"message":"hello world!","@version":"1","@timestamp":"2015-06-11T10:01:21.183Z","host":"kazaff"}

就是这么简单啦~

参考

Kafka快速入门

Logstash入门教程 - 启动命令行参数及插件安装

logstash-input-file以及logstash-output-kafka插件性能测试

相关文章
|
SQL 消息中间件 存储
Hue和Kibana有什么区别?Logstash和kafka区别?
本文分享几款hive可视化工具,然后区分一下Hue和Kibana,区分一下Logstash和kafka
180 1
|
1月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
74 3
|
消息中间件 关系型数据库 MySQL
Logstash接收Kafka数据写入至ES
Logstash接收Kafka数据写入至ES
300 0
|
消息中间件 数据采集 关系型数据库
logstash集成kafka,mysql实现数据采集
logstash是一个非常灵活好用的数据采集框架工具,可以通过简单的配置满足绝大多数数据采集场景的需求。 采集数据一个非常典型的场景就是将数据先放到kafka队列里削峰,然后从kafka队列里读取数据到mysql或其他存储系统中进行保存。
207 0
|
消息中间件 Java Kafka
filebeat+kafka+logstash+elasticsearch+kibana实现日志收集解决方案
filebeat+kafka+logstash+elasticsearch+kibana实现日志收集解决方案
167 0
|
消息中间件 分布式计算 Kafka
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
339 0
Rocketmq、Rabbitmq、Kafka、Mongo、Elasticsearch、Logstash、Kibana、Nacos、Skywalking、Seata、Spark、Zookeeper安装
|
消息中间件 Java Kafka
Elastic实战:logstash将kafka数据同步到es时,如何将字符串型时间字段转换为时间戳
今天群里有同学问如何将字符串型的时间字段转换为long类型的时间戳。特记录下供后续参考。 原问题: > 我接收数据方传过来的数据,其中有个时间类型是字符串类型,格式为:yyyy-MM-dd hh:mm:ss,我需要转成时间戳保存,我按照网上的方法试了好多种都无法成功转换。 > 数据方把数据发到kafka,我用logstash读kafka,经过处理存到es
330 0
Elastic实战:logstash将kafka数据同步到es时,如何将字符串型时间字段转换为时间戳
|
消息中间件 编解码 JSON
Kafka、Logstash、Nginx日志收集入门
Kafka、Logstash、Nginx日志收集入门
229 0
|
消息中间件 存储 JSON
kafka日志写入logstash
kafka日志写入logstash
796 0
|
28天前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
65 9

相关实验场景

更多