1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

SpringBoot RabbitMQ消息可靠发送与接收

环境: springboot2.2.11.RELEASE + RabbitMQ3.7.4

RabbitMQ在以下情况会出现消息的丢失:

  • 交换机、队列、消息未持久化,mq重启后会出现消息丢失。
  • 生产者发出的消息第一步是投递到交换机,这一步可能因为网络原因导致失败。
  • 消息正常投递到交换机后,通过路由key路由到队列的时候出现失败。(没有符合的队列)
  • 代码层面,配置层面,考虑不全导致消息丢失。
  • 消费端接收到相关消息之后,消费端还没来得及处理消息,消费端机器就宕机了,此时消息如果处理不当会有丢失风险。

RabbitMQ中有两种方案来实现消息的可靠发送,分别如下:

RabbitMQ的事物机制

RabbitMQ的生产者确认机制

准备环境:分别建立交换机和队列

1、交换机test-exchange 持久化,类型为topic

2、队列test-queue test-queue 持久化。

3、交换机与队列绑定, Routing key = tm.#

方案1:事物机制

pom.xml依赖

<dependency> 			<groupId>org.springframework.boot</groupId> 			<artifactId>spring-boot-starter-amqp</artifactId> 		</dependency> 		<dependency> 			<groupId>org.springframework.boot</groupId> 			<artifactId>spring-boot-starter-web</artifactId> 		</dependency>

application.yml配置

spring:   rabbitmq:     host: localhost     port: 5672     username: guest     password: guest     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: fals

这里配置很简单主要就是消息监听并发数配置,重试相关的配置。

RabbitConfig.java 配置RabbitMQ

@Configuration public class RabbitConfig { 	 	private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ; 	 	@Bean 	public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) { 		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ; 		rabbitTemplate.setChannelTransacted(true) ; 		return rabbitTemplate ; 	} 	 	@Bean("rabbitTransactionManager")     public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {         return new RabbitTransactionManager(connectionFactory);     } 	 }

这里必须配置rabbit事物Bean,因为我们的发送消息的方法添加注解事物时需要设置事务管理器。

rabbitTemplate.setChannelTransacted(true) ;这行代码必须设置为true开启事物功能。

MessageSend.java 消息发送

@Component public class MessageSend { 	 	private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ; 	 	@Resource 	private RabbitTemplate rabbitTemplate ; 	 	@Transactional(rollbackFor = {Exception.class}, transactionManager = "rabbitTransactionManager") 	public void send(String msg) { 		logger.info("准备发送消息:{}", msg); 		rabbitTemplate.convertAndSend("test-exchange", "tm.1", msg) ; 		if ("1".equals(msg)) { 			throw new RuntimeException("消息内容不真实") ; 		} 	} 	 }

注意这里要给发送消息的方法添加@Transactional,同时还要设置事物管理器的Bean Name。

MessageController.java 接口发送。

@RestController @RequestMapping("/messages") public class MessageController { 	 	@Resource 	private MessageSend ms ; 	 	@GetMapping("/send") 	public Object send(String msg) { 		ms.send(msg) ; 		return "success" ; 	} 	 }

测试:

先发送一条正常的消息

http://localhost:8080/messages/send?msg=123

控制台输出:


SpringBoot RabbitMQ消息可靠发送与接收


SpringBoot RabbitMQ消息可靠发送与接收

rabbitmq接受到了我们的消息。

接着发送消息内容为1的消息,当消息内容为1时,发送程序会抛出异常,看看消息是否能够回滚。

http://localhost:8080/messages/send?msg=1


SpringBoot RabbitMQ消息可靠发送与接收


SpringBoot RabbitMQ消息可靠发送与接收

rabbit并没有收到消息,说明消息回滚了。

接下来把@Transactional注解去掉后看看


SpringBoot RabbitMQ消息可靠发送与接收

rabbit收到了消息,这也说明前面配置的事物是生效的。

到这里基于事物机制的消息可靠传输就完了,接下来看看基于消息确认机制。


方案2:消息确认机制

application.yml

spring:   rabbitmq:     host: localhost     port: 5672     username: guest     password: guest     virtual-host: test     publisherConfirmType: correlated     publisherReturns: true     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         acknowledgeMode: MANUAL         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: false 

publisherConfirmType取值说明:

  • NONE值是禁用发布确认模式,是默认值
  • CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
  • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

注意:在springboot2.2.0.RELEASE版本之前amqp正式支持的属性是(spring.rabbitmq.publisher-confirm),用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;

