ScheduledExecutorService 一个小坑

今天在排查一个线上问题是发现一个使用 ScheduledExecutorService 执行的定时任务在执行了一次以后就再也没有执行过了。于是 Dump 了内存来检查问题。

首先,搜索对应 Task 的类,发现在堆中找不到这个类的实例。可是明明已经成功执行了一次,为何没有实例?

于是再去找 ScheduledExecutorService  对应的 ScheduledThreadPoolExecutor  类,成功筛选出了用来执行定时任务的实例。在实例的 queue 中,却只看到了 6 个 Task 对象,唯独不见了这个出问题的对象。百思不得解,因为日志中这个对象的 Logger 已经打印出来了,说明至少执行了一次,为啥会从内存中消失呢?

在同事的帮助下,查阅了 API 文档,发现了这么一句话:

If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

注意通常的理解,这里的 suppressed 意思应该为抑制、压制,一般意义上理解为可能是说降低频率啊权重啊什么的。可是实际上,这里使用 stoped 更合适。从表现上看,你的 Task 只要出现了异常,就会被彻底扔掉,再也不会执行。

下面给出一个网上同样问题的复现代码:

import java.util.concurrent.Executors;

public class BadAssTask implements Runnable {

        @Override
        public void run() {
                System.out.println("Sleeping ...");
                try {
                        Thread.sleep(100);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
                System.out.println("Throwing ... ");
                throw new RuntimeException("bad ass!");
        }

        public static void main(String[] args) {
                Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new BadAssTask(), 1, 1, TimeUnit.SECONDS);
        }

}

当我们注释掉 throw new RuntimeException(“bad ass!”);  的时候,可以看到每个 0.1s 会有一行 Sleeping … 输出。当开启注释时,Throwing … 之后再也没有 Sleeping 输出了。

于是来查看对应的代码,在 ScheduledThreadPoolExecutor  的代码中看到执行 Task 的实际调用方法为:

/**
 * Overrides FutureTask version so as to reset/requeue if periodic.
 */
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

注意最后一个 if 语句。当 Task 执行出错的时候, runAndReset  的返回值为 False,所以 if 里面的内容不会执行,因此这个 task 就不会被放回队列,也就再也不会被执行了。

runAndReset 方法的代码如下:

/**
 * Executes the computation without setting its result, and then
 * resets this future to initial state, failing to do so if the
 * computation encounters an exception or is cancelled.  This is
 * designed for use with tasks that intrinsically execute more
 * than once.
 *
 * @return {@code true} if successfully run and reset
 */
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

注意其中的 setException 方法。这个方法会把 state 设置成出现异常的状态:

/**
 * Causes this future to report an {@link ExecutionException}
 * with the given throwable as its cause, unless this future has
 * already been set or has been cancelled.
 *
 * 

This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

于是在 runAndReset 最后的判断中,s == NEW 不成立,于是返回 False 了。

参考:

http://code.nomad-labs.com/2011/12/09/mother-fk-the-scheduledexecutorservice/

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleAtFixedRate-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-

记一次 JNI 导致 Java fd 泄露的排查过程

前几天接到反馈,线上某机器上的服务在进行后端调用的时候失败了。查看日志是端口分配失败。通过 netstat -nulp  看到大量端口占用,用户段端口49152 到 65535 全部被占满。于是通过 awk sort 和 uniq 统计出每个进程的端口占用情况,发现某些 Java 服务占用了 2w+ 端口,于是对该服务展开分析。

首先考虑的是应用有大量 Socket 对象没有关闭释放,于是将堆 dump 出来,使用 VisualVM 加载分析。由于泄露的是 UDP 端口,于是考虑查找 Java 中 UDP socket 对应的对象 DatagramSocket 。可是一顿操作之后发现堆中并不存在该类对象,倒是找到了几个 DatagramSocketAdaptor  对象。查看了框架发现使用了 Netty 做 NIO,因此实际使用的是 NioDatagramChannel 和 DatagramSocketAdaptor 对象。经过对比,这些对象都是应用启动时创建 Netty channel 时创建的,数量完全正确,所以排除这方面的问题。

接着考虑 Netty 版本和 JVM 版本的问题。通过搜索发现 Netty 和 JVM 都出现过 fd 没有正确关闭的问题,于是进行升级。升级后发现问题依然存在,排除相关组件的问题。

在排除了几个可能之后,还是没有什么头绪,于是准备从 FD 本身入手。在 Java 中,所有的系统 FD 对应的都是一个 Java 中的 FileDescriptor  类,于是使用各种工具对该类的创建和删除进行了监控。结果通过排查发现,除了第一步中查到的 Netty Channel 持有的 fd 实例外,没有任何的有关网络 Socket 的 fd 实例。这下就很尴尬了,JVM 中的 fd 没有变化,按理说应该不会有 Socket 创建才对。仔细思考了一下,想到了最后一种可能,那就是 native 方法。

在 Java 中,为了支持调用其它语言所编写的方法,提供了 Java Native Interface 即 JNI 作为入口。当我们需要调用一个闭源的 C++ 编写的类库的时候,我们就可以使用 JNI 来进行调用。同时,由于 JNI 实际执行的是第三方类库中的代码,因此这部分代码进行的 fd 操作都不会被 JVM 管理,自然也就不会出现在 Dump 文件中。

既然猜到了可能的问题,接下来就需要排查了。可是由于一些原因,该服务中存在很多的 JNI 类库都会进行网络调用,无法最终确定。于是想了这么一个办法来进行排查:

  1. 在机器上起一个程序不断的对 lsof -a -n -P -i udp -p pid  进行采样对比,遇到两次采样期间有新增的 fd 就打印出来
  2. 同时使用 tcpdump -i any udp -w fd_leak.pcap  进行全量抓包,记录机器上所有的 UDP 网络流量,来对比分析流量发出的进程

经过排查,终于抓到了对应的包,找到了对端的 IP 端口,定位到了对应的组件。于是对这个组件编写复现代码进行测试,最终将代码简化为:

package com.maoxian.test;

import com.maoxian.xxx.NativeAPI;

/**
 * 测试类
 */
