Disruptor(五)DSL相关实战

本文主要讲解使用Disruptor的DSL演示生产者和消费者的数据交换,和以往的线程间通信不同,disruptor使用消息传递的方式,通过RingBuffer进行线程间的数据传递和通信,下面分别从一对一和多对一的模型进行讲解。

下面主要通过计算区间[0 , 100000000)中的所有数值相加为例子讲解Disruptor中的dsl使用。

1、一对一

一个生产者和一个消费者之间进行数据传递,使用disruptor主要涉及到RingBuffer中的ValueEvent定义,ValueAdditionEventHandler消费者处理,以及生产者发布。

1.1 RingBufferValueEvent定义

package com.lmax.disruptor.charles;

import com.lmax.disruptor.EventFactory;
 
public final class ValueEvent {
 
    private long value;
    public long getValue() {
        return value;
    }
 
    public void setValue(final long value) {
        this.value = value;
    }
 
    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
        public ValueEvent newInstance() {
            return new ValueEvent();
        }
    };
 
}

1.2 ValueAdditionEventHandler消费者数据处理

package com.lmax.disruptor.charles;
 
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.util.PaddedLong;
 
import java.util.concurrent.CountDownLatch;
 
public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {
 
    private final PaddedLong value = new PaddedLong();
    private long count;
    private CountDownLatch latch;
 
    public long getValue() {
        return value.get();
    }
 
    public void reset(final CountDownLatch latch, final long expectedCount) {
        value.set(0L);
        this.latch = latch;
        count = expectedCount;
    }
 
    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        value.set(value.get() + event.getValue());
 
        if (count == sequence) {
            latch.countDown();
        }
    }
 
}

使用CountDownLatch保证在处理完消费者数据后在退出,保证结果的正确性,其中ValueAdditionEventHandler.value为计算结果,每次增加事件中的数值。

1.3 数据发布

/**
 * <pre>
 * UniCast a series of items between 1 publisher and 1 event processor.
 *
 * +----+    +-----+
 * | P1 |--->| EP1 |
 * +----+    +-----+
 *
 * Disruptor:
 * ==========
 *              track to prevent wrap
 *              +------------------+
 *              |                  |
 *              |                  v
 * +----+    +====+    +====+   +-----+
 * | P1 |--->| RB |<---| SB |   | EP1 |
 * +----+    +====+    +====+   +-----+
 *      claim      get    ^        |
 *                        |        |
 *                        +--------+
 *                          waitFor
 *
 * P1  - Publisher 1
 * RB  - RingBuffer
 * SB  - SequenceBarrier
 * EP1 - EventProcessor 1
 *
 * </pre>
 */
public class OneToOneDisruptor {
 
    private static int RING_BUFFER_SIZE = 1024 * 16;
    private static long ITERATIONS      = 1000L * 1000L * 100L;
 
    public static void main(String[] args) throws InterruptedException {
        // 单个生产者ProducerType.SINGLE,消费者的等待策略为YieldingWaitStrategy
        Disruptor<ValueEvent> disruptor =
                new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY,
                        RING_BUFFER_SIZE,
                        DaemonThreadFactory.INSTANCE,
                        ProducerType.SINGLE,
                        new YieldingWaitStrategy());
 
        ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
        // 设置处理者
        disruptor.handleEventsWith(handler);
        // 启动disruptor
        disruptor.start();
 
        // CountDownLatch是为了保证发布的数据被处理完后,才输出结果
        CountDownLatch latch = new CountDownLatch(1);
        long expectedCount = ITERATIONS - 1;
        handler.reset(latch, expectedCount);
 
        // 生产者生产消息,暂时不用translator
        for (int i = 0; i < ITERATIONS; i++) {
            // 生产者设置数据并发布
            long next = disruptor.getRingBuffer().next();
            disruptor.getRingBuffer().get(next).setValue(i);
            disruptor.getRingBuffer().publish(next);
        }
 
        // 闭锁,等所有的发布的数据被处理完成后,向下执行
        latch.await();
        System.out.println("mutiProcess: " + handler.getValue());
        disruptor.shutdown();
        // 单个线程本地计算结果
        locoalCaculate();
    }
 
    /**
     * 单个线程本地计算
     */
    private static void locoalCaculate() {
        long total = 0l;
        for (int i = 0; i < ITERATIONS; i++) {
            total += i;
        }
        System.out.println("local: " + total);
    }
 
}

