|  |  |  |  | package com.kiisoo.ic.job.detail; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | import com.kiisoo.ic.customer.CustomerService; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.employee.entity.PrivilageCpUserDO; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; | 
					
						
							|  |  |  |  | import com.kiisoo.ic.wx.service.QWMailListManageService; | 
					
						
							|  |  |  |  | import com.lmax.disruptor.*; | 
					
						
							|  |  |  |  | import com.lmax.disruptor.dsl.Disruptor; | 
					
						
							|  |  |  |  | import com.lmax.disruptor.dsl.ProducerType; | 
					
						
							|  |  |  |  | import lombok.extern.slf4j.Slf4j; | 
					
						
							|  |  |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
					
						
							|  |  |  |  | import org.springframework.stereotype.Service; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | import java.util.List; | 
					
						
							|  |  |  |  | import java.util.concurrent.*; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | /** | 
					
						
							|  |  |  |  |  * Disruptor多个消费者不重复处理生产者发送过来的消息 | 
					
						
							|  |  |  |  |  */ | 
					
						
							|  |  |  |  | @Service | 
					
						
							|  |  |  |  | @Slf4j | 
					
						
							|  |  |  |  | public class CustomerDataJob { | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
 | 
					
						
							|  |  |  |  |     private static final int BUFFER_SIZE = 1024 * 1024; | 
					
						
							|  |  |  |  |     //固定线程数
 | 
					
						
							|  |  |  |  |     private static final int THREAD_NUMBERS = 30; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     @Autowired | 
					
						
							|  |  |  |  |     private PrivilageCpUserDOMapper privilageCpUserDOMapper; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     @Autowired | 
					
						
							|  |  |  |  |     private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     @Autowired | 
					
						
							|  |  |  |  |     private QWMailListManageService qwMailListManageService; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     @Autowired | 
					
						
							|  |  |  |  |     private PoiStoreStaffDOMapper poiStoreStaffDOMapper; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     @Autowired | 
					
						
							|  |  |  |  |     private CustomerService customerService; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     public void handle(){ | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         //创建线程池
 | 
					
						
							|  |  |  |  |         ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); | 
					
						
							|  |  |  |  |         // 创建缓冲池
 | 
					
						
							|  |  |  |  |         Disruptor<DTOMessage> disruptor = new Disruptor<>(new DTOMessageFactory(), BUFFER_SIZE, executors, | 
					
						
							|  |  |  |  |                 ProducerType.SINGLE, new BlockingWaitStrategy()); | 
					
						
							|  |  |  |  |         RingBuffer<DTOMessage> ringBuffer = disruptor.getRingBuffer(); | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         // 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
 | 
					
						
							|  |  |  |  |         DTOMessageGroup1Consumer[] consumers = new DTOMessageGroup1Consumer[THREAD_NUMBERS]; | 
					
						
							|  |  |  |  |         for (int i = 0; i < consumers.length; i++) { | 
					
						
							|  |  |  |  |             consumers[i] = new DTOMessageGroup1Consumer(); | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         disruptor.handleEventsWithWorkerPool(consumers); | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         disruptor.start(); | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         DTOMessageEventProducer producer = new DTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper); | 
					
						
							|  |  |  |  |         List<PrivilageCpUserDO> privilageCpUserDOS = privilageCpUserDOMapper.selectList(null); | 
					
						
							|  |  |  |  |         int index =0; | 
					
						
							|  |  |  |  |         for (PrivilageCpUserDO privilageCpUserDO : privilageCpUserDOS) { | 
					
						
							|  |  |  |  |             Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(privilageCpUserDO.getCpUserId()); | 
					
						
							|  |  |  |  |             producer.produceData(index,storeId,privilageCpUserDO.getCpUserId()); | 
					
						
							|  |  |  |  |             index++; | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         disruptor.shutdown(); | 
					
						
							|  |  |  |  |     } | 
					
						
							|  |  |  |  | } |