门店号管理

dev_0531
LegnaYet 6 years ago
parent 9c3564620f
commit 1bca6e8a63

@ -1,95 +1,95 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.*;
@Service
@Slf4j
public class CustomerSyncJob {
@Autowired
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
@Autowired
private QWMailListManageService qwMailListManageService;
@Autowired
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
@Autowired
private CustomerService customerService;
public void syncCustomer() throws ExecutionException, InterruptedException {
long beginTime = System.currentTimeMillis();
int BUFFER_SIZE = 1024 * 1024;
int THREAD_NUMBERS = 10;
/*
* createSingleProducerRingBuffer
* EventFactoryRingBuffer
* RingBuffer2 &
* RingBuffer( )
*/
final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
@Override
public DTOMessage newInstance() {
return new DTOMessage();
}
}, BUFFER_SIZE, new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
ringBuffer, sequenceBarrier, new DTOMessageHandler());
//这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
Future<?> future = executors.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
List<PrivilageCpUserStoreDO> privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null);
long seq;
for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) {
String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId());
seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setCpUserId(cpUserId);
ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO);
ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService);
ringBuffer.get(seq).setCustomerService(customerService);
ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper);
// ringBuffer.get(seq).setCustomers(customers);
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
}
return null;
}
});
future.get();//等待生产者结束
Thread.sleep(1000);//等上1秒等消费都处理完成
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!
executors.shutdown();//终止线程
System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
}
}
//package com.kiisoo.ic.job;
//
//import com.kiisoo.ic.customer.CustomerService;
//import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
//import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
//import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
//import com.kiisoo.ic.wx.service.QWMailListManageService;
//import com.lmax.disruptor.*;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import java.util.List;
//import java.util.concurrent.*;
//
//@Service
//@Slf4j
//public class CustomerSyncJob {
//
// @Autowired
// private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
//
// @Autowired
// private QWMailListManageService qwMailListManageService;
//
// @Autowired
// private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
//
// @Autowired
// private CustomerService customerService;
//
// public void syncCustomer() throws ExecutionException, InterruptedException {
// long beginTime = System.currentTimeMillis();
//
// int BUFFER_SIZE = 1024 * 1024;
// int THREAD_NUMBERS = 10;
// /*
// * createSingleProducer创建一个单生产者的RingBuffer
// * 第一个参数叫EventFactory从名字上理解就是“事件工厂”其实它的职责就是产生数据填充RingBuffer的区块。
// * 第二个参数是RingBuffer的大小它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率
// * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略
// */
// final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
// @Override
// public DTOMessage newInstance() {
// return new DTOMessage();
// }
// }, BUFFER_SIZE, new YieldingWaitStrategy());
// //创建线程池
// ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
// //创建SequenceBarrier
// SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//
// //创建消息处理器
// BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
// ringBuffer, sequenceBarrier, new DTOMessageHandler());
//
// //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
// ringBuffer.addGatingSequences(transProcessor.getSequence());
//
// //把消息处理器提交到线程池
// executors.submit(transProcessor);
// //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
//
// Future<?> future = executors.submit(new Callable<Void>() {
// @Override
// public Void call() throws Exception {
// List<PrivilageCpUserStoreDO> privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null);
// long seq;
// for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) {
// String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId());
//
// seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
//
// //给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
// ringBuffer.get(seq).setCpUserId(cpUserId);
// ringBuffer.get(seq).setStoreId(privilageCpUserStoreDO);
// ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService);
// ringBuffer.get(seq).setCustomerService(customerService);
// ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper);
//// ringBuffer.get(seq).setCustomers(customers);
// ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
//
// }
// return null;
// }
// });
// future.get();//等待生产者结束
// Thread.sleep(1000);//等上1秒等消费都处理完成
// transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!
// executors.shutdown();//终止线程
//
// System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
// }
//}

Loading…
Cancel
Save