public class Test {
    public static void main(String[] args) {
        Thread.sleep(10000);
        int i = 0;
        while (i < 10) {
            i++;
            System.out.println("Start loop " + j + " ...");
            new Thread() {

                @Override
                public void run() {
                    try {
                        System.out.println("Thread " +
                                Thread.currentThread().getId() +
                                " is running...");


                        NativeAPI.doSomething();
                    } finally {
                        System.out.println("Thread " +
                                Thread.currentThread().getId() +
                                " finished.");
                    }
                }
            }.start();
            System.out.println("Finish loop " + j + ".");
        }
        Thread.sleep(60000);
    }
}

这段代码的作用非常简单,就是起 10 个线程去调用这个组件,每个线程调用一次就退出。Sleep 的作用是给 lsof 列出 fd 预留时间。在执行这段代码后可以明显的看到 fd 在 Native 方法调用时分配,但是在线程退出后没有释放。咨询了相关同事后得知,由于在 C++ 中,一般不会使用 Java 中的这种线程池模型,而是使用固定线程模型。当一个线程退出的时候通常意味着整个程序的退出。所以这个组件在制作的时候只考虑了线程安全的问题对每个线程独立分配了资源,但是没有考虑线程终止时候的资源释放

在定位到 fd 泄露与线程创建相关后,对相应的业务框架代码进行了分析。发现框架中创建线程池使用的是:

this.tasks = Executors.newCachedThreadPool(new ThreadFactory() {
    final AtomicInteger TID = new AtomicInteger(0);
    final String GROUP_NAME = "TPWS-" + GID.getAndIncrement();
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, GROUP_NAME + TID.getAndIncrement());
    }
});

查看 Executors 的创建线程池的方法:

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue(),
                                  threadFactory);
}

可以看出,这个方法最终创建了一个最小 0 个线程,最大 Integer.MAX_VALUE 个线程,每个线程 60s 超时时间。于是,当一个业务请求到来时,线程池创建一个新线程处理这个请求,在请求中调用了 Native 方法。当请求处理完后,线程归还线程池。60s 内如果没有新的请求进入,则该线程被线程池销毁,但是 Native 申请的 fd 没有释放。当新的请求到来时,又重新创建线程,重新分配 fd,以此类推,导致 fd 泄露。

这次排查的经理告诉我们,当 Java 调用 Native 方法的时候一定要格外小心。由于虚拟机无法对大多数 Native 方法占用的资源进行管理,因此编写质量差的 Native 类库会直接导致不可预知的奇怪问题。特别是由于 C++ 在多线程、多进程模型上与 Java 有诸多不同,很多库在设计时就没有考虑全面,直接在 Java 内调用的时候可能就会出现在 C++ 中不会遇到的问题。