From 0f26f307dcb044aaad02d7d0249e8fc91d3c2684 Mon Sep 17 00:00:00 2001 From: LegnaYet <1023868505@qq.com> Date: Mon, 18 May 2020 15:37:36 +0800 Subject: [PATCH] =?UTF-8?q?=E9=97=A8=E5=BA=97=E5=8F=B7=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../count/BehaviorCountDTOMessageFactory.java | 10 ++ .../ic/behavior/count/BehaviorDTOMessage.java | 27 ++++ .../BehaviorDTOMessageEventProducer.java | 45 ++++++ .../BehaviorDTOMessageGroupConsumer.java | 149 ++++++++++++++++++ .../behavior/count/CustomerBehaviorJob.java | 85 ++++++++++ .../count/SyncBehaviorJobController.java | 29 ++++ 6 files changed, 345 insertions(+) create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/BehaviorCountDTOMessageFactory.java create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessage.java create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageEventProducer.java create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageGroupConsumer.java create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/CustomerBehaviorJob.java create mode 100644 src/main/java/com/kiisoo/ic/behavior/count/SyncBehaviorJobController.java diff --git a/src/main/java/com/kiisoo/ic/behavior/count/BehaviorCountDTOMessageFactory.java b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorCountDTOMessageFactory.java new file mode 100644 index 0000000..4a99555 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorCountDTOMessageFactory.java @@ -0,0 +1,10 @@ +package com.kiisoo.ic.behavior.count; + +import com.lmax.disruptor.EventFactory; + +public class BehaviorCountDTOMessageFactory implements EventFactory { + @Override + public BehaviorDTOMessage newInstance() { + return new BehaviorDTOMessage(); + } +} diff --git a/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessage.java b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessage.java new file mode 100644 index 0000000..55ee0e0 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessage.java @@ -0,0 +1,27 @@ +package com.kiisoo.ic.behavior.count; + +import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class BehaviorDTOMessage { + + private int index; + private String id; + private String cpUserId; + private Date statDate; + private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; + private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper; + private StorePromotionDataDOMapper storePromotionDataDOMapper; + +} diff --git a/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageEventProducer.java b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageEventProducer.java new file mode 100644 index 0000000..27a0316 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageEventProducer.java @@ -0,0 +1,45 @@ +package com.kiisoo.ic.behavior.count; + +import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper; +import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper; +import com.lmax.disruptor.RingBuffer; + +import java.util.Date; + +public class BehaviorDTOMessageEventProducer { + + private final RingBuffer ringBuffer; + + private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; + private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper; + private StorePromotionDataDOMapper storePromotionDataDOMapper; + + + public BehaviorDTOMessageEventProducer(RingBuffer ringBuffer, PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper,PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper,StorePromotionDataDOMapper storePromotionDataDOMapper) { + this.ringBuffer = ringBuffer; + this.privilageCpUserStoreDOMapper =privilageCpUserStoreDOMapper; + this.poiCustomerContactDataStatMapper =poiCustomerContactDataStatMapper; + this.storePromotionDataDOMapper =storePromotionDataDOMapper; + } + + public void produceData(int index, String cpUserId, Date statDate) { + // 获得下一个Event槽的下标 + long seq = ringBuffer.next(); + try { + // 给Event填充数据 + //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 + ringBuffer.get(seq).setIndex(index); + ringBuffer.get(seq).setCpUserId(cpUserId); + ringBuffer.get(seq).setStatDate(statDate); + ringBuffer.get(seq).setPrivilageCpUserStoreDOMapper(privilageCpUserStoreDOMapper); + ringBuffer.get(seq).setPoiCustomerContactDataStatMapper(poiCustomerContactDataStatMapper); + ringBuffer.get(seq).setStorePromotionDataDOMapper(storePromotionDataDOMapper); + } finally { + // 发布Event,激活观察者去消费, 将sequence传递给该消费者 + // 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。 + ringBuffer.publish(seq); + } + } +} diff --git a/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageGroupConsumer.java b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageGroupConsumer.java new file mode 100644 index 0000000..4afa622 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/BehaviorDTOMessageGroupConsumer.java @@ -0,0 +1,149 @@ +package com.kiisoo.ic.behavior.count; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.kiisoo.ic.config.WxCpConfiguration; +import com.kiisoo.ic.generalize.entity.PoiCustomerContactDataStat; +import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper; +import com.kiisoo.ic.store.constant.Constants; +import com.kiisoo.ic.store.entity.WxCusInfoReqDO; +import com.kiisoo.ic.store.entity.WxCusInfoRespDO; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper; +import com.lmax.disruptor.WorkHandler; +import lombok.extern.slf4j.Slf4j; +import me.chanjar.weixin.common.error.WxErrorException; +import me.chanjar.weixin.cp.api.WxCpExternalContactService; +import me.chanjar.weixin.cp.api.WxCpService; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.UUID; + +import static com.kiisoo.ic.config.WxCpConfiguration.APPLICATIONID; + +@Slf4j +public class BehaviorDTOMessageGroupConsumer implements WorkHandler { + + private WxCpService wxCpService; + int i = 0; + /** + * 处理事件, 如入库操作 + * + * @param behaviorDtoMessage + * @throws Exception + */ + @Override + public void onEvent(BehaviorDTOMessage behaviorDtoMessage) throws Exception { + long s = System.currentTimeMillis(); + + behaviorDtoMessage.setId(UUID.randomUUID().toString()); + String cpUserId = behaviorDtoMessage.getCpUserId(); + Date statDate = behaviorDtoMessage.getStatDate(); + PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper = behaviorDtoMessage.getPrivilageCpUserStoreDOMapper(); + PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper = behaviorDtoMessage.getPoiCustomerContactDataStatMapper(); + StorePromotionDataDOMapper storePromotionDataDOMapper = behaviorDtoMessage.getStorePromotionDataDOMapper(); + + wxCpService = WxCpConfiguration.getCpService(APPLICATIONID); + WxCpExternalContactService externalContactService = wxCpService.getExternalContactService(); + + try { + Long storeId = privilageCpUserStoreDOMapper.selectOneByCpUserId(cpUserId); + Long regionId = null; + + WxCusInfoReqDO wxCusInfoReqDO = new WxCusInfoReqDO(); + wxCusInfoReqDO.setAccess_token(wxCpService.getAccessToken()); + wxCusInfoReqDO.setUserid(cpUserId); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + //todo 分开写便于同步历史数据 最大跨度30天 最久当天往前180天 上线时改为System.currentTimeMillis() + wxCusInfoReqDO.setStart_time(statDate.getTime() / 1000); + wxCusInfoReqDO.setEnd_time(statDate.getTime() / 1000); + WxCusInfoRespDO wxCusInfoRespDO = getWxCusInfo(wxCusInfoReqDO); + if (wxCusInfoRespDO == null) { + return; + } + wxCusInfoRespDO.setStoreId(storeId); + wxCusInfoRespDO.setRegionId(regionId); + wxCusInfoRespDO.setStatTime(statDate); + + QueryWrapper qw = new QueryWrapper(); + qw.eq("store_id", storeId).eq("region_id", regionId).eq("stat_time", statDate).last("limit 1"); + PoiCustomerContactDataStat poiCustomerContactDataStat = poiCustomerContactDataStatMapper.selectOne(qw); + int syncRes = 0; + if (poiCustomerContactDataStat == null) { + syncRes = storePromotionDataDOMapper.insertStorePromotionData(wxCusInfoRespDO); + } else { + Long newApplyCnt = 0L; + if (wxCusInfoRespDO.getNew_apply_cnt() != null) { + newApplyCnt = new Long(wxCusInfoRespDO.getNew_apply_cnt()); + } + poiCustomerContactDataStat.setNewApplyCnt(newApplyCnt); + + Long avgReplyTime = 0L; + if (wxCusInfoRespDO.getAvg_reply_time() != null) { + avgReplyTime = new Long(wxCusInfoRespDO.getAvg_reply_time()); + } + poiCustomerContactDataStat.setAvgReplyTime(avgReplyTime); + + Long chatCny = 0L; + if (wxCusInfoRespDO.getChat_cnt() != null) { + chatCny = new Long(wxCusInfoRespDO.getChat_cnt()); + } + poiCustomerContactDataStat.setChatCnt(chatCny); + + Long messageCnt = 0L; + if (wxCusInfoRespDO.getMessage_cnt() != null) { + messageCnt = new Long(wxCusInfoRespDO.getMessage_cnt()); + } + poiCustomerContactDataStat.setMessageCnt(messageCnt); + + Float replyPercentage = 0F; + if (wxCusInfoRespDO.getReply_percentage() != null) { + replyPercentage = wxCusInfoRespDO.getReply_percentage(); + } + poiCustomerContactDataStat.setReplyPercentage(replyPercentage); + + Long newContactCnt = 0L; + if (wxCusInfoRespDO.getNew_contact_cnt() != null) { + newContactCnt = new Long(wxCusInfoRespDO.getNew_contact_cnt()); + } + poiCustomerContactDataStat.setNewContactCnt(newContactCnt); + + Long negativeFeedbackCnt = 0L; + if (wxCusInfoRespDO.getNegative_feedback_cnt() != null) { + negativeFeedbackCnt = new Long(wxCusInfoRespDO.getNegative_feedback_cnt()); + } + poiCustomerContactDataStat.setNegativeFeedbackCnt(negativeFeedbackCnt); + + syncRes = poiCustomerContactDataStatMapper.updateById(poiCustomerContactDataStat); + } + + if (syncRes == Constants.INSERT_FAIL) { + throw new RuntimeException("向数据库导入客户统计数据时出错"); + } + } catch (Exception e) { + log.error("请求企业微信api获取客户统计数据或日期转换出错!", e); + } + + System.out.println(Thread.currentThread().getName() + "消费者消费了消息:" + behaviorDtoMessage.getIndex() + "|" + behaviorDtoMessage.getCpUserId()); + s = s - System.currentTimeMillis(); + System.out.println(Thread.currentThread().getName() + "结束时间:" + s + " | " + behaviorDtoMessage.getIndex()); + } + + + /** + * 获取联系客户统计数据 + * + * @param wxCusInfoReqDO + * @return + * @throws WxErrorException + */ + public WxCusInfoRespDO getWxCusInfo(WxCusInfoReqDO wxCusInfoReqDO) throws WxErrorException { + String url = wxCpService.getWxCpConfigStorage().getApiUrl("/cgi-bin/externalcontact/get_user_behavior_data"); + String result = wxCpService.post(url, wxCusInfoReqDO.toJson()); + JSONObject parseObject = JSON.parseObject(result); + String[] behavior_data = JSONObject.parseObject(parseObject.getString("behavior_data"), String[].class); + return JSONObject.parseObject(behavior_data[0], WxCusInfoRespDO.class); + } +} diff --git a/src/main/java/com/kiisoo/ic/behavior/count/CustomerBehaviorJob.java b/src/main/java/com/kiisoo/ic/behavior/count/CustomerBehaviorJob.java new file mode 100644 index 0000000..ac60d01 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/CustomerBehaviorJob.java @@ -0,0 +1,85 @@ +package com.kiisoo.ic.behavior.count; + +import com.kiisoo.ic.employee.entity.PrivilageCpUserDO; +import com.kiisoo.ic.employee.mapper.PrivilageCpUserDOMapper; +import com.kiisoo.ic.generalize.mapper.PoiCustomerContactDataStatMapper; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.store.mapper.StorePromotionDataDOMapper; +import com.kiisoo.ic.utils.DateUtils; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import me.chanjar.weixin.common.error.WxErrorException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +/** + * Disruptor多个消费者不重复处理生产者发送过来的消息 + */ +@Service +@Slf4j +public class CustomerBehaviorJob { + + + // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能 + private static final int BUFFER_SIZE = 1024 * 1024; + //固定线程数 + private static final int THREAD_NUMBERS = 100; + + + @Autowired + private PrivilageCpUserDOMapper privilageCpUserDOMapper; + @Autowired + private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; + @Autowired + private PoiCustomerContactDataStatMapper poiCustomerContactDataStatMapper; + @Autowired + private StorePromotionDataDOMapper storePromotionDataDOMapper; + + + public void handle(Date startDate, Date endDate) throws WxErrorException { + + //创建线程池 + ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); + // 创建缓冲池 + Disruptor disruptor = new Disruptor<>(new BehaviorCountDTOMessageFactory(), BUFFER_SIZE, executors, + ProducerType.SINGLE, new BlockingWaitStrategy()); + RingBuffer ringBuffer = disruptor.getRingBuffer(); + + // 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息) + BehaviorDTOMessageGroupConsumer[] consumers = new BehaviorDTOMessageGroupConsumer[THREAD_NUMBERS]; + for (int i = 0; i < consumers.length; i++) { + consumers[i] = new BehaviorDTOMessageGroupConsumer(); + } + + disruptor.handleEventsWithWorkerPool(consumers); + + disruptor.start(); + + BehaviorDTOMessageEventProducer producer = new BehaviorDTOMessageEventProducer(ringBuffer, privilageCpUserStoreDOMapper,poiCustomerContactDataStatMapper,storePromotionDataDOMapper); + + + List privilageCpUserDOS = privilageCpUserDOMapper.selectList(null); + List cpUserIds = privilageCpUserDOS.stream().map(privilageCpUserDO -> privilageCpUserDO.getCpUserId()).collect(Collectors.toList()); + + List dates = DateUtils.getdays(startDate, endDate); + int index =0; + for (Date date:dates){ + for (String cpUserId : cpUserIds) { + producer.produceData(index,cpUserId,date); + index++; + } + } + + + disruptor.shutdown(); + } +} diff --git a/src/main/java/com/kiisoo/ic/behavior/count/SyncBehaviorJobController.java b/src/main/java/com/kiisoo/ic/behavior/count/SyncBehaviorJobController.java new file mode 100644 index 0000000..0770ea9 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/behavior/count/SyncBehaviorJobController.java @@ -0,0 +1,29 @@ +package com.kiisoo.ic.behavior.count; +import java.text.SimpleDateFormat; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + +import java.util.Date; + +@Controller +@RequestMapping("/job") +@Slf4j +public class SyncBehaviorJobController { + @Autowired + private CustomerBehaviorJob customerBehaviorJob; + + @GetMapping("/sync/behavior") + public void getCustomerInfo(@RequestParam("startDate") String startDate, @RequestParam("endDate")String endDate){ + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + customerBehaviorJob.handle(sdf.parse(startDate),sdf.parse(endDate)); + }catch (Exception e){ + log.error("",e); + } + } +}