SpringBoot RabbitMQ实现消息队列 邮箱

简介: SpringBoot RabbitMQ实现消息队列 邮箱

主页:写程序的小王叔叔的博客欢迎来访👀

支持:点赞收藏关注


一、效果

image.png

二、RMQ可以实现的功能

【介绍】:集合了网上各种大佬的教学一起整理

2.1消息中间件:

image.png

image.png

image.pngimage.png

image.png

2.2rmq安装

2.3含义

image.png

image.png

2.4原理

image.png

image.png

image.pngimage.pngimage.pngimage.png


三、SpringBoot + RMQ集成项目消息队列及聊天功能

【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧

3.1RMQ配置

在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:

to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)

如下图:

image.png

3.1.1pom.xml(公共文件配置)

<!--RabbitMQ-->

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-amqp</artifactId>

       </dependency>

3.1.2spring.xml(公共文件配置)

##############RabbitMQ配置#######spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000#开启confirms回调P->Exchangespring.rabbitmq.publisher-confirms=true#开启returnedMessage回调Exchange->Queuespring.rabbitmq.publisher-returns=true#设置手动确认(ack) Queue->Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100

3.2Java RMQ类代码

3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)

image.png

rmqpConfig.java  rmqp基本配置

@ConfigurationpublicclassRabbitConfig {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
privateStringhost;
@Value("${spring.rabbitmq.port}")
privateintport;
@Value("${spring.rabbitmq.username}")
privateStringusername;
@Value("${spring.rabbitmq.password}")
privateStringpassword;
@Value("${spring.rabbitmq.virtual-host}")
privateStringvhost;
publicstaticfinalStringEXCHANGE_Member="RabbitMQ_Exchange_Member";//邮件:注册+登录   publicstaticfinalStringQUEUE_Member="RabbitMQ_Queue_Member";//邮件:注册+登录publicstaticfinalStringROUTINGKEY_Member="RabbitMQ_RoutingKey_Member";//邮件:注册+登录//建立一个连接容器,类型数据库的连接池@BeanpublicConnectionFactoryconnectionFactory() {
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
returnconnectionFactory;
    }
@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
publicRabbitTemplaterabbitTemplate() {
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
            } else {
LOGGER.info("消息消费失败:"+cause);
            }
        });
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
StringcorrelationId=message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
          });
returntemplate;
    }
/*** 交换机针对消费者配置* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念* DirectExchange:按照routingkey分发到指定队列,多关键字匹配*/@BeanpublicDirectExchangedirectExchange() {
returnnewDirectExchange(EXCHANGE_Member, true,false);
    }
/*** 队列** @return*/@BeanpublicQueuedirectQueue() {
returnnewQueue(QUEUE_Member, true); 
    }
/*** 绑定** @return*/@BeanpublicBindingdirectBinding() {
returnBindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member);
    }
@BeanpublicMessageConverterjsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
}

RMQProducer.java   rmqp消息提供端/消息发送端

@ComponentpublicclassRMQProducer {
privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class);
privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@AutowiredprivateRabbitTemplaterabbitTemplate;
/**** 延迟消息队列信息* @param routingKeyName* @param msg*/publicvoidsendMsg(StringroutingKeyName,Stringmsg) {
LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(newDate()));
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, newMessagePostProcessor() {
@OverridepublicMessagepostProcessMessage(Messagemessage) throwsAmqpException {
message.getMessageProperties().setContentEncoding("utf-8");
message.getMessageProperties().setExpiration("120000"); //设置消息存活时间returnmessage;
             }
         });
//rabbitTemplate.convertAndSend(routingKeyName, msg);     }
}

RMQReceiver.java   rmqp消费端

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues=RabbitConfig.QUEUE_Member )
@RabbitHandlerpublicvoidhandler(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
}

3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)

image.pngimage.png


rmqpConfig.java  rmqp配置类

@ConfigurationpublicclassRabbitConfig {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
privateStringhost;
@Value("${spring.rabbitmq.port}")
privateintport;
@Value("${spring.rabbitmq.username}")
privateStringusername;
@Value("${spring.rabbitmq.password}")
privateStringpassword;
@Value("${spring.rabbitmq.virtual-host}")
privateStringvhost;
/**** 交换机*/publicstaticfinalStringEXCHANGE_Order="RabbitMQ_Exchange_Order";// 下单/*** 队列*/publicstaticfinalStringQUEUE_Order="RabbitMQ_Queue_Order";// 下单publicstaticfinalStringQUEUE_Pay="RabbitMQ_Queue_Pay";// 支付/**** 路由*/publicStringROUTINGKEY_Order="RabbitMQ_RoutingKey_Order";// 下单//建立一个连接容器,类型数据库的连接池@BeanpublicConnectionFactoryconnectionFactory() {
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
returnconnectionFactory;
    }
