延迟任务的10种实现方式,主要内容如下:
- 手动无线循环
- ScheduledExecutorService
- DelayQueue
- Redis zset 数据判断的方式
- Redis 键空间通知的方式
- Netty 提供的 HashedWheelTimer 工具类
- RabbitMQ 死信队列
- RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange
- Spring Scheduled
- Quartz
什么是延迟任务
顾明思议,我们把需要延迟执行的任务叫做延迟任务
。延迟任务的使用场景
有以下这些:
- 红包 24 小时未被查收,需要延迟执退还业务;
- 每个月账单日,需要给用户发送当月的对账单;
- 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单。
等事件都需要使用延迟任务。
延迟任务实现思路分析
延迟任务实现的关键是在某个时间节点执行某个任务
。基于这个信息我们可以想到实现延迟任务的手段
有以下两个:
- 自己手写一个
“死循环”
一直判断当前时间节点有没有要执行的任务;
- 借助
JDK
或者第三方提供的工具
类来实现延迟任务。
- JDK 实现延迟任务能想到的关键词是:
DelayQueue、ScheduledExecutorService
- 第三方提供的延迟任务执行方法就:
Redis、Netty、MQ
等手段。
延迟任务实现
无限循环
此方式需要开启一个无限循环一直扫描任务
,然后使用一个 Map 集合用来存储任务和延迟执行的时间,实现代码如下:
import java.time.Instant; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Iterator; import java.util.Map;
public class DelayTaskExample { private static Map<String, Long> _TaskMap = new HashMap<>();
public static void main(String[] args) { System.out.println("程序启动时间:" + LocalDateTime.now()); _TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli());
loopTask(); }
public static void loopTask() { Long itemLong = 0L; while (true) { Iterator it = _TaskMap.entrySet().iterator(); while (it.hasNext()) { Map.Entry entry = (Map.Entry) it.next(); itemLong = (Long) entry.getValue(); if (Instant.now().toEpochMilli() >= itemLong) { System.out.println("执行任务:" + entry.getKey() + " ,执行时间:" + LocalDateTime.now()); _TaskMap.remove(entry.getKey()); } } } } }
|
执行的结果为:
程序启动时间:2020-04-12T18:51:28.188 执行任务:task-1 ,执行时间:2020-04-12T18:51:31.189
|
任务延迟了 3s 钟执行了,符合我们的预期。
Java API
Java API 提供了两种实现延迟任务的方法:DelayQueue
和 ScheduledExecutorService
。
ScheduledExecutorService
使用 ScheduledExecutorService
来以固定的频率
一直执行任务,实现代码如下:
public class DelayTaskExample { public static void main(String[] args) { System.out.println("程序启动时间:" + LocalDateTime.now()); scheduledExecutorServiceTask(); }
public static void scheduledExecutorServiceTask() { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.scheduleWithFixedDelay( new Runnable() { @Override public void run() { System.out.println("执行任务" + " ,执行时间:" + LocalDateTime.now()); } }, 2, 2, TimeUnit.SECONDS); } }
|
程序执行的结果为:
程序启动时间:2020-04-12T21:28:10.416 执行任务 ,执行时间:2020-04-12T21:28:12.421 执行任务 ,执行时间:2020-04-12T21:28:14.422 ......
|
使用 ScheduledExecutorService#scheduleWithFixedDelay(...)
方法之后,会以某个频率一直循环
执行延迟任务。
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列
,队列中的元素必须实现 Delayed 接口,并重写 getDelay(TimeUnit) 和 compareTo(Delayed) 方法
,DelayQueue 实现延迟队列的完整代码如下:
public class DelayTest { public static void main(String[] args) throws InterruptedException { DelayQueue delayQueue = new DelayQueue(); delayQueue.put(new DelayElement(1000)); delayQueue.put(new DelayElement(3000)); delayQueue.put(new DelayElement(5000)); System.out.println("开始时间:" + DateFormat.getDateTimeInstance().format(new Date())); while (!delayQueue.isEmpty()){ System.out.println(delayQueue.take()); } System.out.println("结束时间:" + DateFormat.getDateTimeInstance().format(new Date())); }
static class DelayElement implements Delayed { long delayTime = System.currentTimeMillis(); public DelayElement(long delayTime) { this.delayTime = (this.delayTime + delayTime); } @Override public long getDelay(TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } else { return 0; } } @Override public String toString() { return DateFormat.getDateTimeInstance().format(new Date(delayTime)); } } }
|
程序执行的结果为:
开始时间:2020-4-12 20:40:38 2020-4-12 20:40:39 2020-4-12 20:40:41 2020-4-12 20:40:43 结束时间:2020-4-12 20:40:43
|
Redis
使用 Redis 实现延迟任务的方法大体可分为两类:通过 zset 数据判断的方式
,和通过键空间通知的方式
。
通过数据判断的方式
借助 zset 数据类型
,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 Jedis 框架
):
import redis.clients.jedis.Jedis; import utils.JedisUtils; import java.time.Instant; import java.util.Set;
public class DelayQueueExample { private static final String _KEY = "myDelayQueue"; public static void main(String[] args) throws InterruptedException { Jedis jedis = JedisUtils.getJedis(); long delayTime = Instant.now().plusSeconds(30).getEpochSecond(); jedis.zadd(_KEY, delayTime, "order_1"); jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2"); jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3"); jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4"); jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5"); doDelayQueue(jedis); }
public static void doDelayQueue(Jedis jedis) throws InterruptedException { while (true) { Instant nowInstant = Instant.now(); long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); long nowSecond = nowInstant.getEpochSecond(); Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond); for (String item : data) { System.out.println("消费:" + item); } jedis.zremrangeByScore(_KEY, lastSecond, nowSecond); Thread.sleep(1000); } } }
|
通过键空间通知
默认情况
下 Redis 服务器端是不开启键空间通知
的,需要通过 config set notify-keyspace-events Ex
的命令手动开启
,开启键空间通知后,就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import utils.JedisUtils;
public class TaskExample { public static final String _TOPIC = "__keyevent@0__:expired"; public static void main(String[] args) { Jedis jedis = JedisUtils.getJedis(); doTask(jedis); }
public static void doTask(Jedis jedis) { jedis.psubscribe(new JedisPubSub() { @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("收到消息:" + message); } }, _TOPIC); } }
|
注:这里的键空间通知,其实是利用Redis的发布与订阅模式
来完成
Netty
Netty 是由 JBOSS 提供的一个 Java 开源框架
,它是一个基于 NIO 的客户、服务器端的编程框架
,使用 Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty 相当于简化和流线化了网络应用的编程开发过程
,例如:基于 TCP 和 UDP 的 socket 服务开发。
可以使用 Netty 提供的工具类 HashedWheelTimer
来实现延迟任务,实现代码如下。
首先在项目中添加 Netty 引用,配置如下:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.48.Final</version> </dependency>
|
Netty 实现的完整代码如下:
public class DelayTaskExample { public static void main(String[] args) { System.out.println("程序启动时间:" + LocalDateTime.now()); NettyTask(); }
private static void NettyTask() { HashedWheelTimer timer = new HashedWheelTimer(3, TimeUnit.SECONDS, 100); TimerTask task = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("执行任务" + " ,执行时间:" + LocalDateTime.now()); } }; timer.newTimeout(task, 0, TimeUnit.SECONDS);
} }
|
程序执行的结果为:
程序启动时间:2020-04-13T10:16:23.033 执行任务 ,执行时间:2020-04-13T10:16:26.118
|
HashedWheelTimer
是使用定时轮
实现的,定时轮其实就是一种环型的数据结构
,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
图片可以理解为,时间轮大小为 8,某个时间转一格(例如 1s),每格指向一个链表,保存着待执行的任务。
MQ
如果专门开启一个 MQ 中间件来执行延迟任务,就有点杀鸡用牛刀
般的奢侈
了,不过已经有了 MQ 环境的话,用它来实现延迟任务的话,还是可取的。本文以 RabbitMQ 为例实现延迟任务。
RabbitMQ 实现延迟队列的方式有两种:
- 通过消息过期后进入
死信交换器
,再由交换器转发到延迟消费队列,实现延迟功能;
- 使用
rabbitmq-delayed-message-exchange 插件
实现延迟功能。
由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式 rabbitmq-delayed-message-exchange 插件的方式实现延迟队列的功能。
在Rabbitmq 消息延迟推送 -- 插件模式
文章中,已讲解过rabbitmq-delayed-message-exchange 插件
的使用,在此不再累述
传送门:Rabbitmq 消息延迟推送 – 插件模式
注:延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依赖 Erlang/OPT 18.0 及以上运行环境。
Scheduled和Quartz
如果使用的是 Spring 或 SpringBoot 的项目的话,可以使用借助 Scheduled
注解来实现,需要在引导类声明中开启 @EnableScheduling
Quartz 是一款功能强大的任务调度器,可以实现较为复杂的调度功能,它还支持分布式的任务调度
。
在Redis 高效点赞与取消功能
文章中,已有Scheduled和Quartz的使用案例,在此不再累述
传送门:Redis 高效点赞与取消功能