文章

JAVA 多线程 (一)

多线程

Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。

因此,对于大多数Java程序来说,我们说多任务,实际上是说如何使用多线程实现多任务。

和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。

Java多线程编程的特点又在于:

  • 多线程模型是Java程序最基本的并发模型;

  • 后续读写网络、数据库、Web开发等都依赖Java多线程模型。

因此,必须掌握Java多线程编程才能继续深入学习其他内容。

在学习之前,我罗列了一些问题,学习完之后,可以回答一下。

  1. 线程有哪些状态?

  2. 多线程带来了什么问题?

  3. 使用synchronized加锁,锁加到哪里?作用域是什么?

  4. 原子操作不需要加锁,Java 虚拟机规范了哪些原子操作?

  5. 什么是可重入锁?

  6. wait()notify()在什么场景下使用?

  7. wait()是否会释放锁?

  8. ReentrantLock的作用?和synchronized相比有什么区别?

  9. 读写场景下,多线程读取相互不影响,即读不需要加锁,写的时候才需要加锁,即不允许读取,如何实现?

  10. ReadWriteLock是悲观锁,写入时无法读取,如何实现边写边读?

  11. 如何在限制在同一时刻可访问线程数量?

  12. 有哪些线程安全的集合?

  13. 有哪些线程池?创建线程池的方式有哪些?有什么需要注意的点?

  14. 如何获取其他线程返回的内容?

  15. 如何做到多个线程串行执行并获取返回内容?那并行执行呢?

  16. 多线程如何实现分治?(分别计算并合并结果)

  17. ThreadLocal有什么作用?

线程状态

介绍

Java程序中,一个线程对象只能调用一次start()方法启动新线程,并在新线程中执行run()方法。一旦run()方法执行完毕,线程就结束了。因此,Java线程的状态有以下几种:

  • New:新创建的线程,尚未执行;

  • Runnable:运行中的线程,正在执行run()方法的Java代码;

  • Blocked:运行中的线程,因为某些操作被阻塞而挂起;

  • Waiting:运行中的线程,因为某些操作在等待中;

  • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;

  • Terminated:线程已终止,因为run()方法执行完毕。

用一个状态转移图表示如下:

         ┌─────────────┐
         │     New     │
         └─────────────┘
                │
                ▼
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
 ┌─────────────┐ ┌─────────────┐
││  Runnable   │ │   Blocked   ││
 └─────────────┘ └─────────────┘
│┌─────────────┐ ┌─────────────┐│
 │   Waiting   │ │Timed Waiting│
│└─────────────┘ └─────────────┘│
 ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
                │
                ▼
         ┌─────────────┐
         │ Terminated  │
         └─────────────┘

当线程启动后,它可以在RunnableBlockedWaitingTimed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止。

线程终止的原因有:

  • 线程正常终止:run()方法执行到return语句返回;

  • 线程意外终止:run()方法因为未捕获的异常导致线程终止;

  • 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

一个线程还可以等待另一个线程直到其运行结束。例如,main线程在启动t线程后,可以通过t.join()等待t线程结束后再继续运行:

// 多线程
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println("hello");
        });
        System.out.println("start");
        t.start();
        t.join();
        System.out.println("end");
    }
}

main线程对线程对象t调用join()方法时,主线程将等待变量t表示的线程运行结束,即join就是指等待该线程结束,然后才继续往下执行自身线程。所以,上述代码打印顺序可以肯定是main线程先打印startt线程再打印hellomain线程最后再打印end

如果t线程已经结束,对实例t调用join()会立刻返回。此外,join(long)的重载方法也可以指定一个等待时间,超过等待时间后就不再继续等待。

小结

  1. Java线程对象Thread的状态包括:NewRunnableBlockedWaitingTimed WaitingTerminated

  2. 通过对另一个线程对象调用join()方法可以等待其执行结束;

  3. 可以指定等待时间,超过等待时间线程仍然没有结束就不再等待;

  4. 对已经运行结束的线程调用join()方法会立刻返回。

中断线程

如果线程需要执行一个长时间任务,就可能需要能中断线程。中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行。

我们举个栗子:假设从网络下载一个100M的文件,如果网速很慢,用户等得不耐烦,就可能在下载过程中点“取消”,这时,程序就需要中断下载线程的执行。

中断一个线程非常简单,只需要在其他线程中对目标线程调用interrupt()方法,目标线程需要反复检测自身状态是否是interrupted状态,如果是,就立刻结束运行。

我们还是看示例代码:

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new MyThread();
        t.start();
        Thread.sleep(1); // 暂停1毫秒
        t.interrupt(); // 中断t线程
        t.join(); // 等待t线程结束
        System.out.println("end");
    }
}

class MyThread extends Thread {
    public void run() {
        int n = 0;
        while (! isInterrupted()) {
            n ++;
            System.out.println(n + " hello!");
        }
    }
}

Volatile 关键字

为什么要对线程间共享的变量用关键字volatile声明?这涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
           Main Memory
│                               │
   ┌───────┐┌───────┐┌───────┐
│  │ var A ││ var B ││ var C │  │
   └───────┘└───────┘└───────┘
│     │ ▲               │ ▲     │
 ─ ─ ─│─│─ ─ ─ ─ ─ ─ ─ ─│─│─ ─ ─
      │ │               │ │
┌ ─ ─ ┼ ┼ ─ ─ ┐   ┌ ─ ─ ┼ ┼ ─ ─ ┐
      ▼ │               ▼ │
│  ┌───────┐  │   │  ┌───────┐  │
   │ var A │         │ var C │
│  └───────┘  │   │  └───────┘  │
   Thread 1          Thread 2
└ ─ ─ ─ ─ ─ ─ ┘   └ ─ ─ ─ ─ ─ ─ ┘

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。

因此,volatile关键字的目的是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;

  • 每次修改变量后,立刻回写到主内存。

volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

如果我们去掉volatile关键字,运行上述程序,发现效果和带volatile差不多,这是因为在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

