diff --git a/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java index efa969d..7d56e4c 100644 --- a/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java +++ b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java @@ -1,95 +1,95 @@ -package com.kiisoo.ic.job; - -import com.kiisoo.ic.customer.CustomerService; -import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; -import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; -import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; -import com.kiisoo.ic.wx.service.QWMailListManageService; -import com.lmax.disruptor.*; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.concurrent.*; - -@Service -@Slf4j -public class CustomerSyncJob { - - @Autowired - private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; - - @Autowired - private QWMailListManageService qwMailListManageService; - - @Autowired - private PoiStoreStaffDOMapper poiStoreStaffDOMapper; - - @Autowired - private CustomerService customerService; - - public void syncCustomer() throws ExecutionException, InterruptedException { - long beginTime = System.currentTimeMillis(); - - int BUFFER_SIZE = 1024 * 1024; - int THREAD_NUMBERS = 10; - /* - * createSingleProducer创建一个单生产者的RingBuffer, - * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 - * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 - * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 - */ - final RingBuffer ringBuffer = RingBuffer.createSingleProducer(new EventFactory() { - @Override - public DTOMessage newInstance() { - return new DTOMessage(); - } - }, BUFFER_SIZE, new YieldingWaitStrategy()); - //创建线程池 - ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); - //创建SequenceBarrier - SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); - - //创建消息处理器 - BatchEventProcessor transProcessor = new BatchEventProcessor( - ringBuffer, sequenceBarrier, new DTOMessageHandler()); - - //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略 - ringBuffer.addGatingSequences(transProcessor.getSequence()); - - //把消息处理器提交到线程池 - executors.submit(transProcessor); - //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类 - - Future future = executors.submit(new Callable() { - @Override - public Void call() throws Exception { - List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); - long seq; - for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { - String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); - - seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块 - - //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 - ringBuffer.get(seq).setCpUserId(cpUserId); - ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO); - ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService); - ringBuffer.get(seq).setCustomerService(customerService); - ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper); -// ringBuffer.get(seq).setCustomers(customers); - ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 - - } - return null; - } - }); - future.get();//等待生产者结束 - Thread.sleep(1000);//等上1秒,等消费都处理完成 - transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) - executors.shutdown();//终止线程 - - System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime)); - } -} +//package com.kiisoo.ic.job; +// +//import com.kiisoo.ic.customer.CustomerService; +//import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +//import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; +//import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +//import com.kiisoo.ic.wx.service.QWMailListManageService; +//import com.lmax.disruptor.*; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Service; +// +//import java.util.List; +//import java.util.concurrent.*; +// +//@Service +//@Slf4j +//public class CustomerSyncJob { +// +// @Autowired +// private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; +// +// @Autowired +// private QWMailListManageService qwMailListManageService; +// +// @Autowired +// private PoiStoreStaffDOMapper poiStoreStaffDOMapper; +// +// @Autowired +// private CustomerService customerService; +// +// public void syncCustomer() throws ExecutionException, InterruptedException { +// long beginTime = System.currentTimeMillis(); +// +// int BUFFER_SIZE = 1024 * 1024; +// int THREAD_NUMBERS = 10; +// /* +// * createSingleProducer创建一个单生产者的RingBuffer, +// * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 +// * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 +// * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 +// */ +// final RingBuffer ringBuffer = RingBuffer.createSingleProducer(new EventFactory() { +// @Override +// public DTOMessage newInstance() { +// return new DTOMessage(); +// } +// }, BUFFER_SIZE, new YieldingWaitStrategy()); +// //创建线程池 +// ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); +// //创建SequenceBarrier +// SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); +// +// //创建消息处理器 +// BatchEventProcessor transProcessor = new BatchEventProcessor( +// ringBuffer, sequenceBarrier, new DTOMessageHandler()); +// +// //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略 +// ringBuffer.addGatingSequences(transProcessor.getSequence()); +// +// //把消息处理器提交到线程池 +// executors.submit(transProcessor); +// //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类 +// +// Future future = executors.submit(new Callable() { +// @Override +// public Void call() throws Exception { +// List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); +// long seq; +// for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { +// String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); +// +// seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块 +// +// //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 +// ringBuffer.get(seq).setCpUserId(cpUserId); +// ringBuffer.get(seq).setStoreId(privilageCpUserStoreDO); +// ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService); +// ringBuffer.get(seq).setCustomerService(customerService); +// ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper); +//// ringBuffer.get(seq).setCustomers(customers); +// ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 +// +// } +// return null; +// } +// }); +// future.get();//等待生产者结束 +// Thread.sleep(1000);//等上1秒,等消费都处理完成 +// transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) +// executors.shutdown();//终止线程 +// +// System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime)); +// } +//}