门店号管理

dev_0531
LegnaYet 6 years ago
parent e6de97c4af
commit 0f26f307dc

@ -0,0 +1,10 @@
package com.kiisoo.ic.behavior.count;
import com.lmax.disruptor.EventFactory;
public class BehaviorCountDTOMessageFactory implements EventFactory<BehaviorDTOMessage> {
@Override
public BehaviorDTOMessage newInstance() {
return new BehaviorDTOMessage();
}
}

@ -0,0 +1,27 @@
package com.kiisoo.ic.behavior.count;
import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class BehaviorDTOMessage {
private int index;
private String id;
private String cpUserId;
private Date statDate;
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper;
private StorePromotionDataDOMapper storePromotionDataDOMapper;
}

@ -0,0 +1,45 @@
package com.kiisoo.ic.behavior.count;
import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper;
import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper;
import com.lmax.disruptor.RingBuffer;
import java.util.Date;
public class BehaviorDTOMessageEventProducer {
private final RingBuffer<BehaviorDTOMessage> ringBuffer;
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper;
private StorePromotionDataDOMapper storePromotionDataDOMapper;
public BehaviorDTOMessageEventProducer(RingBuffer<BehaviorDTOMessage> ringBuffer, PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper,PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper,StorePromotionDataDOMapper storePromotionDataDOMapper) {
this.ringBuffer = ringBuffer;
this.privilageCpUserStoreDOMapper =privilageCpUserStoreDOMapper;
this.poiCustomerContactDataStatMapper =poiCustomerContactDataStatMapper;
this.storePromotionDataDOMapper =storePromotionDataDOMapper;
}
public void produceData(int index, String cpUserId, Date statDate) {
// 获得下一个Event槽的下标
long seq = ringBuffer.next();
try {
// 给Event填充数据
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setIndex(index);
ringBuffer.get(seq).setCpUserId(cpUserId);
ringBuffer.get(seq).setStatDate(statDate);
ringBuffer.get(seq).setPrivilageCpUserStoreDOMapper(privilageCpUserStoreDOMapper);
ringBuffer.get(seq).setPoiCustomerContactDataStatMapper(poiCustomerContactDataStatMapper);
ringBuffer.get(seq).setStorePromotionDataDOMapper(storePromotionDataDOMapper);
} finally {
// 发布Event激活观察者去消费 将sequence传递给该消费者
// 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
ringBuffer.publish(seq);
}
}
}