守护线程

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

如何创建守护线程呢?方法和普通线程一样,只是在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程:

Thread t = new MyThread();
t.setDaemon(true);
t.start();

在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

线程同步

多线程所带来的问题

当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。

这个时候,有个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题。

我们来看一个例子:

// 多线程
public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread();
        var dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count += 1; }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count -= 1; }
    }
}

上面的代码很简单,两个线程同时对一个int变量进行操作,一个加10000次,一个减10000次,最后结果应该是0,但是,每次运行,结果实际上都是不一样的。

这是因为对变量进行读取和写入时,结果要正确,必须保证是原子操作。原子操作是指不能被中断的一个或一系列操作。

例如,对于语句:

n = n + 1;

看上去是一行语句,实际上对应了3条指令:

ILOAD
IADD
ISTORE

我们假设n的值是100,如果两个线程同时执行 n = n + 1,得到的结果很可能不是102,而是101,原因在于:

┌───────┐    ┌───────┐
│Thread1│    │Thread2│
└───┬───┘    └───┬───┘
    │            │
    │ILOAD (100) │
    │            │ILOAD (100)
    │            │IADD
    │            │ISTORE (101)
    │IADD        │
    │ISTORE (101)│
    ▼            ▼

如果线程1在执行ILOAD后被操作系统中断,此刻如果线程2被调度执行,它执行ILOAD后获取的值仍然是100,最终结果被两个线程的ISTORE写入后变成了101,而不是期待的102

解决多线程带来的问题-加锁

这说明多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:

┌───────┐     ┌───────┐
│Thread1│     │Thread2│
└───┬───┘     └───┬───┘
    │             │
    │-- lock --   │
    │ILOAD (100)  │
    │IADD         │
    │ISTORE (101) │
    │-- unlock -- │
    │             │-- lock --
    │             │ILOAD (101)
    │             │IADD
    │             │ISTORE (102)
    │             │-- unlock --
    ▼             ▼

通过加锁和解锁的操作,就能保证3条指令总是在一个线程执行期间,不会有其他线程会进入此指令区间。即使在执行期线程被操作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。只有执行线程将锁释放后,其他线程才有机会获得锁并执行。这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。

可见,保证一段代码的原子性就是通过加锁和解锁实现的。Java程序使用synchronized关键字对一个对象进行加锁:

synchronized(lock) {
    n = n + 1;
}

synchronized保证了代码块在任意时刻最多只有一个线程能执行。我们把上面的代码用synchronized改写如下:

// 多线程
public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread();
        var dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static final Object lock = new Object();
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count += 1;
            }
        }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count -= 1;
            }
        }
    }
}

注意到代码:

synchronized(Counter.lock) { // 获取锁
    ...
} // 释放锁

它表示用Counter.lock实例作为锁,两个线程在执行各自的synchronized(Counter.lock) { ... }代码块时,必须先获得锁,才能进入代码块进行。执行结束后,在synchronized语句块结束会自动释放锁。这样一来,对Counter.count变量进行读写就不可能同时进行。上述代码无论运行多少次,最终结果都是0。

使用synchronized解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized会降低程序的执行效率。

我们来概括一下如何使synchronized

找出修改共享变量的线程代码块;

选择一个共享实例作为锁;

使用synchronized(lockObject) { ... }

在使用synchronized的时候,不必担心抛出异常。因为无论是否有异常,都会在synchronized结束处正确释放锁:

public void add(int m) {
synchronized (obj) {
    if (m < 0) {
        throw new RuntimeException();
    }
    this.value += m;
} // 无论有无异常,都会在此释放锁
}

虚拟机规范的原子操作

Java虚拟机规范定义了一部分操作是原子性的,比如读取和赋值(除了longdouble类型)等。以下是一些例子:

  • 读写byteshortcharintfloatbooleanreference类型数据。无论是将字段赋值给变量,还是直接读取这个字段,甚至是在数组中,这些操作都是原子性的。

  • 对于声明为volatilelongdouble类型的变量,也是原子性操作。

  • 原子操作包括对引用类型变量的读和写。

但是,需要注意的是,即使在JVM中定义的原子操作,但是由于编译器优化和处理器乱序执行的存在,其执行的顺序和代码的顺序可能并不一致。由于这个原因,所以在多线程环境中,即使是单个的原子操作也是线程不安全的。除非关键字volatile修饰,或者对操作的变量进行适当的同步。

单条原子操作的语句不需要同步。例如:

public void set(int m) {
	synchronized(lock) {
    	this.value = m;
	}
}

就不需要同步。

对引用也是类似。例如:

public void set(String s) {
	this.value = s;
}

上述赋值语句并不需要同步。

但是,如果是多行赋值语句,就必须保证是同步操作,例如:

class Point {
    int x;
    int y;
    public void set(int x, int y) {
        synchronized(this) {
            this.x = x;
            this.y = y;
        }
    }
}

此外,不但写需要同步,读也需要同步:

class Point {
    int x;
    int y;

    public void set(int x, int y) {
        synchronized(this) {
            this.x = x;
            this.y = y;
        }
    }

    public int[] get() {
        int[] copy = new int[2];
        copy[0] = x;
        copy[1] = y;
    }
}

假定当前坐标是(100, 200),那么当设置新坐标为(110, 220)时,上述未同步的多线程读到的值可能有:

  • (100, 200):x,y更新前;

  • (110, 200):x更新后,y更新前;

  • (110, 220):x,y更新后。

如果读取到(110, 200),即读到了更新后的x,更新前的y,那么可能会造成程序的逻辑错误,无法保证读取的多个变量状态保持一致。

有些时候,通过一些巧妙的转换,可以把非原子操作变为原子操作。例如,上述代码如果改造成:

class Point {
    int[] ps;
    public void set(int x, int y) {
        int[] ps = new int[] { x, y };
        this.ps = ps;
    }
}

