一、ActiveMQ学习
1、基本概念以及介绍
Apache ActiveMQ是最受欢迎和强有力的开源消息和集成模式服务器,支持许多跨语言客户端和协议,便利使用企业集成模式还有许多先进的特性。
相关命令
activemq start:启动 activemq stop:关闭
管理界面:
http://127.0.0.1:8161/admin/ login:admin password:admin
监听端口
ActiveMQ默认端口是61616。 windows查看端口:netstat -an|find "61616" unix查看端口:netstat -nl|grep 61616
2、两种消息模式
(1)点对点模式
点对点的模式主要建立在一个队列上面,当连接一个队列的时候,发送端不需要知道接收端是否正在接收,发送的消息,将会先进入消息队列中,如果
有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存到activemq服务器,直到接收端接收信息,点对点的消息模式可以有多个发送端,
多个接收端,但是一条消息,只会被一个接收端接收,那个接收端先连上activemq,则会先接收到,而后来的接收端则接收不到那条信息。
(2)订阅/发布模式
订阅/发布模式,同样可以有多个发送端与接收端,但是接收端和发送端存在时间上的依赖,如果发送端发送消息的时候,接收端并没有接收信息,那么
activemq不会保存信息,认为消息已经发送。换一种说法,就是发送端发送消息的时候,接收端不在线,是接收端不到信息的,哪怕以后监听消息,同样
也是接收不到的。这个模式还有一个特点,发送端发送的消息,将会被所有的接收端接收。不类似点对点,一条消息只会被一个接收端接收到。
点对点模式发送端
public class QueueSender { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 开启连接 conn.start(); Destination des = session.createQueue("sampleQueue"); MessageProducer producer = session.createProducer(des); // 默认为persistent,当activemq关闭时,队列数据将会被保存。当为non_persistent时,activemq关闭时,队列数据将会被清空。 producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createTextMessage("Hello World!")); producer.close(); session.close(); conn.close(); } }
点对点模式接收端
public class QueueConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); // 开启连接 conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination des = session.createQueue("sampleQueue"); MessageConsumer consumer = session.createConsumer(des); consumer.setMessageListener(message -> { if (message instanceof TextMessage) { TextMessage tm = TextMessage.class.cast(message); try { System.out.println(tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 程序等待 System.in.read(); consumer.close(); session.close(); conn.close(); } }
订阅模式发送端
public class TopicSender { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); // 开启连接 conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination des = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(des); // 默认为persistent,当activemq关闭时,队列数据将会被保存。当为non_persistent时,activemq关闭时,队列数据将会被清空。 producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createTextMessage("Hello World!")); producer.close(); session.close(); conn.close(); } }
订阅模式接收端
public class TopicConsumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = factory.createConnection(); // 开启连接 conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination des = session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(des); consumer.setMessageListener(message -> { if (message instanceof TextMessage) { TextMessage tm = TextMessage.class.cast(message); try { System.out.println(tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 程序等待 System.in.read(); consumer.close(); session.close(); conn.close(); } }
3、发送消息的数据类型
//纯字符串的数据 session.createTextMessage(); //序列化的对象 session.createObjectMessage(); //流,可以用来传递文件等 session.createStreamMessage(); //用来传递字节 session.createBytesMessage(); //这个方法创建出来的就是一个map,可以把它当作map来用,当你看了它的一些方法,你就懂了 session.createMapMessage(); //这个方法,拿到的是javax.jms.Message,是所有message的接口 session.createMessage();
4、保证消息的成功处理
创建session的时候使用客户端确认模式,如:
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); AUTO_ACKNOWLEDGE:
当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端
收到同样的消息。
CLIENT_ACKNOWLEDGE:
由客户端进行确认,当客户端接收到消息进行处理后,客户端调用Message接口的acknowledge方法进行确认,如果不确定,activemq将会发给下一个接收端处理。
注意:CLIENT_ACKNOWLEDGE模式只在点对点模式有效,订阅/发布模式中,即使不确认,也不会保存消息。
二、Spring中的JMS
1、Spring集成JMS各包介绍
Spring提供了JMS集成框架,简化了JMS API的使用,JMS大致能分为两块功能:消息的生产和消费。JMSTemplate类用于消息的生产和异步消息的接收,
Spring提供了许多消息消息监听器容器用来创建消息驱动的POJO,同时也提供了声明式的方法来创建消息监听器。
org.springframework.jms.core提供了使用JMS的核心功能,包括通过处理资源创建和释放来简化JMS API使用的JMS模板类。Spring模板类的设计原则是提供
帮助方法执行通用的操作和为了更加复杂的使用,而把处理任务的核心委托给用户实现的回调接口。
org.springframework.jms.support提供了JMSException的异常转化功能,会把受检的JMSException转换成非受检异常,例如javax.jms.JMSException会被包装为
UncategorizedJmsException。
org.springframework.jms.support.converter提供了在Java对象和JMS消息之间的消息转换器抽象。
org.springframework.jms.support.destination提供了许多管理JMS目的地的策略,比如为存储在JNDI的目的地提供服务定位器。
org.springframework.jms.annotation提供了支持使用@JMSListener注解的必要设施。
org.springframework.jms.config提供了jms命名空间的解析实现以及配置监听器容器的java配置支持。
org.springframework.jms.connection提供了独立应用程序中适合使用的连接工厂的使用。同时也包含了JMS的平台事务管理器,也就是JmsTransactionManager。
2、使用Spring JMS
(1)JmsTemplate
JmsTemplate类简化了JMS的使用,因为当发送消息和异步接收消息它会处理资源的创建和释放,使用JmsTemplate只需要实现回调接口。
MessageCreator回调接口指定session就能创建消息,对于更复杂的使用,SessionCallback提供用户Jms会话和ProducerCallbacn回调。
备注:JmsTemplate实例一旦配置就是线程安全的,这就意味着可以配置一个JmsTemplate类的实例然后注入共享引用到多个协作者中,
简单来说,JmsTemplate是有状态的,因为它维护者一个连接工厂的引用,但是这种状态是非会话状态。
(2)Connections
JmsTemplate需要一个连接工厂的引用,通常被客户端应用作为工厂创建连接,而且它还包含了一些配置参数,许多参数是供应相关的,比如SSL配置选项。
(3)Caching Messaging Resources
在连接工厂和发送操作间,有三个中间对象会被创建和销毁,如下:
ConnectionFactory->Connection->Session->MessageProducer->send
为了优化资源的使用、提升性能,将会提供两个连接工厂。
(4)连接工厂分类
SingleConnectionFactory:每次都会创建一个新的连接,适合测试
CachingConnectionFactory:基于SingleConnectionFactory的基础上增加了Sessions、MessageProducers和MessageConsumer的缓存,初始缓存大小为1,
通过sessionCacheSize可以增加缓存sessions的数量。
注意:实际缓存Sessions的数量将会比设定的要大,因为缓存的sessions是基于确认模式的。
备注:推荐使用apache的PooledConnectionFactory,PooledConnectionFactory支持Connection、Session和MessageProducer的池化,连接、会话和消息生产者实例
使用后会返回池里,以便后续使用。而Consumer不会池化,消费者处理完消息后应该关闭,而不是放到池里后续使用。尽管消费者空闲,但ActiveMQ会继续发送消息
到消费者的预获取缓冲区,消息会在预获取缓冲区阻塞,直到消费者再次被激活。
(5)Destination Management
JmsTemplate会把目的地名的解析委托给JMS目的地解析对象,也就是DestinationResolver的实现。DynamicDestinationResolver是JmsTemplate使用的
默认实现,它提供解析动态目的地。
通常在JMS应用中的目的地只在运行时得知,当应用程序部署时是不会被创建的。这是因为在交互式的系统组件中有共享程序逻辑,这些系统组件会根据
一个已知的命名规则在运行期间创建目的地。
(6)Message Listener Containers
消息监听容器用来接收来自JMS消息队列的消息并驱动已注入的消息监听器。监听容器负责所有消息接收的线程并分发消息给监听器,消息监听器容器是在
MDP(message-driven POJO)和消息提供者的媒介,它关注接收消息的注册、参与事务、资源获取和释放以及异常转化。
分类:
SimpleMessageListenerContainer
这种消息监听容器会在启动时创建固定数量的JMS sessions和consumers,并且用标准的MessageConsumer.setMessageListener()注册监听器,然后让JMS
消息生产者调用回调接口。
注意:SimpleMessageListenerContainer并不允许动态适应运行时需求以及参与外部事务,它只支持本地事务,将sessionTransacted标识设为true即可。来自消息监听器的异常
会导致回滚,而且消息会重新发送。
DefaultMessageListenerContainer
这种消息监听器是使用最多的,和SimpleMessageListenerContainer相反,这种容器变体允许动态适应运行时需求,当配置了JtaTransactionManager时,每条被
接收的消息都会伴随XA transaction而注册。这种监听器在JMS生产者的低需求、参与外部管理事务的功能和兼容Java EE环境中打造了平衡。
(7)Transaction management
Spring提供了JmsTransactionManager为单个连接工厂管理事务,允许应用程序利用Spring的事务管理特性。JmsTransactionManager管理本地资源事务,
绑定来自指定连接工厂的JMS连接/会话对到线程中,JmsTemplate自动检测事务资源并相应进行操作。
三、SpringBoot与ActiveMQ集合使用
1、加入相关依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.iboxpay</groupId> <artifactId>activemq_learning</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <name>activemq_learning</name> <description>activemq_learning for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- spring框架 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <!-- spring测试框架 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- mybatis与spring集成 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency> <!-- mysql连接驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <!-- mq连接池 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency> <!-- jsp --> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <!-- jstl中包含标准标签库 --> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> </dependency> <!--javamelody应用监控 --> <dependency> <groupId>net.bull.javamelody</groupId> <artifactId>javamelody-spring-boot-starter</artifactId> <version>1.74.0</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> <!-- commons lang --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!-- commons code --> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、集成相关配置
@Configuration @PropertySource("classpath:jms.properties") public class ActiveMQConfig { @Autowired private JmsConfig jmsConfig; @Autowired private MyListener myListener; /** * ActiveMQ连接池 * @return */ @Bean(name = "pooledConnectionFactory", destroyMethod = "stop") public PooledConnectionFactory pooledConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(jmsConfig.getBrokerUrl()); return new PooledConnectionFactory(connectionFactory); } /** * 发送或接收目的地 * @return */ @Bean("activeMQQueue") public ActiveMQQueue activeMQQueue() { return new ActiveMQQueue(jmsConfig.getDestinationName()); } /** * JmsTemplate消息发送模板类配置 * @param connectionFactory * @param activeMQQueue * @return */ @Bean public JmsTemplate jmsTemplate(@Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory, @Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) { JmsTemplate jmsTemplate = new JmsTemplate(); // 指定连接工厂 jmsTemplate.setConnectionFactory(connectionFactory); // 开启事务 jmsTemplate.setSessionTransacted(true); // 指定确认模式,默认为自动确认 jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); // 指定发送目的地 jmsTemplate.setDefaultDestination(activeMQQueue); return jmsTemplate; } /** * jms事务管理器,创建session时需打开事务 * @param connectionFactory * @return */ @Bean(name = "jmsTransactionManager") public JmsTransactionManager jmsTransactionManager( @Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory) { return new JmsTransactionManager(connectionFactory); } /** * 消息监听器容器,用来接收来自指定目的地的消息 * @param connectionFactory * @param jmsTransactionManager * @param threadPoolTaskExecutor * @param activeMQQueue * @return */ @Bean public DefaultMessageListenerContainer defaultMessageListenerContainer( @Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory, @Qualifier("jmsTransactionManager") JmsTransactionManager jmsTransactionManager, @Qualifier("threadPoolTaskExecutor") ThreadPoolTaskExecutor threadPoolTaskExecutor, @Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) { DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 开启事务 container.setSessionTransacted(true); container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); container.setDestination(activeMQQueue); // 指定事务管理器 container.setTransactionManager(jmsTransactionManager); // 指定消息监听器 container.setMessageListener(myListener); // 并发消费者数量 container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); // 最大并发消费者数量 container.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers()); // 指定任务执行器运行监听线程 container.setTaskExecutor(threadPoolTaskExecutor); // 接收时长 container.setReceiveTimeout(jmsConfig.getReceiveTimeout()); return container; } /** * 线程池任务执行器配置 * @return */ @Bean("threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(20); threadPoolTaskExecutor.setMaxPoolSize(100); threadPoolTaskExecutor.setKeepAliveSeconds(300); threadPoolTaskExecutor.setQueueCapacity(1000); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); return threadPoolTaskExecutor; } }
@Component @ConfigurationProperties(prefix = "jms") public class JmsConfig { /** * broker地址 */ private String brokerUrl; /** * 并发连接消费者数 */ private Integer concurrentConsumers; /** * 最大并发连接消费者数 */ private Integer maxConcurrentConsumers; /** * 目的地 */ private String destinationName; /** * 消息接收时长 */ private Integer receiveTimeout; public String getBrokerUrl() { return brokerUrl; } public Integer getConcurrentConsumers() { return concurrentConsumers; } public Integer getMaxConcurrentConsumers() { return maxConcurrentConsumers; } public String getDestinationName() { return destinationName; } public Integer getReceiveTimeout() { return receiveTimeout; } public void setBrokerUrl(String brokerUrl) { this.brokerUrl = brokerUrl; } public void setConcurrentConsumers(Integer concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; } public void setMaxConcurrentConsumers(Integer maxConcurrentConsumers) { this.maxConcurrentConsumers = maxConcurrentConsumers; } public void setDestinationName(String destinationName) { this.destinationName = destinationName; } public void setReceiveTimeout(Integer receiveTimeout) { this.receiveTimeout = receiveTimeout; } @Override public String toString() { return "JmsConfig [brokerUrl=" + brokerUrl + ", concurrentConsumers=" + concurrentConsumers + ", maxConcurrentConsumers=" + maxConcurrentConsumers + ", destinationName=" + destinationName + ", receiveTimeout=" + receiveTimeout + "]"; } }
#================ jms相关配置 ===============# #broker地址 jms.brokerURL=tcp://127.0.0.1:61616 #并发消费者数量 jms.concurrentConsumers=1 #最大并发消费者数量 jms.maxConcurrentConsumers=1 #发送目的地名称 jms.destinationName=spring_jms_test #消息监听器容器接收时长 jms.receiveTimeout=0
以上为jms.properties属性文件内容
3、消息监听器配置
@Component public class MyListener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(MyListener.class); @Override public void onMessage(Message message) { if (message instanceof TextMessage) { try { TextMessage textMessage = TextMessage.class.cast(message); System.out.println(textMessage.getText()); } catch (JMSException e) { logger.error("接收消息失败:{}", e.getMessage(), e); } } } }
4、消息发送控制器
@Controller public class JmsController { @Autowired private JmsTemplate jmsTemplate; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; @RequestMapping("sendMessage.action") @ResponseBody public Map<String, Object> sendMessage(String textMessage) { Map<String, Object> resp = new HashMap<>(); if (StringUtils.isBlank(textMessage)) { resp.put("statusCode", 0); resp.put("msg", "消息不能为空!"); return resp; } // 异步发送消息 threadPoolTaskExecutor.execute(() -> { // 实现回调接口创建消息 jmsTemplate.send(session -> { return session.createTextMessage(textMessage); }); }); resp.put("statusCode", 1); resp.put("msg", "消息发送成功!"); return resp; } }