对于大部分开发者而言,Disruptor较为小众。尽管如此,在Log4j2、Storm 、HBase、Camel与SOFATracer等众多较为重要的项目中,您却总能够看到它的身影。

01 Disruptor简介

Disruptor是由 LMAX公司开源的一款高性能的内存队列,旨在解决传统 Java 内存队列的延迟和性能问题。它源于 LMAX 对高并发、高性能及无锁算法的研究,通过环形缓冲区、无锁设计和事件多播等特性,实现低延迟和高吞吐量。
相对于Java的内置队列,如:ArrayBlockingQueue(加锁有界)、LinkedBlockingQueue(加锁有界,可选无界)、LinkedTransferQueue(无锁无界)、ConcurrentLinkedQueue(无锁无界)、DelayQueue(加锁无界)与PriorityBlockingQueue(加锁无界)。这些队列要不就是加锁有界、加锁无界或者无锁无界,加锁势必会影响性能,而无界又存在着内存溢出的风险等问题。
而Disruptor 则在无锁的情况下还能保证队列有界,并且还保证了线程安全,即:
  • 使用 CAS 和序列号(Sequence)实现无锁并发,优于 ReentrantLock 的锁开销;

  • 使用Ring Buffer 固定大小(必须为 2 的幂),属于有界队列。

下面的这张图是 Disruptor 官网提供的 Disruptor 和 ArrayBlockingQueue 的延迟直方图对比。

根据早期的 Disruptor 官方介绍,基于 Disruptor 开发的零售金融交易平台单线程可支撑每秒 600 万订单,延迟低至纳秒级。也因此,您总能在一些顶级的开源项目中看到它的身影。例如,Log4j 2的异步模式就采用了Disruptor技术,下图是官网提供的 Log4j2 的性能压测结果:

其中,Loggers all async采用的是Disruptor,而Async Appender则采用的是ArrayBlockingQueue队列。由上图可见,单线程情况下,Loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,Loggers all async的吞吐量则比Async Appender增加了12倍,是Sync模式的68倍。

02 核心概念

Disruptor一些主要的核心概念如下所示。 

2.1 Event

Event 是 Disruptor 中存储数据的单位,您也可以把 Event 理解为存放在队列中等待消费的消息对象。它不是Disruptor定义的一种数据类型,而是由用户来定义的一种对象结构,如下示例所示:

public class DataEvent <T> {
    private T data;
    public T getData() {        return data;    }
    public void setData(final T data) {        this.data = data;    }}

2.2 EventFactory

EventFactory 用于预分配 Ring Buffer 中的事件对象,以确保初始化时填充正确的事件实例,如下示例代码所示:

import com.lmax.disruptor.EventFactory;
public class DataEventFactory <T> implements EventFactory<DataEvent<T>> {
    @Override    public DataEvent<TnewInstance() {        return new DataEvent<>();    }}

2.3 Event Handler

EventHandler 是 Disruptor 中定义消费逻辑的核心接口,负责处理从 Ring Buffer 读取的事件。它用于定义具体的消费逻辑,处理事件数据,同时可支持复杂的工作流。您可以将其理解为生产消费者模型中的消费者。

当然,为了提升处理性能,您也可以这里对事件进行批量处理,达到 batchSizeThreshold 或 endOfBatch 时再提交,以优化性能。如下示例代码所示:

