由于经过 CloudFlare 代理后,Nginx 看到的 remote ip 其实是 CF 的 IP 地址。因此需要通过 Nginx 的 ngx_http_realip_module,还原出真实的客户端 IP。这一步在 CloudFlare 有详细的文档说明。
简单来说,当发现请求 IP 为 CF 的 IP 段时,读取 CF-Connecting-IP 头,并将其中的 IP 设置为客户端 IP。CF 的 IP 段可以通过 API 获取。以下是一个脚本可以生成相关的配置:
#!/bin/bash
# 脚本:获取 Cloudflare IP 列表并生成 Nginx 配置
# 输出文件
IP_CONFIG_FILE="cf_ips.conf"
# 清空输出文件
> "$IP_CONFIG_FILE"
# Cloudflare IP 列表 API 端点
CLOUDFLARE_API_URL="https://api.cloudflare.com/client/v4/ips"
# 生成 set_real_ip_from 配置
echo "# Cloudflare IPv4 IPs" >> "$IP_CONFIG_FILE"
curl -s "$CLOUDFLARE_API_URL" | jq -r '.result.ipv4_cidrs[]' | while read -r ip; do
echo "set_real_ip_from $ip;" >> "$IP_CONFIG_FILE"
done
echo -e "\n# Cloudflare IPv6 IPs" >> "$IP_CONFIG_FILE"
curl -s "$CLOUDFLARE_API_URL" | jq -r '.result.ipv6_cidrs[]' | while read -r ip; do
echo "set_real_ip_from $ip;" >> "$IP_CONFIG_FILE"
done
# 添加 real_ip_header 配置
echo -e "\n# Set real IP header" >> "$IP_CONFIG_FILE"
echo "real_ip_header CF-Connecting-IP;" >> "$IP_CONFIG_FILE"
脚本运行后会生成 cf_ips.conf 文件,内含 set_real_ip_from 和 real_ip_header 配置,在 http 段 include 该文件,即可自动为所有 server 自动设置真实客户端 IP。
限制仅 CloudFlare IP 访问
解决了真实客户端 IP 的问题后,接下来就是如何做到仅限 CF IP 访问。由于该 Nginx 托管了多个 server,因此并不希望直接从防火墙层面拒绝所有非 CF IP 访问。那么从 Nginx 侧,第一时间想到的就是用 ngx_http_access_module 的方式,仅放过 CF IP 段:
通过 gdb 调试发现,qcloud_license_internal_verify 函数的第一个入参就是校验用的 Public Key,打出来长这样:
-----BEGIN PUBLIC KEY-----
MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEDJcPoMIFyYDlhTUzNSeT/+3ZKcByILoI
Fw8mLv07Hpy2I5qgAGQu66vF3VUjFKgpDDFKVER9jwjjmXUOpoCXX4ynvFUpEM25
ULJE86Z6WjcLqyG03Mv6d3GPYNl/cJYt
-----END PUBLIC KEY-----
function _M.new(self, key, salt, _cipher, _hash, hash_rounds, iv_len, enable_padding)
...
local _hash = _hash or hash.md5
local hash_rounds = hash_rounds or 1
...
if C.EVP_BytesToKey(_cipher.method, _hash, salt, key, #key,
hash_rounds, gen_key, gen_iv)
~= _cipherLength
then
return nil, "failed to generate key and iv"
end
end
为了解决这个问题,我首先想到了设置 http_proxy 等环境变量的方式。但是搜索之后发现,似乎 TeslaMate 并不会读区这几个环境变量,因此无法使用这种方式强制走代理访问相关 API。而且,由于 Tesla 本身的 API 不能走代理访问,必须使用中国 IP 访问,因此如果要使用这种方式的话,还需要配置 Proxy 的分流策略,非常麻烦。
综合考虑各种方案,我最终选择了 sniproxy 的方式。sniproxy 是允许在不解密流量本身的情况下,进行 SSL/TLS 流量转发的方案。使用 sniproxy 时,只需要将目标域名的 IP 强制指向 sniproxy 的 IP 地址(如修改 /etc/hosts),既可以完成代理。
事实上我们可以这样操作,但是由于容器内的二进制文件在主机上并没有,所以 gdb attach 之后会报 No such file,并不能正常工作。就算我们把二进制拷贝出来,放在对应的位置上,gdb 还是会有 warning:
warning: Target and debugger are in different PID namespaces; thread lists and other data are likely unreliable. Connect to gdbserver inside the container.
[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.s.s.q.LocalTaskExecutorThreadPool:runInThread:84 - Task has been rejected by TaskExecutor
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]] did not accept task: org.quartz.core.JobRunShell@5841bfb6
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at org.springframework.scheduling.quartz.LocalTaskExecutorThreadPool.runInThread(LocalTaskExecutorThreadPool.java:80)
at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:398)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.quartz.core.JobRunShell@5841bfb6 rejected from java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
… 2 common frames omitted
[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.quartz.core.QuartzSchedulerThread:run:403 - ThreadPool.runInThread() return false!
于是查找相关代码,可以看到有一共有四处修改状态为 ERROR 的代码。其中由于 retrieveJob 导致的错误暂时排除,因为 DB 中 Trigger 的数据都是高度相似的。如果是这里出了问题,那么一定是大批量的出现错误,而非个别任务。继续查找,找到如下逻辑:
JobRunShell shell = null;
try {
// 创建触发器执行环境
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 提交执行并检查提交结果
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// do some process states check
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// do exception process
continue;
} catch (RuntimeException e) {
// do exception process
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// do some check
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
// do exception process
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
// do some check
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
这段是 Quartz 调度线程的主要逻辑,注意其中的这段:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
这里可以看到,Quartz 每次从 DB 拉取的任务数量与线程池的可用线程数和一个参数 MaxBatchSize有关。根据查阅到的一些文章,说此处 Quartz 会每次拉取线程数个任务。因此怀疑这里是由于工作线程池的线程数量过大,导致任务都被一台机器拉取,导致的负载不均衡。于是尝试调小线程池的参数,来缓解拉取任务过多的问题。这里要说下,这个怀疑是错误的!让我误入歧途。具体原因会在后面说明。
同步加锁延迟。当 Quartz 使用DB作为集群持久化存储的场景下,为了保证调度秩序,避免重复调度或漏调度,每一次 DB 操作都要进行加锁。而每次调度时,需要先从 DB 读取要触发的 Trigger 信息并更新状态为 ACQUIRED,然后在将要触发时更新状态到 EXECUTING,之后才会向工作线程池提交任务。虽然在读取 Trigger 信息的时候大部分操作都不是批量进行的,但是加锁操作是针对一批任务的。考虑到正常的 DB 读写耗时在 5~10ms 左右,如果不进行批量操作的话,按照 50 个任务一批计算,每一批的加锁时间就白白增加了 250~500ms。因此,进行批量拉取可以有效的减小 DB 加锁操作带来的调度延迟。同时,考虑到锁的竞争会导致线程等待,批量加锁操作还可以有效的降低锁争抢,提升整体效率。如果业务场景对调度延迟比较敏感,应考虑使用性能更好的独立 DB 实例,并尽量降低 Quartz 实例到 DB 之间的网络延迟;
CPU调度延迟。当调度线程将任务提交给工作线程池后,工作线程池中的线程最终还需要依赖 CPU 时间片来执行,若调度机器的 CPU 负载较高,或线程池数量不合理,那么线程在真正被 CPU 执行前将会有较大等待。为了尽量减少这里的执行延迟,应将工作线程池部署在 CPU 核数较多且负载不高的机器上。同时应该合理设置工作线程数量,避免一次性触发过多任务。