|
|
|
|
package com.kiisoo.ic.job.detail;
|
|
|
|
|
|
|
|
|
|
import com.kiisoo.ic.customer.CustomerService;
|
|
|
|
|
import com.kiisoo.ic.employee.entity.PrivilageCpUserDO;
|
|
|
|
|
import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper;
|
|
|
|
|
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
|
|
|
|
|
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
|
|
|
|
|
import com.kiisoo.ic.wx.service.QWMailListManageService;
|
|
|
|
|
import com.lmax.disruptor.*;
|
|
|
|
|
import com.lmax.disruptor.dsl.Disruptor;
|
|
|
|
|
import com.lmax.disruptor.dsl.ProducerType;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Disruptor多个消费者不重复处理生产者发送过来的消息
|
|
|
|
|
*/
|
|
|
|
|
@Service
|
|
|
|
|
@Slf4j
|
|
|
|
|
public class CustomerDataJob {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
|
|
|
|
|
private static final int BUFFER_SIZE = 1024 * 1024;
|
|
|
|
|
//固定线程数
|
|
|
|
|
private static final int THREAD_NUMBERS = 30;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private PrivilageCpUserDOMapper privilageCpUserDOMapper;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private QWMailListManageService qwMailListManageService;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
private CustomerService customerService;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void handle(){
|
|
|
|
|
|
|
|
|
|
//创建线程池
|
|
|
|
|
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
|
|
|
|
|
// 创建缓冲池
|
|
|
|
|
Disruptor<DetailDTOMessage> disruptor = new Disruptor<>(new DetailDTOMessageFactory(), BUFFER_SIZE, executors,
|
|
|
|
|
ProducerType.SINGLE, new BlockingWaitStrategy());
|
|
|
|
|
RingBuffer<DetailDTOMessage> ringBuffer = disruptor.getRingBuffer();
|
|
|
|
|
|
|
|
|
|
// 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
|
|
|
|
|
DetailDTOMessageGroupConsumer[] consumers = new DetailDTOMessageGroupConsumer[THREAD_NUMBERS];
|
|
|
|
|
for (int i = 0; i < consumers.length; i++) {
|
|
|
|
|
consumers[i] = new DetailDTOMessageGroupConsumer();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
disruptor.handleEventsWithWorkerPool(consumers);
|
|
|
|
|
|
|
|
|
|
disruptor.start();
|
|
|
|
|
|
|
|
|
|
DetailDTOMessageEventProducer producer = new DetailDTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper);
|
|
|
|
|
List<PrivilageCpUserDO> privilageCpUserDOS = privilageCpUserDOMapper.selectList(null);
|
|
|
|
|
int index =0;
|
|
|
|
|
for (PrivilageCpUserDO privilageCpUserDO : privilageCpUserDOS) {
|
|
|
|
|
Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(privilageCpUserDO.getCpUserId());
|
|
|
|
|
producer.produceData(index,storeId,privilageCpUserDO.getCpUserId());
|
|
|
|
|
index++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
disruptor.shutdown();
|
|
|
|
|
}
|
|
|
|
|
}
|