Jak czekać na ukończenie kilku wątków?

109

Jak można po prostu zaczekać na zakończenie całego procesu gwintowanego? Na przykład, powiedzmy, że mam:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

Jak to zmienić, aby main()metoda zatrzymywała się w komentarzu do momentu zakończenia wszystkich run()metod wątków ? Dzięki!

DivideByHero
źródło

Odpowiedzi:

163

Umieszczasz wszystkie wątki w tablicy, uruchamiasz je wszystkie, a następnie tworzysz pętlę

for(i = 0; i < threads.length; i++)
  threads[i].join();

Każde połączenie będzie blokowane do momentu zakończenia odpowiedniego wątku. Wątki mogą kończyć się w innej kolejności niż ich dołączanie, ale to nie jest problem: po zamknięciu pętli wszystkie wątki są zakończone.

Martin przeciwko Löwis
źródło
1
@Mykola: jaka dokładnie jest zaleta korzystania z grupy wątków? Tylko dlatego, że API jest dostępne, nie oznacza, że ​​musisz go używać ...
Martin v. Löwis
2
Zobacz: „Grupa wątków reprezentuje zbiór wątków”. To jest poprawne semantycznie dla tego przypadku użycia! Oraz: „Wątek może uzyskać dostęp do informacji o swojej grupie wątków”
Martin K.
4
Książka „Effective Java” zaleca unikanie grup wątków (pozycja 73).
Bastien Léonard
2
Błędy wymienione w Effective Java powinny zostać naprawione w Javie 6. Jeśli nowsze wersje Java nie są ograniczeniem, do rozwiązywania problemów z wątkami lepiej jest używać Futures. Martin v. Löwis: Masz rację. Nie jest to istotne dla tego problemu, ale miło jest uzyskać więcej informacji o uruchomionych wątkach z jednego obiektu (na przykład ExecutorService). Myślę, że fajnie jest używać danych funkcji do rozwiązania problemu; być może w przyszłości będziesz potrzebować większej elastyczności (informacje o wątkach). Warto również wspomnieć o starych klasach błędów w starszych JDK.
Martin K.
5
ThreadGroup nie implementuje sprzężenia na poziomie grupy, więc to, dlaczego ludzie wypychają ThreadGroup, jest trochę kłopotliwe. Czy ludzie naprawdę używają blokad spinów i sprawdzają activeCount grupy? Trudno byłoby mnie przekonać, że zrobienie tego jest lepsze pod każdym względem niż zwykłe wywoływanie funkcji Join we wszystkich wątkach.
41

Jednym ze sposobów byłoby zrobienie Listz Threads, utworzenie i uruchomienie każdego wątku podczas dodawania go do listy. Gdy wszystko zostanie uruchomione, wróć do listy i wywołaj join()każdą z nich. Nie ma znaczenia, w jakiej kolejności wątki kończą wykonywanie, wszystko, co musisz wiedzieć, to to, że do czasu zakończenia wykonywania drugiej pętli każdy wątek zostanie ukończony.

Lepszym podejściem jest użycie ExecutorService i powiązanych z nim metod:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

Korzystanie z usługi ExecutorService (i wszystkich nowych rzeczy z narzędzi współbieżności Java 5 ) jest niezwykle elastyczne, a powyższy przykład ledwo zarysowuje powierzchnię.

