博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java中等待所有线程都执行结束(转)
阅读量:6416 次
发布时间:2019-06-23

本文共 16412 字,大约阅读时间需要 54 分钟。

转自:

 

今天看到一篇文章,是关于java中如何等待所有线程都执行结束,文章总结得很好,原文如下

  看过之后在想java中有很大的灵活性,应该有更多的方式可以做这件事。

  这个事情的场景是这样的:许多线程并行的计算一堆问题,然后每个计算存在一个队列,在主线程要等待所有计算结果完成后排序并展示出来。这样的问题其实很常见。

  1. 使用join。这种方式其实并不是那么的优雅,将所有线程启动完之后还需要将所有线程都join,但是每次join都会阻塞,直到被join线程完成,很可能所有被阻塞线程已经完事了,主线程还在不断地join,貌似有点浪费,而且两个循环也不太好看。

  1. 1 public void testThreadSync1() {   2    3     final Vector
    list = new Vector
    (); 4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 try { 6 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 7 final int num = i; 8 threads[i] = new Thread(new Runnable() { 9 public void run() { 10 try { 11 Thread.sleep(random.nextInt(100)); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 list.add(num); 16 System.out.print(num + " add.\t"); 17 } 18 }); 19 threads[i].start(); 20 } 21 for (int i = 0; i < threads.length; i++) { 22 threads[i].join(); 23 System.out.print(i + " end.\t"); 24 } 25 } catch (InterruptedException ie) { 26 ie.printStackTrace(); 27 } 28 printSortedResult(list); 29 }

     

  1. 1 9 add.  7 add.  3 add.  5 add.  4 add.  1 add.  0 add.  0 end.  1 end.  8 add.  2 add.  2 end.  3 end.  4 end.  5 end.  6 add.  6 end.  7 end.  8 end.  9 end.    2 before sort  3 9   7   3   5   4   1   0   8   2   6     4 after sort  5 0   1   2   3   4   5   6   7   8   9

     

 

  2. 使用wait/notifyAll,这个方式其实跟上面是类似的,只是比较底层些吧(join实际上也是wait)。

 

 
  1. 1 @Test   2 public void testThreadSync2() throws IOException, InterruptedException {   3     final Object waitObject = new Object();   4     final AtomicInteger count = new AtomicInteger(TEST_THREAD_COUNT);   5     final Vector
    list = new Vector
    (); 6 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 7 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 8 final int num = i; 9 threads[i] = new Thread(new Runnable() { 10 public void run() { 11 try { 12 Thread.sleep(random.nextInt(100)); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 list.add(num); 17 System.out.print(num + " add.\t"); 18 synchronized (waitObject) { 19 int cnt = count.decrementAndGet(); 20 if (cnt == 0) { 21 waitObject.notifyAll(); 22 } 23 } 24 } 25 }); 26 threads[i].start(); 27 } 28 synchronized (waitObject) { 29 while (count.get() != 0) { 30 waitObject.wait(); 31 } 32 } 33 printSortedResult(list); 34 }

     

  3. 使用CountDownLatch,这其实是最优雅的写法了,每个线程完成后都去将计数器减一,最后完成时再来唤醒。

