《线程池系列二》
线程是什么相信大家都使用过线程池,也了解使用线程池的好处。我们使用线程池最多的还是使用Executors工具类创建FixedThreadPool、SingleThreadPool以及CachedThreadPool三种线程池,如果我们不了解其工作原理,将会碰到很多意想不到的问题,例如内存被撑爆,cpu被打满,线程池无故中断,关闭线程池应该使用shutdown()还是shutdownNow()等等一系列的问题,这篇文章将讲解为什么要是用线程池,为什么会出现上述的问题,线程池的工作原理是什么,应该选择何种线程池,如何定义线程池线程的个数等等。本文采用JDK8源码进行讲解,主要讲解原理为主,不过多的涉及到源码。注:手机端代码展示不佳,建议查看原文链接
不讲书本知识,只是抛出一个问题:new Thread(); new Runnable(); new Callable(); (尽管不能new 接口,这里只是说明意思,不要较真),请问这代表线程吗?
一定要注意,这都是类,java中的类,不是线程,千万不要看到thread, runnable, callable就认为是线程,它们和Object, List,Map一样是java语言的类而已。 或者可以说他们是任务,线程执行的任务,因为他们内部都有线程要执行的方法run()或者call(),那什么才是线程呢?
new Thread().start(); 调用了start()方法才会在操作系统层面启动一个线程,除此之外都是承载了线程执行方法的类而已。这一点大家一定要分清楚。
多线程多线程一定比单线程快吗?这个答案是否定的,在单核处理器环境下,多个线程执行任务势必会引起线程上下文切换,上下文切换会对当前线程的执行环境进行保存,并还原将要执行线程的执行环境,存在开销。多线程与单线程相比,多出了上下文切换的时间,因此在单核处理器环境下,多线程并不会提高性能。
现今,处理基本上都是多核多处理器,因此合理使用多线程编程将取很大的性能提升。但是当线程数过多,引起过多的上下文切换,当上下文切换的开销大于多线程带来的收益的话,性能将会下降。滥用多线程将会是一场灾难。
线程池的引入首先从线程类Thread讲起,Thread类具有两个功能:
维护线程 线程的创建、休眠、中断、暂停、销毁执行任务 Thread类及其子类run(), Runnable对象的run(), Callable对象的call() (更准确的说应该是FutureTask,因为Callable对象并不能传入Thread类)Thread类将线程和任务耦合在一起,一般的使用方式为:有多少个任务就需要多少个线程去执行,并发的任务数太多,就会引起大量的上下文切换,以及线程的创建与销毁(线程的创建和销毁都设计到内核态和用户态的转换,开销也不容小觑)。
为了能对线程进行统一的管理和复用,引入了线程池。线程池对线程进行统一的管理,并可以弹性的扩展,将执行任务和线程完全分离,任务存放到阻塞队列中,线程不断的去阻塞队列中取任务执行。从而达到线程复用的目的(说白了,线程在死循环中去阻塞队列获取数据,如果获取不到就阻塞,如果获取到就执行,其run()方法一直执行),这样线程与任务个数比为m:n 其中m<<<n
因此,编写多线程程序时,我们最好使用线程池。
线程池的参数corePoolSize核心线程数,当提交任务时如果线程数小于corePoolSize,则直接创建线程执行该任务,否则,将任务添加到阻塞队列
maximumPoolSize最大线程数,当提交任务时,任务需添加到阻塞队列且阻塞队列满时,如果线程数小于maximumPoolSize,则创建线程执行该任务,否则执行拒绝策略
注:如果阻塞队列采用的是无界队列的话,该参数无意义,因为阻塞队列无界就永远不会满
keepAliveTime线程空闲时间,空闲时间超过该时间则销毁线程,只对大于corePoolSize~maximumPoolSize的线程有效,即至少保留corePoolSize个线程,即便空闲时间大于keepAliveTime也不销毁。(核心线程也是可以销毁的,需要设置核心线程过期)
注:如果阻塞队列为无界,则maximumPoolSize无意义,那么keepAliveTime也就无意义
unitkeepAliveTime的时间单位
workQueue阻塞队列,分为有界队列和无界队列,一般使用LinkedBlockingQueue、SynchronousQueue,用于存放任务,阻塞队列的泛型必须是Runnable
threadFactory线程工厂,负责创建线程,指定线程名,线程组,线程优先级,是否为守护线程等信息
handler拒绝策略,当阻塞队列放不下,且线程数达到最大值maximumPoolSize时,再提交任务,改任务会被拒绝。目前,JDK提供了四种拒绝策略
CallerRunsPolicy 调用线程执行策略,当前执行的线程执行该任务,可以保证任务不丢失,减缓任务添加的速度AbortPolicy 直接抛出异常,会导致线程池抛异常,线程池不可用,默认拒绝策略DiscardPolicy 直接丢弃该任务DiscardOldestPolicy 丢弃最老的任务,重试添加该任务注:如果阻塞队列为无界,则拒绝策略无效,因为不会存在任务放不下的情况,也可以自定义自己的拒绝策略。该参数一定要重视
线程池的构造函数构造线程池无非就是为上节中介绍的几个参数赋值,源码如下
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <=0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue==null || threadFactory==null || handler==null) throw new NullPointerException(); this.corePoolSize=corePoolSize; this.maximumPoolSize=maximumPoolSize; this.workQueue=workQueue; this.keepAliveTime=unit.toNanos(keepAliveTime); this.threadFactory=threadFactory; this.handler=handler;}
其他的构造函数,都是间接调用该构造函数
线程池的工作原理提交任务,如何当前线程数<corPoolSize,不管是否有空闲线程都会创建新的线程执行如何当前线程数>=corPoolSize,将任务提交给阻塞队列如果阻塞队列不满,添加到阻塞队列,否则执行4如果当前线程数<maxPoolSize,且不存在空闲线程则创建一个线程执行该任务,否则执行5执行拒绝策略第一步需要注意的是在提交任务时,excutor会不会判断有无空闲线程,答案是不会,因为如果每次提交任务都需要判断有无空闲线程,将会造成很大的开销,excutor的做法是,启动的每一个worker在空闲时都会去阻塞队里阻塞的获取任务,如果没有任务则worker会阻塞,因为worker到底空不空闲worker自己是最清楚的。
线程执行完一个任务之后,会从阻塞队列中获取任务,如果没有任务可以获取,则阻塞等待,如果有任务则直接获取执行。与此同时,线程池中有专门的线程坚持线程的空闲时间(等待任务的时间),如果超过指定时间且线程数>corePoolSize,就销毁线程。
Executors提供的三种线程池FixedThreadPool固定大小的线程池,其源代码如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
通过源码可以看出,线程池的corePoolSize和maximumPoolSize都为指定大小,阻塞队列使用无界阻塞队列(看到无界阻塞队列,就应该想到maximumPoolSize、keepAliveTime、handler都无效),因此,该方法中有用的参数只有corePoolSize和workQueue是有意义的。
存在的问题:当任务执行的较慢,且任务提交的速度过快时,会有大量的任务存放到阻塞队列中,阻塞队列会越来越大,内存会被撑爆,使用该线程池时,一定要考虑清楚。
除了该方法外,Executors还提供了重载方法,可以指定ThreadFactory,但是却没有提供修改阻塞队列的重载方法
使用场景: 负载较重的服务器
SingleThreadPool当个线程的线程池,与FixedThreadPool相比就是将线程数指定为1,同样该线程池存在FixedThreadPool存在的问题,其源码如下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));}
与FixedThreadPool类型,Executors也提供了指定ThreadFactory的重载方法
使用场景: 单线程执行环境,保证顺序执行各个任务的场景
CachedThreadPool使用SynchronousQueue阻塞队列,该队列不保存元素,有任务提交到阻塞队列时,任务必须立即被处理。源码如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
从源码中可以看出,maximumPoolSize的值为Integer.MAX_VALUE,意味着只要有任务到达,且线程池内没有空闲线程,就给任务开辟一个线程去执行。线程空闲60s就销毁
存在问题:如果任务执行时间长,提交速度快,那么会产生大量的线程,引起上下文切换,应用可能会出现假死或者崩溃的情况。
同样,这种类型的线程池,也提供了一个指定ThreadFactory的重载方法使用场景:适用于大量短期异步任务,或者负载较轻的服务器
由此可见:Executors提供的三种线程池都各自有优缺点,如果使用线程池,建议不要使用这三种线程池,而是直接通过线程池的构造方法指定自己的corePoolSize,maximumPoolSize,keepAliveTime,阻塞队列workQueue,ThreadFactory,拒绝策略,自己指定的优点就是可以根据自己的场景灵活的对各个参数进行配置。
线程池提交任务submit()提交有返回值的任务,返回值为Future类型(真正的类型是RunnableFuture,而实现RunnableFuture接口的在JDK实现中对外可以使用的就只有FutureTask类
有关FutureTask的相关知识可以参考我的另外一篇文章: FutureTask原理讲解与源码剖析)
execute()提交没有返回值的任务
线程池关闭shutdown()将线程池的状态修改为shutdown,禁止向线程池中提交任务,并执行完已经提交的任务
shutdonwNow()将线程池的状态修改为stop, 立即终止线程池中的线程, 不处理阻塞队列中的任务,返回没有执行任务的列表
可以通过isTerminated()方法判断线程池是否完全关闭也可以通过awaitTermination(long timeout, TimeUnit unit)最长等待一段时间后退出,但并不能保证关闭
如何分配线程池的大小一般来讲没有上下文切换的多线程程序是最好的,因此,如果有n个核,那么启动n个线程就可以。但是线程并不是一直处于运行状态(可能在等待IO放弃了cpu资源),这样cpu资源就会浪费,因此我们一般针对不同的任务设定不同的线程数。
首先我们应该获取服务器的线程数,可以通过如下代码获取:
Runtime.getRuntime().availableProcessors();
注意,如果使用docker容器,使用该参数获取的是实机的核数,并不是分配给docker容器的核数,如果碰到需要修改, 具体情况具体分析。
针对IO密集型任务:一般分配2*p个线程(p代表服务器cpu总核数)针对cpu密集型任务: 一般分配 p+1个线程线程池的监控线程池提供了很多参数,来记录线程池中各个状态,了解即可:
taskCount 线程池执行任务总数completedTaskCount 已执行完成任务数量largestPoolSize 创建过最大的线程数getPoolSize() 当前线程数量getActiveCount() 活动线程数除此之外,还可以继承线程池类定义自己的线程池实现, 可以重写 beforeExecute(), afterExecute(), terminated()方法设置监控
总结本文并没有从源码的角度讲解线程池,更加详细的实现将在下一周抽时间整理。
欢迎扫描下方二维码,关注公众号,我们可以进行技术交流,共同成长qrcode_for_gh_5580beb3cba1_430.jpg线程池原理
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后
启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,
再从队列中取出任务来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。
1. 线程复用
每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java 虚拟机会调用该类的 run
方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写
Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。 这就是线程池的实
现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以
是阻塞的。
2. 线程池的组成
一般的线程池主要分为以下 4 个组成部分:
1. 线程池管理器:用于创建并管理线程池
2. 工作线程:线程池中的线程
3. 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
4. 任务队列:用于存放待处理的任务,提供一种缓冲机制
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,
ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。
ThreadPoolExecutor 的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
1. corePoolSize:指定了线程池中的线程数量。
2. maximumPoolSize:指定了线程池中的最大线程数量。
3. keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多
次时间内会被销毁。
4. unit:keepAliveTime 的单位。
5. workQueue:任务队列,被提交但尚未被执行的任务。
6. threadFactory:线程工厂,用于创建线程,一般用默认的即可。
7. handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。
3. 拒绝策略
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也
塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK 内置的拒绝策略如下:
1. AbortPolicy : 直接抛出异常,阻止系统正常运行。
2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的
任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再
次提交当前任务。
4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢
失,这是最好的一种方案。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际
需要,完全可以自己扩展 RejectedExecutionHandler 接口。
4. Java 线程池工作过程
1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面
有任务,线程池也不会马上执行它们。
2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要
创建非核心线程立刻运行这个任务;
d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池
会抛出异常 RejectExecutionException。
3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运
行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它
最终会收缩到 corePoolSize 的大小。
上一篇:14课鹿和狼的故事图片-
下一篇:shake的现在分词
发表评论