加入队列
parent
d8a103e909
commit
16e2b9b3e9
@ -0,0 +1,95 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import com.lmax.disruptor.BlockingWaitStrategy;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
/**
|
||||
* 消息Disruptor
|
||||
* Created by hc on 2019/10/22
|
||||
*/
|
||||
@Setter
|
||||
@Getter
|
||||
@Slf4j
|
||||
public abstract class AbstractMsgDisruptor {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
Disruptor<MsgEvent> disruptor;
|
||||
|
||||
/**
|
||||
* 消息生产者
|
||||
*/
|
||||
MsgEventProducer producer;
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init(){
|
||||
|
||||
String name = getName();
|
||||
|
||||
ThreadFactory producerFactory = Executors.defaultThreadFactory();
|
||||
|
||||
// 创建缓冲池
|
||||
MsgEventFactory eventFactory = new MsgEventFactory();
|
||||
|
||||
disruptor = new Disruptor<>(eventFactory, getRingBufferSize(), producerFactory,
|
||||
ProducerType.SINGLE, new BlockingWaitStrategy());
|
||||
|
||||
RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
|
||||
|
||||
disruptor.handleEventsWithWorkerPool(getMsgEventHandle());
|
||||
|
||||
disruptor.start();
|
||||
|
||||
producer = new MsgEventProducer(ringBuffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 结束关闭
|
||||
*/
|
||||
@PreDestroy
|
||||
public void close(){
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息发送
|
||||
* @param dataMsg
|
||||
*/
|
||||
public void send(DisruptorDataI dataMsg){
|
||||
producer.onData(dataMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* 环形队列槽数量
|
||||
* 必须是2的N次方
|
||||
* @return 数量
|
||||
*/
|
||||
public abstract int getRingBufferSize();
|
||||
|
||||
/**
|
||||
* 处理消息对象
|
||||
* 多个Woker同时消费(消费者不重复消费消息)
|
||||
* @return 处理woker
|
||||
*/
|
||||
public abstract AbstractMsgEventHandler[] getMsgEventHandle();
|
||||
|
||||
/**
|
||||
* 主题名称
|
||||
*/
|
||||
public abstract String getName();
|
||||
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import com.lmax.disruptor.WorkHandler;
|
||||
|
||||
/**
|
||||
* 数据槽工厂
|
||||
* Created by hc on 2019/10/22
|
||||
*/
|
||||
public abstract class AbstractMsgEventHandler implements WorkHandler<MsgEvent> {
|
||||
|
||||
@Override
|
||||
public void onEvent(MsgEvent msgEvent) throws Exception {
|
||||
DisruptorDataI dataMsg = msgEvent.getData();
|
||||
handler(dataMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息对象处理
|
||||
* @param dataMsg 消息
|
||||
*/
|
||||
public abstract void handler(DisruptorDataI dataMsg);
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
/**
|
||||
* 消费者回调接口
|
||||
*/
|
||||
public interface DisruptorCallackI {
|
||||
|
||||
void handel(DisruptorDataI dataMsg) throws Exception;
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 消费者
|
||||
*/
|
||||
@Setter
|
||||
@Slf4j
|
||||
public class DisruptorConsume extends AbstractMsgEventHandler {
|
||||
|
||||
/** 回调业务 */
|
||||
private DisruptorCallackI disruptorCallack;
|
||||
|
||||
@Override
|
||||
public void handler(DisruptorDataI dataMsg) {
|
||||
try {
|
||||
disruptorCallack.handel(dataMsg);
|
||||
}catch (Exception e){
|
||||
log.error("DisruptorConsume error",e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,8 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
/**
|
||||
* 数据
|
||||
*/
|
||||
public interface DisruptorDataI {
|
||||
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* 数据槽
|
||||
* Created by hc on 2019/10/22
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
public class MsgEvent {
|
||||
|
||||
/**
|
||||
* 接口收到的一次一批消息
|
||||
*/
|
||||
private DisruptorDataI data;
|
||||
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
/**
|
||||
* 数据槽工厂
|
||||
* Created by hc on 2019/10/22
|
||||
*/
|
||||
public class MsgEventFactory implements EventFactory<MsgEvent> {
|
||||
|
||||
@Override
|
||||
public MsgEvent newInstance() {
|
||||
return new MsgEvent();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
package com.kiisoo.ic.common;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
/**
|
||||
* 消息生产者
|
||||
* Created by hc on 2019/10/22
|
||||
*/
|
||||
public class MsgEventProducer {
|
||||
|
||||
private final RingBuffer<MsgEvent> ringBuffer;
|
||||
|
||||
public MsgEventProducer(RingBuffer<MsgEvent> ringBuffer){
|
||||
this.ringBuffer = ringBuffer;
|
||||
}
|
||||
|
||||
public void onData(DisruptorDataI dataMsg){
|
||||
long sequence = ringBuffer.next();
|
||||
try{
|
||||
MsgEvent event = ringBuffer.get(sequence);
|
||||
event.setData(dataMsg);
|
||||
}finally{
|
||||
ringBuffer.publish(sequence);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package com.kiisoo.ic.synchronous.entity;
|
||||
|
||||
import com.kiisoo.ic.common.DisruptorDataI;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class TurnBackVO implements DisruptorDataI {
|
||||
|
||||
List<TurnBackDTO> turnBackDTOS;
|
||||
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package com.kiisoo.ic.synchronous.service;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.kiisoo.ic.common.DisruptorCallackI;
|
||||
import com.kiisoo.ic.common.DisruptorDataI;
|
||||
import com.kiisoo.ic.customer.CustomerService;
|
||||
import com.kiisoo.ic.synchronous.entity.TurnBackVO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class BackDisposeService implements DisruptorCallackI {
|
||||
|
||||
@Autowired
|
||||
private CustomerService customerService;
|
||||
|
||||
@Override
|
||||
public void handel(DisruptorDataI dataMsg) throws Exception {
|
||||
TurnBackVO channelStateOrderDTO = (TurnBackVO)dataMsg;
|
||||
channelStateOrderDTO.getTurnBackDTOS().forEach(customerService::turnBack);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package com.kiisoo.ic.synchronous.service;
|
||||
|
||||
import com.kiisoo.ic.common.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Service
|
||||
public class BackReceiveService extends AbstractMsgDisruptor{
|
||||
|
||||
@Autowired
|
||||
private BackDisposeService backDisposeService;
|
||||
|
||||
@Override
|
||||
public int getRingBufferSize() {
|
||||
return 1024;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractMsgEventHandler[] getMsgEventHandle() {
|
||||
AbstractMsgEventHandler[] list = new AbstractMsgEventHandler[10];
|
||||
for(int i = 0; i < 10; i++){
|
||||
DisruptorConsume disruptorConsume = new DisruptorConsume();
|
||||
disruptorConsume.setDisruptorCallack(backDisposeService);
|
||||
list[i] = disruptorConsume;
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue