今天在排查一个线上问题是发现一个使用 ScheduledExecutorService 执行的定时任务在执行了一次以后就再也没有执行过了。于是 Dump 了内存来检查问题。
首先,搜索对应 Task 的类,发现在堆中找不到这个类的实例。可是明明已经成功执行了一次,为何没有实例?
于是再去找 ScheduledExecutorService 对应的 ScheduledThreadPoolExecutor 类,成功筛选出了用来执行定时任务的实例。在实例的 queue 中,却只看到了 6 个 Task 对象,唯独不见了这个出问题的对象。百思不得解,因为日志中这个对象的 Logger 已经打印出来了,说明至少执行了一次,为啥会从内存中消失呢?
在同事的帮助下,查阅了 API 文档,发现了这么一句话:
If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
注意通常的理解,这里的 suppressed 意思应该为抑制、压制,一般意义上理解为可能是说降低频率啊权重啊什么的。可是实际上,这里使用 stoped 更合适。从表现上看,你的 Task 只要出现了异常,就会被彻底扔掉,再也不会执行。
下面给出一个网上同样问题的复现代码:
import java.util.concurrent.Executors; public class BadAssTask implements Runnable { @Override public void run() { System.out.println("Sleeping ..."); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Throwing ... "); throw new RuntimeException("bad ass!"); } public static void main(String[] args) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new BadAssTask(), 1, 1, TimeUnit.SECONDS); } }
当我们注释掉 throw new RuntimeException(“bad ass!”); 的时候,可以看到每个 0.1s 会有一行 Sleeping … 输出。当开启注释时,Throwing … 之后再也没有 Sleeping 输出了。
于是来查看对应的代码,在 ScheduledThreadPoolExecutor 的代码中看到执行 Task 的实际调用方法为:
/** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
注意最后一个 if 语句。当 Task 执行出错的时候, runAndReset 的返回值为 False,所以 if 里面的内容不会执行,因此这个 task 就不会被放回队列,也就再也不会被执行了。
runAndReset 方法的代码如下:
/** * Executes the computation without setting its result, and then * resets this future to initial state, failing to do so if the * computation encounters an exception or is cancelled. This is * designed for use with tasks that intrinsically execute more * than once. * * @return {@code true} if successfully run and reset */ protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callablec = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
注意其中的 setException 方法。这个方法会把 state 设置成出现异常的状态:
/** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * *This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
于是在 runAndReset 最后的判断中,s == NEW 不成立,于是返回 False 了。
参考:
http://code.nomad-labs.com/2011/12/09/mother-fk-the-scheduledexecutorservice/
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleAtFixedRate-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-