storm从入门到放弃(三),放弃使用 StreamId 特性

简介:   序:StreamId是storm中实现DAG有向无环图的重要一个特性,但是从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,我们系统在迭代时会带来整体服务的不可用。   StreamId是storm中实现DAG有向无环图的重要一个特性,官方也提供对应的接口实现让开发者自己灵活化构造自己的ADG图。

  序:StreamId是storm中实现DAG有向无环图的重要一个特性,但是从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,我们系统在迭代时会带来整体服务的不可用。

  StreamId是storm中实现DAG有向无环图的重要一个特性,官方也提供对应的接口实现让开发者自己灵活化构造自己的ADG图。但是从我这一年从事流式计算的工作中得到的结果也很尴尬的,很多人不知道storm的这一个特性,甚至某些数据中也没有提及。当然这也比较幸运,不知道这个特性就可以少踩点坑了。因为从实际生产环境来看,这个功能其实蛮影响生产环境的稳定性的,为什么这么说,hey,hey,look dowm。

  原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/7283442.html

  

  实际开发中,很多人没有用streamid,其实只是没有显示指定罢了,默认streamid的名称为default,这也就是为什么消息可以由一个bolt发往另一个bolt了。我们自己显示指定streamid可以实现进入某一个bolt的消息,某些消息发给下游的Abolt,另一些消息发给下游的Bbolt。

  比如有这样一个需求砸向你的脸上,有很多其他系统的消息发送到kafka某一个主题中,现在用storm去kafka消费该主题,在bolt-业务这个节点进行消息类型的判断,然后根据判断将消息发送到不同的下游bolt进行处理以便将这些消息发往不同的渠道接口中。这样一个需求我们利用streamid很容易实现,看起来也没有什么问题。关于sreamid的使用可以文章末尾。

  为什么在实际生产我不建议这样使用,生产中经常会面对迭代开发的情况,业务不断的变化,你的代码也要不断的修改,第三方接口的变动,你也要不断的修改与第三方交互的程序。如果这周要修改bolt-微信,然后到发布的时候,你必须停掉整个拓扑任务这明显不是我们想要的,我们期望的是只停掉bolt-微信而不影响其他的业务线。这个时候就会发现这个实现方式很鸡肋的。那我们应该怎么做,看一下我在某信用卡中心的实现方案,看了后,你会替我庆幸我没有为了图前期的简单而采用显示streamid导致后面每该一处很小的功能导致整个拓扑任务不提供服务一段时间。

  我们的系统会收到交易信息,然后根据业务bolt进行处理,然后形成话术推送给不同的渠道bolt,这些渠道bolt对接各个部门(这些部门接受到我们的话术后,将话术推送给微信用户,支付宝用户等),而我们的对外渠道多大15个左右。同时应为业务的不断提出,以及对接部门接口的变化我们这些渠道bolt也要跟随变化。所以我们在业务bolt和渠道bolt中引入了第三方消息系统kafka队列,而不是用storm内部的Disruptor队列。这样原本一个拓扑任务,我们进行拆分为一个业务拓扑,以及多个其渠道拓扑,渠道拓扑与业务拓扑通信通过kafka的主题来协调。如果某一天我们要修改微信渠道的业务,我们只需要停掉微信拓扑即可,整个系统并不会受到影响,原本推送给微信渠道的消息也不会因此丢失它保存在kafka主题中,一旦微信拓扑上线即可立马消费掉。   

  后话,我这样说有点绝对了,具体看系统的情况来权衡。

  streamid在storm中的正确打开方式。

public class ProduceRecordSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;

    private String recordLines;
    private String type;

    public ProduceRecordSpout(String type, String lines) {
        this.type = type;
        recordLines = lines;
    }
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }
public void nextTuple() { Utils.sleep(5000); System.out.println("record is "+recordLines); List<Object> values = new Values(type, recordLines); collector.emit(values, values); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("type", "record")); } }

