SpringBoot整合Disruptor

SpringBoot整合Disruptor

起男 44 2025-03-06

SpringBoot整合Disruptor

Disruptor是一种高性能、低延迟的消息队列框架,专为高吞吐量、低延迟的并发处理设计

核心特性:

  • 环形缓冲区:RingBuffer是disruptor的核心数据结构,所有事件都存储在这个缓冲区中。生产者将事件放入缓冲区,消费者从缓冲区中读取事件,环形缓冲区的设计避免了jvm的垃圾回收,并通过内存映射和内存对齐技术提高了内存管理效率
  • 无锁设计:disruptor采用了无锁架构,避免了线程之间的锁竞争,从而提高了并发性能
  • 高效的内存管理:通过环形缓冲区和内存对齐技术,disruptor在性能上优于传统的队列系统
  • 灵活的消费者模型:支持多个消费者并行消费不同的事件流,可以灵活应对复杂的事件处理需求

引入依赖

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>

定义事件

@Data
public class OrderEvent {
    private String orderId;
    private BigDecimal amount;
    private LocalDateTime createTime;
}

事件工厂

public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

事件处理器

public class LogEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
        System.out.println("日志处理:"+orderEvent);
    }
}
public class PlayEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
        System.out.println("支付处理:"+orderEvent);
    }
}

配置类

@Configuration
public class DisruptorConfig {

    @Bean
    public Disruptor orderDisruptor(){
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                new OrderEventFactory(),//事件工厂
                1024*256,//ringBuffer的字节大小
                Executors.newFixedThreadPool(2),//处理事件的线程池
                ProducerType.SINGLE,//单线程模式
                new BlockingWaitStrategy()//等待策略
        );
        //配置处理链
        disruptor.handleEventsWith(new PlayEventHandler())
                .then(new LogEventHandler());
        return disruptor;
    }

    @Bean
    public RingBuffer orderRingBuffer(){
        Disruptor disruptor = orderDisruptor();
        //启动disruptor线程
        disruptor.start();
        //获取ringbuffer环,用于接收生产者事件
        return disruptor.getRingBuffer();
    }
}

发布事件

@RestController
@RequestMapping("/order")
@RequiredArgsConstructor
public class OrderController {

    private final RingBuffer<OrderEvent> ringBuffer;

    @PostMapping("{orderId}/{amount}")
    public String create(@PathVariable String orderId,
                         @PathVariable BigDecimal amount){
        long sequence = ringBuffer.next();
        try {
            OrderEvent orderEvent = ringBuffer.get(sequence);
            orderEvent.setOrderId(orderId);
            orderEvent.setAmount(amount);
            orderEvent.setCreateTime(LocalDateTime.now());
        }finally {
            ringBuffer.publish(sequence);
        }
        return "ok...";
    }
}