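I'm using Apache Qpid JMS to connect to the Alibaba Cloud IoT Platform, but the selector I configure has no effect. Any help would be appreciated.
Maven dependency: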
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>0.51.0</version>
</dependency>
Initialization code:
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = connectionFactory.createConnection(userNameBuilder.toString(), password);
((JmsConnection) connection).addConnectionListener(new DefaultIoTJmsConnectionListener());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(queue, "topic='1'");
consumer.setMessageListener((MessageListener)listener);
connection.start();
I read through the source code and still could not find the cause.
What does this piece of code do?
if (resourceInfo.getSelector() != null && !resourceInfo.getSelector().trim().equals("")) {
    filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resourceInfo.getSelector()));
}
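My reading of this snippet (an assumption on my part, not something I have confirmed against the Qpid documentation) is that it does no filtering on the client. It only records a non-blank selector in the filter map that is handed to the broker when the consumer link is opened, so whether `topic='1'` is honored would depend on the server. A simplified stand-in in plain Java; `SelectorFilterSketch`, `buildFilters` and the `"jms-selector"` key are illustrative names, not Qpid's real types:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the quoted logic; not the library's real classes.
final class SelectorFilterSketch {
    // Assumed key name, standing in for Qpid's JMS_SELECTOR_SYMBOL constant.
    static final String SELECTOR_KEY = "jms-selector";

    // Builds the filter map that would accompany the consumer's link request.
    // A null or blank selector produces an empty map, i.e. no filter at all.
    static Map<String, String> buildFilters(String selector) {
        Map<String, String> filters = new HashMap<>();
        if (selector != null && !selector.trim().equals("")) {
            filters.put(SELECTOR_KEY, selector);
        }
        return filters;
    }
}
```

If that reading is right, the client never evaluates the expression itself; it just passes the string along.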
There does not seem to be any selector-related handling before onMessage is invoked?
private boolean deliverNextPending() {
    if (session.isStarted() && messageQueue.isRunning() && messageListener != null) {
        dispatchLock.lock();
        try {
            JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
            if (envelope == null) {
                return false;
            }
            TraceableMessage facade = envelope.getMessage().getFacade();
            if (consumeExpiredMessage(envelope)) {
                LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
                doAckExpired(envelope);
                tracer.asyncDeliveryInit(facade, address);
                tracer.asyncDeliveryComplete(facade, DeliveryOutcome.EXPIRED, null);
            } else if (session.redeliveryExceeded(envelope)) {
                LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
                applyRedeliveryPolicyOutcome(envelope);
                tracer.asyncDeliveryInit(facade, address);
                tracer.asyncDeliveryComplete(facade, DeliveryOutcome.REDELIVERIES_EXCEEDED, null);
            } else {
                final JmsMessage copy;
                boolean deliveryFailed = false;
                boolean autoAckOrDupsOk = acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
                                          acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
                if (autoAckOrDupsOk) {
                    copy = copy(doAckDelivered(envelope));
                } else {
                    copy = copy(ackFromReceive(envelope));
                }
                session.clearSessionRecovered();
                try {
                    tracer.asyncDeliveryInit(facade, address);
                    messageListener.onMessage(copy);
                } catch (RuntimeException rte) {
                    deliveryFailed = true;
                    tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
                } finally {
                    if(!deliveryFailed) {
                        tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
                    }
                }
                if (autoAckOrDupsOk && !session.isSessionRecovered()) {
                    if (!deliveryFailed) {
                        doAckConsumed(envelope);
                    } else {
                        doAckReleased(envelope);
                    }
                }
            }
        } catch (Exception e) {
            // TODO - There are two cases where we can get an error here, one being
            // and error returned from the attempted ACK that was sent and the
            // other being an error while attempting to copy the incoming message.
            // We need to decide how to respond to these.
            session.getConnection().onException(e);
        } finally {
            dispatchLock.unlock();
            if (isPullConsumer()) {
                try {
                    startConsumerResource();
                } catch (JMSException e) {
                    LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
                }
            }
        }
    }
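Since nothing in this dispatch path checks the selector, one possible workaround (a sketch only, not a confirmed fix) is to filter inside the listener itself. The example below is plain Java with no JMS dependency: message properties are modeled as a `Map`, and `ClientSideFilter`, `filtering` and `topicEquals` are hypothetical helper names. It also assumes each message carries a string property named `topic`, which I have not verified for this platform.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper: forwards a message to the real handler only if it matches.
final class ClientSideFilter {
    // Wraps a handler so that non-matching messages are silently dropped.
    static <M> Consumer<M> filtering(Predicate<M> matches, Consumer<M> delegate) {
        return message -> {
            if (matches.test(message)) {
                delegate.accept(message);
            }
        };
    }

    // Stand-in for the selector topic='1', with properties modeled as a map.
    // A message without a "topic" entry never matches.
    static Predicate<Map<String, String>> topicEquals(String expected) {
        return props -> expected.equals(props.get("topic"));
    }
}
```

With a real JMS listener the same check could go at the top of onMessage, for example by comparing `message.getStringProperty("topic")` with the expected value and returning early when it differs. Note that in AUTO_ACKNOWLEDGE mode the skipped messages would still be acknowledged, so they are discarded rather than left for another consumer.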
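I ran into the same problem and found this post via CSDN. I hope the Alibaba Cloud team can provide a correct, authoritative answer. Please take a look.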