基于ActiveMQ的点对点收发消息

简介: ActiveMQ是apache的一个开源消息引擎。可以作为即通引擎或者消息中间件引擎。 准备 下载ActiveMQ http://activemq.apache.org/download.html 进入\bin\win64双击InstallService.bat安装为系统服务。

ActiveMQ是apache的一个开源消息引擎。可以作为即通引擎或者消息中间件引擎。

准备

下载ActiveMQ

http://activemq.apache.org/download.html

进入\bin\win64双击InstallService.bat安装为系统服务。然后启动这个系统服务

访问 http://localhost:8161/admin/queues.jsp 可以看到消息队列列表 账号密码默认就是admin admin,配置在conf/jetty-realm.properties中

然后将根目录下的activemq-all-5.13.2.jar引入项目,就可以进行开发了

首先对ActiveMQ进行简单封装

 1 package activeMQStu;
 2 
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4 
 5 import javax.jms.*;
 6 import java.util.Arrays;
 7 
 8 /**
 9  * Created by lvyahui on 2016/4/23.
10  */
11 public class ActiveMQServer {
12 
13 
14     private String user;
15     private String pass;
16     private String url;
17 
18     private Session session;
19     private Connection connection;
20     private ActiveMQConnectionFactory connectionFactory;
21 
22     public ActiveMQConnectionFactory getConnectionFactory() {
23         return connectionFactory;
24     }
25 
26     public ActiveMQServer(String user, String pass, String url) throws JMSException {
27         this.user = user;
28         this.pass = pass;
29         this.url = url;
30 
31         this.connectionFactory = new ActiveMQConnectionFactory(this.user, this.pass, this.url);
32         /*
33         * 必须指明哪些包的类是可以序列化的
34         * 否则会报错:ActiveMQ Serializable class not available to broker. Reason: ClassNotFoundException
35         * 参考:http://activemq.apache.org/objectmessage.html
36         * */
37         connectionFactory.setTrustedPackages(Arrays.asList("activeMQStu"));
38         this.connection = connectionFactory.createConnection();
39         connection.start();
40     }
41 
42     public Session getSession() throws JMSException {
43         if (session == null) {
44             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
45         }
46         return session;
47     }
48 
49     public Queue createQueue(String name) throws JMSException {
50         return getSession().createQueue(name);
51     }
52 
53     public void close() throws JMSException {
54         if (session != null) {
55             session.commit();
56             session.close();
57         }
58         if (connection != null) {
59             connection.close();
60         }
61     }
62 
63 }

消息

 1 package activeMQStu;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * Created by lvyahui on 2016/4/23.
 7  */
 8 public class User implements Serializable {
 9 
10     private String username ;
11     private String password ;
12     private String salt ;
13 
14     public String getUsername() {
15         return username;
16     }
17 
18     public void setUsername(String username) {
19         this.username = username;
20     }
21 
22     public String getPassword() {
23         return password;
24     }
25 
26     public void setPassword(String password) {
27         this.password = password;
28     }
29 
30     public String getSalt() {
31         return salt;
32     }
33 
34     public void setSalt(String salt) {
35         this.salt = salt;
36     }
37 
38     @Override
39     public String toString() {
40         return "User{" +
41                 "username='" + username + '\'' +
42                 ", password='" + password + '\'' +
43                 ", salt='" + salt + '\'' +
44                 '}';
45     }
46 }

消息生产者

 1 package activeMQStu.queue;
 2 
 3 import activeMQStu.ActiveMQServer;
 4 import activeMQStu.User;
 5 import org.apache.activemq.command.ActiveMQObjectMessage;
 6 
 7 import javax.jms.JMSException;
 8 import javax.jms.MessageProducer;
 9 import javax.jms.ObjectMessage;
10 import javax.jms.Queue;
11 
12 /**
13  * Created by lvyahui on 2016/4/23.
14  */
15 public class UserProducer {
16 
17     private MessageProducer producer;
18 
19     public UserProducer(ActiveMQServer activeMQServer, String queueName) throws JMSException {
20         Queue queue = activeMQServer.createQueue(queueName);
21         producer = activeMQServer.getSession().createProducer(queue);
22     }
23 
24     public void produce(User user) throws JMSException {
25         ObjectMessage objectMessage = new ActiveMQObjectMessage();
26         objectMessage.setObject(user);
27         producer.send(objectMessage);
28     }
29 }

