Kafka修炼日志(二):Connect简明使用教程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Connect是Kafka 0.9版本新增的功能,可以方便的从其它源导入数据到Kafka数据流(指定Topic中),也可以方便的从Kafka数据流(指定Topic中)导出数据到其它源。

Connect是Kafka 0.9版本新增的功能,可以方便的从其它源导入数据到Kafka数据流(指定Topic中),也可以方便的从Kafka数据流(指定Topic中)导出数据到其它源。

     下面结合官方教程详述如何使用File Connector导入数据到Kafka Topic,和导出数据到File:

(1)创建文本文件test.txt,作为其它数据源。

[root@localhost home]# echo -e "connector\ntest" > test.txt

image.gifimage.gif

(2)启动Connect实验脚本,此脚本为官方提供的实验脚本,默认Connector是 File Connector。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

image.gifimage.gif

出现下方错误,是因为文件位置不对,默认应将test.txt文件建立在Kafka目录下,和bin目录同级。

[2017-03-20 13:36:14,879] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:106)

image.gifimage.gif

出现下方错误,是因为Standalone模式Zookeeper会自动停止工作,重启Zookeeper服务器即可,如错误继续出现,重启Kafka服务器即可。

[2017-03-20 13:38:07,832] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)
[2017-03-20 13:38:22,833] ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)

image.gifimage.gif

(3)查看导出文件,test.sink.txt,可以看到消费到的消息。

[root@localhost kafka_2.12-0.10.2.0]# cat test.sink.txt
connector
test

image.gifimage.gif

(4)消息已被存储到Topic:connect-test ,也可以启动一个消费者消费消息。

[root@localhost kafka_2.12-0.10.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning &

image.gifimage.gif

消费者消费的消息 :

[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}

image.gifimage.gif

(5)编辑文件test.txt,新增一条消息,由于Connector此时已经启动,可以实时的看到消费者消费到的新消息。

[root@localhost kafka_2.12-0.10.2.0]# echo "Another line" >> test.txt

image.gifimage.gif

新的消息,已被实时消费:

[root@localhost kafka_2.12-0.10.2.0]# {"schema":{"type":"string","optional":false},"payload":"connector"}
{"schema":{"type":"string","optional":false},"payload":"test"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

image.gifimage.gif

 

本文属作者原创,转贴请声明!

相关文章
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
32 4
|
1月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
38 1
|
1月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
26 1
|
1月前
|
数据采集 监控 Java
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
本文是关于SpringBoot日志的详细教程,涵盖日志的定义、用途、SLF4J框架的使用、日志级别、持久化、文件分割及格式配置等内容。
100 0
SpringBoot日志全方位超详细手把手教程,零基础可学习 日志如何配置及SLF4J的使用......
|
21天前
|
消息中间件 Kafka API
|
2月前
|
消息中间件 Kafka API
python之kafka日志
python之kafka日志
25 3
|
2月前
|
消息中间件 Kafka API
kafka使用教程
kafka使用教程
|
2月前
|
消息中间件 存储 监控
Kafka的logs目录下的文件都是什么日志?
Kafka的logs目录下的文件都是什么日志?
107 11
|
25天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
42 1
下一篇
无影云桌面