import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;import java.util.List;
public class DataEventHandler<T> implements EventHandler<DataEvent<T>> {    private final String handlerName;    private final List<T> batchMessages;    private final int batchSizeThreshold;
    public DataEventHandler(String handlerName, int batchSizeThreshold) {        this.handlerName = handlerName;        this.batchMessages = new ArrayList<>();        this.batchSizeThreshold = batchSizeThreshold;    }
    @Override    public void onEvent(DataEvent<T> event, long sequence, boolean endOfBatch) throws Exception {        try {            T message = event.getData();            System.out.println(handlerName + ' processing: ' + message +                    ' at sequence ' + sequence +                    ', endOfBatch: ' + endOfBatch);            batchMessages.add(message);            if (batchMessages.size() >= batchSizeThreshold || endOfBatch) {                processBatch();            }            Thread.sleep(100); // 模拟耗时操作        } catch (Exception e) {            System.err.println(handlerName + ' failed at sequence ' + sequence + ': ' + e.getMessage());            throw e;        }    }
    private void processBatch() {        System.out.println(handlerName + ' submitting batch of ' + batchMessages.size() +                ' messages: ' + batchMessages);        batchMessages.clear();    }}

定义好EventHandler后,Disruptor 提供如下三种处理方式:

  • 单一处理:

disruptor.handleEventsWith(new DataEventHandler('Handler1',5));
  • 并行处理:

disruptor.handleEventsWith(    new DataEventHandler('Handler1',5),    new DataEventHandler('Handler2',5));
  • 串行处理:

如下所示,Handler2 等待 Handler1 处理完成,以确保事件按顺序处理:

disruptor.handleEventsWith(new DataEventHandler('Handler1',5))         .then(new DataEventHandler('Handler2',5));

当调用 disruptor.handleEventsWith 设置消息的处理器时,我们提供的 EventHandler 会被包装为 BatchEventProcessor,BatchEventProcessor实现了 Runnable 接口,如下图所示。

2.4 Event Processor

Event Processor负责从 Ring Buffer 读取事件并执行消费逻辑,您可以通过实现EventProcessor或者BatchEventProcessor(支持批量处理)来实现。

2.5 Sequence

Sequence 是 Disruptor 中用于记录 Ring Buffer 槽位位置的无锁线程安全计数器,是实现无锁并发和高性能的关键组件。在 Disruptor 4.0.0 中,它通过 VarHandle 提供的 CAS (Compare-And-Swap)操作和内存屏障来管理生产者和消费者的进度,以确保线程安全。通过前后各填充 7 个 long 类型变量(共 56 字节),使 value 字段独占 64 字节缓存行,有效解决伪共享问题。

2.6 Sequencer

Sequencer 是 Disruptor 中协调生产者写入 Ring Buffer 的核心组件,负责无锁化的槽位分配、线程安全控制和数据覆盖防护。它通过 CAS(Compare-And-Swap)操作和序列号(Sequence)实现无锁并发,分为 SingleProducerSequencer 和 MultiProducerSequencer 两种实现,分别适用于单一生产者和多生产者场景。

2.7 Sequence Barrier

Sequence Barrier 是 Disruptor 中用于管理消费者读取进度的核心机制,确保消费者只读取已发布的事件,防止读取未生产或未完成的数据。它通过跟踪生产者和消费者的序列号(Sequence),结合等待策略(Wait Strategy),实现线程安全的消费控制,同时支持复杂的消费者依赖图。其主要负责:

  • 防止越界读取:确保消费者只读取生产者已发布的有效事件,防止访问未写入或未完全发布的数据。

  • 协调生产与消费:通过跟踪生产者的 cursor 和消费者的 Sequence,实现线程安全的进度控制。

  • 支持依赖图:允许消费者依赖其他消费者的序列号,形成复杂的工作流(如串行或并行处理)。

  • 等待策略:结合 Wait Strategy,控制消费者在等待新事件时的行为(如阻塞、自旋等)。

  • 通知机制:通过 alert 机制通知消费者新事件或异常状态。

2.8 WaitStrategy

WaitStrategy 是 Disruptor 中用于控制消费者在等待新事件时的行为的组件,负责在生产者尚未发布所需序列号时,决定消费者如何等待(如阻塞、自旋、超时等)。它与 Sequence Barrier 紧密协作,通过平衡 CPU 占用、延迟和吞吐量,适应不同场景的需求。它的核心作用包括:

  • 控制等待行为:决定消费者在等待指定序列号可用时的行为,如阻塞线程、忙循环或让出 CPU。

  • 平衡性能通过不同策略优化 CPU 占用、延迟和吞吐量,适应高并发、低延迟或资源受限的场景。

  • 线程安全:与 Sequence Barrier 协作,确保消费者只读取已发布的有效事件。

  • 通知机制:通过信号机制(如 signalAllWhenBlocking)唤醒等待的消费者,响应新事件或异常。

Disruptor提供的一些主要的等待策略如下:

  • BlockingWaitStrategy:使用锁和条件变量阻塞等待,节省 CPU 资源;

    解锁高性能环形队列:Disruptor 4.0.0 核心解析与应用实践
  • BusySpinWaitStrategy:通过忙循环等待,最低延迟、但高 CPU 占用;

  • YieldingWaitStrategy:通过 Thread.yield() 让出 CPU,平衡延迟和资源;

  • SleepingWaitStrategy:通过短暂睡眠(如 LockSupport.parkNanos)等待,适合中等延迟场景;

  • TimeoutBlockingWaitStrategy:在 BlockingWaitStrategy 基础上增加超时机制;

  • LiteBlockingWaitStrategy:优化通知机制,仅在有消费者等待时通知;

  • LiteTimeoutBlockingWaitStrategy:结合超时和优化通知;

  • PhasedBackoffWaitStrategy:分阶段等待,先自旋后切换到阻塞。

其中,CPU、延迟与吞吐量三者性能权衡如下:

  • CPU 占用:BusySpinWaitStrategy 最高,BlockingWaitStrategy 最低;

  • 延迟:BusySpinWaitStrategy 最低,BlockingWaitStrategy 较高;

  • 吞吐量:YieldingWaitStrategy 和 SleepingWaitStrategy 在高并发下表现较好。

在真实开发使用场景中,如果您追求较高性能,则可以选择:BusySpinWaitStrategy 或 YieldingWaitStrategy;如果您的资源有限,则可选择BlockingWaitStrategy  或  TimeoutBlockingWaitStrategy。

2.9 Ring Buffer

Ring Buffer 是 Disruptor 的最为核心的数据结构,用于存储Event并协调生产者与消费者之间的数据交换。它是一个固定大小的环形数组,通过预分配事件对象、无锁设计、缓存行优化和批量操作实现高性能和低延迟。Ring Buffer是 Disruptor 高吞吐量和低延迟的关键所在,解决了传统队列(如 LinkedBlockingQueue)在锁竞争、伪共享和垃圾回收上的性能瓶颈。其核心作用包括:

  • 存储Event:保存生产者发布的Event,供消费者处理。

  • 高效读写:通过预分配和循环使用槽位,避免动态内存分配和垃圾回收。

  • 无锁并发:结合 Sequencer 和 Sequence,实现生产者和消费者的线程安全访问。

  • 进度协调:通过Sequence管理生产和消费进度,防止覆盖未消费数据或读取未发布数据。

  • 高性能:利用 CPU 缓存、批量操作和缓存行填充,最大化吞吐量和最小化延迟。

其中,Ring Buffer的核心设计概念如下:

1)环形队列:

Ring Buffer 是一个固定大小的数组,逻辑上形成环形,槽位通过模运算(index = sequence & indexMask)循环使用。固定大小(2 的幂)简化索引计算,优化性能。

2)预分配:

