大屏接口

dev_0531
yechenhao 6 years ago
parent 6be2e97075
commit ea75369185

@ -0,0 +1,94 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PrivilageCpUserStoreDOMapper;
import com.kiisoo.ic.wx.service.QWMailListManageService;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.*;
@Service
@Slf4j
public class CustomerSyncJob {
@Autowired
private PrivilageCpUserStoreDOMapper privilageCpUserStoreDOMapper;
@Autowired
private QWMailListManageService qwMailListManageService;
public void syncCustomer() throws ExecutionException, InterruptedException {
long beginTime=System.currentTimeMillis();
int BUFFER_SIZE=1024;
int THREAD_NUMBERS=4;
/*
* createSingleProducerRingBuffer
* EventFactoryRingBuffer
* RingBuffer2 &
* RingBuffer( )
*/
final RingBuffer<DTOMessage> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<DTOMessage>() {
@Override
public DTOMessage newInstance() {
return new DTOMessage();
}
}, BUFFER_SIZE,new YieldingWaitStrategy());
//创建线程池
ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
//创建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//创建消息处理器
BatchEventProcessor<DTOMessage> transProcessor = new BatchEventProcessor<DTOMessage>(
ringBuffer, sequenceBarrier, new DTOMessageHandler());
//这一部的目的是让RingBuffer根据消费者的状态 如果只有一个消费者的情况可以省略
ringBuffer.addGatingSequences(transProcessor.getSequence());
//把消息处理器提交到线程池
executors.submit(transProcessor);
//如果存大多个消费者 那重复执行上面3行代码 把TradeTransactionInDBHandler换成其它消费者类
Future<?> future=executors.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
List<PrivilageCpUserStoreDO> privilageCpUserStoreDOS = privilageCpUserStoreDOMapper.selectList(null);
long seq;
for (PrivilageCpUserStoreDO privilageCpUserStoreDO : privilageCpUserStoreDOS) {
String cpUserId = privilageCpUserStoreDOMapper.selectCpUserIdByStoreId(privilageCpUserStoreDO.getStoreId());
List<WxCpUserExternalContactInfo> customers = null;
try {
customers = qwMailListManageService.getCustomer(cpUserId);
}catch (Exception e) {
log.error("查询联系人失败:"+cpUserId,e);
}
if (CollectionUtils.isNotEmpty(customers)){
continue;
}
seq=ringBuffer.next();//占个坑 --ringBuffer一个可用区块
//给这个区块放入 数据 如果此处不理解想想RingBuffer的结构图
ringBuffer.get(seq).setCpUserId(cpUserId);
ringBuffer.get(seq).setPrivilageCpUserStoreDO(privilageCpUserStoreDO);
ringBuffer.get(seq).setCustomers(customers);
ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
}
return null;
}
});
future.get();//等待生产者结束
Thread.sleep(1000);//等上1秒等消费都处理完成
transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!
executors.shutdown();//终止线程
System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
}
}

@ -1,9 +1,13 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import java.util.List;
@Data
@AllArgsConstructor
@ -12,7 +16,8 @@ import lombok.NoArgsConstructor;
public class DTOMessage {
private String id;
private String name;
private double price;
private String cpUserId;
private PrivilageCpUserStoreDO privilageCpUserStoreDO;
private List<WxCpUserExternalContactInfo> customers;
}

@ -1,14 +1,34 @@
package com.kiisoo.ic.job;
import com.kiisoo.ic.customer.CustomerService;
import com.kiisoo.ic.store.entity.PoiStoreStaff;
import com.kiisoo.ic.store.entity.PrivilageCpUserStoreDO;
import com.kiisoo.ic.store.mapper.PoiStoreStaffDOMapper;
import com.kiisoo.ic.synchronous.entity.TurnBackDTO;
import com.kiisoo.ic.synchronous.entity.WxDataDTO;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.cp.bean.WxCpUserExternalContactInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.UUID;
@Slf4j
@Service
public class DTOMessageHandler implements EventHandler<DTOMessage> {
@Autowired
private PoiStoreStaffDOMapper poiStoreStaffDOMapper;
@Autowired
private CustomerService customerService;
/**
*
* @param dtoMessage
@ -18,8 +38,64 @@ public class DTOMessageHandler implements EventHandler<DTOMessage> {
*/
@Override
public void onEvent(DTOMessage dtoMessage, long l, boolean b) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
dtoMessage.setId(UUID.randomUUID().toString());
log.info( dtoMessage.toString());
// 入库操作
String cpUserId = dtoMessage.getCpUserId();
PrivilageCpUserStoreDO cpUserStoreDO = dtoMessage.getPrivilageCpUserStoreDO();
List<WxCpUserExternalContactInfo> customers = dtoMessage.getCustomers();
for (WxCpUserExternalContactInfo customer:customers){
TurnBackDTO turnBackDTO = new TurnBackDTO();
WxCpUserExternalContactInfo.ExternalContact externalContact = customer.getExternalContact();
List<WxCpUserExternalContactInfo.FollowedUser> followedUsers = customer.getFollowedUsers();
turnBackDTO.setEaCode("");
if (CollectionUtils.isNotEmpty(followedUsers)){
for (WxCpUserExternalContactInfo.FollowedUser followedUser:followedUsers){
if (cpUserId.equals(followedUser.getUserId())){
String state = followedUser.getState();
WxCpUserExternalContactInfo.Tag[] tags = followedUser.getTags();
if (StringUtils.isNotBlank(state)){
//判断是否有导购码
turnBackDTO.setEaCode(state);
}else if(tags != null && tags.length > 0){
//判断是否有打tag
//todo 根据tag获取导购码
for (int j = 0;j<tags.length;j++){
String groupName = tags[j].getGroupName();
if ("导购".equals(groupName)){
String tagName = tags[j].getTagName();
Long staffId = poiStoreStaffDOMapper.selectStaffIdByTag(cpUserStoreDO.getStoreId(), tagName);
if (staffId != null){
PoiStoreStaff poiStoreStaff = poiStoreStaffDOMapper.selectById(staffId);
if (poiStoreStaff!=null){
turnBackDTO.setEaCode(poiStoreStaff.getStaffCode());
}
}else{
//todo 绑定在标签导购上,后续删除
turnBackDTO.setEaCode(tagName);
}
}
}
}
Long joinTimeL = followedUser.getCreateTime();
Long time=new Long(joinTimeL);
String joinTime = sdf.format(time*1000);
turnBackDTO.setJoinTime(joinTime);
}
}
}
//type 1 微信type 2 企业微信
turnBackDTO.setType(externalContact.getType());
turnBackDTO.setUserId(cpUserId);
turnBackDTO.setName(externalContact.getName());
WxDataDTO wxDataDTO = new WxDataDTO();
wxDataDTO.setAvatarUrl(externalContact.getAvatar());
wxDataDTO.setUserId(externalContact.getExternalUserId());
wxDataDTO.setUnionId(externalContact.getUnionId());
turnBackDTO.setWxData(wxDataDTO);
customerService.turnBack(turnBackDTO);
}
}
}

@ -0,0 +1,25 @@
package com.kiisoo.ic.job;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@Controller
@RequestMapping("/job")
@Slf4j
public class SyncJobController {
@Autowired
private CustomerSyncJob customerSyncJob;
@GetMapping("/sync/customer")
public void getCustomerInfo(){
try {
customerSyncJob.syncCustomer();
}catch (Exception e){
log.error("",e);
}
}
}
Loading…
Cancel
Save