diff --git a/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java new file mode 100644 index 0000000..2544fca --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java @@ -0,0 +1,94 @@ +package com.kiisoo.ic.job; + +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.wx.service.QWMailListManageService; +import com.lmax.disruptor.*; +import lombok.extern.slf4j.Slf4j; +import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.concurrent.*; + +@Service +@Slf4j +public class CustomerSyncJob { + + @Autowired + private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; + + @Autowired + private QWMailListManageService qwMailListManageService; + + public void syncCustomer() throws ExecutionException, InterruptedException { + long beginTime=System.currentTimeMillis(); + + int BUFFER_SIZE=1024; + int THREAD_NUMBERS=4; + /* + * createSingleProducer创建一个单生产者的RingBuffer, + * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 + * 第二个参数是RingBuffer的大小,它必须是2的指数倍 目的是为了将求模运算转为&运算提高效率 + * 第三个参数是RingBuffer的生产都在没有可用区块的时候(可能是消费者(或者说是事件处理器) 太慢了)的等待策略 + */ + final RingBuffer ringBuffer = RingBuffer.createSingleProducer(new EventFactory() { + @Override + public DTOMessage newInstance() { + return new DTOMessage(); + } + }, BUFFER_SIZE,new YieldingWaitStrategy()); + //创建线程池 + ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); + //创建SequenceBarrier + SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); + + //创建消息处理器 + BatchEventProcessor transProcessor = new BatchEventProcessor( + ringBuffer, sequenceBarrier, new DTOMessageHandler()); + + //这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略 + ringBuffer.addGatingSequences(transProcessor.getSequence()); + + //把消息处理器提交到线程池 + executors.submit(transProcessor); + //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类 + + Future future=executors.submit(new Callable() { + @Override + public Void call() throws Exception { + List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); + long seq; + for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { + String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); + List customers = null; + try { + customers = qwMailListManageService.getCustomer(cpUserId); + }catch (Exception e) { + log.error("查询联系人失败:"+cpUserId,e); + } + if (CollectionUtils.isNotEmpty(customers)){ + continue; + } + seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块 + + //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 + ringBuffer.get(seq).setCpUserId(cpUserId); + ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO); + ringBuffer.get(seq).setCustomers(customers); + ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 + + } + return null; + } + }); + future.get();//等待生产者结束 + Thread.sleep(1000);//等上1秒,等消费都处理完成 + transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) + executors.shutdown();//终止线程 + + System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessage.java b/src/main/java/com/kiisoo/ic/job/DTOMessage.java index 3f4f2a7..09d1b31 100644 --- a/src/main/java/com/kiisoo/ic/job/DTOMessage.java +++ b/src/main/java/com/kiisoo/ic/job/DTOMessage.java @@ -1,9 +1,13 @@ package com.kiisoo.ic.job; +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; + +import java.util.List; @Data @AllArgsConstructor @@ -12,7 +16,8 @@ import lombok.NoArgsConstructor; public class DTOMessage { private String id; - private String name; - private double price; + private String cpUserId; + private PrivilageCpUserStoreDO privilageCpUserStoreDO; + private List customers; } diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java index c603ed0..30e3b40 100644 --- a/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java @@ -1,14 +1,34 @@ package com.kiisoo.ic.job; +import com.kiisoo.ic.customer.CustomerService; +import com.kiisoo.ic.store.entity.PoiStoreStaff; +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; +import com.kiisoo.ic.synchronous.entity.TurnBackDTO; +import com.kiisoo.ic.synchronous.entity.WxDataDTO; import com.lmax.disruptor.EventHandler; import lombok.extern.slf4j.Slf4j; +import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import java.text.SimpleDateFormat; +import java.util.List; import java.util.UUID; @Slf4j +@Service public class DTOMessageHandler implements EventHandler { + @Autowired + private PoiStoreStaffDOMapper poiStoreStaffDOMapper; + + @Autowired + private CustomerService customerService; + /** * 处理事件, 如入库操作 * @param dtoMessage @@ -18,8 +38,64 @@ public class DTOMessageHandler implements EventHandler { */ @Override public void onEvent(DTOMessage dtoMessage, long l, boolean b) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); dtoMessage.setId(UUID.randomUUID().toString()); - log.info( dtoMessage.toString()); - // 入库操作 + String cpUserId = dtoMessage.getCpUserId(); + PrivilageCpUserStoreDO cpUserStoreDO = dtoMessage.getPrivilageCpUserStoreDO(); + List customers = dtoMessage.getCustomers(); + for (WxCpUserExternalContactInfo customer:customers){ + TurnBackDTO turnBackDTO = new TurnBackDTO(); + + WxCpUserExternalContactInfo.ExternalContact externalContact = customer.getExternalContact(); + List followedUsers = customer.getFollowedUsers(); + + turnBackDTO.setEaCode(""); + if (CollectionUtils.isNotEmpty(followedUsers)){ + for (WxCpUserExternalContactInfo.FollowedUser followedUser:followedUsers){ + if (cpUserId.equals(followedUser.getUserId())){ + String state = followedUser.getState(); + WxCpUserExternalContactInfo.Tag[] tags = followedUser.getTags(); + if (StringUtils.isNotBlank(state)){ + //判断是否有导购码 + turnBackDTO.setEaCode(state); + }else if(tags != null && tags.length > 0){ + //判断是否有打tag + //todo 根据tag获取导购码 + for (int j = 0;j