• 第03篇_并发编程

    第01章_线程基础

    第一节 线程的基本概念

    1. 创建和启动线程

    1) 直接自定义线程

    可以通过继承java.lang.Thread类,重写其run方法来直接定义一个线程。

     

    2) 定义Runnable实现类

    由于Java只支持单继承,因此一般都是定义java.lang.Runnable接口的实现类,然后创建Runnable的对象传给Thread对象去执行。

     

     

    2. 线程基本属性

     

    3. 线程六大状态

    线程状态为Thread.State枚举,有如下六种状态:

    线程状态状态说明
    NEW新建。线程创建但未启动。
    RUNNABLE可运行。线程在运行或具备运行条件只是在等待操作系统调度。
    BLOCKED被动阻塞。等待锁或IO等资源。
    WAITING主动等待。线程在等待某个条件,调用wait()/join()方法会进入该状态。
    TIMED_WAITING超时等待。线程在等待某个条件或超时,调用wait(long timeout)/join(long millis)/sleep(long millis)会进入该状态。
    TERMINATED终止。线程已结束。

    注意:

    1. RUNNABLE不代表CPU一定在执行该线程的代码,也可能在等待操作系统分配时间片,只是它没有在等待其他条件。

     

    4. 线程基本操作

    注意:

    1. wait()对象级别的方法,定义在 Object 类,用于线程间的协作,需要在同步代码块中调用,会释放对象锁

    2. join()线程级别的方法,定义在 Thread 类,用于线程的同步,不会释放锁,用于等待目标线程执行完成。

     

    5. 线程的优点及成本

    优点:多线程可以充分利用多核CPU的计算能力以及相关的硬件资源,因为一个线程在占用CPU时,另一个线程可以同时进行IO等操作。同理,由于多个执行流,对保持程序的响应性也十分有帮助,如GUI程序中,一个线程处理慢任务时,另一个线程还能够向用户反馈进度信息。

    成本:创建和启动线程需要消耗操作系统的资源,并且,线程调度和切换也是有成本的。因此,如果执行的任务都是CPU密集型的,即主要消耗的都是CPU,那创建超过CPU数量的线程就是没有必要的,并不会加快程序的执行。

     

    6. 面试补充

    1) 进程 vs 线程?

     

    2) 并行 vs 并发

     

    3) 如何创建线程?

     

    4) 多线程的优点和缺点

     

    5) 并发编程的三大特性

     

    6) Thread#sleep() vs Object#wait()

     

     

     

     

    第二节 线程竞争

    1. 线程竞争情形

    多个线程可以对同一个变量进行读写操作,但由于非原子性读写CPU缓存等原因,可能形成竞态条件或出现内存可见性问题

     

    1) 竞态条件

    竞态条件(race condition)指当多个线程读写同一个对象时,最终执行结果与执行时序有关,可能正确,也可能不正确。

    为什么会这样呢?因为counter++不是原子操作,它先取counter的当前值,然后在当前值的基础上加1,再赋值回counter,而在它取到当前值到赋新值之间,当前值也可能被其它线程取走。

     

    2) 内存可见性问题

    内存可见性问题指一个线程对共享变量的修改,由于CPU缓存原因,另一个线程不一定马上就能看到,甚至永远也看不到。

    为什么会这样呢?由于CPU缓存等原因,可能修改没有及时同步到内存,也可能另一个线程根本就没从内存读。

     

    2. 线程竞争解决方案(synchronized)

    前面说到,当多个线程读写同一个变量时,可能形成竞态条件或出现内存可见性问题,这就是线程之间的竞争,在Java中,最简单的解决方式是使用synchronized关键字。

     

    1) 修饰实例方法

    上节我们介绍竞态条件时,了解到counter++是非原子操作,可能造成结果比预期偏小,现在我们可以对counter其进行包装,通过synchronized关键字修饰的实例方法来对其进行自增和读取操作,这样就始终和预期一致了。

    这里,synchronized到底做了什么呢?在进入synchronized实例方法前,会尝试对this对象进行加锁操作,当前线程不能获得锁的时候,它会进入等待队列(此时线程状态变为BLOCKED),这样来确保只有一个线程执行。

    注意:

    1. synchronized是对this对象加锁,而非对方法加锁

      • 当this对象不同时,是可以执行同一个synchronized实例方法的;

      • 当this对象相同时,即使是其它synchronized实例方法也是不可以执行的。

    2. synchronized机制不能防止非synchronized方法被同时执行,如Counter类添加一个非synchronized的decr方法,进行count--操作,是无法同步的,因此,一般在保护变量时,需要在所有访问该变量的方法上加上synchronized

     

    2) 修饰静态方法

    synchronized同样可以用于静态方法,比如:

    当修饰静态方法时,不能对this对象加锁了,它将对类对象(StaticCounter.class)进行加锁,加锁逻辑基本一致。

    注意:

    1. synchronized静态方法和synchronized实例方法对不同对象进行加锁,也就是说,不同的两个线程,可以一个执行synchronized静态方法,另一个执行synchronized实例方法。

    2. synchronized不可以修饰构造方法(因为不可能多个线程同时调用同一个对象的构造方法),如需在构造时访问共享资源,只能在构造方法内部修饰代码块。

     

    3) 修饰代码块

    除了用于修饰方法外,synchronized还可以用于修饰代码块,比如对于前面的Counter类,等价的代码可以为:

    此时,加锁的对象就是小括号里面的lock变量,它可以是任意对象。但是一般来说,在使用synchronized修饰代码块时,习惯使用this(实例方法中)或类对象(静态方法中)作为锁对象。

    注意:

    1. 尽量不要使用包装类字符串作为锁对象,因为它们有缓存池或常量池。

    2. 使用 字符串.intern() 作为锁对象时,由于字符串常量池的存在,锁对象粒度会过大。

     

    3. synchronized的特性

    1) 可重入性

    synchronized方法或代码块是可重入的,即某个线程获得锁之后,再次调用synchronized方法可直接调用,它是通过记录锁的持有线程和持有数量来实现的

     

    2) 内存可见性(volatile)

    synchronized还可以保证共享变量的内存可见性,在获得锁后,会始终从内存读取最新数据,在释放锁后,所有写入都会写回内存。

    但如果只需要保证内存可见性,而无需保证原子性,可以使用更轻量级的方式,那就是给共享变量加修饰符volatile

    加了volatile之后,Java会在操作对应变量时插入特殊的指令,保证读写到内存最新值,而非缓存的值。

     

    3) 可能死锁

    synchronized需进行加锁,当修饰代码块时,如果以不同顺序对多个对象进行加锁时,可能造成死锁。

    死锁可通过jstack -pid命令查看,我们在写程序时,应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。在Java中,也有一些显式锁支持tryLock以及加锁超时的设定,也可以一定程度解决死锁问题。

     

    4) 无法响应中断

    获取synchronized锁时,如果未获取到,将进入BLOCK状态,该状态无法响应中断请求,而显式锁Lock则支持以响应中断的方式获取锁。

     

    5) 只有一个条件队列

    synchronized的协作机制wait/notify,只支持一个条件队列,如果有多个等待条件,只能共用,并且在通知时必须通知所有等待的线程。

     

    4. synchronized容器

    1) 基本使用

    Collections类有一些方法可以修饰普通容器,返回线程安全同步容器,它们是给所有容器方法都加上synchronized来实现安全的。

    这里线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,不需要额外的同步操作,也不会出现错误的结果。

     

    2) 复合操作与伪同步

    加了synchronized,所有方法调用变成了原子操作,客户端在调用时,是不是就绝对安全了呢?不是的,至少有如下情况需要注意:

    上述案例中的原子操作显然是不能实现的,虽然map.get(key)和map.put(key, value)是原子的,但是它们组合后就不是原子的了

    那给putIfAbsent方法加上synchronized呢?也同样也不能实现原子操作,因为putIfAbsent方法的synchronized是对当前的EnhancedMap对象加锁,而map.get(key)和map.put(key, value)方法是对map加锁,这是一种伪同步

    但我们可以在putIfAbsent方法中使用synchronized代码块,并且使用map进行加锁。

     

    3) 并发修改异常

    此外,还需注意,虽然同步容器的单个操作是安全的,但是在增删元素时进行迭代操作依然可能会抛出 ConcurrentModificationException

    同步容器并没有解决这个问题,如果要避免这个异常,需要在遍历的时候给整个容器对象加锁,如startIteratorThread可以改为:

     

    4) 性能问题

    除了以上这些注意事项,同步容器的性能也是比较低的,当并发访问量比较大的时候性能很差。在高并发场景,可以使用CopyOnWriteArrayList、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet等专为并发设计的容器类,它们都是线程安全的,但都没有使用synchronized没有迭代问题、直接支持一些复合操作、性能也高得多。

     

    5. 面试补充

    1) volatile

     

    2) synchronized

     

     

    第三节 线程协作

    多线程之间除了竞争,还经常需要相互协作,本节就来介绍下多线程协作的基本机制wait/notify

     

    1. 线程协作的场景

    多线程之间需要协作的场景有很多,比如说:

    我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法wait/notify

     

    2. wait/notify机制

    在Java中,任意对象都可以参与线程协作,它们都有waitnotify方法,可以在synchronized代码块中调用:

    前面提到,当线程等待synchronized锁时,会将线程加入到锁对象的等待队列中。而线程在synchronized代码块内主动调用wait方法时,会将线程加入到另一个条件队列,并进行阻塞。

    调用wait方法后,将释放持有的锁,线程状态变为WAITING/TIMED_WAITING,当被唤醒后,需重新竞争锁。

    被阻塞的线程需要其它线程调用该对象的notify方法来将它唤醒。调用notify后,不会立即释放持有的锁,而是等待synchronzied代码块执行完后才释放。这意味着wait方法必须等到调用notify的那段同步代码执行结束后才可能获得锁响应通知。

    注意:

    1. wait/notify方法只能在synchronized代码块内被调用,否则将抛出IllegalMonitorStateException(无对象监视器:monitor)。

    2. join方法也可进行等待,如前面介绍的主线程等待子线程结束案例,当子线程运行结束时,由java系统调用notifyAll来通知。

    在使用wait/notify时,一定要明确wait等待的是什么?在满足什么条件后调用notify?并且需注意,从wait返回后,不一定表示等待的条件就满足了,仍需进行条件检查,因此,wait方法的一般调用模式为:

    接下来,我们通过一些场景来进一步理解wait/notify的应用。

     

    3. 生产者/消费者模式

    在生产者/消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就wait,而消费者从队列上取数据,如果队列为空也wait。我们将队列作为单独的类进行设计,代码如下:

    在上面代码中,生产者和消费者都调用了wait方法,但它们等待的条件是不一样的。生产者在队列为满时等待,而消费者在队列为空的时候等待,它们等待条件不同但又使用相同的条件队列,所以要调用notifyAll而不能调用notify,因为notify可能唤醒的恰好是同类线程。

    类似的,它们也都调用了notifyAll方法,但是需满足的条件也不一致,生产者在“有数据了”的条件下通知消费者,消费者在“有空位了”的条件下通知生产者。

    注意:synchronized的局限性

    1. synchronized获取锁的wait/notify协作机制,只能有一个条件等待队列,这是它的局限性,使的分析变得复杂,后面将会介绍支持多个条件的显式锁。

    2. 使用synchronized关键字获取锁的过程中不响应中断请求,这也是synchronized的局限性。

     

    4. 同时开始

    在同时开始模式中,协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象:

    子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。代码如下所示:

     

    5. 等待结束

    在等待结束模式中,主线程与各个子线程协作的共享变量是一个数,这个数表示未结束线程个数。

    应用代码示例如下:

    注意:

    1. 可将线程计数初始值设置为1,由子线程调用await(),主线程调用countDown(),还可实现上面的“同时开始”模式。

     

    7. 集合点

    在集合点模式,协作的共享变量依然是一个数,这个数表示未到集合点的线程个数。

    多个游客线程,各自先独立运行,然后使用该协作对象到达集合点进行同步的示例代码如下:

     

     

    6. 异步结果

    异步结果模式依赖异步调用框架,主要由调用者执行器异步任务异步结果四个部分组成。

    其中异步任务和异步结果代码表示如下:

    执行器用于执行子任务并返回异步结果,使用执行器后调用者就无需创建并管理子线程了,其代码如下:

    调用者只需创建执行器,然后执行异步任务,即可得到异步结果对象。

     

     

    第四节 线程中断

    在Java中,停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,而是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出,本节我们主要就是来理解Java的中断机制。

     

    1. 线程中断的场景

    通过线程的start方法启动一个线程后,线程开始执行run方法,run方法运行结束后线程退出,但有些场景需要对其进行中断:

     

    2. 线程中断相关方法

    每个线程都有一个中断标志位,表示该线程是否被中断,可通过如下方式获取和设置:

    注意:

    1. 线程中还定义了stop()等一些方法,但它们已经过时了,不应该再继续使用。

     

    3. 线程对中断的反应

    线程被中断时,如何进行响应与线程的状态进行的IO操作有关。

    线程状态或IO操作响应
    NEW/TERMINATE状态非存活态,调用 interrupt() 对它没有任何效果,中断标志位也不会被设置
    RUNNABLE状态(未执行IO操作)只会设置线程的中断标志位,没有其它任何作用。
    WAITING/TIMED_WAITING状态抛出InterruptedException,同时清空线程的中断标志位。
    BLOCKED状态
    (如进入synchronized代码块时未获取到锁)
    设置线程的中断标志位,但线程依然会处于BLOCKED状态。
    也就是说,interrupt()并不能使一个在等待锁的线程真正"中断"。
    等待IO操作,且IO通道是可中断的
    (即实现了InterruptibleChannel接口)
    抛出ClosedByInterruptException,同时设置线程的中断标志位。
    线程阻塞于Selector调用设置线程的中断标志位,同时,阻塞的调用会立即返回。

    此外,重点介绍下InputStream的read调用,在流中没有数据时,read会阻塞 ,但线程依然是RUNNABLE状态。如果此时被中断,只会设置线程中断标志,并不能真正“中断”它。

    子线程启动后调用read方法,此时标准输入流无数据,会进行阻塞,虽然主线程设置了中断标记,但子线程并不能响应中断。如果输入一个字符,read方法执行通过,随后进入while循环的中断标志判断逻辑,此时发现被中断才会退出。

     

    4. 如何处理线程中断

    线程中断时,可能抛出InterruptedException异常,也可能只设置中断标识位。

    注意:

    1. BLOCKED状态的线程虽然会被设置中断标识位,但依然保持阻塞状态,无法对其进行处理。

     

    5. 如何取消/关闭线程

    线程中断只是一种协作机制,而不一定会真正"中断"线程,如果不明白线程在做什么,不应该贸然的调用线程的interrupt方法。

    那如何取消或关闭线程呢?一般而言,对于以线程提供服务的程序模块,它应该封装取消或关闭的操作,让调用者能正确关闭线程

    如上节的InterruptReadDemo,可以在子线程中提供如下cancel方法,调用后关闭流(注意,流在关闭时返回-1),并进行中断退出。

    同样的,Java并发库的一些代码也提供了单独的取消/关闭方法:

     

    第02章_并发协作工具

    第一节 原子变量(CAS)

    1. 基础用法

    原子变量可以保证更新操作的原子性,且无需加锁以及上下文切换,效率更高。常用基本类型的原子变量及相关变体如下:

    基本类型原子变量原子数组原子字段更新器
    BooleanAtomicBoolean  
    IntegerAtomicIntegerAtomicIntegerArrayAtomicIntegerFieldUpdater
    LongAtomicLongAtomicLongArrayAtomicLongFieldUpdater
    ReferenceAtomicReferenceAtomicReferenceArrayAtomicReferenceFieldUpdater

     

    1) AtomicInteger

    AtomicInteger可以用作计数器等场景,常用方法如下:

    AtomicBoolean可以用来在程序中表示一个标志位,它的原子操作方法有:

    AtomicLong可以用来在程序中生成唯一序列号,它的方法与AtomicInteger类似。

     

    2) AtomicReference

    AtomicReference用来以原子方式更新引用类型,它有一个类型参数,使用时需要指定引用的类型。以下代码演示了其基本用法:

     

    3) AtomicArray

    原子数组方便以原子的方式更新数组中的每个元素,我们以AtomicIntegerArray为例来简要介绍下。

     

    4) FieldUpdater

    FieldUpdater方便以原子方式更新对象中的字段,字段不需要声明为原子变量,FieldUpdater是基于反射机制实现的,看代码:

     

    2. 实现原理

    原子变量的组合操作都依赖一个特殊的方法,称之为CAS,比较并设置。如AtomicInteger中的CAS方法为:

    它在内部调用了Unsafe类的CAS方法,而该方法依赖底层计算机系统在硬件层次上直接支持的CAS指令

    有了CAS方法后,就可以通过死循环+CAS实现原子更新了,如AtomicInteger的incrementAndGet方法:

     

    3. ABA问题

    使用CAS方式更新有一个ABA问题,指一个线程刚开始看到的值是A,随后使用CAS进行更新,它实际期望的是没有其他线程修改过才更新,但普通的CAS做不到,因为可能在这个过程中,已经有其他线程修改过了,比如先改为了B,然后又改回为了A。

    一般来说,这对业务没啥影响,如果确实需要解决该问题,可以使用AtomicReference的增强类AtomicStampedReference,它在修改值的同时附加一个时间戳(或版本号),只有值和时间戳都相同才进行修改。

    此外,还可以使用AtomicMarkableReference,它多关联了一个boolean类型的标志位,只有值和标志位都相同的情况下才进行修改

     

    4. 面试扩展

    1) 悲观锁 vs 乐观锁

     

    2) CAS vs synchronized

     

     

    第二节 显式锁

    Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:

    本节主要介绍Lock接口和实现类ReentrantLock,关于读写锁,我们后续章节介绍。

     

    1. Lock接口

    显式锁接口Lock的定义为:

    可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁可以响应中断可以限时,这使得它灵活的多。

     

    2. ReentrantLock

    1) 加锁/解锁

    Lock接口的主要实现类是ReentrantLock,它的基本方法lock/unlock实现了与synchronized一样的语义,包括:

     

    2) 避免死锁(tryLock)

    如果以不同顺序获取多个锁,可能造成死锁,可以使用tryLock()以尝试的方式获取锁,如果获取不到,还可以先释放已持有的锁,给其他线程获取锁的机会,然后再重试获取所有锁。

    注意:

    1. 做转账业务时,需在同事务内增加A账户余额并减少B账户余额,要保证AB账户按一致的顺序更新,否则相互转账会死锁。

     

    3) 获取锁信息

    除了实现Lock接口中的方法,ReentrantLock还有一些其他方法,可以获取关于锁的一些信息,这些信息可以用于监控和调试目的,比如:

     

    3. ReentrantLock的实现基础

    1) CAS + LockSupport

    ReentrantLock在最底层依赖于前面介绍的CAS方法,另外,还依赖了LockSupport中的一些方法,它的基本方法有:

    这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作

     

    2) AQS (AbstractQueuedSynchronizer)

    AbstractQueuedSynchronizer是Java提供的一个抽象类,它封装了CAS和LockSupport,简化了并发工具的实现。原理简述如下:

     

    4. ReentrantLock的实现代码

    ReentrantLock内部定义了一个继承自AQS的抽象类Sync,它有两个实现类NonfairSyncFairSync

     

    1) 实现lock方法

    我们先来看下ReentrantLock的lock方法:

    其中tryAcquire必须被子类重写,NonfairSync的实现为:

    如果tryAcquire返回false,即被其它线程锁定,则AQS会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),其中addWaiter会新建一个节点Node,代表当前线程,然后加入到内部的等待队列中。

    放入等待队列中后,再调用acquireQueued尝试获得锁,代码为:

    以上就是lock方法的基本过程,能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待,这个过程中如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。

     

    2) 实现unlock方法

    ReentrantLock的unlock方法的代码为:

     

    5. 面试扩展

    1) CAS和AQS的区别?

     

    2) 公平锁 vs 非公平锁?

     

    3) Lock vs synchronized

     

     

    第三节 显式条件

    显式条件基于显式锁的 await/signal 线程协作机制,它们之间的关系类似于 synchronized 与 wait/notify 协作机制的关系。

     

    1. Condition接口

    创建显式条件需要通过显式锁,Lock接口定义了创建方法:

    其中Condition为显式条件的接口,它的定义为:

    与wait/notify协作机制类似,await()/signal()也具有如下一些特性:

     

    2. 生产者/消费者模式案例

    生产者/消费者模式存在一个与队列满有关的条件,还存在一个与队列空有关的条件,而在前面通过wait/notify机制实现时,不得不共用同一个条件队列,而使用显式锁,则可以分别创建对应的条件队列。

    这样,代码更为清晰易读,同时避免了不必要的唤醒和检查,提高了效率。

     

    3. 显式条件的实现代码

    1) ConditionObject

    ConditionObject是AQS中定义的一个成员内部类,它可以直接访问AQS中的数据,比如AQS中定义的锁等待队列。它通过显式锁创建:

    它内部也有一个条件等待队列,其成员声明为:

     

    2) await实现分析

     

    3) awaitNanos实现分析

    awaitNanos与await的实现是基本类似的,区别主要是会限定等待的时间,如下所示:

     

    4) signal实现分析

     

     

    第四节 线程本地变量(ThreadLocal)

    1. 基本概念和用法

    线程本地变量指与线程绑定的变量,即每个线程都有同一个变量的独有拷贝,在Java中,用ThreadLocal表示。

     

    2. 使用场景

    1) 线程本地变量

    注意:

    1. 一般来说,ThreadLocal对象都定义为static,以便于引用。

     

    2) 存储上下文信息

    在Web服务器中,一个线程执行用户的请求时,多个方法都会用到请求信息、用户身份信息、数据库连接、当前事务等全局信息,如果作为参数传递则很不方便,这时就可以存放在线程本地变量中。

    在首次获取到信息时,调用set方法进行设置,然后就可以在代码的任意其他地方调用get相关方法进行获取了。

    注意:

    1. Thread内部存在一个 inheritableThreadLocals 变量,可用于跨线程传递 ThreadLocal 的值,但不支持线程池场景。

    2. 阿里开源工具类可以解决上述问题:TransmittableThreadLocal,它自定义了线程类并对线程池进行了装饰。

     

    3. 基本实现原理

    在 Thread 类内部,有一个名为 threadLocalsThreadLocalMap 对象,其 Key 为 ThreadLocal 对象的弱引用,Value 为该变量在该线程的值, 当调用 ThreadLocal 的 get/set 方法时,就是获取当前线程的该变量进行 get/set。

     

    1) set方法实现

    每个线程都有一个ThreadLocalMap,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的ThreadLocal对象,值为value。

     

    2) get方法实现

     

    3) remove方法实现

    注意:

    1. 在使用完 ThreadLocal 变量后,必须调用 remove() 方法清空 ThreadLocalMap 中的相关条目,否则可能导致内存泄漏。

     

    4. ThreadLocal与线程池

    1) 未清理数据

    线程池中的线程是会重用的,如果异步任务使用了ThreadLocal,并且未进行清理操作,会将修改后的值带到了下一个异步任务

    上述代码输出为0 0 1,第1个任务将修改后的结果带到了第3个任务,造成了数据混乱。

     

    2) 解决方法

    第一种方法,在Task的run方法开始处,添加set或remove代码,如下所示:

    第二种方法,将Task的run方法包裹在try/finally中,并在finally语句中调用remove,如下所示:

    第三种方法,扩展线程池ThreadPoolExecutor,重写beforeExecute方法(在线程池将任务交给线程执行之前,会在线程中先调用该方法),通过反射清空所有ThreadLocal变量,这种方式无需修改异步任务的代码,推荐使用。

     

     

    第五节 并发容器

    1. CopyOnWrite的List和Set

    1) 写时复制

    写时复制是指每次修改都创建一个新数组,然后复制所有内容,但如果数组比较大,修改操作又比较频繁,那么性能是非常低的。

    写时复制是解决线程安全问题的一种重要思维,用于各种计算机程序中,比如操作系统内部的进程管理和内存管理等。

    注意:

    1. 解决线程安全问题还可以通过加锁(synchronized或ReentrantLock)循环CAS的方式实现。

    2. 写时复制的特点是写写互斥、读写兼容(和读写锁最大的差别)、读读兼容,只能保证最终一致性,不能保证实时一致性。

     

    2) CopyOnWriteArrayList

    CopyOnWriteArrayList实现了List接口,用法与ArrayList基本一致,主要区别有:

    CopyOnWriteArrayList是基于写时复制机制实现的,内部有一个普通数组,在读的时候,直接访问该数组,在写的时候,则会复制一个新数组,在新数组进行修改操作,修改完后再以原子方式设置内部的数组引用。

    如果读的过程中,发生了写操作,可能内部的数组引用已经被修改了,但不会影响读操作,它依旧访问原数组内容。

    换句话说,数组内容是只读的,写操作都是通过新建数组,然后原子性的修改数组引用来实现的

    在CopyOnWriteArrayList中,读不需要锁,可以并行,读和写也可以并行,但多个线程不能同时写,每个写操作都需要先获取锁。

    下面是一些常用方法的代码实现:

    注意:

    1. 在JDK8以前,CopyOnWriteArrayList返回的迭代器不支持修改相关的操作,如listIterator.set(1)、Collections.sort(list)等。

     

    3) CopyOnWriteArraySet

    CopyOnWriteArraySet实现了Set接口,不包含重复元素,用法非常简单,就不再赘述。它是基于CopyOnWriteArrayList实现的,因此性能比较低,不适用于元素个数特别多的集合。

    下面是一些常用方法的代码实现:

    注意:

    1. Java并发包中没有与HashSet对应的并发容器,但可以使用Collections.newSetFromMap方法基于ConcurrentHashMap构建一个

     

     

    2. ConcurrentHashMap

    1) ConcurrentHashMap

    ConcurrentHashMap是HashMap的并发版本,主要区别如下:

    扩展:

    1. HashMap在并发更新的情况下,链表结构可能形成环(在多个线程同时扩容哈希表的时候),出现死循环,占满CPU。

     

    2) 高并发性

    ConcurrentHashMap是如何实现高并发的呢?一般的同步容器使用synchronized,所有方法都竞争同一个锁。而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。

    采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,可通过构造方法进行设置:

    在对每个段的数据进行读写时,ConcurrentHashMap也不是简单的使用锁进行同步,内部使用了CAS,对一些写采用原子方式,实现比较复杂,我们就不介绍了,实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远远大于同步容器。

    注意:

    1. ConcurrentHashMap在JDK1.7采用分段锁Segment 来保证安全,在JDK1.8及之后使用Node + CAS + synchronized 保证线程安全,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点。

     

    3) 弱一致性

    ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性

    类似的情况还会出现在ConcurrentHashMap的另一个方法:

    该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。

     

     

    3. 基于SkipList的Map和Set

    1) SkipList

    SkipList称为跳表跳跃表,是一种基于有序链表多级索引数据结构,使其更易于实现高效并发算法。

    下面是一个包含3, 6, 7, 9, 12, 17, 19, 21, 25, 26元素的跳表结构,两条线展示了查找值19和8的过程:

    img

     

    2) ConcurrentSkipListMap

    ConcurrentSkipListMap实现了ConcurrentMapSortedMap等接口,默认按键自然有序,可以传递比较器自定义排序,用法与TreeMap类似,主要区别如下:

    注意:

    1. ConcurrentSkipListMap的size方法不是常量操作,需要遍历所有元素,且遍历结束后size可能已改变,因此一般用处不大。

     

    3) ConcurrentSkipListSet

    TreeSet是基于TreeMap实现的,与此类似,ConcurrentSkipListSet也是基于ConcurrentSkipListMap实现的。

     

     

    5. 并发队列

    Java并发包中提供了丰富的队列类,简单列举如下:

    队列类型队列类名
    无锁非阻塞并发队列ConcurrentLinkedQueue、ConcurrentLinkedDeque
    普通阻塞队列ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque
    优先级阻塞队列PriorityBlockingQueue
    延时阻塞队列DelayQueue
    其他阻塞队列SynchronousQueue、LinkedTransferQueue

    这些队列迭代都不会抛出ConcurrentModificationException,都是弱一致的,后面就不单独强调了。

    注意:

    1. 无锁非阻塞是指这些队列不使用锁,所有操作总是可以立即执行,主要通过循环CAS实现并发安全。

    2. 阻塞队列是指使用锁和条件,很多操作都需要先获取锁或满足特定条件,获取不到锁或等待条件时,会等待(即阻塞)。

     

    1) 无锁非阻塞并发队列
    队列类名实现接口数据结构基本特性
    ConcurrentLinkedQueueQueue(先进先出队列,尾进头出)单向链表无界(没有限制大小)、szie方法需遍历
    ConcurrentLinkedDequeDeque(双端队列)双向链表无界(没有限制大小)、szie方法需遍历

    这两个类最基础的原理是循环CAS,ConcurrentLinkedQueue的算法基于一篇论文Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms,ConcurrentLinkedDeque扩展了ConcurrentLinkedQueue的技术,具体实现都非常复杂,我们就不探讨了。

     

    2) 普通阻塞队列

    除了刚介绍的两个队列,其他队列都是阻塞队列,都实现了接口BlockingQueue,在入队/出队时可能等待:

    队列类名实现接口数据结构基本特性
    ArrayBlockingQueueQueue(先进先出队列,尾进头出)循环数组有界(创建时指定大小)
    LinkedBlockingQueueQueue(先进先出队列,尾进头出)单向链表默认为无界,可以在创建时指定最大长度
    LinkedBlockingDequeDeque(双端队列)双向链表默认为无界,可以在创建时指定最大长度

    主要方法有,可用于生产者/消费者模式等:

    注意:

    1. ArrayBlockingQueue的大小在运行过程中不会改变,而同样基于循环数组实现的ArrayDeque是无界的,会自动扩展。

    上述普通阻塞式队列都是使用显式锁ReentrantLock显式条件Condition实现的。

     

    3) 优先级阻塞队列
    队列类名实现接口数据结构基本特性
    PriorityBlockingQueueBlockingQueue(队列为空时,take阻塞)堆(要求元素可比较)无界(没有限制大小),优先级高的先出队

    PriorityBlockingQueue使用了一个锁ReentrantLock保护所有访问,使用了一个条件协调阻塞等待。

     

    4) 延时阻塞队列
    队列类名实现接口数据结构基本特性
    DelayQueue<E extends Delayed>BlockingQueue堆(要求元素可比较)无界(没有限制大小),按元素的延时时间出队

    DelayQueue是一种特殊的优先级队列要求每个元素都实现Delayed接口,该接口的声明为:

    只有元素的延迟时间到期后,才能被取走,也就是说,take方法总是返回第一个过期的元素,如果没有,则阻塞等待。

    DelayQueue内部是基于PriorityQueue实现的,它使用一个锁ReentrantLock保护所有访问,使用一个条件available表示头部是否有元素,当头部元素的延时未到时,take操作会根据延时计算需睡眠的时间,然后睡眠,如果在此过程中有新的元素入队,且成为头部元素,则阻塞睡眠的线程会被提前唤醒然后重新检查。以上是基本思路,DelayQueue的实现有一些优化,以减少不必要的唤醒,具体我们就不探讨了。

     

    5) 实时队列

    SynchronousQueue不是传统意义上的存储队列,它不存储元素,它的入队操作要等待另一个线程的出队操作,反之亦然。

    如果没有其他线程在等待从队列中接收元素,put操作就会等待,take操作需要等待其他线程往队列中放元素,如果没有,也会等待。

    适用于两个线程之间直接传递信息、事件或任务

     

    6) 传递队列

    LinkedTransferQueue实现了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增加了一些额外功能,生产者在往队列中放元素时,可以等待消费者接收后再返回适用于一些消息传递类型的应用中。TransferQueue的接口定义为:

    LinkedTransferQueue是基于链表实现的,无界的TransferQueue,具体实现比较复杂,我们就不探讨了。

     

     

    6. 扩展知识

    1) JUC中的容器都是无锁的吗?

     

     

    第六节 其它协作工具

    在一些特定的同步协作场景中,相比使用最基本的wait/notify或显示锁/条件,使用并发同步协作工具更为方便,效率更高。

     

    1. ReentrantReadWriteLock

    1) 基本概念

    前面介绍的synchronized锁或ReentrantLock锁,不区分读操作或写操作,在访问前都需要获取锁。但在一些场景中,多个线程的读操作完全可以并行,在读多写少时,可以明显提高性能,这就是读写锁。

    在Java并发包中,接口ReadWriteLock表示读写锁,主要实现类是可重入读写锁ReentrantReadWriteLock

     

    2) 基本示例

     

    3) 基本原理

    注意:

    1. 同一个线程,在获取写锁后可以再获取读锁,但在获取读锁后,不能升级为写锁

     

    4) StampedLock

    StampedLock (基于 CLH 锁实现)是对 ReentrantReadWriteLock 的改进,额外支持乐观读,且性能更好,写线程更容易获得锁,但它不支持重入条件变量,代码也相对复杂。

     

     

    2. 信号量(Semaphore)

    1) 基本概念

    前面介绍的锁一般只允许一个线程同时访问资源,但是部分场景需要允许指定数目的线程访问,类似于限制最大并发数

    信号量类Semaphore就是用来解决这类问题的,它可以限制对资源的并发访问数:

     

    2) 基本示例

    注意:

    1. Semaphore是不可重入的,即使在同一个线程,每一次的acquire调用都会消耗一个许可。

    2. 一般锁只能由持有锁的线程释放,而Semaphore表示的只是一个许可数,任意线程都可以调用其release方法

    3. 因此,即使将permits设置为1,它和一般的锁还是有本质的不同。

     

    3) 基本原理

    Semaphore的基本原理比较简单,也是基于AQS实现的,permits表示共享的锁个数,acquire方法就是检查锁个数是否大于0,大于则减一,获取成功,否则就等待,release就是将锁个数加一,唤醒第一个等待的线程。

     

     

    3. 倒计时门栓(CountDownLatch)

    1) 基本概念

    倒计时门栓CountDownLatch类似于一次性开关,一开始是关闭的,所有希望通过该门的线程都需要等待。它有一个倒计时,当变为0后,门栓打开,等待的所有线程都可以通过,并且开关是一次性的,打开后就不能再关上了。

     

    2) 基本示例

    前面介绍过门栓的两种应用场景,一种是同时开始,另一种是等待结束,它们都有两类线程,互相需要同步,重写代码如下:

    注意:

    1. countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。

     

     

    4. 循环栅栏(CyclicBarrier)

    1)基本概念

    循环栅栏CyclicBarrier类似于一个集合点,只有全部线程都到达后才能进行下一步,它是循环的,可以用作重复的同步,特别适用于并行迭代计算,每个线程负责一部分计算,然后集合点等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

    注意:

    1. 只要有一个线程抛出BrokenBarrierException,就会导致所有在调用await的线程都抛出BrokenBarrierException。

    2. 此外,如果栅栏动作(集合点动作)抛出了异常,也会破坏栅栏。

     

    2) 基本示例

     

    3) 对比CountDownLatch

    CyclicBarrier与CountDownLatch可能容易混淆,它们主要有两点区别:

     

    5. 面试扩展

    1) 常用并发协作工具都有哪些应用场景?

     

     

    第03章_异步执行服务

    第一节 异步任务执行服务

    异步任务执行服务是一套框架,将要执行的并发任务线程的管理相分离。对于使用者,只需关注任务本身,如提交任务、获取结果、取消任务等,而由服务提供者执行任务,如创建线程、任务调度、关闭线程等,大大简化了并发业务的开发。

     

    1. 基本接口

    1) Runnable和Callable

    RunnableCallable表示要执行的异步任务,前者没有返回结果,且不可以抛异常,而 Callable 有返回结果,也允许抛异常

     

    2) Executor和ExecutorService

    Executor表示最简单的执行服务(执行器),可以执行一个Runnable,没有返回结果。

    注意:

    1. Executor接口并没有规定如何执行任务,它可以创建新的线程,复用线程池中的线程或在调用者线程执行。

     

    ExecutorService扩展了Executor,支持提交Runnable及Callable等多种类型的异步任务,并用Future封装返回结果。

     

    3) Future

    Future表示异步任务的结果,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。

    关于Futrue的get方法,在调用后有四种可能情形:

    注意:

    1. Future 实现类的对象内部封装了“执行线程”对象和相关的“锁”对象,因此调用者线程才能与执行线程相互协作,并做到"任务的提交"与"任务的执行"相分离。

     

     

    2. ExecutorService的使用用法

    1) 基本用法

     

    2) invokeAll示例

     

     

    3. ExecutorService的实现原理

    ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,关于线程池我们下节再介绍。ExecutorService有一个抽象实现类AbstractExecutorService,本节,我们简要分析其原理,并基于它实现一个简单的ExecutorService。

     

    1) AbstractExecutorService

    ExecutorService最基本的方法是submit,它的实现代码如下:

     

    2) FutureTask

    调用 ExecutorService 的 submit 方法,就会返回一个FutureTask,它实现了 RunnableFuture 接口,既可看作可执行任务:Runnable,也可看做异步结果:Future

    内部变量如下:

    在构造时,初始化待执行任务callable和任务状态state:

    任务执行服务会使用一个线程执行FutureTask的run方法,run()代码为:

    对于任务提交者,它通过FutureTask的get方法获取结果,限时等待的get方法代码如下:

    其中report根据状态返回结果或抛出异常:

    FutureTask的cancel方法可以取消任务,代码如下:

     

    3) invokeAll和invokeAny

    AbstractExecutorService的invokeAll,实现逻辑很简单,对每个任务,创建一个FutureTask,并调用execute执行,然后等待所有任务结束。

    invokeAny的实现稍微复杂些,它利用了ExecutorCompletionService,关于这个类及invokeAny的实现,我们后续章节再介绍。

     

    4. 面试扩展

    1) Runnable vs Future vs FutureTask

     

    2) execute() vs submit()

     

     

    第二节 线程池服务

    线程池实现资源共享的一种方式,主要由任务队列工作线程两个概念组成,它可以重用线程,减少线程创建的开销。

     

    1. 线程池的基本使用

    1) 创建线程池

    Java中的线程池实现类为ThreadPoolExecutor<-AbstractExecutorService<-ExecutorService<-Executor,构造方法如下:

     

    2) 获取线程池信息

     

    3) 配置线程池大小

    除了在创建时配置线程池大小外,还可以通过getter/setter方法获取和设置线程池大小。

     

    4) 选择任务队列

    ThreadPoolExecutor要求的队列类型必须是阻塞队列BlockingQueue,可以是:

    注意:

    1. 如果使用无界队列,则线程个数最大只能达到corePoolSize,设置maximumPoolSize参数将无意义。

     

    5) 配置任务拒绝策略

    线程池一般使用有界队列maximumPoolSize是有限的,当两者都达到上限时,就会拒绝任务的提交(execute/submit/invokeAll),可以通过构造方法或setter方法设置拒绝策略

    ThreadPoolExecutor内部实现了四种拒绝策略可供选择:

    注意:

    1. 拒绝策略只有在队列有界,且最大线程数有限的情况下才会触发,让拒绝策略有机会执行对保证系统稳定非常重要。

    2. 在任务量非常大的场景中,如果队列无界,可能会导致请求处理队列积压过多任务,消费非常大的内存;如果队列有界但不限制最大线程数,可能会创建过多的线程,占满CPU和内存。

    3. 如需保证任务不被丢弃,则必须使用CallerRunsPolicy或进行任务持久化存储(自定义拒绝策略/实现混合阻塞队列)。

     

    6) 自定义线程工厂

    线程工厂可以对创建的线程进行一些配置,如设置线程名称等:

    默认实现类为Executors类中的静态内部类DefaultThreadFactory,它主要就是创建一个线程,给线程设置一个名称( pool-<线程池编号>-thread-<线程编号>),设置daemon属性为false,设置线程优先级为标准默认优先级等。

     

    7) 线程池的其他配置

     

    2. 线程池工厂类

    线程池工厂类Executors提供了一些静态工厂方法,可以方便的创建一些预配置的线程池,主要方法有:

    注意:

    1. 在系统负载可能极高的情况下,FixedThreadPool 的问题是队列过长,而 CachedThreadPool 的问题是线程过多。

    2. 一般情况下,建议自定义 ThreadPoolExecutor,使用有界队列,控制线程创建数量

     

    3. 线程池的死锁

    如果提交的任务之间存在依赖,在线程池打满时,可能会出现死锁。

    上述问题可以将线程池类型替换为newCachedThreadPool来解决,让创建的线程不再受限。

    另外,创建线程池时使用SynchronousQueue队列也可解决上述问题:

    对于普通队列,入队只是把任务放到了队列中,而对于SynchronousQueue来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到maximumPoolSize,如果达到了maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

     

    4. 面试扩展

    1) 线程池有哪些核心参数?

     

    2) 线程池处理任务的流程?

    图解线程池实现原理

     

    3) 线程空闲时在干嘛?

     

    4) 线程池线程抛出异常怎么处理?

     

    5) 线程池线程数设置多少合适?

     

    6) 线程池最佳实践?

     

     

    第三节 CompletionService

    CompletionService是 Executor 的装饰器,适用于提交多个异步任务,然后按完成顺序逐个处理任务结果的场景,不同于invokeAll,它无需等待所有异步任务全部完成才开始处理

     

    1. 基本用法

    1) 接口

     

    2) 实现类

    主要实现类为ExecutorCompletionService,它对Executor进行了装饰,并额外添加了一个阻塞队列来负责结果的排队和处理

     

    3) 基本示例

    前面通过ExecutorService的invokeAll方法实现并发下载并分析URL标题的示例中,必须等到所有任务都完成才开始处理结果,而使用CompletionServiceDemo可以在第一个任务完成后就开始处理结果

     

     

    2. 实现原理

    ExecutorCompletionService是如何实现结果实时按序处理的呢?它主要依赖内部的一个阻塞队列以及重写了FutureTask的done()方法

    ExecutorCompletionService的take/poll方法就可以从该队列中获取已结束任务的结果,如下所示:

     

     

    3. 实现invokeAny

    通过ExecutorCompletionService,可以实现invokeAny方法,基本思路是:在提交任务后,通过take方法获取结果,获取到第一个有效结果后,取消所有其他任务。

     

     

    第四节 定时任务

    1. 应用场景

    定时任务的应用场景是非常多的,比如:

    在Java中,可以使用java.util包中的TimerTimerTask实现,也可以使用并发包中的ScheduledExecutorService实现。

    注意:

    1. 上述两者都不能胜任复杂的定时任务调度,如每周一和周三晚上18:00到22:00,每半小时执行一次。 对于类似这种需求,可以使用更为强大的第三方类库,比如Quartz(http://www.quartz-scheduler.org/)。

     

    2. Timer和TimerTask

    1) 基本用法

    TimerTask表示一个定时任务,它是一个抽象类,实现了Runnable接口,具体的定时任务需要继承该类,实现run方法。

    Timer表示一个定时器,负责定时任务的调度和执行,它有如下主要方法:

     

    2) 基本原理

    Timer内部主要由任务队列Timer线程两部分组成。

    Timer线程主体是一个循环,从队列中拿任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。如果睡眠过程中队列上添加了新任务且新任务是第一个任务,Timer线程会被唤醒,重新进行检查。

    在执行任务之前,Timer线程判断任务是否为周期任务,如果是,就设置下次执行的时间并添加到优先级队列中,对于固定延时的任务,下次执行时间为当前时间加上period,对于固定频率的任务,下次执行时间为上次计划执行时间加上period。

    需要强调是,下次任务的计划是在执行当前任务之前就做出了的,对于固定延时的任务,延时相对的是任务执行前的当前时间,而不是任务执行后,这与后面讲到的ScheduledExecutorService的固定延时计算方法是不同的,后者的计算方法更合乎一般的期望。

    另一方面,对于固定频率的任务,它总是基于最先的计划计划的,所以,很有可能会出现前面例子中一下子执行很多次任务的情况。

     

    3) 基本示例

     

    4) 注意事项

    一个Timer对象只有一个Timer线程,这意味着,定时任务不能耗时太长,更不能是无限循环,看个例子:

    Timer线程在执行任何一个任务的run方法时,一旦run抛出异常,Timer线程就会退出,从而所有定时任务都会被取消

    所以,如果希望各个定时任务不互相干扰,一定要在run方法内捕获所有异常

     

     

    2. ScheduledExecutorService

    1) 接口

    由于Timer/TimerTask的一些问题,Java并发包引入了ScheduledExecutorService,它是一个接口:

    返回类型都是ScheduledFuture,它也是一个接口,实现了Future和Delayed,没有定义额外方法。

    注意:

    1. 与Timer不同,ScheduledExecutorService不支持以绝对时间作为首次运行的时间。

     

    2) 实现类

    主要实现类为ScheduledThreadPoolExecutor,它是线程池ThreadPoolExecutor的子类,是基于线程池实现的,构造方法如下:

    参数含义与ThreadPoolExecutor一样,我们就不赘述了。

    工厂类Executors也提供了一些方便的方法,以方便创建ScheduledThreadPoolExecutor,如下所示:

    注意:

    1. 它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使corePoolSize设为0,它也会至少运行一个线程。

     

    3) 基本示例

    由于可以有多个线程执行定时任务,一般任务就不会被某个长时间运行的任务所延迟了,比如,对于前面的TimerFixedDelay,如果改为:

    再次执行,第二个任务就不会被第一个任务延迟了。

    另外,与Timer不同,单个定时任务的异常不会再导致整个定时任务被取消了,即使背后只有一个线程执行任务,我们看个例子:

    TaskA和TaskB都是每秒执行一次,TaskB两秒后执行,但一执行就抛出异常,屏幕的输出类似如下:

    这说明,定时任务TaskB被取消了,但TaskA不受影响,即使它们是由同一个线程执行的。不过,需要强调的是,与Timer不同,没有异常被抛出来,TaskB的异常没有在任何地方体现。所以,与Timer中的任务类似,应该捕获所有异常

     

    3) 对比Timer/TimerTask

    ScheduledThreadPoolExecutor的实现思路与Timer基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,主要不同有:

     

     

    第五节 CompletableFuture

    CompletableFuture用于对异步任务进行编排组合,它实现了Future接口表示一个异步结果,又实现了CompletionStage表示一个计算阶段。

     

    1. 基本使用

    1) 创建和执行

    CompletableFuture 可通过 new 关键字或静态工厂方法来创建和执行。

    下面是一些简单示例:

    注意:

    1. Async后缀的函数表示该任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的,下同。

    2. executor参数表示执行该任务的线程池,如果省略,将使用ThreadPerTaskExecutor(单核cpu)或ForkJoinPool.commonPool()执行。

     

    2) 完成和取消任务

     

    3) 获取任务状态

     

    4) 获取任务结果

     

    5) 任务完成回调

    任务完成回调方法可以接收前一个任务正常结束时的结果值,或前面链路中的任务异常结束时的异常(抛出异常后,链路中的后续任务将不会继续执行),无返回值,不会改变原结果或覆盖原异常

    注意:

    1. 同步处理函数(不带Async)被注册后,如果任务线程未结束,将会使用任务线程执行,如果已结束,将会由当前注册线程执行。

    2. 注册的异常处理逻辑对前面链路中的任务都生效,可参考下方“顺序任务流”案例。

     

    6)结果或异常处理

    结果或异常处理方法也可以接收正常结束时的结果值,或异常结束时的异常,但可以修改任务结果,且会覆盖原异常

     

    2.构建任务流

    1) 依赖前一阶段正常完成

    下面一些方法可以用来构建依赖单一阶段的任务流,当前一个阶段正常完成时,自动触发所有依赖该阶段的下一阶段任务,如果前一个阶段发生了异常,所有后续阶段都不会执行,结果会被设为相同的异常,调用join会抛出运行时异常CompletionException

    简单示例如下:

    注意:

    1. 以run、accept、apply开头的方法,参数类型一般为Runnable、Consumer、Function类型。

    2. 在调用 thenXxx 时,如果前一阶段任务已经结束,那么将会在调用者线程执行子任务,可以使用 thenXxxAsync 避免该情形。

     

    2) 依赖前两阶段都正常完成

    当一个阶段正常完成,且指定的另一个阶段也正常完成时,才触发下一阶段。注意,这两个阶段可以并行执行,并且没有依赖关系。

     

    3) 依赖前两阶段之一正常完成

    当前阶段和指定的另一个阶段,只要其中一个正常完成,就会启动下一阶段任务。

     

    4) 构建依赖多个阶段的任务流

    如果依赖的阶段不止两个,可以使用如下静态方法,基于多个CompletableFuture构建了一个新的CompletableFuture。

     

    3. 其它方法

     

    4. 面试扩展

    1) 在使用 CompletableFuture 的时候为什么要自定义线程池?

     

     

    第六节 Fork/Join框架

    1. Fork/Join框架简介

    Fork/Join框架是一个并行任务执行框架,它把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果,适用于多核环境的可分割计算再合并结果的计算密集型任务。

    img

    注意:

    1. ForkJoinPool不是为了替代ExecutorService,而是它的补充,在某些应用场景下(计算密集型任务)性能比ExecutorService更好。

    2. ForkJoinPool主要用于实现分而治之的算法,特别是分治之后递归调用的函数,例如QuickSort等;

    3. ForkJoinPool的宗旨是使用少量的线程来处理大量的任务,多个线程获取到多个处理器的时间分片,并行的执行子任务。

     

     

    2. Fork/Join框架原理

    Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask作为异步任务、ForkJoinWorkerThread作为执行任务的线程这三者构成的任务调度机制

     

    1) ForkJoinPool

    ForkJoinPool是ExecutorService的一个实现,用于管理工作线程,以及提供获取线程池状态和性能信息的相关方法。

    ForkJoinPool中的每个工作线程都有自己的双端队列(WorkQueue)用于存储任务(ForkJoinTask),并且使用了一种名为工作窃取(work-stealing)算法来平衡线程的工作负载。

    注意:

    1. 工作窃取(work-stealing)算法指空闲的线程试图从繁忙线程的队列中(队首)窃取任务。

     

    2) ForkJoinWorkerThread

    ForkJoinWorkerThread直接继承了Thread,是被ForkJoinPool管理的工作线程,由它来执行ForkJoinTask。

     

    3) ForkJoinTask<V>

    ForkJoinTask抽象类表示一个可分割计算再合并结果的异步任务,常用方法如下:

    它有两个子抽象类RecursiveActionRecursiveTask,分别表示无返回结果和有返回结果的异步任务,我们一般使用它们。

     

     

    3. Fork/Join的使用

    注意:

    1. 如果拆分逻辑比计算逻辑还要复杂时,ForkJoinPool并不会带来性能的提升,反而可能会起到负面作用。

     

    第04章 扩展补充

    第一节 并发编程总结

    1. 线程安全问题

    线程表示一条单独的执行流,有各自的计数器和栈等,但是内存是共享的,并发操作就会存在竞争内存可见性问题,解决思路如下:

     

    2. 线程协作机制

    线程之间需要相互协作,来解决业务问题:

     

    3. 线程安全容器

    线程安全的容器有两类,一类是同步容器,另一类是并发容器

     

    1) 同步容器

    Collections类中有一些静态方法,可以基于普通容器返回线程安全的同步容器,比如:

    同步容器是通过对所有方法都加上synchronized来实现的,不仅性能较低,不支持复合操作,而且可能会抛出并发修改异常。

     

    2) 并发容器

     

     

    4. 异步执行服务

    异步执行服务将"任务的提交"和"任务的执行"相分离,主要涉及以下接口:

    对于任务提交者而言,只需要通过 ExecutorService 提交任务,通过Future操作任务和获取结果即可,而不需要关注任务执行的细节,如线程创建、任务调度、线程关闭等。

    ExecutorService 的主要实现有如下:

     

    第二节 面试补充

    1. Java内存模型(JMM)

    1) JMM简介

    Java内存模型(Java Memory Model,简称JMM)是Java虚拟机定义的一种规范,用于描述多线程并发访问共享内存时的行为,即线程如何和主内存本地内存进行交互。

    JMM(Java 内存模型)

    交互原则:

     

    2) 八种内存交互操作

     

    3) 与Java内存区域对比