Disruptor(二)RingBuffer读取

上一章主要介绍Ring Buffer的数据结构,本章主要讲解如何使用Disruptor从Ring Buffer中读取数据。

1、消费者通过ProcessingSequenceBarrier读取数据

能够读取数据的前提是数据已经写入到Ring Buffer中,关于数据的写入,后面一章节会详细讲解。

RingBuffer的元素的大小是2n次方(上面ringBufferSize8,从序号0开始)。消费者(Consumer)是一个想从RingBuffer里读取数据的线程,它可以通过访问ProcessingSequenceBarrier对象和RingBuffer进行交互。消费者也需要知道它将要处理的序号,每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了RingBuffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9

2、消费者BatchEventProcessor

关于消费者如何通过调用SequenceBarrier对象的waitFor()方法,传递它所需要的下一个序号。本章节以BatchEventProcessor批量事件处理器为例进行讲解,首先查看类图。

主要继承EventProcessor接口和Runnable接口,本章主要介绍run方法,对于BatchEventProcessor的初始化暂时不做讲解。

public void run() {
    // 线程是否运行
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    // 将ProcessingSequenceBarrier的alerted设置成false
    sequenceBarrier.clearAlert();
    // start事件处理
    notifyStart();

    T event = null;
    // 获取当前事件处理器的下一个sequence
    long nextSequence = sequence.get() + 1L;
    try {
        while (true) {
            try {
                // 从ProcessingSequenceBarrier获取可用的availableSequence 
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                // 下一个nextSequence比可用的availableSequence小的时候,获取事件,并触发事件处理
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    // 消费者事件处理
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                // 设置当前事件处理器已经处理的sequence
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                // 超时处理
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                // 异常事件处理
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    } finally {
        // 关闭事件处理
        notifyShutdown();
        running.set(false);
    }
}

拿到了数据后,消费者(Consumer)会更新自己的标识(cursor),消费者(Consumer)现在只需要通过简单通过ProcessingSequenceBarrier拿到可用的Ringbuffer中的Sequence序号就可以可以读取数据了。因为这些新的节点的确已经写入了数据(RingBuffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。另一个好处是可以用多个消费者(Consumer)去读同一个RingBuffer,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。

3ProcessingSequenceBarrier获取可用序号

在上面的BatchEventProcessor中的run方法中有如下调用

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

获取RingBuffer最大可访问的availableSequence序号,在上面的例子中是10

首先看下ProcessingSequenceBarrier的类图。

其实现了SequenceBarrier接口,用于和RingBuffer之间进行交互,下面主要看下构造函数和waitFor函数。

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    // 检查clert异常
    checkAlert();
    // 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    // 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
    //可参考https://github.com/LMAX-Exchange/disruptor/issues/76
    if (availableSequence < sequence) {
        return availableSequence;
    }
    // 获取最大的可用的已经发布的sequence,可能比sequence小
    // 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

4、WaitStrategy策略

waitFor函数的主要功能为获取到可用的sequence并返回给事件处理器。SequenceBarrier内部有一个WaitStrategy方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种WaitStrategy的优点和缺点,目前的实现方式主要有以下几种,后续会做详细介绍。

  • BlockingWaitStrategy
  • BusySpinWaitStrategy
  • LiteBlockingWaitStrategy
  • PhasedBackoffWaitStrategy
  • SleepingWaitStrategy
  • TimeoutBlockingWaitStrategy
  • YieldingWaitStrategy