线程池

JAVA线程池

Executors.newFixedThreadPool(int)

创建一个最大线程数固定的线程池,参数中传入最大线程数的数量,这些线程全部作为核心线程,一经创建不进行回收。提交任务到线程池时,如果线程都在忙,任务会被放入工作队列,其工作队列的长度为int的取值范围,视为无界,所以可以说它不会拒绝任务抛出RejectedExecutionException异常

Executors.newCachedThreadPool()

创建一个可缓存的线程池,缓存即线程池中的线程可以复用。该线程池中没有核心线程,全部皆为临时线程,线程的数量会根据并发任务的数量自动扩容,上限为int的取值范围,视为无界,线程的空闲时间超过60秒会自动回收。由于线程的数量无上限,所以可以说它不会拒绝任务抛出RejectedExecutionException异常

它的工作队列是一个阻塞队列,队列没有长度所以不存储任务,每个插入到阻塞队列的任务必须阻塞等待,直到线程池中的线程取出任务,当有新任务到来时,如果没有空闲线程取出任务,线程池会立刻创建新的线程来处理任务,任务不会进入工作队列而是直接传递给可用的线程,适合处理大量短期异步任务

该工作队列符合生产者消费者模型,当有线程作为生产者插入任务时,如果线程池中没有线程作为消费者等待任务,那么生产者线程会被阻塞,直到任务被取出;同样,线程池中的线程作为消费者取任务时,如果没有其他线程作为生产者插入任务,消费者线程也会被阻塞,直到取出一个任务。如下代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 代码示例说明
SynchronousQueue<String> queue = new SynchronousQueue<>();

// 生产者线程
Thread producer = new Thread(() -> {
try {
System.out.println("生产者尝试插入数据...");
queue.put("Data"); // 此处会阻塞
System.out.println("生产者插入成功");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// 消费者线程
Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000); // 故意延迟消费
System.out.println("消费者开始获取数据...");
String data = queue.take();
System.out.println("获取到数据: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

producer.start();
consumer.start();

/* 输出结果:
生产者尝试插入数据...
(2秒后)
消费者开始获取数据...
生产者插入成功
获取到数据: Data
*/

Executors.newSingleThreadExecutor()

创建单个线程数的线程池,就一个核心线程,如果这个线程异常终止,线程池会创建一个新的线程代替它。提交任务到线程池时,如果线程在忙,任务会被放入工作队列,其工作队列的长度为int的取值范围,视为无界,所以可以说它不会拒绝任务抛出RejectedExecutionException异常

Executors.newScheduledThreadPool(int)

创建一个可以执行延迟任务的线程池,支持定时或周期性执行任务,参数中传入核心线程数的数量,最大线程数上限为int的取值范围,视为无界;工作队列的初始长度为16,但是队列满时会自动扩容,最大为int的取值范围

Executors.newSingleThreadScheduledExecutor()

创建单线程可以执行延迟任务的线程池,同上但限制了线程池中的线程数为1

Executors.newWorkStealingPool()

创建一个抢占式执行的线程池

自定义线程池

自定义线程池通常需要填入7个参数,如下代码供参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ThreadPoolExecutor pool = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
10, // 临时线程空闲存活时间
TimeUnit.SECONDS, // 临时线程空闲存活时间单位
new LinkedBlockingDeque<>(), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
// 执行任务
pool.submit(new Runnable() {
@Override
public void run() {
System.out.println("run");
}
});
// 停止
pool.shutdown();

参数一:核心线程数

线程池中的核心线程数,核心线程即使处于空闲状态,线程池也不会回收它们

参数二:最大线程数

线程池中允许的最大线程数,最大线程数不能小于核心线程数,当最大线程数大于核心线程数时,意为最多允许存在最大线程数 - 核心线程数个临时线程

参数三:临时线程空闲存活时间

当临时线程空闲时会被线程池回收,此处为空闲存活的时间值

参数四:临时线程空闲存活时间单位

当临时线程空闲时会被线程池回收,此处为空闲存活的时间单位,可以为纳秒、微秒、毫秒、秒、分钟、、天

参数五:工作队列

用于存放待执行的任务的阻塞队列,常见有

  1. ArrayBlockingQueue
    • 一个由数组支持的有界阻塞队列。按照先进先出的原则对元素进行排序
  2. LinkedBlockingQueue
    • 一个由链表结构组成的有界阻塞队列。如果构造时未指定容量,则默认容量为Integer.MAX_VALUE,即视为无界队列
  3. SynchronousQueue
    • 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程的对应移除操作,反之亦然
  4. PriorityBlockingQueue
    • 一个支持优先级排序的无界阻塞队列,元素必须实现Comparable接口或在构造队列时提供Comparator,以便进行排序
  5. LinkedBlockingDeque
    • 一个由链表结构组成的双端阻塞队列。它可以从两端插入和移除元素
  6. LinkedTransferQueue
    • 一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法
  7. DelayQueue
    • 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素

参数六:线程工厂

默认的线程工厂

1
Executors.defaultThreadFactory()

自定义线程工厂,如下设置线程名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyThreadFactory implements ThreadFactory {
private final String prefix;
private final AtomicInteger atomicInteger = new AtomicInteger(1);

public MyThreadFactory(String prefix) {
this.prefix = prefix;
}

@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
// 设置线程名
thread.setName(prefix + "-" + atomicInteger.getAndIncrement());
return thread;
}
}

参数七:拒绝策略

  1. AbortPolicy
    • 丢弃任务并抛出RejectedExecutionException异常(默认策略)
  2. CallerRunsPolicy
    • 调用任务的run()方法,绕过线程池直接执行
  3. DiscardPolicy
    • 直接丢弃任务,不抛出任何异常
  4. DiscardOldestPolicy
    • 丢弃工作队列中等待最久的任务,然后把当前任务加入队列中

通过实现RejectedExecutionHandler接口来自定义拒绝策略

1
2
3
4
5
6
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
throw new RejectedExecutionException("线程池出现异常,线程被拒绝");
}
}