You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
9.8 KiB
Java

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.kiisoo.ic.job.detail;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.kiisoo.ic.common.utils.RedisUtil;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.customer.entity.OpSellerCustomerRelation;
import com.kiisoo.ic.customer.mapper.OpSellerCustomerRelationDOMapper;
import com.kiisoo.ic.employee.entity.PrivilageCpUserDO;
import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.cp.bean.WxCpUser;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
/**
* Disruptor多个消费者不重复处理生产者发送过来的消息
*/
@Service
@Slf4j
public class CustomerDataJob {
// 指定 ring buffer字节大小必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
private static final int BUFFER_SIZE = 1024 * 1024;
//固定线程数
private static final int THREAD_NUMBERS = 60;
@Autowired
private PrivilageCpUserDOMapper privilageCpUserDOMapper;
@Autowired
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
@Autowired
private OpSellerCustomerRelationDOMapper opSellerCustomerRelationDOMapper;
@Autowired
private QWMailListManageService qwMailListManageService;
@Autowired
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
@Autowired
private CustomerService customerService;
@Autowired
private RedisUtil redisUtil;
public void handle(String cpUserIds) throws WxErrorException {
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
// 创建缓冲池
Disruptor<DetailDTOMessage> disruptor = new Disruptor<>(new DetailDTOMessageFactory(), BUFFER_SIZE, executors,
ProducerType.SINGLE, new BlockingWaitStrategy());
RingBuffer<DetailDTOMessage> ringBuffer = disruptor.getRingBuffer();
// 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
DetailDTOMessageGroupConsumer[] consumers = new DetailDTOMessageGroupConsumer[THREAD_NUMBERS];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new DetailDTOMessageGroupConsumer();
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.start();
int index = 0;
DetailDTOMessageEventProducer producer = new DetailDTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper);
if (StringUtils.isBlank(cpUserIds)) {
// 获取账号的总数
List<PrivilageCpUserDO> privilageCpUserDOS = privilageCpUserDOMapper.selectList(null);
// 获取账号的总客户数
List<Map<String, Object>> customerList = opSellerCustomerRelationDOMapper.selectCustomersByCpUserId();
Map<String, Object> customerMap = new HashMap<>();
for (int i = 0; i < customerList.size(); i++) {
Map<String, Object> item = customerList.get(i);
customerMap.put(item.get("cpUserId").toString(), item.get("sumCustomer"));
}
for (PrivilageCpUserDO privilageCpUserDO : privilageCpUserDOS) {
Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(privilageCpUserDO.getCpUserId());
ProduceDTO produceDTO = new ProduceDTO();
produceDTO.setIndex(index);
produceDTO.setStoreId(storeId);
produceDTO.setCpUserId(privilageCpUserDO.getCpUserId());
if (null != customerMap.get(privilageCpUserDO.getCpUserId())) {
String sumCustomer = customerMap.get(privilageCpUserDO.getCpUserId()).toString();
produceDTO.setSumCustomer(Integer.parseInt(sumCustomer));
} else {
produceDTO.setSumCustomer(0);
}
producer.produceData(produceDTO);
index++;
}
} else {
// String[] split = cpUserIds.split(",");
// List<String> cpUserIdList = Arrays.asList(split);
List<String> cpUserIdList = new ArrayList<>();
List<WxCpUser> wxCpUsers = qwMailListManageService.syncUser();
for (int i = 0; i < wxCpUsers.size(); i++) {
WxCpUser item = wxCpUsers.get(i);
cpUserIdList.add(item.getUserId());
}
// 获取账号的总客户数
List<Map<String, Object>> customerList = opSellerCustomerRelationDOMapper.selectCustomersByCpUserIdByList(cpUserIdList);
Map<String, Object> customerMap = new HashMap<>();
for (int i = 0; i < customerList.size(); i++) {
Map<String, Object> item = customerList.get(i);
if(null != item.get("cpUserId")){
customerMap.put(item.get("cpUserId").toString(), item.get("sumCustomer"));
}
}
for (String cpUserId : cpUserIdList) {
QueryWrapper<PrivilageCpUserDO> cpUserDOQueryWrapper = new QueryWrapper<>();
cpUserDOQueryWrapper.eq("cp_user_id", cpUserId).last("limit 1");
PrivilageCpUserDO privilageCpUserDO = privilageCpUserDOMapper.selectOne(cpUserDOQueryWrapper);
if (privilageCpUserDO == null) {
continue;
}
//删除原有的数据
QueryWrapper<OpSellerCustomerRelation> relationQueryWrapper = new QueryWrapper<>();
relationQueryWrapper.eq("type", 3).eq("user_id", privilageCpUserDO.getId());
opSellerCustomerRelationDOMapper.delete(relationQueryWrapper);
Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(cpUserId);
ProduceDTO produceDTO = new ProduceDTO();
produceDTO.setIndex(index);
produceDTO.setStoreId(storeId);
produceDTO.setCpUserId(privilageCpUserDO.getCpUserId());
if (null != customerMap.get(privilageCpUserDO.getCpUserId())) {
String sumCustomer = customerMap.get(privilageCpUserDO.getCpUserId()).toString();
produceDTO.setSumCustomer(Integer.parseInt(sumCustomer));
} else {
produceDTO.setSumCustomer(0);
}
producer.produceData(produceDTO);
index++;
}
}
disruptor.shutdown();
}
public void handleList(List<String> cpUserIds) {
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
// 创建缓冲池
Disruptor<DetailDTOMessage> disruptor = new Disruptor<>(new DetailDTOMessageFactory(), BUFFER_SIZE, executors,
ProducerType.SINGLE, new BlockingWaitStrategy());
RingBuffer<DetailDTOMessage> ringBuffer = disruptor.getRingBuffer();
// 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
DetailDTOMessageGroupConsumer[] consumers = new DetailDTOMessageGroupConsumer[THREAD_NUMBERS];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new DetailDTOMessageGroupConsumer();
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.start();
int index = 0;
DetailDTOMessageEventProducer producer = new DetailDTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper);
// 获取账号的总客户数
List<Map<String, Object>> customerList = opSellerCustomerRelationDOMapper.selectCustomersByCpUserIdByList(cpUserIds);
Map<String, Object> customerMap = new HashMap<>();
for (int i = 0; i < customerList.size(); i++) {
Map<String, Object> item = customerList.get(i);
customerMap.put(item.get("cpUserId").toString(), item.get("sumCustomer"));
}
for (String cpUserId : cpUserIds) {
QueryWrapper<PrivilageCpUserDO> cpUserDOQueryWrapper = new QueryWrapper<>();
cpUserDOQueryWrapper.eq("cp_user_id", cpUserId).last("limit 1");
PrivilageCpUserDO privilageCpUserDO = privilageCpUserDOMapper.selectOne(cpUserDOQueryWrapper);
if (privilageCpUserDO == null) {
continue;
}
//删除原有的数据
QueryWrapper<OpSellerCustomerRelation> relationQueryWrapper = new QueryWrapper<>();
relationQueryWrapper.eq("type", 3).eq("user_id", privilageCpUserDO.getId());
opSellerCustomerRelationDOMapper.delete(relationQueryWrapper);
Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(cpUserId);
ProduceDTO produceDTO = new ProduceDTO();
produceDTO.setIndex(index);
produceDTO.setStoreId(storeId);
produceDTO.setCpUserId(privilageCpUserDO.getCpUserId());
if (null != customerMap.get(privilageCpUserDO.getCpUserId())) {
String sumCustomer = customerMap.get(privilageCpUserDO.getCpUserId()).toString();
produceDTO.setSumCustomer(Integer.parseInt(sumCustomer));
} else {
produceDTO.setSumCustomer(0);
}
producer.produceData(produceDTO);
index++;
}
disruptor.shutdown();
}
}