SpringBoot整合Disruptor
Disruptor是一种高性能、低延迟的消息队列框架,专为高吞吐量、低延迟的并发处理设计
核心特性:
- 环形缓冲区:RingBuffer是disruptor的核心数据结构,所有事件都存储在这个缓冲区中。生产者将事件放入缓冲区,消费者从缓冲区中读取事件,环形缓冲区的设计避免了jvm的垃圾回收,并通过内存映射和内存对齐技术提高了内存管理效率
- 无锁设计:disruptor采用了无锁架构,避免了线程之间的锁竞争,从而提高了并发性能
- 高效的内存管理:通过环形缓冲区和内存对齐技术,disruptor在性能上优于传统的队列系统
- 灵活的消费者模型:支持多个消费者并行消费不同的事件流,可以灵活应对复杂的事件处理需求
引入依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
定义事件
@Data
public class OrderEvent {
private String orderId;
private BigDecimal amount;
private LocalDateTime createTime;
}
事件工厂
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
事件处理器
public class LogEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
System.out.println("日志处理:"+orderEvent);
}
}
public class PlayEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
System.out.println("支付处理:"+orderEvent);
}
}
配置类
@Configuration
public class DisruptorConfig {
@Bean
public Disruptor orderDisruptor(){
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
new OrderEventFactory(),//事件工厂
1024*256,//ringBuffer的字节大小
Executors.newFixedThreadPool(2),//处理事件的线程池
ProducerType.SINGLE,//单线程模式
new BlockingWaitStrategy()//等待策略
);
//配置处理链
disruptor.handleEventsWith(new PlayEventHandler())
.then(new LogEventHandler());
return disruptor;
}
@Bean
public RingBuffer orderRingBuffer(){
Disruptor disruptor = orderDisruptor();
//启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接收生产者事件
return disruptor.getRingBuffer();
}
}
发布事件
@RestController
@RequestMapping("/order")
@RequiredArgsConstructor
public class OrderController {
private final RingBuffer<OrderEvent> ringBuffer;
@PostMapping("{orderId}/{amount}")
public String create(@PathVariable String orderId,
@PathVariable BigDecimal amount){
long sequence = ringBuffer.next();
try {
OrderEvent orderEvent = ringBuffer.get(sequence);
orderEvent.setOrderId(orderId);
orderEvent.setAmount(amount);
orderEvent.setCreateTime(LocalDateTime.now());
}finally {
ringBuffer.publish(sequence);
}
return "ok...";
}
}