就不再需要写同步,因为this.ps = ps是引用赋值的原子操作。而语句:

int[] ps = new int[] { x, y };

这里的ps是方法内部定义的局部变量,每个线程都会有各自的局部变量,互不影响,并且互不可见,并不需要同步。

不过要注意,读方法在复制int[]数组的过程中仍然需要同步。

其他的同步方式

确保多线程环境下数据一致性除了使用javasynchronized关键字外,还有以下几种方式:

  • ReentrantLockReentrantLock是自JAVA5开始引入的显式锁,使用时需要手动获取锁和释放锁,由于可以灵活的控制锁的获取和释放时机,所以相比synchronizedReentrantLock的功能更加强大。

  • volatile关键字:volatile关键字可以保证变量的可见性,即当一个线程修改了变量的值,新值对其他线程来说是可以立即得知的。但请注意,volatile并不能保证操作的原子性。

  • Atomic类:java.util.concurrent.atomic包下提供了一些原子类,例如AtomicIntegerAtomicLongAtomicReference等,它们可以在无锁的情况下进行原子操作。

  • LockFree算法或数据结构:这通常需要对多线程编程有深入的理解,对于一些高并发场景,可以考虑使用。

  • ThreadLocalThreadLocal可以创建线程本地变量,每个线程都会创建一个对应的变量副本,线程之间互不影响,这样就可以保持数据的一致性。

同步方法

线程安全的类

没有特殊说明时,一个类默认是非线程安全的。

我们知道Java程序依靠synchronized对线程进行同步,使用synchronized的时候,锁住的是哪个对象非常重要。

让线程自己选择锁对象往往会使得代码逻辑混乱,也不利于封装。更好的方法是把synchronized逻辑封装起来。例如,我们编写一个计数器如下:

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}

这样一来,线程调用add()dec()方法时,它不必关心同步逻辑,因为synchronized代码块在add()dec()方法内部。并且,我们注意到,synchronized锁住的对象是this,即当前实例,这又使得创建多个Counter实例的时候,它们之间互不影响,可以并发执行:

var c1 = Counter();
var c2 = Counter();

// 对c1进行操作的线程:
new Thread(() -> {
    c1.add();
}).start();
new Thread(() -> {
    c1.dec();
}).start();

// 对c2进行操作的线程:
new Thread(() -> {
    c2.add();
}).start();
new Thread(() -> {
    c2.dec();
}).start();

现在,对于Counter类,多线程可以正确调用。

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),上面的Counter类就是线程安全的。Java标准库的java.lang.StringBuffer也是线程安全的。

还有一些不变类,例如StringIntegerLocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

synchronized 作用域

synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁

当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:

public void add(int n) {
    synchronized(this) { // 锁住this
        count += n;
    } // 解锁
}
public synchronized void add(int n) { // 锁住this
    count += n;
} // 解锁


对一个静态方法添加
synchronized修饰符,它锁住的是Class实例

对于static方法,是没有this实例的,因为static方法是针对类而不是实例。但是我们注意到任何一个类都有一个由JVM自动创建的Class实例,因此,对static方法添加synchronized,锁住的是该类的Class实例

下面两种写法是等价的:

public synchronized static void test(int n) {
    ...
}
public class Counter {
    public static void test(int n) {
        synchronized(Counter.class) {
            ...
        }
    }
}

死锁

可重入锁

Java的线程锁是可重入的锁。

什么是可重入的锁?我们还是来看例子:

public class Counter {
    private int count = 0;

    public synchronized void add(int n) {
        if (n < 0) {
            dec(-n);
        } else {
            count += n;
        }
    }

    public synchronized void dec(int n) {
        count += n;
    }
}

观察synchronized修饰的add()方法,一旦线程执行到add()方法内部,说明它已经获取了当前实例的this锁。如果传入的n < 0,将在add()方法内部调用dec()方法。由于dec()方法也需要获取this锁,现在问题来了:

对同一个线程,能否在获取到锁以后继续获取同一个锁?

答案是肯定的。JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。

由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁。

Java中,大多数的锁都被设计为可重入锁。可重入锁可以被同一个线程多次获取,而不会引发死锁。这消除了因线程需要再次获得自己已持有的锁而引发的死锁问题。

如果一个锁被设计为不可重入的,那么它一次只能被线程获取一次,如果同一个线程试图再次获取这个锁,就会阻塞,就算这个线程已经持有了该锁。当你看到“不可重入锁”,它们通常是在特殊的用途中,比如为了满足特殊的同步需求,或者进行特殊的性能优化。

然而,在绝大多数情况下,使用可重入锁是更安全、更方便的,因为它们可以避免很多由于编程错误导致的死锁问题。Java内部的锁及java.util.concurrent库提供的Lock接口中的常见实现(如ReentrantLockReentrantReadWriteLock)都是可重入的。

通常情况下,我们不需要把锁设计为不可重入,如果你发现自己在设计一个不可重入的锁,可能需要重新思考一下是否有其他更好的方法可以解决问题。

死锁

一个线程可以获取一个锁后,再继续获取另一个锁。例如:

