1 前言
当下消息中间件盛行,主要提供了异步、解耦、削峰平谷的作用,其实早期Java本身也提供了类似的消息服务,在研习各个消息中间件之前应该先了解下这个历史。
2 JMS简介(取自百度百科
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。
Java 消息服务(Java Message Service,JMS)是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。
JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
通信传递的消息交换了计算机之间至关重要的数据——而非用户之间——并且包含了例如事件通知和服务请求之类的信息。通信通常用来协调在不同的系统中或是用不同的编程语言所写的程序。
使用JMS接口,程序员可以调用IBM的MQSeries,Progress Software的SonicMQ和其他流行通信产品商家的消息服务。另外,JMS支持包含串行Java对象的消息和包含可扩展标记语言(XML)页面的消息。
总之,JMS自身并不是一种消息传送系统;它是消息传送客户端与消息传送系统通信时所需接口和类的一个抽象。与JDBC访问数据库,JNDI抽象访问命令目录接口服务一样,JMS抽象可以访问消息服务提供者,使用JMS,应用程序的消息传送客户端可以实现跨消息服务器产品的移植。JMS规范是一个具有单向优势的,健壮的规范,包括了一组丰富的消息传送语义,并和简单而灵活的API结合,用于将消息传送合并到应用程序中。
3 JMS的消息模型
JMS具有两种通信模式:
1、Point-to-Point Messaging Domain (点对点)
2、Publish/Subscribe Messaging Domain (发布/订阅模式)
任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。
(1)、Point-to-Point Messaging Domain(点对点通信模型)
a、模式图:
b、涉及到的概念:
在点对点通信模式中,应用程序由消息队列,发送方,接收方组成。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
c、特点:
- • 每个消息只要一个消费者
- •
发送者和接收者在时间上是没有时间的约束,也就是说发送者在发送完消息之后,不管接收者有没有接受消息,都不会影响发送方发送消息到消息队列中。 - •
发送方不管是否在发送消息,接收方都可以从消息队列中去到消息(The receiver can fetch message whether it is running or not when the sender sends the message) - •
接收方在接收完消息之后,需要向消息队列应答成功
(2)、Publish/Subscribe Messaging Domain(发布/订阅通信模型)
a、模式图:
b、涉及到的概念:
在发布/订阅消息模型中,发布者发布一个消息,该消息通过topic传递给所有的客户端。该模式下,发布者与订阅者都是匿名的,即发布者与订阅者都不知道对方是谁。并且可以动态的发布与订阅Topic。Topic主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
c、特点:
- • 一个消息可以传递个多个订阅者(即:一个消息可以有多个接受方)
- •
发布者与订阅者具有时间约束,针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。 - •
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
4 JMS编程模型
- 1.管理对象(Administered objects)-连接工厂(Connection Factories)和目的地(Destination)
- 2.连接对象(Connections)
- 3.会话(Sessions)
- 4.消息生产者(Message Producers)
- 5.消息消费者(Message Consumers)
- 6.消息监听者(Message Listeners)
(1)、Connection Factories
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。客户端使用一个连接工厂对象连接到JMS服务提供者,它创建了JMS服务提供者和客户端之间的连接。JMS客户端(如发送者或接受者)会在JNDI名字空间中搜索并获取该连接。使用该连接,客户端能够与目的地通讯,往队列或话题发送/接收消息。
QueueConnectionFactory queueConnFactory = (QueueConnectionFactory) initialCtx.lookup ("primaryQCF"); Queue purchaseQueue = (Queue) initialCtx.lookup ("Purchase_Queue"); Queue returnQueue = (Queue) initialCtx.lookup ("Return_Queue");
(2)、Destination
目的地指明消息被发送的目的地以及客户端接收消息的来源。JMS使用两种目的地,队列和话题。如下代码指定了一个队列和话题:
创建一个队列Session:
QueueSession ses = con.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); //get the Queue object Queue t = (Queue) ctx.lookup ("myQueue"); //create QueueReceiver QueueReceiver receiver = ses.createReceiver(t);
(3)、Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
连接对象封装了与JMS提供者之间的虚拟连接,如果我们有一个ConnectionFactory对象,可以使用它来创建一个连接。
Connection connection = connectionFactory.createConnection();
(4)、Session
Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
我们可以在连接创建完成之后创建session:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
这里面提供了参数两个参数,第一个参数是是否支持事务,第二个是事务的类型
(5)、Producter
消息生产者由Session创建,用于往目的地发送消息。生产者实现MessageProducer接口,我们可以为目的地、队列或话题创建生产者;
MessageProducer producer = session.createProducer(dest); MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(topic);
(6)、Consumer
消息消费者由Session创建,用于接收被发送到Destination的消息。
MessageConsumer consumer = session.createConsumer(dest); MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(topic);
(7)、MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
5 JMS实现-ActiveMQ
5.1 介绍
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
官网地址:https://activemq.apache.org/
因为该mq历史久远,当前鲜有人用,不详细介绍,我们了解下怎么用有一个体感就行。万一真用到,只能说你接到了一个锅,慢慢爬坑吧。
5.1.1 特色
- •支持Java消息服务(JMS) 1.1 版本
- •Spring Framework
- •集群 (Clustering)
- •支持的编程语言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby
- •协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP [1]
5.2 安装
安装ActiveMQ开源消息总线:http://activemq.apache.org/activemq-5111-release.html
ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin
5.3 实践
5.3.1 消息生产者Receive消息确认的三种方式:
- •
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage
方法成功返回的时候,会话自动确认客户收到的消息。 - •
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。 - •
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。
5.3.2 springboot中使用activeMQ
pom
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.40</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency></dependencies>
配置
packagecom.jf.config; importorg.apache.activemq.command.ActiveMQQueue; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.context.annotation.Bean; importorg.springframework.context.annotation.Configuration; importjavax.jms.Queue; publicclassQueueConfig { "${queue}")//拿到配置文件中的队列名 (privateStringqueue; publicQueuequeue(){ returnnewActiveMQQueue(queue); } }
发送消息类(消息生产者)
packagecom.jf.config; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.jms.core.JmsMessagingTemplate; importorg.springframework.stereotype.Component; importjavax.jms.Queue; publicclassSendServer { privateJmsMessagingTemplatejmsMessagingTemplate; privateQueuequeue; publicvoidsend(Stringmsg){ jmsMessagingTemplate.convertAndSend(queue,msg); } }
消息消费者
packagecom.jf.listen; importcom.alibaba.fastjson.JSON; importcom.alibaba.fastjson.JSONObject; importorg.springframework.jms.annotation.JmsListener; importorg.springframework.stereotype.Component; publicclassConsumer { destination="${queue}") (publicvoidreceive(Stringmsg){ //解析jsonJSONObjectjsonObject=JSON.parseObject(msg); if ("email".equals(jsonObject.get("type"))){ System.out.println("监听器收到消息:发送email给"+jsonObject.get("to")+",内容为:"+jsonObject.get("content")); } } }