Disruptor 使用。

dev_0531
kevin jiang 6 years ago
parent fd7e90e5dd
commit 0614f4b17b

@ -0,0 +1,76 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.*;
/**
* Disruptor
*/
@Service
@Slf4j
public class CustomerDataJob {
// 指定 ring buffer字节大小必需为2的N次方(能将求模运算转为位运算提高效率 ),否则影响性能
private static final int BUFFER_SIZE = 1024 * 1024;
//固定线程数
private static final int THREAD_NUMBERS = 100;
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
@Autowired
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
@Autowired
private QWMailListManageService qwMailListManageService;
@Autowired
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
@Autowired
private CustomerService customerService;
public void handle(){
// 创建缓冲池
Disruptor<DTOMessage> disruptor = new Disruptor<>(new DTOMessageFactory(), BUFFER_SIZE, executors,
ProducerType.SINGLE, new BlockingWaitStrategy());
RingBuffer<DTOMessage> ringBuffer = disruptor.getRingBuffer();
// 创建100个消费者来处理同一个生产者发的消息(这100个消费者不重复消费消息)
DTOMessageGroup1Consumer[] consumers = new DTOMessageGroup1Consumer[THREAD_NUMBERS];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new DTOMessageGroup1Consumer();
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.start();
DTOMessageEventProducer producer = new DTOMessageEventProducer(ringBuffer, qwMailListManageService, customerService, poiStoreStaffDOMapper);
List<PrivilageCpUserStoreDO> privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null);
int index =0;
for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) {
String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId());
producer.produceData(index,privilageCpUserStoreDO, cpUserId);
index++;
}
disruptor.shutdown();
}
}

@ -7,8 +7,6 @@ import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -32,10 +30,10 @@ public class CustomerSyncJob {
private CustomerService customerService;
public void syncCustomer() throws ExecutionException, InterruptedException {
long beginTime=System.currentTimeMillis();
long beginTime = System.currentTimeMillis();
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
int BUFFER_SIZE = 1024 * 1024;
int THREAD_NUMBERS = 10;
/*
* createSingleProducerRingBuffer
* EventFactoryRingBuffer
@ -47,7 +45,7 @@ public class CustomerSyncJob {
public DTOMessage newInstance() {
return new DTOMessage();
}
}, BUFFER_SIZE,new YieldingWaitStrategy());
}, BUFFER_SIZE, new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
@ -64,7 +62,7 @@ public class CustomerSyncJob {
executors.submit(transProcessor);
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
Future<?> future=executors.submit(new Callable<Void>() {
Future<?> future = executors.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
List<PrivilageCpUserStoreDO> privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null);
@ -72,7 +70,7 @@ public class CustomerSyncJob {
for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) {
String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId());
seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setCpUserId(cpUserId);
@ -83,7 +81,7 @@ public class CustomerSyncJob {
// ringBuffer.get(seq).setCustomers(customers);
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
}
}
return null;
}
});
@ -92,6 +90,6 @@ public class CustomerSyncJob {
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!
executors.shutdown();//终止线程
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
}
}

@ -18,6 +18,7 @@ import java.util.List;
@Builder
public class DTOMessage {
private int index;
private String id;
private String cpUserId;
private PrivilageCpUserStoreDO privilageCpUserStoreDO;

@ -0,0 +1,44 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.RingBuffer;
public class DTOMessageEventProducer {
private final RingBuffer<DTOMessage> ringBuffer;
private QWMailListManageService qwMailListManageService;
private CustomerService customerService;
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
public DTOMessageEventProducer(RingBuffer<DTOMessage> ringBuffer, QWMailListManageService qwMailListManageService,CustomerService customerService,PoiStoreStaffDOMapper poiStoreStaffDOMapper) {
this.ringBuffer = ringBuffer;
this.qwMailListManageService =qwMailListManageService;
this.customerService = customerService;
this.poiStoreStaffDOMapper = poiStoreStaffDOMapper;
}
public void produceData(int index,PrivilageCpUserStoreDO privilageCpUserStoreDO, String cpUserId) {
long seq = ringBuffer.next(); // 获得下一个Event槽的下标
try {
// 给Event填充数据
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setIndex(index);
ringBuffer.get(seq).setCpUserId(cpUserId);
ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO);
ringBuffer.get(seq).setQwMailListManageService(qwMailListManageService);
ringBuffer.get(seq).setCustomerService(customerService);
ringBuffer.get(seq).setPoiStoreStaffDOMapper(poiStoreStaffDOMapper);
// ringBuffer.get(seq).setCustomers(customers);
} finally {
// 发布Event激活观察者去消费 将sequence传递给该消费者
// 注意,最后的 ringBuffer.publish() 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
ringBuffer.publish(seq);
}
}
}

@ -0,0 +1,10 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.EventFactory;
public class DTOMessageFactory implements EventFactory<DTOMessage> {
@Override
public DTOMessage newInstance() {
return new DTOMessage();
}
}

@ -0,0 +1,106 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.store.entity.PoiStoreStaff;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.synchronous.entity.TurnBackDTO;
import com.kiisoo.ic.synchronous.entity.WxDataDTO;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.UUID;
@Slf4j
public class DTOMessageGroup1Consumer implements WorkHandler<DTOMessage> {
int i = 0;
/**
*
*
* @param dtoMessage
* @throws Exception
*/
@Override
public void onEvent(DTOMessage dtoMessage) throws Exception {
long s = System.currentTimeMillis();
// System.out.println(Thread.currentThread().getName() + "开始时间:" + s + " | " + dtoMessage.getIndex());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
dtoMessage.setId(UUID.randomUUID().toString());
String cpUserId = dtoMessage.getCpUserId();
CustomerService customerService = dtoMessage.getCustomerService();
PoiStoreStaffDOMapper poiStoreStaffDOMapper = dtoMessage.getPoiStoreStaffDOMapper();
QWMailListManageService qwMailListManageService = dtoMessage.getQwMailListManageService();
PrivilageCpUserStoreDO cpUserStoreDO = dtoMessage.getPrivilageCpUserStoreDO();
List<WxCpUserExternalContactInfo> customers = null;
try {
customers = qwMailListManageService.getCustomer(cpUserId);
} catch (Exception e) {
log.error("查询联系人失败:" + cpUserId, e);
}
if (CollectionUtils.isEmpty(customers)) {
return;
}
for (WxCpUserExternalContactInfo customer : customers) {
TurnBackDTO turnBackDTO = new TurnBackDTO();
WxCpUserExternalContactInfo.ExternalContact externalContact = customer.getExternalContact();
List<WxCpUserExternalContactInfo.FollowedUser> followedUsers = customer.getFollowedUsers();
turnBackDTO.setEaCode("");
if (CollectionUtils.isNotEmpty(followedUsers)) {
for (WxCpUserExternalContactInfo.FollowedUser followedUser : followedUsers) {
if (cpUserId.equals(followedUser.getUserId())) {
String state = followedUser.getState();
WxCpUserExternalContactInfo.Tag[] tags = followedUser.getTags();
if (StringUtils.isNotBlank(state)) {
//判断是否有导购码
turnBackDTO.setEaCode(state);
} else if (tags != null && tags.length > 0) {
//判断是否有打tag
//todo 根据tag获取导购码
for (int j = 0; j < tags.length; j++) {
String groupName = tags[j].getGroupName();
if ("导购".equals(groupName)) {
String tagName = tags[j].getTagName();
String staffCode = poiStoreStaffDOMapper.selectStaffCodeByTagNew(cpUserStoreDO.getStoreId(), tagName);
if (StringUtils.isNotBlank(staffCode)) {
turnBackDTO.setEaCode(staffCode);
} else {
//todo 绑定在标签导购上,后续删除
turnBackDTO.setEaCode(tagName);
}
}
}
}
Long joinTimeL = followedUser.getCreateTime();
Long time = new Long(joinTimeL);
String joinTime = sdf.format(time * 1000);
turnBackDTO.setJoinTime(joinTime);
}
}
}
//type 1 微信type 2 企业微信
turnBackDTO.setType(externalContact.getType());
turnBackDTO.setUserId(cpUserId);
turnBackDTO.setName(externalContact.getName());
WxDataDTO wxDataDTO = new WxDataDTO();
wxDataDTO.setAvatarUrl(externalContact.getAvatar());
wxDataDTO.setUserId(externalContact.getExternalUserId());
wxDataDTO.setUnionId(externalContact.getUnionId());
turnBackDTO.setWxData(wxDataDTO);
customerService.turnBack(turnBackDTO);
}
System.out.println(Thread.currentThread().getName() + "消费者消费了消息:" + dtoMessage.getIndex() + "|" + dtoMessage.getCpUserId());
s = s - System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + "结束时间:" + s + " | " + dtoMessage.getIndex());
}
}