例1

 
  1. 1 @Test   2 public void testThreadSync3() {   3     final Vector
    list = new Vector
    (); 4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 final CountDownLatch latch = new CountDownLatch(TEST_THREAD_COUNT); 6 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 7 final int num = i; 8 threads[i] = new Thread(new Runnable() { 9 public void run() { 10 try { 11 Thread.sleep(random.nextInt(100)); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 list.add(num); 16 System.out.print(num + " add.\t"); 17 latch.countDown(); 18 } 19 }); 20 threads[i].start(); 21 } 22 try { 23 latch.await(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 printSortedResult(list); 28 }

     

例2

CountDownLatch 初始化设置count,即等待(await)count个线程或一个线程count次计数,通过工作线程来countDown计数减一,直到计数为0,await阻塞结束。

设置的count不可更改,如需要动态设置计数的线程数,可以使用CyclicBarrier.

 

下面的例子,所有的工作线程中准备就绪以后,并不是直接运行,而是等待主线程的信号后再执行具体的操作。

  1. 1 package com.example.multithread;    2     3 import java.util.concurrent.CountDownLatch;    4     5 class Driver    6 {    7     private static final int TOTAL_THREADS = 10;    8     private final CountDownLatch mStartSignal = new CountDownLatch(1);    9     private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);   10    11     void main()   12     {   13         for (int i = 0; i < TOTAL_THREADS; i++)   14         {   15             new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();   16         }   17         System.out.println("Main Thread Now:" + System.currentTimeMillis());   18         doPrepareWork();// 准备工作    19         mStartSignal.countDown();// 计数减一为0,工作线程真正启动具体操作    20         doSomethingElse();//做点自己的事情    21         try   22         {   23             mDoneSignal.await();// 等待所有工作线程结束    24         }   25         catch (InterruptedException e)   26         {   27             // TODO Auto-generated catch block    28             e.printStackTrace();   29         }   30         System.out.println("All workers have finished now.");   31         System.out.println("Main Thread Now:" + System.currentTimeMillis());   32     }   33    34     void doPrepareWork()   35     {   36         System.out.println("Ready,GO!");   37     }   38    39     void doSomethingElse()   40     {   41         for (int i = 0; i < 100000; i++)   42         {   43             ;// delay    44         }   45         System.out.println("Main Thread Do something else.");   46     }   47 }   48    49 class Worker implements Runnable   50 {   51     private final CountDownLatch mStartSignal;   52     private final CountDownLatch mDoneSignal;   53     private final int mThreadIndex;   54    55     Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,   56             final int threadIndex)   57     {   58         this.mDoneSignal = doneSignal;   59         this.mStartSignal = startSignal;   60         this.mThreadIndex = threadIndex;   61     }   62    63     @Override   64     public void run()   65     {   66         // TODO Auto-generated method stub    67         try   68         {   69             mStartSignal.await();// 阻塞,等待mStartSignal计数为0运行后面的代码    70                                     // 所有的工作线程都在等待同一个启动的命令    71             doWork();// 具体操作    72             System.out.println("Thread " + mThreadIndex + " Done Now:"   73                     + System.currentTimeMillis());   74             mDoneSignal.countDown();// 完成以后计数减一    75         }   76         catch (InterruptedException e)   77         {   78             // TODO Auto-generated catch block    79             e.printStackTrace();   80         }   81     }   82    83     public void doWork()   84     {   85         for (int i = 0; i < 1000000; i++)   86         {   87             ;// 耗时操作    88         }   89         System.out.println("Thread " + mThreadIndex + ":do work");   90     }   91 }   92    93 public class CountDownLatchTest   94 {   95     public static void main(String[] args)   96     {   97         // TODO Auto-generated method stub    98         new Driver().main();   99     }  100   101 }

    通过Executor启动线程:

    1. 1 class CountDownLatchDriver2   2 {   3     private static final int TOTAL_THREADS = 10;   4     private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);   5   6  7    8     void main()   9     {  10         System.out.println("Main Thread Now:" + System.currentTimeMillis());  11         doPrepareWork();// 准备工作   12   13         Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  14         for (int i = 0; i < TOTAL_THREADS; i++)  15         {  16             // 通过内建的线程池维护创建的线程   17             executor.execute(new RunnableWorker(mDoneSignal, i));  18         }  19         doSomethingElse();// 做点自己的事情   20         try  21         {  22             mDoneSignal.await();// 等待所有工作线程结束   23         }  24         catch (InterruptedException e)  25         {  26             // TODO Auto-generated catch block   27             e.printStackTrace();  28         }  29         System.out.println("All workers have finished now.");  30         System.out.println("Main Thread Now:" + System.currentTimeMillis());  31     }  32   33     void doPrepareWork()  34     {  35         System.out.println("Ready,GO!");  36     }  37   38     void doSomethingElse()  39     {  40         for (int i = 0; i < 100000; i++)  41         {  42             ;// delay   43         }  44         System.out.println("Main Thread Do something else.");  45     }  46 }  47   48 class RunnableWorker implements Runnable  49 {  50   51     private final CountDownLatch mDoneSignal;  52     private final int mThreadIndex;  53   54     RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  55     {  56         this.mDoneSignal = doneSignal;  57         this.mThreadIndex = threadIndex;  58     }  59   60     @Override  61     public void run()  62     {  63         // TODO Auto-generated method stub   64   65         doWork();// 具体操作   66         System.out.println("Thread " + mThreadIndex + " Done Now:"  67                 + System.currentTimeMillis());  68         mDoneSignal.countDown();// 完成以后计数减一   69                                 // 计数为0时,主线程接触阻塞,继续执行其他任务   70         try  71         {  72             // 可以继续做点其他的事情,与主线程无关了   73             Thread.sleep(5000);  74             System.out.println("Thread " + mThreadIndex  75                     + " Do something else after notifing main thread");  76   77         }  78         catch (InterruptedException e)  79         {  80             // TODO Auto-generated catch block   81             e.printStackTrace();  82         }  83   84     }  85   86     public void doWork()  87     {  88         for (int i = 0; i < 1000000; i++)  89         {  90             ;// 耗时操作   91         }  92         System.out.println("Thread " + mThreadIndex + ":do work");  93     }  94 }

      输出:

      1 Main Thread Now:1359959480786 2 Ready,GO! 3 Thread 0:do work 4 Thread 0 Done Now:1359959480808 5 Thread 1:do work 6 Thread 1 Done Now:1359959480811 7 Thread 2:do work 8 Thread 2 Done Now:1359959480813 9 Main Thread Do something else.10 Thread 3:do work11 Thread 3 Done Now:135995948082512 Thread 5:do work13 Thread 5 Done Now:135995948082714 Thread 7:do work15 Thread 7 Done Now:135995948082916 Thread 9:do work17 Thread 9 Done Now:135995948083118 Thread 4:do work19 Thread 4 Done Now:135995948083320 Thread 6:do work21 Thread 6 Done Now:135995948083522 Thread 8:do work23 Thread 8 Done Now:135995948083724 All workers have finished now.25 Main Thread Now:135995948083826 Thread 0 Do something else after notifing main thread27 Thread 1 Do something else after notifing main thread28 Thread 2 Do something else after notifing main thread29 Thread 3 Do something else after notifing main thread30 Thread 9 Do something else after notifing main thread31 Thread 7 Do something else after notifing main thread32 Thread 5 Do something else after notifing main thread33 Thread 4 Do something else after notifing main thread34 Thread 6 Do something else after notifing main thread35 Thread 8 Do something else after notifing main thread

       

 

  4. 使用CyclicBarrier。这里其实类似上面,这个berrier只是在等待完成后自动调用传入CyclicBarrier的Runnable。

例1

 
  1. 1 @Test   2 public void testThreadSync4() throws IOException {   3     final Vector
    list = new Vector
    (); 4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 final CyclicBarrier barrier = new CyclicBarrier(TEST_THREAD_COUNT, 6 new Runnable() { 7 public void run() { 8 printSortedResult(list); 9 } 10 }); 11 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 12 final int num = i; 13 threads[i] = new Thread(new Runnable() { 14 public void run() { 15 try { 16 Thread.sleep(random.nextInt(100)); 17 } catch (InterruptedException e) { 18 e.printStackTrace(); 19 } 20 list.add(num); 21 System.out.print(num + " add.\t"); 22 try { 23 barrier.await(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } catch (BrokenBarrierException e) { 27 e.printStackTrace(); 28 } 29 } 30 }); 31 threads[i].start(); 32 } 33 System.in.read(); 34 }

    例2

    1. 1 class WalkTarget   2 {   3     private final int mCount = 5;   4     private final CyclicBarrier mBarrier;   5     ExecutorService mExecutor;   6    7     class BarrierAction implements Runnable   8     {   9         @Override  10         public void run()  11         {  12             // TODO Auto-generated method stub   13             System.out.println("所有线程都已经完成任务,计数达到预设值");  14             //mBarrier.reset();//恢复到初始化状态          15               16         }  17     }  18   19     WalkTarget()  20     {  21         //初始化CyclicBarrier   22         mBarrier = new CyclicBarrier(mCount, new BarrierAction());  23         mExecutor = Executors.newFixedThreadPool(mCount);  24   25         for (int i = 0; i < mCount; i++)  26         {  27             //启动工作线程   28             mExecutor.execute(new Walker(mBarrier, i));  29         }  30     }  31 }  32   33 //工作线程   34 class Walker implements Runnable  35 {  36     private final CyclicBarrier mBarrier;  37     private final int mThreadIndex;  38   39     Walker(final CyclicBarrier barrier, final int threadIndex)  40  41 42     {  43         mBarrier = barrier;  44         mThreadIndex = threadIndex;  45     }  46   47     @Override  48     public void run()  49     {  50         // TODO Auto-generated method stub   51         System.out.println("Thread " + mThreadIndex + " is running...");  52         // 执行任务   53         try  54         {  55             TimeUnit.MILLISECONDS.sleep(5000);  56             // do task   57         }  58         catch (InterruptedException e)  59         {  60             // TODO Auto-generated catch block   61             e.printStackTrace();  62         }  63   64         // 完成任务以后,等待其他线程完成任务   65         try  66         {  67             mBarrier.await();  68         }  69         catch (InterruptedException e)  70         {  71             // TODO Auto-generated catch block   72             e.printStackTrace();  73         }  74         catch (BrokenBarrierException e)  75         {  76             // TODO Auto-generated catch block   77             e.printStackTrace();  78         }  79         // 其他线程任务都完成以后,阻塞解除,可以继续接下来的任务   80         System.out.println("Thread " + mThreadIndex + " do something else");  81     }  82   83 }  84   85 public class CountDownLatchTest  86 {  87     public static void main(String[] args)  88     {  89         // TODO Auto-generated method stub   90         //new CountDownLatchDriver2().main();   91         new WalkTarget();  92     }  93   94 }

      输出(注意,只有所有的线程barrier.await之后才能继续执行其他的操作):

      Thread 0 is running... Thread 2 is running... Thread 3 is running... Thread 1 is running... Thread 4 is running... 所有线程都已经完成任务,计数达到预设值 Thread 4 do something else Thread 0 do something else Thread 2 do something else Thread 3 do something else Thread 1 do something else

 

5、

CountDownLatch和CyclicBarrier简单比较:

 

CountDownLatch

CyclicBarrier

软件包

java.util.concurrent

java.util.concurrent

适用情景

主线程等待多个工作线程结束

多个线程之间互相等待,直到所有线程达到一个障碍点(Barrier point)

主要方法

CountDownLatch(int count) (主线程调用)

初始化计数

CountDownLatch.await (主线程调用)

阻塞,直到等待计数为0解除阻塞

CountDownLatch.countDown

计数减一(工作线程调用)

CyclicBarrier(int parties, Runnable barrierAction) //初始化参与者数量和障碍点执行Action,Action可选。由主线程初始化

CyclicBarrier.await() //由参与者调用

阻塞,直到所有线程达到屏障点

等待结束

各线程之间不再互相影响,可以继续做自己的事情。不再执行下一个目标工作。

在屏障点达到后,允许所有线程继续执行,达到下一个目标。可以重复使用CyclicBarrier

异常

 

如果其中一个线程由于中断,错误,或超时导致永久离开屏障点,其他线程也将抛出异常。

其他

 

如果BarrierAction不依赖于任何Party中的所有线程,那么在任何party中的一个线程被释放的时候,可以直接运行这个Action。

If(barrier.await()==2)

{

//do action

}

转载地址:http://gtpra.baihongyu.com/

你可能感兴趣的文章
DIV+CSS命名规范有助于SEO
查看>>
js生成二维码
查看>>
C指针练习
查看>>
web项目buildPath与lib的区别
查看>>
php对redis的set(集合)操作
查看>>
我的友情链接
查看>>
ifconfig:command not found的解决方法
查看>>
js使用正则表达式判断手机和固话格式
查看>>
计算机是怎么存储数字的
查看>>
Codeforces Round #369 (Div. 2) A. Bus to Udayland 水题
查看>>
adb上使用cp/mv命令的替代方法(failed on '***' - Cross-device link解决方法)
查看>>
C++标准库简介、与STL的关系。
查看>>
Spring Boot 3 Hibernate
查看>>
查询EBS请求日志的位置和名称
查看>>
大型机、小型机、x86服务器的区别
查看>>
JVM调优总结:调优方法
查看>>
J2EE十三个规范小结
查看>>
算法(第四版)C#题解——2.1
查看>>
网关支付、银联代扣通道、快捷支付、银行卡支付分别是怎么样进行支付的?...
查看>>
大数据开发实战:Stream SQL实时开发一
查看>>