Disruptor(三)RingBuffer单生产者写入

上一章主要介绍了消费者从RingBuffer读取数据,本章主要介绍单个生产者如何向RingBuffer数据写入数据。在RingBuffer数据写入过程中如何不要让Ring重叠,写入后通知消费者,生产者一端的批处理,以及多个生产者如何协同工作。

RingBuffer写入数据的过程涉及到两阶段提交(two-phasecommit)

1)生产者需要申请buffer里的下一个节点。

2)当生产者向节点写完数据,需要调用调用publish发布数据。

1、单个生产者SingleProducerSequencer数据写入

在后台由ProducerSequencer负责所有的交互细节,来从RingBuffer中找到下一个节点,然后才允许生产者向它写入数据。

在图中一个生产者写入RingBufferSingleProducerSequencer对象拥有所有正在访问RingBuffer的消费者gatingSequences列表(区别于队列需要追踪队列的头和尾,而且它们有时候会指向相同的位置)。,Disruptor中由消费者负责通知它们处理到了哪个序列号,而不是RingBuffer

如果想确定我们没有让RingBuffer重叠,需要检查所有的消费者们都读到了哪里。在上图中有2个消费者,一个消费者顺利的读到了最大序号13(用蓝色高亮),第二个消费者有点儿落后停在序号6。因此消费者2在赶上消费者1之前要跑完整个RingBuffer一圈的距离。

现在生产者想要写入RingBuffer中序号6占据的节点,因为它是RingBuffer当前游标的下一个节点。但是SingleProducerSequencer明白现在不能写入,因为有一个消费者正在占用它。所以SingleProducerSequencer停下来自旋(spins),等待,直到那个消费者离开。

2、申请下一个节点

现在可以想像消费者2已经处理完了一批节点,并且向前移动了它的序号。可能它挪到了序号9(因为消费端的批处理方式,现实中我会预计它到达13

上图显示了当消费者2挪动到序号9时发生的情况。SingleProducerSequencer会看到下一个节点序号6那个已经可以用了。它会抢占这个节点上的Entry(我还没有特别介绍Entry对象,基本上它是一个放写入到某个序号的RingBuffer数据的桶),把下一个序号(14)更新成Entry的序号,然后把Entry返回给生产者。生产者可以接着往Entry里写入数据。

3、提交新的数据

将生产的数据提交,通知消费之。

绿色表示最近写入的Entry,序号是14,通过publish方法提交,设置RingBuffercursor14,通知消费者14被更新了,可以读取了(不同的WaitStrategy实现以不同的方式来实现提醒,取决于它是否采用阻塞模式)。现在消费者2可以读Entry14的数据进行消费了。

看完上面的原理后下面分析SingleProducerSequencer是如何获取序号和提交数据的。

4SingleProducerSequencer生产者类图

SingleProducerSequencer继承AbstractSequencer,实现了Sequencer接口。

Sequencer提供增加删除消费者序列,创建SequenceBarrier,获取最小序号,和最大发布的序号。

Cursored获取当前的游标。

Sequenced获取当前ringbuffer大小,获取想一个序号,以及提交数据接口。

5、消费者和生产者直接的关联

首先看下AbstractSequencer中定义

// 生产者的当前的游标位置
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 消费者当前处理的序号集合
protected volatile Sequence[] gatingSequences = new Sequence[0];

由于volatile只能保存可见性和禁止编译器优化,当时不能保证互斥性,多线程并发读写的话会有问题。

private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

使用AtomicReferenceFieldUpdater原子字段更新解决多线程更新gatingSequences问题

具体实现参照SequenceGroups中使用CAS进行更新。

public final void addGatingSequences(Sequence... gatingSequences) {
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
public boolean removeGatingSequence(Sequence sequence) {
    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
}

6、生产者使用next获取下一个可用的序号

public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    // 当前的最小序号(单个生产者为生产者的游标)
    long nextValue = this.nextValue;
    // 下一个序号
    long nextSequence = nextValue + n;
    // 重叠点位置
    long wrapPoint = nextSequence - bufferSize;
    // 缓存的消费者处理的序号
    long cachedGatingSequence = this.cachedValue;
    // wrapPoint > cachedGatingSequence,
    // 重叠位置大于缓存的消费者处理的序号,说明有消费者没有处理完成,不能够防止数据
    // cachedGatingSequence > nextValue
    // 只会在https://github.com/LMAX-Exchange/disruptor/issues/76情况下存在
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        long minSequence;
        // 等待不重叠后退出循环
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            // 通知消费者处理事件
            waitStrategy.signalAllWhenBlocking();
            // 生产者等待的时候后自旋,后续需要使用策略
            LockSupport.parkNanos(1L);
        }
        // 缓存消费者和生产者的最小序号
        this.cachedValue = minSequence;
    }
    // 设置生产者下一个可用的的序号
    this.nextValue = nextSequence;
    return nextSequence;
}

7、生产者使用publish发布数据

public void publish(long sequence) { // 设置生产者的游标序号
    cursor.set(sequence);
    // 通知消费者处理事件
    waitStrategy.signalAllWhenBlocking();
}

当发布数据后,消费者sequenceBarrier.waitFor(nextSequence)就能够获取RingBuffer最大可访问的availableSequence序号,处理数据了。

8、消费者消费数据

再回忆下ProcessingSequenceBarrierwaitFor函数,其中调用到了sequencer.getHighestPublishedSequence(sequence,
availableSequence);

public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
    // 检查clert异常
    checkAlert();
    // 通过waitStrategy策略获取可用的序号,cursorSequence为当前的Sequence,dependentSequence为依赖的Sequence[]
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
    // 产生比预期的sequence小,可能序号被重置回老的的oldSequence值
    //可参考https://github.com/LMAX-Exchange/disruptor/issues/76
    if (availableSequence < sequence) {
        return availableSequence;
    }
    // 获取最大的可用的已经发布的sequence,可能比sequence小
    // 会在多生产者中出现,当生产者1获取到序号13,生产者2获取到14;生产者1没发布,生产者2发布,会导致获取的可用序号为12,而sequence为13
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
public long getHighestPublishedSequence(long lowerBound, long availableSequence) {
    return availableSequence;
}

SingleProducerSequencer的getHighestPublishedSequence方法中直接返回可用的availableSequence,通知消费者消费数据。通过以上步骤,生产者和消费者就协同起来了。