@ -0,0 +1,149 @@
package com.kiisoo.ic.behavior.count;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.kiisoo.ic.config.WxCpConfiguration;
import com.kiisoo.ic.generalize.entity.PoiCustomerContactDataStat;
import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper;
import com.kiisoo.ic.store.constant.Constants;
import com.kiisoo.ic.store.entity.WxCusInfoReqDO;
import com.kiisoo.ic.store.entity.WxCusInfoRespDO;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.cp.api.WxCpExternalContactService;
import me.chanjar.weixin.cp.api.WxCpService;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import static com.kiisoo.ic.config.WxCpConfiguration.APPLICATIONID;
@Slf4j
public class BehaviorDTOMessageGroupConsumer implements WorkHandler<BehaviorDTOMessage> {
private WxCpService wxCpService;
int i = 0;
/**
*
*
* @param behaviorDtoMessage
* @throws Exception
*/
@Override
public void onEvent(BehaviorDTOMessage behaviorDtoMessage) throws Exception {
long s = System.currentTimeMillis();
behaviorDtoMessage.setId(UUID.randomUUID().toString());
String cpUserId = behaviorDtoMessage.getCpUserId();
Date statDate = behaviorDtoMessage.getStatDate();
PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper = behaviorDtoMessage.getPrivilageCpUserStoreDOMapper();
PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper = behaviorDtoMessage.getPoiCustomerContactDataStatMapper();
StorePromotionDataDOMapper storePromotionDataDOMapper = behaviorDtoMessage.getStorePromotionDataDOMapper();
wxCpService = WxCpConfiguration.getCpService(APPLICATIONID);
WxCpExternalContactService externalContactService = wxCpService.getExternalContactService();
try {
Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(cpUserId);
Long regionId = null;
WxCusInfoReqDO wxCusInfoReqDO = new WxCusInfoReqDO();
wxCusInfoReqDO.setAccess_token(wxCpService.getAccessToken());
wxCusInfoReqDO.setUserid(cpUserId);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
//todo 分开写便于同步历史数据 最大跨度30天 最久当天往前180天 上线时改为System.currentTimeMillis()
wxCusInfoReqDO.setStart_time(statDate.getTime() / 1000);
wxCusInfoReqDO.setEnd_time(statDate.getTime() / 1000);
WxCusInfoRespDO wxCusInfoRespDO = getWxCusInfo(wxCusInfoReqDO);
if (wxCusInfoRespDO == null) {
return;
}
wxCusInfoRespDO.setStoreId(storeId);
wxCusInfoRespDO.setRegionId(regionId);
wxCusInfoRespDO.setStatTime(statDate);
QueryWrapper<PoiCustomerContactDataStat> qw = new QueryWrapper();
qw.eq("store_id", storeId).eq("region_id", regionId).eq("stat_time", statDate).last("limit 1");
PoiCustomerContactDataStat poiCustomerContactDataStat = poiCustomerContactDataStatMapper.selectOne(qw);
int syncRes = 0;
if (poiCustomerContactDataStat == null) {
syncRes = storePromotionDataDOMapper.insertStorePromotionData(wxCusInfoRespDO);
} else {
Long newApplyCnt = 0L;
if (wxCusInfoRespDO.getNew_apply_cnt() != null) {
newApplyCnt = new Long(wxCusInfoRespDO.getNew_apply_cnt());
}
poiCustomerContactDataStat.setNewApplyCnt(newApplyCnt);
Long avgReplyTime = 0L;
if (wxCusInfoRespDO.getAvg_reply_time() != null) {
avgReplyTime = new Long(wxCusInfoRespDO.getAvg_reply_time());
}
poiCustomerContactDataStat.setAvgReplyTime(avgReplyTime);
Long chatCny = 0L;
if (wxCusInfoRespDO.getChat_cnt() != null) {
chatCny = new Long(wxCusInfoRespDO.getChat_cnt());
}
poiCustomerContactDataStat.setChatCnt(chatCny);
Long messageCnt = 0L;
if (wxCusInfoRespDO.getMessage_cnt() != null) {
messageCnt = new Long(wxCusInfoRespDO.getMessage_cnt());
}
poiCustomerContactDataStat.setMessageCnt(messageCnt);
Float replyPercentage = 0F;
if (wxCusInfoRespDO.getReply_percentage() != null) {
replyPercentage = wxCusInfoRespDO.getReply_percentage();
}
poiCustomerContactDataStat.setReplyPercentage(replyPercentage);
Long newContactCnt = 0L;
if (wxCusInfoRespDO.getNew_contact_cnt() != null) {
newContactCnt = new Long(wxCusInfoRespDO.getNew_contact_cnt());
}
poiCustomerContactDataStat.setNewContactCnt(newContactCnt);
Long negativeFeedbackCnt = 0L;
if (wxCusInfoRespDO.getNegative_feedback_cnt() != null) {
negativeFeedbackCnt = new Long(wxCusInfoRespDO.getNegative_feedback_cnt());
}
poiCustomerContactDataStat.setNegativeFeedbackCnt(negativeFeedbackCnt);
syncRes = poiCustomerContactDataStatMapper.updateById(poiCustomerContactDataStat);
}
if (syncRes == Constants.INSERT_FAIL) {
throw new RuntimeException("向数据库导入客户统计数据时出错");
}
} catch (Exception e) {
log.error("请求企业微信api获取客户统计数据或日期转换出错", e);
}
System.out.println(Thread.currentThread().getName() + "消费者消费了消息:" + behaviorDtoMessage.getIndex() + "|" + behaviorDtoMessage.getCpUserId());
s = s - System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + "结束时间:" + s + " | " + behaviorDtoMessage.getIndex());
}
/**
*
*
* @param wxCusInfoReqDO
* @return
* @throws WxErrorException
*/
public WxCusInfoRespDO getWxCusInfo(WxCusInfoReqDO wxCusInfoReqDO) throws WxErrorException {
String url = wxCpService.getWxCpConfigStorage().getApiUrl("/cgi-bin/externalcontact/get_user_behavior_data");
String result = wxCpService.post(url, wxCusInfoReqDO.toJson());
JSONObject parseObject = JSON.parseObject(result);
String[] behavior_data = JSONObject.parseObject(parseObject.getString("behavior_data"), String[].class);
return JSONObject.parseObject(behavior_data[0], WxCusInfoRespDO.class);
}
}

