信息发布→ 登录 注册 退出

Springboot整合RabbitMQ实现发送验证码的示例代码

发布时间:2026-01-11

点击量:
目录
  • 1. RabbitMQ的介绍
  • 2. 搭建环境
    • 2.1引入jar包
    • 2.2生产者配置
      • 2.2.1Rabbit配置类
      • 2.2.2 application.yml文件配置
    • 2.3消费者配置
      • 2.3.1 消费者配置类(同生产者)
      • 2.3.2 application.yml文件配置
  • 3.写发送短信验证码的代码
    • 3.1写一个controller来调用发送验证码的接口
      • 3.2 生成验证码
        • 3.3发送短信验证码
          • 3.4 实现验证码的校对

          1. RabbitMQ的介绍

          • MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
          • 开发中消息队列通常有如下应用场景:

          1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

          2、应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。并且有如下优点。

          1.使得简单,功能强大。

          2.基于AMQP协议。

          3.社区活跃,文档完善。

          4.高并发性能好,这主要得益于Erlang语言。

          5.Spring Boot默认已集成RabbitMQ

          • 组成部分说明如下:
          • Broker:消息队列服务进程,此进程包括两个部分:
          • Exchange和Queue。
          • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

          Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

          Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

          Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

          消息发布接收流程:

          -----发送消息-----

          1、生产者和Broker建立TCP连接。

          2、生产者和Broker建立通道。

          3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

          4、Exchange将消息转发到指定的Queue(队列)

          ----接收消息-----

          1、消费者和Broker建立TCP连接

          2、消费者和Broker建立通道

          3、消费者监听指定的Queue(队列)

          4、当有消息到达Queue时Broker默认将消息推送给消费者。

          5、消费者接收到消息

          2. 搭建环境

          • 实现原理:
          • 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
          • 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

          2.1引入jar包

           <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-web</artifactId>
                  </dependency>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-amqp</artifactId>
                  </dependency>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-test</artifactId>
                  </dependency>
                   <!--redis-->
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-data-redis</artifactId>
                  </dependency>

          2.2生产者配置

          2.2.1Rabbit配置类

          package com.cui.user.config;
          
          import org.springframework.amqp.core.*;
          import org.springframework.beans.factory.annotation.Qualifier;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          /** rabbitmq配置类   配置交换机,消息队列,并且绑定交换机和queue
           * @Author  Cui
           * @Date 2025-4-9 14:55
           */
          @Configuration
          public class RabbitmqConfig {
          	//队列bean的名称  cms  用来发送短信验证码
          	public static final String QUEUE_INFORM_CMS= "queue_inform_cms";
          	//队列bean的名称  email  用来发送邮件
          	//public static final String QUEUE_INFORM_EMAIL= "queue_inform_email";
          	//交换机的名称
          	public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform";
          	//队列的名称
          	@Value("${cxp.mq.queue}")
          	public  String queue_cms_postpage_name;
          	//routingKey
          	@Value("${cxp.mq.routingKey}")
          	public  String routingKey;
          	/**
          	 * 交换机配置使用direct类型
          	 * @return the exchange
          	 */
          	@Bean(EXCHANGE_TOPIC_INFORM_)
          	public Exchange EXCHANGE_TOPICS_INFORM() {
          		//durable(true) 持久化,mq重启之后交换机还在
          		return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build();
          	}
          	//声明队列
          	@Bean(QUEUE_INFORM_CMS)
          	public Queue QUEUE_CMS_POSTPAGE() {
          		Queue queue = new Queue(QUEUE_INFORM_CMS);
          		return queue;
          	 * 绑定队列到交换机
          	 *
          	 * @param queue    the queue
          	 * @param exchange the exchange
          	 * @return the binding
          	@Bean
          	public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) {
          		return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
          }

          2.2.2 application.yml文件配置

          server:
            port: ${PORT:8002}
          spring:
            application:
              name: cxp-service-manage-user
          
          #Redis配置
            redis:
              host: 127.0.0.1
              port: 6379
              jedis:
                pool:
                  max-active: 8
                  max-wait: -1
                  max-idle: 500
                  min-idle: 0
              lettuce:
                shutdown-timeout: 0
            datasource:
              url: jdbc:mysql://localhost:3306/system_user?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
              username: root
              password: 123456
              driver-class-name: com.mysql.jdbc.Driver
            rabbitmq:
              port: 5672
              username: guest
              password: guest
              virtualHost: /
          cxp:
            mq:
              #cms客户端监控的队列名称(不同的客户端监控的队列不能重复)
              queue: queue_inform_cms
              routingKey: inform.#.sms.#	#此routingKey邮件消费者和信息消费者通用
          mybatis:
            mapper-locations: classpath:mapper/*Mapper.xml
            type-aliases-package: com.cui.model.entity.user
          mapper:
            mappers: com.cui.model.BaseMapper #通用基类配置
            identity: mysql
          pagehelper:
            helperDialect: mysql
            reasonable: true
            supportMethodsArguments: true
            params: count=countSql
          eureka:
            client:
              registerWithEureka: true #服务注册开关
              fetchRegistry: true #服务发现开关
              serviceUrl: #Eureka客户端与Eureka服务端进行交互的地址,多个中间用逗号分隔
                defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/}
            instance:
              prefer-ip-address:  true  #将自己的ip地址注册到Eureka服务中
              ip-address: ${IP_ADDRESS:127.0.0.1}
              instance-id: ${spring.application.name}:${server.port} #指定实例id
          ribbon:
            MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试,如果eureka中找不到服务则直接走断路器
            MaxAutoRetriesNextServer: 3 #切换实例的重试次数
            OkToRetryOnAllOperations: false  #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false
            ConnectTimeout: 5000  #请求连接的超时时间
            ReadTimeout: 6000 #请求处理的超时时间

          2.3消费者配置

          引入jar包,这里需引入阿里云通信多的jar包和Redis的jar包

          <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-data-redis</artifactId>
                  </dependency>
                  <!--阿里云通信-->
                  <dependency>
                      <groupId>com.aliyun</groupId>
                      <artifactId>aliyun-java-sdk-core</artifactId>
                      <version>4.4.0</version>
                      <artifactId>aliyun-java-sdk-dysmsapi</artifactId>
                      <version>1.0.0</version>
          
                  <!-- 导入Eureka客户端的依赖 -->
                      <groupId>org.springframework.cloud</groupId>
                      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
                  <!-- feign相关依赖  -->
                      <artifactId>spring-cloud-starter-openfeign</artifactId>
                    <dependency>
                      <artifactId>spring-boot-starter-test</artifactId>
                      <scope>test</scope>
                      <artifactId>spring-boot-starter-amqp</artifactId>

          2.3.1 消费者配置类(同生产者)

          package com.cui.sms.config;
          
          import org.springframework.amqp.core.*;
          import org.springframework.beans.factory.annotation.Qualifier;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          /** rabbitmq配置类   配置交换机,消息队列,并且绑定交换机和queue
           * @Author Cui
           * @Date 2025-4-9 14:55
           */
          @Configuration
          public class RabbitmqConfig {
          	//队列bean的名称  cms  用来发送短信验证码
          	public static final String QUEUE_INFORM_CMS= "queue_inform_cms";
          	//队列bean的名称  email  用来发送邮件
          	//public static final String QUEUE_INFORM_EMAIL= "queue_inform_email";
          	//交换机的名称
          	public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform";
          	//队列的名称
          	@Value("${cxp.mq.queue}")
          	public  String queue_cms_postpage_name;
          	//routingKey
          	@Value("${cxp.mq.routingKey}")
          	public  String routingKey;
          	/**
          	 * 交换机配置使用direct类型
          	 * @return the exchange
          	 */
          	@Bean(EXCHANGE_TOPIC_INFORM_)
          	public Exchange EXCHANGE_TOPICS_INFORM() {
          		//durable(true) 持久化,mq重启之后交换机还在
          		return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build();
          	}
          	//声明队列
          	@Bean(QUEUE_INFORM_CMS)
          	public Queue QUEUE_CMS_POSTPAGE() {
          		Queue queue = new Queue(QUEUE_INFORM_CMS);
          		return queue;
          	 * 绑定队列到交换机
          	 *
          	 * @param queue    the queue
          	 * @param exchange the exchange
          	 * @return the binding
          	@Bean
          	public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) {
          		return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
          }

          2.3.2 application.yml文件配置

          server:
            port: 8103
          spring:
            application:
              name: cxp-manager-service-sms
            rabbitmq:
              host: 127.0.0.1
              port: 5672
              username: guest
              password: guest
              virtualHost: /
          
          #Redis配置
            redis:
              port: 6379
              password: 123456
              jedis:
                pool:
                  max-active: 8
                  max-wait: -1
                  max-idle: 500
                  min-idle: 0
              lettuce:
                shutdown-timeout: 0
          aliyun:
            sms:
              accessKeyId: XXXXXXXXXXXXXXXXXXXX
              accessKeySecret: XXXXXXXXXXXXXXXXXXXX
              template_code: XXXXXXXXXXX
              sign_name: XXXX
          cxp:
            mq:
              #cms客户端监控的队列名称(不同的客户端监控的队列不能重复)
              queue: queue_inform_cms
              routingKey: inform.sms	#此routingKey用来监听信息
          eureka:
            client:
              registerWithEureka: true #服务注册开关
              fetchRegistry: true #服务发现开关
              serviceUrl: #Eureka客户端与Eureka服务端进行交互的地址,多个中间用逗号分隔
                defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/}
            instance:
              prefer-ip-address:  true  #将自己的ip地址注册到Eureka服务中
              ip-address: ${IP_ADDRESS:127.0.0.1}
              instance-id: ${spring.application.name}:${server.port} #指定实例id
          ribbon:
            MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试,如果eureka中找不到服务则直接走断路器
            MaxAutoRetriesNextServer: 3 #切换实例的重试次数
            OkToRetryOnAllOperations: false  #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false
            ConnectTimeout: 5000  #请求连接的超时时间
            ReadTimeout: 6000 #请求处理的超时时间

          3.写发送短信验证码的代码

          3.1写一个controller来调用发送验证码的接口

          /**
          	 *  发送短信验证码
          	 * @param phone
          	 * @return
          	 */
          	@ApiOperation(value = "发送短信验证码",notes = "发送短信验证码")
          	@GetMapping("/sendSms")
          	public ResponseResult sendSms(String phone){
          		LOGGER.info("要发送的手机号为:{}", phone);
          		userService.sendSms(phone);
          		return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo());
          	}

          3.2 生成验证码

          后台生成六位数的随机验证码,并且将验证码存入Redis中,设置五分钟的过期时间(用于用户注册时的校对),将验证码存到RabbitMQ中,当调用发送接口时,生产端将信息发送到绑定的队列中。

          /**
          	 * 向注册用户发送发送验证码
          	 * @param phone  注册的用户的手机号
          	 */
          	@Override
          	public void sendSms(String phone) {
          		//1.生成六位随机验证码
          		Random random = new Random();//随机函数
          		int code = random.nextInt(999999);//设置随机数的最大值
          		if(code<100000){  //如果验证码小于六位数,加100000保证验证码为6位数
          			code+=100000;
          		}
          		//System.out.println("短信验证码:"+code);
          		LOGGER.info("生成的短信验证码为:{{}}", code);
          		//2.将验证码存入redis
          		redisTemplate.boundValueOps("code_"+phone).set(code+"");
          		redisTemplate.boundValueOps("code_"+phone).expire(5, TimeUnit.MINUTES);//设置验证码五分钟到期
          		//3.将验证码存入RabbitMQ
          		Map<String,String> map = new HashMap<String, String>();
          		map.put("phone", phone);
          		map.put("code", code+"");
          		//以json格式存到RabbitMQ消息队列中
          		rabbitTemplate.convertAndSend(EXCHANGE_TOPIC_INFORM_, routingKey, JSON.toJSONString(map));
          	}

          3.3发送短信验证码

          在RabbitMQ的消费者端监听短信的routingKey ,当收到生产端发来的消息后,便会调用阿里云通信向用户发送短信

          package com.cui.sms.mq;
          
          import com.alibaba.fastjson.JSON;
          import com.aliyuncs.CommonResponse;
          import com.cui.sms.utils.SmsUtil;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.beans.factory.annotation.Value;
          import org.springframework.stereotype.Component;
          import java.util.Map;
          /**
           * @Author Cui
           * @Date 2025-4-9 15:40
           * 监听MQ,发送短信验证码
           */
          @Component
          public class SmsMessageConsumer {
          	private static final Logger LOGGER = LoggerFactory.getLogger(SmsMessageConsumer.class);
          	@Autowired
          	private SmsUtil smsUtil;
          	@Value("${aliyun.sms.template_code}")
          	private String templateCode;
          	@Value("${aliyun.sms.param}")
          	private String param;  //短信参数
          	@RabbitListener(queues = {"${cxp.mq.queue}"})
          	public void onMessage(Message message) {
          		String jsonString= new String(message.getBody());//得到mq中存入的json格式的消息
          		Map<String,String> map = JSON.parseObject(jsonString, Map.class);//将json格式转换为Map格式
          		String phone = map.get("phone");//mq中存入的手机号
          		String code = map.get("code");//mq中存入的验证码
          		//System.out.println("手机号"+phone+"验证码"+code);
          		LOGGER.info("发送的手机号为:{} ,发送的验证码为 :{}",phone, code);
          		//调用阿里云通信
          		CommonResponse commonResponse = smsUtil.sendSms(phone, templateCode, param.replace("[value]", code));
          	}
          }

          3.4 实现验证码的校对

          用户收到验证码并且填写完相应的信息后,点击注册,将自己的信息发送到后台,后台收到信息后,取出存在Redis中的验证码,和用户的验证码进行比较,然后将结果返回给前端。代码如下所示:

          @PostMapping("/save")
          	@ApiOperation(value = "新增用户",notes = "新增用户")
          	public ResponseResult add(@RequestBody User user, String smsCode){
          		LOGGER.info("新增的用户的信息为:{},用户收到的验证码为:{}", user.toString(),smsCode);
          		//对用户密码进行加密后在存入数据库
          		BCryptPasswordEncoder encoder = new BCryptPasswordEncoder();
          		String newPassword = encoder.encode(user.getPassword());
          		user.setPassword(newPassword);
          		userService.add(user,smsCode );
          		return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo());
          	}
          /**
          	 * 用户注册
          	 * @param user  用户对象信息
          	 * @param smsCode  短信验证码
          	 */
          	@Override
          	public void add(User user, String smsCode) {
          		//获取系统验证码
          		String sysCode = (String) redisTemplate.boundValueOps("code_" + user.getPhone()).get();
          		//比较短信验证码
          		LOGGER.info("从Redis中取到的短信验证码为:{{}}",smsCode+"  用户收到的的短信验证码为:{{}}",smsCode);
          		if(sysCode==null||"".equals(smsCode)){
          			throw new RuntimeException("验证码未发送或已过期!请稍后重试");
          		}
          		if(!smsCode.equals(sysCode)){
          			throw new RuntimeException("验证码不正确,请重新输入!");
          		}
          		if(user.getUsername()==null){
          			user.setUsername(user.getPhone());
          		}
          		User searchUser = new User();
          		//将用户传来的手机号传给searchUser,去查询数据库中是否存在该手机号
          		searchUser.setPhone(user.getPhone());
          		if(userDao.selectCount(searchUser)>0){
          			throw  new RuntimeException("该手机号已被注册!");
          		}
          		//设置user的其他参数
          		user.setCreated(new Date());
          		user.setUpdated(new Date());
          		user.setPoints(0);//积分初始值为0
          		user.setStatus("1");//状态1
          		user.setIsEmailCheck("0");//邮箱认证
          		user.setIsMobileCheck("1");//手机认证
          		userDao.insert(user);
          	}
          在线客服
          服务热线

          服务热线

          4008888355

          微信咨询
          二维码
          返回顶部
          ×二维码

          截屏,微信识别二维码

          打开微信

          微信号已复制,请打开微信添加咨询详情!