Because requests pass through the CloudFlare proxy, the remote IP that Nginx sees is actually a CF IP address, so we need Nginx's ngx_http_realip_module to recover the real client IP. CloudFlare has detailed documentation for this step.
In short: when the request IP falls within CF's IP ranges, read the CF-Connecting-IP header and use the IP it carries as the client IP. CF's IP ranges can be fetched via an API. The following script generates the corresponding configuration:
#!/bin/bash
# Fetch the Cloudflare IP list and generate an Nginx real-IP config

# Output file
IP_CONFIG_FILE="cf_ips.conf"

# Cloudflare IP list API endpoint
CLOUDFLARE_API_URL="https://api.cloudflare.com/client/v4/ips"

# Truncate the output file
> "$IP_CONFIG_FILE"

# Fetch the list once and reuse it for both address families
RESPONSE="$(curl -s "$CLOUDFLARE_API_URL")"

# Generate the set_real_ip_from directives
echo "# Cloudflare IPv4 IPs" >> "$IP_CONFIG_FILE"
echo "$RESPONSE" | jq -r '.result.ipv4_cidrs[]' | while read -r ip; do
    echo "set_real_ip_from $ip;" >> "$IP_CONFIG_FILE"
done

echo -e "\n# Cloudflare IPv6 IPs" >> "$IP_CONFIG_FILE"
echo "$RESPONSE" | jq -r '.result.ipv6_cidrs[]' | while read -r ip; do
    echo "set_real_ip_from $ip;" >> "$IP_CONFIG_FILE"
done

# Add the real_ip_header directive
echo -e "\n# Set real IP header" >> "$IP_CONFIG_FILE"
echo "real_ip_header CF-Connecting-IP;" >> "$IP_CONFIG_FILE"
Running the script produces a cf_ips.conf file containing the set_real_ip_from and real_ip_header directives. Include that file in the http block and every server automatically gets the real client IP.
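For example, assuming the generated file lives under /etc/nginx/ (the path is an assumption), the include looks like this:

http {
    # Restore the real client IP for every server behind Cloudflare
    include /etc/nginx/cf_ips.conf;
    ...
}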
Restricting Access to CloudFlare IPs Only
With the real client IP problem solved, the next question is how to allow only CF IPs to connect. Since this Nginx instance hosts multiple servers, I did not want to reject all non-CF traffic outright at the firewall level. From the Nginx side, the first idea that comes to mind is ngx_http_access_module, letting only CF's IP ranges through:
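The original snippet is not reproduced here; a minimal sketch of the idea looks like the following, using two of Cloudflare's published IPv4 ranges as placeholders (in practice the full list would be generated the same way as cf_ips.conf):

server {
    # ngx_http_access_module: allow only Cloudflare's ranges, reject the rest
    allow 173.245.48.0/20;
    allow 103.21.244.0/22;
    # ... the remaining Cloudflare CIDRs ...
    deny all;
}

One caveat worth noting: by this point ngx_http_realip_module has already rewritten $remote_addr to the restored client IP, so allow/deny rules written this way are checked against the client address rather than Cloudflare's edge address, which makes this first attempt less straightforward than it looks.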
Debugging with gdb reveals that the first argument of the qcloud_license_internal_verify function is the Public Key used for verification; printed out, it looks like this:
-----BEGIN PUBLIC KEY-----
MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEDJcPoMIFyYDlhTUzNSeT/+3ZKcByILoI
Fw8mLv07Hpy2I5qgAGQu66vF3VUjFKgpDDFKVER9jwjjmXUOpoCXX4ynvFUpEM25
ULJE86Z6WjcLqyG03Mv6d3GPYNl/cJYt
-----END PUBLIC KEY-----
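As an aside (my own check, not from the original analysis), saving the block above as pub.pem lets OpenSSL decode it; the ASN.1 header identifies an EC public key on the secp384r1 (P-384) curve:

openssl pkey -pubin -in pub.pem -text -noout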
-- excerpt from resty.aes (lua-resty-string)
function _M.new(self, key, salt, _cipher, _hash, hash_rounds, iv_len, enable_padding)
...
    -- defaults: MD5 digest, a single derivation round
    local _hash = _hash or hash.md5
    local hash_rounds = hash_rounds or 1
...
    -- derive the actual AES key and IV from the passphrase via OpenSSL's EVP_BytesToKey
    if C.EVP_BytesToKey(_cipher.method, _hash, salt, key, #key,
        hash_rounds, gen_key, gen_iv)
        ~= _cipherLength
    then
        return nil, "failed to generate key and iv"
    end
end
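In other words, when resty.aes is handed only a passphrase, the actual key and IV come out of EVP_BytesToKey with MD5 and one round. A minimal usage sketch (passphrase and plaintext are placeholders; the default cipher is AES-128-CBC):

local aes = require "resty.aes"

-- key/IV derived from the passphrase exactly as in the excerpt above
local cipher = aes:new("some-passphrase")
local encrypted = cipher:encrypt("plaintext")
local decrypted = cipher:decrypt(encrypted)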
[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.s.s.q.LocalTaskExecutorThreadPool:runInThread:84 - Task has been rejected by TaskExecutor
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]] did not accept task: org.quartz.core.JobRunShell@5841bfb6
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at org.springframework.scheduling.quartz.LocalTaskExecutorThreadPool.runInThread(LocalTaskExecutorThreadPool.java:80)
at org.quartz.core.QuartzSchedulerThread.run(QuartzSchedulerThread.java:398)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.quartz.core.JobRunShell@5841bfb6 rejected from java.util.concurrent.ThreadPoolExecutor@ef3310e[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 20]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
… 2 common frames omitted
[2021-04-16 16:14:08.001][|][quartz-scheduler_QuartzSchedulerThread] ERROR o.quartz.core.QuartzSchedulerThread:run:403 - ThreadPool.runInThread() return false!
Searching the relevant code shows a total of four places that set the state to ERROR. The one triggered by retrieveJob can be ruled out for now: the Trigger rows in the DB are all highly similar, so if that path were broken the errors would appear in large batches rather than on individual jobs. Continuing the search turns up the following logic:
JobRunShell shell = null;
try {
// create the execution environment for the trigger
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// submit for execution and check whether the submission succeeded
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
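This snippet sits inside the scheduler thread's main loop; the loop's (abridged) source follows: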
@Override
public void run() {
int acquiresFailed = 0;
while (!halted.get()) {
try {
// do some process states check
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// do exception process
continue;
} catch (RuntimeException e) {
// do exception process
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// do some check
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
// do exception process
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
// do some check
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
This is the main logic of the Quartz scheduler thread. Note this part in particular:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
As you can see, the number of triggers Quartz acquires from the DB on each pass depends on the thread pool's available thread count and on a parameter, MaxBatchSize. Several articles I consulted claim that Quartz pulls as many triggers per pass as it has threads, so I suspected the worker pool was simply too large, letting one machine grab all the jobs and causing the load imbalance, and I tried shrinking the pool to limit over-acquisition. To be clear up front: this suspicion was wrong and led me down a dead end; the actual cause is explained later.
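For reference, these knobs correspond to the following Quartz properties; the values below are illustrative assumptions, not the settings from this incident:

# worker thread pool size (the source of availThreadCount)
org.quartz.threadPool.threadCount = 20
# upper bound on triggers acquired per pass (maxBatchSize in the code above)
org.quartz.scheduler.batchTriggerAcquisitionMaxCount = 20
# how far ahead of their fire time triggers may be acquired, in ms (batchTimeWindow)
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow = 0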
Synchronization lock latency. When Quartz uses a DB as the cluster's persistent store, every DB operation must take a lock to keep scheduling orderly and avoid duplicate or missed firings. On each pass, Quartz first reads the Triggers due to fire from the DB and updates their state to ACQUIRED, then updates them to EXECUTING just before firing, and only then submits the job to the worker thread pool. Although most of the Trigger reads are not batched, the locking is done per batch of triggers. Given that a normal DB read/write takes around 5~10 ms, without batching a batch of 50 jobs would waste an extra 250~500 ms purely on locking. Batched acquisition therefore meaningfully reduces the scheduling delay introduced by DB locking; and since lock contention makes threads wait, batching the lock operations also reduces contention and improves overall efficiency. If your workload is sensitive to scheduling delay, consider a dedicated, higher-performance DB instance and minimize the network latency between the Quartz nodes and the DB.
CPU scheduling latency. After the scheduler thread hands a job to the worker thread pool, the worker threads still depend on CPU time slices to actually run. If the machine's CPU load is high, or the pool is sized unreasonably, threads will wait noticeably before they execute. To minimize this delay, run the worker pool on machines with more cores and lower load, and size the pool sensibly so that too many jobs are not fired at once.
Note: replace DatabaseName with the name you chose when creating the DB, i.e., the one shown in the Lightsail console.
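The query in question is presumably the Lightsail parameter call, something like this (the output file name is an assumption):

aws lightsail get-relational-database-parameters \
    --relational-database-name DatabaseName > params.json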
After it runs successfully, open the JSON file and you can see all the variables.
Note that each variable carries several attributes:
Allowed values: the permitted range for the variable
Apply method: when a change takes effect. immediate means right away; pending-reboot means after a restart
Apply type: the apply mode the underlying engine supports. dynamic can take effect immediately; static requires a restart
Data type: the variable's data type
Description: a description of the variable
Is modifiable: whether the variable can be changed
Parameter name: the variable's name
Let's take the maximum connection count as an example:
{
"allowedValues": "1-100000",
"applyMethod": "pending-reboot",
"applyType": "dynamic",
"dataType": "integer",
"description": "The number of simultaneous client connections allowed.",
"isModifiable": true,
"parameterName": "max_connections",
"parameterValue": "{DBInstanceClassMemory/12582880}"
}
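The parameterValue here is a formula over the instance's memory: on a 1 GB instance, for example, it works out to 1073741824 / 12582880 ≈ 85 connections by default. If you need more, the parameter can be updated through the CLI; a sketch, where 200 is an illustrative value (and since applyMethod is pending-reboot, it only takes effect after a restart):

aws lightsail update-relational-database-parameters \
    --relational-database-name DatabaseName \
    --parameters "parameterName=max_connections,parameterValue=200,applyMethod=pending-reboot"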
PARTITION BY RANGE COLUMNS(`a`, `b`, `c`) (
PARTITION p1 VALUES LESS THAN (0, 0, MAXVALUE),
PARTITION p2 VALUES LESS THAN (10, 10, MAXVALUE),
PARTITION p3 VALUES LESS THAN (20, 20, MAXVALUE)
)
CREATE TABLE `test_table` (
`a` INT(20) NOT NULL,
`b` INT(11) NOT NULL
) ENGINE=INNODB DEFAULT CHARSET=UTF8MB4
PARTITION BY RANGE COLUMNS(`a`, `b`) (
PARTITION p1 VALUES LESS THAN (0, 0),
PARTITION p2 VALUES LESS THAN (10, 10),
PARTITION p3 VALUES LESS THAN (20, 20)
);
INSERT INTO `test_table` VALUES (10,10);
INSERT INTO `test_table` VALUES (10,9);
INSERT INTO `test_table` VALUES (9,11);
Running these shows that, unsurprisingly, the first row lands in p3, but the second and third rows both land in p2! The reason is that RANGE COLUMNS compares the column tuples lexicographically: (10, 9) falls below p2's bound (10, 10) because a is equal and b = 9 < 10, while (9, 11) falls below it because a = 9 < 10, regardless of b.
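To double-check the placement (my own verification, not from the original post), MySQL 5.6+ partition selection lets you read a single partition directly:

-- only the rows stored in p2
SELECT * FROM `test_table` PARTITION (p2);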
So what happens when we run queries against this table?
mysql> SELECT * FROM `test_table`;
+----+----+
| a | b |
+----+----+
| 10 | 9 |
| 9 | 11 |
| 10 | 10 |
+----+----+
3 rows in set (0.00 sec)
mysql> EXPLAIN PARTITIONS SELECT * FROM `test_table` WHERE a=10;
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra |
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
| 1 | SIMPLE | test_table | p2,p3 | ALL | NULL | NULL | NULL | NULL | 3 | Using where |
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
1 row in set (0.00 sec)
mysql> EXPLAIN PARTITIONS SELECT * FROM `test_table` WHERE b=10;
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | Extra |
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
| 1 | SIMPLE | test_table | p1,p2,p3 | ALL | NULL | NULL | NULL | NULL | 5 | Using where |
+------+-------------+------------+------------+------+---------------+------+---------+------+------+-------------+
1 row in set (0.00 sec)