前阵子对Spring Jms实现进行了一些扩展,借此机会系统化地研究了一下Spring对JMS的支持,整理成文,希望大家能够喜欢!
从继承关系上,我们先来看下接口 JmsOperations,基本上可以归纳出这几类方法:
Conveniencemethods for sending messages
Conveniencemethods for sending auto-converted messages
Conveniencemethods for receiving messages
Conveniencemethods for receiving auto-converted messages
Conveniencemethods for browsing messages
充分遵循了CQRS原则。一个MQ其实就是Wrapper后的Queue,数据结构的知识告诉我们,queue有两种存储结构:Array and LinkedList。Array擅长随机读取,LinkedList则擅长删除更新操作,一旦底层采用 了LinkedList结构,Brower就是个大问题,这个要格外注意一下。
This template uses a org.springframework.jms.support.destination.DynamicDestinationResolver and a SimpleMessageConverter as default strategies for resolving a destination name or converting a message, respectively. These defaults can be overridden through the "destinationResolver" and "messageConverter" bean properties.
NOTE: The ConnectionFactory used with this template should return pooled Connections (or a single shared Connection) as well as pooled Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations is going to suffer.
/** * Execute the action specified by the given action object within a * JMS Session. Generalized version of {@code execute(SessionCallback)}, * allowing the JMS Connection to be started on the fly. * <p>Use {@code execute(SessionCallback)} for the general case. * Starting the JMS Connection is just necessary for receiving messages, * which is preferably achieved through the {@code receive} methods. * @param action callback object that exposes the Session * @param startConnection whether to start the Connection * @return the result object from working with the Session * @throws JmsException if there is any problem * @see #execute(SessionCallback) * @see #receive */ public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { Assert.notNull(action, "Callback object must not be null"); Connection conToClose = null; Session sessionToClose = null; try { //通过事务同步管理器获取与当前线程绑定的Resouce,这里是JmsResourceHolder Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( getConnectionFactory(), this.transactionalResourceFactory, startConnection); if (sessionToUse == null) { conToClose = createConnection(); sessionToClose = createSession(conToClose); if (startConnection) { conToClose.start(); } sessionToUse = sessionToClose; } if (logger.isDebugEnabled()) { logger.debug("Executing callback on JMS Session: " + sessionToUse); } return action.doInJms(sessionToUse); } catch (JMSException ex) { //注意这里的妙处 - 异常转译 throw convertJmsAccessException(ex); } finally { JmsUtils.closeSession(sessionToClose); ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); } }
/** * Send the given JMS message. * @param session the JMS Session to operate on * @param destination the JMS Destination to send to * @param messageCreator callback to create a JMS Message * @throws JMSException if thrown by JMS API methods */ protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException { Assert.notNull(messageCreator, "MessageCreator must not be null"); MessageProducer producer = createProducer(session, destination); try { Message message = messageCreator.createMessage(session); if (logger.isDebugEnabled()) { logger.debug("Sending created message: " + message); } doSend(producer, message); // Check commit - avoid commit call within a JTA transaction. if (session.getTransacted() && isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } finally { JmsUtils.closeMessageProducer(producer); } }
public void convertAndSend( Destination destination, final Object message, final MessagePostProcessor postProcessor) throws JmsException { send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = getRequiredMessageConverter().toMessage(message, session); return postProcessor.postProcessMessage(msg);//注意这里不是对消息发送的后置处理,而是对消息Converter的后置处理(消息发送前的一个Hook) } }); }
/** * Actually receive a JMS message. * @param session the JMS Session to operate on * @param consumer the JMS MessageConsumer to receive with * @return the JMS Message received, or {@code null} if none * @throws JMSException if thrown by JMS API methods */ protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { try { // Use transaction timeout (if available). long timeout = getReceiveTimeout(); JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); if (resourceHolder != null && resourceHolder.hasTimeout()) { timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); } Message message = doReceive(consumer, timeout); if (session.getTransacted()) { // Commit necessary - but avoid commit call within a JTA transaction. if (isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } else if (isClientAcknowledge(session)) { // Manually acknowledge message, if any. if (message != null) { message.acknowledge(); } } return message; } finally { JmsUtils.closeMessageConsumer(consumer); } }
org.springframework.jms.config包里面放置了对Jms schema的解析,这是spring为我们提供的一个非常有用的特性,schema用的好的话,也可以做到面向接口编程,扩展性极好。这方面感兴趣的同学,推荐阅读这里http://openwebx.org/docs/Webx3_Guide_Book.html#d0e574,深入了解下Webx是怎么利用Schema实现OCP原则的。
org.springframework.jms.core.support包里面就一个抽象类JmsGatewaySupport,暂时没怎么用,就是在afterPropertiesSet方法里面内置了一个initGateway方法,用来做一些定制化操作(custominitialization behavior)。
/** * Create a MessageConsumer for the given JMS Session, * registering a MessageListener for the specified listener. * @param session the JMS Session to work on * @return the MessageConsumer * @throws JMSException if thrown by JMS methods * @see #executeListener */ protected MessageConsumer createListenerConsumer(final Session session) throws JMSException { Destination destination = getDestination(); if (destination == null) { destination = resolveDestinationName(session, getDestinationName()); } MessageConsumer consumer = createConsumer(session, destination); if (this.taskExecutor != null) { consumer.setMessageListener(new MessageListener() { public void onMessage(final Message message) { taskExecutor.execute(new Runnable() { public void run() { processMessage(message, session); } }); } }); } else { consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { processMessage(message, session); } }); } return consumer; }怎么样,很简单吧?非常简单的调度算法,也没有失败重连等高级功能。如果需要这些功能,怎么办?ok,是时候DefaultMessageListenerContainer出场了, 一个功能相对比较丰富的 Listener 容器,和 SimpleMessageListenerContainer 不同,它使用 AsyncMessageListenerInvoker 执行一个 looped 的 MessageConsumer.receive() 调用来接收消息,注意这里的 Executor ,默认是 SimpleAsyncTaskExecutor ,文档里写的很清楚:
NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.来看看这个类里面几个重要的成员变量,首先是 concurrentConsumers和 maxConcurrentConsumers。 通过设置 setConcurrency 方法,可以 scale up number of consumers between the minimum number ofconsumers ( concurrentConsumers ) and the maximum number of consumers(maxConcurrentConsumers)。那么 单个消费任务如何消费消息呢,这里又有一个变量需要注意一下,即 idleTaskExecutionLimit ,官方的解释很清楚了:
Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).接下来,我们来看这个类里面最最重要的调度方法,在其内部类AsyncMessageListenerInvoker里面,如下:
public void run() { synchronized (lifecycleMonitor) { activeInvokerCount++; lifecycleMonitor.notifyAll(); } boolean messageReceived = false; try { if (maxMessagesPerTask < 0) { messageReceived = executeOngoingLoop(); } else { int messageCount = 0; while (isRunning() && messageCount < maxMessagesPerTask) { messageReceived = (invokeListener() || messageReceived); messageCount++; } } } catch (Throwable ex) { clearResources(); if (!this.lastMessageSucceeded) { // We failed more than once in a row - sleep for recovery interval // even before first recovery attempt. sleepInbetweenRecoveryAttempts(); } this.lastMessageSucceeded = false; boolean alreadyRecovered = false; synchronized (recoveryMonitor) { if (this.lastRecoveryMarker == currentRecoveryMarker) { handleListenerSetupFailure(ex, false); recoverAfterListenerSetupFailure(); currentRecoveryMarker = new Object(); } else { alreadyRecovered = true; } } if (alreadyRecovered) { handleListenerSetupFailure(ex, true); } } finally { synchronized (lifecycleMonitor) { decreaseActiveInvokerCount(); lifecycleMonitor.notifyAll(); } if (!messageReceived) { this.idleTaskExecutionCount++; } else { this.idleTaskExecutionCount = 0; } synchronized (lifecycleMonitor) { if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { // We're shutting down completely. scheduledInvokers.remove(this); if (logger.isDebugEnabled()) { logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); } lifecycleMonitor.notifyAll(); clearResources(); } else if (isRunning()) { int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); if (nonPausedConsumers < 1) { logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + "Check your thread pool configuration! Manual recovery necessary through a start() call."); } else if (nonPausedConsumers < getConcurrentConsumers()) { logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + "to be triggered by remaining consumers."); } } } } } private boolean executeOngoingLoop() throws JMSException { boolean messageReceived = false; boolean active = true; while (active) { synchronized (lifecycleMonitor) { boolean interrupted = false; boolean wasWaiting = false; while ((active = isActive()) && !isRunning()) { if (interrupted) { throw new IllegalStateException("Thread was interrupted while waiting for " + "a restart of the listener container, but container is still stopped"); } if (!wasWaiting) { decreaseActiveInvokerCount(); } wasWaiting = true; try { lifecycleMonitor.wait(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. Thread.currentThread().interrupt(); interrupted = true; } } if (wasWaiting) { activeInvokerCount++; } if (scheduledInvokers.size() > maxConcurrentConsumers) { active = false; } } if (active) { messageReceived = (invokeListener() || messageReceived); } } return messageReceived; }
org.springframework.jms.listener.endpoint包里面提供了一些JavaEE特性 – 对JCA的支持,这里就不展开了。
(1) Spring对JMS的封装停留在JMS 1.1规范上(1.0.2中的支持Deprecated了),JMS 2的支持在最新的4.0 版本中未曾找见;
(2) 消息发送&接收的时候没有预留钩子方法。比方说我们有这样的需求 - 跟踪消息走向,在消息发送完后向本地的agent写一点数据,agent定时,定量推送数据去server端做统计运算,展示等。这个时候就没有out-of-box的方法可以去实现,当然变通的方法也有不少,但不适合和开源版本融合;
(3) 缺少一些容错策略,比方说消息发送失败,如何处理?
(4) 缺少连接复用,一种很重要的提升性能策略。