那曲市网站建设_网站建设公司_模板建站_seo优化
2026/1/16 5:46:53 网站建设 项目流程

📖目录

  • 1. 为什么需要Phaser和Condition?
  • 2. Phaser:动态阶段同步的智能调度系统
    • 2.1 Phaser的核心概念
    • 2.2 Phaser与CyclicBarrier的对比
    • 2.3 Phaser的典型应用场景
  • 3. Condition:线程的"个人等待区"
    • 3.1 Condition的核心概念
    • 3.2 Condition与wait/notify的对比
  • 4. 实战示例
    • 示例1:Phaser基础用法(模拟快递分拣流程)
    • 示例2:Phaser动态添加线程(快递中心临时增派人手)
    • 示例3:Condition实现生产者-消费者模型(快递仓库的进销存)
    • 示例4:Phaser与Condition结合使用(快递中心的多阶段运营)
  • 5. Phaser与Condition的深度对比
  • 6. 最佳实践与常见陷阱
    • 6.1 Phaser最佳实践
    • 6.2 Condition最佳实践
    • 6.3 常见陷阱
  • 7. 结语
  • 8. 经典书籍推荐
  • 9. 参考链接
  • 10. 往期博客回顾

1. 为什么需要Phaser和Condition?

在多线程编程中,我们常常需要控制多个线程的执行顺序和同步点。想象一下你是一个快递分拣中心的经理,需要管理多个分拣员(线程)在不同阶段(分拣、打包、装车)的工作。传统的同步工具就像一个固定的计时器,一旦设定就无法更改。而Phaser和Condition则像是一个智能调度系统,可以根据实际情况动态调整。

  • Phaser:就像一个智能分拣调度系统,可以动态添加/移除分拣员,每个分拣员可以在不同阶段等待。
  • Condition:就像一个分拣员的个人等待区,每个分拣员可以等待特定条件(如"包裹数量达到10个")。

2. Phaser:动态阶段同步的智能调度系统

Phaser是JDK 7引入的并发工具,用于管理多个线程在不同阶段的同步。相比CyclicBarrier,Phaser更加灵活,可以动态添加和移除线程,可以设置阶段。

2.1 Phaser的核心概念

  • 阶段(Phase):Phaser将整个任务分为多个阶段,每个阶段有特定的同步点。
  • 注册(Register):线程需要注册到Phaser,才能参与同步。
  • 到达(Arrive):线程到达同步点。
  • 等待(Await):线程等待其他线程到达同步点。
  • 终止(Termination):当所有线程都到达并离开,Phaser进入下一个阶段。

2.2 Phaser与CyclicBarrier的对比

特性CyclicBarrierPhaser
阶段管理固定阶段动态阶段
线程注册无法动态添加可以动态添加/移除
重用性可重用可重用
灵活性

2.3 Phaser的典型应用场景

  • 多阶段任务处理(如数据处理、计算)
  • 动态调整线程池大小
  • 需要多个同步点的复杂流程

3. Condition:线程的"个人等待区"

Condition是Lock接口的一部分,用于替代Object的wait/notify机制。它提供了更细粒度的线程同步控制,允许一个Lock有多个等待队列。

3.1 Condition的核心概念

  • 等待队列(Wait Queue):每个Condition有一个独立的等待队列。
  • await():线程等待特定条件。
  • signal():唤醒一个等待的线程。
  • signalAll():唤醒所有等待的线程。

3.2 Condition与wait/notify的对比

特性wait/notifyCondition
锁关联与对象锁关联与Lock关联
等待队列一个多个
灵活性
适用场景简单同步复杂同步

4. 实战示例

示例1:Phaser基础用法(模拟快递分拣流程)

