package com.kiisoo.ic.job; import com.kiisoo.ic.customer.CustomerService; import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; import com.kiisoo.ic.wx.service.QWMailListManageService; import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.*; /** * Disruptor多个消费者不重复处理生产者发送过来的消息 */ @Service @Slf4j public class CustomerDataJob { // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能 private static final int BUFFER_SIZE = 1024 * 1024; //固定线程数 private static final int THREAD_NUMBERS = 80; @Autowired private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; @Autowired private QWMailListManageService qwMailListManageService; @Autowired private PoiStoreStaffDOMapper poiStoreStaffDOMapper; @Autowired private CustomerService customerService; public void handle(){ //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); // 创建缓冲池 Disruptor disruptor = new Disruptor<>(new DTOMessageFactory(), BUFFER_SIZE, executors, ProducerType.SINGLE, new BlockingWaitStrategy()); RingBuffer ringBuffer = disruptor.getRingBuffer(); // 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息) DTOMessageGroup1Consumer[] consumers = new DTOMessageGroup1Consumer[THREAD_NUMBERS]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new DTOMessageGroup1Consumer(); } disruptor.handleEventsWithWorkerPool(consumers); disruptor.start(); DTOMessageEventProducer producer = new DTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper); List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); int index =0; for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); producer.produceData(index,privilageCpUserStoreDO, cpUserId); index++; } disruptor.shutdown(); } }