1. 线程概述
1.1 线程和进程
- 线程: 当一个程序运行时, 内部可能包含多个顺序执行流, 每个顺序执行流就是一个线程
- 进程(Process): 每个运行中的程序就是一个进程
- 操作系统可以同时执行多个任务, 每个任务就是一个进程, 进程可以同时执行多个任务, 每个任务就是线程; 一个程序运行之后至少有一个进程, 一个进程里包含了多个线程, 但至少要包含一个线程
进程的三个特征
- 独立性: 进程是系统中独立存在的实体, 它可以拥有自己独立的资源, 每一个进程都拥有自己私有的地址空间; 在没有经过进程本身允许的情况下, 一个用户进程不可以直接访问其他进程的地址空间
- 动态性: 进程与程序的区别在于, 程序只是一个静态的指令集合, 而进程是一个正在系统中活动的指令集合, 在进程中加入了时间的概念, 进程具有自己的声明周期和各种不同的状态, 这些概念在程序中是不具备的
- 并发性: 多个进程可以在单个处理器上并发执行, 多个进程之间互不影响
并发(concurrency)和并行(parallel)的区别
- 并行指在同一时刻, 有多条指令在多个处理器上同时执行; 并发指在同一时刻只能有一条指令执行, 但多个进程指令被快速的轮换执行, 使得在宏观上具有多个进程同时执行的效果
1.2 多线程的优势
- 线程在程序中是独立的, 并发的执行流, 与分隔的进程相比, 进程中线程之间的隔离程度要小, 他们共享内存, 文件句柄和其他每个进程应有的状态
- 线程比进程具有更高的性能, 这是由于同一个进程中的线程都有共性---多个线程共享同一个进程虚拟空间. 线程共享的环境包括: 进程代码段, 进程的公有数据等, 利用这些共享数据, 线程之间通信很容易
多线程的优势
- 进程之间不能共享内存, 但线程之间共享内存非常容易
- 系统创建进程时需要为该进程重新分配系统资源, 但创建线程则代价小的多, 因此使用多线程来实现多任务并发比多进程的效率高
- Java语言内置了多线程功能支持, 而不是单纯地作为底层操作系统的调度方式, 从而简化了Java的多线程编程; Java运行时的主线程是由main()方法确定的, main()方法的方法体就是主线程的线程执行体
2. 线程的创建和启动
2.1 继承 Thread 类创建线程类
- Java使用 Thread 类代表线程, 所有的线程对象都必须是 Thread 类或其子类的实例, 每个线程的作用是完成一定任务,实际上就是执行一段程序流(一段顺序执行的代码)
- 使用继承 Thread 类的方法来创建线程时, 多个线程之间无法共享线程类的实例变量
- 通过继承 Thread 类来创建并启动多线程的步骤:
- 定义 Thread 类的子类(创建一个类 extends Thread 类), 并重写(@override)该类的 run() 方法, 该 run() 方法的方法体就代表了线程需要完成的任务, 因此 run() 方法也叫做 此案成执行体
- 创建 Thread 子类的实例, 即创建了线程对象
- 调用线程对象的 start() 方法来启动该线程
Thread类的构造方法
- public Thread(): 分配一个新的线程对象
- public Thread(String name): 分配一个指定名字的新的线程对象
- public Thread(Runnable target): 分配一个带有指定目标新线程对象
- public Thread(Runnable target, String name): 分配一个带有指定目标新的线程对象并指定名字
Thread 类的常用方法
- Thread.currentThread(): currentThread() 方法是 Thread 类的静态方法, 该方法总是返回当前正在执行的线程对象
- getName(): 该方法是 Thread 类的实例方法, 该方法返回调用该方法的线程名字
- setName(String name): 通过该方法可为线程设置名字
- start(): 导致次线程开始执行, Java虚拟机调用此线程的run方法
- Thread.sleep(long millis): 使当前正在执行的线程以指定的毫秒数暂停
代码示例
// 通过继承 Thread 类来创建线程类
public class FirstThread extends Thread {
private int i;
// 重写 run() 方法, run() 方法的方法体就是线程执行体
public void run() {
for (; i < 100; i++) {
// 当线程类继承Thread类时, 直接使用this即可获取当前线程
// Thread 对象的 getName() 返回当前线程的名字
// 因此可以直接调用 getName() 方法返回当前线程的名字
System.out.println(getName() + " " + i);
}
}
//
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
// 调用 Thread 的 currentThread() 方法获取当前线程
System.out.println(Thread.currentThread().getName() + " " + i);
if (i == 20) {
// 创建并启动第一个线程
new FirstThread().start();
// 创建并启动第二个线程
new FirstThread().start();
}
}
}
}
2.2 实现 Runnable 接口创建线程类
- Runnable 对象仅仅作为Thread对象的 target, Runnable 实现类里包含的 run() 方法仅作为线程执行体,
而实际的线程对象依然是 Thread 实例, 只是该 Thread 线程负责执行其 target 的 run() 方法创建Runnable实现类的实例,并以此实例作为Thread的target来创建Thread对象,该Thread对象才是真正的线程对象 - Runnable 接口中只包含了一个抽象方法, 所以从Java8开始, Runnable接口使用了 @FunctionInterface修饰; 所以Runnable是一个函数式接口, 可以使用Lambda表达式创建Runnable对象
- 采用 Runnable 接口的方式创建的线程可以共享线程类的实例变量,因此, 程序所创建的 Runnable 对象只是线程的 target, 而多个线程可以共享一个 target, 所以多个线程可以共享同一个线程类(实际上应该是线程的target类)的实例变量
- 通过实现 Runnable 接口来创建并启动多线程的步骤:
- 定义 Runnable 接口的实现类, 并重写该接口的run()方法, 该 run() 方法的方法体同样也是该线程的线程执行体
- 创建 Runnable 实现类的实例, 并以此实例作为 Thread 类的构造器的 target 来创建 Thread 对象, 该 Thread 对象才是真正的线程对象
- 调用线程对象的 start() 方法启动
实现 Runnable 接口比继承 Thread 类所具有的优势
- 适合多个相同的程序代码的线程去共享同一个资源
- 可以避免java中的单继承的局限性
- 增加程序的健壮性,实现解耦操作,代码可以被多个线程共享,代码和线程独立
- 线程池只能放入实现Runnable或Callable类线程,不能直接放入继承Thread的类
代码示例
public class SecondThread implements Runnable {
private int i;
// run()方法同样是线程执行体
@Override
public void run() {
for (; i < 100; i++) {
// 当线程类实现 Runnable 接口时
// 如果想获取当前线程,只能用 Thread.currentThread() 方法
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
//
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
if (i == 20) {
SecondThread st = new SecondThread();
// 通过 new Thread(target, name) 方法创建新线程
new Thread(st, "新线程1").start();
new Thread(st, "新线程2").start();
}
}
}
}
2.3 使用 Callable 和 Future 创建线程
- 从Java5开始, Java提供的 Callable 接口(Runnable接口的增强版), Callable 接口提供了一个 call() 方法可以作为线程执行体; 因此可以提供一个 Callable对象作为 Thread 的 target, 而该线程的线程执行体就是该 Callable 对象的 call() 方法;
因为 Callable 接口不是 Runnable 接口的子接口, 所以 Callable 对象不能直接作为 Thread 的 target; call() 方法并不是直接调用, 而是作为线程执行体被调用 - call() 方法可以有返回值, 也可以声明抛出异常
- Java5提供了 Future 接口来代表 Callable 接口里 call() 方法的返回值, 并为 Future 接口提供了一个 FutureTask 实现类, 该实现类实现了 Future 接口, 并实现类 Runnable 接口, 可以作为 Thread 类的 target
- Callable 接口是有泛型限制, Callable 接口里的泛型形参类型与 call() 方法返回值的类型相同
- Future 接口定义的公共的控制关联的 Callable 任务的方法:
- boolean cancel(boolean mayInterruptIfRunning): 试图取消该 Future 里关联的 Callable 任务
- V get(): 返回 Callable 任务里 call() 方法的返回值, 调用该方法将导致程序阻塞, 必须等到子线程结束后才会得到返回值
- V get(long timeout, TimeUnit unit): 返回 Callable 任务里的 call() 方法的返回值, 该方法让程序最多阻塞 timeout 和 unit 指定的时间, 如果经过指定时间后 Callable 任务依然没有返回值, 将会抛出 TimeoutException 异常
- boolean isCancelled(): 如果在 Callable 任务正常完成前被取消, 则返回 true
- Boolean isDone(): 如果 Callable 任务已经完成, 则返回true
创建并启动有返回值的线程的步骤
- 创建Callable 接口的实现类, 并实现call()方法, 该 call() 方法将作为线程执行体, 且该 call() 方法有返回值, 再创建 Callable 实现类的实例; 可以使用 Lambda 表达式创建 Callable 对象
- 使用 FutureTask 类来包装 Callable 对象, 该 FutureTask 对象封装了该 Callable 对象的 call() 方法的返回值
- 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程
- 调用 FutureTask 对象的 get() 方法来获取子线程执行结束后的返回值
代码示例
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
//
public class ThirdThread {
public static void main(String[] args) {
// 创建 Callable 对象
ThirdThread rt = new ThirdThread();
// 先使用 Lambda 表达式来创建 Callable<Integer>对象
// 在使用 FutureTask来包装 Callable 对象
FutureTask<Integer> task = new FutureTask<Integer>((Callable<Integer>)() ->{
int i = 0;
for (; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " 的循环变量i的值是: " + i);
}
return i;
});
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " 的循环变量i的值是: " + i);
if (i == 20) {
// 实质还是以 Callable 对象来创建并启动线程的
new Thread(task, "有返回值的线程").start();
}
}
try{
// 获取线程返回值
System.out.println("子线程的返回值: " + task.get());
} catch(Exception e) {
e.printStackTrace();
}
}
}
2.4 创建线程的三种方式对比
- 采用 Runnable 或者 Callable 接口的话, 线程类还可以继承其他类
- 采用 Runnable 或者 callable 的方式下, 多个线程可以共享同一个 target 对象, 适合多个相同线程来处理同一份资源的情况, 从而可以将CPU, 代码和数据分开, 做到解耦
- 采用 Runnable 或者 callable 的方式下, 访问当前线程还是需要使用 Thread.currentThread() 方法
3. 线程的生命周期
新建(New) --> 就绪(Runnable) --> 运行(Running) --> 阻塞(Blocked) --> 死亡(Dead)
3.1 新建和就绪
- 当程序使用 nwe 关键字创建了一个线程之后, 该线程就处于新建状态, 此时它和其他的 Java 对象一样, 仅仅由 Java 虚拟机为其分配内存, 并初始化器成员变量的值, 此时的线程对象没有表现出任何线程的动态特征, 程序也不会执行线程的线程执行体
- 当线程对象调用了 start() 方法之后, 该线程处于就绪状态, Java虚拟机会为其创建方法调用栈和程序计数器, 处于这个状态中的线程并没有开始运行, 只是标识该程序可以运行了, 至于该线程何时开始运行, 取决于JVM 里线程调度器的调度
- 启动线程使用 start() 方法, 而不是 run() 方法! 如果直接调用 run() 方法, 系统把线程对象当成一个普通对象, 而run()方法也是一个普通方法, 而不是线程执行体
- 只能对处于新建状态(通过 new 关键字创建一个线程)的线程调用 start() 方法, 否则引发 IllegalThreadStartException 异常
- 调用子线程的 start() 方法后子线程不会立即开始执行, 程序可以使用 Thread.sleep(1)来让当前运行的线程(主线程)睡眠1毫秒, 这样就可以让子线程立即开始执行
代码示例
/**
* @author jefxff
* @date 2019/12/18 - 15:40
*/
public class InvokeRun extends Thread {
private int i;
// 重写 run() 方法, run() 方法的方法体就是线程执行体
@Override
public void run() {
for (; i < 100; i++) {
// 直接调用 run() 方法时, Thread 的 this.getName() 方法返回的是该对象的名字
// 而不是当前线程的名字
// 使用 Thread.currentThread().getName() 获取当前线程的名字
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
//
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
// 调用Thread的currentThread()方法获取线程
System.out.println(Thread.currentThread().getName() + " " + i);
if(i == 20) {
// 直接调用线程对象的run()方法
// 系统会把线程对象当成普通对象, 把 run() 方法当成普通方法
// 所以下面两行代码并不会启动两个线程, 而是依次执行两个 run() 方法
// new InvokeRun().run();
// new InvokeRun().run();
new InvokeRun().start();
new InvokeRun().start();
}
}
}
}
3.2 运行和阻塞状态
- 再单CPU的计算机中,同一时刻只有一个线程处于运行状态, 如果处于就绪状态的线程获得了CPU, 开始执行run()方法的线程执行体, 则该线程处于运行状态; 但是在多CPU的计算机中, 将会是多个线程并行(parallel)执行, 当要执行的线程大于CPU处理器个数时, 还是会出现线程在同一个CPU上轮换执行的现象
- 当一个线程开始运行后, 他不可能一直处于运行状态, 线程在运行过程中需要被中断, 目的是使其他线程获得执行的机会, 线程调度的细节取决于底层平台所采用的策略.
- 当前正在执行的线程被阻塞之后, 其他线程就可以获得执行的机会; 但被阻塞的线程会在合适的时候从新进入就绪状态(不是将进入运行状态); 被阻塞的线程的阻塞解除之后, 必须重新等待线程调度器再次调度它
导致线程进入(解除)阻塞状态的情况
- 线程调用 sleep() 方法主动放弃所占用的处理器资源(经过了指定的时间后即可解除阻塞)
- 线程调用了一个阻塞式IO方法, 在该方法返回之前, 该线程被阻塞(阻塞式IO方法已经返回即可解除阻塞)
- 线程试图获得一个同步监视器, 但该同步监视器正被其他线程所持有(线程成功地获得了试图取得的同步监视器即可解除阻塞)
- 线程在等待某个通知(notify)(线程正在等待某个通知时, 其他线程发出了一个通知即可解除阻塞)
程序调用了线程的 suspend() 方法将该线程挂起, 但这个方法容易导致死锁, 尽量不用(处于挂起状态的线程被调用了resume()恢复方法即可解除阻塞)- 线程状态转换
3.3 线程死亡
- 线程结束的三种方式(结束后的线程处于死亡状态)
- run() 或 call() 方法执行完成, 线程正常结束
- 线程抛出一个未捕获的 Exception 或 Error
- 直接调用该线程的 stop() 方法来结束该线程(该方法容易导致死锁, 不使用)
- 当主线程(如main线程)结束时, 其他线程不受任何影响, 并不会随之结束, 一旦子线程启动起来后, 他就拥有和主线程相同的地位, 它不会受主线程的影响
- 可以调用线程对象的 isAlive() 方法来测试线程是否已经死亡, (当线程处于就绪, 运行, 阻塞时返回true; 当线程处于新建, 死亡时返回false)
- 处于死亡状态的线程无法再次运行, 不可再次调用 start() 方法试图启动线程
4. 控制线程
4.1 join 线程
- join() 方法: Thread提供的让一个线程等待另一个线程完成的方法; 当在某个程序执行流中调用其他线程的 join() 方法时, 调用线程将被阻塞, 直到被 join() 方法加入的 join 线程执行完为止
- join() 方法通常由使用线程的程序调用, 以用来将大问题划分成为许多的小问题, 每个小问题分配一个线程来处理, 当所有的小问题都得到处理后, 再调用主线程来进一步操作
join() 方法的三种重载形式
- join(): 等待被join的线程执行完成
- join(long millis): 等待被join的线程的时间最长为milli毫秒; 如果在 millis 毫秒内被 join 的线程还没执行完成, 则不再等待
join(long millis, int nanos): 等待被join的线程的时间最长为millis毫秒加上nanos豪微秒
代码示例
public class JoinThread extends Thread {
// 提供一个有参数的构造器, 用于设置该线程的名字
public JoinThread(String name) {
super(name);
}
// 重写 run() 方法, 定义线程的执行体
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println(getName() + " " + i);
}
}
//
public static void main(String[] args) throws Exception{
// 启动子线程
new JoinThread("新线程").start();
for (int i = 0; i < 100; i++) {
if(i == 20){
JoinThread jt = new JoinThread("被 join 的线程");
jt.start();
// main 线程调用了 jt 线程的 join() 方法, main 线程必须等 jt 执行结束才会向下执行
jt.join();
}
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
4.2 后台程序
- 后台线程(Daemon Thread): 在后台运行的线程, 他的任务是为其他的线程提供服务; 又叫做 "守护线程" 或 "精灵线程"; 如:JVM的垃圾回收线程
- 后台线程的特征: 如果所有的前台线程都死亡, 后台线程会自动死亡
- 调用 Thread 对象的 setDaemon(true) 方法(在该线程调用start()方法之前)可以将指定的线程设置称为后台线程
- 通过 isDaemon() 方法, 判断指定的线程是否为后台线程
代码示例
public class DaemonThread extends Thread {
// 定义后台线程的线程执行体(run()方法的方法体)与普通线程没有任何区别
@Override
public void run(){
for (int i = 0; i < 100; i++) {
System.out.println(getName() + " " + i);
}
}
//
public static void main(String[] args) {
DaemonThread t = new DaemonThread();
// 将此线程设置成后台线程
t.setDaemon(true);
// 启动线程
t.start();
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
// 到此行, 前台线程(main线程)执行完成
// 后台线程也随之结束
}
}
4.3 线程睡眠: sleep
- 线程睡眠: 当前执行的线程调用sleep()方法让正在执行的线程暂停一段时间, 进入阻塞状态
- 当当前线程调用 sleep() 方法进入阻塞状态后, 在其睡眠时间段内, 该线程不hi获得执行的机会, 即使系统中没有其他可执行的线程, 处于 sleep() 中的线程也不会执行
- sleep() 方法的两种重载形式:
- static void sleep(long millis): 让当前执行的线程暂停 millis 毫秒, 并进入阻塞状态, 该方法收到系统计时器和线程调度器的精度与准确度的影响
static void sleep(long millis, int nanos): 让当前执行的线程暂停 millis 毫秒加 nanos 微秒, 并进入阻塞状态,很少用
4.4 线程让步: yield
- yield 方法是 Thread 类提供的一个静态方法, 调用 yield 方法也会让当前正在执行的线程暂停, 但不会阻塞该线程, 它只是将该线程转入就绪状态
- 当某个线程调用了 yield() 方法暂停之后, 只有优先级与当前线程相同, 或者优先级比当前线程更高的处于就绪状态的线程才会获得执行的机会
代码示例
public class YieldTest extends Thread {
public YieldTest(String name) {
super(name);
}
// 定义 run() 方法作为线程的执行体
@Override
public void run(){
for (int i = 0; i < 50; i++) {
System.out.println(getName() + " " + i);
// 当i等于20时, 使用yield()方法让当前的线程让步
if(i == 20) {
Thread.yield();
}
}
}
public static void main(String[] args) throws Exception {
// 启动两个并发的线程
YieldTest yt1 = new YieldTest("高级");
// 将yt1的线程设置成最高优先级
yt1.setPriority(Thread.MAX_PRIORITY);
yt1.start();
YieldTest yt2 = new YieldTest("低级");
// 将yt2的线程设置成低优先级
// yt2.setPriority(Thread.MIN_PRIORITY);
yt2.start();
}
}
yield() 和 sleep() 方法的区别
- sleep() 方法暂停当前线程后, 会给其他线程执行机会, 不会理会其他线程的优先级; 但 yield() 方法只会给优先级相同, 或优先级更高的线程执行机会
- sleep() 方法会将线程转入阻塞状态, 直到经过阻塞时间才会转入就绪状态; 而 yield() 不会将线程转入阻塞状态, 它只是强制当前线程进入就绪状态
- sleep() 方法声明抛出了 InterruptedException 异常, 所以调用 sleep() 方法时要么捕获该异常, 要么显示声明抛出该异常, 而 yield() 方法则没有声明抛出任何异常
- sleep() 方法比 yield() 方法有更好的移植性, 通常不使用 yield() 方法来控制线程的并发
4.5 改变线程的优先级
- 每个线程执行时都有一定的优先级, 默认的优先级都与创建它的父线程的优先级相同, 默认情况下, main 线程具有普通优先级, 而由 main 创建的子线程都具有普通的优先级
- Thread 类提供了 setPriority(int newPriority) 方法来设置和返回指定的线程的优先级; 其中 setPriority() 方法的参数可以是一个整数, 范围是1~10之间, 也可以使用三个静态常量
- MAX_PRIORITY: 其值是 10
- MIN_PRIORITY: 其值是 1
- NORM_PRIORITY: 其值是 5
- 获得更高优先级的线程会获得更多的执行机会
代码示例
public class PriorityTest extends Thread {
// 定义有参数的构造器, 用于常见线程试指定name
public PriorityTest(String name) {
super(name);
}
@Override
public void run(){
for (int i = 0; i < 50; i++) {
System.out.println(getName() + ", 其优先级是: " + getPriority() + ", 其循环变量的值是: " + i);
}
}
public static void main(String[] args) {
// 改变主线程的优先级为6
Thread.currentThread().setPriority(6);
for (int i = 0; i < 30; i++) {
if (i == 10) {
PriorityTest low = new PriorityTest("低级");
low.start();
System.out.println("创建之初的优先级: " + low.getPriority());
// 设置该线程为最低优先级
low.setPriority(Thread.MIN_PRIORITY);
}
if (i == 10) {
PriorityTest high = new PriorityTest("高级");
high.start();
System.out.println("创建之初的优先级: " + high.getPriority());
// 设置该线程为最高优先级
high.setPriority(Thread.MAX_PRIORITY);
}
}
}
}
5. 线程同步
5.1 线程安全问题
- 通过两个线程操作银行取钱的操作, 模拟两个人使用同一个账户并发的取钱, 来理解线程安全问题
代码示例
// Account 类来模拟用户账户的类
public class Account {
// 封装账户编号, 账户余额, 两个成员变量
private String accountNo;
private double balance;
// 无参数构造器
public Account(){}
// 有参数构造器
public Account(String accountNo, double balance) {
this.accountNo = accountNo;
this.balance = balance;
}
// setter getter
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
// 重写 hashCode 方法
@Override
public int hashCode() {
return accountNo.hashCode();
}
// 重写 equals() 方法
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account target = (Account)obj;
return target.getAccountNo().equals(accountNo);
}
return false;
}
}
// DrawThread 类来模拟两个用户的取钱线程
public class DrawThread extends Thread {
// 模拟用户账户
private Account account;
// 当前取钱线程所希望的钱数
private double drawAmount;
public DrawThread(String name, Account account, double drawAmount){
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
// 当多个线程修改同一份共享数据时, 将涉及数据安全问题
@Override
public void run() {
// 账户余额大于取钱数目
if (account.getBalance() >= drawAmount) {
// 吐出钞票
System.out.println(getName() + " 取钱成功, 吐出钞票: " + drawAmount);
/*
try {
Thread.sleep(1);
}
catch (InterruptedException e) {
e.printStackTrace();
}
*/
// 修改余额
account.setBalance(account.getBalance() - drawAmount);
System.out.println("\t余额为: " + account.getBalance());
} else {
System.out.println(getName() + " 取钱失败, 余额不足! ");
}
}
}
// DrawTest 类来模拟实际的取钱操作, 两次取钱都成功了, 但是账户的余额变成了负数, 这就是编程事故
public class DrawTest {
public static void main(String[] args) {
// 创建一个账户
Account acct = new Account("1234567", 1000);
new DrawThread("甲", acct, 800).start();
new DrawThread("乙", acct, 800).start();
}
}
5.2 同步代码块
- 出现上面编程事故是因为 run() 方法的方法体不具有同步安全性, 程序中有两个并发线程在修改 Account 对象, 而系统此时通过 sleep(1) 切换给另一个修改 Account 对象的线程, 所以就出现了问题
- 解决上面不同步的问题就需要同步监视器来同步代码块
- 同步监视器的目的: 阻止两个线程对同一个共享资源进行并发访问, 因此通常应该使用可能被并发访问的共享资源充当同步监视器, 对于上面的取钱程序, 应该使用账户 account 作为同步监视器
- 同步代码块的语法格式:
synchronized(obj) {
//...
// 此处的代码块就是同步代码块
}
// 这段代码的含义是: synchronized后括号里的(obj) 就是同步监视器,
// 线程开始执行同步代码块之前, 必须先获得对同步监视器的锁定
代码示例
// 修改上面的 DrawThread 类
public class DrawThread extends Thread {
// 模拟用户账户
private Account account;
// 当前取钱线程所希望的钱数
private double drawAmount;
public DrawThread(String name, Account account, double drawAmount){
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
// 当多个线程修改同一份共享数据时, 将设计数据安全问题
@Override
public void run() {
// 使用 account 作为同步监视器, 任何线程进入下面的同步代码块之前
// 必须先获得 account 账户的锁定 -- 其他线程无法获得锁, 也就无法修改它
// 这种做法符合: "加锁 -> 修改 -> 释放锁" 的逻辑
synchronized (account) {
// 账户余额大于取钱数目
if (account.getBalance() >= drawAmount) {
// 吐出钞票
System.out.println(getName() + " 取钱成功, 吐出钞票: " + drawAmount);
/*
try {
Thread.sleep(1);
}
catch (InterruptedException e) {
e.printStackTrace();
}
*/
// 修改余额
account.setBalance(account.getBalance() - drawAmount);
System.out.println("\t余额为: " + account.getBalance());
} else {
System.out.println(getName() + " 取钱失败, 余额不足! ");
}
}
// 同步代码块结束, 该线程释放同步锁
}
}
/* 程序输出:
甲 取钱成功, 吐出钞票: 800.0
余额为: 200.0
乙 取钱失败, 余额不足!
*/
5.3 同步方法
- Java的多线程同步安全提供了同步方法, 同步方法就是使用 synchronized 关键字修饰的某个方法
- 对于 synchronized 修饰的实例方法而言, 无须显式的指定同步监视器, 同步方法的同步监视器是this, 也就是调用该方法的对象
- 不可变类总是线程安全的, 因为它的对象状态不可改变, 但可变对象需要额外的方法来保证其线程的安全
线程安全的类的特征
- 该类的对象可以被多个线程安全的访问
- 每个线程调用该对象的任意方法之后都将得到正确的结果
- 每个线程调用该对象的任意方法之后, 该对象状态依然保持合理状态
线程安全的注意问题
- 不要对线程安全类的所有方法都进行同步, 只对那些会改变竞争资源(竞争资源也就是共享资源)的方法进行同步
- 如果可变类由两种运行环境: 单线程和多线程环境, 则应该为该可变类提供两种版本, 即线程安全和线程不安全版本(如: StringBuffer(单线程)和StringBuilder(多线程))
代码示例
// 修改之后的 Account 类
public class Account {
// 封装账户编号, 账户余额, 两个成员变量
private String accountNo;
private double balance;
// 无参数构造器
public Account(){}
// 有参数构造器
public Account(String accountNo, double balance) {
this.accountNo = accountNo;
this.balance = balance;
}
// setter getter
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
public double getBalance() {
return balance;
}
// 提供一个线程安全的 draw() 方法来完成取钱操作
public synchronized void draw(double drawAmount) {
// 账户余额大于数
if(balance >= drawAmount) {
// 吐钱
System.out.println(Thread.currentThread().getName() + " 取钱成功, 吐出钞票: " + drawAmount);
try {
Thread.sleep(1);
}catch (InterruptedException e) {
e.printStackTrace();
}
// 修改余额
balance -= drawAmount;
System.out.println("\t余额为: " + balance);
} else {
System.out.println(Thread.currentThread().getName() + " 取钱失败, 余额不足!");
}
}
// 重写 hashCode 方法
@Override
public int hashCode() {
return accountNo.hashCode();
}
// 重写 equals() 方法
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account target = (Account)obj;
return target.getAccountNo().equals(accountNo);
}
return false;
}
}
// 修改后的 DrawThread 类
public class DrawThread extends Thread {
// 模拟用户账户
private Account account;
// 当前取钱线程所希望的钱数
private double drawAmount;
public DrawThread(String name, Account account, double drawAmount){
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
// 当多个线程修改同一份共享数据时, 将设计数据安全问题
@Override
public void run() {
// 直接调用 account 对象的 draw() 方法来执行取钱操作
// 同步方法的同步监视器是this, this 代表调用 draw() 方法对象
// 也就是说, 线程进入 draw() 方法之前, 必须先对 account 对象加锁
account.draw(drawAmount);
}
}
代码说明
- 在 Account 里定义 draw() 方法, 而不是直接在 run() 方法中实现取钱逻辑, 这种做法更符合面向对象的规则
- DoMain Driven Design (DDM 领域驱动设计): 这种方式认为每个类都应该是完备的领域对象; 例如 Account 代表用户账户, 应该提供用户账户的相关方法; 通过 draw() 方法来执行取钱操作(或者为实现的转账操作), 而不是直接将 setBalance() 方法暴露出来任人操作, 这样才可以更好地保证 Account 对象的完整性和一致性
5.4 释放同步监视器的锁定
线程释放同步监视器锁定的情况
- 当前线程的同步方法, 同步代码块执行结束, 当前线程即释放同步监视器
- 当前线程在同步代码块, 同步方法中遇到 break, return 终止了该代码块, 该方法的继续执行, 当前线程将会释放同步监视器
- 当前线程在同步代码块, 同步方法中出现了未处理的 Error 或者 Exception, 导致了该代码块, 该方法异常结束时, 当前线程将会释放同步监视器
- 当前线程执行同步代码块或同步方法时, 程序执行了同步监视器对象的 wait() 方法, 则当前线程暂停, 并释放同步监视器
不会释放同步监视器的情况
- 线程执行同步代码块或同步方法时, 程序调用 Thread.sleep(), Thread.yield() 方法来暂停当前线程的执行, 当前线程不会释放同步监视器
- 线程执行同步代码块时, 其他线程调用了该线程的 suspend()方法将线程挂起, 该线程不会释放同步监视
5.5 同步锁 (Lock)
- Java5开始提供了一种更强大的线程同步机制, 通过显示定义同步锁对象来实现同步, 这种机制下, 同步锁由 Lock 对象充当
- Lock 是控制多个线程对共享资源进行访问的工具, 通常, 锁提供了对共享资源的独占访问, 每次只能有一个线程对 Lock 对象加锁, 线程开始访问共享资源之前应该先获得 Lock 对象
- Lock 允许实现更灵活的结构, 可以具有差别很大的属性, 并且支持多个相关的 Condition 对象
- Lock, ReadWriteLock(读写锁) 是Java5提供的两个根接口, 并提供了 ReentrantLock(可重入锁)实现类和 ReentrantReadWriteLock 实现类
- 同步方法或同步代码块使用与竞争资源相关的, 隐式的同步监视器, 并且强制要求加锁和释放锁要出现在一个块结构中, 而且当获取了多个锁机构时, 他们必须以相反的顺序释放, 且必须在与所有锁被获取时相同的范围内释放所有锁
- Lock 提供的用于其他功能的方法:
- tryLock(): 用于非块结构
- lockInterruptibly(): 试图获取可中断的锁
- tryLock(long, TimeUnit): 获取超时失效锁
- Java8提供了新增的 StampedLock类, 大多数时候可以替换传统的 ReentrantReadWriteLock, ReentrantReadWriteLock 为读写操作提供了三种模式:
- writing
- ReadingOptimistic
- Reading
- ReentrantLock 锁具有可重入性, 即 一个线程可以对已被加锁的 ReentrantLock 锁再次加锁, ReentrantLock 对象会维持一个计数器来追踪 lock() 方法的嵌套调用, 线程在每次调用 lock() 加锁后, 必须显式调用 unlock() 来释放锁
- 在实现线程安全的控制中, 比较常用的是 ReentrantLock(可重入锁), 使用该 Lock 对象可以显式地加锁, 释放锁, 其代码格式通常为:
class x {
// 定义锁对象
private final ReentrantLock lock = new ReentrantLock();
// ...
// 定义需要保证线程安全的方法
public void m(){
// 加锁
lock.lock();
try {
// 需要保证线程安全的代码
// .. methond body
}
catch (Exception e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
/* 使用ReentrantLock对象来进行同步, 加锁和释放锁出现在不同大的作用范围时, 应该使用 finally 块来确保在必要时释放锁
代码示例
public class Account {
// 定义锁对象
private final ReentrantLock lock = new ReentrantLock();
// 封装账户编号, 账户余额, 两个成员变量
private String accountNo;
private double balance;
// 无参数构造器
public Account(){}
// 有参数构造器
public Account(String accountNo, double balance) {
this.accountNo = accountNo;
this.balance = balance;
}
// setter getter
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
public double getBalance() {
return balance;
}
// 提供一个线程安全的 draw() 方法来完成取钱操作
public void draw(double drawAmount) {
// 加锁
lock.lock();
try {
// 账户余额大于取钱数
if(balance >= drawAmount) {
// 吐钱
System.out.println(Thread.currentThread().getName() + " 取钱成功, 吐出钞票: " + drawAmount);
try {
Thread.sleep(1);
}catch (InterruptedException e) {
e.printStackTrace();
}
// 修改余额
balance -= drawAmount;
System.out.println("\t余额为: " + balance);
} else {
System.out.println(Thread.currentThread().getName() + " 取钱失败, 余额不足!");
}
}
finally{
// 修改完成, 释放锁
lock.unlock();
}
}
// 重写 hashCode 方法
@Override
public int hashCode() {
return accountNo.hashCode();
}
// 重写 equals() 方法
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account target = (Account)obj;
return target.getAccountNo().equals(accountNo);
}
return false;
}
}
5.6 死锁
- 死锁: 当两个线程相互等待对方释放同步监视器时就会发生死锁; 发生死锁后, 所有的线程处于阻塞状态, 无法继续; Java虚拟机没有检测死锁的情况, 也没有采取措施来处理死锁, 所以多线程编程应该避免死锁
代码示例
class A {
public synchronized void foo(B b) {
System.out.println("当前线程名: " + Thread.currentThread().getName() + " 进入了A实例的foo()方法");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程名: " + Thread.currentThread().getName() + " 企图调用B实例的last()方法");
b.last();
}
public synchronized void last() {
System.out.println("进入了A类的last()方法内部");
}
}
class B {
public synchronized void bar(A a) {
System.out.println("当前线程名: " + Thread.currentThread().getName() + " 进入了B实例的bar()方法");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程名: " + Thread.currentThread().getName() + " 企图调用A实例的last()方法");
a.last();
}
public synchronized void last() {
System.out.println("进入了B类的last()方法内部");
}
}
public class DeadLock implements Runnable {
A a = new A();
B b = new B();
public void init() {
Thread.currentThread().setName("主线程");
// 调用a对象的foo()方法
a.foo(b);
System.out.println("进入了主线程之后");
}
public void run() {
Thread.currentThread().setName("副线程");
// 调用b对象的bar()方法
b.bar(a);
System.out.println("进入了副线程之后");
}
public static void main(String[] args) {
DeadLock dl = new DeadLock();
// 以 dl 为 target 启动新线程
new Thread(dl).start();
dl.init();
}
}
6. 线程通信
6.1 传统的线程通信
Object 类提供的三个线程相关的同步监视器
方法
- wait(): 导致当前线程等待, 直到其他线程调用该同步监视器的 notify() 方法或 notifyAll() 方法来唤醒该线程; 有三种形式:
- wait(): 一直等待,直到其他线程通知
- wait(long millis): 等待 millis 毫秒后自动苏醒
wait(ling millis, int nanos)
- notify(): 唤醒在此同步监视器上等待的单个线程; 如果所有线程都在此同步监视器上等待, 则会选择唤醒其中一个线程; 选择是任意性的, 只有当前线程放弃对该同步监视器的锁定后(使用wait()方法), 才可以执行被唤醒的线程
- notifyAll(): 唤醒再此同步监视器上等待的所有线程, 只有当前线程放弃对该同步监视器的锁定后, 才可以执行被唤醒的线程
使用同步监视器对象调用上述方法的两种情况
- 对于使用 synchronized 修饰的同步方法, 因为该类的默认实例(this)就是同步监视器, 所以可以在同步方法中直接调用这三个方法
- 对于使用 synchronized 修饰的同步代码块, 同步监视器是 synchronized 后括号里的对象, 所以必须使用该对象调用上述三个方法
- 注意: 上述三个方法只可以用于 synchronized 关键字修饰的, 存在隐式的同步监视器的线程中
代码示例
// 模拟用户账户的类
public class AccountCommunicate {
// 封装账户编号, 账户余额的两个成员变量
private String accountNo;
private double balance;
// 标识账户中是否已经有村换的旗标
private boolean flag = false;
public AccountCommunicate() {}
public AccountCommunicate(String accountNo, double balance) {
this.accountNo = accountNo;
this.balance = balance;
}
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
public double getBalance() {
return balance;
}
// 重写 hashCode 方法
@Override
public int hashCode() {
return accountNo.hashCode();
}
// 重写 equals() 方法
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == Account.class) {
Account target = (Account)obj;
return target.getAccountNo().equals(accountNo);
}
return false;
}
public synchronized void draw(double drawAmount) {
try {
// 如果 flag 为 false, 表明账户中还没有人存钱进去, 取钱方法阻塞
if (!flag) {
wait();
} else {
// 执行取钱的操作
System.out.println(Thread.currentThread().getName() + " 取钱: " + drawAmount);
balance-= drawAmount;
System.out.println("账户余额: " + balance);
// 将标识账户是否已有存款的旗标设为 false
flag = false;
// 唤醒其他线程
notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void deposit(double depositAmount) {
try {
// 如果 flag 为真, 表明账户中已有人存钱进去, 存钱方法阻塞
if (flag) {
wait();
} else {
// 执行存款操作
System.out.println(Thread.currentThread().getName() + " 存款: " + depositAmount);
balance += depositAmount;
System.out.println("账户余额为: " + balance);
// 将表示账户是否已有存款的旗标设为true
flag = true;
// 唤醒其他线程
notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 模拟用户取钱的线程类
public class DrawThreadCommunicate extends Thread {
// 模拟用户账户
private AccountCommunicate account;
// 当前取钱线程所希望取的钱数
private double drawAmount;
public DrawThreadCommunicate (String name, AccountCommunicate account, double drawAmount) {
super(name);
this.account = account;
this.drawAmount = drawAmount;
}
// 重复100次执行取钱操作
@Override
public void run () {
for (int i = 0; i < 100; i++) {
account.draw(drawAmount);
}
}
}
// 模拟用户存钱的类
public class DepositThreadCommunicate extends Thread {
// 模拟用户账户
private AccountCommunicate account;
// 当前存款线程希望村的钱数
private double depositAmount;
public DepositThreadCommunicate(String name, AccountCommunicate account, double depositAmount) {
super(name);
this.account = account;
this.depositAmount = depositAmount;
}
// 重复执行100次存钱操作
@Override
public void run() {
for (int i = 0; i < 100; i++) {
account.deposit(depositAmount);
}
}
}
// 测试类
public class DrawTestCommunicate {
public static void main(String[] args) {
// 创建一个账户
AccountCommunicate account = new AccountCommunicate("1234567", 0);
new DrawThreadCommunicate("取钱者", account, 800).start();
new DepositThreadCommunicate("存款者甲", account, 800).start();
new DepositThreadCommunicate("存款者乙", account, 800).start();
new DepositThreadCommunicate("存款者丙", account, 800).start();
}
}
/*
线程最后阻塞了, 并不是死锁, 原因是有三个存款的线程, 取钱的线程只有一个, 而当取钱的线程执行结束后, 存钱的线程只是
在等待其他线程来取钱, 并不是等待其他线程释放同步监视器
*/
6.2 使用 Condition 控制线程通信
- 如果程序不使用 synchronized 关键字来保证同步, 而是直接使用 Lock 对象来保证同步, 则系统中不存在隐式的同步监视器, 也就不能使用 wait(), notify(), notifyAll() 方法来进行通信
- 使用 Lock 对象来保证同步时, Java提供了一个 Condition 类来保持协调, 使用 Condition 可以让那些已经得到Lock对象却无法继续执行的线程释放Lock对象, Condition 对象也可以唤醒其他处于等待的线程
- Condition 实例被绑定在一个 Lock 对象上, 要获得特定 Lock 实例的 Condition 实例, 调用 Lock 对象的 newCondition() 方法
Condition 类的三个方法
- await(): 类似于隐式同监视器上的 wait() 方法, 导致当前线程等待, 知道其他线程调用该 Condition 的 signal() 方法或 signalAll() 方法来唤醒该线程; 该 await() 还有很多变体: long awaitNanos(long nanosTimeout), void awaitUninterruptibly(), awaitUnit(Date deadline) 等
- signal(): 唤醒在此 Lock 对象上等待的单个线程, 只有当前线程放弃对该 Lock 对象的锁定后(使用await()方法), 才可以执行被唤醒的线程
- signalAll(): 唤醒在此 Lock 对象上等待的所有线程, 只有当前线程放弃对该Lock对象的锁定之后, 才可以执行被唤醒的线程
代码示例
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//
public class AccountCondition {
// 显式定义Lock对象
private final Lock lock = new ReentrantLock();
// 获得指定 Lock 对象对应的 Condition
private final Condition cond = lock.newCondition();
// 封装账户编号, 账户余额的两个成员变量
private String accountNo;
private double balance;
// 是否已有钱的旗标
private boolean flag = false;
public AccountCondition() {}
public AccountCondition (String accountNo, double balance) {
this.accountNo = accountNo;
this.balance = balance;
}
//
public String getAccountNo() {
return accountNo;
}
//
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
//
public double getBalance() {
return balance;
}
//
public void draw(double drawAmount) {
// 加锁
lock.lock();
try {
// 如果 flag 为假, 表明账户没钱存进去, 取钱方法阻塞
if (!flag) {
cond.await();
} else {
// 执行取钱的操作
System.out.println(Thread.currentThread().getName() + " 取钱: " + drawAmount);
balance -= drawAmount;
System.out.println("账户余额为: " + balance);
// 重点: 将标识账户是否有存款的旗标设为false
flag = false;
cond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 因为显式的加锁, 必须使用现实的在 finally 中释放锁
finally {
lock.unlock();
}
}
//
public void deposit(double depositAmount) {
lock.lock();
try {
// 如果 flag 为真, 表明账户已有人存钱, 存钱方法阻塞
if (flag) {
cond.await();
} else {
// 执行存钱操作
System.out.println(Thread.currentThread().getName() + " 存钱: " + depositAmount);
balance += depositAmount;
System.out.println("账户余额: " + balance);
// 将表示账户已有存款的旗标设为 true
flag = true;
cond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在 finally 块中显式释放锁
finally {
lock.unlock();
}
}
//
@Override
public int hashCode(){
return accountNo.hashCode();
}
//
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj.getClass() == AccountCondition.class) {
AccountCondition account = (AccountCondition) obj;
return account.getAccountNo().equals(accountNo);
}
return false;
}
}
6.3 使用阻塞队列(BlockingQueue) 控制线程通信
- BlockingQueue 接口是 Queue 接口的子接口, 主要作用不是作为容器, 而是作为线程通信的工具
- BlockingQueue 接口有一个特征, 当生产者线程试图向 BlockingQueue 中放入元素时, 如果队列已满, 则该线程阻塞; 当消费者线程试图从 BlockingQueue 中取出元素时, 如果该队列为空, 则该线程被阻塞
- BlockingQueue 的两个支持阻塞的方法:
- put(E e) : 尝试将 E 元素放入 BlockingQueue 中, 如果该队列的元素已满, 则阻塞该线程
- take(): 尝试从 BlockingQueue 的头部取出元素, 如果该队列元素为空, 则阻塞该线程
- Queue 接口中的方法归纳:
- 在队列尾部插入元素; 包括 add(E e), offer(E e), put(E e) 方法, 当该队列已满时, 这三个方法分别会抛出异常, 返回 false, 阻塞队列
- 在队列头部删除并返回删除的元素; 包括 remove(), poll(), take() 方法, 当该队列已空时, 这三个方法会抛出异常, 返回 false, 阻塞队列
- 在队列头部取出但不删除元素; 包括 element(), peek() 方法, 当队列已空时, 这两个方法会抛出异常, 返回 false
BlockingQueue 接口的5个实现类
- ArrayBlockingQueue: 基于数组实现的 BlockingQueue 队列
- LinkedBlockingQueue: 基于链表实现的 BlockingQueue 队列
- PriorityBlockingQueue: 不是标准的阻塞队列, 与 PriorityQueue 类似, 该队列调用 remove(), poll(), take() 等方法取出元素时, 并不是取出队列中存在时间最长的元素, 而是队列中最小的元素, PriorityBlockingQueue 判断元素的大小即可根据元素(实现 Comparable 接口)的本身大小来自然排序, 也可使用 Comparator 进行定制排序
- SynchronizedQueue: 同步队列, 对该队列的存, 取操作必须交替进行
- DelayQueue: 一个特殊的 BlockingQueue, 底层基于 PriorityBlockingQueue 实现; DelayQueue 要求集合元素都实现 Delay 接口(该接口有一个 long getDelay() 方法), DelayQueue 根据集合元素的 getDepay() 方法的返回值进行排序
代码示例
// BlockingQueue 接口队列中 put(E e) take() 方法示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//
public class BlockingQueueTest {
public static void main(String[] args) throws Exception {
// 定义一个长度为2的阻塞队列
BlockingQueue<String> bq = new ArrayBlockingQueue<>(2);
bq.put("java"); // 与 bq.add("java")或bq.offer("java") 相同
bq.put("Python");
// bq.put("css"); // 阻塞线程
bq.take();
bq.take();
// bq.take(); // 阻塞队列
/*
1. 因为创建 BlockingQueue 时指定了容量为2, 所以未满时, 使用 put(), add(), offer() 方法放入元素的效果都一样
但是当队列已满时, 使用 put(E e) 放入元素会阻塞线程, 如果使用 add() 方法放入元素则会引发异常, 使用 offer() 方法
尝试放入元素则会返回 false, 元素不会放成功;
2. 当取出队列的2个元素后, 在使用take() 取出元素时, 该方法会阻塞线程, 如果使用 remove() 方法取出元素会引发异常,
使用 poll() 方法尝试取出元素会返回 false, 元素不会被删除
*/
}
}
代码示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//
class Producer extends Thread {
private BlockingQueue<String> bq;
public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}
@Override
public void run(){
String[] strArr = new String[] {
"java",
"python",
"spring"
};
for (int i = 0; i < 99; i++) {
System.out.println(getName() + " producer prepared to produce element");
try {
Thread.sleep(200);
// 尝试放入元素, 如果队列已满, 则线程被阻塞
bq.put(strArr[i % 3]);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " Production is Completed! " + bq);
}
}
}
//
class Consumer extends Thread {
private BlockingQueue<String> bq;
public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}
@Override
public void run(){
while (true) {
System.out.println(getName() + " Comsumer prepared to consumer element");
try {
Thread.sleep(200);
// 尝试取出元素, 如果队列为空, 则线程阻塞
bq.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " Consumption completed!");
}
}
}
//
public class BlockingQueueTest2 {
public static void main(String[] args) {
// 创建一个容量为1的 BlockingQueue
BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
// 启动三个线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
// 启动一个消费者线程
new Consumer(bq).start();
}
}
7. 线程组和未处理的异常
7.1 线程组
- 线程组: Java使用 ThreadGroup 来表示线程组, 线程组可以分类管理一批线程; Java 允许程序直接对线程组进行控制, 对线程组的控制相当于同时控制这批线程
- 用户创建的所有线程都属于指定线程组, 如果程序没有显式指定线程属于哪个线程组, 则该线程属于默认线程组; 默认情况下, 子线程和创建它的父线程处于同一个线程组内, 例如 A线程创建了 B线程, 并且没有指定B线程的线程组, 则B线程属于A线程所在的线程组
- 一旦某个线程加入了指定线程组之后, 该线程将一直属于该线程组, 直到该线程死亡, 线程运行中途不能改变它所属的线程组
- 可以通过 getThreadGroup()方法来获取该线程所属的线程组, 该方法返回的值是 ThreadGroup 对象, 表示一个线程组
Thread 类提供的用于设置新创建的线程属于那个线程组的构造器
- Thread(ThreadGroup group, Runnable target): 以 target 的 run() 方法作为线程执行体创建新线程, 属于 group 线程组
- Thread(ThreadGroup group, Runnable target, String name): 以 target 的 run() 方法作为线程执行体创建新线程, 该线程属于 group 线程组, 且线程名为 name
- Thread(ThreadGroup group, String name): 创建新线程, 新线程名为 name, 属于 group 线程组
ThreadGroup() 类的两个构造器
- ThreadGroup(String name): 以指定的线程组名字来创建新的线程组
- ThreadGroup(ThreadGroup group, String name): 以指定的名字, 指定的父线程组创建一个新线程组
ThreadGroup 类提供的操作线程组的常用方法
- int activeCount(): 返回此线程组中活动线程的数目
- interrupt(): 中断此线程组中的所有线程
- isDaemon(): 判断该线程组是否是后台线程
- setDaemon(boolean daemon): 把该线程组设置成后台线程组; (特征: 当后台线程组的最后一个线程执行结束或最后一个线程被销毁后, 后台线程组将自动销毁)
- setMaxPriority(int pri): 设置线程组的最高优先级
代码示例
class MyThread extends Thread {
// 提供指定线程名的构造器
public MyThread(String name) {
super(name);
}
//
// 提供了指定线程名, 线程组的构造器
public MyThread(ThreadGroup group, String name) {
super(group, name);
}
//
@Override
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(getName() + " 线程的 i 变量" + i);
}
}
}
public class ThreadGroupTest {
public static void main(String[] args) {
// 获取主线程所在的线程组, 这是所有线程默认的线程组
ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
System.out.println("主线程组的名字是: " + mainGroup.getName());
System.out.println("主线程组是否是后台线程组: " + mainGroup.isDaemon());
new MyThread("主线程组的线程").start();
ThreadGroup tg = new ThreadGroup("新线程组");
tg.setDaemon(true);
System.out.println("tg线程组是否是后台线程组: " + tg.isDaemon());
MyThread tt = new MyThread(tg, "tg组的线程甲");
tt.start();
new MyThread(tg, "tg组的线程乙").start();
}
}
7.2 未处理异常
- ThreadGroup 类的 void uncaughtException(Thread t, Throwable e) 方法: 可以处理该线程组内的任意线程所抛出的未处理异常
- 如果线程执行过程中抛出了一个未处理异常, JVM在结束该线程之前会自动查找是否有对应的 Thread.UncaughtExceptionHandler 对象, 如果找到该处理器对象, 则会调用该对象的 uncaughtException(Thread t, Throwable e) 方法来处理异常
- Thread.UncaughtExceptionHandler 是 Thread 类的一个静态内部接口, 该接口内只有一个方法: void uncaughtException(Thread t, Throwable e), 该方法中的 t 代表出现异常的线程, 而 e 代表该线程抛出的异常
- Thread 类提供的设置异常处理器的方法
- static setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh): 为该线程类的所有线程实例设置默认的异常处理器
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh): 未指定的线程实例设置异常处理器
- ThreadGroup 类实现了 Thread.UncaughtExceptionHandler 接口, 所以每个线程所属的线程组将会作为默认的异常处理器; 当一个线程抛出未处理异常时, JVM 会首先查找该异常对应的异常处理器(通过 setUncaughtExceptionHandler() 方法设置的异常处理器), 如果找到该异常处理器, 则将调用该异常处理器处理该异常, 否则, JVM 将会调用该线程所属的线程组对象的 uncaughtException() 方法来处理该异常
- 线程组处理异常的默认流程:
- ① 如果该线程组有父线程组, 则调用父线程组的 uncaughtException() 方法来处理该异常
- ② 如果该线程实例所属的线程类有默认的异常处理器(由 serDefaultUncaughtExceptionHandler() 方法设置的异常处理器), 那么就调用该异常处理器来处理该异常
- ③ 如果该异常对象是 ThreadDeath 的对象, 则不做任何处理, 否则, 将异常跟踪栈的信息打印到 System.err 错误输出流, 并结束该线程
代码示例
// 定义自己的异常处理器
class MyExHandler implements Thread.UncaughtExceptionHandler {
// 实现 uncaughtException() 方法, 该方法将处理线程的未处理异常
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t + " 线程出现了异常: " + e);
}
}
public class ExHandler {
public static void main(String[] args) {
// 设置主线程的异常处理器
Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
int a = 5 / 0;
System.out.println("程序结束了");
}
}
8. 线程池
- 线程池在系统启动时即创建大量空闲的线程, 程序将一个 Runnable 对象或 Callable 对象传给线程池, 线程池就会启动一个线程来执行他们的 run() 或 call() 方法, 当 run() 或 call() 方法执行结束后, 该线程不会死亡, 而是再次放回线程池中称为空闲线程, 等到执行下一个 Runnable 对象的 run() 或 call() 方法
- 因为线程池中的最大线程数可控, 所以实用线程池可以有效的控制系统中并发线程的数量
8.1 Java 8 改进的线程池
- Java5 开始, Java内建支持线程池, Java 5 新增了一个 Executors 工厂类来产生线程池
- ExecutorService 代表尽快执行线程的线程池, (只要线程池中有空闲线程,就立即执行线程任务), 程序只要将一个 Runnable 对象或 Callable 对象(代表线程任务) 提交给该线程池, 该线程池就会尽快执行任务; 有如下三个方法:
- ScheduledExecutorService 是 ExecutorService 的子类, 可以在指定延迟之后职系那个线程任务
- 用完一个线程池后, 应该调用该线程池的 shutdown() 方法, 该方法将启动线程池的关闭序列, 调用 shutdown() 方法后的线程池不再接收新任务, 但会将以前所有已提交任务执行完成; 当线程池中所有任务都执行完成后, 池中的所有线程都会死亡, 另外也可以调用线程池的 shutdownNow() 方法来关闭线程池, 该方法试图停止所有正在执行的活动任务, 暂停处理正在执行的任务, 并返回等待执行的任务列表
工厂类创建线程池的静态方法
- ExecutorService newCachedThreadPool(): 创建一个具有缓存功能的线程池, 系统根据需要创建线程, 这些线程将会被缓存在线程池中
- ExecutorService newFixedThreadPool(int nThreads): 创建一个可重用的, 具有固定线程数的线程池
- ExecutorService newSingleThreadExecutor(): 创建一个只有单线程的线程池, 它相当于调用 newFixedThreadPool()方法时传入了参数1
- ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建具有指定线程数的线程池, 它可以在指定延迟后执行线程任务; corePoolSize 指的是所保存的线程数, 即使线程数空闲的也被保存在线程池内
- ScheduledExecutorService newSingleThreadScheduledExecutor(): 创建只有一个线程的线程池, 它可以在指定延迟后执行线程任务
- ExecutorService newWorkStealingPool(int parallelism): 创建持有足有的线程的线程池来支持给定的并行级别, 该方法还hi实用多个队列来减少竞争
- ExecutorService newWorkStealingPool(): 该方法是上一个方法的简化版本; 如果当前及其是4个CPU, 则目标的并行级别被设置为4, 也就是说根据CPU的个数来创建对应的并行数
ExecutorService 常用方法
- Future<?> submit(Runnable task): 将一个 Runnable 对象提交给指定的线程池, 线程池将在有空闲线程时执行 Runnable 对象代表的任务. 其中 Future 对象代表 Runnable 任务的返回值 --- 但 run() 方法没有返回值, 所以 Future 对象将在 run() 方法执行结束后返回null, 但可以调用 Future 的 isDone(), isCancelled() 方法来获得 Runnable 对象的执行状态
Future submit(Runnable task, T result): 将一个 Runnable 对象提交给指定的线程池, 线程池将在有空闲线程时执行 Runnable 对象代表的任务. 其中 result 显式指定线程执行结束后的返回值, 所以 Future 对象将在 run() 方法执行结束后返回 result Future submit(Callable task): 将一个 Callable 对象提交给指定的线程池, 线程池将在有空闲线程时执行 Callable 对象的代表的任务, 其中 Future 代表 Callable 对象里 call() 方法的返回值
ScheduledExecutorService 常用方法
- ScheduledFuture
schedule(Callable callable, long delay, TimeUnit unit): 指定 callable 任务将在 delay 延迟后执行 - ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit): 指定 command 任务将在 delay 延迟后执行
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 指定 command 任务将在 delay 延迟后执行, 而且以设定频率重复执行; 也就是说, 在 initialDelay 后开始执行, 依次在 initialDaley+period, initialDaley+2*period...处重复执行, 依次类推
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 创建并执行一个在给定初始延迟后首次启用的定期操作, 随后在每一次执行终止和下一次执行开始之间都存在给定的延迟, 如果任务在任意一次执行时遇到异常, 就会取消后续执行, 否则, 只能通过程序来显式取消或终止该任务
实用线程池来执行线程任务的步骤
- 调用 Executors 类的静态工厂方法创建一个 ExecutorService 对象, 该对象代表一个线程池
- 创建 Runnable 实现类或 Callable 实现类的实例, 作为线程执行任务
- 调用 ExecutorService 对象的 submit() 方法来提交 Runnable 实例或 Callable 实例
- 当不想提交任何任务时, 调用 ExecutorService 对象的 shutdown() 方法来关闭线程池
代码示例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//
public class ThreadPoolTest {
public static void main(String[] args) throws Exception {
// 创建一个具有固定线程数 (6) 的线程池
ExecutorService pool = Executors.newFixedThreadPool(6);
// 使用 Lambda 表达式创建 Runnable 对象
Runnable target = () -> {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " 的i值为: " + i);
}
};
// 向线程池中提交两个线程池
pool.submit(target);
pool.submit(target);
// 关闭线程池
pool.shutdown();
}
}
8.2 Java 8 增强的 ForkjoinPool
- java 7 提供的 ForkJoinPool 来支持将一个任务拆分成多个 "小任务" 并行计算, 再把多个 "小任务" 的结果合并成中的计算结果
- ForkJoinPool 是 ExecutorService 的实现类, 也是一种特殊的线程池
- 创建了 ForkJoinPool 实例之后, 就可调用 ForkJoinPool 的 submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法来执行指定任务了
- ForkJoinTask 是一个抽象类, 代表一个可并行, 合并的任务, 有两个抽象子类: RecursiveAction 和 RecursiveTask
- 其中 RecursiveTask 代表有返回值的任务, 而 RecursiveAction 代表没有返回值的任务
- 如果大任务是有返回值的任务, 则可以让任务继承 RecursiveTask
, 其中泛型参数T就代表了该任务的返回值类型
ForkJoinPool 常用的构造器
- ForkJoinPool(int parallelism): 创建一个包含了 parallelism 个并行线程的 ForkJoinPool
- ForkJoinPool(): 以 Runtime.availableProcessors()方法的返回值作为 parallelism 参数来创建 ForkJoinPool
java 8 为 ForkJoinPool 新增的通用池功能, 创建方法
- ForkJoinPool commonPool(): 该方法返回一个通用池, 通用池的运行状态不会受 shutdown() 或 shutdownNow() 方法的影响; 但是 如果程序直接执行 System.exot(0) 来终止虚拟机, 通用池以及通用池中正在执行的任务都hi被自动终止
- int getCommonPoolParallelism(): 该方法返回通用池的并行级别
代码示例
// 无返回值的任务, 将打印0~300的任务拆解成每次只打50个数的小任务
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
//
// 继承 RecursiveAction 来实现 "可分解" 的任务
class PrintTask extends RecursiveAction {
// 每个小任务最多只打印 50 个数
private static final int THRESHOLD = 50;
private int start;
private int end;
// 打印从 start 到 end 的任务
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute () {
// 当 end 与 start 之间的差小于 THRESHOLD 时, 开始打印
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + " i的值: " + i);
}
} else {
// 当 end 与 start 之间的差大于 THRESHOLD 时, 即要打印的数超过50个时, 将大任务分解成两个小任务
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
// 并执行两个小任务
left.fork();
right.fork();
}
}
}
public class ForkJoinPoolTest {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
// 提交可分解的 PrintTask 任务
pool.submit(new PrintTask(0, 300));
pool.awaitTermination(2, TimeUnit.SECONDS);
// 关闭线程池
pool.shutdown();
System.out.println(System.currentTimeMillis() - start);
}
}
// 具有返回值的ForkJoinPool
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
//
// 继承 RecursiveTask 来实现 "可分解" 的任务
class CalTask extends RecursiveTask<Integer> {
// 每个小任务只加20个数
private static final int THRESHOLD = 20;
private int arr[];
private int start;
private int end;
// 累加从 start 到 end 的数组元素
public CalTask(int[] arr, int start, int end){
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
protected Integer compute () {
int sum = 0;
// 当 end 与 start 之间的差小于 THRESHOLD 时, 开始进行实际累加
if (end - start < THRESHOLD) {
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
// 当 end 与 start 之间的差大于 THRESHOLD 时, 即要累加的数超过 20 时, 将大任务分解成两个 小任务
int middle = (start + end) / 2;
CalTask left = new CalTask(arr, start, middle);
CalTask right = new CalTask(arr, middle, end);
// 执行两个小任务
left.fork();
right.fork();
// 把两个 "小任务" 累加的结果合并起来
return left.join() + right.join();
}
}
}
public class Sum {
public static void main(String[] args) throws Exception {
int[] arr = new int[100];
Random rand = new Random();
int total = 0;
// 初始化100个数组元素
for (int i = 0, len = arr.length; i < len; i++) {
int tmp = rand.nextInt(20);
// 对数组元素赋值, 并将数组元素的值添加到 sum 总和中
total += (arr[i] = tmp);
}
System.out.println(total);
// 创建一个通用池
ForkJoinPool pool = ForkJoinPool.commonPool();
// 提交可分解的CaltTask 任务
Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
System.out.println(future.get());
// 关闭线程池
pool.shutdown();
}
}
9. 线程相关的类
9.1 ThreadLocal 类
- ThreadLocal 类可以简化多线程编程时并发的访问, 使用这个工具类可以简单地隔离多线程程序的竞争资源
- ThreadLocal 是 Thread Local Variable (线程局部变量) 的意思, 其功能非常简单, 就是为每一个使用该变量的线程都提供一个变量值的副本, 使每一个线程都可以独立地改变自己的副本, 而不会和其他线程的副本冲突; (从线程的角度看, 就好像每个线程都完全拥有该变量一样)
- ThreadLocal 和其他所有同步机制一样, 都是为了解决多线程中对同一变量的访问冲突, 普通同步机制中, 是通过对象加锁来实现多线程对同一变量的安全访问的
- ThreadLocal 是将需要并发访问的资源复制多份, 每个线程拥有一份资源, 每个线程都拥有自己的资源副本, 从而也就没有必要对该变量进行同步了; 在编写多线程代码时, 可以将不安全的整个变量封装进 ThreadLocal, 或者把该对象与线程相关的状态使用 ThreadLocal 保存
- 如果多个线程之间需要共享资源, 以达到线程之间的通信, 就是用同步机制; 如果仅仅需要隔离多个线程之间的共享冲突, 则可以使用 ThreadLocal
ThreadLocal 常用方法:
- T get(): 返回此线程局部变量中当前线程副本中的值
- void remove(): 删除此线程局部变量中当前线程的值
- void set(T value): 设置此线程局部变量中当前线程副本中的值
代码示例
class TLAccount {
/* 定义一个 ThreadLocal 类型的变量, 该变量将是一个线程局部变量, 每个线程都会保留该变量的副本*/
private ThreadLocal<String> name = new ThreadLocal<>();
// 定义一个初始化 name 成员变量的构造器
public TLAccount(String str) {
this.name.set(str);
// 下面代码用于访问当前线程的 name 副本的值
System.out.println("----- " + this.name.get());
}
// name de setter 和 getter 方法
public String getName() {
return name.get();
}
public void setName(String str) {
this.name.set(str);
}
}
class MyTest extends Thread {
// 定义一个 TLAccount 类型的成员变量
private TLAccount account;
public MyTest(TLAccount account, String name) {
super(name);
this.account = account;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// 当 i == 6 时输出将账户名替换成当前线程名
if (i == 6) {
account.setName(getName());
}
// 输出同一账户的账户名和循环变量
System.out.println(account.getName() + " 账户 i 的值: " + i);
}
}
}
public class ThreadLocalTest {
public static void main(String[] args) {
// 启动两个线程, 两个线程共享同一个 TLAccount
TLAccount at = new TLAccount("初始化");
/*
虽然连个线程共享同一个账户, 即只有一个账户名
但由于账户名是 ThreadLocal 类型的, 所以每个线程
都完全拥有各自的账户名副本, 因此在 i == 6 之后, 将看到两个
线程访问同一个账户时出现不同账户名
*/
new MyTest(at, "线程甲").start();
new MyTest(at, "线程乙").start();
}
}
9.2 包装线程不安全的集合
- ArrayList, LinkedList, HashSet, TreeSet, HashMap, TreeMap 等都是线程不安全的, 即多个线程向这些集合中存, 取元素时, 可能破坏这些集合数据的完整性
- 为了多线程访问这些集合安全, 就需要 Collections 提供的类方法把这些集合包装成线程安全的集合
Collections 常用的方法
Connection synchronizedConnection(Collection c): 返回指定 Connection 对应的线程安全的 collection - static
List synchronizedList(List list): 返回指定 List 对象对应的线程安全的 List 对象 - static <K, V> Map<K, V> synchronizedMap(Map<K, V> m): 返回指定 Map 对象对应的线程安全的 Map 对象
- static
Set synchronizedSet(Set s): 返回指定 Set 对象对应的线程安全的 Set 对象 - static <K, V> SortedMap<K, V> synchronizedSortedMap(SortedMap<K, V> m): 返回指定 SortedMap 对象对应的线程安全的 SortedMap 对象
- static
SortedSet synchronizedSortedSet(SortedSet s): 返回指定 SortedSet 对象对应的线程安全的 SortedSet 对象
代码示例
import java.util.*;
public class CollectionsTest {
public static void main(String[] args) {
// 创建多线程安全的 HashMap List Set
Map<String, String> m = Collections.synchronizedMap(new HashMap<>());
List<String> l = Collections.synchronizedList(new ArrayList<>());
Set<Object> s = Collections.synchronizedSet(new HashSet<>());
}
}
9.3 线程安全相关的集合类
- java.util.concurrent 包下提供了大量支持高效并发访问的集合接口和实现类
- 以 Concurrent 开头的集合类代表了支持并发访问的集合, 他们可以支持多个线程并发的写入访问, 这些写入线程的所有操作都是线程安全的, 但读取操作不必锁定
- ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, ConcurrentLinkedQueue, ConcurrentLinkedQeque
- 当多个线程共享访问一个公共集合时, ConcurrentLinkedQueue 是一个更好的选择, ConcurrentLinkedQueue 不允许使用 null 元素; ConcurrentLinkedQueue 实现了多线程的高效访问, 多个线程访问 ConcurrentLinkedQueue 集合时无须等待
- ConcurrentHashMap 支持 16 个线程并发写入, 当有超过 16 个线程并发向该Map中写入数据时, 可能有一些线程需要等待; 但是程序通过 concurrencyLevel 构造参数(默认值是16)来支持更多的并发写入线程
- ConcurrentHashMap 新增的方法分三大类:
- forEach 系列: forEach, forEachKey, forEachValue, forEachEntry
- search 系列: search, searchKeys, searchValues, searchEntries
- reduce 系列: reduce, reduceToDouble, reduceToLong, reduceKeys, reduceValues