Java多线程入门3

如果你还不了解Java多线程相关的知识,建议去看我的上一篇Java多线程入门2,如果你还没有开始了解Java,那么可以从我的知乎开始:知乎传送门

线程池

池的概念

对于共享资源的情况,有一个通用的涉及模式:资源池,用于解决资源频繁创建消失所造成的资源浪费。所以初始化一些共享资源里,使用这些资源结束并不会让他们消失,而且会交接给下一个调用的人使用。这些资源通称为池的概念。

系统启动一个新线程的成本是很搞的,因为涉及到与操作系统交互。所以在这种情况下,使用线程池可以很好地提高性能

而且使用线程池可以有效地控制系统中并发线程地数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至JVM崩溃,而线程池可以控制系统中并发线程地数量。

使用线程池管理线程

创建线程池

Java5以后,Java内建支持线程池。Java5新增了一个Executors工厂类来创建线程池,下面是几个静态地工厂方法来创建线程池。

序号 方法 描述
1 newCachedThreadPool() 创建一个具有缓冲功能的线程池,系统根据需要创建线程,这些线程会被缓存在线程池中。
2 newFixedThreadPool(int nThreads) 创建一个可重用、具有固定线程数的线程池
3 newSingleThreadExecutor() 创建一个单线程的线程池,相当于上面的方法参数为1的情况
4 newScheduledThreadPool(int corePoolSize) 创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。corePoolSize指的是池中所保存的线程数,即使线程是空闲的也被保存在线程池中
5 newSingleThreadScheduleExecutor() 创建单线程线程池,指定延迟后执行线程任务
6 newWorkStealingPool( int parallelism) 创建 持有 足够 的 线程 的 线程 池 来 支持 给定 的 并行 级别, 该 方法 还会 使用 多个 队列 来 减少 竞争。
7 newWorkStealingPool() 该 方法 是 前一 个 方法 的 简化 版本。 如果 当前 机器 有 4 个 CPU, 则 目标 并行 级别 被 设置 为 4, 也就是 相当于 为 前一 个 方法 传入 4 作为 参数。

序号1-3返回 ExecutorService 对象,该对象代表一个线程池,可以执行Runnable对象或者Callable对象所代表的线程任务

序号4-5返回得是ScheduledExecutorService 线程池,它是 ExecutorService 的子类,它可以在指定延迟后执行线程任务;

序号6-7则是Java8新增的,这两个方法可以充分利用多CPU并行的能力。方法 生成 的 work stealing 池,都相当于后台线程池,如果所有的前台线程都死亡了,work stealing 池中的线程会自动死亡。

操作线程池的方法

ExecutorService代表马上执行线程的线程池,它的方法如下,下面的方法都是重载的

方法 描述
Future<?> submit( Runnable task) 将一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable 对象代表的任务。该方法返回null,因为run方法没有返回值。
< T> Future< T> submit( Runnable task, T result) 将 一个 Runnable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Runnable 对象代表的任务。其中 result 显式指定线程执行结束后的返回值,所以 Future 对象将在 run() 方法执行结束后返回 result。
< T> Future< T> submit( Callable< T> task) 将一个 Callable 对象提交给指定的线程池,线程池将在有空闲线程时执行 Callable 对象代表的任务。其中 Future 代表 Callable 对象里 call() 方法的返回值。

可以调用 Future 的 isDone() isCancelled() 方法 来 获得 Runnable 对象 的 执行状态。

ScheduledExecutorService 代表可在指定延迟后或周期性地执行线程任务的线程池,它提供了如下 4 个方法。

方法 描述
ScheduledFuture< V> schedule( Callable< V> callable, long delay, TimeUnit unit) 指定 callable 任务将在 delay 延迟后执行。
ScheduledFuture<?> schedule( Runnable command, long delay, TimeUnit unit) 指定 command 任务将在 delay 延迟后执行。
ScheduledFuture<?> scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit) 指定 command 任务将在 delay 延迟后执行,而且以设定频率重复执行。也就是说,在 initialDelay 后开始执行,依次在 initialDelay+ period、 initialDelay+ 2* period… 处重复执行,依此类推。
ScheduledFuture<?> scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit) 创建并执行一个在给定初始延迟后首次启用的定期操作,随后在 每一 次执行终止和下一 次执行开始之间都存在给定的延迟。如果任务在任 一次执行时遇到异常,就会取消后续执行;否则, 只能通过程序来显式取消或终止该任务。

使用完线程池,应该调用线程池的shutdown()方法来关闭线程池,执行过后线程池不再接收新任务,而且会在所有线程执行完毕后杀死线程。或者调用shutdownNow(),该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表