所有槽位在初始化时通过 EventFactory 预分配Event对象,避免运行时动态分配。重用Event对象减少垃圾回收(GC)压力。

3)无锁并发:

生产者通过 Sequencer 申请序列号(Sequence),写入数据;消费者通过 SequenceBarrier 检查可用序列号,读取数据;使用 CAS(Compare-And-Swap)操作避免传统锁。

4)缓存行优化:

关键字段(如 entries)通过填充避免伪共享;序列号(Sequence)独占缓存行,减少多线程竞争。

5)批量操作:

支持批量申请和发布序列号(如 next(n)、publish(lo, hi)),减少元数据更新频率。

下面这张图摘自 Disruptor 官网,展示了 LMAX 系统使用 Disruptor 的示例。图中,每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用Event 的出现,等到Event 出现了就用get方法取出具体的Event 给EventHandler来处理。

03 使用示例

结合上面的示例代码,这里再实现一个生产者EventProducer ,主要用于申请序列、写入事件与发布序列。如下面的示例代码所示:
import com.lmax.disruptor.RingBuffer;
public class EventProducer<T> {    private final RingBuffer<DataEvent<T>> ringBuffer;
    public EventProducer(RingBuffer<DataEvent<T>> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void produce(T data) {        // 申请序列        long sequence = ringBuffer.next();        try {            DataEvent<T> event = ringBuffer.get(sequence);            // 填充数据            event.setData(data);        } finally {            // 发布序列            ringBuffer.publish(sequence);        }    }}
主程序DisruptorExample如下所示:
import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import java.util.concurrent.Executors;
public class DisruptorExample {
    public static void main(String[] args) throws InterruptedException {        // 创建 Disruptor,Ring Buffer 大小为 1024(2 的幂)        Disruptor<DataEvent<String>> disruptor = new Disruptor<>(                new DataEventFactory<>(),                1024,                Executors.defaultThreadFactory()        );
        // 连接多个 EventHandler        disruptor.handleEventsWith(                new DataEventHandler<>('Handler1',5),                new DataEventHandler<>('Handler2',5)        );
        // 启动 Disruptor        RingBuffer<DataEvent<String>> ringBuffer = disruptor.start();
        // 创建生产者        EventProducer<String> producer = new EventProducer<>(ringBuffer);
        // 生产 10 条消息        for (int= 0; i < 10; i++) {            producer.produce('Hello Disruptor #' + i);            Thread.sleep(500); // 模拟生产间隔        }
        // 模拟等待消费者处理完成并关闭        Thread.sleep(2000);         disruptor.shutdown();    }}
最后,别忘记引入Disruptor Maven包:
<dependency>   <groupId>com.lmax</groupId>   <artifactId>disruptor</artifactId>   <version>4.0.0</version></dependency>
示例运行结果如下图所示:

04 最后小结

毋庸置疑,Disruptor 通过环形队列、无锁设计、缓存行填充、批量处理、预申请机制和线程安全等技术,解决了传统队列在高并发下的性能瓶颈与完全问题。它适用于高频交易、日志处理与实时数据处理等场景,其性能也远超传统队列。但是,固定大小的Ring Buffer 可能导致内存浪费,您需根据实际场景预估容量。同时,在多生产者场景下,CAS 竞争可能导致瓶颈,因此需优化生产者数量。更多相关的资料,请参考如下官网资料链接:
https://github.com/LMAX-Exchange/disruptorhttps://lmax-exchange./disruptor/user-guide/index.htmlhttps:///articles/lmax.html