From 0614f4b17bf7f94036ee12b15822492d5fde3542 Mon Sep 17 00:00:00 2001 From: kevin jiang Date: Sat, 16 May 2020 17:40:11 +0800 Subject: [PATCH] =?UTF-8?q?Disruptor=20=E4=BD=BF=E7=94=A8=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/kiisoo/ic/job/CustomerDataJob.java | 76 +++++++++++++ .../com/kiisoo/ic/job/CustomerSyncJob.java | 18 ++- .../java/com/kiisoo/ic/job/DTOMessage.java | 1 + .../ic/job/DTOMessageEventProducer.java | 44 ++++++++ .../com/kiisoo/ic/job/DTOMessageFactory.java | 10 ++ .../ic/job/DTOMessageGroup1Consumer.java | 106 ++++++++++++++++++ .../com/kiisoo/ic/job/DTOMessageHandler.java | 4 +- .../kiisoo/ic/job/EventExceptionHandler.java | 23 ++++ .../store/mapper/PoiStoreStaffDOMapper.java | 2 + .../PrivilageUserShopRelationDOMapper.xml | 5 + 10 files changed, 277 insertions(+), 12 deletions(-) create mode 100644 src/main/java/com/kiisoo/ic/job/CustomerDataJob.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessageEventProducer.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessageFactory.java create mode 100644 src/main/java/com/kiisoo/ic/job/DTOMessageGroup1Consumer.java create mode 100644 src/main/java/com/kiisoo/ic/job/EventExceptionHandler.java diff --git a/src/main/java/com/kiisoo/ic/job/CustomerDataJob.java b/src/main/java/com/kiisoo/ic/job/CustomerDataJob.java new file mode 100644 index 0000000..c6025ca --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/CustomerDataJob.java @@ -0,0 +1,76 @@ +package com.kiisoo.ic.job; + +import com.kiisoo.ic.customer.CustomerService; +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; +import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; +import com.kiisoo.ic.wx.service.QWMailListManageService; +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.*; + +/** + * Disruptor多个消费者不重复处理生产者发送过来的消息 + */ +@Service +@Slf4j +public class CustomerDataJob { + + + // 指定 ring buffer字节大小,必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能 + private static final int BUFFER_SIZE = 1024 * 1024; + //固定线程数 + private static final int THREAD_NUMBERS = 100; + + //创建线程池 + ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); + + @Autowired + private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper; + + @Autowired + private QWMailListManageService qwMailListManageService; + + @Autowired + private PoiStoreStaffDOMapper poiStoreStaffDOMapper; + + @Autowired + private CustomerService customerService; + + + + public void handle(){ + // 创建缓冲池 + Disruptor disruptor = new Disruptor<>(new DTOMessageFactory(), BUFFER_SIZE, executors, + ProducerType.SINGLE, new BlockingWaitStrategy()); + RingBuffer ringBuffer = disruptor.getRingBuffer(); + + // 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息) + DTOMessageGroup1Consumer[] consumers = new DTOMessageGroup1Consumer[THREAD_NUMBERS]; + for (int i = 0; i < consumers.length; i++) { + consumers[i] = new DTOMessageGroup1Consumer(); + } + disruptor.handleEventsWithWorkerPool(consumers); + + disruptor.start(); + + DTOMessageEventProducer producer = new DTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper); + List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); + int index =0; + for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { + String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); + producer.produceData(index,privilageCpUserStoreDO, cpUserId); + index++; + } + + disruptor.shutdown(); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java index ac6dc48..efa969d 100644 --- a/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java +++ b/src/main/java/com/kiisoo/ic/job/CustomerSyncJob.java @@ -7,8 +7,6 @@ import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper; import com.kiisoo.ic.wx.service.QWMailListManageService; import com.lmax.disruptor.*; import lombok.extern.slf4j.Slf4j; -import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; -import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -32,10 +30,10 @@ public class CustomerSyncJob { private CustomerService customerService; public void syncCustomer() throws ExecutionException, InterruptedException { - long beginTime=System.currentTimeMillis(); + long beginTime = System.currentTimeMillis(); - int BUFFER_SIZE=1024; - int THREAD_NUMBERS=4; + int BUFFER_SIZE = 1024 * 1024; + int THREAD_NUMBERS = 10; /* * createSingleProducer创建一个单生产者的RingBuffer, * 第一个参数叫EventFactory,从名字上理解就是“事件工厂”,其实它的职责就是产生数据填充RingBuffer的区块。 @@ -47,7 +45,7 @@ public class CustomerSyncJob { public DTOMessage newInstance() { return new DTOMessage(); } - }, BUFFER_SIZE,new YieldingWaitStrategy()); + }, BUFFER_SIZE, new YieldingWaitStrategy()); //创建线程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //创建SequenceBarrier @@ -64,7 +62,7 @@ public class CustomerSyncJob { executors.submit(transProcessor); //如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类 - Future future=executors.submit(new Callable() { + Future future = executors.submit(new Callable() { @Override public Void call() throws Exception { List privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null); @@ -72,7 +70,7 @@ public class CustomerSyncJob { for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) { String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId()); - seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块 + seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块 //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 ringBuffer.get(seq).setCpUserId(cpUserId); @@ -83,7 +81,7 @@ public class CustomerSyncJob { // ringBuffer.get(seq).setCustomers(customers); ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见 - } + } return null; } }); @@ -92,6 +90,6 @@ public class CustomerSyncJob { transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!) executors.shutdown();//终止线程 - System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime)); + System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime)); } } diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessage.java b/src/main/java/com/kiisoo/ic/job/DTOMessage.java index c516db3..e734f69 100644 --- a/src/main/java/com/kiisoo/ic/job/DTOMessage.java +++ b/src/main/java/com/kiisoo/ic/job/DTOMessage.java @@ -18,6 +18,7 @@ import java.util.List; @Builder public class DTOMessage { + private int index; private String id; private String cpUserId; private PrivilageCpUserStoreDO privilageCpUserStoreDO; diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageEventProducer.java b/src/main/java/com/kiisoo/ic/job/DTOMessageEventProducer.java new file mode 100644 index 0000000..8aac2c9 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageEventProducer.java @@ -0,0 +1,44 @@ +package com.kiisoo.ic.job; + +import com.kiisoo.ic.customer.CustomerService; +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; +import com.kiisoo.ic.wx.service.QWMailListManageService; +import com.lmax.disruptor.RingBuffer; + +public class DTOMessageEventProducer { + + private final RingBuffer ringBuffer; + + private QWMailListManageService qwMailListManageService; + + private CustomerService customerService; + + private PoiStoreStaffDOMapper poiStoreStaffDOMapper; + + public DTOMessageEventProducer(RingBuffer ringBuffer, QWMailListManageService qwMailListManageService,CustomerService customerService,PoiStoreStaffDOMapper poiStoreStaffDOMapper) { + this.ringBuffer = ringBuffer; + this.qwMailListManageService =qwMailListManageService; + this.customerService = customerService; + this.poiStoreStaffDOMapper = poiStoreStaffDOMapper; + } + + public void produceData(int index,PrivilageCpUserStoreDO privilageCpUserStoreDO, String cpUserId) { + long seq = ringBuffer.next(); // 获得下一个Event槽的下标 + try { + // 给Event填充数据 + //给这个区块放入 数据 如果此处不理解,想想RingBuffer的结构图 + ringBuffer.get(seq).setIndex(index); + ringBuffer.get(seq).setCpUserId(cpUserId); + ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO); + ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService); + ringBuffer.get(seq).setCustomerService(customerService); + ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper); +// ringBuffer.get(seq).setCustomers(customers); + } finally { + // 发布Event,激活观察者去消费, 将sequence传递给该消费者 + // 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。 + ringBuffer.publish(seq); + } + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageFactory.java b/src/main/java/com/kiisoo/ic/job/DTOMessageFactory.java new file mode 100644 index 0000000..f7b653c --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageFactory.java @@ -0,0 +1,10 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.EventFactory; + +public class DTOMessageFactory implements EventFactory { + @Override + public DTOMessage newInstance() { + return new DTOMessage(); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageGroup1Consumer.java b/src/main/java/com/kiisoo/ic/job/DTOMessageGroup1Consumer.java new file mode 100644 index 0000000..c2d2d07 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageGroup1Consumer.java @@ -0,0 +1,106 @@ +package com.kiisoo.ic.job; + +import com.kiisoo.ic.customer.CustomerService; +import com.kiisoo.ic.store.entity.PoiStoreStaff; +import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO; +import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper; +import com.kiisoo.ic.synchronous.entity.TurnBackDTO; +import com.kiisoo.ic.synchronous.entity.WxDataDTO; +import com.kiisoo.ic.wx.service.QWMailListManageService; +import com.lmax.disruptor.WorkHandler; +import lombok.extern.slf4j.Slf4j; +import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.UUID; + +@Slf4j +public class DTOMessageGroup1Consumer implements WorkHandler { + + int i = 0; + /** + * 处理事件, 如入库操作 + * + * @param dtoMessage + * @throws Exception + */ + @Override + public void onEvent(DTOMessage dtoMessage) throws Exception { + long s = System.currentTimeMillis(); +// System.out.println(Thread.currentThread().getName() + "开始时间:" + s + " | " + dtoMessage.getIndex()); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dtoMessage.setId(UUID.randomUUID().toString()); + String cpUserId = dtoMessage.getCpUserId(); + CustomerService customerService = dtoMessage.getCustomerService(); + PoiStoreStaffDOMapper poiStoreStaffDOMapper = dtoMessage.getPoiStoreStaffDOMapper(); + QWMailListManageService qwMailListManageService = dtoMessage.getQwMailListManageService(); + PrivilageCpUserStoreDO cpUserStoreDO = dtoMessage.getPrivilageCpUserStoreDO(); + List customers = null; + try { + customers = qwMailListManageService.getCustomer(cpUserId); + } catch (Exception e) { + log.error("查询联系人失败:" + cpUserId, e); + } + if (CollectionUtils.isEmpty(customers)) { + return; + } + for (WxCpUserExternalContactInfo customer : customers) { + TurnBackDTO turnBackDTO = new TurnBackDTO(); + + WxCpUserExternalContactInfo.ExternalContact externalContact = customer.getExternalContact(); + List followedUsers = customer.getFollowedUsers(); + + turnBackDTO.setEaCode(""); + if (CollectionUtils.isNotEmpty(followedUsers)) { + for (WxCpUserExternalContactInfo.FollowedUser followedUser : followedUsers) { + if (cpUserId.equals(followedUser.getUserId())) { + String state = followedUser.getState(); + WxCpUserExternalContactInfo.Tag[] tags = followedUser.getTags(); + if (StringUtils.isNotBlank(state)) { + //判断是否有导购码 + turnBackDTO.setEaCode(state); + } else if (tags != null && tags.length > 0) { + //判断是否有打tag + //todo 根据tag获取导购码 + for (int j = 0; j < tags.length; j++) { + String groupName = tags[j].getGroupName(); + if ("导购".equals(groupName)) { + String tagName = tags[j].getTagName(); + String staffCode = poiStoreStaffDOMapper.selectStaffCodeByTagNew(cpUserStoreDO.getStoreId(), tagName); + if (StringUtils.isNotBlank(staffCode)) { + turnBackDTO.setEaCode(staffCode); + } else { + //todo 绑定在标签导购上,后续删除 + turnBackDTO.setEaCode(tagName); + } + } + } + } + Long joinTimeL = followedUser.getCreateTime(); + Long time = new Long(joinTimeL); + String joinTime = sdf.format(time * 1000); + turnBackDTO.setJoinTime(joinTime); + } + } + } + //type 1 微信,type 2 企业微信 + turnBackDTO.setType(externalContact.getType()); + turnBackDTO.setUserId(cpUserId); + turnBackDTO.setName(externalContact.getName()); + WxDataDTO wxDataDTO = new WxDataDTO(); + wxDataDTO.setAvatarUrl(externalContact.getAvatar()); + wxDataDTO.setUserId(externalContact.getExternalUserId()); + wxDataDTO.setUnionId(externalContact.getUnionId()); + turnBackDTO.setWxData(wxDataDTO); + + customerService.turnBack(turnBackDTO); + } + System.out.println(Thread.currentThread().getName() + "消费者消费了消息:" + dtoMessage.getIndex() + "|" + dtoMessage.getCpUserId()); + s = s - System.currentTimeMillis(); + System.out.println(Thread.currentThread().getName() + "结束时间:" + s + " | " + dtoMessage.getIndex()); + } +} diff --git a/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java index 5463472..8874708 100644 --- a/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java +++ b/src/main/java/com/kiisoo/ic/job/DTOMessageHandler.java @@ -8,6 +8,7 @@ import com.kiisoo.ic.synchronous.entity.TurnBackDTO; import com.kiisoo.ic.synchronous.entity.WxDataDTO; import com.kiisoo.ic.wx.service.QWMailListManageService; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.WorkHandler; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo; import org.apache.commons.collections.CollectionUtils; @@ -28,8 +29,6 @@ public class DTOMessageHandler implements EventHandler { /** * 处理事件, 如入库操作 * @param dtoMessage - * @param l - * @param b * @throws Exception */ @Override @@ -111,4 +110,5 @@ public class DTOMessageHandler implements EventHandler { customerService.turnBack(turnBackDTO); } } + } diff --git a/src/main/java/com/kiisoo/ic/job/EventExceptionHandler.java b/src/main/java/com/kiisoo/ic/job/EventExceptionHandler.java new file mode 100644 index 0000000..8e25166 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/job/EventExceptionHandler.java @@ -0,0 +1,23 @@ +package com.kiisoo.ic.job; + +import com.lmax.disruptor.ExceptionHandler; + + +public class EventExceptionHandler implements ExceptionHandler { + + @Override + public void handleEventException(Throwable ex, long sequence, Object event) { + System.out.println("handleEventException:" + ex); + } + + @Override + public void handleOnShutdownException(Throwable ex) { + System.out.println("handleEventException:" + ex); + } + + @Override + public void handleOnStartException(Throwable ex) { + System.out.println("handleOnStartException:" + ex); + } + +} diff --git a/src/main/java/com/kiisoo/ic/store/mapper/PoiStoreStaffDOMapper.java b/src/main/java/com/kiisoo/ic/store/mapper/PoiStoreStaffDOMapper.java index d46a82b..637a714 100644 --- a/src/main/java/com/kiisoo/ic/store/mapper/PoiStoreStaffDOMapper.java +++ b/src/main/java/com/kiisoo/ic/store/mapper/PoiStoreStaffDOMapper.java @@ -63,4 +63,6 @@ public interface PoiStoreStaffDOMapper extends BaseMapper { String selectUserNameById(@Param("id") Long id); + String selectStaffCodeByTagNew(@Param("storeId") Long store,@Param("tag") String tag); + } diff --git a/src/main/resources/mapper/PrivilageUserShopRelationDOMapper.xml b/src/main/resources/mapper/PrivilageUserShopRelationDOMapper.xml index c09f5ba..d5e5c41 100644 --- a/src/main/resources/mapper/PrivilageUserShopRelationDOMapper.xml +++ b/src/main/resources/mapper/PrivilageUserShopRelationDOMapper.xml @@ -75,4 +75,9 @@ privilage_user t5 where t4.user_id = t5.id and t4.id = #{id} + +