public void add(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value += m;
        synchronized(lockB) { // 获得lockB的锁
            this.another += m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

public void dec(int m) {
    synchronized(lockB) { // 获得lockB的锁
        this.another -= m;
        synchronized(lockA) { // 获得lockA的锁
            this.value -= m;
        } // 释放lockA的锁
    } // 释放lockB的锁
}

在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。对于上述代码,线程1和线程2如果分别执行add()和dec()方法时:

  • 线程1:进入add(),获得lockA;

  • 线程2:进入dec(),获得lockB。

随后:

  • 线程1:准备获得lockB,失败,等待中;

  • 线程2:准备获得lockA,失败,等待中。

此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。

死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。

因此,在编写多线程应用时,要特别注意防止死锁。因为死锁一旦形成,就只能强制结束进程。

那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

public void dec(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value -= m;
        synchronized(lockB) { // 获得lockB的锁
            this.another -= m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

小结

Java的synchronized锁是可重入锁;

死锁产生的条件是多线程各自持有不同的锁,并互相试图获取对方已持有的锁,导致无限等待;

避免死锁的方法是多线程获取锁的顺序要一致。

wait & notify

介绍

wait()notify()Java多线程编程中的两个方法,它们必须在同步代码块或者同步方法中使用,因为它们需要调用对象的监视器(monitor)。

wait()方法用来让当前线程进入等待状态,同时也会让当前线程释放它所占有的对象的锁

notify()方法用来唤醒一个在等待状态中的线程,如果有多个线程处于等待状态,notify()方法只会唤醒其中一个线程。而notifyAll()方法可以唤醒所有在等待状态中的线程。

这两个方法经常在生产者/消费者模式中使用,例如:

public class SharedResource {
    private Object object = new Object();

    public void producer() {
        synchronized (object) {
            while (条件不满足) {
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 生产逻辑
                object.notifyAll(); // 唤醒正在等待的消费者
            }
        }
    }

    public void consumer() {
        synchronized (object) {
            while (条件满足) {
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 消费逻辑
                object.notifyAll(); // 唤醒正在等待的生产者
            }
        }
    }
}

这里的producer方法和consumer方法分别代表生产者和消费者,它们通过调用object.wait()来进入等待状态,并通过object.notifyAll()来唤醒对方。

举例

Java程序中,synchronized解决了多线程竞争的问题。例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }
}

但是synchronized并没有解决多线程协调的问题。

仍然以上面的TaskQueue为例,我们再编写一个 getTask() 方法取出队列的第一个任务:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
    }

    public synchronized String getTask() {
        while (queue.isEmpty()) {
        }
        return queue.remove();
    }
}

上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。

但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁

因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。

如果深入思考一下,我们想要的执行效果是:

  • 线程1可以调用addTask()不断往队列中添加任务;

  • 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。

因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态(释放锁);当条件满足时,线程被唤醒,继续执行任务

对于上述TaskQueue,我们先改造getTask()方法,在条件不满足时,线程进入等待状态:

// 更改之后的 getTask()
public synchronized String getTask() {
    while (queue.isEmpty()) {
        this.wait();
    }
    return queue.remove();
}

// 在 同步方法/syncchronized 中有讲过,同等写法,这里温习一下
public String getTask() {
    synchronized(this){
        while (queue.isEmpty()) {
        	this.wait();
    	}
    	return queue.remove();
    }  
}

当一个线程执行到getTask()方法内部的while循环时,它必定已经获取到了this锁,此时,线程执行while条件判断,如果条件成立(队列为空),线程将执行this.wait(),进入等待状态。

这里的关键是:wait()方法必须在当前获取的锁对象上调用,这里获取的是this锁,因此调用this.wait()

简介中举例就是 object.wait() 即被锁的对象调用 wait() 方法

调用wait()方法后,线程进入等待状态,wait()方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()方法才会返回,然后,继续执行下一条语句。

有些仔细的童鞋会指出:即使线程在getTask()内部等待,其他线程如果拿不到this锁,照样无法执行addTask(),肿么办?

这个问题的关键就在于wait()方法的执行机制非常复杂。首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVMC代码实现的。其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。

因此,只能在锁对象上调用wait()方法。因为在getTask()中,我们获得了this锁,因此,只能在this对象上调用wait()方法:

public synchronized String getTask() {
    while (queue.isEmpty()) {
        // 释放this锁:
        this.wait();
        // 重新获取this锁
    }
    return queue.remove();
}

当一个线程在this.wait()等待时,它就会释放this锁,从而使得其他线程能够在addTask()方法获得this锁

现在我们面临第二个问题:如何让等待的线程被重新唤醒,然后从wait()方法返回?答案是在相同的锁对象上调用notify()方法。我们修改addTask()如下:

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notify(); // 唤醒在this锁等待的线程
}

注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),从而使得等待线程从this.wait()方法返回。

最终的代码为:

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}

示例代码

