[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.s.s.q.LocalTaskExecutorThreadPool:runInThread:84 - Task has been rejected by TaskExecutor
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]] did not accept task: org.quartz.core.JobRunShell@5841bfb6
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at org.springframework.scheduling.quartz.LocalTaskExecutorThreadPool.runInThread(LocalTaskExecutorThreadPool.java:80)
at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:398)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.quartz.core.JobRunShell@5841bfb6 rejected from java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
… 2 common frames omitted
[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.quartz.core.QuartzSchedulerThread:run:403 - ThreadPool.runInThread() return false!
于是查找相关代码,可以看到有一共有四处修改状态为 ERROR 的代码。其中由于 retrieveJob 导致的错误暂时排除,因为 DB 中 Trigger 的数据都是高度相似的。如果是这里出了问题,那么一定是大批量的出现错误,而非个别任务。继续查找,找到如下逻辑:
JobRunShell shell = null;
try {
// 创建触发器执行环境
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 提交执行并检查提交结果
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// do some process states check
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// do exception process
continue;
} catch (RuntimeException e) {
// do exception process
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// do some check
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
// do exception process
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
// do some check
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
这段是 Quartz 调度线程的主要逻辑,注意其中的这段:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
这里可以看到,Quartz 每次从 DB 拉取的任务数量与线程池的可用线程数和一个参数 MaxBatchSize有关。根据查阅到的一些文章,说此处 Quartz 会每次拉取线程数个任务。因此怀疑这里是由于工作线程池的线程数量过大,导致任务都被一台机器拉取,导致的负载不均衡。于是尝试调小线程池的参数,来缓解拉取任务过多的问题。这里要说下,这个怀疑是错误的!让我误入歧途。具体原因会在后面说明。
同步加锁延迟。当 Quartz 使用DB作为集群持久化存储的场景下,为了保证调度秩序,避免重复调度或漏调度,每一次 DB 操作都要进行加锁。而每次调度时,需要先从 DB 读取要触发的 Trigger 信息并更新状态为 ACQUIRED,然后在将要触发时更新状态到 EXECUTING,之后才会向工作线程池提交任务。虽然在读取 Trigger 信息的时候大部分操作都不是批量进行的,但是加锁操作是针对一批任务的。考虑到正常的 DB 读写耗时在 5~10ms 左右,如果不进行批量操作的话,按照 50 个任务一批计算,每一批的加锁时间就白白增加了 250~500ms。因此,进行批量拉取可以有效的减小 DB 加锁操作带来的调度延迟。同时,考虑到锁的竞争会导致线程等待,批量加锁操作还可以有效的降低锁争抢,提升整体效率。如果业务场景对调度延迟比较敏感,应考虑使用性能更好的独立 DB 实例,并尽量降低 Quartz 实例到 DB 之间的网络延迟;
CPU调度延迟。当调度线程将任务提交给工作线程池后,工作线程池中的线程最终还需要依赖 CPU 时间片来执行,若调度机器的 CPU 负载较高,或线程池数量不合理,那么线程在真正被 CPU 执行前将会有较大等待。为了尽量减少这里的执行延迟,应将工作线程池部署在 CPU 核数较多且负载不高的机器上。同时应该合理设置工作线程数量,避免一次性触发过多任务。
注意 DatabaseName 替换为创建 DB 时设置的名称。就是 Lightsail 控制面板里显示的那个。
执行成功后打开 json 文件,可以看到所有变量。
注意每个变量有几个属性:
Allowed values 允许的变量范围
Apply method 变量的生效时间。immediate 表示立即生效,pending-reboot 表示重启后生效
Apply type 底层引擎支持的生效方式。dynamic 动态,可以立即生效,static 静态,必须重启后才能生效
Data type 数据类型
Description 变量描述
Is modifiable 能否修改
Parameter name 变量名
这里我们用最大连接数举例:
{
"allowedValues": "1-100000",
"applyMethod": "pending-reboot",
"applyType": "dynamic",
"dataType": "integer",
"description": "The number of simultaneous client connections allowed.",
"isModifiable": true,
"parameterName": "max_connections",
"parameterValue": "{DBInstanceClassMemory/12582880}"
}