操作线程池的流程

  1. 调用 Executors 类的静态工厂方法创建一个 ExecutorService 对象,该对象代表一个线程池。
  2. 创建Runnable 实现类或Callable 实现类的实例,作为线程执行任务。
  3. 调用ExecutorService对象的 submit() 方法来提交 Runable 实例或 Callable 实例。
  4. 当不想提交任何任务时,调用 ExecutorService 的shudown()方法来关闭线程池

下面的demo程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
public static void main(String[] args) {

//创建一个具有固定线程数(6)的线程池
ExecutorService pool = Executors.newFixedThreadPool(6);

//使用Lambda表达式创建Runnable对象
Runnable target = () ->{
for(int i=0;i<100;i++) {
System.out.println(Thread.currentThread().getName()+"的i值"+i);
}
};

//向线程池中提交两个线程
pool.submit(target);
pool.submit(target);

//关闭线程池
pool.shutdown();
}
}

使用 ForkJoinPool 利用多CPU

Java7提供了 ForkJoinPool 来支持将一个任务拆分成多个小任务并行计算,再把多个小任务的结果合并成总的计算结果。结果。 ForkJoinPool 是 ExecutorService 的实现类, 因此是一种特殊的线程池。下面是它的常用的构造器。

  • ForkJoinPool( int parallelism): 创建一个包含 parallelism 个并行线程的 ForkJoinPool。
  • ForkJoinPool():以 Runtime. availableProcessors() 方法的返回值作为 parallelism 参数来创建 ForkJoinPool。

Java8提供了静态方法来创建

  • ForkJoinPool commonPool(): 该方法返回一个通用池,通用池的运行状态不会受 shutdown() 或 shutdownNow() 方法的影响。当然,如果程序直接执行 System. exit(0);来终止虚拟机,通用池以及通用池中正在执行的任务都会被自动终止。
  • int getCommonPoolParallelism():该方法返回通用池的并行级别。

创建之后的操作

之后就可以调用 ForkJoinPool 的 submit( ForkJoinTask task) 或 invoke( ForkJoinTask task) 方法来执行指定任务了。ForkJoinTask 是一 个 抽象 类, 它 还有 两个 抽象 子类: RecursiveAction 和 RecursiveTask。其中 RecursiveTask 代表有返回值的任务,而 RecursiveAction 代表没有返回值的任务。

下面是一个demo程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

//继承RecurisveAction来实现可分解任务
class PrintTask extends RecursiveAction{

//每个小任务最多只打印50个数
private static final int THRESHOLD = 50;
private int start;
private int end;

//打印从start到end的任务
public PrintTask(int start,int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {

//当end与start之间的差小于THRESHOLD时,开始打印
if(end - start < THRESHOLD) {
for(int i = start;i<end;i++) {
System.out.println(Thread.currentThread().getName()+"的i值:"+i);
}
}else {

//当end与start之间的差大于THRESHOLD,即要打印的数超过50个时
//将大任务分解成两个“小任务”
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start,middle);
PrintTask right = new PrintTask(middle,end);

//并行执行两个小任务
left.fork();
right.fork();
}

}
}
public class ForkJoinPoolTest {
public static void main(String[] args) throws InterruptedException {

ForkJoinPool pool = new ForkJoinPool();

//提交可分解的PrintTask任务
pool.submit(new PrintTask(0,300));
pool.awaitTermination(2, TimeUnit.SECONDS);

//关闭线程池
pool.shutdown();
}
}

下面程序示范了使用 Recursive Task 对一个长度为 100 的数组的元素值进行累加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

//继承RecursiveTask来实现可分解的任务
class CalTask extends RecursiveTask<Integer>{

//每个小任务 最多只累加20个数
private static final int THRESHOLD = 20;
private int arr[];
private int start;
private int end;

public CalTask(int[] arr,int start,int end) {
this.arr = arr;
this.start = start;
this.end = end;
}

//累加从start到end的数组
@Override
protected Integer compute() {
int sum =0;

//当end与start之间的差小于THRESHOLD时,开始实际累加
if(end - start < THRESHOLD) {
for(int i=start;i<end;i++) {
sum += arr[i];
}
return sum;
}else {

//当end与start之间的差大于ThRESHOLD,即要累加的数超过20小时
//将大任务分解成两个“小任务”
int middle = (start+end)/2;
CalTask left = new CalTask(arr,start,middle);
CalTask right = new CalTask(arr,middle,end);

//并行执行两个“小任务”
left.fork();
right.fork();

//把两个小人物累加的结果合并起来
return left.join()+right.join();
}
}

}
public class Sum {
public static void main(String[] args) throws InterruptedException, ExecutionException {

int[] arr = new int[100];
Random rand = new Random();
int total =0;

//初始化100个数字元素
for(int i =0,len =arr.length;i<len;i++) {
int tmp = rand.nextInt(20);

//对数组元素复制,并将数组元素的值添加到sum总和中
total+=(arr[i] = tmp);
}
System.out.println(total);

//创建一个通用池
ForkJoinPool pool = ForkJoinPool.commonPool();

//提交可分解的CaltTask任务
Future<Integer> future = pool.submit(new CalTask(arr,0,arr.length));
System.out.println(future.get());

//关闭线程池
pool.shutdown();
}
}

