package com.kiisoo.ic.job.detail;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.customer.entity.OpSellerCustomerRelation;
import com.kiisoo.ic.customer.mapper.OpSellerCustomerRelationDOMapper;
import com.kiisoo.ic.employee.entity.PrivilageCpUserDO;
import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Publishes one {@link ProduceDTO} per cp-user account into a Disruptor ring buffer that is
 * drained by a worker pool of {@link DetailDTOMessageGroupConsumer}s, so that every published
 * message is handled by exactly one consumer (no duplicate processing).
 *
 * <p>Each call to {@link #handle(String)} or {@link #handleList(List)} builds its own thread
 * pool and Disruptor and tears both down before returning.
 */
@Service
@Slf4j
public class CustomerDataJob {

    /**
     * Number of event slots in the ring buffer. Must be a power of two so the Disruptor can
     * replace the modulo operation with a bit mask.
     */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /** Size of the fixed thread pool; also the number of worker-pool consumers. */
    private static final int THREAD_NUMBERS = 30;

    @Autowired
    private PrivilageCpUserDOMapper privilageCpUserDOMapper;

    @Autowired
    private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;

    @Autowired
    private OpSellerCustomerRelationDOMapper opSellerCustomerRelationDOMapper;

    @Autowired
    private QWMailListManageService qwMailListManageService;

    @Autowired
    private PoiStoreStaffDOMapper poiStoreStaffDOMapper;

    @Autowired
    private CustomerService customerService;

    /**
     * Runs the job for every account, or for a comma-separated selection of accounts.
     *
     * <p>When {@code cpUserIds} is blank, every row returned by
     * {@code privilageCpUserDOMapper.selectList(null)} is published and no existing relation
     * rows are deleted. Otherwise the string is split on {@code ","} (tokens are used verbatim,
     * without trimming) and processed exactly like {@link #handleList(List)}.
     *
     * @param cpUserIds comma-separated cp-user ids, or blank/{@code null} for all accounts
     */
    public void handle(String cpUserIds) {
        if (StringUtils.isBlank(cpUserIds)) {
            runPipeline(null);
        } else {
            runPipeline(Arrays.asList(cpUserIds.split(",")));
        }
    }

    /**
     * Runs the job for the given accounts.
     *
     * <p>For every id that resolves to an account, the existing relation rows with
     * {@code type = 3} and {@code user_id = account id} are deleted and one message is
     * published. Ids that match no account are skipped.
     *
     * @param cpUserIds cp-user ids to process; {@code null} or empty is a no-op
     */
    public void handleList(List<String> cpUserIds) {
        if (cpUserIds == null || cpUserIds.isEmpty()) {
            // Nothing to publish: avoid spinning up 30 threads and issuing an id-list query
            // with no ids.
            log.warn("handleList called without any cpUserId, nothing to do");
            return;
        }
        runPipeline(cpUserIds);
    }

    /**
     * Builds the thread pool and Disruptor, publishes the messages and always releases both
     * resources afterwards, even when publishing fails.
     *
     * @param cpUserIds ids to process, or {@code null} to process every account
     */
    private void runPipeline(List<String> cpUserIds) {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);
        try {
            // NOTE(review): the event type argument is not visible from this file, so the
            // Disruptor and RingBuffer are left as raw types - confirm and parameterize.
            Disruptor disruptor = new Disruptor<>(new DetailDTOMessageFactory(), BUFFER_SIZE, executor,
                    ProducerType.SINGLE, new BlockingWaitStrategy());

            // One consumer per pool thread; the worker pool hands each message to only one of them.
            DetailDTOMessageGroupConsumer[] consumers = new DetailDTOMessageGroupConsumer[THREAD_NUMBERS];
            for (int i = 0; i < consumers.length; i++) {
                consumers[i] = new DetailDTOMessageGroupConsumer();
            }
            disruptor.handleEventsWithWorkerPool(consumers);
            disruptor.start();

            try {
                RingBuffer ringBuffer = disruptor.getRingBuffer();
                DetailDTOMessageEventProducer producer = new DetailDTOMessageEventProducer(
                        ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper);
                if (cpUserIds == null) {
                    publishAllAccounts(producer);
                } else {
                    publishSelectedAccounts(producer, cpUserIds);
                }
            } finally {
                // Waits for the already published messages to be processed, then halts the consumers.
                disruptor.shutdown();
            }
        } finally {
            // Disruptor.shutdown() does not stop the executor; without this every run leaks its threads.
            executor.shutdown();
        }
    }

    /** Publishes one message for every account in the account table. */
    private void publishAllAccounts(DetailDTOMessageEventProducer producer) {
        // All accounts.
        List<PrivilageCpUserDO> accounts = privilageCpUserDOMapper.selectList(null);
        // Total customer count per account.
        Map<String, Object> customerCounts =
                indexCustomerCounts(opSellerCustomerRelationDOMapper.selectCustomersByCpUserId());

        int index = 0;
        for (PrivilageCpUserDO account : accounts) {
            Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(account.getCpUserId());
            publish(producer, index, account, storeId, customerCounts);
            index++;
        }
    }

    /** Deletes the stale relation rows of each requested account and publishes one message for it. */
    private void publishSelectedAccounts(DetailDTOMessageEventProducer producer, List<String> cpUserIds) {
        // Total customer count per account; read before the relation rows below are deleted.
        Map<String, Object> customerCounts =
                indexCustomerCounts(opSellerCustomerRelationDOMapper.selectCustomersByCpUserIdByList(cpUserIds));

        int index = 0;
        for (String cpUserId : cpUserIds) {
            QueryWrapper<PrivilageCpUserDO> accountQuery = new QueryWrapper<>();
            accountQuery.eq("cp_user_id", cpUserId).last("limit 1");
            PrivilageCpUserDO account = privilageCpUserDOMapper.selectOne(accountQuery);
            if (account == null) {
                continue;
            }

            // Remove the existing data of this account.
            QueryWrapper<OpSellerCustomerRelation> staleRelations = new QueryWrapper<>();
            staleRelations.eq("type", 3).eq("user_id", account.getId());
            opSellerCustomerRelationDOMapper.delete(staleRelations);

            Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(cpUserId);
            publish(producer, index, account, storeId, customerCounts);
            index++;
        }
    }

    /** Fills a {@link ProduceDTO} for the account and hands it to the producer. */
    private void publish(DetailDTOMessageEventProducer producer, int index, PrivilageCpUserDO account,
                         Long storeId, Map<String, Object> customerCounts) {
        ProduceDTO produceDTO = new ProduceDTO();
        produceDTO.setIndex(index);
        produceDTO.setStoreId(storeId);
        produceDTO.setCpUserId(account.getCpUserId());

        // Accounts without a count row are published with a customer count of 0.
        Object sumCustomer = customerCounts.get(account.getCpUserId());
        if (sumCustomer != null) {
            produceDTO.setSumCustomer(Integer.parseInt(sumCustomer.toString()));
        } else {
            produceDTO.setSumCustomer(0);
        }
        producer.produceData(produceDTO);
    }

    /**
     * Indexes the count rows by their {@code cpUserId} column.
     *
     * @param rows result rows carrying the {@code cpUserId} and {@code sumCustomer} columns
     * @return map from the string form of {@code cpUserId} to the raw {@code sumCustomer} value
     */
    private static Map<String, Object> indexCustomerCounts(List<? extends Map<?, ?>> rows) {
        Map<String, Object> counts = new HashMap<>();
        for (Map<?, ?> row : rows) {
            Object cpUserId = row.get("cpUserId");
            if (cpUserId == null) {
                // A row without an account id can never be matched to an account.
                continue;
            }
            counts.put(cpUserId.toString(), row.get("sumCustomer"));
        }
        return counts;
    }
}