@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
publicRabbitTemplaterabbitTemplate() {
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
            } else {
LOGGER.info("消息消费失败:"+cause);
            }
        });
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
StringcorrelationId=message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
returntemplate;
    }

 

/////////Fanout广播配置////////////////////**** 交换机配置* @return*/@BeanpublicFanoutExchangefanoutExchange() {
returnnewFanoutExchange(EXCHANGE_Order);
    }
/*** 队列配置* @return*/@BeanpublicQueuefanoutQueueOrder() {
returnnewQueue(QUEUE_Order);
    }
@BeanpublicQueuefanoutQueuePay() {
returnnewQueue(QUEUE_Pay);
    }
/**** 绑定交换机和队列* @return*/@BeanpublicBindingbindFanoutExchangeOrder() {   
returnBindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange());
    }
@BeanpublicBindingbindFanoutExchangePay() {  
returnBindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange());
    }
@BeanpublicMessageConverterjsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
}

RMQProducer.java     rmqp消息提供端/消息发送端

/**** 提供者* @author Administrator**/@ComponentpublicclassRMQProducer {
privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class);
privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@AutowiredprivateRabbitTemplaterabbitTemplate;
/**** 延迟消息队列信息* @param routingKeyName* @param msg* @param string */publicvoidsendMsgtoFound(StringexchangeName , Stringmsg) {
LOGGER.info("消息发送成功,msg:{},时间:{}",msg,sdf.format(newDate()));
rabbitTemplate.convertAndSend(exchangeName , msg);
    }
}

RMQReceiver.java  rmqp消费端

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues=RabbitConfig.QUEUE_Order )
@RabbitHandlerpublicvoidhandlerOrder(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列QUEUE_Order当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
@RabbitListener(queues=RabbitConfig.QUEUE_Pay )
@RabbitHandlerpublicvoidhandlerPay(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列QUEUE_Pay当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定   }
/*** @param trim* @return*/privateMap<String, String>mapStringToMap(Stringstr) {
str=str.substring(1, str.length() -1);
String[] strs=str.split(",");
Map<String, String>map=newHashMap<String, String>();
for (Stringstring : strs) {
Stringkey=string.split("=")[0].trim();
Stringvalue=string.split("=")[1];
map.put(key, value);
        }
returnmap;
   }
}

3.2.3使用交换机

topicExchange: 通配符方式分发消息-(订阅)

四、解决

4.1 工具安装

guest/guest登录失败如何解决:

image.png

解决办法:执行如下命令

命令1:rabbitmqctl set_user_tags guest administrator

命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

重启rabbitmq即可。

image.png

重启服务:

image.png

------------------------ 我是愉快的分割线 -----------------------------

停止:service rabbitmq-server stop
启动:service rabbitmq-server start
查看状态:service rabbitmq-server status

4.2什么时间让它消费

什么时间手动消费(手动消费:不消费永远都在rmqp中保留)

操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp的交换机-队列-路由中

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
//@RabbitListener(queues = RabbitConfig.QUEUE_Member )//@RabbitHandlerpublicvoidhandler(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
}

如下图:

image.png

五、邮件发送

5.1QQ邮箱授权码获取

image.png

5.2邮箱配置

pom.xml<!--Mail--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency>
spring.xml--》配置邮箱########Mail配置########################QQsmtp.qq.com##sinasmtp.sina.cn##aliyunsmtp.aliyun.com##163smtp.163.com#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994##发送方spring.mail.host=smtp.qq.com##邮件地址spring.mail.from=1247622527@qq.com#用户名spring.mail.username=1247622527@qq.com##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)spring.mail.password=----------------》》》邮箱的授权码#端口号465或587spring.properties.mail.smtp.port: 25##编码格式spring.mail.default-encoding=UTF-8spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.ssl.enable=true

sendMailUtil.java

