深入浅出线程池
作者:京东云开发者-京东零售 秦浩然
链接:https://my.oschina.net/u/4090830/blog/10112678
一、线程
1、什么是线程
2、如何创建线程
2.1、JAVA 中创建线程
/**
* 继承Thread类,重写run方法
*/
classMyThreadextendsThread{
@Override
publicvoidrun(){
System.out.println("myThread..."+Thread.currentThread().getName());
}}
/**
* 实现Runnable接口,实现run方法
*/
classMyRunnableimplementsRunnable{
@Override
publicvoidrun(){
System.out.println("MyRunnable..."+Thread.currentThread().getName());
}}
/**
* 实现Callable接口,指定返回类型,实现call方法
*/
classMyCallableimplementsCallable<String> {
@Override
publicStringcall()throwsException{
return"MyCallable..."+Thread.currentThread().getName();
}}
2.2、测试一下
publicstaticvoidmain(String[] args)throwsException{
MyThread thread =newMyThread();
thread.run();//myThread...main
thread.start();//myThread...Thread-0
MyRunnable myRunnable =newMyRunnable();
Thread thread1 =newThread(myRunnable);
myRunnable.run();//MyRunnable...main
thread1.start();//MyRunnable...Thread-1
MyCallable myCallable =newMyCallable();
FutureTask<String> futureTask =newFutureTask<>(myCallable);
Thread thread2 =newThread(futureTask);
thread2.start();
System.out.println(myCallable.call());//MyCallable...main
System.out.println(futureTask.get());//MyCallable...Thread-2
}
2.4、问题分析
classThreadimplementsRunnable{//Thread类实现了Runnalbe接口,实现了run()方法
privateRunnable target;
publicsynchronizedvoidstart(){
...
boolean started =false;
try{
start0();//可以看到,start()方法真实的调用时start0()方法
started =true;
}finally{
...
}
}
privatenativevoidstart0();//start0()是一个native方法,由JVM调用底层操作系统,开启一个线程,由操作系统过统一调度
@Override
publicvoidrun(){
if(target !=null){
target.run();//操作系统在执行新开启的线程时,回调Runnable接口的run()方法,执行我们预设的线程任务
}
}
}
二、多线程
1、什么是多线程
2、多线程有什么好处
2.1、串行处理
publicstaticvoidmain(String[] args)throwsException{
System.out.println("start...");
long start =System.currentTimeMillis();
for(int i =0; i <5; i++){
Thread.sleep(2000);//每个任务执行2秒
System.out.println("task done...");//处理执行结果
}
long end =System.currentTimeMillis();
System.out.println("end...,time = "+(end - start));
}
//执行结果
start...
task done...
task done...
task done...
task done...
task done... end...,time =10043
2.2、并行处理
publicstaticvoidmain(String[] args)throwsException{
System.out.println("start...");
long start =System.currentTimeMillis();
List<Future> list =newArrayList<>();
for(int i =0; i <5; i++){
Callable<String> callable =newCallable<String>(){
@Override
publicStringcall()throwsException{
Thread.sleep(2000);//每个任务执行2秒
return"task done...";
}
};
FutureTask task =newFutureTask(callable);
list.add(task);
newThread(task).start();
}
list.forEach(future ->{
try{
System.out.println(future.get());//处理执行结果 } catch (Exception e) {
}
});
long end =System.currentTimeMillis();
System.out.println("end...,time = "+(end - start));
}
//执行结果
start...
task done...
task done...
task done...
task done...
task done... end...,time =2005
2.4、多线程的问题
三、线程池
1、如何设计一个线程池
1.1、线程池基本功能
1.2、线程池面临问题
1.3、创新源于生活
1.4、技术源于创新
2、线程池具体分析
2.1、 JAVA 中的线程池是如何设计的
2.1.1、 线程池设计
publicclassThreadPoolExecutorextendsAbstractExecutorService{
//线程池的打包控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量
privatefinalAtomicInteger ctl =newAtomicInteger(ctlOf(RUNNING,0));
//值为29,用来表示偏移量
privatestaticfinalint COUNT_BITS =Integer.SIZE -3;
//线程池的最大容量
privatestaticfinalint CAPACITY =(1<< COUNT_BITS)-1;
//线程池的运行状态,总共有5个状态,用高3位来表示
privatestaticfinalint RUNNING =-1<< COUNT_BITS;//接受新任务并处理阻塞队列中的任务
privatestaticfinalint SHUTDOWN =0<< COUNT_BITS;//不接受新任务但会处理阻塞队列中的任务
privatestaticfinalint STOP =1<< COUNT_BITS;//不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
privatestaticfinalint TIDYING =2<< COUNT_BITS;//所有任务都已终止, 工作线程数量为0,即将要执行terminated()钩子方法
privatestaticfinalint TERMINATED =3<< COUNT_BITS;// terminated()方法已经执行结束
//任务缓存队列,用来存放等待执行的任务
privatefinalBlockingQueue<Runnable> workQueue;
//全局锁,对线程池状态等属性修改时需要使用这个锁
privatefinalReentrantLock mainLock =newReentrantLock();
//线程池中工作线程的集合,访问和修改需要持有全局锁
privatefinalHashSet<Worker> workers =newHashSet<Worker>();
// 终止条件
privatefinalCondition termination = mainLock.newCondition();
//线程池中曾经出现过的最大线程数
privateint largestPoolSize;
//已完成任务的数量
privatelong completedTaskCount;
//线程工厂
privatevolatileThreadFactory threadFactory;
//任务拒绝策略
privatevolatileRejectedExecutionHandler handler;
//线程存活时间
privatevolatilelong keepAliveTime;
//是否允许核心线程超时
privatevolatileboolean allowCoreThreadTimeOut;
//核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
privatevolatileint corePoolSize;
//最大池大小,不得超过CAPACITY
privatevolatileint maximumPoolSize;
//默认的任务拒绝策略
privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();
//运行权限相关
privatestaticfinalRuntimePermission shutdownPerm =
newRuntimePermission("modifyThread");
...
}
2.1.2、线程池构造函数
//构造函数
publicThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大允许线程数
long keepAliveTime,//线程存活时间
TimeUnit unit,//存活时间单位
BlockingQueue<Runnable> workQueue,//任务缓存队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler){//拒绝策略
if(corePoolSize <0||
maximumPoolSize <=0||
maximumPoolSize < corePoolSize ||
keepAliveTime <0)
thrownewIllegalArgumentException();
if(workQueue ==null|| threadFactory ==null|| handler ==null)
thrownewNullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.1.3、线程池执行
publicFuture<?> submit(Runnable task){
if(task ==null)thrownewNullPointerException();
RunnableFuture<Void> ftask =newTaskFor(task,null);
execute(ftask);
return ftask;
}
可以看到 submit 方法的底层调用的也是 execute 方法,所以我们这里只分析 execute 方法;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//第一步:创建核心线程
if (workerCountOf(c) < corePoolSize) { //worker数量小于corePoolSize
if (addWorker(command, true)) //创建worker
return;
c = ctl.get();
}
//第二步:加入缓存队列
if (isRunning(c) && workQueue.offer(command)) { //线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //双重检查,若线程池状态关闭了,移除任务
reject(command);
else if (workerCountOf(recheck) == 0) //线程池状态正常,但是没有线程了,创建worker
addWorker(null, false);
}
//第三步:创建临时线程
else if (!addWorker(command, false))
reject(command);
}
privatebooleanaddWorker(Runnable firstTask,boolean core){
retry:
for(;;){
int c = ctl.get();
int rs =runStateOf(c);
//等价于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
//线程池已关闭,并且无需执行缓存队列中的任务,则不创建
if(rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask ==null&&
! workQueue.isEmpty()))
returnfalse;
for(;;){
int wc =workerCountOf(c);
if(wc >= CAPACITY ||
wc >=(core ? corePoolSize : maximumPoolSize))
returnfalse;
if(compareAndIncrementWorkerCount(c))//CAS增加线程数
break retry;
c = ctl.get();// Re-read ctl
if(runStateOf(c)!= rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//上面的流程走完,就可以真实开始创建线程了
boolean workerStarted =false;
boolean workerAdded =false;
Worker w =null;
try{
w =newWorker(firstTask);//这里创建了线程
finalThread t = w.thread;
if(t !=null){
finalReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs =runStateOf(ctl.get());
if(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask ==null)){
if(t.isAlive())// precheck that t is startable
thrownewIllegalThreadStateException();
workers.add(w);//这里将线程加入到线程池中
int s = workers.size();
if(s > largestPoolSize)
largestPoolSize = s;
workerAdded =true;
}
}finally{
mainLock.unlock();
}
if(workerAdded){
t.start();//添加成功,启动线程
workerStarted =true;
}
}
}finally{
if(! workerStarted)
addWorkerFailed(w);//添加线程失败操作
}
return workerStarted;
}
privatefinalclassWorker//Worker类是ThreadPoolExecutor的内部类
extendsAbstractQueuedSynchronizer
implementsRunnable
{
finalThread thread;//持有实际线程
Runnable firstTask;//worker所对应的第一个任务,可能为空
volatilelong completedTasks;//记录执行任务数
Worker(Runnable firstTask){
setState(-1);// inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread =getThreadFactory().newThread(this);
}
publicvoidrun(){
runWorker(this);//当前线程调用ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用
}
...继承AQS,实现了不可重入锁...
}
小结:工作线程 Worker 类主要功能;
1. 此类持有一个工作线程,不断处理拿到的新任务,持有的线程即为可复用的线程;
2. 此类可看作一个适配类,在 run () 方法中真实调用 runWorker () 方法不断获取新任务,完成线程复用;
2.1.3.4、线程的复用
finalvoidrunWorker(Worker w){//ThreadPoolExecutor中的runWorker方法,在这里实现的线程复用
Thread wt =Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask =null;
w.unlock();// allow interrupts
boolean completedAbruptly =true;//标识线程是否异常终止
try{
while(task !=null||(task =getTask())!=null){//这里会不断从任务队列获取任务并执行
w.lock();
//线程是否需要中断
if((runStateAtLeast(ctl.get(), STOP)||
(Thread.interrupted()&&
runStateAtLeast(ctl.get(), STOP)))&&
!wt.isInterrupted())
wt.interrupt();
try{
beforeExecute(wt, task);//执行任务前的Hook方法,可自定义
Throwable thrown =null;
try{
task.run();//执行实际的任务
}catch(RuntimeException x){
thrown = x;throw x;
}catch(Error x){
thrown = x;throw x;
}catch(Throwable x){
thrown = x;thrownewError(x);
}finally{
afterExecute(task, thrown);//执行任务后的Hook方法,可自定义
}
}finally{
task =null;//执行完成后,将当前线程中的任务制空,准备执行下一个任务
w.completedTasks++;
w.unlock();
}
}
completedAbruptly =false;
}finally{
processWorkerExit(w, completedAbruptly);//线程执行完成后的清理工作
}
}
privateRunnablegetTask(){
boolean timedOut =false;//标识当前线程是否超时未能获取到task对象
for(;;){
int c = ctl.get();
int rs =runStateOf(c);
// Check if queue empty only if necessary.
if(rs >= SHUTDOWN &&(rs >= STOP || workQueue.isEmpty())){
decrementWorkerCount();
returnnull;
}
int wc =workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if((wc > maximumPoolSize ||(timed && timedOut))
&&(wc >1|| workQueue.isEmpty())){
if(compareAndDecrementWorkerCount(c))//若线程存活时间超时,则CAS减去线程数量
returnnull;
continue;
}
try{
Runnable r = timed ?
workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)://允许超时回收则阻塞等待
workQueue.take();//不允许则直接获取,没有就返回null
if(r !=null)
return r;
timedOut =true;
}catch(InterruptedException retry){
timedOut =false;
}
}
}
privatevoidprocessWorkerExit(Worker w,boolean completedAbruptly){
if(completedAbruptly)// If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
finalReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
completedTaskCount += w.completedTasks;
workers.remove(w);//移除执行完成的线程
}finally{
mainLock.unlock();
}
tryTerminate();//每次回收完一个线程后都尝试终止线程池
int c = ctl.get();
if(runStateLessThan(c, STOP)){//到这里说明线程池没有终止
if(!completedAbruptly){
int min = allowCoreThreadTimeOut ?0: corePoolSize;
if(min ==0&&! workQueue.isEmpty())
min =1;
if(workerCountOf(c)>= min)
return;// replacement not needed
}
addWorker(null,false);//异常终止线程的话,需要在常见一个线程
}
}
小结:processWorkerExit () 方法主要功能;
1. 真实完成线程池线程的回收;
2. 调用尝试终止线程池;
3. 保证线程池正常运行;
2.1.3.7、尝试终止线程池
finalvoidtryTerminate(){
for(;;){
int c = ctl.get();
//若线程池正在执行、线程池已终止、线程池还需要执行缓存队列中的任务时,返回
if(isRunning(c)||
runStateAtLeast(c, TIDYING)||
(runStateOf(c)== SHUTDOWN &&! workQueue.isEmpty()))
return;
//执行到这里,线程池为SHUTDOWN且无待执行任务 或 STOP 状态
if(workerCountOf(c)!=0){
interruptIdleWorkers(ONLY_ONE);//只中断一个线程
return;
}
//执行到这里,线程池已经没有可用线程了,可以终止了
finalReentrantLock mainLock =this.mainLock;
mainLock.lock();
try{
if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){//CAS设置线程池终止
try{
terminated();//执行钩子方法
}finally{
ctl.set(ctlOf(TERMINATED,0));//这里将线程池设为终态
termination.signalAll();
}
return;
}
}finally{
mainLock.unlock();
}
// else retry on failed CAS
}
}
2.2、JAVA 线程池总结
2.3、JAVA 线程池使用
publicstaticvoidmain(String[] args)throwsException{
//创建线程池
ThreadPoolExecutor threadPoolExecutor =newThreadPoolExecutor(
5,10,100,TimeUnit.SECONDS,newArrayBlockingQueue(5));
//加入4个任务,小于核心线程,应该只有4个核心线程,队列为0
for(int i =0; i <4; i++){
threadPoolExecutor.submit(newMyRunnable());
}
System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 4
System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 0
//再加4个任务,超过核心线程,但是没有超过核心线程 + 缓存队列容量,应该5个核心线程,队列为3
for(int i =0; i <4; i++){
threadPoolExecutor.submit(newMyRunnable());
}
System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 5
System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 3
//再加4个任务,队列满了,应该5个热核心线程,队列5个,非核心线程2个
for(int i =0; i <4; i++){
threadPoolExecutor.submit(newMyRunnable());
}
System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 7
System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 5
//再加4个任务,核心线程满了,应该5个热核心线程,队列5个,非核心线程5个,最后一个拒绝
for(int i =0; i <4; i++){
try{
threadPoolExecutor.submit(newMyRunnable());
}catch(Exception e){
e.printStackTrace();//java.util.concurrent.RejectedExecutionException
}
}
System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 10
System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 5
System.out.println(threadPoolExecutor.getTaskCount());//共执行15个任务
//执行完成,休眠15秒,非核心线程释放,应该5个核心线程,队列为0
Thread.sleep(1500);
System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 5
System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 0
//关闭线程池
threadPoolExecutor.shutdown();
}
往期推荐
这里有最新开源资讯、软件更新、技术干货等内容
点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦
微信扫码关注该文公众号作者
戳这里提交新闻线索和高质量文章给我们。
来源: qq
点击查看作者最近其他文章