最近公司定时任务越来越多,新的需求也在增加,新写了代码就要上线,上线重新部署的时候,如果这时候有其他定时任务或者异步逻辑在处理,这个处理就会被中断,而中断自然就会影响到业务。
之前我是加了个页面监控当前正在运行的任务,在每次部署的时候,看看现在有没有其他业务在运行,如果有的话,就等一会这个任务结束再部署。
不过最近也是有一些任务耗时特别长,可能你需要等它一天,如果这时候重新部署打断了这个任务,就会影响到业务,出现比如用户的扣款日没有扣款,导致一些订单违约之类的,总之让用户感知到肯定是不好的。
所以,如果重新部署后恢复执行呢,一个是定时任务的代码也要经过适配,需要确认不会出现重复执行的情况,也就是代码也要有幂等的设计;然后就是多余的系统消耗,之前已经执行了10小时了,快执行完了,你这一打断,又需要重新来了;还有就是最重要的,可能会忘记,自己不经意间打断了正在进行的任务,后面用户投诉过来才想起了,就不好了。
还有就是代码幂等适配这个问题,这种约定俗成的规则在编程语言中也很常见,不过在这里我不太喜欢这种设计,他更多是一种君子协议,万一有人没有遵守,或者说忘了什么的,到时候逻辑就会有问题,既然是写 Java 的,相比 JS,Java 这种强类型要的不就是编译期间确认问题吗,所有我更倾向于定义一个接口之类,强制要求实现一些方法,来实现逻辑的中断和恢复。
所有就有了接下来的想法,能不能设计一套可以中断然后重新恢复的底层框架,让上层应用可以在此之上写业务逻辑,这些逻辑就可以支持随时中断,在后面条件允许的情况下恢复执行。
既然是中断,就需要知道在哪里中断,然后保存上下文,下次从这里恢复。
这里就不得不说下 Java 的 `InterruptedException`,写 Java 的肯定知道 `Threa.sleep(100L)` 这种线程休眠必须要捕获这个异常,因为当前线程运行时可能会被其他线程触发中断,必须要处理中断逻辑。什么时候会触发这个中断呢,比如当你调用线程池的 `shutdownNow` 方法时,当前线程池运行的线程都会被标记为中断,对于此时线程逻辑来说,就应该考虑退出了。
通常,当你运行一个异步任务,任务里涉及到循环或者 IO 这种耗时的工作,都可以考虑适当判断中断,来让系统关闭时及时退出。例如:
```java
for (int i = 0; i < 100000; i++) {
//业务逻辑
if (Thread.currentThread().isInterrupted()) {
// 线程已经被中断,这时候需要清理环境,及时退出
break;
}
}
```
如果没有这种判断的话,当你 kill 这个进程的话,可能就要卡很久,这时候虽然可以强制杀死,但如果数据损坏呢,所以考虑这种情况可以让代码更加健壮。
再说回之前的问题,我一开始想到的有个 Task 接口如下。
```java
interface Task{
void run(Context context);
}
```
`context` 为任务上下文,业务可以把当前执行情况保存到上下文中,如果中途发生了中断,下次恢复执行可以传入相同的上下文,实现任务继续。但是想了想还是太乱。
后面想到了 Kolin,Kolin 的协程可以在指定代码触发中断,然后切换其他协程,后面也可以恢复回来,也是保存了上下文,不过这个就是更底层的保存,我想我如何可以用同样的代码,精准的知道在字节码什么位置中断任务,然后把任务的堆栈之类信息序列化保存,后面需要时再恢复对应堆栈,继续执行,不就可以完美解决了,而且还是几乎业务无感的,只需要在一些位置打上标记即可。
不过这种方法需要修改字节码,还是颇有难度,我又去看了看有没有 Java 的其他协程框架,看到一个很有意思的,不是协程框架,叫 `RxJava`
`RxJava` 可以实现java的流式计算,像是下面这种,
```java
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
```
```java
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
```
```java
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
```
RxJava 让代码以一个个消费者的形式异步流式执行,可以实现例如上面这些例子,可以让执行代码块在后台执行,然后回到主线程获取结果,这在一些 UI 场景例如安卓可能会有点用;
感觉和我想实现的想法差不多,复杂的逻辑通过这种方式可以拆分成很多子逻辑,他们以 Lambda 块,也就是匿名类的方式编写,也不需要修改字节码,他们本省就是独立的;同过响应式流的方式传递运行数据,也就是上一个代码的执行结果,通过流的参数向下传递,这时候我在框架这边,就可以截胡了,把相应数据进行保存,同时也可以标记当前执行到流的哪一步了,即使是循环这种场景(参考上面的 `range` 例子),我甚至可以判断当前循环到哪里,这时候就可以在任务需要中断的时候,保存这些信息,然后在系统恢复的时候,拿着之前保存的信息,判断执行流的位置,之前已经执行的就跳过,跳到上次中断的地方,继续执行。
对于数据库相关操作,可以强制使用事务来保证多次执行的可靠性,同时可以设置相关回滚的逻辑。
这种想法会对逻辑影响蛮大,之前的业务代码就需要重写。不过我想单独写一个库封装这种操作,对于使用这种方式的任务就是支持中断的,同时之前老写法的类就是不支持中断的。我觉得现在任务还是有挺多这种场景。
现在还是停留在想法这一阶段,我记得安卓也是专门有后台任务这个概念的,系统有资源的时候才会执行,可能以各种情况暂停执行,后面也可以去看看安卓的这个设计是什么东西。

关于定时任务和一些其他异步任务的随想