diff --git a/src/main/java/com/kiisoo/ic/common/AbstractMsgDisruptor.java b/src/main/java/com/kiisoo/ic/common/AbstractMsgDisruptor.java new file mode 100644 index 0000000..67b55cd --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/AbstractMsgDisruptor.java @@ -0,0 +1,95 @@ +package com.kiisoo.ic.common; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * 消息Disruptor + * Created by hc on 2019/10/22 + */ +@Setter +@Getter +@Slf4j +public abstract class AbstractMsgDisruptor { + + /** + * + */ + Disruptor disruptor; + + /** + * 消息生产者 + */ + MsgEventProducer producer; + + /** + * 初始化 + */ + @PostConstruct + public void init(){ + + String name = getName(); + + ThreadFactory producerFactory = Executors.defaultThreadFactory(); + + // 创建缓冲池 + MsgEventFactory eventFactory = new MsgEventFactory(); + + disruptor = new Disruptor<>(eventFactory, getRingBufferSize(), producerFactory, + ProducerType.SINGLE, new BlockingWaitStrategy()); + + RingBuffer ringBuffer = disruptor.getRingBuffer(); + + disruptor.handleEventsWithWorkerPool(getMsgEventHandle()); + + disruptor.start(); + + producer = new MsgEventProducer(ringBuffer); + } + + /** + * 结束关闭 + */ + @PreDestroy + public void close(){ + disruptor.shutdown(); + } + + /** + * 消息发送 + * @param dataMsg + */ + public void send(DisruptorDataI dataMsg){ + producer.onData(dataMsg); + } + + /** + * 环形队列槽数量 + * 必须是2的N次方 + * @return 数量 + */ + public abstract int getRingBufferSize(); + + /** + * 处理消息对象 + * 多个Woker同时消费(消费者不重复消费消息) + * @return 处理woker + */ + public abstract AbstractMsgEventHandler[] getMsgEventHandle(); + + /** + * 主题名称 + */ + public abstract String getName(); + +} diff --git a/src/main/java/com/kiisoo/ic/common/AbstractMsgEventHandler.java b/src/main/java/com/kiisoo/ic/common/AbstractMsgEventHandler.java new file mode 100644 index 0000000..cf3261e --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/AbstractMsgEventHandler.java @@ -0,0 +1,22 @@ +package com.kiisoo.ic.common; + +import com.lmax.disruptor.WorkHandler; + +/** + * 数据槽工厂 + * Created by hc on 2019/10/22 + */ +public abstract class AbstractMsgEventHandler implements WorkHandler { + + @Override + public void onEvent(MsgEvent msgEvent) throws Exception { + DisruptorDataI dataMsg = msgEvent.getData(); + handler(dataMsg); + } + + /** + * 消息对象处理 + * @param dataMsg 消息 + */ + public abstract void handler(DisruptorDataI dataMsg); +} \ No newline at end of file diff --git a/src/main/java/com/kiisoo/ic/common/DisruptorCallackI.java b/src/main/java/com/kiisoo/ic/common/DisruptorCallackI.java new file mode 100644 index 0000000..520fcea --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/DisruptorCallackI.java @@ -0,0 +1,9 @@ +package com.kiisoo.ic.common; + +/** + * 消费者回调接口 + */ +public interface DisruptorCallackI { + + void handel(DisruptorDataI dataMsg) throws Exception; +} diff --git a/src/main/java/com/kiisoo/ic/common/DisruptorConsume.java b/src/main/java/com/kiisoo/ic/common/DisruptorConsume.java new file mode 100644 index 0000000..44b906e --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/DisruptorConsume.java @@ -0,0 +1,26 @@ +package com.kiisoo.ic.common; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * 消费者 + */ +@Setter +@Slf4j +public class DisruptorConsume extends AbstractMsgEventHandler { + + /** 回调业务 */ + private DisruptorCallackI disruptorCallack; + + @Override + public void handler(DisruptorDataI dataMsg) { + try { + disruptorCallack.handel(dataMsg); + }catch (Exception e){ + log.error("DisruptorConsume error",e); + } + + } + +} diff --git a/src/main/java/com/kiisoo/ic/common/DisruptorDataI.java b/src/main/java/com/kiisoo/ic/common/DisruptorDataI.java new file mode 100644 index 0000000..372b426 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/DisruptorDataI.java @@ -0,0 +1,8 @@ +package com.kiisoo.ic.common; + +/** + * 数据 + */ +public interface DisruptorDataI { + +} diff --git a/src/main/java/com/kiisoo/ic/common/MsgEvent.java b/src/main/java/com/kiisoo/ic/common/MsgEvent.java new file mode 100644 index 0000000..6694815 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/MsgEvent.java @@ -0,0 +1,19 @@ +package com.kiisoo.ic.common; + +import lombok.Getter; +import lombok.Setter; + +/** + * 数据槽 + * Created by hc on 2019/10/22 + */ +@Getter +@Setter +public class MsgEvent { + + /** + * 接口收到的一次一批消息 + */ + private DisruptorDataI data; + +} diff --git a/src/main/java/com/kiisoo/ic/common/MsgEventFactory.java b/src/main/java/com/kiisoo/ic/common/MsgEventFactory.java new file mode 100644 index 0000000..a01af00 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/MsgEventFactory.java @@ -0,0 +1,15 @@ +package com.kiisoo.ic.common; + +import com.lmax.disruptor.EventFactory; + +/** + * 数据槽工厂 + * Created by hc on 2019/10/22 + */ +public class MsgEventFactory implements EventFactory { + + @Override + public MsgEvent newInstance() { + return new MsgEvent(); + } +} diff --git a/src/main/java/com/kiisoo/ic/common/MsgEventProducer.java b/src/main/java/com/kiisoo/ic/common/MsgEventProducer.java new file mode 100644 index 0000000..e5a85bb --- /dev/null +++ b/src/main/java/com/kiisoo/ic/common/MsgEventProducer.java @@ -0,0 +1,26 @@ +package com.kiisoo.ic.common; + +import com.lmax.disruptor.RingBuffer; + +/** + * 消息生产者 + * Created by hc on 2019/10/22 + */ +public class MsgEventProducer { + + private final RingBuffer ringBuffer; + + public MsgEventProducer(RingBuffer ringBuffer){ + this.ringBuffer = ringBuffer; + } + + public void onData(DisruptorDataI dataMsg){ + long sequence = ringBuffer.next(); + try{ + MsgEvent event = ringBuffer.get(sequence); + event.setData(dataMsg); + }finally{ + ringBuffer.publish(sequence); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/kiisoo/ic/synchronous/controller/SynchronousController.java b/src/main/java/com/kiisoo/ic/synchronous/controller/SynchronousController.java index df49276..8f46508 100644 --- a/src/main/java/com/kiisoo/ic/synchronous/controller/SynchronousController.java +++ b/src/main/java/com/kiisoo/ic/synchronous/controller/SynchronousController.java @@ -5,6 +5,8 @@ import com.google.gson.JsonObject; import com.kiisoo.ic.customer.CustomerService; import com.kiisoo.ic.store.entity.PoiStore; import com.kiisoo.ic.synchronous.entity.TurnBackDTO; +import com.kiisoo.ic.synchronous.entity.TurnBackVO; +import com.kiisoo.ic.synchronous.service.BackReceiveService; import com.kiisoo.ic.utils.DataImportUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -32,6 +34,8 @@ public class SynchronousController { this.customerService = customerService; } + @Autowired + private BackReceiveService backReceiveService; /** * 导入店铺接口 * @return @@ -75,16 +79,16 @@ public class SynchronousController { return DataImportUtil.succ(1); } -// /** -// * 好友添加回调通知 -// * @return 是否成功 -// */ -// @RequestMapping(value = "/ics/customer",method = RequestMethod.POST,consumes = "application/json") -// @ResponseBody -// public String turnQueueBack(@RequestBody List turnBackDTOS){ -// String str = JSON.toJSONString(turnBackDTOS); -// System.out.println(str); -// turnBackDTOS.forEach(customerService::turnBack); -// return DataImportUtil.succ(1); -// } + /** + * 好友添加回调通知 + * @return 是否成功 + */ + @RequestMapping(value = "/ics/queue/customer",method = RequestMethod.POST,consumes = "application/json") + @ResponseBody + public String turnQueueBack(@RequestBody List turnBackDTOS){ + TurnBackVO turnBackVO = new TurnBackVO(); + turnBackVO.setTurnBackDTOS(turnBackDTOS); + backReceiveService.send(turnBackVO); + return DataImportUtil.succ(1); + } } diff --git a/src/main/java/com/kiisoo/ic/synchronous/entity/TurnBackVO.java b/src/main/java/com/kiisoo/ic/synchronous/entity/TurnBackVO.java new file mode 100644 index 0000000..b3659b4 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/synchronous/entity/TurnBackVO.java @@ -0,0 +1,13 @@ +package com.kiisoo.ic.synchronous.entity; + +import com.kiisoo.ic.common.DisruptorDataI; +import lombok.Data; + +import java.util.List; + +@Data +public class TurnBackVO implements DisruptorDataI { + + List turnBackDTOS; + +} diff --git a/src/main/java/com/kiisoo/ic/synchronous/service/BackDisposeService.java b/src/main/java/com/kiisoo/ic/synchronous/service/BackDisposeService.java new file mode 100644 index 0000000..c31d9f0 --- /dev/null +++ b/src/main/java/com/kiisoo/ic/synchronous/service/BackDisposeService.java @@ -0,0 +1,22 @@ +package com.kiisoo.ic.synchronous.service; + +import com.alibaba.fastjson.JSON; +import com.kiisoo.ic.common.DisruptorCallackI; +import com.kiisoo.ic.common.DisruptorDataI; +import com.kiisoo.ic.customer.CustomerService; +import com.kiisoo.ic.synchronous.entity.TurnBackVO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class BackDisposeService implements DisruptorCallackI { + + @Autowired + private CustomerService customerService; + + @Override + public void handel(DisruptorDataI dataMsg) throws Exception { + TurnBackVO channelStateOrderDTO = (TurnBackVO)dataMsg; + channelStateOrderDTO.getTurnBackDTOS().forEach(customerService::turnBack); + } +} diff --git a/src/main/java/com/kiisoo/ic/synchronous/service/BackReceiveService.java b/src/main/java/com/kiisoo/ic/synchronous/service/BackReceiveService.java new file mode 100644 index 0000000..0215fcf --- /dev/null +++ b/src/main/java/com/kiisoo/ic/synchronous/service/BackReceiveService.java @@ -0,0 +1,33 @@ +package com.kiisoo.ic.synchronous.service; + +import com.kiisoo.ic.common.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class BackReceiveService extends AbstractMsgDisruptor{ + + @Autowired + private BackDisposeService backDisposeService; + + @Override + public int getRingBufferSize() { + return 1024; + } + + @Override + public AbstractMsgEventHandler[] getMsgEventHandle() { + AbstractMsgEventHandler[] list = new AbstractMsgEventHandler[10]; + for(int i = 0; i < 10; i++){ + DisruptorConsume disruptorConsume = new DisruptorConsume(); + disruptorConsume.setDisruptorCallack(backDisposeService); + list[i] = disruptorConsume; + } + return list; + } + + @Override + public String getName() { + return null; + } +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index f7fd93a..0368238 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -8,7 +8,7 @@ mybatis: spring: datasource: - url: jdbc:mysql://192.168.0.215:3306/ic?characterEncoding=utf8&allowMultiQueries=true&autoReconnect=true + url: jdbc:mysql://192.168.0.215:3306/bsd?characterEncoding=utf8&allowMultiQueries=true&autoReconnect=true username: p2p password: p2p driver-class-name: com.mysql.jdbc.Driver