From 6be2e97075b6e1eb3df5ad6aff6d7efeb9214b6e Mon Sep 17 00:00:00 2001 From: kevin jiang Date: Sat, 16 May 2020 01:46:51 +0800 Subject: [PATCH] =?UTF-8?q?Disruptor=20=E4=BD=BF=E7=94=A8=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/kiisoo/ic/job/DTOMessage.java | 18 +++++ .../ic/job/DTOMessageEventTranslator.java | 20 ++++++ .../com/kiisoo/ic/job/DTOMessageHandler.java | 25 +++++++ .../com/kiisoo/ic/job/DTOMessagePublish.java | 23 +++++++ src/main/java/com/kiisoo/ic/job/Demo.java | 67 +++++++++++++++++++ 5 files changed, 153 insertions(+) create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessage.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessageEventTranslator.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessagePublish.java create mode 100644 src/main/java/com/kiisoo/ic/job/Demo.java diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessage.java b/src/main/java/com/kiisoo/ic/job/DTOMessage.java new file mode 100644 index 0000000..3f4f2a7 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessage.java @@ -0,0 +1,18 @@ +package com.kiisoo.ic.job; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class DTOMessage { + + private String id; + private String name; + private double price; + +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageEventTranslator.java b/src/main/java/com/kiisoo/ic/job/DTOMessageEventTranslator.java new file mode 100644 index 0000000..0c0272d --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageEventTranslator.java @@ -0,0 +1,20 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.EventTranslator; + +import java.util.Random; + +public class DTOMessageEventTranslator implements EventTranslator { + + private Random random = new Random(); + + private DTOMessage generateTradeTransaction(DTOMessage trade) { +// System.out.println("DTOMessageEventTranslator" + trade.toString()); + return trade; + } + + @Override + public void translateTo(DTOMessage dtoMessage, long l) { + this.generateTradeTransaction(dtoMessage); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java new file mode 100644 index 0000000..c603ed0 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java @@ -0,0 +1,25 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.EventHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.UUID; + +@Slf4j +public class DTOMessageHandler implements EventHandler { + + + /** + * 处理事件, 如入库操作 + * @param dtoMessage + * @param l + * @param b + * @throws Exception + */ + @Override + public void onEvent(DTOMessage dtoMessage, long l, boolean b) throws Exception { + dtoMessage.setId(UUID.randomUUID().toString()); + log.info( dtoMessage.toString()); + // 入库操作 + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessagePublish.java b/src/main/java/com/kiisoo/ic/job/DTOMessagePublish.java new file mode 100644 index 0000000..1258274 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessagePublish.java @@ -0,0 +1,23 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.dsl.Disruptor; + +import java.util.concurrent.CountDownLatch; + +public class DTOMessagePublish implements Runnable { + private Disruptor disruptor; + private CountDownLatch latch; + + public DTOMessagePublish(Disruptor disruptor, CountDownLatch latch) { + this.disruptor = disruptor; + this.latch = latch; + } + + @Override + public void run() { + for (int i = 0; i < 3; i++) { + disruptor.publishEvent(new DTOMessageEventTranslator()); + } + latch.countDown(); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/Demo.java b/src/main/java/com/kiisoo/ic/job/Demo.java new file mode 100644 index 0000000..6ada722 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/Demo.java @@ -0,0 +1,67 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.EventHandlerGroup; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.concurrent.*; + +public class Demo { + + public static void main(String[] args) throws InterruptedException, ExecutionException { + long beginTime=System.currentTimeMillis(); + + int BUFFER_SIZE=1024; + int THREAD_NUMBERS=4; + /* + * createSingleProducer创建一个单生产者的RingBuffer, + * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 + * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 + * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 + */ + final RingBuffer ringBuffer = RingBuffer.createSingleProducer(new EventFactory() { + @Override + public DTOMessage newInstance() { + return new DTOMessage(); + } + }, BUFFER_SIZE,new YieldingWaitStrategy()); + //创建线程池 + ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); + //创建SequenceBarrier + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + //创建消息处理器 + BatchEventProcessor transProcessor = new BatchEventProcessor( + ringBuffer, sequenceBarrier, new DTOMessageHandler()); + + //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略 + ringBuffer.addGatingSequences(transProcessor.getSequence()); + + //把消息处理器提交到线程池 + executors.submit(transProcessor); + //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类 + + Future future=executors.submit(new Callable() { + @Override + public Void call() throws Exception { + long seq; + for(int i=0;i<10000;i++){ + seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块 + + //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 + ringBuffer.get(seq).setPrice(Math.random()*9999); + ringBuffer.get(seq).setName("张三" + i); + ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 + } + return null; + } + }); + future.get();//等待生产者结束 + Thread.sleep(1000);//等上1秒,等消费都处理完成 + transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) + executors.shutdown();//终止线程 + + System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); + } +}