文章目录
前言
1.消息属性
RabbitMQ是基于AMQP消息传输协议来实现的消息中间件;类似HTTP有header和body两部分数据,Message是RabbitMQ中的消息体概念。
Message由Properties和Body组成,前者是一些元信息,如消息的优先级、持久化、传输格式(如JSON)、延迟等高级特性,Body则是传递的消息数据实体
2.消息投递
Exchange、Queue与Routing Key三个概念是理解RabbitMQ消息投递的关键。RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。
Producer只能将自己的消息投递到Exchange中,由Exchange按照routing_key投递到对应的Queue中。
3.消息可靠性
不同于HTTP的同步访问,RabbitMQ中,Producer并不知道消息是否被可靠地投递到了Consumer中处理。那么,RabbitMQ是如何保证消息的可靠投递?
主要是两点:第一,消息确认机制。Consumer处理完消息后,需要发送确认消息给Broker Server,可以选择“确认接收”、“丢弃”、“重新投递”三种方式。如果Consumer在Broker Server收到确认消息之前挂了,Broker Server便会重新投递该消息。
第二,可以选择数据持久化,这样即使RabbitMQ重启,也不会丢失消息
一、Hello World(简单)模式
在下图中,“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代
表使用者保留的消息缓冲区
博主这里使用JAVA实现。
1.导入依赖
<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>
2.消息生产者
publicclassProducer { privatefinalstaticStringQUEUE_NAME="hello"; publicstaticvoidmain(String[] args) throwsException { //创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("192.168.10.130"); factory.setUsername("guest"); factory.setPassword("guest"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel()) { /*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null); Stringmessage="hello world"; /*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } } }
3.消息消费者
publicclassConsumer { privatefinalstaticStringQUEUE_NAME="hello"; publicstaticvoidmain(String[] args) throwsException { ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("192.168.10.130"); factory.setUsername("guest"); factory.setPassword("guest"); Connectionconnection=factory.newConnection(); Channelchannel=connection.createChannel(); System.out.println("等待接收消息...."); //推送的消息如何进行消费的接口回调DeliverCallbackdeliverCallback=(consumerTag,delivery)->{ Stringmessage=newString(delivery.getBody()); System.out.println(message); }; //取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback=(consumerTag)->{ System.out.println("消息消费被中断"); }; /*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
二、Work Queues(工作)模式
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
1.抽取工具类
publicclassRabbitMqUtils { //得到一个连接的 channelpublicstaticChannelgetChannel() throwsException{ //创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("192.168.10.130"); factory.setUsername("guest"); factory.setPassword("guest"); Connectionconnection=factory.newConnection(); Channelchannel=connection.createChannel(); returnchannel; } }
2.启动两个工作线程
publicclassWorker01 { privatestaticfinalStringQUEUE_NAME="hello"; publicstaticvoidmain(String[] args) throwsException { Channelchannel=RabbitMqUtils.getChannel(); DeliverCallbackdeliverCallback=(consumerTag,delivery)->{ StringreceivedMessage=newString(delivery.getBody()); System.out.println("接收到消息:"+receivedMessage); }; CancelCallbackcancelCallback=(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }; System.out.println("C2 消费者启动等待消费......"); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
3.启动一个发送线程
publicclassTask01 { privatestaticfinalStringQUEUE_NAME="hello"; publicstaticvoidmain(String[] args) throwsException { try(Channelchannel=RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME,false,false,false,null); //从控制台当中接受信息Scannerscanner=newScanner(System.in); while (scanner.hasNext()){ Stringmessage=scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } } }
4.结果
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且
是按照有序的一个接收一次消息
总结
以上就是RabbitMQ 核心部分之简单模式和工作模式的相关知识,希望对你有所帮助。
积跬步以至千里,积怠惰以至深渊。时代在这跟着你一起努力哦!