线程相关类

ThreadLocal类

该类就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立地改变自己地副本,而不会和其他线程地副本冲突。下面是三个public 方法

  • T get():返回此线程局部变量中当前线程副本中的值
  • void remove(): 删除此线程局部变量中当前线程的值
  • void set(T value):设置此线程局部变量中当前线程副本的值

下面是demo程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Account{

//定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量,每个线程都会保留该变量的一个副本
private ThreadLocal<String> name = new ThreadLocal<String>();

//定义一个初始化name成员变量的构造器
public Account(String str) {
this.name.set(str);

//下面的代码用于访问当前线程的name副本的值
System.out.println("---"+this.name.get());
}

//name的get和set方法
public String getName() {
return name.get();
}

public void setName(String str) {
this.name.set(str);
}
}

class MyTest extends Thread{

//定义一个Account类型的成员变量
private Account account;
public MyTest(Account account,String name) {
super(name);
this.account = account;
}

public void run() {

//循环10次
for(int i = 0;i<10;i++) {

//当i==6时输出账号名替换成当前线程名
if(i==6) {
account.setName(getName());
}

//输出同一个账户的账户名和循环变量
System.out.println(account.getName()+"账户的i值"+ i);
}
}
}
public class ThreadLocalTest {
public static void main(String[] args) {

//启动两个线程,两个线程共享同一个Account
Account at = new Account("初始名");

/*
* 虽然两个线程共享同一个账户,即只有一个账户名
* 但由于账户名是ThreadLocal类型的,所以每个线程都完全拥有各自的账户名副本
* 因此再i == 6之后,将看到两个线程访问同一个账户时出现不同的账户名
*/
new MyTest(at,"线程甲").start();
new MyTest(at,"线程乙").start();
}
}

包装线程不安全的集合

如果程序中有多个线程可能访问 ArrayList、 LinkedList、 HashSet、 TreeSet、 HashMap、 TreeMap,就可以使用 Collections 提供的类方法把这些集合包装成线程安全的集合。下面是几个静态方法:

方法 描述
< T> Collection< T> synchronizedCollection( Collection< T> c) 返回指定 collection 对应的线程安全的 collection。
static < T> List< T> synchronizedList( List< T> list) 返回指定 List 对象对应的线程安全的 List 对象。
static < K, V> Map< K, V> synchronizedMap( Map< K, V> m) 返回指定 Map 对象 对应的线程安全的 Map 对象。
static < T> Set< T> synchronizedSet( Set< T> s) 返回指定 Set 对象对应的线程 安全的 Set 对象。
static < K, V> SortedMap< K, V> synchronizedSortedMap( SortedMap< K, V> m) 返回 指定 SortedMap 对象 对应的线程 安全的 SortedMap 对象。
static < T> SortedSet< T> synchronizedSortedSet( SortedSet< T> s) 返回指定 SortedSet 对象对应的线程安全的 SortedSet 对象。

使用的话如下即可,注意如果需要包装,请一开始就设置为线程安全类

1
2
//使用Collections的方法将一个普通的HashMap包装成线程安全的类
HashMap m = Collections.synchronizedMap(new HashMap());

线程安全的集合类

Java5开始,就开始提供一些线程安全的集合类了

线程安全的集合类

从上图可以看出线程安全的集合类可分为如下两类:

  • 以 Concurrent 开头的集合类,如 ConcurrentHashMap(默认情况下,支持16个线程并发写入,超过的话可能需要等待。)、 ConcurrentSkipListMap、 ConcurrentSkipListSet、 ConcurrentLinkedQueue(当多个线程共享访问一个公共集合时,这是一个恰当的选择,但是这个类不允许存null值,多个线程访问时无需等待) 和 ConcurrentLinkedDeque。这个类写入线程的所有操作都是安全的,但是读取操作不必锁定线程。这些类采用了更复杂的算法来保证永远不会锁住整个集合,因此再并发写入时有更好的性能。上述类支持多线程并发访问,所以当使用迭代器来访问的时候,可能不能反映出创建迭代器之后所做的修改,但程序不会抛出任何异常。
  • 以CopyOnWrite开头的集合类,如 CopyOnWriteArrayList、 CopyOnWriteArraySet。当 线程 对 CopyOnWriteArrayList 集合执行读取操作时,线程将会直接读取集合本身,无须加锁与阻塞。不过在写入操作时会在底层复制一份新的数组,然后对新的数组进行写入操作。所以它是线程安全的,不过因为写入时需要频繁地复制数组,所以性能比较差。这些类适合用在读取比较多,写入少的时候。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!