Disruptor 使用。

dev_0531
kevin jiang 6 years ago
parent 2663dbe851
commit 6be2e97075

@ -0,0 +1,18 @@
package com.kiisoo.ic.job;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DTOMessage {
private String id;
private String name;
private double price;
}

@ -0,0 +1,20 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.EventTranslator;
import java.util.Random;
public class DTOMessageEventTranslator implements EventTranslator<DTOMessage> {
private Random random = new Random();
private DTOMessage generateTradeTransaction(DTOMessage trade) {
// System.out.println("DTOMessageEventTranslator" + trade.toString());
return trade;
}
@Override
public void translateTo(DTOMessage dtoMessage, long l) {
this.generateTradeTransaction(dtoMessage);
}
}

@ -0,0 +1,25 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
@Slf4j
public class DTOMessageHandler implements EventHandler<DTOMessage> {
/**
*
* @param dtoMessage
* @param l
* @param b
* @throws Exception
*/
@Override
public void onEvent(DTOMessage dtoMessage, long l, boolean b) throws Exception {
dtoMessage.setId(UUID.randomUUID().toString());
log.info( dtoMessage.toString());
// 入库操作
}
}

@ -0,0 +1,23 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.CountDownLatch;
public class DTOMessagePublish implements Runnable {
private Disruptor<DTOMessage> disruptor;
private CountDownLatch latch;
public DTOMessagePublish(Disruptor<DTOMessage> disruptor, CountDownLatch latch) {
this.disruptor = disruptor;
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
disruptor.publishEvent(new DTOMessageEventTranslator());
}
latch.countDown();
}
}

@ -0,0 +1,67 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.*;
public class Demo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long beginTime=System.currentTimeMillis();
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
/*
* createSingleProducerRingBuffer
* EventFactoryRingBuffer
* RingBuffer2 &
* RingBuffer( )
*/
final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
@Override
public DTOMessage newInstance() {
return new DTOMessage();
}
}, BUFFER_SIZE,new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
ringBuffer, sequenceBarrier, new DTOMessageHandler());
//这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
Future<?> future=executors.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
long seq;
for(int i=0;i<10000;i++){
seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setPrice(Math.random()*9999);
ringBuffer.get(seq).setName("张三" + i);
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
}
return null;
}
});
future.get();//等待生产者结束
Thread.sleep(1000);//等上1秒等消费都处理完成
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!
executors.shutdown();//终止线程
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
}
}
Loading…
Cancel
Save