2、多对一

多个生产者和一个消费者之间进行数据传递,和一对一不同的是,涉及到生产者ValuePublisher定义。和单生产者不同的时,需要让多个生产者同时工作,并且每个生产者处理其中的某个区间,在本例子中将分为2个区间,2个生产者每个发布各自区间中的数据。首先看下ValuePublisher的定义

2.1 生产者ValuePublisher定义  

package com.lmax.disruptor.charles;
 
import java.util.concurrent.CyclicBarrier;
 
import com.lmax.disruptor.RingBuffer;
 
public final class ValuePublisher implements Runnable {
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long start;
    private final long end;
 
    public ValuePublisher(
            final CyclicBarrier cyclicBarrier,
            final RingBuffer<ValueEvent> ringBuffer,
            final long start,
            final long end) {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.start = start;
        this.end = end;
    }
 
    @Override
    public void run() {
        try {
            cyclicBarrier.await();
            for (long i = start; i < end; i++) {
                long sequence = ringBuffer.next();
                ValueEvent event = ringBuffer.get(sequence);
                event.setValue(i);
                ringBuffer.publish(sequence);
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
 
}

CyclicBarrier确保两个生产者同时生产数据,每个生产者处理[start, end)中数据的发布。

2.2 多生产者数据发布

package com.lmax.disruptor.charles;
 
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
 
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
 
/**
 * <pre>
 *
 * Sequence a series of events from multiple publishers going to one event processor.
 *
 * +----+
 * | P1 |------+
 * +----+      |
 *             v
 * +----+    +-----+
 * | P2 |--->| EP1 |
 * +----+    +-----+
 *
 * Disruptor:
 * ==========
 *             track to prevent wrap
 *             +--------------------+
 *             |                    |
 *             |                    v
 * +----+    +====+    +====+    +-----+
 * | P1 |--->| RB |<---| SB |    | EP1 |
 * +----+    +====+    +====+    +-----+
 *             ^   get    ^         |
 * +----+      |          |         |
 * | P2 |------+          +---------+
 * +----+                   waitFor
 *            
 *
 * P1  - Publisher 1
 * P2  - Publisher 2
 * RB  - RingBuffer
 * SB  - SequenceBarrier
 * EP1 - EventProcessor 1
 *
 * </pre>
 */
public class ManyToOneDisruptor {
 
    private static int RING_BUFFER_SIZE = 1024 * 16;
    private static long ITERATIONS      = 1000L * 1000L * 100L;
 
    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
 
        // 单个生产者ProducerType.MULTI,消费者的等待策略为YieldingWaitStrategy
        Disruptor<ValueEvent> disruptor =
                new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY,
                        RING_BUFFER_SIZE,
                        DaemonThreadFactory.INSTANCE,
                        ProducerType.MULTI,
                        new YieldingWaitStrategy());
 
        ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
        // 设置处理者
        disruptor.handleEventsWith(handler);
        // 启动disruptor
        disruptor.start();
 
        // CountDownLatch是为了保证发布的数据被处理完后,才输出结果
        CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch, ITERATIONS - 1);
 
        // 保证2个生产者同时生产数据
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
 
        // 定义生产者,以及生产的数据区间[start, end)
        ValuePublisher publisher1 = new ValuePublisher(cyclicBarrier,
                disruptor.getRingBuffer(),
                0,
                ITERATIONS / 2);
        new Thread(publisher1).start();
 
        ValuePublisher publisher2 = new ValuePublisher(cyclicBarrier,
                disruptor.getRingBuffer(),
                ITERATIONS / 2,
                ITERATIONS);
        new Thread(publisher2).start();
 
        // 所有的生产者线程都同时运行
        cyclicBarrier.await();
        // 等待计算完成
        latch.await();
        System.out.println("mutiProcess: " + handler.getValue());
        disruptor.shutdown();
        // 单个线程本地计算
        locoalCaculate();
    }
 
    /**
     * 单个线程本地计算
     */
    private static void locoalCaculate() {
        long total = 0l;
        for (int i = 0; i < ITERATIONS; i++) {
            total += i;
        }
        System.out.println("localProcess: " + total);
    }
 
}

上面主要演示了一对一和多对一的使用,关于其他的使用方式,可以访问 [Disruptor](https://github.com/LMAX-Exchange/disruptor) 进行查看其他的官方例子。