@ComponentpublicclassSendMailUtil {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(SendMailUtil.class);
@AutowiredprivateJavaMailSendermailSender;
//发送方邮件的发送地址@Value("${spring.mail.host}")
publicstaticStringsendMailHost;
//发送方发送邮件的账号@Value("${spring.mail.username}")
publicstaticStringsendMailUsername;
//发送方发送邮件的客户端授权码@Value("${pring.mail.password}")
publicstaticStringsendMailPassword;
//发送方发送邮件的端口@Value("${spring.properties.mail.smtp.port}")
publicstaticStringsendMailPort;
@Value("${spring.mail.from")
publicstaticStringsendMailFrom;
publicstaticvoidsendSimpleMail(Stringto, Stringsubject, Stringcontent) throwsException{
//创建连接对象 连接到邮件服务器Propertiesproperties=newProperties();
//设置发送邮件的基本参数//发送邮件服务器properties.put("mail.smtp.host", sendMailHost);
//发送端口properties.put("mail.smtp.port", sendMailPort);
properties.put("mail.smtp.auth", "true");
//设置发送邮件的账号和密码Sessionsession=Session.getInstance(properties, newAuthenticator() {
@OverrideprotectedPasswordAuthenticationgetPasswordAuthentication() {
//两个参数分别是发送邮件的账户和密码returnnewPasswordAuthentication(sendMailUsername,sendMailPassword);
            }
        });
//创建邮件对象Messagemessage=newMimeMessage(session);
//设置发件人message.setFrom(newInternetAddress(sendMailUsername));
//设置收件人message.setRecipient(Message.RecipientType.TO,newInternetAddress(to));
//设置主题message.setSubject(subject);
// 设置邮件内容message.setContent(content,"text/html;charset=UTF-8"); 
//发送一封邮件Transport.send(message);
   }
publicvoidsendHtmlMail(Stringto, Stringsubject, Stringcontent) {
//获取MimeMessage对象MimeMessagemessage=mailSender.createMimeMessage();
MimeMessageHelpermessageHelper;
try {
messageHelper=newMimeMessageHelper(message, true);
//邮件发送人messageHelper.setFrom(sendMailFrom);
//邮件接收人messageHelper.setTo(subject);
//邮件主题message.setSubject(subject);
//邮件内容,html格式messageHelper.setText(content, true);
//发送mailSender.send(message);
        } catch (MessagingExceptione) {
        }
   }
publicvoidsendAttachmentsMail(Stringto, Stringsubject, Stringcontent, StringfilePath) {
MimeMessagemessage=mailSender.createMimeMessage();
try {
MimeMessageHelperhelper=newMimeMessageHelper(message, true);
helper.setFrom(sendMailFrom);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);
FileSystemResourcefile=newFileSystemResource(newFile(filePath));
StringfileName=filePath.substring(filePath.lastIndexOf(File.separator));
helper.addAttachment(fileName, file);
mailSender.send(message);
           } catch (MessagingExceptione) {
//日志信息             }
   }
publicstaticvoidmain(String[] args) throwsException {
//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;//System.exit(0);   }
}

业务代码.Java

@OverridepublicObjectuserSendNewMailCode(StringmemberId , StringnewMemberEmail) {
Map<String , Object>mapMemberInfo=newHashMap<String, Object>(); 
if(!"".equals(memberId) &&!"".equals(newMemberEmail)) {
Membermember=memberService.getOnlyOneMemberInfoByMemberId(memberId);
StringvalidEmail=IdGenerate.random2FiveId();
//邮箱登录注册后,可绑定更改新邮箱if (member!=null&&newMemberEmail.equals(member.getMemberEmail())) {
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!");
        } catch (Exceptione) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
        }
//邮箱未登录注册后,任意邮箱可绑定      }elseif(member!=null&&!newMemberEmail.equals(member.getMemberEmail())){
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!");
        } catch (Exceptione) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
        }
      }
List<Object>memberInfo=newArrayList<Object>();
mapMemberInfo.put("newMemberEmail", newMemberEmail);
mapMemberInfo.put("validEmail", validEmail);
memberInfo.add(mapMemberInfo);
returnmapMemberInfo;
    }
returnmapMemberInfo;
  }

转载声明:本文为博主原创文章,未经博主允许不得转载


⚠️注意 ~

💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!

如有疑问❓可以在评论区留言💬或私信留言💬,尽我最大能力🏃‍♀️帮大家解决👨‍🏫!

如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
26 6
|
2月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
83 8
|
2月前
|
Java 数据安全/隐私保护 Spring
springboot实现邮箱发送(激活码)功能
本文介绍了如何在Spring Boot应用中配置和使用邮箱发送功能,包括开启邮箱的SMTP服务、添加Spring Boot邮件发送依赖、配置application.properties文件,以及编写邮件发送的代码实现。
87 2
springboot实现邮箱发送(激活码)功能
|
2月前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
2月前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
2月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
71 4
|
3月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
86 16
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
77 9
|
3月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1
|
3月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。

相关产品

  • 云消息队列 MQ