package com.kiisoo.ic.common; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** * 消息Disruptor * Created by hc on 2019/10/22 */ @Setter @Getter @Slf4j public abstract class AbstractMsgDisruptor { /** * */ Disruptor disruptor; /** * 消息生产者 */ MsgEventProducer producer; /** * 初始化 */ @PostConstruct public void init(){ String name = getName(); ThreadFactory producerFactory = Executors.defaultThreadFactory(); // 创建缓冲池 MsgEventFactory eventFactory = new MsgEventFactory(); disruptor = new Disruptor<>(eventFactory, getRingBufferSize(), producerFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); RingBuffer ringBuffer = disruptor.getRingBuffer(); disruptor.handleEventsWithWorkerPool(getMsgEventHandle()); disruptor.start(); producer = new MsgEventProducer(ringBuffer); } /** * 结束关闭 */ @PreDestroy public void close(){ disruptor.shutdown(); } /** * 消息发送 * @param dataMsg */ public void send(DisruptorDataI dataMsg){ producer.onData(dataMsg); } /** * 环形队列槽数量 * 必须是2的N次方 * @return 数量 */ public abstract int getRingBufferSize(); /** * 处理消息对象 * 多个Woker同时消费(消费者不重复消费消息) * @return 处理woker */ public abstract AbstractMsgEventHandler[] getMsgEventHandle(); /** * 主题名称 */ public abstract String getName(); }