发送消息

 1 package activeMQStu;
 2 
 3 import activeMQStu.queue.UserProducer;
 4 import org.apache.activemq.ActiveMQConnection;
 5 
 6 import javax.jms.JMSException;
 7 
 8 /**
 9  * Created by lvyahui on 2016/4/23.
10  */
11 public class ProduceApp {
12     public static void main(String[] args) throws JMSException {
13         ActiveMQServer activeMQServer = new ActiveMQServer(
14                 ActiveMQConnection.DEFAULT_USER,
15                 ActiveMQConnection.DEFAULT_PASSWORD,
16                 "tcp://localhost:61616"
17         );
18         UserProducer producer = new UserProducer(activeMQServer,"queue.devlyh");
19         for(int i =0 ;i < 100;i++){
20             User user = new User();
21             user.setUsername("lvyahui".concat(String.valueOf(i)));
22             user.setPassword("admin888" + i);
23             user.setSalt("salt"+i);
24             producer.produce(user);
25         }
26         activeMQServer.close();
27     }
28 }

 

 

 

运行成功后再页面可以看到有100条消息进入了名字为queue.devlyh的队列中

消息接受者

 1 package activeMQStu.queue;
 2 
 3 import activeMQStu.ActiveMQServer;
 4 import activeMQStu.User;
 5 
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageConsumer;
 8 import javax.jms.ObjectMessage;
 9 
10 /**
11  * Created by lvyahui on 2016/4/23.
12  */
13 public class UserConsumer {
14     private MessageConsumer consumer;
15 
16     public UserConsumer(ActiveMQServer activeMQServer,String queueName) throws JMSException {
17         this.consumer = activeMQServer.getSession()
18                 .createConsumer(activeMQServer.createQueue(queueName));
19     }
20 
21     public User consume() throws JMSException {
22         ObjectMessage objectMessage  = (ObjectMessage) consumer.receive();
23         User user = (User) objectMessage.getObject();
24         return user;
25     }
26 }

接收消息

 1 package activeMQStu;
 2 
 3 import activeMQStu.queue.UserConsumer;
 4 import org.apache.activemq.ActiveMQConnection;
 5 import org.apache.activemq.ActiveMQConnectionFactory;
 6 
 7 import javax.jms.JMSException;
 8 import java.util.Arrays;
 9 
10 /**
11  * Created by lvyahui on 2016/4/23.
12  */
13 public class ConsumeApp {
14     public static void main(String[] args) throws JMSException {
15         ActiveMQServer activeMQServer = new ActiveMQServer(
16                 ActiveMQConnection.DEFAULT_USER,
17                 ActiveMQConnection.DEFAULT_PASSWORD,
18                 "tcp://localhost:61616"
19         );
20 
21 //        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) activeMQServer.getConnectionFactory();
22 
23         UserConsumer consumer = new UserConsumer(activeMQServer,"queue.devlyh");
24         while(true){
25             User user = consumer.consume();
26             System.out.println(user);
27         }
28     }
29 }

执行效果如下

 

目录
相关文章
|
1天前
|
数据采集 人工智能 安全
|
10天前
|
云安全 监控 安全
|
2天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
912 150
|
15天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1647 8
|
6天前
|
人工智能 前端开发 文件存储
星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择
星哥带你玩转飞牛NAS,部署开源笔记TriliumNext!支持树状知识库、多端同步、AI摘要与代码高亮,数据自主可控,打造个人“第二大脑”。高效玩家的新选择,轻松搭建专属知识管理体系。
366 152
|
7天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
605 152
|
9天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
571 13
|
2天前
|
编解码 人工智能 机器人
通义万相2.6,模型使用指南
智能分镜 | 多镜头叙事 | 支持15秒视频生成 | 高品质声音生成 | 多人稳定对话