使用E-MapReduce服务将Kafka数据导入OSS

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服...

概述

kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。

这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。

场景举例

下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。

  1. 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
    bin/kafka-server-start.sh config/server.properties
  2. 创建一个Kafka的topic,名字为test
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
  3. 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
    vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
  4. 在Flume Home目录下配置并启动Flume服务

新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1

# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1

启动Flume服务:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'

从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE

通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar

查看OSS上的结果

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1     162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1        925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1       1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1       1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1       1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1        588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657

$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 
 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  0
 0  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  0
 1  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  0
 0  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  0
 0  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  0
 1  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  0
 0  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  0
 1  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  0
 2  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  0
 1  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  0
 0  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

参考资料

  1. http://kafka.apache.org/quickstart
  2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
148 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
214 0
|
5月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
|
5月前
|
消息中间件 SQL Java
实时数仓 Hologres产品使用合集之如何用python将kafka数据写入
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
66 0
|
5月前
|
消息中间件 缓存 Kafka
图解Kafka:架构设计、消息可靠、数据持久、高性能背后的底层原理
【8月更文挑战第15天】在构建高吞吐量和高可靠性的消息系统时,Apache Kafka 成为了众多开发者和企业的首选。其独特的架构设计、消息可靠传输机制、数据持久化策略以及高性能实现方式,使得 Kafka 能够在分布式系统中大放异彩。本文将通过图解的方式,深入解析 Kafka 的这些核心特性,帮助读者更好地理解和应用这一强大的消息中间件。
230 0