package com.kiisoo.ic.job.detail; import com.kiisoo.ic.customer.CustomerService; import com.kiisoo.ic.employee.entity.PrivilageCpUserDO; import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper; import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; import com.kiisoo.ic.wx.service.QWMailListManageService; import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.concurrent.*; /** * Disruptor多个消费者不重复处理生产者发送过来的消息 */ @Service @Slf4j public class CustomerDataJob { // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能 private static final int BUFFER_SIZE = 1024 * 1024; //固定线程数 private static final int THREAD_NUMBERS = 30; @Autowired private PrivilageCpUserDOMapper privilageCpUserDOMapper; @Autowired private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; @Autowired private QWMailListManageService qwMailListManageService; @Autowired private PoiStoreStaffDOMapper poiStoreStaffDOMapper; @Autowired private CustomerService customerService; public void handle(){ //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); // 创建缓冲池 Disruptor disruptor = new Disruptor<>(new DetailDTOMessageFactory(), BUFFER_SIZE, executors, ProducerType.SINGLE, new BlockingWaitStrategy()); RingBuffer ringBuffer = disruptor.getRingBuffer(); // 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息) DetailDTOMessageGroup1Consumer[] consumers = new DetailDTOMessageGroup1Consumer[THREAD_NUMBERS]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new DetailDTOMessageGroup1Consumer(); } disruptor.handleEventsWithWorkerPool(consumers); disruptor.start(); DetailDTOMessageEventProducer producer = new DetailDTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper); List privilageCpUserDOS = privilageCpUserDOMapper.selectList(null); int index =0; for (PrivilageCpUserDO privilageCpUserDO : privilageCpUserDOS) { Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(privilageCpUserDO.getCpUserId()); producer.produceData(index,storeId,privilageCpUserDO.getCpUserId()); index++; } disruptor.shutdown(); } }