importjava.util.concurrent.Phaser;publicclassPhaserBasicExample{publicstaticvoidmain(String[]args){intparties=3;Phaserphaser=newPhaser(parties);for(inti=0;i<parties;i++){newThread(()->{System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");phaser.arriveAndAwaitAdvance();// 到达并等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 到达并等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 2 (装车完成)");}).start();}// 模拟主线程等待try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}}

执行结果:

Thread-0 is arriving at phase 0 (分拣开始) Thread-1 is arriving at phase 0 (分拣开始) Thread-2 is arriving at phase 0 (分拣开始) Thread-2 is at phase 1 (打包开始) Thread-1 is at phase 1 (打包开始) Thread-0 is at phase 1 (打包开始) Thread-0 is at phase 2 (装车完成) Thread-1 is at phase 2 (装车完成) Thread-2 is at phase 2 (装车完成)

示例2:Phaser动态添加线程(快递中心临时增派人手)

importjava.util.concurrent.Phaser;publicclassPhaserDynamicExample{publicstaticvoidmain(String[]args){Phaserphaser=newPhaser();// 启动第一个分拣员(先注册)newThread(()->{phaser.register();// 注册当前线程System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");// 等待500毫秒,确保第二个线程也注册了try{// 这里要等得足够长,故可以适当调大Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is done (装车完成)");}).start();// 等待500毫秒,让第一个线程等待try{Thread.sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}// 动态添加第二个分拣员(注册并启动)newThread(()->{phaser.register();// 注册当前线程System.out.println(Thread.currentThread().getName()+" is arriving at phase 0 (分拣开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is at phase 1 (打包开始)");phaser.arriveAndAwaitAdvance();// 等待其他线程System.out.println(Thread.currentThread().getName()+" is done (装车完成)");}).start();// 等待所有线程完成phaser.awaitAdvance(0);}}

执行结果:

Thread-0 is arriving at phase 0 (分拣开始) Thread-1 is arriving at phase 0 (分拣开始) Thread-0 is at phase 1 (打包开始) Thread-1 is at phase 1 (打包开始) Thread-0 is done (装车完成) Thread-1 is done (装车完成)

示例3:Condition实现生产者-消费者模型(快递仓库的进销存)

importjava.util.LinkedList;importjava.util.Queue;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassConditionProducerConsumerExample{privatestaticfinalintMAX_CAPACITY=5;privatestaticfinalQueue<Integer>warehouse=newLinkedList<>();privatestaticfinalLocklock=newReentrantLock();privatestaticfinalConditionnotFull=lock.newCondition();privatestaticfinalConditionnotEmpty=lock.newCondition();publicstaticvoidmain(String[]args){newThread(()->{for(inti=0;i<10;i++){produce(i);}},"快递员-生产").start();newThread(()->{for(inti=0;i<10;i++){consume();}},"快递员-配送").start();}privatestaticvoidproduce(intitem){lock.lock();try{while(warehouse.size()==MAX_CAPACITY){System.out.println("仓库已满,等待空间...");notFull.await();// 等待仓库有空位}warehouse.offer(item);System.out.println("【生产】入库: 包裹ID "+item);notEmpty.signal();// 通知配送员有新包裹}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{lock.unlock();}}privatestaticvoidconsume(){lock.lock();try{while(warehouse.isEmpty()){System.out.println("仓库为空,等待新包裹...");notEmpty.await();// 等待仓库有包裹}Integeritem=warehouse.poll();System.out.println("【配送】出库: 包裹ID "+item);notFull.signal();// 通知生产者可以继续生产}catch(InterruptedExceptione){thrownewRuntimeException(e);}finally{lock.unlock();}}}

执行结果:

【生产】入库: 包裹ID 0 【配送】出库: 包裹ID 0 仓库为空,等待新包裹... 【生产】入库: 包裹ID 1 【生产】入库: 包裹ID 2 【生产】入库: 包裹ID 3 【生产】入库: 包裹ID 4 【生产】入库: 包裹ID 5 仓库已满,等待空间... 【配送】出库: 包裹ID 1 【配送】出库: 包裹ID 2 【配送】出库: 包裹ID 3 【配送】出库: 包裹ID 4 【配送】出库: 包裹ID 5 仓库为空,等待新包裹... 【生产】入库: 包裹ID 6 【生产】入库: 包裹ID 7 【生产】入库: 包裹ID 8 【生产】入库: 包裹ID 9 【配送】出库: 包裹ID 6 【配送】出库: 包裹ID 7 【配送】出库: 包裹ID 8 【配送】出库: 包裹ID 9

示例4:Phaser与Condition结合使用(快递中心的多阶段运营)

importjava.util.concurrent.Phaser;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;publicclassCorrectPhaserConditionExample{privatestaticfinalintSTAGES=3;privatestaticfinalintWORKERS=2;privatestaticfinalPhaserphaser=newPhaser(WORKERS);privatestaticfinalLocklock=newReentrantLock();privatestaticfinalConditioncondition=lock.newCondition();privatestaticvolatilebooleanisApproved=false;publicstaticvoidmain(String[]args){// 启动工作线程for(inti=1;i<=WORKERS;i++){intfinalI=i;newThread(()->workerTask(finalI),"快递员-"+i).start();}// 独立协调线程(关键改进!)newThread(()->{for(intstage=0;stage<STAGES;stage++){System.out.println("\n【协调线程】等待所有快递员完成阶段 "+(stage+1)+"...");phaser.awaitAdvance(stage);// 等待所有工作线程完成Phaser同步System.out.println("【协调线程】阶段 "+(stage+1)+" 完成,质检通过!");lock.lock();try{isApproved=true;condition.signalAll();// 安全唤醒:此时工作线程已进入await()}finally{lock.unlock();}try{Thread.sleep(500);// 模拟质检时间}catch(InterruptedExceptione){Thread.currentThread().interrupt();}}System.out.println("\n【协调线程】全部流程结束!");},"协调线程").start();}privatestaticvoidworkerTask(intid){for(intstage=0;stage<STAGES;stage++){System.out.println("快递员-"+id+" 准备进入阶段 "+(stage+1));// 1. 先完成Phaser同步(等待所有线程到达)phaser.arriveAndAwaitAdvance();System.out.println("快递员-"+id+" 已到达阶段 "+(stage+1)+",开始工作");// 2. 进入Condition等待(关键:确保已进入等待状态)lock.lock();try{// 必须用while循环避免虚假唤醒while(!isApproved){System.out.println("快递员-"+id+" 在阶段 "+(stage+1)+" 等待质检放行...");condition.await();}System.out.println("快递员-"+id+" 获得放行,进入下一阶段!");}catch(InterruptedExceptione){Thread.currentThread().interrupt();}finally{lock.unlock();}}System.out.println("快递员-"+id+" 所有阶段完成!");}}

执行结果:

快递员-1 准备进入阶段 1 快递员-2 准备进入阶段 1 快递员-1 已到达阶段 1,开始工作 快递员-2 已到达阶段 1,开始工作 快递员-1 在阶段 1 等待质检放行... 快递员-2 在阶段 1 等待质检放行... 【协调线程】等待所有快递员完成阶段 1... 【协调线程】阶段 1 完成,质检通过! 快递员-1 获得放行,进入下一阶段! 快递员-1 准备进入阶段 2 快递员-2 获得放行,进入下一阶段! 快递员-2 准备进入阶段 2 快递员-2 已到达阶段 2,开始工作 快递员-2 获得放行,进入下一阶段! 快递员-2 准备进入阶段 3 快递员-1 已到达阶段 2,开始工作 快递员-1 获得放行,进入下一阶段! 快递员-1 准备进入阶段 3 快递员-1 已到达阶段 3,开始工作 快递员-1 获得放行,进入下一阶段! 快递员-1 所有阶段完成! 快递员-2 已到达阶段 3,开始工作 快递员-2 获得放行,进入下一阶段! 快递员-2 所有阶段完成! 【协调线程】等待所有快递员完成阶段 2... 【协调线程】阶段 2 完成,质检通过! 【协调线程】等待所有快递员完成阶段 3... 【协调线程】阶段 3 完成,质检通过! 【协调线程】全部流程结束!

5. Phaser与Condition的深度对比

特性PhaserCondition
核心用途多阶段同步细粒度等待/唤醒
同步方式基于阶段基于条件
线程管理动态添加/移除与Lock关联
适用场景多阶段任务流程生产者-消费者、等待特定条件
代码复杂度中高
性能

6. 最佳实践与常见陷阱

6.1 Phaser最佳实践

  1. 合理设置阶段:不要设置过多阶段,避免过度同步。
  2. 动态注册:在需要时动态注册线程,避免固定数量限制。
  3. 使用arriveAndDeregister:当线程不需要参与后续阶段时,使用arriveAndDeregister来移除。

6.2 Condition最佳实践

  1. 始终使用Lock:Condition必须与Lock一起使用。
  2. 使用while循环:在await()前使用while循环检查条件,避免虚假唤醒。
  3. 信号通知:使用signal()signalAll(),根据需求选择。

6.3 常见陷阱

  1. Phaser阶段计数错误:忘记正确注册线程,导致线程一直等待。
  2. Condition虚假唤醒:没有使用while循环检查条件,导致线程在条件不满足时继续执行。
  3. 死锁:在使用Condition时,没有正确管理Lock的获取和释放。

7. 结语

Phaser和Condition是Java并发编程中非常强大的工具,它们提供了比传统同步机制更灵活、更强大的控制能力。理解并正确使用它们,可以让你在处理复杂的多线程问题时更加得心应手。

通过本文的四个示例,你已经掌握了Phaser和Condition的基本用法和高阶玩法。在实际项目中,根据具体需求选择合适的同步机制,是提升系统性能和可维护性的关键。


8. 经典书籍推荐

《Java并发编程实战》(Java Concurrency in Practice)
作者:Brian Goetz 等
地位:Java并发领域的"圣经"
价值:不仅详细讲解了Phaser、Condition等核心组件,更重要的是传授了一套完整的并发设计思想和最佳实践。尽管出版较早,但其核心原理至今仍是金科玉律,是每一位Java工程师的必读书目。

《Java并发编程的艺术》
作者:方腾飞, 魏鹏, 程晓明
地位:国内优秀并发编程著作
价值:对Phaser、Condition、ReentrantLock等JUC组件的源码剖析极为深入,非常适合希望从源码层面理解Java并发机制的读者。


9. 参考链接

  1. Phaser API文档
  2. Condition API文档

10. 往期博客回顾

  • 【Java线程安全实战】① 从ArrayList并发翻车说起:2025年主流线程安全集合全景图解
  • 【Java线程安全实战】② ConcurrentHashMap 源码深度拆解:如何做到高性能并发?
  • 【Java线程安全实战】③ ThreadLocal 源码深度拆解:如何做到线程隔离?
  • 【Java线程安全实战】④ 可重入锁ReentrantLock深度拆解:如何实现线程安全的同步?
  • 【Java线程安全实战】⑤ 原子类(Atomic)深度解析:无锁编程(Lock-Free)的终极奥义
  • 【Java线程安全实战】⑥ 秒级达百万高并发框架-Disruptor:揭秘LMAX的“快递小哥“如何让系统跑得更快
  • 【Java线程安全实战】⑦ 线程间协作的艺术:CountDownLatch 与 CyclicBarrier 深度剖析

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询