publisherReturns: 可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃。简单说就是消息不能正确送达到队列将回调。

RabbitConfig.java 配置

@Configuration public class RabbitConfig { 	 	private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class) ; 	 	@Bean 	public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) { 		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory) ; 		// rabbitTemplate.setChannelTransacted(true) ; 		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { 			@Override 			public void confirm(CorrelationData correlationData, boolean ack, String cause) { 				if (ack) { 					logger.info("消息发送成功, 关联数据的ID:{}", correlationData.getId()) ; 		        } else { 		        	logger.error("消息发送失败, 失败原因:{}", cause) ; 		        } 			} 		}) ; 		rabbitTemplate.setMandatory(true) ; 		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { 			@Override 			public void returnedMessage(Message message, int replyCode, String replyText,  					String exchange, String routingKey) { 				logger.error("发送出现错误,请检查exchange:{}, routingKey: {}是否配置正确, 消息内容:{}",  						exchange,  						routingKey, 						new String(message.getBody(), Charset.forName("UTF-8"))) ; 			} 		}) ; 		return rabbitTemplate ; 	} 	 	@Bean("rabbitTransactionManager")     public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {         return new RabbitTransactionManager(connectionFactory);     } 	 } 

rabbitTemplate.setChannelTransacted(true) ; 这个代码一定要注释了或者不要设置或者设置为false;rabbitmq中的事物机制和消息确认机制是互斥的。。。

setConfirmCallback方法:只要消息到达了交换机ack会true(这时候它是不管该交换机是否与队列有绑定),当消息无法到达交换机(如:交换机不存在)ack为false。

setReturnCallback方法:用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调(也就是说到达了交换机,但是没有队列与该交换机绑定)

MessageSend.java 发送程序

@Component public class MessageSend { 	 	private static Logger logger = LoggerFactory.getLogger(MessageSend.class) ; 	 	@Resource 	private RabbitTemplate rabbitTemplate ; 	 	public void sendConfirm(String msg) { 		logger.info("准备发送消息:{}", msg); 		rabbitTemplate.convertAndSend("test-exchange1", "tm.1", msg) ; 	} 	 }

注意:这里我故意把交换机名字写错加了一个1,就是为了测试用。

MessageController.java 发送程序

@RestController @RequestMapping("/messages") public class MessageController { 	 	@Resource 	private MessageSend ms ; 	 	@GetMapping("/send") 	public Object send(String msg) { 		ms.send(msg) ; 		return "success" ; 	} 	 	@GetMapping("/send2") 	public Object send2(String msg) { 		ms.sendConfirm(msg) ; 		return "success" ; 	} 	 }

测试:

上面我把交换机名字已经故意写错了,test-exchange1,实际是:test-exchange。

访问:/messages/send2?msg=1


SpringBoot RabbitMQ消息可靠发送与接收

控制台输出了错误信息,这些信息就是在setConfirmCallback方法中写的,这证明了当消息无法到达交换机时就被触发。

接下来把test-exchange1改正确:把key改错误

public void sendConfirm(String msg) { 		logger.info("准备发送消息:{}", msg); 		rabbitTemplate.convertAndSend("test-exchange", "tm1.1", msg) ; 	}

测试:


SpringBoot RabbitMQ消息可靠发送与接收

当交换机正确的时候setConfirmCallback方法的执行是正确的ack为true。但是同时setReturnCallback方法也被调用了,因为我们把key写错了,消息没法路由到正确的queue上,所以回调了。

到此 两种可靠消息发送就结束了,接下来简单说下接受消息。

首先是配置中我们要把消息的确认设置为手动:

acknowledgeMode: MANUAL

接受消息的方法:

@RabbitListener(queues= {"test-queue"}) 	@RabbitHandler 	public void listner(Message message, Channel channel) { 		System.out.println("接受到消息.....income") ; 		byte[] body = message.getBody() ; 		MessageProperties mps = message.getMessageProperties() ; 			String content = new String(body, Charset.forName("UTF-8")) ; 			try { 				System.out.println("接受到消息来自交换机: 【" + mps.getReceivedExchange() + "】, 队列:【" + mps.getConsumerQueue() + "】:\n\t\t内容: " + content) ; 				channel.basicAck(message.getMessageProperties().getDeliveryTag(), true) ; 			} catch (Exception e) { 				e.printStackTrace();         channel.basicReject(mps.getDeliveryTag(), false) ; 			} 	}

这里通过 basicAck应答消息,basicReject拒绝消息。通过手动确认的机制来确保消息的正常消费。