package site.suimu.practise.multithreading.waitandnotify.correct;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        var q = new TaskQueue();
        var ts = getThreads(q);
        var add = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                // 放入task:
                String s = "t-" + i * 10;
                System.out.println("add task: " + s);
                q.addTask(s);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
            }
        });
        add.start();
        add.join();
        Thread.sleep(100);
        for (var t : ts) {
            t.interrupt();
        }
    }

    private static ArrayList<Thread> getThreads(TaskQueue q) {
        var ts = new ArrayList<Thread>();
        for (int i = 0; i < 5; i++) {
            var t = new Thread() {
                public void run() {
                    // 执行task:
                    while (true) {
                        try {
                            String s = q.getTask();
                            System.out.println("execute task: " + s);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            t.start();
            ts.add(t);
        }
        return ts;
    }
}

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}

小结

waitnotify用于多线程协调运行:

  • synchronized内部可以调用wait()使线程进入等待状态;

  • 必须在已获得的锁对象上调用wait()方法;

  • synchronized内部可以调用notify()notifyAll()唤醒其他等待线程;

  • 必须在已获得的锁对象上调用notify()notifyAll()方法;

  • 已唤醒的线程还需要重新获得锁后才能继续执行。

ReentrantLock

介绍

Java 5开始,引入了一个高级的处理并发的 java.util.concurrent 包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks包提供的ReentrantLock用于替代 synchronized加锁,我们来看一下传统的synchronized代码:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

如果用 ReentrantLock 替代,可以把代码改造为:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为synchronizedJava语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLockJava代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。

synchronized不同的是,ReentrantLock可以尝试获取锁:

if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

JUC包

java.util.concurrent(简称JUC)包是Java提供的一个并发编程工具包,它对用于并发编程的一些基本概念进行了抽象和封装,提供了大量的类来支持并发编程,主要包括以下几个方面:

  1. 基本工具:提供了解决并发问题的基本工具,包括锁、同步器、线程安全的集合等。

  2. Executor框架:这是JUC的一个重要部分,提供了线程池等工具,用于管理线程的创建、执行和销毁过程。

  3. 并发集合java.util.concurrent包下提供了大量线程安全的集合类,比如ConcurrentHashMapConcurrentLinkedQueue等。

  4. 各种并发工具类:如SemaphoreCyclicBarrierCountDownLatch等,它们提供了在多线程之间进行同步控制的各种机制。

  5. 原子变量java.util.concurrent.atomic包下提供了支持无锁编程的原子操作类,如AtomicIntegerAtomicLongAtomicStampedReference等。

  6. 并发队列:提供了线程安全的并发队列,如ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue等。

这些工具可以帮助开发者更简单、更安全地完成并发编程,避免了复杂的同步控制,提高了编程效率。

ReentrantLock中的公平锁和非公平锁

公平锁和非公平锁是ReentrantLock提供的两种类型的锁,它们在锁的获取方式和处理策略上有所不同:

  1. 公平锁:公平锁的意思是,当锁不可用时,请求锁的线程将被放入到一个等待队列中,当锁释放时,队列中等待时间最长的那个(即最先请求锁的那个)线程会获得锁。这种机制保证了公平性,首先请求锁的线程首先得到锁。在创建ReentrantLock实例的时候,通过传递一个boolean值可以指定锁是否是公平的,true为公平锁,false为非公平锁。

  2. 非公平锁(默认):与公平锁相比的是,非公平锁在锁释放时,不一定会先通知等待时间最长的线程,而是给予任何线程获取的机会,这样有可能产生某些线程获取锁的时间明显较长的情况,所以是不公平的。

在性能上,公平锁和非公平锁也有所区别,由于公平锁需要维护一个线程等待队列,所以在一些情况下,公平锁的性能可能不如非公平锁。

使用示例如下:

// 创建一个公平锁
ReentrantLock fairLock = new ReentrantLock(true);

// 创建一个非公平锁
ReentrantLock nonfairLock = new ReentrantLock(false);

不论公平锁或者非公平锁,最重要的原则还是要确保在所有情况下都能释放掉锁,否则可能引起线程死锁。

ReentrantLockSynchronized的区别:

  1. 等待可中断:使用synchronized如果一个线程正在等待锁,那么它的等待是无法被中断的。而ReentrantLock提供了一个方法 lockInterruptibly(),可以在等待时响应中断。

  2. 公平和非公平锁:synchronized只提供非公平锁,非公平锁会优先给予等待时间最短的线程。而ReentrantLock既提供公平锁也提供非公平锁,公平锁的控制是严格按照线程加锁的顺序来分配的。

  3. 多个条件Variable:一个ReentrantLock可以同时绑定多个Condition对象,这样做的好处是可以实现多路唤醒,也就是调用一次signallAll()可以唤醒所有等待的线程,而synchronized要做到这点需要搭配多个Objectwait/notifyAll来实现。

  4. 锁粒度大:synchronized锁定的是整个方法或者同步块,没办法延迟锁定,而ReentrantLock可以调用lock()方法来获取锁,且可以尽可能延迟锁定,从而降低锁的粒度。

  5. 灵活性:synchronized的锁应用非常简单,只要在要同步的方法或者代码块声明就可以。但如果要实现更复杂的同步交互,就需要与其他的线程通信工具一起使用。而ReentrantLock提供的API更为丰富和灵活。

两者的选择需要根据实际的需求情况来定,通常情况下,如果需要实现简单的线程同步操作,可以优先使用synchronized,复杂的同步交互可以使用ReentrantLock

小结

  1. ReentrantLock可以替代 synchronized 进行同步;

  2. ReentrantLock获取锁更安全;

  3. 必须先获取到锁,再进入try {...}代码块,最后使用finally保证释放锁;

  4. 可以使用tryLock()尝试获取锁。

  5. ReentrantLock对于等待锁的处理提供了三种方法:中断等待、无限期等待和带有超时的等待,足够灵活以适应各种不同情况。同时还包含一个Condition类,可以达到Object类使用waitnotify的效果。

  6. ReentrantLock是一个可重入的互斥锁,从名字上看,它的一种特性就是“可重入”,也就是同一线程的外层函数获得锁后,内层递归函数可以直接再次获得该锁。在Java多线程编程中,ReentrantLock是一个非常重要的锁机制工具。

Condition

介绍

使用 ReentrantLock比直接使用 synchronized更安全,可以替代 synchronized进行线程同步。

但是,synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写waitnotify的功能呢?

答案是使用Condition对象来实现waitnotify的功能。

我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLockCondition来实现:

class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;

  • signal()会唤醒某个等待线程;

  • signalAll()会唤醒所有等待线程;

  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}

可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

小结

Condition可以替代waitnotify

Condition对象必须从Lock对象获取。

ReadWriteLock

介绍

前面讲到的ReentrantLock保证了只有一个线程可以执行临界区代码:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        lock.lock();
        try {
            counts[index] += 1;
        } finally {
            lock.unlock();
        }
    }

    public int[] get() {
        lock.lock();
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            lock.unlock();
        }
    }
}

但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。

实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待:

允许

不允许

不允许

不允许

使用ReadWriteLock可以解决这个问题,它保证:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);

  • 没有写入时,多个线程允许同时读(提高性能)。

ReadWriteLock实现这个功能十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。

使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。

例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock

小结

使用ReadWriteLock可以提高读取效率:

  • ReadWriteLock只允许一个线程写入;

  • ReadWriteLock允许多个线程在没有写入时同时读取;

  • ReadWriteLock适合读多写少的场景。

StampedLock

介绍

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。

如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

我们来看例子:

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

小结

StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;

StampedLock是不可重入锁。

Semaphore

介绍

前面我们讲了各种锁的实现,本质上锁的目的是保护一种受限资源,保证同一时刻只有一个线程能访问(ReentrantLock),或者只有一个线程能写入(ReadWriteLock)

