01 Disruptor简介
-
使用 CAS 和序列号(Sequence)实现无锁并发,优于 ReentrantLock 的锁开销;
-
使用Ring Buffer 固定大小(必须为 2 的幂),属于有界队列。
根据早期的 Disruptor 官方介绍,基于 Disruptor 开发的零售金融交易平台单线程可支撑每秒 600 万订单,延迟低至纳秒级。也因此,您总能在一些顶级的开源项目中看到它的身影。例如,Log4j 2的异步模式就采用了Disruptor技术,下图是官网提供的 Log4j2 的性能压测结果:
其中,Loggers all async采用的是Disruptor,而Async Appender则采用的是ArrayBlockingQueue队列。由上图可见,单线程情况下,Loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,Loggers all async的吞吐量则比Async Appender增加了12倍,是Sync模式的68倍。
02 核心概念
2.1 Event
Event 是 Disruptor 中存储数据的单位,您也可以把 Event 理解为存放在队列中等待消费的消息对象。它不是Disruptor定义的一种数据类型,而是由用户来定义的一种对象结构,如下示例所示:
public class DataEvent <T> {
private T data;
public T getData() {
return data;
}
public void setData(final T data) {
this.data = data;
}
}
2.2 EventFactory
EventFactory 用于预分配 Ring Buffer 中的事件对象,以确保初始化时填充正确的事件实例,如下示例代码所示:
import com.lmax.disruptor.EventFactory;
public class DataEventFactory <T> implements EventFactory<DataEvent<T>> {
@Override
public DataEvent<T> newInstance() {
return new DataEvent<>();
}
}
2.3 Event Handler
EventHandler 是 Disruptor 中定义消费逻辑的核心接口,负责处理从 Ring Buffer 读取的事件。它用于定义具体的消费逻辑,处理事件数据,同时可支持复杂的工作流。您可以将其理解为生产消费者模型中的消费者。
当然,为了提升处理性能,您也可以这里对事件进行批量处理,达到 batchSizeThreshold 或 endOfBatch 时再提交,以优化性能。如下示例代码所示:
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;
public class DataEventHandler<T> implements EventHandler<DataEvent<T>> {
private final String handlerName;
private final List<T> batchMessages;
private final int batchSizeThreshold;
public DataEventHandler(String handlerName, int batchSizeThreshold) {
this.handlerName = handlerName;
this.batchMessages = new ArrayList<>();
this.batchSizeThreshold = batchSizeThreshold;
}
@Override
public void onEvent(DataEvent<T> event, long sequence, boolean endOfBatch) throws Exception {
try {
T message = event.getData();
System.out.println(handlerName + ' processing: ' + message +
' at sequence ' + sequence +
', endOfBatch: ' + endOfBatch);
batchMessages.add(message);
if (batchMessages.size() >= batchSizeThreshold || endOfBatch) {
processBatch();
}
Thread.sleep(100); // 模拟耗时操作
} catch (Exception e) {
System.err.println(handlerName + ' failed at sequence ' + sequence + ': ' + e.getMessage());
throw e;
}
}
private void processBatch() {
System.out.println(handlerName + ' submitting batch of ' + batchMessages.size() +
' messages: ' + batchMessages);
batchMessages.clear();
}
}
定义好EventHandler后,Disruptor 提供如下三种处理方式:
-
单一处理:
disruptor.handleEventsWith(new DataEventHandler('Handler1',5));
-
并行处理:
disruptor.handleEventsWith(
new DataEventHandler('Handler1',5),
new DataEventHandler('Handler2',5)
);
-
串行处理:
如下所示,Handler2 等待 Handler1 处理完成,以确保事件按顺序处理:
disruptor.handleEventsWith(new DataEventHandler('Handler1',5))
.then(new DataEventHandler('Handler2',5));
当调用 disruptor.handleEventsWith 设置消息的处理器时,我们提供的 EventHandler 会被包装为 BatchEventProcessor,BatchEventProcessor实现了 Runnable 接口,如下图所示。
2.4 Event Processor
Event Processor负责从 Ring Buffer 读取事件并执行消费逻辑,您可以通过实现EventProcessor或者BatchEventProcessor(支持批量处理)来实现。
2.5 Sequence
Sequence 是 Disruptor 中用于记录 Ring Buffer 槽位位置的无锁线程安全计数器,是实现无锁并发和高性能的关键组件。在 Disruptor 4.0.0 中,它通过 VarHandle 提供的 CAS (Compare-And-Swap)操作和内存屏障来管理生产者和消费者的进度,以确保线程安全。通过前后各填充 7 个 long 类型变量(共 56 字节),使 value 字段独占 64 字节缓存行,有效解决伪共享问题。
2.6 Sequencer
Sequencer 是 Disruptor 中协调生产者写入 Ring Buffer 的核心组件,负责无锁化的槽位分配、线程安全控制和数据覆盖防护。它通过 CAS(Compare-And-Swap)操作和序列号(Sequence)实现无锁并发,分为 SingleProducerSequencer 和 MultiProducerSequencer 两种实现,分别适用于单一生产者和多生产者场景。
2.7 Sequence Barrier
Sequence Barrier 是 Disruptor 中用于管理消费者读取进度的核心机制,确保消费者只读取已发布的事件,防止读取未生产或未完成的数据。它通过跟踪生产者和消费者的序列号(Sequence),结合等待策略(Wait Strategy),实现线程安全的消费控制,同时支持复杂的消费者依赖图。其主要负责:
-
防止越界读取:确保消费者只读取生产者已发布的有效事件,防止访问未写入或未完全发布的数据。
-
协调生产与消费:通过跟踪生产者的 cursor 和消费者的 Sequence,实现线程安全的进度控制。
-
支持依赖图:允许消费者依赖其他消费者的序列号,形成复杂的工作流(如串行或并行处理)。
-
等待策略:结合 Wait Strategy,控制消费者在等待新事件时的行为(如阻塞、自旋等)。
-
通知机制:通过 alert 机制通知消费者新事件或异常状态。
2.8 WaitStrategy
WaitStrategy 是 Disruptor 中用于控制消费者在等待新事件时的行为的组件,负责在生产者尚未发布所需序列号时,决定消费者如何等待(如阻塞、自旋、超时等)。它与 Sequence Barrier 紧密协作,通过平衡 CPU 占用、延迟和吞吐量,适应不同场景的需求。它的核心作用包括:
-
控制等待行为:决定消费者在等待指定序列号可用时的行为,如阻塞线程、忙循环或让出 CPU。
-
平衡性能:通过不同策略优化 CPU 占用、延迟和吞吐量,适应高并发、低延迟或资源受限的场景。
-
线程安全:与 Sequence Barrier 协作,确保消费者只读取已发布的有效事件。
-
通知机制:通过信号机制(如 signalAllWhenBlocking)唤醒等待的消费者,响应新事件或异常。
Disruptor提供的一些主要的等待策略如下:
-
BlockingWaitStrategy:使用锁和条件变量阻塞等待,节省 CPU 资源;
-
BusySpinWaitStrategy:通过忙循环等待,最低延迟、但高 CPU 占用;
-
YieldingWaitStrategy:通过 Thread.yield() 让出 CPU,平衡延迟和资源;
-
SleepingWaitStrategy:通过短暂睡眠(如 LockSupport.parkNanos)等待,适合中等延迟场景;
-
TimeoutBlockingWaitStrategy:在 BlockingWaitStrategy 基础上增加超时机制;
-
LiteBlockingWaitStrategy:优化通知机制,仅在有消费者等待时通知;
-
LiteTimeoutBlockingWaitStrategy:结合超时和优化通知;
-
PhasedBackoffWaitStrategy:分阶段等待,先自旋后切换到阻塞。
其中,CPU、延迟与吞吐量三者性能权衡如下:
-
CPU 占用:BusySpinWaitStrategy 最高,BlockingWaitStrategy 最低;
-
延迟:BusySpinWaitStrategy 最低,BlockingWaitStrategy 较高;
-
吞吐量:YieldingWaitStrategy 和 SleepingWaitStrategy 在高并发下表现较好。
在真实开发使用场景中,如果您追求较高性能,则可以选择:BusySpinWaitStrategy 或 YieldingWaitStrategy;如果您的资源有限,则可选择BlockingWaitStrategy 或 TimeoutBlockingWaitStrategy。
2.9 Ring Buffer
Ring Buffer 是 Disruptor 的最为核心的数据结构,用于存储Event并协调生产者与消费者之间的数据交换。它是一个固定大小的环形数组,通过预分配事件对象、无锁设计、缓存行优化和批量操作实现高性能和低延迟。Ring Buffer是 Disruptor 高吞吐量和低延迟的关键所在,解决了传统队列(如 LinkedBlockingQueue)在锁竞争、伪共享和垃圾回收上的性能瓶颈。其核心作用包括:
-
存储Event:保存生产者发布的Event,供消费者处理。
-
高效读写:通过预分配和循环使用槽位,避免动态内存分配和垃圾回收。
-
无锁并发:结合 Sequencer 和 Sequence,实现生产者和消费者的线程安全访问。
-
进度协调:通过Sequence管理生产和消费进度,防止覆盖未消费数据或读取未发布数据。
-
高性能:利用 CPU 缓存、批量操作和缓存行填充,最大化吞吐量和最小化延迟。
其中,Ring Buffer的核心设计概念如下:
1)环形队列:
Ring Buffer 是一个固定大小的数组,逻辑上形成环形,槽位通过模运算(index = sequence & indexMask)循环使用。固定大小(2 的幂)简化索引计算,优化性能。
2)预分配:
所有槽位在初始化时通过 EventFactory 预分配Event对象,避免运行时动态分配。重用Event对象减少垃圾回收(GC)压力。
3)无锁并发:
生产者通过 Sequencer 申请序列号(Sequence),写入数据;消费者通过 SequenceBarrier 检查可用序列号,读取数据;使用 CAS(Compare-And-Swap)操作避免传统锁。
4)缓存行优化:
关键字段(如 entries)通过填充避免伪共享;序列号(Sequence)独占缓存行,减少多线程竞争。
5)批量操作:
支持批量申请和发布序列号(如 next(n)、publish(lo, hi)),减少元数据更新频率。
下面这张图摘自 Disruptor 官网,展示了 LMAX 系统使用 Disruptor 的示例。图中,每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用Event 的出现,等到Event 出现了就用get方法取出具体的Event 给EventHandler来处理。
03 使用示例
import com.lmax.disruptor.RingBuffer;
public class EventProducer<T> {
private final RingBuffer<DataEvent<T>> ringBuffer;
public EventProducer(RingBuffer<DataEvent<T>> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void produce(T data) {
// 申请序列
long sequence = ringBuffer.next();
try {
DataEvent<T> event = ringBuffer.get(sequence);
// 填充数据
event.setData(data);
} finally {
// 发布序列
ringBuffer.publish(sequence);
}
}
}
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
public class DisruptorExample {
public static void main(String[] args) throws InterruptedException {
// 创建 Disruptor,Ring Buffer 大小为 1024(2 的幂)
Disruptor<DataEvent<String>> disruptor = new Disruptor<>(
new DataEventFactory<>(),
1024,
Executors.defaultThreadFactory()
);
// 连接多个 EventHandler
disruptor.handleEventsWith(
new DataEventHandler<>('Handler1',5),
new DataEventHandler<>('Handler2',5)
);
// 启动 Disruptor
RingBuffer<DataEvent<String>> ringBuffer = disruptor.start();
// 创建生产者
EventProducer<String> producer = new EventProducer<>(ringBuffer);
// 生产 10 条消息
for (int i = 0; i < 10; i++) {
producer.produce('Hello Disruptor #' + i);
Thread.sleep(500); // 模拟生产间隔
}
// 模拟等待消费者处理完成并关闭
Thread.sleep(2000);
disruptor.shutdown();
}
}
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0</version>
</dependency>
04 最后小结
https://github.com/LMAX-Exchange/disruptor
https://lmax-exchange./disruptor/user-guide/index.html
https:///articles/lmax.html