使用E-MapReduce提交Storm作业处理Kafka数据

简介: 本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。 环境准备 本文选择在杭州Region进行测试,版本选择EMR-3.

本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。

环境准备

本文选择在杭州Region进行测试,版本选择EMR-3.8.0,本次测试需要的组件版本有:

  • Kafka:2.11_1.0.0
  • Storm: 1.0.1

本文使用阿里云EMR服务自动化搭建Kafka集群,详细过程请参考创建集群

以上版本依赖包经过测试可用,如果你再测试过程中引入了其他依赖,也一同添加在Storm lib中,具体操作如下:

上述操作需要在Kafka集群的每台机器执行一遍。执行完在E-MapReduce控制台重启Storm服务,如下:

查看操作历史,待Storm重启完毕:

开发Storm和Kafka作业

E-MapReduce已经提供了现成的示例代码,直接使用即可,地址如下:
e-mapreduce-demo
e-mapreduce-sdk
Topic数据准备
登录到Kafka集群
创建一个test topic,分区数10,副本数2

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:/kafka-1.0.0 --topic test --create

向test topic写入100条数据

/usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 100 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test

说明 以上命令在kafka集群的emr-header-1节点执行,当然也可以客户端机器上执行。
运行Storm作业
登录到Hadoop集群,将第二步中编译得到的
examples-1.1-shaded.jar拷贝到集群emr-header-1上,这里我放在root根目录下面。提交作业:

/usr/lib/storm-current/bin/storm jar examples-1.1-shaded.jar com.aliyun.emr.example.storm.StormKafkaSample test aaa.bbb.ccc.ddd hdfs://emr-header-1:9000 sample

  • 查看作业运行

    • 查看Storm运行状态
      查看集群上服务的WebUI有2种方式:

      本文选择使用SSH隧道方式,访问地址:
      http://localhost:9999/index.html 。可以看到我们刚刚提交的Topology。点进去可以看到执行详情:

      • 查看HDFS输出
      • 查看HDFS文件输出
        [root@emr-header-1 ~]# hadoop fs -ls /foo/
        -rw-r--r--   3 root hadoop     615000 2018-02-11 13:37 /foo/bolt-2-0-1518327393692.txt
        -rw-r--r--   3 root hadoop     205000 2018-02-11 13:37 /foo/bolt-2-0-1518327441777.txt
        [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
        200
        
        向kafka写120条数
        [root@emr-header-1 ~]# /usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 120 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
        120 records sent, 816.326531 records/sec (0.80 MB/sec), 35.37 ms avg latency, 134.00 ms max latency, 35 ms 50th, 39 ms 95th, 41 ms 99th, 134 ms 99.9th
        
        查看HDFS文件输出
        [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
        320
        

        总结

        至此,我们成功实现了在E-MapReduce上部署一套Storm集群和一套Kafka集群,并运行Storm作业消费Kafka数据。当然,E-MapReduce也支持Spark Streaming和Flink组件,同样可以方便在Hadoop集群上运行,处理Kafka数据。

        说明
        由于E-MapReduce没有单独的Storm集群类别,所以我们是创建的Hadoop集群,并安装了Storm组件。如果你在使用过程中用不到其他组件,可以很方便地在E-MapReduce管理控制台将那些组件停掉。这样,可以将Hadoop集群作为一个纯粹的Storm集群使用。
        (本文作者为阿里云大数据产品文档工程师)
  • 相关文章
    |
    11月前
    |
    消息中间件 存储 缓存
    kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
    Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
    |
    数据采集 分布式计算 Hadoop
    使用Hadoop MapReduce进行大规模数据爬取
    使用Hadoop MapReduce进行大规模数据爬取
    |
    消息中间件 存储 运维
    为什么说Kafka还不是完美的实时数据通道
    【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
    488 1
    |
    消息中间件 Java Kafka
    Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
    Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
    362 1
    |
    消息中间件 Java Kafka
    Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
    【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
    1259 9
    |
    消息中间件 监控 Kafka
    实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    SQL 分布式计算 关系型数据库
    Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    338 0
    |
    SQL 分布式计算 关系型数据库
    Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    215 0
    |
    SQL 分布式计算 关系型数据库
    Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
    256 0
    |
    消息中间件 负载均衡 Java
    "Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
    【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
    264 3