还有一种受限资源,它需要保证同一时刻最多有N个线程能访问,比如同一时刻最多创建100个数据库连接,最多允许10个用户下载等。

这种限制数量的锁,如果用Lock数组来实现,就太麻烦了。

这种情况就可以使用Semaphore,例如,最多允许3个线程同时访问:

public class AccessLimitControl {
    // 任意时刻仅允许最多3个线程获取许可:
    final Semaphore semaphore = new Semaphore(3);

    public String access() throws Exception {
        // 如果超过了许可数量,其他线程将在此等待:
        semaphore.acquire();
        try {
            // TODO:
            return UUID.randomUUID().toString();
        } finally {
            semaphore.release();
        }
    }
}

使用Semaphore先调用acquire()获取,然后通过try ... finally保证在finally中释放。

调用acquire()可能会进入等待,直到满足条件为止。也可以使用tryAcquire()指定等待时间:

if (semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
    // 指定等待时间3秒内获取到许可:
    try {
        // TODO:
    } finally {
        semaphore.release();
    }
}

Semaphore本质上就是一个信号计数器,用于限制同一时间的最大访问数量。

小结

如果要对某一受限资源进行限流访问,可以使用 Semaphore,保证同一时间最多N个线程访问受限资源。

Concurrent集合

介绍

我们在前面已经通过ReentrantLockCondition实现了一个BlockingQueue

public class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

BlockingQueue的意思就是说,当一个线程调用这个TaskQueue的getTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。

因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue

除了BlockingQueue外,针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

interface

non-thread-safe

thread-safe

List

ArrayList

CopyOnWriteArrayList

Map

HashMap

ConcurrentHashMap

Set

HashSet / TreeSet

CopyOnWriteArraySet

Queue

ArrayDeque / LinkedList

ArrayBlockingQueue / LinkedBlockingQueue

Deque

ArrayDeque / LinkedList

LinkedBlockingDeque

使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

Map<String, String> map = new HashMap<>();

改为:

Map<String, String> map = new ConcurrentHashMap<>();

就可以了。

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。

使用案例

public class Example {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> unSafeMap = new HashMap<String, String>();
        Map<String, String> safeMap = new ConcurrentHashMap<>();
        addItemToMap("unsafe",unSafeMap);
        addItemToMap("safe",safeMap);
    }
    public static void addItemToMap(String type, Map<String, String> map) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 50000; i++) {
                map.put(String.valueOf(i), String.valueOf(i));
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 50000; i < 100000; i++) {
                map.put(String.valueOf(i), String.valueOf(i));
            }
        });
        t1.start();
        t2.start();
        // 等待线程执行完
        t1.join();
        t2.join();
        System.out.println(type + " Final size: " + map.size());
    }
}

执行结果:

unsafe Final size: 82362

safe Final size: 100000

小结

使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程:

多线程同时读写并发集合是安全的;

尽量使用Java标准库提供的并发集合,避免自己编写同步代码。

Atomic

介绍

Javajava.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。

我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)

  • 加1后返回新值:int incrementAndGet()

  • 获取当前值:int get()

  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CASCompare and Set

如果我们自己通过CAS编写incrementAndGet(),它大概长这样:

public int incrementAndGet(AtomicInteger var) {
    int prev, next;
    do {
        prev = var.get();
        next = prev + 1;
    } while ( ! var.compareAndSet(prev, next));
    return next;
}

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

class IdGenerator {
    AtomicLong var = new AtomicLong(0);

    public long getNextId() {
        return var.incrementAndGet();
    }
}

通常情况下,我们并不需要直接用do ... while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。

在高度竞争的情况下,还可以使用Java 8提供的LongAdderLongAccumulator

小结

使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

  • 原子操作实现了无锁的线程安全;

  • 适用于计数器,累加器等。

例子

public class AtomicIntegerExample {
    private static AtomicInteger counter = new AtomicInteger(0);
    private static Integer counter1 = 0;
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    counter.incrementAndGet();
                    counter1 = counter1 + 1;
                }
            });
            threads[i].start();
        }
        // 确保所有线程都执行完毕
        for (int i = 0; i < 100; i++) {
            threads[i].join();
        }
        System.out.println("Final counter value: " + counter.get());
        System.out.println("Final counter1 value: " + counter1);
    }
}

输出结果:

Final counter value: 1000000

Final counter1 value: 200989

线程池

介绍

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

如果可以复用一组线程:

┌─────┐ execute  ┌──────────────────┐
│Task1│─────────>│ThreadPool        │
├─────┤          │┌───────┐┌───────┐│
│Task2│          ││Thread1││Thread2││
├─────┤          │└───────┘└───────┘│
│Task3│          │┌───────┐┌───────┐│
├─────┤          ││Thread3││Thread4││
│Task4│          │└───────┘└───────┘│
├─────┤          └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘
  ...

那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。

简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;

  • CachedThreadPool:线程数根据任务动态调整的线程池;

  • SingleThreadExecutor:仅单线程执行的线程池。

创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

// thread-pool
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池:
        ExecutorService es = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 6; i++) {
            es.submit(new Task("" + i));
        }
        // 关闭线程池:
        es.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("start task " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("end task " + name);
    }
}

我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。

如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因此,想创建指定动态范围的线程池,可以这么写:

int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
                                            60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

ExxcutorService

在Java中,ExecutorService 是一个接口,真正提供功能的是它的实现类。这些实现类主要包括以下几种:

  1. ThreadPoolExecutor:这是一个可以根据需要控制任务执行策略的灵活的线程池实现。核心和最大的线程数,保持连接的闲置时间,运行非核心线程的时间,队列等待策略等都可以自定义。在大多数情况下,都推荐直接使用这个类创建和管理线程。

  2. ScheduledThreadPoolExecutor:这是ThreadPoolExecutor的一种扩展,它可以用来周期性地执行任务。它提供的schedule方法可以用来执行一次性任务,scheduleAtFixedRatescheduleWithFixedDelay可以用来执行周期性任务。

  3. ForkJoinPool:这是一个提供了任务拆分执行然后合并结果的线程池实现。它设计用来解决大任务需要拆分成小任务并行执行的问题。