@ -0,0 +1,85 @@
package com.kiisoo.ic.behavior.count;
import com.kiisoo.ic.employee.entity.PrivilageCpUserDO;
import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper;
import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper;
import com.kiisoo.ic.utils.DateUtils;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.error.WxErrorException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* Disruptor
*/
@Service
@Slf4j
public class CustomerBehaviorJob {
// 指定 ring buffer字节大小必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
private static final int BUFFER_SIZE = 1024 * 1024;
//固定线程数
private static final int THREAD_NUMBERS = 100;
@Autowired
private PrivilageCpUserDOMapper privilageCpUserDOMapper;
@Autowired
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
@Autowired
private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper;
@Autowired
private StorePromotionDataDOMapper storePromotionDataDOMapper;
public void handle(Date startDate, Date endDate) throws WxErrorException {
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
// 创建缓冲池
Disruptor<BehaviorDTOMessage> disruptor = new Disruptor<>(new BehaviorCountDTOMessageFactory(), BUFFER_SIZE, executors,
ProducerType.SINGLE, new BlockingWaitStrategy());
RingBuffer<BehaviorDTOMessage> ringBuffer = disruptor.getRingBuffer();
// 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
BehaviorDTOMessageGroupConsumer[] consumers = new BehaviorDTOMessageGroupConsumer[THREAD_NUMBERS];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new BehaviorDTOMessageGroupConsumer();
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.start();
BehaviorDTOMessageEventProducer producer = new BehaviorDTOMessageEventProducer(ringBuffer, privilageCpUserStoreDOMapper,poiCustomerContactDataStatMapper,storePromotionDataDOMapper);
List<PrivilageCpUserDO> privilageCpUserDOS = privilageCpUserDOMapper.selectList(null);
List<String> cpUserIds = privilageCpUserDOS.stream().map(privilageCpUserDO -> privilageCpUserDO.getCpUserId()).collect(Collectors.toList());
List<Date> dates = DateUtils.getdays(startDate, endDate);
int index =0;
for (Date date:dates){
for (String cpUserId : cpUserIds) {
producer.produceData(index,cpUserId,date);
index++;
}
}
disruptor.shutdown();
}
}

@ -0,0 +1,29 @@
package com.kiisoo.ic.behavior.count;
import java.text.SimpleDateFormat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.Date;
@Controller
@RequestMapping("/job")
@Slf4j
public class SyncBehaviorJobController {
@Autowired
private CustomerBehaviorJob customerBehaviorJob;
@GetMapping("/sync/behavior")
public void getCustomerInfo(@RequestParam("startDate") String startDate, @RequestParam("endDate")String endDate){
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
customerBehaviorJob.handle(sdf.parse(startDate),sdf.parse(endDate));
}catch (Exception e){
log.error("",e);
}
}
}
Loading…
Cancel
Save