多线程

学习目标

  • 理解多线程的概念。
  • 通过实现Runnable接口开发任务类。
  • 使用Thread类创建线程以运行任务。
  • 使用Thread类中的方法控制线程。
  • 执行线程池中的代码。
  • 使用同步方法或阻塞同步线程,避免竞争状态。
  • 使用锁来同步线程。
  • 使用锁的条件便于线程通信。
  • 使用阻塞序列来同步对数组队列、链表队列以及优先队列的访问。
  • 使用信号量限制对共享资源的访问次数。
  • 使用资源排序技术来避免死锁。
  • 描述线程的生命周期。
  • 使用Collections类中的静态方法创建同步的集合。

引言

Java的重要功能之一就是内部支持多线程—在一个程序内部允许同时运行多个任务。在许多程序设计语言中,多线程都是通过调用依赖于系统的过程或函数来实现的。本文将介绍线程的概念以及如何在Java中开发多线程程序。

线程的概念

一个程序可能包含多个运行的任务。线程(thread)是指一个任务从头至尾的执行流。线程提供了运行一个任务的机制。对于Java而言,可以在一个任务中并发的启动多个线程。而这些线程可以在多处理器系统上同时运行。

单处理器系统中,多个线程共享CPU时间成为时间共享,而操作系统负责调度及分配资源给它们。

多线程可以使程序反应更快。交付性更强、执行效率更高。
当程序作为一个应用程序运行时,Java解释器为main方法启动一个线程。在Java中,每个任务都是Runnable接口的实例,也称为可运行对象(runnable object)。线程本质上讲就是便于任务执行的对象。

创建任务和线程

