java自带的延迟队列:DelayQueue
平常我们使用延迟队列时, 都会考虑rabbit MQ或者rocket MQ,那小项目如果我们也想用延迟队列的时候怎么办呢?比如我们一个小项目里也需要针对预约或者订单的过期进行通知,这时候如果有一个延迟队列, 业务实现起来就方便多了。
java还真的自带了一个延迟队列:DelayQueue
DelayQueue
先通过一张图来看看DelayQueue的类结构:
流量控制
DelayQueue实现了BlockingQueue接口并且继承了AbstractQueue抽象类。
BlockingQueue接口是java1.5引入的,它的功能是为队列提供流量控制(FLow Controller)。当一个线程往一个已经满了的BLockingQueue里添加元素时,会被阻塞住,直到其他线程处理完队列里面的数据之后腾出空间了,被阻塞的线程才能成功添加新的元素到队列里。当一个BlockingQueue为空时,如果一个线程执行delete操作,也会被阻塞,直到有新的数据入队。
优先级排序
并且DelayQueue实际存储元素的地方是一个优先级队列,队列元素会根据过期时间的长短进行排序。
也就是说,DelayQueue会保证最早过期的数据最早被消费。
队列元素
根据DelayQueue类的定义,我们可以看到这里用到了一个泛型<E extends Delayed> 。只有泛型E才能被放入队列里,而E是一个实现了Delayed接口的子类。
/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* <p>An implementation of this interface must define a
* {@code compareTo} method that provides an ordering consistent with
* its {@code getDelay} method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
所以我们在写业务的时候,需要自定义一个Delayed子类,并且提供一个获取实例对象的延迟时间方法。
例子🌰
自定义延迟消息
public class DelayedMessage implements Delayed {
private final long expiredTime;
private final Object data;
public Object getData() {
return data;
}
public DelayedMessage(long expiredTime, Object data) {
this.expiredTime = expiredTime;
this.data = data;
}
@Override
public String toString() {
return "DelayedMessage{" +
"expiredTime=" + expiredTime +
", data=" + data +
'}';
}
@Override
public long getDelay(TimeUnit unit) {
long diffTime = expiredTime - System.currentTimeMillis();
return unit.convert(diffTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}
使用DelayQueue
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>();
long baseTime = System.currentTimeMillis();
delayQueue.put(new DelayedMessage(baseTime + 5000, "5s"));
delayQueue.put(new DelayedMessage(baseTime + 6000, "6s"));
delayQueue.put(new DelayedMessage(baseTime + 3000, "3s"));
delayQueue.put(new DelayedMessage(baseTime + 4000, "4s"));
delayQueue.put(new DelayedMessage(baseTime + 7000, "7s"));
delayQueue.forEach(message -> System.out.println(message));
new Thread(() -> {
while (!delayQueue.isEmpty()) {
try {
DelayedMessage message = delayQueue.take();
System.out.println(message.getData());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("main thread end");
}
}
控制台:
DelayedMessage{expiredTime=1707362591673, data=3s}
DelayedMessage{expiredTime=1707362592673, data=4s}
DelayedMessage{expiredTime=1707362593673, data=5s}
DelayedMessage{expiredTime=1707362594673, data=6s}
DelayedMessage{expiredTime=1707362595673, data=7s}
main thread end
3s
4s
5s
6s
7s