Java还提供了一个Executors工具类,它提供了一系列工厂方法用来创建不同类型的ExecutorService。这些工厂方法方便快捷,但通常不推荐在生产环境使用,因为它们都使用了无界的任务队列,可能会导致OOM(内存溢出)问题。在生产环境下,应该使用ThreadPoolExecutor自定义线程池参数。

Executors 工具类

Executors 是一个工具类,主要用于创建不同类型的线程池,以下是它的主要方法:

  1. newCachedThreadPool():创建一个线程池,该线程池根据需要创建新的线程,但是在可用线程处于空闲状态时,将重用之前构造的可用线程。这些池通常将提高执行许多短期异步任务的程序的性能。调用 execute() 将重用以前构造的线程(如果可用)。如果没有可用的线程,将创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒未被使用的线程。

  2. newSingleThreadExecutor():创建一个包含一个线程的线程池,顺序执行提交的任务,如果这个线程异常结束,会创建一个新的线程继续执行后续的任务。

  3. newFixedThreadPool(int nThreads):创建固定数目线程的线程池,可以执行长期的任务。这种类型的线程池特性是有固定线程数的线程池,如果当前需要执行的任务超过线程池数量,那么部分任务会在任务队列中等待线程池中线程空闲出来后被执行。

  4. newScheduledThreadPool(int corePoolSize):创建一个定长线程池,支持定时及周期性任务执行,延迟或者定期执行任务。

以上四种类型的线程池,底层都是通过 ThreadPoolExecutor 实现的,只是参数有所不同。这些快捷方法适用于快速开发或者测试环境,如果用于生产环境,一般建议使用 ThreadPoolExecutor 的构造函数,自定义参数创建线程池,根据任务的性质有针对性地调整线程池的相关参数,以提升程序性能,防止资源浪费和程序崩溃。

ScheduledThreadPool

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRateFixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

│░░░░   │░░░░░░ │░░░    │░░░░░  │░░░  
├───────┼───────┼───────┼───────┼────>
│<─────>│<─────>│<─────>│<─────>│

FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

│░░░│       │░░░░░│       │░░│       │░
└───┼───────┼─────┼───────┼──┼───────┼──>
    │<─────>│     │<─────>│  │<─────>│

因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

细心的童鞋还可以思考下面的问题:

  • 问题一:在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?

  • 问题二:如果任务抛出了异常,后续任务是否继续执行?

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer

问题一

后续任务并不会并发执行,如果某次的任务执行时间超过了1秒,那么它就会推迟下一个任务的执行,直到当前的任务执行完毕,维持任务执行的顺序性,而不是并行地执行。

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        Runnable task = new Runnable() {
            public void run() {
                // 创建一个当前时间的实例
                LocalDateTime now = LocalDateTime.now();
                // 创建一个日期时间格式化器
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                // 使用格式化器格式化当前时间
                String formattedDateTime = now.format(formatter);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // 输出格式化后的时间
                System.out.println("Executing Task At " + formattedDateTime);
            }
        };
        System.out.println("Submitting task at " + System.nanoTime() + " to be executed every 2 seconds with an initial delay of 0 seconds");
        scheduledExecutorService.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
    }
}

执行结果

Executing Task At 2024-01-09 17:00:36
Executing Task At 2024-01-09 17:00:38
Executing Task At 2024-01-09 17:00:40
Executing Task At 2024-01-09 17:00:42
Executing Task At 2024-01-09 17:00:44
Executing Task At 2024-01-09 17:00:46
Executing Task At 2024-01-09 17:00:48

问题二

执行 ScheduledThreadPoolExecutor 中的定时任务时,如果某次任务执行过程中抛出了未检查的异常(比如 RuntimeException),那么 ScheduledThreadPoolExecutor 将会停止后续任务的执行,即使你设置了该任务是周期性执行。这是因为 ScheduledThreadPoolExecutor 无法从任务的失败中自动恢复。

为了防止这种情况的发生,我们应该在定时任务的代码中适当地应用异常处理机制。下面是一个简单的例子:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
Runnable task = new Runnable() {
    public void run() {
        try {
            // 在这里执行你的任务
            throw new RuntimeException("An exception happened.");
        } catch (Throwable t) {
            // 对异常进行处理,然后可以打印输出,例如:
            System.out.println("Exception in task: " + t.getMessage());
            // 如果需要的话,重新抛出异常
            // throw t;
        }
    }
};

scheduledExecutorService.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);

在这个例子中,我们捕获到任务中出现的除可检查性异常之外的所有类型的错误和异常,并对其进行处理,这样就可以防止异常导致后续任务的停止。

小结

JDK提供了ExecutorService实现了线程池功能:

  • 线程池内部维护一组线程,可以高效执行大量小任务;

  • Executors提供了静态方法创建不同类型的ExecutorService

  • 必须调用shutdown()关闭ExecutorService

  • ScheduledThreadPool可以定期调度多个任务。

Future

介绍

在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行:

class Task implements Runnable {
    public String result;

    public void run() {
        this.result = longTimeCalculation(); 
    }
}

Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:

class Task implements Callable<String> {
    public String call() throws Exception {
        return longTimeCalculation(); 
    }
}

并且Callable接口是一个泛型接口,可以返回指定类型的结果。

现在的问题是,如何获得异步执行的结果?

如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)

  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;

  • cancel(boolean mayInterruptIfRunning):取消当前任务;

  • isDone():判断任务是否已完成。

小结

对线程池提交一个Callable任务,可以获得一个Future对象;

可以用Future在将来某个时刻获取结果。



CompletableFuture

介绍

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

