本文主要讲解使用 Disruptor 的 DSL 演示生产者和消费者的数据交换,和以往的线程间通信不同,disruptor 使用消息传递的方式,通过 RingBuffer 进行线程间的数据传递和通信,下面分别从一对一和多对一的模型进行讲解。
下面主要通过计算区间[0 , 100000000)中的所有数值相加为例子讲解 Disruptor 中的 dsl 使用。
1、一对一
一个生产者和一个消费者之间进行数据传递,使用 disruptor 主要涉及到 RingBuffer 中的 ValueEvent 定义,ValueAdditionEventHandler 消费者处理,以及生产者发布。
1.1 RingBuffer 中 ValueEvent 定义
package com.lmax.disruptor.charles;
import com.lmax.disruptor.EventFactory;
public final class ValueEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(final long value) {
this.value = value;
}
public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
public ValueEvent newInstance() {
return new ValueEvent();
}
};
}
1.2 ValueAdditionEventHandler 消费者数据处理
package com.lmax.disruptor.charles;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.util.PaddedLong;
import java.util.concurrent.CountDownLatch;
public final class ValueAdditionEventHandler implements EventHandler<ValueEvent> {
private final PaddedLong value = new PaddedLong();
private long count;
private CountDownLatch latch;
public long getValue() {
return value.get();
}
public void reset(final CountDownLatch latch, final long expectedCount) {
value.set(0L);
this.latch = latch;
count = expectedCount;
}
@Override
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
value.set(value.get() + event.getValue());
if (count == sequence) {
latch.countDown();
}
}
}
使用 CountDownLatch 保证在处理完消费者数据后在退出,保证结果的正确性,其中 ValueAdditionEventHandler.value 为计算结果,每次增加事件中的数值。
1.3 数据发布
/**
*
* UniCast a series of items between 1 publisher and 1 event processor.
*
* +----+ +-----+
* | P1 |--->| EP1 |
* +----+ +-----+
*
* Disruptor:
* ==========
* track to prevent wrap
* +------------------+
* | |
* | v
* +----+ +====+ +====+ +-----+
* | P1 |--->| RB |<---| SB | | EP1 |
* +----+ +====+ +====+ +-----+
* claim get ^ |
* | |
* +--------+
* waitFor
*
* P1 - Publisher 1
* RB - RingBuffer
* SB - SequenceBarrier
* EP1 - EventProcessor 1
*
*/
public class OneToOneDisruptor {
private static int RING_BUFFER_SIZE = 1024 * 16;
private static long ITERATIONS = 1000L * 1000L * 100L;
public static void main(String[] args) throws InterruptedException {
// 单个生产者ProducerType.SINGLE,消费者的等待策略为YieldingWaitStrategy
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY,
RING_BUFFER_SIZE,
DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new YieldingWaitStrategy());
ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
// 设置处理者
disruptor.handleEventsWith(handler);
// 启动disruptor
disruptor.start();
// CountDownLatch是为了保证发布的数据被处理完后,才输出结果
CountDownLatch latch = new CountDownLatch(1);
long expectedCount = ITERATIONS - 1;
handler.reset(latch, expectedCount);
// 生产者生产消息,暂时不用translator
for (int i = 0; i <ITERATIONS; i++) {
// 生产者设置数据并发布
long next = disruptor.getRingBuffer().next();
disruptor.getRingBuffer().get(next).setValue(i);
disruptor.getRingBuffer().publish(next);
}
// 闭锁,等所有的发布的数据被处理完成后,向下执行
latch.await();
System.out.println("mutiProcess: " + handler.getValue());
disruptor.shutdown();
// 单个线程本地计算结果
locoalCaculate();
}
/**
* 单个线程本地计算
*/
private static void locoalCaculate() {
long total = 0l;
for (int i = 0; i <ITERATIONS; i++) {
total += i;
}
System.out.println("local: " + total);
}
}
2、多对一
多个生产者和一个消费者之间进行数据传递,和一对一不同的是,涉及到生产者 ValuePublisher 定义。和单生产者不同的时,需要让多个生产者同时工作,并且每个生产者处理其中的某个区间,在本例子中将分为 2 个区间,2 个生产者每个发布各自区间中的数据。首先看下 ValuePublisher 的定义
2.1 生产者 ValuePublisher 定义
package com.lmax.disruptor.charles;
import java.util.concurrent.CyclicBarrier;
import com.lmax.disruptor.RingBuffer;
public final class ValuePublisher implements Runnable {
private final CyclicBarrier cyclicBarrier;
private final RingBuffer<ValueEvent> ringBuffer;
private final long start;
private final long end;
public ValuePublisher(
final CyclicBarrier cyclicBarrier,
final RingBuffer<ValueEvent> ringBuffer,
final long start,
final long end) {
this.cyclicBarrier = cyclicBarrier;
this.ringBuffer = ringBuffer;
this.start = start;
this.end = end;
}
@Override
public void run() {
try {
cyclicBarrier.await();
for (long i = start; i <end; i++) {
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(i);
ringBuffer.publish(sequence);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
CyclicBarrier 确保两个生产者同时生产数据,每个生产者处理[start, end)中数据的发布。
2.2 多生产者数据发布
package com.lmax.disruptor.charles;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
/**
*
*
* Sequence a series of events from multiple publishers going to one event processor.
*
* +----+
* | P1 |------+
* +----+ |
* v
* +----+ +-----+
* | P2 |--->| EP1 |
* +----+ +-----+
*
* Disruptor:
* ==========
* track to prevent wrap
* +--------------------+
* | |
* | v
* +----+ +====+ +====+ +-----+
* | P1 |--->| RB |<---| SB | | EP1 |
* +----+ +====+ +====+ +-----+
* ^ get ^ |
* +----+ | | |
* | P2 |------+ +---------+
* +----+ waitFor
*
*
* P1 - Publisher 1
* P2 - Publisher 2
* RB - RingBuffer
* SB - SequenceBarrier
* EP1 - EventProcessor 1
*
*
*/
public class ManyToOneDisruptor {
private static int RING_BUFFER_SIZE = 1024 * 16;
private static long ITERATIONS = 1000L * 1000L * 100L;
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
// 单个生产者ProducerType.MULTI,消费者的等待策略为YieldingWaitStrategy
Disruptor<ValueEvent> disruptor =
new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY,
RING_BUFFER_SIZE,
DaemonThreadFactory.INSTANCE,
ProducerType.MULTI,
new YieldingWaitStrategy());
ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
// 设置处理者
disruptor.handleEventsWith(handler);
// 启动disruptor
disruptor.start();
// CountDownLatch是为了保证发布的数据被处理完后,才输出结果
CountDownLatch latch = new CountDownLatch(1);
handler.reset(latch, ITERATIONS - 1);
// 保证2个生产者同时生产数据
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
// 定义生产者,以及生产的数据区间[start, end)
ValuePublisher publisher1 = new ValuePublisher(cyclicBarrier,
disruptor.getRingBuffer(),
0,
ITERATIONS / 2);
new Thread(publisher1).start();
ValuePublisher publisher2 = new ValuePublisher(cyclicBarrier,
disruptor.getRingBuffer(),
ITERATIONS / 2,
ITERATIONS);
new Thread(publisher2).start();
// 所有的生产者线程都同时运行
cyclicBarrier.await();
// 等待计算完成
latch.await();
System.out.println("mutiProcess: " + handler.getValue());
disruptor.shutdown();
// 单个线程本地计算
locoalCaculate();
}
/**
* 单个线程本地计算
*/
private static void locoalCaculate() {
long total = 0l;
for (int i = 0; i <ITERATIONS; i++) {
total += i;
}
System.out.println("localProcess: " + total);
}
}
上面主要演示了一对一和多对一的使用,关于其他的使用方式,可以访问 Disruptor 进行查看其他的官方例子。