You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
bsdgy-server/src/main/java/com/kiisoo/ic/common/AbstractMsgDisruptor.java

96 lines
2.0 KiB
Java

package com.kiisoo.ic.common;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
 * Base class for a message pipeline backed by an LMAX {@link Disruptor}.
 *
 * <p>{@link #init()} builds and starts the disruptor and creates the
 * {@link MsgEventProducer}; {@link #send(DisruptorDataI)} hands a message to
 * that producer; {@link #close()} shuts the disruptor down. Subclasses supply
 * the ring buffer size, the worker handlers and a topic name.
 *
 * Created by hc on 2019/10/22
 */
@Setter
@Getter
@Slf4j
public abstract class AbstractMsgDisruptor {

    /**
     * The underlying disruptor; created and started by {@link #init()}.
     */
    Disruptor<MsgEvent> disruptor;

    /**
     * Message producer bound to the disruptor's ring buffer; created by {@link #init()}.
     */
    MsgEventProducer producer;

    /**
     * Initialization: builds the disruptor, registers the worker pool, starts it
     * and creates the producer.
     *
     * <p>Worker threads come from {@link Executors#defaultThreadFactory()} and, when
     * {@link #getName()} returns a non-null value, get that value prepended to their
     * default name so they can be told apart in thread dumps.
     */
    @PostConstruct
    public void init() {
        final String name = getName();
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        // Wrap the default factory so every other thread property stays as before;
        // only the thread name gains the topic prefix.
        ThreadFactory workerFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                Thread worker = defaultFactory.newThread(task);
                if (worker != null && name != null) {
                    worker.setName(name + "-" + worker.getName());
                }
                return worker;
            }
        };

        // Create the ring buffer (pre-populated through the event factory).
        // NOTE(review): ProducerType.SINGLE presumes send() is only ever invoked from one
        // thread - confirm against callers; otherwise ProducerType.MULTI is needed.
        MsgEventFactory eventFactory = new MsgEventFactory();
        disruptor = new Disruptor<>(eventFactory, getRingBufferSize(), workerFactory,
                ProducerType.SINGLE, new BlockingWaitStrategy());
        RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWithWorkerPool(getMsgEventHandle());
        disruptor.start();
        producer = new MsgEventProducer(ringBuffer);
    }

    /**
     * Shutdown: stops the disruptor if it was created.
     *
     * <p>Does nothing when {@link #init()} never assigned the disruptor, so a failed
     * start-up does not cause a second failure during destruction.
     */
    @PreDestroy
    public void close() {
        if (disruptor != null) {
            disruptor.shutdown();
        }
    }

    /**
     * Sends a message by passing it to the producer.
     *
     * @param dataMsg the message payload to publish
     * @throws IllegalStateException if no producer is available yet, i.e.
     *         {@link #init()} has not completed
     */
    public void send(DisruptorDataI dataMsg) {
        if (producer == null) {
            throw new IllegalStateException(
                    "Disruptor producer is not initialised; init() must complete before send()");
        }
        producer.onData(dataMsg);
    }

    /**
     * Number of slots in the ring buffer.
     * Must be a power of two.
     *
     * @return the slot count
     */
    public abstract int getRingBufferSize();

    /**
     * Handlers that process the messages.
     * Multiple workers consume concurrently (a message is not consumed by more than one worker).
     *
     * @return the worker handlers
     */
    public abstract AbstractMsgEventHandler[] getMsgEventHandle();

    /**
     * Topic name.
     *
     * @return the name of this topic
     */
    public abstract String getName();
}