写在前面的话

  最近一直都在研究Java的线程池ThreadPoolExecutor,但是虽然它那么好,但是在实际的用途中怎么去用,对于我来说就不知道如何下手了,还好有开源社区我们可以了解很多项目中所运用到的线程池,比如最熟悉的就是Apache Tomcat了,相信都对它不默生,一个Apache软件基金下的一个开源Web容器,所以今天就来聊一下Tomcat的线程池实现。

准备工作

  首先去Apache Tomcat的官网下载Tomcat的源代码,这里给出Tomcat源码链接,下载下来之后,它是一个zip文件,需要把它进行解压到相应的文件夹下面,以便我们能方便的查看其源代码。分析源码最行之有效的方法就是知道这个类有哪些方法,哪些字段,继承了哪些类,实现了哪些接口,所以我们这里推荐一款UML工具, astah-professional,可自行下载安装,这是一个收费软件,但是它有50天的试用期,所以我们可以以使用的身份使用该软件。准备工作做好之后就可以进行下一步的操作了。

初探Tomcat线程池

  Tomcat的线程池的类文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面,定位到这个文件夹下面可以看到StandardThreadExecutor.java就是我们找寻的类了,用文本工具打开就可以查看其源码了。这里源码如下:
StandardThreadExecutor.java

public class StandardThreadExecutor extends LifecycleMBeanBase
        implements Executor, ResizableExecutor {
    //默认线程的优先级
    protected int threadPriority = Thread.NORM_PRIORITY;
    //守护线程
    protected boolean daemon = true;
    //线程名称的前缀
    protected String namePrefix = "tomcat-exec-";
    //最大线程数默认200个
    protected int maxThreads = 200;
    //最小空闲线程25个
    protected int minSpareThreads = 25;
    //超时时间为6000
    protected int maxIdleTime = 60000;
    //线程池容器
    protected ThreadPoolExecutor executor = null;
    //线程池的名称
    protected String name;
     //是否提前启动线程
    protected boolean prestartminSpareThreads = false;
    //队列最大大小
    protected int maxQueueSize = Integer.MAX_VALUE;
    //为了避免在上下文停止之后,所有的线程在同一时间段被更新,所以进行线程的延迟操作
    protected long threadRenewalDelay = 1000L;
    //任务队列
    private TaskQueue taskqueue = null;

    //容器启动时进行,具体可参考org.apache.catalina.util.LifecycleBase#startInternal()
    @Override
    protected void startInternal() throws LifecycleException {
        //实例化任务队列
        taskqueue = new TaskQueue(maxQueueSize);
        //自定义的线程工厂类,实现了JDK的ThreadFactory接口
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        //这里的ThreadPoolExecutor是tomcat自定义的,不是JDK的ThreadPoolExecutor
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        //是否提前启动线程,如果为true,则提前初始化minSpareThreads个的线程,放入线程池内
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        //设置任务容器的父级线程池对象
        taskqueue.setParent(executor);
        //设置容器启动状态
        setState(LifecycleState.STARTING);
    }

  //容器停止时的生命周期方法,进行关闭线程池和资源清理
    @Override
    protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);
        if ( executor != null ) executor.shutdownNow();
        executor = null;
        taskqueue = null;
    }

    //这个执行线程方法有超时的操作,参考org.apache.catalina.Executor接口
    @Override
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        if ( executor != null ) {
            executor.execute(command,timeout,unit);
        } else { 
            throw new IllegalStateException("StandardThreadExecutor not started.");
        }
    }

    //JDK默认操作线程的方法,参考java.util.concurrent.Executor接口
    @Override
    public void execute(Runnable command) {
        if ( executor != null ) {
            try {
                executor.execute(command);
            } catch (RejectedExecutionException rx) {
                //there could have been contention around the queue
                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
            }
        } else throw new IllegalStateException("StandardThreadPool not started.");
    }

    //由于继承了org.apache.tomcat.util.threads.ResizableExecutor接口,所以可以重新定义线程池的大小
    @Override
    public boolean resizePool(int corePoolSize, int maximumPoolSize) {
        if (executor == null)
            return false;

        executor.setCorePoolSize(corePoolSize);
        executor.setMaximumPoolSize(maximumPoolSize);
        return true;
    }
}

  看完了上面的源码之后,不知此刻的你是一面茫然还是认为小菜一碟呢,不管怎样,我们先来看下UML类图吧,了解一下具体的继承关系,你就明白了,废话不多说,能用图片解决的东西尽量少用文字。

StandardThreadExecutor类继承关系

  接下来,我们来看一下ResizableExecutor这个接口:

import java.util.concurrent.Executor;

public interface ResizableExecutor extends Executor {

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize();

    public int getMaxThreads();

