|
|
|
@ -1,67 +1,67 @@
|
|
|
|
|
package com.kiisoo.ic.job;
|
|
|
|
|
|
|
|
|
|
import com.lmax.disruptor.*;
|
|
|
|
|
import com.lmax.disruptor.dsl.Disruptor;
|
|
|
|
|
import com.lmax.disruptor.dsl.EventHandlerGroup;
|
|
|
|
|
import com.lmax.disruptor.dsl.ProducerType;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
|
|
|
|
public class Demo {
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) throws InterruptedException, ExecutionException {
|
|
|
|
|
long beginTime=System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
int BUFFER_SIZE=1024;
|
|
|
|
|
int THREAD_NUMBERS=4;
|
|
|
|
|
/*
|
|
|
|
|
* createSingleProducer创建一个单生产者的RingBuffer,
|
|
|
|
|
* 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。
|
|
|
|
|
* 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
|
|
|
|
|
* 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
|
|
|
|
|
*/
|
|
|
|
|
final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
|
|
|
|
|
@Override
|
|
|
|
|
public DTOMessage newInstance() {
|
|
|
|
|
return new DTOMessage();
|
|
|
|
|
}
|
|
|
|
|
}, BUFFER_SIZE,new YieldingWaitStrategy());
|
|
|
|
|
//创建线程池
|
|
|
|
|
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
|
|
|
|
|
//创建SequenceBarrier
|
|
|
|
|
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
|
|
|
|
|
|
|
|
|
|
//创建消息处理器
|
|
|
|
|
BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
|
|
|
|
|
ringBuffer, sequenceBarrier, new DTOMessageHandler());
|
|
|
|
|
|
|
|
|
|
//这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
|
|
|
|
|
ringBuffer.addGatingSequences(transProcessor.getSequence());
|
|
|
|
|
|
|
|
|
|
//把消息处理器提交到线程池
|
|
|
|
|
executors.submit(transProcessor);
|
|
|
|
|
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
|
|
|
|
|
|
|
|
|
|
Future<?> future=executors.submit(new Callable<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public Void call() throws Exception {
|
|
|
|
|
long seq;
|
|
|
|
|
for(int i=0;i<10000;i++){
|
|
|
|
|
seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
|
|
|
|
|
|
|
|
|
|
//给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图
|
|
|
|
|
ringBuffer.get(seq).setPrice(Math.random()*9999);
|
|
|
|
|
ringBuffer.get(seq).setName("张三" + i);
|
|
|
|
|
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
future.get();//等待生产者结束
|
|
|
|
|
Thread.sleep(1000);//等上1秒,等消费都处理完成
|
|
|
|
|
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
|
|
|
|
|
executors.shutdown();//终止线程
|
|
|
|
|
|
|
|
|
|
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//package com.kiisoo.ic.job;
|
|
|
|
|
//
|
|
|
|
|
//import com.lmax.disruptor.*;
|
|
|
|
|
//import com.lmax.disruptor.dsl.Disruptor;
|
|
|
|
|
//import com.lmax.disruptor.dsl.EventHandlerGroup;
|
|
|
|
|
//import com.lmax.disruptor.dsl.ProducerType;
|
|
|
|
|
//
|
|
|
|
|
//import java.util.concurrent.*;
|
|
|
|
|
//
|
|
|
|
|
//public class Demo {
|
|
|
|
|
//
|
|
|
|
|
// public static void main(String[] args) throws InterruptedException, ExecutionException {
|
|
|
|
|
// long beginTime=System.currentTimeMillis();
|
|
|
|
|
//
|
|
|
|
|
// int BUFFER_SIZE=1024;
|
|
|
|
|
// int THREAD_NUMBERS=4;
|
|
|
|
|
// /*
|
|
|
|
|
// * createSingleProducer创建一个单生产者的RingBuffer,
|
|
|
|
|
// * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。
|
|
|
|
|
// * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
|
|
|
|
|
// * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
|
|
|
|
|
// */
|
|
|
|
|
// final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
|
|
|
|
|
// @Override
|
|
|
|
|
// public DTOMessage newInstance() {
|
|
|
|
|
// return new DTOMessage();
|
|
|
|
|
// }
|
|
|
|
|
// }, BUFFER_SIZE,new YieldingWaitStrategy());
|
|
|
|
|
// //创建线程池
|
|
|
|
|
// ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
|
|
|
|
|
// //创建SequenceBarrier
|
|
|
|
|
// SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
|
|
|
|
|
//
|
|
|
|
|
// //创建消息处理器
|
|
|
|
|
// BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
|
|
|
|
|
// ringBuffer, sequenceBarrier, new DTOMessageHandler());
|
|
|
|
|
//
|
|
|
|
|
// //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
|
|
|
|
|
// ringBuffer.addGatingSequences(transProcessor.getSequence());
|
|
|
|
|
//
|
|
|
|
|
// //把消息处理器提交到线程池
|
|
|
|
|
// executors.submit(transProcessor);
|
|
|
|
|
// //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
|
|
|
|
|
//
|
|
|
|
|
// Future<?> future=executors.submit(new Callable<Void>() {
|
|
|
|
|
// @Override
|
|
|
|
|
// public Void call() throws Exception {
|
|
|
|
|
// long seq;
|
|
|
|
|
// for(int i=0;i<10000;i++){
|
|
|
|
|
// seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
|
|
|
|
|
//
|
|
|
|
|
// //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图
|
|
|
|
|
// ringBuffer.get(seq).setPrice(Math.random()*9999);
|
|
|
|
|
// ringBuffer.get(seq).setName("张三" + i);
|
|
|
|
|
// ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
|
|
|
|
|
// }
|
|
|
|
|
// return null;
|
|
|
|
|
// }
|
|
|
|
|
// });
|
|
|
|
|
// future.get();//等待生产者结束
|
|
|
|
|
// Thread.sleep(1000);//等上1秒,等消费都处理完成
|
|
|
|
|
// transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
|
|
|
|
|
// executors.shutdown();//终止线程
|
|
|
|
|
//
|
|
|
|
|
// System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
|
|
|
|
|
// }
|
|
|
|
|
//}
|
|
|
|
|