You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
2.0 KiB
Java
96 lines
2.0 KiB
Java
package com.kiisoo.ic.common;
|
|
|
|
import com.lmax.disruptor.BlockingWaitStrategy;
|
|
import com.lmax.disruptor.RingBuffer;
|
|
import com.lmax.disruptor.dsl.Disruptor;
|
|
import com.lmax.disruptor.dsl.ProducerType;
|
|
import lombok.Getter;
|
|
import lombok.Setter;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import javax.annotation.PostConstruct;
|
|
import javax.annotation.PreDestroy;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
|
/**
|
|
* 消息Disruptor
|
|
* Created by hc on 2019/10/22
|
|
*/
|
|
@Setter
|
|
@Getter
|
|
@Slf4j
|
|
public abstract class AbstractMsgDisruptor {
|
|
|
|
/**
|
|
*
|
|
*/
|
|
Disruptor<MsgEvent> disruptor;
|
|
|
|
/**
|
|
* 消息生产者
|
|
*/
|
|
MsgEventProducer producer;
|
|
|
|
/**
|
|
* 初始化
|
|
*/
|
|
@PostConstruct
|
|
public void init(){
|
|
|
|
String name = getName();
|
|
|
|
ThreadFactory producerFactory = Executors.defaultThreadFactory();
|
|
|
|
// 创建缓冲池
|
|
MsgEventFactory eventFactory = new MsgEventFactory();
|
|
|
|
disruptor = new Disruptor<>(eventFactory, getRingBufferSize(), producerFactory,
|
|
ProducerType.SINGLE, new BlockingWaitStrategy());
|
|
|
|
RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
|
|
|
|
disruptor.handleEventsWithWorkerPool(getMsgEventHandle());
|
|
|
|
disruptor.start();
|
|
|
|
producer = new MsgEventProducer(ringBuffer);
|
|
}
|
|
|
|
/**
|
|
* 结束关闭
|
|
*/
|
|
@PreDestroy
|
|
public void close(){
|
|
disruptor.shutdown();
|
|
}
|
|
|
|
/**
|
|
* 消息发送
|
|
* @param dataMsg
|
|
*/
|
|
public void send(DisruptorDataI dataMsg){
|
|
producer.onData(dataMsg);
|
|
}
|
|
|
|
/**
|
|
* 环形队列槽数量
|
|
* 必须是2的N次方
|
|
* @return 数量
|
|
*/
|
|
public abstract int getRingBufferSize();
|
|
|
|
/**
|
|
* 处理消息对象
|
|
* 多个Woker同时消费(消费者不重复消费消息)
|
|
* @return 处理woker
|
|
*/
|
|
public abstract AbstractMsgEventHandler[] getMsgEventHandle();
|
|
|
|
/**
|
|
* 主题名称
|
|
*/
|
|
public abstract String getName();
|
|
|
|
}
|