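Java's built-in delay queue: DelayQueue
When we need a delay queue, we usually reach for RabbitMQ or RocketMQ. But what about a small project that needs one too? Say a small project has to send notifications when reservations or orders expire: with a delay queue at hand, that logic becomes much easier to implement.
As it happens, Java ships with a delay queue of its own: java.util.concurrent.DelayQueue.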
DelayQueue
Let's start with a diagram of DelayQueue's class structure:
![image](https://prod-files-secure.s3.us-west-2.amazonaws.com/371abca5-94fd-4d13-a43e-bbcb27be7c63/e4cf0ef9-6bd5-4383-8dca-2521ac82b2bf/Untitled.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=ASIAZI2LB466ZIBAX4HN%2F20250205%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Date=20250205T155647Z&X-Amz-Expires=3600&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEC8aCXVzLXdlc3QtMiJIMEYCIQDIYNCD4D%2F3ZOx2qIjbVWHrpbtlB7UkD7m%2BB5g5JZFjcAIhAJlnKqQ2jIk7208lOTKlMQfgjuo6zrUDN3SUKTUOzoJFKv8DCEgQABoMNjM3NDIzMTgzODA1IgxYPypIVuCPUSxP%2F3cq3AP%2FvH9CjjZjEqn%2FN7Ztg3SEEEzzJ%2BM737FKnIRcdpa1%2FItQOadnRFBkcVByLLltnhqijRfeUsUiKnu5%2FPirkDVuiXOO2%2FOHFTNmNdvQ766%2F9NcqD0a3YohcOBtwUu0r5oXjOUeUHeZo%2FRic3vi294bjraRO1izGG6bi38CHQbJD2JpB6MbFhOX0ETiapSlg4dn%2FXCncTeWxLDDEStgWRElMd%2B6v1C4XNsCYpkTI9aBNmZtF6%2B6tTV4T23%2FpZJHKaaUZuNXX15Gbkjob%2Fv2XsLveDQ%2FVzeC5A7NOoRdnPX9%2F6YU%2FbV4EPJGtJYLUlir3hwkiZ%2BbZYv3ZvhLRUUg0lzpSyZ2UYhGBM2LEHSRWgkC7kbclu1vD2xc%2Fs1D9bjnwxeYHpC36D3Op8O7pSJbgt2VAwflHflFnGU0rZF1Im1LVX%2FRa%2FjFoP9jtdFkdMV0dSo0kZ6HvLVNi0FlJzADhq8XQ0ueEsOruXsjxLbrcGxXWUp79NPh4tSp6%2BjNLeyIujHFYpaH3fp7pF4CcfVw4W2Nknq6QKz0r7jnQvO21jX0WUYKFJe9aEsqUtWsTDf8lsTbTMEZk%2F639PNPtBTx8YPVRvuXLoJhzil4xA118o3%2FvRfaP76XTrzvAW%2Bb9EzCSgY69BjqkAXJNTmvk1IuYOphubitsdcgS9Ls266EOyiFfIln65swtK94DJfwheleblKvvUIVHeTnhW8g4fwnpX9wAJcwg2ZCDBkeJ8Cj%2BODLJC7PYU%2FzDHqsOAbmP2GwH1travjgphbhZiboCteQA%2B79qZgmJMR%2F3IR5A%2FPIX6FXIE%2F49kGJ5a4JUcDaZH0YAbnGMA4zhqWYxCgWUmPf4miAz4BPRNPreWWOJ&X-Amz-Signature=94c0820308b1846f74532acd40a2bc0cd6523e460ea7e83732fa8b9132dd9d5f&X-Amz-SignedHeaders=host&x-id=GetObject)
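Flow control
DelayQueue implements the BlockingQueue interface and extends the AbstractQueue abstract class.
The BlockingQueue interface was introduced in Java 1.5 to give queues flow control. When a thread calls put on a BlockingQueue that is already full, it blocks until other threads have consumed elements and freed up space; only then can the blocked thread add its new element. Likewise, when a thread calls take on an empty BlockingQueue, it blocks until a new element is enqueued. Note that DelayQueue itself is unbounded, so put never blocks on it; only the consuming side waits.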
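The blocking behaviour described above can be sketched with a bounded ArrayBlockingQueue. This is only an illustration: the class name FlowControlSketch, the capacity of one and the 100 ms timeouts are arbitrary assumptions, and the timed offer and poll variants are used instead of put and take so that the demo gives up rather than blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class FlowControlSketch {

    /** Tries to add to a queue that is already full; returns whether the add succeeded. */
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity of one
        queue.put("first");                                         // fills the queue
        // put("second") would block forever here, because nobody consumes.
        // The timed offer waits up to 100 ms for free space and then gives up.
        return queue.offer("second", 100, TimeUnit.MILLISECONDS);
    }

    /** Tries to remove from an empty queue; returns the element, or null on timeout. */
    static String pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // take() would block forever here, because nobody produces.
        // The timed poll waits up to 100 ms for an element and then gives up.
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer to full queue succeeded: " + offerToFullQueue());     // false
        System.out.println("poll from empty queue returned: " + pollFromEmptyQueue());  // null
    }
}
```

With a second thread consuming or producing in the meantime, the same calls would succeed as soon as space or an element became available.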
![image](https://prod-files-secure.s3.us-west-2.amazonaws.com/371abca5-94fd-4d13-a43e-bbcb27be7c63/259fe33d-5d33-4cb1-859c-110dc4e76be0/Untitled.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=ASIAZI2LB466ZIBAX4HN%2F20250205%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Date=20250205T155647Z&X-Amz-Expires=3600&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEC8aCXVzLXdlc3QtMiJIMEYCIQDIYNCD4D%2F3ZOx2qIjbVWHrpbtlB7UkD7m%2BB5g5JZFjcAIhAJlnKqQ2jIk7208lOTKlMQfgjuo6zrUDN3SUKTUOzoJFKv8DCEgQABoMNjM3NDIzMTgzODA1IgxYPypIVuCPUSxP%2F3cq3AP%2FvH9CjjZjEqn%2FN7Ztg3SEEEzzJ%2BM737FKnIRcdpa1%2FItQOadnRFBkcVByLLltnhqijRfeUsUiKnu5%2FPirkDVuiXOO2%2FOHFTNmNdvQ766%2F9NcqD0a3YohcOBtwUu0r5oXjOUeUHeZo%2FRic3vi294bjraRO1izGG6bi38CHQbJD2JpB6MbFhOX0ETiapSlg4dn%2FXCncTeWxLDDEStgWRElMd%2B6v1C4XNsCYpkTI9aBNmZtF6%2B6tTV4T23%2FpZJHKaaUZuNXX15Gbkjob%2Fv2XsLveDQ%2FVzeC5A7NOoRdnPX9%2F6YU%2FbV4EPJGtJYLUlir3hwkiZ%2BbZYv3ZvhLRUUg0lzpSyZ2UYhGBM2LEHSRWgkC7kbclu1vD2xc%2Fs1D9bjnwxeYHpC36D3Op8O7pSJbgt2VAwflHflFnGU0rZF1Im1LVX%2FRa%2FjFoP9jtdFkdMV0dSo0kZ6HvLVNi0FlJzADhq8XQ0ueEsOruXsjxLbrcGxXWUp79NPh4tSp6%2BjNLeyIujHFYpaH3fp7pF4CcfVw4W2Nknq6QKz0r7jnQvO21jX0WUYKFJe9aEsqUtWsTDf8lsTbTMEZk%2F639PNPtBTx8YPVRvuXLoJhzil4xA118o3%2FvRfaP76XTrzvAW%2Bb9EzCSgY69BjqkAXJNTmvk1IuYOphubitsdcgS9Ls266EOyiFfIln65swtK94DJfwheleblKvvUIVHeTnhW8g4fwnpX9wAJcwg2ZCDBkeJ8Cj%2BODLJC7PYU%2FzDHqsOAbmP2GwH1travjgphbhZiboCteQA%2B79qZgmJMR%2F3IR5A%2FPIX6FXIE%2F49kGJ5a4JUcDaZH0YAbnGMA4zhqWYxCgWUmPf4miAz4BPRNPreWWOJ&X-Amz-Signature=324a9095ef9c3f4e1edd5a85c69c60f1327f9c05be94c64a0c60080f71ab7453&X-Amz-SignedHeaders=host&x-id=GetObject)
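Priority ordering
Internally, DelayQueue stores its elements in a priority queue (a java.util.PriorityQueue), so elements are ordered by how soon they expire.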
![image](https://prod-files-secure.s3.us-west-2.amazonaws.com/371abca5-94fd-4d13-a43e-bbcb27be7c63/a0d01878-d9c7-4d53-9fe9-acec31405d9c/Untitled.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=ASIAZI2LB466ZIBAX4HN%2F20250205%2Fus-west-2%2Fs3%2Faws4_request&X-Amz-Date=20250205T155647Z&X-Amz-Expires=3600&X-Amz-Security-Token=IQoJb3JpZ2luX2VjEC8aCXVzLXdlc3QtMiJIMEYCIQDIYNCD4D%2F3ZOx2qIjbVWHrpbtlB7UkD7m%2BB5g5JZFjcAIhAJlnKqQ2jIk7208lOTKlMQfgjuo6zrUDN3SUKTUOzoJFKv8DCEgQABoMNjM3NDIzMTgzODA1IgxYPypIVuCPUSxP%2F3cq3AP%2FvH9CjjZjEqn%2FN7Ztg3SEEEzzJ%2BM737FKnIRcdpa1%2FItQOadnRFBkcVByLLltnhqijRfeUsUiKnu5%2FPirkDVuiXOO2%2FOHFTNmNdvQ766%2F9NcqD0a3YohcOBtwUu0r5oXjOUeUHeZo%2FRic3vi294bjraRO1izGG6bi38CHQbJD2JpB6MbFhOX0ETiapSlg4dn%2FXCncTeWxLDDEStgWRElMd%2B6v1C4XNsCYpkTI9aBNmZtF6%2B6tTV4T23%2FpZJHKaaUZuNXX15Gbkjob%2Fv2XsLveDQ%2FVzeC5A7NOoRdnPX9%2F6YU%2FbV4EPJGtJYLUlir3hwkiZ%2BbZYv3ZvhLRUUg0lzpSyZ2UYhGBM2LEHSRWgkC7kbclu1vD2xc%2Fs1D9bjnwxeYHpC36D3Op8O7pSJbgt2VAwflHflFnGU0rZF1Im1LVX%2FRa%2FjFoP9jtdFkdMV0dSo0kZ6HvLVNi0FlJzADhq8XQ0ueEsOruXsjxLbrcGxXWUp79NPh4tSp6%2BjNLeyIujHFYpaH3fp7pF4CcfVw4W2Nknq6QKz0r7jnQvO21jX0WUYKFJe9aEsqUtWsTDf8lsTbTMEZk%2F639PNPtBTx8YPVRvuXLoJhzil4xA118o3%2FvRfaP76XTrzvAW%2Bb9EzCSgY69BjqkAXJNTmvk1IuYOphubitsdcgS9Ls266EOyiFfIln65swtK94DJfwheleblKvvUIVHeTnhW8g4fwnpX9wAJcwg2ZCDBkeJ8Cj%2BODLJC7PYU%2FzDHqsOAbmP2GwH1travjgphbhZiboCteQA%2B79qZgmJMR%2F3IR5A%2FPIX6FXIE%2F49kGJ5a4JUcDaZH0YAbnGMA4zhqWYxCgWUmPf4miAz4BPRNPreWWOJ&X-Amz-Signature=820288b9d5f8a5735caec1c962a2f330c5d60919afdf11498155e858850a3624&X-Amz-SignedHeaders=host&x-id=GetObject)
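In other words, DelayQueue guarantees that the element that expires earliest is consumed first, and an element can only be taken once its delay has elapsed.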
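A minimal sketch of that guarantee, under stated assumptions: ExpiryOrderSketch, Item and drainExpired are hypothetical names invented for this illustration, and the deadlines are placed in the past or far in the future so the result does not depend on timing. The non-blocking poll() returns null while the head has not expired, whereas peek() shows the head regardless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ExpiryOrderSketch {

    /** Hypothetical element type: a label plus an absolute deadline in epoch milliseconds. */
    static final class Item implements Delayed {
        final String label;
        final long deadlineMillis;

        Item(String label, long deadlineMillis) {
            this.label = label;
            this.deadlineMillis = deadlineMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /** Removes every element whose delay has elapsed, in the order the queue hands them out. */
    static List<String> drainExpired(DelayQueue<Item> queue) {
        List<String> labels = new ArrayList<>();
        Item item;
        while ((item = queue.poll()) != null) { // poll() returns null if the head has not expired
            labels.add(item.label);
        }
        return labels;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("expired 1s ago", now - 1_000));
        queue.put(new Item("due in 1h", now + 3_600_000L));
        queue.put(new Item("expired 3s ago", now - 3_000));
        queue.put(new Item("expired 2s ago", now - 2_000));

        // Earliest deadline first: [expired 3s ago, expired 2s ago, expired 1s ago]
        System.out.println(drainExpired(queue));
        // The unexpired element is still the head and visible through peek(): due in 1h
        System.out.println(queue.peek().label);
        // ...but it cannot be removed yet: null
        System.out.println(queue.poll());
    }
}
```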
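Queue elements
The declaration of the DelayQueue class uses the bounded type parameter <E extends Delayed>. Only elements of type E can be put into the queue, and E must be a type that implements the Delayed interface: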

    /**
     * A mix-in style interface for marking objects that should be
     * acted upon after a given delay.
     *
     * <p>An implementation of this interface must define a
     * {@code compareTo} method that provides an ordering consistent with
     * its {@code getDelay} method.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface Delayed extends Comparable<Delayed> {

        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }

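So in application code we need to define our own class that implements Delayed, supplying a getDelay method that reports an instance's remaining delay and a compareTo method that is consistent with it.
Example 🌰
A custom delayed message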
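
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayedMessage implements Delayed {

        // Absolute expiry time, in epoch milliseconds
        private final long expiredTime;
        // Payload delivered to the consumer once the message has expired
        private final Object data;

        public DelayedMessage(long expiredTime, Object data) {
            this.expiredTime = expiredTime;
            this.data = data;
        }

        public Object getData() {
            return data;
        }

        @Override
        public String toString() {
            return "DelayedMessage{" +
                    "expiredTime=" + expiredTime +
                    ", data=" + data +
                    '}';
        }

        // Remaining delay; zero or a negative value means the message has expired
        @Override
        public long getDelay(TimeUnit unit) {
            long diffTime = expiredTime - System.currentTimeMillis();
            return unit.convert(diffTime, TimeUnit.MILLISECONDS);
        }

        // Orders messages so that the one with the shortest remaining delay comes first
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
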
Using DelayQueue

    import java.util.concurrent.DelayQueue;

    public class DelayQueueDemo {

        public static void main(String[] args) {
            DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>();
            long baseTime = System.currentTimeMillis();
            delayQueue.put(new DelayedMessage(baseTime + 5000, "5s"));
            delayQueue.put(new DelayedMessage(baseTime + 6000, "6s"));
            delayQueue.put(new DelayedMessage(baseTime + 3000, "3s"));
            delayQueue.put(new DelayedMessage(baseTime + 4000, "4s"));
            delayQueue.put(new DelayedMessage(baseTime + 7000, "7s"));

            // Iteration ignores expiry and follows the internal heap layout; it is
            // not guaranteed to be sorted, even though it happens to be for this input
            delayQueue.forEach(System.out::println);

            new Thread(() -> {
                while (!delayQueue.isEmpty()) {
                    try {
                        // take() blocks until the head of the queue has expired
                        DelayedMessage message = delayQueue.take();
                        System.out.println(message.getData());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop consuming
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }).start();

            System.out.println("main thread end");
        }
    }

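Console output. The expiredTime values depend on when the program is run, and main thread end appears before the payloads because the consumer thread blocks in take() for about three seconds until the first message expires:

    DelayedMessage{expiredTime=1707362591673, data=3s}
    DelayedMessage{expiredTime=1707362592673, data=4s}
    DelayedMessage{expiredTime=1707362593673, data=5s}
    DelayedMessage{expiredTime=1707362594673, data=6s}
    DelayedMessage{expiredTime=1707362595673, data=7s}
    main thread end
    3s
    4s
    5s
    6s
    7s
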
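For the order-expiry scenario mentioned at the start, a long-running consumer is probably closer to what a small project needs than a loop that exits when the queue is empty. The following is a rough sketch under assumptions, not a definitive implementation: OrderExpirySketch, OrderExpiry and runDemo are hypothetical names, the delays are shortened to a few hundred milliseconds, adding the id to a list stands in for sending a real notification, and deadlines are based on System.nanoTime() so that wall-clock adjustments do not affect them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class OrderExpirySketch {

    /** Hypothetical task: an order id that becomes available after a relative delay. */
    static final class OrderExpiry implements Delayed {
        final String orderId;
        private final long deadlineNanos; // measured on the System.nanoTime() clock

        OrderExpiry(String orderId, long delay, TimeUnit unit) {
            this.orderId = orderId;
            this.deadlineNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Schedules three orders and returns their ids in the order they were "notified". */
    static List<String> runDemo() throws InterruptedException {
        DelayQueue<OrderExpiry> queue = new DelayQueue<>();
        List<String> notified = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    OrderExpiry expired = queue.take(); // blocks until the head expires
                    notified.add(expired.orderId);      // stand-in for a real notification
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // restore the flag and leave the loop
            }
        }, "order-expiry-consumer");
        consumer.setDaemon(true); // do not keep the JVM alive just for this thread
        consumer.start();

        // Orders may be added at any time, in any order, while the consumer is waiting
        queue.put(new OrderExpiry("order-300ms", 300, TimeUnit.MILLISECONDS));
        queue.put(new OrderExpiry("order-100ms", 100, TimeUnit.MILLISECONDS));
        queue.put(new OrderExpiry("order-200ms", 200, TimeUnit.MILLISECONDS));

        done.await(5, TimeUnit.SECONDS); // generous upper bound for the demo
        consumer.interrupt();            // shut the consumer down
        return notified;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

On a normally loaded machine this should print the ids in expiry order, order-100ms first and order-300ms last. Keep in mind that the queue lives only in memory, so pending entries are lost when the process stops.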