Adam Batkin
źródło
ThreadGroup to droga do zrobienia! Z zmienną listą będziesz miał kłopoty (synchronizacja)
Martin K.
3
Co? Jak byś miał kłopoty? Jest modyfikowalny (tylko do odczytu) tylko przez wątek, który uruchamia, więc jeśli nie modyfikuje listy podczas iteracji przez nią, jest w porządku.
Adam Batkin
To zależy od tego, jak go używasz. Jeśli użyjesz klasy wywołującej w wątku, będziesz miał problemy.
Martin K.
27
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      {
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
jt.
źródło
działało jak marzenie ... Mam dwa zestawy wątków, które nie powinny działać jednocześnie z powodu problemów z wieloma plikami cookie. Użyłem twojego przykładu, aby uruchomić jeden zestaw wątków naraz ... dzięki za podzielenie się wiedzą ...
arn-arn
@Dantalian - W swojej klasie Runnable (prawdopodobnie w metodzie run) chciałbyś przechwycić wszelkie wyjątki, które wystąpiły i zapisać je lokalnie (lub zapisać komunikat / warunek błędu). W tym przykładzie f.get () zwraca obiekt, który przesłałeś do ExecutorService. Twój obiekt może mieć metodę pobierania wyjątków / warunków błędów. W zależności od tego, jak zmodyfikujesz podany przykład, może być konieczne rzutowanie obiektu obróconego przez f.get () na oczekiwany typ.
jt.
12

zamiast tego join(), który jest starym interfejsem API, możesz użyć CountDownLatch . Zmodyfikowałem Twój kod, jak poniżej, aby spełnić Twoje wymagania.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
            latch.countDown();
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Wyjaśnienie :

  1. CountDownLatch został zainicjowany z podaną liczbą 1000 zgodnie z wymaganiami.

  2. Każdy wątek roboczy DoSomethingInAThread zmniejszy wartość CountDownLatch, która została przekazana w konstruktorze.

  3. Główny wątek, CountDownLatchDemo await()aż liczba osiągnie zero. Gdy liczba osiągnie zero, otrzymasz wynik poniżej linii.

    In Main thread after completion of 1000 threads

Więcej informacji na stronie dokumentacji oracle

public void await()
           throws InterruptedException

Powoduje, że bieżący wątek czeka, aż zatrzask zostanie odliczony do zera, chyba że wątek zostanie przerwany.

Zobacz powiązane pytanie SE, aby uzyskać inne opcje:

poczekaj, aż wszystkie wątki zakończą swoją pracę w java

Ravindra babu
źródło
8

Całkowicie unikaj klasy Thread i zamiast tego używaj wyższych abstrakcji dostarczonych w java.util.concurrent

Klasa ExecutorService udostępnia metodę invokeAll, która wydaje się robić dokładnie to, co chcesz.

henrik
źródło
6

Rozważ użycie java.util.concurrent.CountDownLatch. Przykłady w javadocs

Pablo Cavalieri
źródło
Jest zatrzaskiem do gwintów, blokada zatrzasku działa z odliczaniem. W metodzie run () twojego wątku jawnie zadeklaruj czekanie, aż CountDownLatch osiągnie odliczanie do 0. Możesz użyć tego samego CountDownLatch w więcej niż jednym wątku, aby zwolnić je jednocześnie. Nie wiem, czy tego potrzebujesz, chciałem tylko o tym wspomnieć, ponieważ jest to przydatne podczas pracy w środowisku wielowątkowym.
Pablo Cavalieri
Może powinieneś umieścić to wyjaśnienie w treści swojej odpowiedzi?
Aaron Hall
Przykłady w Javadoc są bardzo opisowe, dlatego ich nie dodałem. docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . W pierwszym przykładzie wszystkie wątki Workers są zwolnione jednocześnie, ponieważ czekają, aż CountdownLatch startSignal osiągnie zero, co ma miejsce w startSignal.countDown (). Następnie wątek mian czeka, aż wszystkie prace zakończą się, używając instrukcji doneSignal.await (). doneSignal zmniejsza jego wartość u każdego pracownika.
Pablo Cavalieri
6

Jak zasugerował Martin K., java.util.concurrent.CountDownLatchwydaje się być lepszym rozwiązaniem. Dodam tylko przykład tego samego

     public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    }
                    catch (InterruptedException e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
Freaky Thommi
źródło
4

W zależności od potrzeb możesz również chcieć sprawdzić klasy CountDownLatch i CyclicBarrier w pakiecie java.util.concurrent. Mogą być przydatne, jeśli chcesz, aby Twoje wątki czekały na siebie nawzajem lub jeśli chcesz mieć bardziej szczegółową kontrolę nad sposobem wykonywania wątków (np. Czekając w ich wewnętrznym wykonaniu, aż inny wątek ustawi jakiś stan). Możesz również użyć CountDownLatch, aby zasygnalizować wszystkie swoje wątki, aby rozpoczęły się w tym samym czasie, zamiast uruchamiać je jeden po drugim podczas iteracji w pętli. Standardowa dokumentacja API ma tego przykład, plus użycie innego CountDownLatch, aby poczekać, aż wszystkie wątki zakończą swoje wykonanie.

Jeff
źródło
1

Utwórz obiekt wątku wewnątrz pierwszej pętli for.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

A potem to, co wszyscy tutaj mówią.

for(i = 0; i < threads.length; i++)
  threads[i].join();
optimus0127
źródło
0

Możesz to zrobić za pomocą obiektu „ThreadGroup” i jego parametru activeCount :

Martin K.
źródło
Nie wiem, jak dokładnie zamierzasz to zrobić. Jeśli proponujesz sondowanie activeCount w pętli: to źle, ponieważ jest zajęty-czekaj (nawet jeśli śpisz między ankietami - wtedy uzyskujesz kompromis między biznesem a responsywnością).
Martin przeciwko Löwisowi
@Martin v. Löwis: „Join będzie czekał tylko na jeden wątek. Lepszym rozwiązaniem może być java.util.concurrent.CountDownLatch. Po prostu zainicjuj zatrzask z liczbą ustawioną na liczbę wątków roboczych. Każdy wątek roboczy powinien wywołać countDown () tuż przed wyjściem, a główny wątek po prostu wywołuje await (), co będzie blokować do zera. Problem z join () polega również na tym, że nie można dynamicznie dodawać kolejnych wątków. Lista eksploduje z jednoczesną modyfikacją ”. Twoje rozwiązanie działa dobrze w przypadku problemu, ale nie w ogólnym celu.
Martin K.
0

Jako alternatywę dla CountDownLatch możesz również użyć CyclicBarrier np

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

Aby użyć CyclicBarrier w tym przypadku, barrier.await () powinno być ostatnią instrukcją, tj. Kiedy twój wątek kończy swoją pracę. CyclicBarrier może być ponownie użyty z jego metodą reset (). Cytując javadocs:

CyclicBarrier obsługuje opcjonalne polecenie Runnable, które jest uruchamiane raz na punkt bariery, po nadejściu ostatniego wątku w grupie, ale przed zwolnieniem jakichkolwiek wątków. Ta akcja bariery jest przydatna do aktualizowania stanu współużytkowanego, zanim którakolwiek ze stron będzie kontynuować.

shailendra1118
źródło
Nie sądzę, żeby to był dobry przykład dla CyclicBarrier. Dlaczego używasz wywołania Thread.sleep ()?
Guenther,
@Guenther - tak, zmieniłem kod, aby dostosować go do wymagań.
shailendra1118
CyclicBarrier nie jest alternatywą dla CountDownLatch. Gdy wątki muszą wielokrotnie odliczać w dół, należy utworzyć CyclicBarrier, w przeciwnym razie domyślnie CountDownLatch (chyba że wymaga dodatkowej abstrakcji wykonania, w którym to momencie należy spojrzeć na usługi wyższego poziomu).
Elysiumplain
0

To join()nie było dla mnie pomocne. zobacz tę próbkę w Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

Wynik:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Teraz pozwól mi użyć join()dla wątków:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

A wynik:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

Jak widać, gdy używamy join:

  1. Wątki działają sekwencyjnie.
  2. Pierwsza próbka zajmuje 765 milisekund, a druga 3833 milisekund.

Naszym rozwiązaniem zapobiegającym blokowaniu innych wątków było utworzenie ArrayList:

val threads = ArrayList<Thread>()

Teraz, gdy chcemy rozpocząć nowy wątek, najczęściej dodajemy go do ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

addThreadToArrayFunkcja:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

Funkcja startNewThread:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Sprawdź wypełnienie wątków, jak poniżej, wszędzie tam, gdzie jest to potrzebne:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there is no alive threads.
}
Misagh
źródło