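Implementing the publish-subscribe pattern in Java
Simulating a message queue (publish-subscribe pattern) in Java
Approach
A typical message queue has four elements: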
topic
consumer
producer
broker
A topic is the subject that links producers and consumers.
A consumer consumes messages.
A producer produces messages.
The broker is the coordinator that keeps the whole queue running.
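How the four roles fit together can be sketched as a single-threaded toy. This is only an illustration under simplifying assumptions: `MiniBroker` and `publish` are hypothetical names, the JDK's `java.util.function.Consumer` stands in for a subscriber, and there are no threads or message cache.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch: the broker maps each topic name to its subscribers.
class MiniBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // A consumer registers its interest in a topic.
    void subscribe(String topic, Consumer<String> consumer) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(consumer);
    }

    // A producer hands a message to the broker, which forwards it to every
    // subscriber of the topic. Returns the number of consumers that were notified.
    int publish(String topic, String message) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> consumer : targets) {
            consumer.accept(message);
        }
        return targets.size();
    }
}
```

The producer never references a consumer directly; both sides only know the topic name, which is the decoupling the pattern is after.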
Implementation
topic
A topic needs to store a collection of consumers and a message cache.
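import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Topic {
    private String name;                 // topic name, used as the lookup key in the broker
    private List<Consumer> consumerList; // consumers subscribed to this topic
    private List<Object> messageList;    // messages that are currently being delivered
}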
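Because the broker touches these lists from several threads, the choice of list implementation matters. The sketch below is not part of the original code; it only illustrates one option, `CopyOnWriteArrayList`, by adding an element while iterating, which is what a subscribe call during message dispatch amounts to. `ListIterationDemo` and `addWhileIterating` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListIterationDemo {
    // Adds an element while iterating over the list.
    // Returns true if the iteration completed without an exception.
    static boolean addWhileIterating(List<String> list) {
        list.add("c1");
        list.add("c2");
        try {
            for (String s : list) {
                if (s.equals("c1")) {
                    list.add("late-subscriber"); // simulates a subscribe during dispatch
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(addWhileIterating(new ArrayList<>()));            // false
        System.out.println(addWhileIterating(new CopyOnWriteArrayList<>())); // true
    }
}
```

`CopyOnWriteArrayList` iterates over a snapshot, so a subscriber added mid-dispatch is simply picked up by the next message. The trade-off is that every write copies the backing array, which is acceptable when subscriptions change rarely.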
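The Consumer interface
Every consumer implementation must implement this interface.
public interface Consumer {
    // Called by the broker once for every message published to a subscribed topic
    void receiveMessage(Object message);
}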
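Since the interface declares a single method, a consumer can also be written as a lambda instead of a named class. A minimal sketch, assuming a hypothetical helper `recordingConsumer` that stores each message in a list; the interface is repeated only so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Copy of the article's interface so the sketch compiles on its own.
interface Consumer {
    void receiveMessage(Object message);
}

class LambdaConsumerDemo {
    // Hypothetical helper: builds a consumer that records every message it receives.
    static Consumer recordingConsumer(List<Object> sink) {
        return message -> sink.add(message);
    }

    public static void main(String[] args) {
        List<Object> sink = new ArrayList<>();
        Consumer consumer = recordingConsumer(sink);
        consumer.receiveMessage("hello");
        consumer.receiveMessage(42);
        System.out.println(sink); // [hello, 42]
    }
}
```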
broker
The broker creates topics, lets consumers subscribe to a topic, publishes messages on behalf of producers, and dispatches those messages to the consumers.
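import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

// Note: a Spring-managed bean of this class and getInstance() are separate objects
// with separate topic maps; use one access path consistently.
@Slf4j
@Component
public class PubSubManager {
    private final Map<String, Topic> topicMap;
    private final ExecutorService producerThreadPool;
    private final ExecutorService consumerThreadPool;

    private PubSubManager() {
        topicMap = new ConcurrentHashMap<>();
        producerThreadPool = Executors.newFixedThreadPool(5);
        consumerThreadPool = Executors.newFixedThreadPool(5);
    }

    // Initialization-on-demand holder: the instance is created lazily and thread-safely
    private static class PubSubManagerHolder {
        private static final PubSubManager INSTANCE = new PubSubManager();
    }

    public static PubSubManager getInstance() {
        return PubSubManagerHolder.INSTANCE;
    }

    /**
     * Creates a topic. An existing topic with the same name is kept, so its subscribers are not lost.
     */
    public void createTopic(String topicName) {
        // Thread-safe lists: subscribers may change while a message is being dispatched
        Topic topic = Topic.builder()
                .name(topicName)
                .messageList(new CopyOnWriteArrayList<>())
                .consumerList(new CopyOnWriteArrayList<>())
                .build();
        topicMap.putIfAbsent(topicName, topic);
    }

    /**
     * Subscribes a consumer to a topic.
     */
    public void subscribe(String topicName, Consumer consumer) {
        getTopicOrThrow(topicName).getConsumerList().add(consumer);
    }

    /**
     * Cancels a subscription.
     *
     * @author yehao
     */
    public void unsubscribe(String topicName, Consumer consumer) {
        getTopicOrThrow(topicName).getConsumerList().remove(consumer);
    }

    /**
     * Publishes a message. Delivery runs asynchronously on the producer pool.
     */
    public void produce(String topicName, Object message) {
        Topic topic = getTopicOrThrow(topicName);
        producerThreadPool.submit(() -> consume(topic, message));
    }

    /**
     * Looks up a topic and fails fast if it has not been created.
     */
    private Topic getTopicOrThrow(String topicName) {
        Topic topic = topicMap.get(topicName);
        if (topic == null) {
            throw new IllegalArgumentException("topic does not exist: " + topicName);
        }
        return topic;
    }

    /**
     * Delivers a message to every consumer of the topic and waits until all of them have finished.
     */
    private void consume(Topic topic, Object message) {
        // Cache the message while it is in flight
        topic.getMessageList().add(message);
        List<Future<?>> futureList = new ArrayList<>();
        for (Consumer consumer : topic.getConsumerList()) {
            futureList.add(consumerThreadPool.submit(() -> consumer.receiveMessage(message)));
        }
        for (Future<?> future : futureList) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                log.error("interrupted while waiting for consumers", e);
                break;
            } catch (ExecutionException e) {
                // A failing consumer must not prevent the others from being awaited
                log.error(e.getMessage(), e);
            }
        }
        topic.getMessageList().remove(message);
    }
}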
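The broker never shuts its thread pools down. Pools created with `Executors.newFixedThreadPool` use non-daemon threads, so once they have run a task they keep the JVM alive until they are shut down. A possible shutdown helper is sketched below; `PoolShutdown` and `shutdownGracefully` are hypothetical names, and wiring the helper into `PubSubManager` (for example from a shutdown method that passes in both pools) is left as an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolShutdown {
    // Stops accepting new tasks, waits up to timeoutMillis for queued and running
    // tasks, then cancels whatever is left. Returns the pool's terminated state.
    static boolean shutdownGracefully(ExecutorService pool, long timeoutMillis) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return pool.isTerminated();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("delivering last message"));
        System.out.println(shutdownGracefully(pool, 1000));
    }
}
```

Shutting down the producer pool before the consumer pool would let in-flight deliveries finish, since producer tasks wait on consumer tasks.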
Testing
Implement a test consumer:
public class TestConsumer implements Consumer {
    @Override
    public void receiveMessage(Object message) {
        // hashCode() is used only to tell the consumer instances apart in the output
        System.out.printf("TestConsumer[%s] receive message: %s%n", hashCode(), message);
    }
}
The main thread acts as the producer and publishes one message per second:
public class Test {
    public static void main(String[] args) {
        PubSubManager pubSubManager = PubSubManager.getInstance();
        pubSubManager.createTopic("test");
        // Three subscribers on the same topic; each one receives every message
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());
        pubSubManager.subscribe("test", new TestConsumer());
        while (true) {
            pubSubManager.produce("test", "hello");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
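The loop above runs forever and is checked by eye. For an automated check, a bounded variant can count deliveries with a `CountDownLatch`. The sketch below is an assumption-laden stand-in rather than a test of `PubSubManager` itself: `LatchConsumer`, `BoundedDeliveryDemo` and `deliverAll` are hypothetical names, and a plain thread pool plays the broker's role so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Copy of the article's interface so the sketch compiles on its own.
interface Consumer {
    void receiveMessage(Object message);
}

// Hypothetical test consumer: counts down a shared latch once per received message.
class LatchConsumer implements Consumer {
    private final CountDownLatch latch;

    LatchConsumer(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void receiveMessage(Object message) {
        latch.countDown();
    }
}

class BoundedDeliveryDemo {
    // Sends `messages` messages to `consumers` consumers through a thread pool and
    // returns true if every delivery completed within the timeout.
    static boolean deliverAll(int consumers, int messages, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(consumers * messages);
        List<Consumer> consumerList = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            consumerList.add(new LatchConsumer(latch));
        }
        ExecutorService pool = Executors.newFixedThreadPool(5); // stand-in for the broker's pool
        try {
            for (int m = 0; m < messages; m++) {
                String message = "hello-" + m;
                for (Consumer consumer : consumerList) {
                    pool.execute(() -> consumer.receiveMessage(message));
                }
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            pool.shutdown(); // lets the JVM exit once queued tasks have finished
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAll(3, 4, 2000));
    }
}
```

The same latch-based consumer could be subscribed to a real topic, with the test awaiting the latch after a fixed number of produce calls.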
Console output:
TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello
TestConsumer[1370913204] receive message: hello
TestConsumer[727802134] receive message: hello
TestConsumer[350340640] receive message: hello