任务就是对象。
任务:任务类必须实现Runnable接口。接口很简单,包含一个run方法,这个方法就是来告诉系统线程将如何运行。
线程:任务必须在线程中执行。创建任务线程Thread thread = new Thread(task);
然后调用start()方法告诉Java虚拟机该线程准备运行。thread.start();

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TaskClass implements Runnable {
...
public TaskClass() {
}

// Implement the run method in Runnable
@Override
public void run() {
// Tell system hoe to run custom thread
...
}
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Client {
...
public void someMethod() {
...
// Create an instance of TaskClass
TaskClass task = new TaskClass();

// Create a theread
Thread thread = new Thread();

// Start a thread
thread.start();
...
}
...
}

重要的注意事项:任务中的run()方法指明如何完成这个任务。Java虚拟机会自动调用该方法,无需特意调用它。直接调用run()只是在同一线程中执行该方法,而没有新线程被启动。

Thread类

Thread类包含为任务而创建的线程的构造方法,以及控制线程的方法。
Thread类包括控制线程的方法

注意:Thread类还包含方法stop()、suspend()和resume()。由于普遍认为这些方法具有不安全因素,所以在Java2中不提倡(或不流行)这些方法。为替代方法stop()的使用,可以通过给Thread变量赋值null来表明它被停止。

yield()为其它线程临时让出CPU时间。

1
2
3
4
5
6
public void run() {
for (int i = 0; i < lastNum; i++) {
System.out.println(" " + i);
Thread.yield();
}
}

sleep()可以将该线程设置为休眠以确保其它线程的执行,休眠时间为指定的毫秒。

1
2
3
4
5
6
7
8
9
10
11
12
public void run() {
try {
for (int i = 0; i < 100; i++) {
System.out.println(" " + i);
if (i >= 50) {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

sleep方法抛出一个InterruptedException,这是一个必检异常。如果在一个循环中使用了sleep方法,那就将这个循环放在try-catch块中,如下所示

1
2
3
4
5
6
7
8
9
10
public void run() {
try {
while (...) {
...
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

如果循环在try-catch块外,即使线程被中断,它也可能会继续执行。

join()方法使一个线程等待另一个线程的结束。
join()

提示 在Java将来的版本中,优先级的数字可能会改变。为将这种变化带来的影响降低到最低,可以使用Thread类中的常量来指定线程优先级。
提示 如果总有一个优先级较高的线程在运行,或者有一个相同优先级的线程不退出,那么这个线程可能永远也没有运行的机会,这种情况称为资源竟争或缺乏状态(contention or starvation )。为避免竟争现象,高优先蜒的线程必须定时地调用sleep方法或yield方法,来给低优先级或相同优先级的线程一个运行的机会。

线程池

学习了如何通过实现java.lang.Runnable来定义任务类,以及如何像下面这样创建一个线程来运行任务:

1
2
Runnable task = new TaskClass(task);
new Thread(task).start();

该方法对单一任务的执行时很方便的,但是由于必须为每个任务创建一个线程,因此对大量的任务而言是不够高效的。为每个任务开始一个新线程可能会限制流量并且造成性能降低。线程池是管理并发执行任务个数的理想方法。Java提供`Excutor`接口来执行线程池中的任务,提供`ExcutorService`接口来管理和控制任务。ExcutorService是Excutor的子接口。
为了创建一个Excutor对象,可以使用ExcutorServices类中的静态方法,`newFixedThreadPool(int)`方法在池中创建固定数目的线程。如果线程完成了任务的执行,它可以被重新使用以执行另外一个任务。如果线程池中所有的线程都不是处于空闲状态,而且有任务在等待执行,那么在关机之前,如果由于一个错误终止了一个线程,就会创建一个新的线程来替代它。 如果线程池中多有线程都不是处于空闲状态,而且有任务在等待执行,那么newCachedThreadPoll()方法就会创建一个新线程。如果缓冲池中的线程在60秒内都没有被使用就该终止它。对多个小人物而言,一个缓冲池已经足够。

join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.*;

public class ExecutorDemo {
public static void main(String[] args) {
// Craete a fixed thread pool with maximun three threads
ExecutorService executor= Executors.newFixedThreadPool(3);

// Submit runnable tasks to the executor
executor.execute(new PaintChar('a', 100));
executor.execute(new PaintChar('b', 100));
executor.execute(new PaintNum(100));

// Shut odwn the executor
executor.shutdown();
}
}

在上面程序中第6行创建了一个最大线程数为3的线程池执行器。两个任务PrintChar、PrintNum。第9行创建任务new PrintChar(‘a’,100),并且将它添加到线程池中,在10~11行,创建了另外两个可运行的任务,并且将它们添加到同一个线程池中。执行器创建三个线程来并发执行三个任务。如果用下面的语句替换第6行

1
ExecutorService executor =  Executors.newFixedThreadPool(1);

会发生什么呢?这三个可运行的任务将顺序执行,因为线程池中只有一个线程。如果用下面的语句替换第6行,又会发生什么呢?

1
ExecutorService executor = Executors.newCachedThreadPool();

为每个等待的任务创建一个新线程,所以,所有的任务都并发的执行。
第14行的方法shutdown()通知执行器关闭。不能接受新的任务,但是现有的任务将继续热行直至完成。

提示 如果需要为一个任务创建一个线程,就使用Thread类。如果需要为多个任务创建线程,最好使用线程池。

使用new Thread()创建线程的弊端

  • 每次通过new Thread()创建对象性能不佳。
  • 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  • 缺乏更多功能,如定时执行、定期执行、线程中断。

线程池使用场景

  • 单个任务处理的时间比较短
  • 将需处理的任务的数量大

使用线程池的好处

  • 降低资源消耗。重用存在的线程,减少对象创建、消亡的开销,提升性能。
  • 提高响应速度。可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

怎样使用线程池

创建线程池

可以通过ThreadPoolExecutor来创建一个线程池。

1
2
3
4
5
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

创建线程池需要的几个参数:
corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。

keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue)

JDK自带四种线程池总类介绍

Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。下面这张图完整描述了线程池的类体系结构:

  1. newFixedThreadPool创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
  2. newCachedThreadPool创建一个可缓存的线程池。这种类型的线程池特点是:
    1).工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
    2).如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  3. newScheduleThreadPool创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。
  4. newSingleThreadExecutor创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 。

总结:

  1. FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
  2. CachedThreadPool的特点就是在线程池空闲时,即线程池中没有可运行任务时,它会释放工作线程,从而释放工作线程所占用的资源。但是,但当出现新任务时,又要创建一新的工作线程,又要一定的系统开销。并且,在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

你可以使用JDK自带的监控工具来监控我们创建的线程数量,运行一个不终止的线程,创建指定量的线程,来观察:
工具目录:C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe
效果如下:

选择我们运行的程序:

线程同步

如果一个共享资源被多个线程同时访问,可能会遭到破坏。
假设创建并启动100个线程,每个线程都往同一个账户中添加一个便士。定义一个名为Account的类模拟账户,一个名为AddPennyTask的类用来想账户里添加一个便士,以及一个用于创建和启动线程的主类。这些类之间的关系如下;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.*;

public class AccountWithoutSync {
private static Account account = new Account();

public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();


for (int i = 0; i < 1; i++) {
executor.execute(new AddPennyTask());
}

executor.shutdown();


while (!executor.isTerminated()) {
}

System.out.println("What is balance? " +account.getBalance());
}


private static class AddPennyTask implements Runnable {
public void run() {
account.deposit(1);
}
}


private static class Account {
private int balance = 0;

public int getBalance() {
return balance;
}

public void deposit(int amount) {
int newBalance = balance +amount;

try {
Thread.sleep(5);
} catch (InterruptedException e) {
}

balance = newBalance;
}
}
}

在上面38~47里面就可能导致数据破坏,理论上得出的结果为100,然而实际结果可能小于100。就是可能出现多个线程同时执行39行导致多个线程做了同样的工作。像这种多个线程同时是用共同资源就叫做竞争状态。如果一个类的对象在多线程程序中没有导致竞争状态,则称这样的类为线程安全的

synchronized关键字

为了避免竞争状态,应该防止多个线程同时进入程序的某一特定部分,程序中的这部分称为临界区。如果上面的deposit方法就可以使用关键字synchronized来同步方法,以便一次只有一个线程可以访问这个方法。
当然解决上面的同步问题有几种办法。
一种是在通过在第38上的deposit方法中添加关键字synchronized,使Account类成为线程安全的,如下所示:

1
public synchronized void deposit(int amount)

一种是同步方法在执行之前需要加锁。对于实例方法,要给调用该方法的对象加锁,对于静态方法要给这个类加锁。如果一个线程调用一个对象上的同步实例方法(静态方法),首先给该对象(类)加锁,然后执行该方法,最后解锁。在解锁之前,另一个调用那个对象(类)中方法的线程将被阻塞,直到解锁。
随着deposit方法被同步化,前面的情景就不在会出现。如果任务1开始进入deposit方法,任务2就会被阻塞,直到任务1完成该方法的运行,如下;

同步语句

调用一个对象的同步实例方法要求给该对象加锁。调用一个类的同步静态方法要求对该类加锁。当执行方法中某一个代码块时,同步语句不仅可以用于对this对象加锁,而且可用于对任何对象加锁。这个代码块成为同步块。同步语句的一般形式如下所示:

1
2
3
synchronized(expr) {
statements;
}

表达式expr必须求出对象的引用。如果对象已经被另一个线程锁定,则在解锁之前,该线程将被阻塞。当获准对一个对象加锁时,该线程执行同步快中的语句,然后接触给对象所加的锁。
同步语句允许设置同步方法中的部分代码,而不必是整个方法。这大大增强了程序的并发能力。

1
2
3
synchronized(account) {
account.deposit(1);
}

注意 任何同步的实例方法都可以转换为同步语句。例如:

1
public synchronized void method() {}

等同于

1
2
3
4
public void method() {
synchronized (this) {
}
}

利用加锁同步

上面讲过,避免多线程操作同意资源,只要在方法中使用synchronized关键字就可以避免这种情况,如下所示:public synchronized void deposit(int amount) {}
同步的实例方法在执行方法之前都隐士的需要一把锁。
Java可以显示的加锁,这给协调线程带来了更多的控制功能。一个锁是一个Lock接口的实例,它定义了加锁和释放锁的方法。锁也可以是使用newCondition()方法创建任意个数的Condition对象,用来进行线程通信。

ReentrantLock是为创建相互排斥的锁的Lock的具体实现。可以创建具有特定的公平策略的锁。真正的公平策略确保等待时间最长的线程首先获得锁。假的公平策略将锁给任意一个在等待的线程。被多个线程访问的使用公共锁的程序,其整体性能可能比那些使用默认设置的程序差,但是在获取锁且避免资源缺乏时变得很小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class AccountWithoutSync {
private static Account account = new Account();

public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();


for (int i = 0; i < 1; i++) {
executor.execute(new AddPennyTask());
}

executor.shutdown();


while (!executor.isTerminated()) {
}

System.out.println("What is balance? " +account.getBalance());
}


private static class AddPennyTask implements Runnable {
public void run() {
account.deposit(1);
}
}


private static class Account {
private static Lock lock = new ReentrantLock(); // Craete a lock
private int balance = 0;

public int getBalance() {
return balance;
}

public void deposit(int amount) {
lock.lock(); // Acquire the lock

try {
int newBalance = balance +amount;



Thread.sleep(5);

balance = newBalance;
} catch (InterruptedException e) {
} finally {
lock.unlock(); // Release the lock
}
}
}
}

第33行创建一个锁,第41行获取该锁,第55行释放该锁。
如上:通常,使用synchronized方法或语句比使用相互排斥的显示锁简单些。然而,使用显示锁对同步具有状态的线程更加直观和灵活。

线程间协作

通过保证在临界区上多个线程的相互排斥,线程同步完全可避免竞争状态的发生,但是有时候,还需要线程之间的相互协作。使用条件便于线程间通信。一个线程可以指定在某种条件下该做什么。条件是通过调用Lock对象的newCondition()方法而创建的对象。一旦创建了条件,就可以使用await()、signal()和signalAll()方法来实现线程之间的相互通信。
await()可以让当前线程都处于等待状态,直到条件发生。
signal()唤醒一个等待的线程。
signalAll()唤醒所有等待的线程。

假设创建并启动两个任务,一个来想账户中存钱,另一个从同一账户中提款。当提款的数额大于当前的余额时,提款线程必须等待。不管什么时候,只要向账户新存入一笔资金,存入线程必须通知提款线程重新尝试。如果余额任未达到提款的数额,提款线程必须继续等待新的存款。两个任务的交互如下图所示:

程序清单 ThreadCooperation.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ThreadCooperation {
private static Account account = new Account();

public static void main(String[] args) {
// Craete a thread pool with three threads
ExecutorService executor= Executors.newFixedThreadPool(2);
executor.execute(new DepositTask());
executor.execute(new WithdrawTask());
executor.shutdown();

System.out.println("Thread 1\t\tThread 2\t\tBalance");
}

// A task for adding an amount to the account
public static class DepositTask implements Runnable {
public void run() {
try {
while (true) {
account.deposit((int) (Math.random() * 10) + 1);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

// A task for subtracting an amount from the account
public static class WithdrawTask implements Runnable {
public void run() {
while (true) {
account.withdraw((int) (Math.random() *10) +1);
}
}
}

// An inner class for account
private static class Account {
// Create a new lock
private static Lock lock = new ReentrantLock();

// Create a condition
private static Condition newDeposit = lock.newCondition();

private int balance = 0;

public int getBalance() {
return balance;
}

public void withdraw(int amount) {
lock.lock();
try {
while (balance < amount) {
System.out.println("\t\tWait for a deposit");
newDeposit.await();
}

balance -= amount;
System.out.println("\t\tWithdraw " + amount + "\t\t" +getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}




public void deposit(int amount) {
lock.lock();
try {
balance += amount;
System.out.println("Deposit " + amount +"\t\t\t\t\t" +getBalance());

// Signal thread waiting on the condition
newDeposit.signalAll();
} finally {
lock.unlock();
}
}
}
}

程序创建并提交存款任务(第10行)和提款任务(第11行)。为让提款任务运行,特意让存款任务进人休眠
状态(第23行)。如果没有足够的资金可提取,则提款任务等待(第59行)存款任务弓’余额变化的通知(第82行)。
第44行创建个锁,锁上名为newDeposit的条件在第47行创建。一个条件对应一个锁。在等待和通知状态之前,线程必须先获取该条件的锁。当没有足够可取的数目时,提款t任务在第56行获取锁,等待newDeposit条件(第60行),并且在第70行释放该锁。存款任务在第75行获取锁,在有新的钱存入之后通知所有newDeposit条件的等待线程(第82行)。
如果将第59一60行的while循环用if语句代替,会出现什么情况?

1
2
3
if (balance < amount) {	                          System.out.println("\t\tWait for a deposit");
newDeposit.await();
}

只要余额发生变化,存款任务都会通知提款任务。当唤醒提款任务时,条件(balance < amount)的判断结果可能仍然为true。如果使用if语句,提款任务有可能永久等待。如果使用循环语句,则提款任务可以有重新检验条件的机会。因此,应该在循环语句中测试条件。

警告 一旦线程调用条件上的await(),线程就进入等待状态,等待恢复的信号。如果忘记对状态调用signal()或者signalAll()那么线程就永远等待下去。
警告 条件由Lock衬象创建。为了调用任意方法(例如,await(), signal()和signalAll())
必须首先拥有锁。如果没有获取锁就调用这些方法,会抛出I11ega1MonitorStateException异常。

Jvava内置监视器

锁和条件是Java5中的新内容。在Java5之前,线程通信是使用对象的内置监视器编程实现的。锁和条件与内置监视器相比是非常强天且灵活的,因此,本节可以完全忽略。然而,如果使用传统的Java代
代码,就可能会碰到Java的内置监视器。

监视器是一个相互排斥且具备同步能力的对象。监视器中的一个时间点上只能有一个线程执行一个方法。线程通过获取监视器上的锁进入监视器,并且通过释放锁退出监视器。任意对象都可能是一个监视器。一旦一个线程锁住对象,该对象就成为监视器。就锁是通过在方法或块上使用synchronized关键字来实现的。在执行同步方法或块之前,线程必须获取锁。如果条件不适合线程继续在监视器中执行,线程可能在监视器中等待。可以对监视器对象调用wait()方法来释放锁,这样其它的一些监视器中的线程就可以获取它,也就有可能改变监视器的状态。当条件合适时,另一线程可以调用notify()ynotifyAll()方法来通知一个或所有的等待线程重新获取锁并且恢复执行。
wait()、notify()ynotifyAll()方法必须在这些方法的接受对象的同步方法或同步快中调用,否则,就会出现会抛出I11ega1MonitorStateException异常。

生产者/消费者

考虑典型的消费者/生产者例子。假设使用缓冲区存储整数。缓冲区的大小是受限的。缓冲区提供write(int)方法将一个int值添加到缓冲区中,还提供方法read()从缓冲区中读取和删除一个int值。为了同步这个操作,使用具有两个条件的锁:notEmpty(即缓冲区非空)和notFull(即缓冲区未满)。当任务向缓冲区添加一个int时,如果缓冲区是满的,那么任务将会等待notFull状态。当任务从缓冲区中删除一个int时,如果缓冲区是空的,那么任务将等待notEmpty状态。两个任务之间的交互如下图所示。

程序清单ConsumerProducer.java。是一个完整的程序。程序包括了。uffer类(第48 – 95行)以及重复向缓冲区产生数字和重复从缓冲区消耗数字的两个任务(第16一45行)。write(int)方法(第60行)向缓冲区添加一个整数。read()方法(第77行)从缓冲区删除和返回一个整数。
缓冲区实际上是一个先进先出的队列(第50一51行)。锁的状态notEmpty和notFull在第57——58行创建。状态和锁捆绑在一起。在应用一个状态之前必须获取一个锁。如果使用wait()和notify()方法重写这个例子,必须指派两个对象作为监视器。

程序清单ConsumerProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ConsumerProducer {
private static Buffer buffer = new Buffer();

public static void main(String[] args) {
// Create a thread pool write with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}

// A task for adding an int to the buffer
private static class ProducerTask implements Runnable {
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.write(i++); // Add a value to the buffer
// Put the thread to sleep
Thread.sleep((int) (Math.random() *10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// A task for reading and deleting an int from the buffer
private static class ConsumerTask implements Runnable {
public void run() {
try {
while (true) {
System.out.println("\t\t\tConsumer reads " + buffer.read());
// Put the thread to sleep
Thread.sleep((int) (Math.random() *10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// An inner class for buffer
private static class Buffer {
private static final int CAPACITY = 1; // buffer size
private java.util.LinkedList<Integer> queue =
new java.util.LinkedList<Integer>();

// Create a new lock
private static Lock lock = new ReentrantLock();

// Create two conditions
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();

public void write(int value) {
lock.lock(); // Acquire the lock
try {
while (queue.size() == CAPACITY) {
System.out.println("Wait for notFull condition");
notFull.wait();
}

queue.offer(value);
notEmpty.signal(); // Signal notEmpty condition
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // Release the lock
}
}

public int read() {
int value = 0;
lock.lock(); // Acquire the lock
try {
while (queue.isEmpty()) {
System.out.println("\t\t\tWait for notEmpty condition");
notEmpty.await();
}

value = queue.remove();
notFull.signal(); // Signal notFull condition
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // Release the lock
}
return value;
}
}
}

这个程序运行的结果实例如下图所示。

阻塞队列

阻塞队列在试图想一个满队列添加元素或者从空队列中删除元素时导致线程阻塞。BlockingQueue接口扩咋java.util.Queue,并且提供同步的put和take方法向队列头添加元素,以及从队列尾删除元素,如下图所示。

Java支持三个具体的阻塞队列 ArrayBlockingQueeu、LinedBlockingQueue 和 PriorityBlockingQueue 如下图所示。他们都在java.util.concurrent包中。ArrayBlockingQueeu 使用数组实现阻塞队列。必须制定一个容量或者可选的公平性来构造 ArrayBlockingQueeu。LinedBlockingQueue 使用链表实现阻塞队列。可以创建不受限的或受限的 LinedBlockingQueue。PriorityBlockingQueue 是优先队列。可以创建不受限的或受限的优先队列。

程序清单给出使用 ArrayBlockingQueeu 来简化程序清单ConsumerProducer.java生产者/消费者例子。第5行创建一个 ArrayBlockingQueeu 来存储数组。生产者线程将一个整数放入队列中(第22行),而消费者线程从队列中取走一个整数(第37行)。者线程从队列中取走一个整数(第37行)。

程序清单ConsumerProducerUsingBlockingQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.*;

public class ConsumerProducerUsingBlockingQueue {
private static ArrayBlockingQueue<Integer> buffer =
new ArrayBlockingQueue<Integer>(2);

public static void main(String[] args) {
// Create a thread pool write with two threads
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new ProducerTask());
executor.execute(new ConsumerTask());
executor.shutdown();
}

// A task for adding an int to the buffer
private static class ProducerTask implements Runnable {
public void run() {
try {
int i = 1;
while (true) {
System.out.println("Producer writes " + i);
buffer.put(i++); // Add any value to the buffer
// Put the thread to sleep
Thread.sleep((int) (Math.random() *10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// A task for reading and deleting an int from the buffer
private static class ConsumerTask implements Runnable {
public void run() {
try {
while (true) {
System.out.println("\t\t\tConsumer reads " + buffer.take());
// Put the thread to sleep
Thread.sleep((int) (Math.random() *10000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

信号量

信号量可以用来限制访问共享资源的线程数。在访问资源之前,线程必须从信号量获取许可。在访问完资源之后,这个线程必须将许可返回给信号量。

为了创建信号量,必须使用可选择的公平策略来确定许可的数量,如图29-24所示。任务通过调用信号量的acquire()方法来获得许可,通过调用信号量的release()方法来释放许可。一旦获得许可,信号量中可用许可的总数减1。一旦许可被释放,信号量中可用许可的总数加1。

只有一个许可的信号量可以用来模拟一个相互排斥的锁。程序清单Account内部类使用信号量修改了上面程序清单的Account内部类,确保一个时间只有一个线程可以访问deposit方法。
程序清单Account内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.Semaphore;

public class Account {
// Create a semaphore
private static Semaphore semaphore = new Semaphore(1);
private int balance = 0;

public int getBalance() {
return balance;
}

public void deposit(int amount) {
try {
semaphore.acquire(); // Acquire a permit
int newBalance = balance + amount;



Thread.sleep(5);

balance = newBalance;
} catch (InterruptedException ex) {
} finally {
semaphore.release(); // Release a permit
}
}
}

有一个许可的信号量在第4行创建。当执行第13行的存款方法时,一个线程首先获得许可。在余额更新之吼线程在第25行释放该许可。总是将release()方法放到finally子句中是一个很好的习惯,这样可以确保即使发生异常也能最终释放该许可。

避免死锁

有时两个或多个线程需要在几个共享对象上获取锁。这可能会导致死摘(deadlock),也就是说,每个线程已经锁定一个对象,而且正在等待锁定另一个对象。考虑有两个线程和两个对象的情形,如图29-
25所示。线程1获取bjectl上的锁,而线程2获取obj
ect2上的锁。现在线程1等待object2上的锁。线程2等待object1上的锁。每个线程都在等待另一个线程释放它所需要的锁,结果导致两个线程都无法继续运行。
死锁

使用一种名为资源排序(resource oraenng)的简单技术可以轻易的避免死锁的发生。该技术是给每一个需要锁的对象指定一个顺序,确保每个线程都按这个顺序来获取锁。例如,在图29-25中,假设按bjectl、object2的顺序对两个对象排序。采用资源排序技术,线程2必须先获取bject1上的锁,然后才能获取object2上的锁。一旦线程1获取了bject1上的锁,线程2必须等待object1上的锁。所以,
线程1就能获取object2上的锁,不会再发生死锁现象。

线程的状态

任务在线程中执行。线程可以是以下五种状态之一:新建、就绪、运行、阻塞或结束(如图29-26所示)。
线程的状态

新创建一个线程时,它就进人新建状态(New)。调用线程的start()方法启动线程后,它进入就绪状态(Ready)。就绪线程是可运行的,但可能还没有开始运行。操作系统就必须为它分配CPU时间。
就绪线程开始运行时,它就进人运行状态(Running) 。如果给定的CPU时间调用完或调用线程的yie工d()方法,处于运行状态的线程可能就进入就绪状态。
有几种原因可能使线程进人阻塞状态(Blocked)(即非活动状态)。可能是它自己调用了join()、sleep()或wait()方法,也可能是其他线程调用了这些方法。它可能是在等待I/O操作的完成。当阻塞行为不起阻塞作用时,阻塞线程可能被重新激活。例如,如果线程处于休眠状态并且休眠时间已满,线程就会被重新激活并进人就绪状态。
最后,如果一个线程执行完它的run()方法,这个线程就被结束(finished)。
isAlive()方法是用来判断线程状态的方法。如果线程处于就绪、阻塞或运行状态,则返回true;如果线程处于新建并且没有启动的状态,或者已经结束,则返回false。
方法interrupt()按下列方式中断一个线程:当线程处于就绪状态或运行状态时,给它设置一个中断标志;当线程处于阻塞状态时,它将被唤醒并进人就绪状态,同时抛出异常java.lang.InterruptedException。

同步集合

Java集合框架中的类不是线程安全的,也就是说,如果它们同时被多个线程访问和更新,它们的内容可能被破坏。可以通过锁定集合或者同步集合保护集合中的数据。
Collection“类提供六个静态方法来将集合转成同步版本,如图29-27所示。使用这些方法创建的集合称为同步包装类
同步集合

参考资料:

JAVA线程池原理以及几种线程池类型介绍
Java几种线程池的分析和使用

热评文章