Java线程池教程
什么是线程池
Java线程池是一种用于优化线程管理的技术,它可以在应用程序启动时预先创建一组线程并保存在内存中,以避免频繁地创建和销毁线程。线程池通过提供一个控制并发线程数量的机制,可以有效地管理和控制应用程序的并发访问量,从而提高应用程序的性能和响应速度。
Java线程池通常由Java的Executor框架提供支持,该框架提供了一组接口和类,用于创建和管理线程池。其中,ThreadPoolExecutor类是一个常用的线程池实现,它支持自定义线程池的大小、线程存活时间、工作队列类型等参数,并且提供了丰富的控制和监控接口,方便开发人员进行配置和管理。
使用线程池可以带来很多好处,例如:
- 提高应用程序的性能和响应速度:通过预创建线程并保存在内存中,可以避免频繁地创建和销毁线程,从而提高应用程序的性能和响应速度。
- 降低资源消耗:线程池可以控制并发线程数量,避免过多地创建线程,从而降低系统资源消耗。
- 提高系统稳定性:线程池提供了线程管理和监控机制,可以帮助开发人员及时发现和解决线程相关的问题,从而提高系统的稳定性和可靠性。
- 方便扩展:线程池提供了丰富的配置和管理接口,可以根据实际需求进行定制和扩展,满足不同场景下的需求。
总之,Java线程池是一种非常有用的技术,可以提高应用程序的性能、响应速度、稳定性以及可扩展性。在实际开发中,应该根据实际需求选择合适的线程池实现并进行合理的配置和管理。
线程池 Executor提供的API
ExecutorService接口:这是线程池的核心接口,它定义了线程池的基本操作,如提交任务、关闭线程池等。Executors类:这个类提供了一些静态方法来创建不同类型的线程池,如固定大小的线程池、可缓存的线程池等。Future接口:这个接口表示一个异步计算的结果,可以通过调用get方法来获取计算结果。Callable接口:这个接口与Runnable接口类似,但它可以返回计算结果,并且可以抛出异常。ThreadPoolExecutor类:这个类是一个可配置的线程池,它实现了ExecutorService接口,并提供了更多的配置选项,如线程池大小、线程工厂、拒绝策略等。ScheduledThreadPoolExecutor类:这个类是一个可定时执行的线程池,它继承自ThreadPoolExecutor类,并提供了定时执行任务的功能。ThreadFactory接口:这个接口用于创建新线程,可以通过实现这个接口来定义自己的线程工厂。RejectedExecutionHandler接口:这个接口用于处理无法执行的任务,可以通过实现这个接口来定义自己的拒绝策略。
线程池的创建方式
Java线程池提供了以下几种常见的方法:
Executors.newFixedThreadPool(int):创建一个固定大小的线程池,该线程池可以创建指定数量的线程,用于执行提交的任务。Executors.newCachedThreadPool():创建一个可缓存的线程池,该线程池会缓存空闲的线程,以便重用。Executors.newSingleThreadExecutor():创建一个单线程化的线程池,该线程池只会创建一个线程,用于执行提交的任务。Executors.newScheduledThreadPool(int):创建一个定时线程池,该线程池可以定时执行任务,并可以按照指定的时间间隔调度任务。ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>):创建一个可配置的线程池,该线程池可以指定线程池大小、任务队列、线程工厂、拒绝策略等参数。
newFixedThreadPool
- FixedThreadPool的核心线程数和最大线程数都是指定值,也就是说当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中
- keepAliveTime为 0,也就是超出核心 线程数量以外的线程空余存活时间
- 阻塞队列是 LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE, 相当于没有上限
newCachedThreadPool
- 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程
- 没有核心线程,非核心线程无上限,完美诠释外包,用完六十秒后空闲就回收
ThreadPoolExecutor 的参数详解
| 参数名称 | 类型 | 描述 |
|---|---|---|
corePoolSize |
int |
核心线程数,线程池中保持活动的最小线程数。 |
maximumPoolSize |
int |
最大线程数,线程池能够容纳的最大线程数。 |
keepAliveTime |
long |
非核心线程的存活时间,当线程数超过核心线程数时,多余的线程在终止之前等待新任务的最长时间。 |
TimeUnit |
java.util.concurrent.TimeUnit |
存活时间的单位,可以是NANOSECONDS、MILLISECONDS、SECONDS、MINUTES、HOURS或DAYS。 |
BlockingQueue<Runnable> |
java.util.concurrent.BlockingQueue<Runnable> |
用于存放任务的队列。当所有核心线程都在忙,且任务队列未满时,新任务会进入队列等待执行。当队列满了,且当前线程数小于最大线程数时,则创建新的线程执行任务。 |
ThreadFactory |
java.util.concurrent.ThreadFactory |
用于创建新线程的工厂,可以自定义线程的创建方式,如设置线程的名称、优先级等。 |
RejectedExecutionHandler |
java.util.concurrent.RejectedExecutionHandler |
拒绝策略处理器,当任务无法被执行时(如队列满了且线程数达到上限),会调用此策略的处理方法。默认情况下,如果处理器未被设置,则抛出RejectedExecutionException异常。 |
如何使用线程池
示例1:创建线程池、添加任务、关闭线程池
Java线程池可以通过使用Executor框架提供的接口和类来创建和管理。其中,ThreadPoolExecutor类是一个常用的线程池实现,它支持自定义线程池的大小、线程存活时间、工作队列类型等参数,并且提供了丰富的控制和监控接口,方便开发人员进行配置和管理。
下面是一个简单的Java线程池的使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
int poolSize = 10;
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
// 提交任务到线程池
for (int i = 0; i < 100; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
// 关闭线程池(平滑关闭)
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("所有线程运行完毕");
}
}
class WorkerThread implements Runnable {
private String command;
public WorkerThread(String command) {
this.command = command;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始处理:" + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " 结束处理:" + command);
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在这个示例中,我们首先使用Executors类的newFixedThreadPool方法创建了一个固定大小的线程池,大小为10。然后,我们使用executor的execute方法提交了100个任务到线程池中。每个任务都是一个实现了Runnable接口的WorkerThread对象。在WorkerThread的run方法中,我们模拟了一个耗时的操作,并通过System.out.println输出线程的名字和处理的任务。最后,我们使用executor的shutdown方法关闭线程池,并等待所有线程运行完毕后输出“所有线程运行完毕”。
示例2:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample1 {
public static void main(String[] args) {
// 创建一个单线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交任务到线程池
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("执行任务");
}
});
// 关闭线程池(平滑关闭)
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("所有线程运行完毕");
}
}
示例3 如何使用Java 线程池执行定时任务
要使用Java线程池执行定时任务,可以按照以下步骤进行:
-
创建一个线程池对象。可以使用
Executors.newScheduledThreadPool()方法创建一个定时任务线程池。例如:ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); -
创建一个
Runnable或Callable的任务对象,该任务对象表示要执行的定时任务。例如,创建一个Runnable的任务对象:Runnable task = new Runnable() { @Override public void run() { // 定时任务逻辑 } }; -
使用定时任务线程池的
schedule()方法来安排任务的执行。该方法接受一个任务对象和一个表示延迟执行的时间参数。例如,将任务延迟1秒后执行:executor.schedule(task, 1, TimeUnit.SECONDS); -
可选地,如果需要定时执行任务,可以使用
scheduleAtFixedRate()方法或scheduleWithFixedDelay()方法。例如,每隔5秒执行一次任务:executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS); -
当不再需要执行定时任务时,需要调用线程池的
shutdown()方法来关闭线程池。例如:executor.shutdown();
完整的示例代码如下所示:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Runnable task = new Runnable() {
@Override
public void run() {
// 定时任务逻辑
System.out.println("定时任务执行");
}
};
// 延迟1秒后执行任务
executor.schedule(task, 1, TimeUnit.SECONDS);
// 每隔5秒执行任务
executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
// 关闭线程池
executor.shutdown();
}
}
这样就可以使用Java线程池执行定时任务了。
线程池的实现原理分析
- ThreadPoolExecutor 是线程池的核心,提供了线程池的实现
- ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,并另外提供一些调度方法以支 持定时和周期任务
- Executers 是工具类,主要用来创建线程池对象
execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 新建一个线程执行任务 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 核心池满了,但任务队列未满,添加任务 int recheck = ctl.get(); // 任务添加成功,再次检查是否需要添加新的线程,因为可能有线程被销毁了 if (! isRunning(recheck) && remove(command)) reject(command); // 之前的线程被销毁,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心池已满,队列已满,新建一个工作线程 else if (!addWorker(command, false)) reject(command);// 如果失败,说明线程池被关闭或者已经满了,拒绝任务 }
ctl
在线程池中,
ctl贯穿在线程池的整个生命周期中
ctl是一个AtomicInteger,主要保存线程池的数量和状态,这里采用 高三位 保存运行状态,低29位保存线程数量
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //goto语句
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程处于非运行状态,表示使用了 shutdown 了,所以拒绝添加任务
// shutdown状态,不允许添加任务,但可以执行原先就在队列中的任务
// 且 rs 不等于 SHUTDOWN 且 firstTask 不等于空
// 且 workQueue 为空,直接返回 false(表示不可添加 work 状态)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);// 获得工作线程数
// 如果工作线程数大于默认容量或者大于线程最大数量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// 通过cas增加,cas失败则会重试
break retry;
c = ctl.get(); // 获取ctl锁
if (runStateOf(c) != rs) // 线程的状态发生了变化,继续重试
continue retry;
}
}
// 正式创建worker线程
boolean workerStarted = false; // 工作线程是否启动的标志
boolean workerAdded = false; // 工作线程是否添加成功的标志
Worker w = null;
try {
w = new Worker(firstTask); // 构建一个worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 从worker对象中取得线程
mainLock.lock();// 避免并发
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || // 线程池是运行状态或者是SHUTDOWN并且firstTask=null
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 将新创建的 worker 添加到 workers 中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新线程池出现过的最大线程数
workerAdded = true; //表示工作线程创建成功了
}
} finally {
mainLock.unlock(); // 释放锁
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true; // 表示启动成功
}
}
} finally {
if (! workerStarted) // 添加失败,递减工作线程,因为最先开始的时候,创建了工作线程
addWorkerFailed(w);
}
return workerStarted;// 返回结果
}
Worker 类
每个
worker,都是一条线程,同时里面包含了一个firstTask,即初始化时要被首先执行的任务
最终执行任务的,是runWorker()
worker类继承了AQS,实现了Runnable接口,firstTask用于保存传入的任务,thread是在调用构造方法时通过ThreadFactory来创建的 线程,是用来处理任务的线程
在调用构造方法时,需要传入任务,通过getThreadFactory().newThread(this);来新建 一个线程
newThread方法传入的参数是this,因为Worker本身继承了Runnable接口, 也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run()由于
ReentrantLock是允许重入的,但是tryAcquire是不允许重入的,所以Worker继承了AQS,使用AQS来实现独占锁的功能如果正在执行任务,则不应该中断线程
如果该线程不是独占锁的状态,也就是空闲的状态,说明没有处理任务,这时可以对该线程终止
线程池执行shutdown()的时候会调用interruptIdleWorkers()中断空闲的线程
interruptIdleWorkers会使用tryLock()判断线程是否空闲之所以设置为不可重入,是因为我们不希望任务在调用像
setCorePoolSize这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 真正执行任务的线程,由ThreadFactury构建. */
final Thread thread;
/** 需要执行的任务. */
Runnable firstTask;
/** 完成的任务数,用于任务池统计 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 初始状态,防止在调用 runWorker(),也就是真正执行 task前中断 thread
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 执行任务
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker
线程池中执行任务的 真正处理逻辑
- 如果
task不为空,则开始执行task - 如果
task为空,则通过getTask()再去取任务,并赋值给task,如果取到的Runnable不为空,则 执行该任务 - 执行完毕后,通过
while循环继续getTask()取任务 - 如果
getTask()取到的任务依然是空,那么整个runWorker()方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 表示当前的线程允许中断,这里会将 state 设置为 0
boolean completedAbruptly = true;
try {
// 如果 task 为 null,则通过 getTask() 取得任务
while (task != null || (task = getTask()) != null) {
w.lock();// 为了 shutdown() 不中断执行中的任务,需要加锁
// 线程池为 stop 时,不接受新的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 执行 task 的run()方法,也就是实际传入的线程执行逻辑
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 执行完成后,将task设置为null,下一次通过 getTask() 获取新的任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// worker执行完毕后,将当前的worker从workers中删除
// 根据布尔值 allowCoreThreadTimeOut 决定是否补充新的worker进workers
processWorkerExit(w, completedAbruptly);
}
}
getTask
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 对线程池状态的判断
// workQueue 为空并且是 shutdown 状态,则返回 null,结束 wroker 的循环
// 状态为 STOP,则是调用了shutdownNow,所以直接停止 worker 的循环
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; // 返回 null,则当前的 worker 执行结束
}
int wc = workerCountOf(c);
// timed 用于判断是否需要做超时控制
// allowCoreThreadTimeOut 默认是 true,也就是核心线程不允许超时
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSi
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据 timed 判断
// true 通过 poll 方法进行超时控制
// false 通过 task 获取队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 任务不为空,则交给worker处理
return r;
timedOut = true;// 如果 r==null,则设置为 timeOut,下一次自旋会回收
} catch (InterruptedException retry) {
timedOut = false;// 重试
}
}
}
拒绝策略
可以通过实现
RejectedExecutionHandler接口,自定义拒绝策略
| 类 | 说明 |
|---|---|
| AbortPolicy | 直接抛出异常,默认策略; |
| CallerRunsPolicy | 调用者所在的线程来执行任务 |
| DiscardOldestPolicy | 丢弃阻塞队列中靠最前的任务,并执行当前任务 |
| DiscardPolicy | 直接丢弃任务 |
如何合理的配置线程池大小
| 类型 | 说明 |
|---|---|
| IO 密集型 | ((线程池设定的线程等待时间+线程CPU时间)/线程CPU时间)*CPU数目 |
| CPU 密集型 | cpu 核心数+1 |
线程池中的线程初始化
ThreadPoolExecutor tpe=(ThreadPoolExecutor)service;
tpe.prestartCoreThread();// 初始化一个核心线程
tpe.prestartAllCoreThreads(); // 初始化所有核心线程
线程池的关闭
| 方法名称 | 说明 |
|---|---|
| shutdown() | 不会立刻关闭,会等到任务队列中的任务全部执行完后终止 |
| shutdownNow() | 立刻终止线程池,并且尝试打断正在执行的任务,清空缓存队列,返回为执行的任务 |
线程池容量的动态调整
| 方法名称 | 说明 |
|---|---|
| setCorePoolSize() | 设置核心池大小 |
| setMaximumPoolSize() | 设置线 程池最大能创建的线程数目大小 |
任务缓存队列及排队策略
| 类型 | 说明 |
|---|---|
| ArrayBlockingQueue | 基于数组的先进先出队列,此队列创建时必须指定大小 |
| LinkedBlockingQueue | 基于链表的先进先出队列,如果创建时没有指定此队列大小,则默 认为 Integer.MAX_VALUE |
| SynchronousQueue | 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务 |
线程池排队策略设置
在Java中,线程池的任务缓存队列一般是由线程池自己管理的,我们不需要手动去设置。具体使用哪个队列是由线程池的内部实现来决定的。
当我们提交一个任务给线程池时,线程池会根据其内部策略来决定是立即执行这个任务,还是将它放到任务队列中等待执行。具体的策略可能因线程池的实现而异,但通常会考虑线程池的大小(即当前有多少可用的线程)以及任务队列的大小等因素。
Java线程池的排队策略可以在创建线程池时通过ThreadPoolExecutor的构造函数进行设置。ThreadPoolExecutor有一个构造函数接收一个BlockingQueue对象作为参数,这个对象用于存储等待执行的任务。
Java线程池的排队策略主要有以下几种:
- 直接提交策略(
DiscreteThreadScheduler): 任务被提交后立即执行,适用于对延迟要求较高的场景。 - 优先队列策略(
PriorityThreadScheduler): 任务按优先级排队,高优先级任务先于低优先级任务执行,适用于对优先级要求较高的场景。 - 轮询策略(
RoundRobinThreadScheduler): 任务按轮询方式执行,适用于需要均衡负载的场景。 - 限时策略(
TimedThreadScheduler): 任务在等待一定时间后开始执行,适用于需要在规定时间内完成任务或者需要避免长时间等待的场景。
可以通过以下方式创建具有特定排队策略的线程池:
一些节本的策略设置
// 使用直接提交策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new DiscreteThreadScheduler());
// 使用优先队列策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new PriorityThreadScheduler());
// 使用轮询策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new RoundRobinThreadScheduler());
// 使用限时策略创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new TimedThreadScheduler());
需要注意的是,以上代码中的corePoolSize、maximumPoolSize、keepAliveTime等参数也需要根据实际需求进行设置。
使用 ArrayBlockingQueue 策略
对于ThreadPoolExecutor,我们可以通过构造函数来设置一些参数,如任务队列的容量,但具体的任务存储和管理是由线程池自己来负责的。
例如:
// 创建一个具有指定核心线程数、最大线程数、空闲线程存活时间的线程池,
// 并使用一个具有指定容量的任务队列来存储待执行的任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueCapacity)
);
这里使用的ArrayBlockingQueue是一个阻塞队列,可以作为任务队列。当我们向队列中添加任务时,如果队列已满,那么会阻塞直到队列有空位;当我们从队列中获取任务时,如果队列为空,那么会阻塞直到队列中有新的任务。这样能保证线程池中的任务总是按顺序执行,不会因为同时有大量的任务提交而被“打乱”。
总的来说,Java线程池会自动管理任务队列,我们通常不需要手动去设置。如果你需要使用自定义的任务队列,那么需要确保这个队列能正确地处理并发访问和阻塞情况。
