主要从事消息中间件的相关研发工作,著有《RabbitMQ实战指南》。
上接: 1. Kafka解惑之Old Producer(1)—— Beginning 2. Kafka解惑之Old Producer(2)——Sync Analysis 讲述完了Sync模式下的结构脉络,下面就来聊一聊Async的,Async会将客户端所要发送的消息暂存在LinkedBlockingQueue中,然后通过特制的ProducerSendThread来根据条件发送。
上接Kafka解惑之Old Producer(1)—— Beginning 欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。 一下子扩展的有点多,我们还是先回到DefaultEventHandler上来,当调用producer.send方法发送消息的时候,紧接着就是调用DefaultEventHandler的handle方法。
欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。 众所周知,目前Kafka的最新版本已经到达1.0.0,很多公司运行的kafka也大多升级到了0.10.
欢迎支持笔者新书:《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。 Introduction 有很多人问过我这么一类问题:RabbitMQ如何确保消息可靠?很多时候,笔者的回答都是:说来话长的事情何来长话短说。
Kafka从0.8.x版本开始引入副本机制,这样可以极大的提高集群的可靠性和稳定性。不过这也使得Kafka变得更加复杂起来,失效副本就是所要面临的一个难题。
目前的Kafka监控产品有很多,比如Kafka Manager、 Kafka Monitor、KafkaOffsetMonitor、Kafka Web Console、Burrow等,都有各自的优缺点,就个人而言用的最多的还是Kafka Manager,不过这些并不是十分的完美。
KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。
Kafka消息序列化和反序列化
接上一篇:Kafka消息序列化和反序列化(上)。 有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。
Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。
Kafka中的拦截器(Interceptor)是0.10.x.x版本引入的一个功能,一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器。本篇主要讲述的是Kafka Producer端的拦截器,它主要用来对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。
要想更好的学习Kafka,了解其核心原理,阅读源码是必不可少的步骤。本文主要讲述的是如何搭建Kafka的源码环境,主要针对的Windows操作系统下IntelliJ IDEA编译器,其余操作系统或者IDE可以类推。
rabbitmqctl join_cluster {cluster_node} [–ram] 将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。更多详细内容请参考RabbitMQ安装。
本文主要阐述应用与集群相关的一些操作管理命令,包括关闭、重置、开启服务,还有建立集群的一些信息。有关集群搭建更多的信息可以参考RabbitMQ的安装及集群搭建方法。 rabbitmqctl stop [pid_file] 用于停止运行RabbitMQ的Erlang虚拟机和RabbitMQ服务应用。
前面讲述的都是使用rabbitmqctl工具来管理RabbitMQ,有些时候你是否会觉得这种方式是不是不太友好?而且为能够运行rabbitmqctl工具,当前的用户需要拥有访问Erlang cookie的权限,由于服务器可能是以guest或者rabbit用户身份来运行的,因此你需要获得这些文件的访问权限,这样就引申出来一些权限管理的问题。
在RabbitMQ中,用户是访问控制(Access Control)的基本单元,且单个用户可以跨越多个vhost进行授权。针对一至多个vhost,用户可以被赋予不同级别的访问权限,并使用标准的用户名和密码来认证用户。
每一个RabbitMQ服务器都能创建虚拟的消息服务器,我们称之为虚拟主机(virtual host),简称为vhost。每一个vhost本质上是一个独立的小型RabbitMQ服务器,拥有自己独立的队列、交换器以及绑定关系等待,并且它拥有自己独立的权限。
RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。
确保RabbitMQ能够健康的运行还不足以让人放松警惕。考虑这样一种情况:小明为小张创建了一个队列并绑定了一个交换器,之后某人由于疏忽而阴差阳错的删除了这个队列而无人得知,最后小张在使用这个队列的时候就会报出“NOT FOUND”的错误。
本文接RabbitMQ之监控(1)。 不管是通过HTTP API接口还是客户端,获取的数据都是为了提供监控视图之用,不过这一切都基于RabbitMQ服务运行完好的情况下。虽然可以通过某些其他工具或方法来检测RabbitMQ进程是否在运行(如:ps aux | grep rabbitmq),或者5672端口是否开启(如:telnet xxx.xxx.xxx.xxx 5672),但是这样依旧不能真正的评判RabbitMQ是否还具备服务外部请求的能力。
负载均衡的方案有很多,适合RabbitMQ使用的处理HAProxy之外还有LVS。LVS是Linux Virtual Server的简称,也就是Linux虚拟服务器, 是一个由章文嵩博士发起的自由软件项目,它的官方站点是www.linuxvirtualserver.org。
试想下如果前面配置的HAProxy主机192.168.0.9突然宕机或者网卡失效,那么虽然RabbitMQ集群没有任何故障,但是对于外界的客户端来说所有的连接都会被断开,结果将是灾难性的。
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。
rabbitmq_tracing插件相当于Firehose的GUI版本,它同样能跟踪RabbitMQ中消息的流入流出情况。rabbitmq_tracing插件同样会对流入流出的消息做封装,然后将封装后的消息日志存入相应的trace文件之中。
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。
1. 概述 正常情况下,很难观察到RabbitMQ网络分区的发生。为了更好的理解网络分区,需要某些手段将其模拟出来,以便对其做相应的分析处理,进而在正式应用环境中遇到类似情形可以处理的游刃有余。
如果在使用RabbitMQ的过程中出现了异常情况,通过翻阅RabbitMQ的服务日志可以让你在处理异常的过程中事半功倍。RabbitMQ日志中会有明确的事件日期、事件内容以及事件等级等。
网络分区的意义 RabbitMQ的模型类似交换机模型,且采用erlang这种电信网络方面的专用语言实现。RabbitMQ集群是不能跨LAN部署(如果要WAN部署需要采用专门的插件)的,也就是基于网络情况良好的前提下运行的。
如果你一直使用RabbitMQ作为业务的消息中间件,难免会遇到网络分区(Network Partitions)的故障,也许你当时会束手无策,一脸懵逼,不过希望在看完这篇文章之后,能给你一点解决网络分区的思路。
在某些情况下,整个应用的瓶颈不在于CPU或者磁盘,而是受网络带宽的影响。当然你可以选择在业务代码中对每一条消息做压缩处理,之后再发送到kafka中,之后业务消费端再进行解压处理,这种方式对应消息的压缩效率是非常低。
1.概述 对于RabbitMQ运维层面来说,扩容和迁移是必不可少。扩容比较简单,一般往集群中加入新的机器节点即可,不过新的机器节点中是没有消息的,如果想要新加入的节点能快速的存储消息还是需要做点小手术的。
本文翻译RabbitMQ官方文档:Highly Available (Mirrored) Queues,原文地址:http://www.rabbitmq.com/ha.html。(翻译水平有限,不喜轻喷~~) 高可用(镜像)队列 默认情况下,queues存放在RabbitMQ集群的单个节点之上。
概述 Kafka端到端审计是指生产者生产的消息存入至broker,以及消费者从broker中消费消息这个过程之间消息个数及延迟的审计,以此可以检测是否有数据丢失,是否有数据重复以及端到端的延迟等。
本篇翻译的是RabbitMQ官方文档关于API的内容,原文链接:http://www.rabbitmq.com/api-guide.html。博主对其内容进行大体上的翻译,有些许部分会保留英文,个人觉得这样更加有韵味,如果全部翻译成中文,会存在偏差,文不达意(主要是功力浅薄~~)。
1 概述 Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。
RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。
在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。
ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。 首先来看看ChannelN的成员变量: private final Map _consumers = Collections.
AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别。 这里以Connection.Start帧做一个例子。
AMQCommand是用来处理AMQ命令的,其包含了Method, Content Heaeder和Content Body. 下面是通过wireshark抓包的AMQP协议 上图中的Basic.Publish命令就包含Method, Content header以及Content body。
AMQChannel是一个抽象类,是ChannelN的父类。其中包含唯一的抽象方法: /** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand.
Frame是指AMQP协议层面的通信帧(一个正式定义的连接数据包)。 我们来看下Frame类中的成员变量有哪些: /** Frame type code */ public final int type; /** Frame channel number, 0-655...
关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1.. _channelMax)。
上一篇文章([一]RabbitMQ-客户端源码之ConnectionFactory)中阐述了conn.start()方法完成之后客户端就已经和broker建立了正常的连接,而这个Connection的关键就在于这个start()方法之内,下面我们来慢慢分析。
首先看一段amqp-client发送端的示例代码(展示出主要部分): ConnectionFactory factory = new ConnectionFactory(); factory.
概述 消息中间件有很多种,进程也会拿几个来对比对比,其中一种对比项就是消费模式。消息的消费模式分Push,Push两种,或者两者兼具。RabbitMQ的消费模式就是兼具Push和Pull。
RabbitMQ作为一个工业级的消息中间件,肯定是缺少不了监控的,RabbitMQ提供了WEB版的页面监控(访问地址:http://xxx.xxx.xxx.xxx:15672/,默认端口号是15672。
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。 为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
1.概述 在实际工作中会经常遇到一些bug,有些就需要用到文件句柄,文件描述符等概念,比如报错: too many open files, 如果你对相关知识一无所知,那么debug起来将会异常痛苦。
前几天遇到一个bug,查看发送日志发现java.io.IOException: Broken pipe的错误,通过深入了解发现当kafka producer发送的消息体大于Broker配置的默认值时就会报这个异常。