尤其当我们在碰到一下业务场景的时候,单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务:

  • 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果

  • 等待Future集合种的所有任务都完成。

  • 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。

  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。

  • 应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)

Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用 CompletableFuture :

public class CompletableFutureExample {
    public static void main(String[] args) throws Exception {
        // 创建异步执行任务:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureExample::fetchPrice);
        // 如果执行成功:
        cf.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 如果执行异常:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }

    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}

串行执行

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

public class SerialTest {

    public static void main(String[] args) throws Exception {
        // 第一个任务:
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        // cfQuery成功后继续执行下一个任务:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(2000);
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}

并行执行

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:

同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

public class ParallelTest {
    public static void main(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            String result = queryCode("中国石油", "https://finance.sina.com.cn/code/");
            System.out.println("get query from finance.sina.com");
            return result;
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            String result = queryCode("中国石油", "https://money.163.com/code/");
            System.out.println("get query from money.163.com");
            return result;
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            Double result = fetchPrice((String) code, "https://finance.sina.com.cn/price/");
            System.out.println("Get price from finance.sina.com");
            return result;
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            Double result = fetchPrice((String) code, "https://money.163.com/price/");
            System.out.println("Get price from money.163.com");
            return result;
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最终结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }
    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return "601857";
    }
    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}

运行结果:

query code from https://money.163.com/code/...
query code from https://finance.sina.com.cn/code/...
get query from finance.sina.com
query price from https://finance.sina.com.cn/price/...
query price from https://money.163.com/price/...
Get price from money.163.com
price: 9.679795394882749
get query from money.163.com
Get price from finance.sina.com

上述逻辑实现的异步查询规则实际上是:

┌─────────────┐ ┌─────────────┐
│ Query Code  │ │ Query Code  │
│  from sina  │ │  from 163   │
└─────────────┘ └─────────────┘
       │               │
       └───────┬───────┘
               ▼
        ┌─────────────┐
        │    anyOf    │
        └─────────────┘
               │
       ┌───────┴────────┐
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│ Query Price │  │ Query Price │
│  from sina  │  │  from 163   │
└─────────────┘  └─────────────┘
       │                │
       └────────┬───────┘
                ▼
         ┌─────────────┐
         │    anyOf    │
         └─────────────┘
                │
                ▼
         ┌─────────────┐
         │Display Price│
         └─────────────┘

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

最后我们注意CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;

  • xxxAsync():表示将异步在线程池中执行。

小结

CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;

  • exceptional()处理异常结果;

  • thenApplyAsync()用于串行化另一个CompletableFuture

  • anyOf()allOf()用于并行化多个CompletableFuture

ForkJoin

介绍

ForkJoin框架是Java 7中引入的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最后汇总每个小任务结果得到大任务结果的框架。

具体的使用包括以下两个步骤:

  1. 创建一个继承自RecursiveTaskRecursiveAction的子类,覆写compute方法定义需要完成的任务。

  1. RecursiveTask: 用于有返回结果的任务。

  2. RecursiveAction: 用于没有返回结果的任务。

  1. 创建ForkJoinPool来执行ForkJoinTask。使用ForkJoinPoolinvokesubmit方法。

下面是一个例子,显示如何使用ForkJoin实现一个简单的并行求和运算:

import java.util.concurrent.*;

public class ForkJoinExample {

    static class SumTask extends RecursiveTask<Integer> {
        static final int THRESHOLD = 100;
        int start;
        int end;
        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if(end - start <= THRESHOLD) {
                int sum = 0;
                for(int i=start; i<end; i++)
                    sum += i;
                return sum;
            }

            int mid = (end + start) / 2;

            SumTask leftTask = new SumTask(start, mid);
            SumTask rightTask = new SumTask(mid, end);

            leftTask.fork();
            rightTask.fork();

            return leftTask.join() + rightTask.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println(pool.invoke(new SumTask(0, 10000)));
    }
}

在这个例子中,我们创建了一个SumTask类,继承自RecursiveTask,用于计算一系列数字的总和,并且当数值过大时会将任务分割成更小的任务来并行执行,最后我们使用ForkJoinPool来执行任务并获取结果。

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

ThreadLocal

介绍

多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字:

// Thread
public class Main {
    public static void main(String[] args) throws Exception {
        log("start main...");
        new Thread(() -> {
            log("run task...");
        }).start();
        new Thread(() -> {
            log("print...");
        }).start();
        log("end main.");
    }

    static void log(String s) {
        System.out.println(Thread.currentThread().getName() + ": " + s);
    }
}

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {
    checkPermission();
    doWork();
    saveStatus();
    sendResponse();
}

然后,通过线程池去执行这些任务。

观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?

process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {
    checkPermission(user);
    doWork(user);
    saveStatus(user);
    sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {
    queryStatus(user);
    checkStatus();
    setNewStatus(user);
    log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。

给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。

Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。

ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

void processUser(user) {
    try {
        threadLocalUser.set(user);
        step1();
        step2();
    } finally {
        threadLocalUser.remove();
    }
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {
    User u = threadLocalUser.get();
    log();
    printUser();
}

void log() {
    User u = threadLocalUser.get();
    println(u.name);
}

void step2() {
    User u = threadLocalUser.get();
    checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。

实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。

最后,特别注意ThreadLocal一定要在finally中清除:

try {
    threadLocalUser.set(user);
    ...
} finally {
    threadLocalUser.remove();
}

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

public class UserContext implements AutoCloseable {

    static final ThreadLocal<String> ctx = new ThreadLocal<>();

    public UserContext(String user) {
        ctx.set(user);
    }

    public static String currentUser() {
        return ctx.get();
    }

    @Override
    public void close() {
        ctx.remove();
    }
}

使用的时候,我们借助try (resource) {...}结构,可以这么写:

try (var ctx = new UserContext("Bob")) {
    // 可任意调用UserContext.currentUser():
    String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {...}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

小结

ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

使用ThreadLocal要用try ... finally结构,并在finally中清除。

License:  CC BY 4.0