springboot整合rabbitMQ
声明
javaBean方式
创建交换机
@Bean
Exchange demoExchange(){
return ExchangeBuilder
.directExchange("交换机名称")
.build();
}
创建队列
@Bean
Queue demoQueue(){
return QueueBuilder
.durable("队列名称")
.build();
}
绑定交换机和队列
@Bean
Binding queueBinging(){
return BindingBuilder
.bind(demoQueue())//队列
.to(demoExchange())//交换机
.with("路由key")
.noargs();
}
注解方式
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "队列名",declare = "是否持久化"),
exchange = @Exchange(value = "交换机名",type = "类型",declare = "是否持久化",ignoreDeclarationExceptions = "声明异常则忽略"),
key = "路由key"
)
)
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
//交换机名称、路由key、消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME,ROUTING_KEY,"mag");
}
消费者
@Component
@Slf4j
public class DemoConsumer {
@RabbitListener(queues = "监听的队列名称")
public void getQueue1(Message message){
log.info("接收queue消息:{}",new String(message.getBody()));
}
}
消息接收确认
配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #设置手动消息确认
消费者
@SneakyThrows
@RabbitListener(queues = "监听的队列")
public void getQueue2(Message message,/*@Header Long amqp_deliveryTag*/ Channel channel){
log.info("接收queue2消息:{}",new String(message.getBody()));
//手动消息确认,参数:表示消息投递序号、是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
其它函数:
basicNack表示失败确认,参数:
- 消息投递序号
- 是否批量确认
- 值为
true
消息将重新入队列basicReject拒绝消息,与
basicNack
区别在于不能进行批量操作,其他用法很相似。参数:
- 消息投递序号
- 值为
true
消息将重新入队列
发布确认
可以获取消息是否发送到交换机
配置文件
spring:
rabbitmq:
publisher-confirm-type: correlated
修改RabbitTemplate
@Component
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 消息确认,消息只要被rabbitmq接收到就会触发confirmCallback回调
*
* @param correlationData 对象内部只有一个id属性,用来表示当前消息的唯一性
* @param ack 消息投递到broker的状态,true表示成功
* @param s 投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack){
log.info("{}消息投递成功",correlationData.getId());
}else {
log.info("{}消息投递失败:{}",correlationData.getId(),s);
}
}
}
生产者
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("交换机名","路由key","消息",correlationData);
消息返回
当消息没有被发送到队列时返回给生产者
配置文件
spring:
rabbitmq:
publisher-returns: true
修改RabbitTemplate
@Component
@Slf4j
public class RabbitConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息退回:消息体:{},响应code:{},响应内容:{},交换机:{},路由key:{}",
returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText(),
returnedMessage.getExchange(),returnedMessage.getRoutingKey());
}
}
延迟队列
rabbitmq的延迟队列是基于死信队列和消息过期实现的
配置文件
//普通交换机
@Bean
Exchange commonExchange(){
return ExchangeBuilder
.directExchange(COMMON_EXCHANGE)
.build();
}
//死信交换机
@Bean
Exchange deadExchange(){
return ExchangeBuilder
.directExchange(DEAD_EXCHANGE)
.build();
}
//普通队列
@Bean
Queue commonQueue(){
//设置参数
Map<String,Object> arguments = new HashMap<>();
//死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置路由key
arguments.put("x-dead-letter-routing-key",DEAD_BINDING);
//过期时间
arguments.put("x-message-ttl",10000);
return QueueBuilder
.durable(COMMON_QUEUE)
.withArguments(arguments)//设置参数
.build();
}
//死信队列
@Bean
Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
//普通队列和交换机绑定
@Bean
Binding commonBinding(){
return BindingBuilder
.bind(commonQueue())
.to(commonExchange())
.with(COMMON_BINDING)
.noargs();
}
//死信队列和交换机绑定
@Bean
Binding deadBinding(){
return BindingBuilder
.bind(deadQueue())
.to(deadExchange())
.with(DEAD_BINDING)
.noargs();
}
- 生产者:正常向普通交换机发送消息即可
- 消费者:监听死信队列(当普通队列消息过期后,消息会发给死信交换机,然后到死信队列)
事务
配置文件
spring:
rabbitmq:
publisher-confirm-type: none #需要关闭消息的发布确认
修改RabbitTemplate
@Component
@Slf4j
public class RabbitConfig{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setChannelTransacted(true);//开启事务
}
}
配置类
/**
* 启用rabbitmq事务
* @param factory
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory factory){
return new RabbitTransactionManager(factory);
}
生产者
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")//使用事务
public void send(){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange_name","roting_key","mag",correlationData);
//int i = 1/0;
}
生产者发送消息时,如果发送消息前后发生异常,消息将不会发出
RPC模式
远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束
同步阻塞
在一条消息接受到回复前不能发送其他消息
生产者,使用convertSendAndReceive方法
Object resp=rabbitTemplate.convertSendAndReceive("exchange_name","roting_key","mag");
消费者,方法的返回值就是给生产者的响应
@RabbitListener(queues = "queue_name")
public String getMag(String mag){
log.info("获取队列消息:{}",mag);
return mag+"abc";
}
异步
可以不用等待回复,继续发送别的消息
配置类
/**
* 配置AsyncRabbitTemplate
* SpringBoot 没有默认的AsyncRabbitTemplate注入
* @param rabbitTemplate
* @return
*/
@Bean
AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
return new AsyncRabbitTemplate(rabbitTemplate);
}
生产者
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("exchange_name","roting_key","mag");
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
log.info("接收响应:{}",result);
}
@Override
public void onFailure(Throwable ex) {
}
});