@ -8,6 +8,7 @@ import com.kiisoo.ic.synchronous.entity.TurnBackDTO;
import com.kiisoo.ic.synchronous.entity.WxDataDTO;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import org.apache.commons.collections.CollectionUtils;
@ -28,8 +29,6 @@ public class DTOMessageHandler implements EventHandler<DTOMessage> {
/**
*
* @param dtoMessage
* @param l
* @param b
* @throws Exception
*/
@Override
@ -111,4 +110,5 @@ public class DTOMessageHandler implements EventHandler<DTOMessage> {
customerService.turnBack(turnBackDTO);
}
}
}

@ -0,0 +1,23 @@
package com.kiisoo.ic.job;
import com.lmax.disruptor.ExceptionHandler;
public class EventExceptionHandler implements ExceptionHandler {
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
System.out.println("handleEventException" + ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleEventException" + ex);
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException" + ex);
}
}

@ -63,4 +63,6 @@ public interface PoiStoreStaffDOMapper extends BaseMapper<PoiStoreStaff> {
String selectUserNameById(@Param("id") Long id);
String selectStaffCodeByTagNew(@Param("storeId") Long store,@Param("tag") String tag);
}

@ -75,4 +75,9 @@
privilage_user t5 where t4.user_id = t5.id and t4.id = #{id}
</select>
<select id="selectStaffCodeByTagNew" resultType="java.lang.String">
select t2.staff_code from poi_store_staff_tag t, poi_store_staff t2 where t2.store_id = t.store_id and t2.id = t.staff_id and t.store_id = #{storeId} and t.tag = #{tag}
limit 1
</select>
</mapper>

Loading…
Cancel
Save