    /**
     * Returns the approximate number of threads that are actively executing
     * tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount();

    public boolean resizePool(int corePoolSize, int maximumPoolSize);

    public boolean resizeQueue(int capacity);

}

  实现这个接口之后,就能动态改变线程池的大小和任务队列的大小了,它是继承自JDK的Executor接口的,其它的接口不再多说,可自行查看源码。

Tomcat线程池的实现

  Tomcat的线程池的名字也叫作ThreadPoolExecutor,刚开始看源代码的时候还以为是使用了JDK的ThreadPoolExecutor了呢,后面仔细查看才知道是Tomcat自己实现的一个ThreadPoolExecutor,不过基本上都差不多,都是在JDK之上封装了一些自己的东西,上源码:

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    /**
     *  已经提交但尚未完成的任务数量。
     *  这包括已经在队列中的任务和已经交给工作线程的任务但还未开始执行的任务
     *  这个数字总是大于getActiveCount()的
     **/
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    /**
    *  最近的时间在ms时,一个线程决定杀死自己来避免
    *  潜在的内存泄漏。 用于调节线程的更新速率。
    */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    //延迟2个线程之间的延迟。 如果为负,不更新线程。
    private long threadRenewalDelay = 1000L;
    //4个构造方法  ... 省略

    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }

    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }

    /**
    *  方法在完成给定Runnable的执行时调用。
    *  此方法由执行任务的线程调用。 如果
    *  非null,Throwable是未捕获的{@code RuntimeException}
    *  或{@code Error},导致执行突然终止。...
    *  @param r 已完成的任务
    *  @param t 引起终止的异常,如果执行正常完成则为null
    **/
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();

        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }

    //如果当前线程在上一次上下文停止之前启动,则抛出异常,以便停止当前线程。
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread

                    final String msg = sm.getString(
                                    "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                                    Thread.currentThread().getName());

                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }

    //当前线程是否需要被终止
    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
            && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            //线程创建的时间<上下文停止的时间,则可以停止该线程
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }

    public int getSubmittedCount() {
        return submittedCount.get();
    }

    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    Thread.interrupted();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }

        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);

        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }
}

Tomcat的线程池根据文档来说:和java.util.concurrent一样,但是它实现了一个高效的方法getSubmittedCount()方法用来处理工作队列。具体可以查看上面的注释和源码就知道了。把UML图献上。

ThreadPoolExecutor

Tomcat线程工厂

  想要自定义线程工厂类,只需要实现JDK的ThreadFactory接口就可以了,我们来看看Tomcat是如何实现的吧:

public class TaskThreadFactory implements ThreadFactory {
    //线程组
    private final ThreadGroup group;
    //线程增长因子
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    //名称前缀
    private final String namePrefix;
    //是否是守护线程
    private final boolean daemon;
    //线程优先级
    private final int threadPriority;
    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(threadPriority);
        return t;
    }

}

Tomcat的线程工厂类和JDK实现的线程工厂类相差无几,具体可以参考一下JDK线程工厂Executors.DefaultThreadFactory工厂类的实现。

Tomcat的线程类

  Tomcat自己定义了TaskThread用于线程的执行,里面增加了creationTime字段用于定义线程创建的开始时间,以便后面线程池获取这个时间来进行优化。

/**
 * 一个实现创建时间纪录的线程类
 */
public class TaskThread extends Thread {

    private static final Log log = LogFactory.getLog(TaskThread.class);
    private final long creationTime;

    public TaskThread(ThreadGroup group, Runnable target, String name) {
        super(group, new WrappingRunnable(target), name);
        this.creationTime = System.currentTimeMillis();
    }

    public TaskThread(ThreadGroup group, Runnable target, String name,
            long stackSize) {
        super(group, new WrappingRunnable(target), name, stackSize);
        this.creationTime = System.currentTimeMillis();
    }

    public final long getCreationTime() {
        return creationTime;
    }

    /**
    *  封装{@link Runnable}以接受任何{@link StopPooledThreadException},而不是让它走,并可能在调试器中触发中断。
     */
    private static class WrappingRunnable implements Runnable {
        private Runnable wrappedRunnable;
        WrappingRunnable(Runnable wrappedRunnable) {
            this.wrappedRunnable = wrappedRunnable;
        }
        @Override
        public void run() {
            try {
                wrappedRunnable.run();
            } catch(StopPooledThreadException exc) {
                //expected : we just swallow the exception to avoid disturbing
                //debuggers like eclipse's
                log.debug("Thread exiting on purpose", exc);
            }
        }
    }
}

按照Tomcat的注解可知,它就是一个普通的线程类然后增加一个纪录线程创建的时间纪录而已,后面还使用动态内部类封装了一个Runnable,用于调试出发中断。

Tomcat任务队列

  Tomcat的线程队列由org.apache.tomcat.util.threads.TaskQueue来处理,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;

    private ThreadPoolExecutor parent = null;

    // no need to be volatile, the one times when we change and read it occur in
    // a single thread (the one that did stop a context and fired listeners)
    private Integer forcedRemainingCapacity = null;

    public TaskQueue() {
        super();
    }

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }

    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }

    public boolean force(Runnable o) {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }

    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }


    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }

    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity != null) {
            return forcedRemainingCapacity.intValue();
        }
        return super.remainingCapacity();
    }

    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }
}

TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。
如果你使用一个普通的队列,执行器将产生线程,当有空闲线程,你不能强制项目到队列本身。

总结

从0到1分析一下Apache Tomcat的线程池,感觉心好累啊,不过有收获,至少多线程池这一块又加强了,首先是定位到了StandardThreadExecutor这个类,然后由此展开,ResizableExecutor(动态大小的线程池接口) 、ThreadPoolExecutor (Tomcat线程池具体实现对象)、TaskThreadFactory(Tomcat线程工厂)、TaskThread(Tomcat线程类-一个纪录创建时间的线程类)、TaskQueue(Tomcat的任务队列-一个专门为线程池而设计优化的任务队列),喝口水,压压惊。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