public class DistributeByTypeBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String type = input.getString(0); String word = input.getString(1); switch (type) { case Type.NUMBER: collector.emit("stream-number-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; case Type.STRING: collector.emit("stream-string-saver", input, new Values(type, word)); collector.emit(input, new Values("other", "message coming")); break; default: collector.emit(input, new Values(type, word)); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("stream-number-saver", new Fields("type", "word")); declarer.declareStream("stream-string-saver", new Fields("type", "word")); declarer.declare(new Fields("type", "word")); } }

public class SaveBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("个人微信:intsmaze"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }

public class SaveDefaultBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("个人微博:猥琐发育的码农"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }

public class SaveTwoBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { System.out.println("博客链接:http://www.cnblogs.com/intsmaze/p/7283442.html"+ "SourceComponent=" + input.getSourceComponent() + ", SourceStreamId=" + input.getSourceStreamId() + ", type=" + input.getString(0) + ", value=" + input.getString(1)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class StreamTopologyMain { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout-number", new ProduceRecordSpout(Type.NUMBER, "80966 31"), 1); builder.setSpout("spout-string", new ProduceRecordSpout(Type.STRING, "hello the word"), 1); builder.setBolt("bolt-distributor", new DistributeByTypeBolt(), 2) .shuffleGrouping("spout-number") .shuffleGrouping("spout-string"); builder.setBolt("bolt-number-saver", new SaveBolt(), 1).shuffleGrouping("bolt-distributor", "stream-number-saver"); builder.setBolt("bolt-string-saver", new SaveTwoBolt(), 1).shuffleGrouping("bolt-distributor", "stream-string-saver"); builder.setBolt("bolt-default-saver", new SaveDefaultBolt(), 1).shuffleGrouping("bolt-distributor"); Config conf = new Config(); conf.setDebug(false); String name = StreamTopologyMain.class.getSimpleName(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60 * 60 * 1000); cluster.shutdown(); } } interface Type { String NUMBER = "NUMBER"; String STRING = "STRING"; }

 

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
585 1
|
7月前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
448 4
面经:Storm实时计算框架原理与应用场景
|
7月前
|
消息中间件 存储 Apache
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
645 2
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景
|
7月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
491 0
|
消息中间件 SQL 数据库
跳槽高级开发就这么答:谈谈你对RocketMQ分布式事务原理的理解
有位工作五年的小伙伴在面试的时候被问到RocketMQ的分布式事务实现原理。他说他只知道RocketMQ能够支持事务,但是没有了解过它的事务实现原理。 今天,我给大家分享一下我对这个问题的理解。
112 0
|
消息中间件 存储 缓存
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的系统服务底层原理以及高性能存储设计挖掘深入
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的系统服务底层原理以及高性能存储设计挖掘深入
260 0
精华推荐 | 【深入浅出RocketMQ原理及实战】「性能原理挖掘系列」透彻剖析贯穿RocketMQ的系统服务底层原理以及高性能存储设计挖掘深入
|
存储 NoSQL 算法
【Storm】Storm实战之频繁二项集挖掘(附源码)
针对大叔据实时处理的入门,除了使用WordCount示例之外,还需要相对更深入点的示例来理解Storm,因此,本篇博文利用Storm实现了频繁项集挖掘的案例,以方便更好的入门Storm。
118 0
【Storm】Storm实战之频繁二项集挖掘(附源码)
|
前端开发 数据库 索引
分布式幂等问题解决方案三部曲
# 纲要 文章目的:本文旨在提炼一套分布式幂等问题的思考框架,而非解决某个具体的分布式幂等问题。在这个框架体系内,会有一些方案举例说明。 文章目标:希望读者能通过这套思考框架设计出符合自己业务的完备的幂等解决方案。 文章内容: (1)背景介绍,为什么会有幂等。 (2)什么是幂等,这个定义非常重要,决定了整个思考框架。 (3)解决幂等问题的三部曲,也是作者的思考框架。 (4)总结
4463 0
|
流计算 Java Scala
带你读《Flink原理、实战与性能优化》之二:环境准备
这是一部以实战为导向,能指导读者零基础掌握Flink并快速完成进阶的著作,从功能、原理、实战和调优等4个维度循序渐进地讲解了如何利用Flink进行分布式流式应用开发。作者是该领域的资深专家,现就职于第四范式,曾就职于明略数据。
|
流计算 API Java
带你读《Flink原理、实战与性能优化》之三:Flink编程模型
这是一部以实战为导向,能指导读者零基础掌握Flink并快速完成进阶的著作,从功能、原理、实战和调优等4个维度循序渐进地讲解了如何利用Flink进行分布式流式应用开发。作者是该领域的资深专家,现就职于第四范式,曾就职于明略数据。