poczekaj, aż wszystkie wątki zakończą swoją pracę w java

94

Piszę aplikację, która ma 5 wątków, które jednocześnie pobierają informacje z sieci i wypełniają 5 różnych pól w klasie bufora.
Muszę sprawdzić poprawność danych w buforze i przechowywać je w bazie danych, gdy wszystkie wątki zakończyły swoją pracę.
Jak mogę to zrobić (otrzymać powiadomienie, gdy wszystkie wątki zakończą swoją pracę)?

RYN
źródło
4
Thread.join jest raczej niskopoziomowym, idiosynchronicznym sposobem rozwiązania tego problemu w bardzo Javie. Ponadto jest to problematyczne, ponieważ API wątków jest wadliwe: nie możesz wiedzieć, czy połączenie zakończyło się pomyślnie, czy nie (patrz Współbieżność Java w praktyce ). Abstrakcja wyższego poziomu, taka jak użycie CountDownLatch, może być preferowana i będzie wyglądać bardziej naturalnie dla programistów, którzy nie utknęli w idiosynchronicznym nastawieniu Java. Nie kłóć się ze mną, kłóć się z Dougiem Leą; )
Cedric Martin

Odpowiedzi:

122

Podejście, które wybieram, polega na użyciu ExecutorService do zarządzania pulami wątków.

ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
    es.execute(new Runnable() { /*  your task */ });
es.shutdown();
boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
// all tasks have finished or the time has been reached.
Peter Lawrey
źródło
7
@Leonid dokładnie to robi shutdown ().
Peter Lawrey,
3
while(!es.awaitTermination(1, TimeUnit.MINUTES));
Aquarius Power
3
@AquariusPower Możesz po prostu powiedzieć mu, żeby czekał dłużej lub na zawsze.
Peter Lawrey
1
Oh, rozumiem; więc dodałem wiadomość w pętli mówiącą, że czeka na zakończenie wszystkich wątków; dzięki!
Aquarius Power
1
@PeterLawrey, czy trzeba dzwonić es.shutdown();? co jeśli napiszę kod, w którym wykonałem wątek używając es.execute(runnableObj_ZipMaking);w trybloku i finallywywołałem boolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);. Więc przypuszczam, że powinno to poczekać, aż wszystkie wątki zakończą swoją pracę lub upłynie limit czasu (cokolwiek będzie pierwsze). Czy moje założenie jest poprawne? czy telefon shutdown()jest obowiązkowy?
Amogh,
54

Możesz joindo wątków. Łączenie blokuje się do zakończenia wątku.

for (Thread thread : threads) {
    thread.join();
}

Zauważ, że joinrzuca InterruptedException. Będziesz musiał zdecydować, co zrobić, jeśli tak się stanie (np. Spróbuj anulować inne wątki, aby zapobiec niepotrzebnej pracy).

Mark Byers
źródło
1
Czy te wątki działają równolegle lub sekwencyjnie względem siebie?
James Webster
5
@JamesWebster: Parallel.
RYN
4
@James Webster: Instrukcja t.join();oznacza, że bieżący wątek jest blokowany do momentu tzakończenia wątku . Nie wpływa na wątek t.
Mark Byers
1
Dzięki. =] Uczyłem się paralelizmu na uniwersytecie, ale to była jedyna rzecz, której starałem się nauczyć! Na szczęście nie muszę go często używać teraz, a kiedy to robię, nie jest to zbyt skomplikowane lub nie ma współdzielonych zasobów, a blokowanie nie jest krytyczne
James Webster
1
@ 4r1y4n To, czy dostarczony kod jest rzeczywiście równoległy, zależy od tego, co próbujesz z nim zrobić, i jest bardziej związane z agregacją danych rozproszonych w kolekcjach przy użyciu połączonych wątków. Dołączasz do wątków, co potencjalnie oznacza „łączenie” danych. Ponadto paralelizm NIE musi koniecznie oznaczać współbieżności. To zależy od procesorów. Może się zdarzyć, że wątki działają równolegle, ale obliczenia odbywają się w dowolnej kolejności określonej przez bazowy procesor.
22

Spójrz na różne rozwiązania.

  1. join()API zostało wprowadzone we wczesnych wersjach Javy. Kilka dobrych alternatyw jest dostępnych w tym pakiecie równoległym od wydania JDK 1.5.

  2. ExecutorService # invokeAll ()

    Wykonuje podane zadania, zwracając listę kontraktów futures posiadających swój status i wyniki, gdy wszystko jest zakończone.

    Odnieś się do tego powiązanego pytania SE, aby zapoznać się z przykładem kodu:

    Jak używać metody invokeAll (), aby cała pula wątków mogła wykonać swoje zadanie?

  3. CountDownLatch

    Pomoc w synchronizacji, która pozwala jednemu lub większej liczbie wątków czekać na zakończenie zestawu operacji wykonywanych w innych wątkach.

    CountDownLatch jest inicjowana przy danej liczbie. Metody await blokują się, dopóki bieżąca liczba nie osiągnie zera z powodu wywołań countDown()metody, po czym wszystkie oczekujące wątki są zwalniane, a kolejne wywołania await wracają natychmiast. Jest to jednorazowe zjawisko - licznika nie można zresetować. Jeśli potrzebujesz wersji, która resetuje licznik, rozważ użycie CyclicBarrier .

    Zapoznaj się z tym pytaniem, aby uzyskać informacje o użyciu CountDownLatch

    Jak czekać na wątek, który odrodzi swój własny wątek?

  4. ForkJoinPool lub newWorkStealingPool () w Egzekutorach

  5. Wykonaj iterację wszystkich obiektów Future utworzonych po przesłaniu doExecutorService

Ravindra babu
źródło
11

Oprócz Thread.join()sugestii innych, java 5 wprowadziła framework wykonawczy. Tam nie pracujesz z Threadprzedmiotami. Zamiast tego przesyłasz swoje Callablelub Runnableobiekty do executora. Istnieje specjalny moduł wykonawczy, który ma wykonywać wiele zadań i zwracać ich wyniki w niewłaściwej kolejności. To jest ExecutorCompletionService:

ExecutorCompletionService executor;
for (..) {
    executor.submit(Executors.callable(yourRunnable));
}

Następnie możesz wielokrotnie wywoływać, take()aż nie będzie już żadnych Future<?>obiektów do zwrócenia, co oznacza, że ​​wszystkie z nich zostaną ukończone.


Inną rzeczą, która może mieć znaczenie, w zależności od scenariusza, jest CyclicBarrier.

Pomoc w synchronizacji, która umożliwia zestawowi wątków czekanie na siebie nawzajem, aby osiągnąć wspólny punkt bariery. CyclicBarriers są przydatne w programach obejmujących grupę wątków o stałej wielkości, które czasami muszą na siebie czekać. Bariera nazywana jest cykliczną, ponieważ można ją ponownie wykorzystać po zwolnieniu oczekujących wątków.

Bozho
źródło
To już blisko, ale nadal wprowadziłbym kilka poprawek. executor.submitzwraca a Future<?>. Dodałbym te futures do listy, a następnie przejrzałbym listę wzywającą getdo każdej przyszłości.
Ray
Możesz także utworzyć instancję konstruktora, używając Executorsnp. Executors.newCachedThreadPool(Lub podobnego)
Ray
10

Inną możliwością jest CountDownLatchobiekt, który jest przydatny w prostych sytuacjach: ponieważ znasz z góry liczbę wątków, inicjalizujesz go odpowiednią liczbą i przekazujesz referencję obiektu do każdego wątku.
Po zakończeniu zadania każdy wątek wywołuje, CountDownLatch.countDown()co zmniejsza licznik wewnętrzny. Główny wątek, po uruchomieniu wszystkich pozostałych, powinien wykonać CountDownLatch.await()wywołanie blokujące. Zostanie zwolniony, gdy licznik wewnętrzny osiągnie 0.

Zwróć uwagę, że tym przedmiotem również InterruptedExceptionmożna rzucić.

interDist
źródło
8

Poczekaj / zablokuj główny wątek, aż inne wątki zakończą swoją pracę.

Jak @Ravindra babupowiedziano, można to osiągnąć na różne sposoby, ale pokazując przykłady.

  • java.lang.Thread. join () Od: 1.0

    public static void joiningThreads() throws InterruptedException {
        Thread t1 = new Thread( new LatchTask(1, null), "T1" );
        Thread t2 = new Thread( new LatchTask(7, null), "T2" );
        Thread t3 = new Thread( new LatchTask(5, null), "T3" );
        Thread t4 = new Thread( new LatchTask(2, null), "T4" );
    
        // Start all the threads
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        // Wait till all threads completes
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    }
    
  • java.util.concurrent.CountDownLatch od: 1.5

    • .countDown() «Zmniejsza licznik grupy zatrzasków.
    • .await() «Metody await blokują się, aż aktualna liczba osiągnie zero.

    Jeśli utworzono latchGroupCount = 4następnie countDown()powinien być nazywany 4 razy, aby liczyć 0. Tak więc, że await()wyda blokowania wątków.

    public static void latchThreads() throws InterruptedException {
        int latchGroupCount = 4;
        CountDownLatch latch = new CountDownLatch(latchGroupCount);
        Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
        Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
        Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
        Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
    
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    
        //latch.countDown();
    
        latch.await(); // block until latchGroupCount is 0.
    }
    

Przykładowy kod klasy Threaded LatchTask. Aby przetestować zastosowanie podejścia joiningThreads(); i latchThreads();od głównej metody.

class LatchTask extends Thread {
    CountDownLatch latch;
    int iterations = 10;
    public LatchTask(int iterations, CountDownLatch latch) {
        this.iterations = iterations;
        this.latch = latch;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " : Started Task...");

        for (int i = 0; i < iterations; i++) {
            System.out.println(threadName + " : " + i);
            MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
        }
        System.out.println(threadName + " : Completed Task");
        // countDown() « Decrements the count of the latch group.
        if(latch != null)
            latch.countDown();
    }
}
  • CyclicBarriers Pomoc w synchronizacji, która umożliwia zestawowi wątków czekanie na siebie nawzajem, aby osiągnąć wspólny punkt bariery.CyclicBarriers są przydatne w programach obejmujących grupę wątków o stałej wielkości, które czasami muszą na siebie czekać. Bariera nazywana jest cykliczną, ponieważ można ją ponownie wykorzystać po zwolnieniu oczekujących wątków.
    CyclicBarrier barrier = new CyclicBarrier(3);
    barrier.await();
    
    Na przykład odwołaj się do tej klasy Concurrent_ParallelNotifyies .

  • Framework wykonawczy : możemy użyć ExecutorService, aby utworzyć pulę wątków i śledzić postęp zadań asynchronicznych z Future.

    • submit(Runnable), submit(Callable)które zwracają Future Object. Korzystając z future.get()funkcji możemy zablokować główny wątek do momentu zakończenia pracy wątków roboczych.

    • invokeAll(...) - zwraca listę obiektów Future, poprzez które można uzyskać wyniki wykonania każdego wywoływanego.

Znajdź przykład korzystania z interfejsów Runnable, Callable with Executor.


@Zobacz też

Yash
źródło
7

Ty robisz

for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
    t.join()

Po wykonaniu tej pętli for możesz być pewien, że wszystkie wątki zakończyły swoje zadania.

aioobe
źródło
4

Przechowuj obiekty wątku w jakiejś kolekcji (takiej jak lista lub zestaw), a następnie zapętlaj kolekcję po uruchomieniu wątków i wywołaj join () w wątkach .

esaj
źródło
2

Chociaż nie dotyczy to problemu OP, jeśli jesteś zainteresowany synchronizacją (a dokładniej rendez-vous) z dokładnie jednym wątkiem, możesz użyć Exchanger

W moim przypadku musiałem wstrzymać wątek nadrzędny do czasu, aż wątek potomny coś zrobi, np. Zakończy inicjalizację. CountDownLatch również działa dobrze.

18446744073709551615
źródło
1

spróbuj tego, zadziała.

  Thread[] threads = new Thread[10];

  List<Thread> allThreads = new ArrayList<Thread>();

  for(Thread thread : threads){

        if(null != thread){

              if(thread.isAlive()){

                    allThreads.add(thread);

              }

        }

  }

  while(!allThreads.isEmpty()){

        Iterator<Thread> ite = allThreads.iterator();

        while(ite.hasNext()){

              Thread thread = ite.next();

              if(!thread.isAlive()){

                   ite.remove();
              }

        }

   }
Jeyaraj.J
źródło
1

Miałem podobny problem i skończyłem na Java 8 parallelStream.

requestList.parallelStream().forEach(req -> makeRequest(req));

To bardzo proste i czytelne. W tle używa domyślnej puli złączeń widełek JVM, co oznacza, że ​​przed kontynuowaniem będzie czekać na zakończenie wszystkich wątków. W moim przypadku było to zgrabne rozwiązanie, ponieważ był to jedyny strumień równoległy w mojej aplikacji. Jeśli masz jednocześnie uruchomionych więcej niż jeden strumień równoległy, przeczytaj poniższe łącze.

Więcej informacji na temat strumieni równoległych tutaj .

Madis Pukkonen
źródło
1

Stworzyłem małą metodę pomocniczą, aby poczekać na zakończenie kilku wątków:

public static void waitForThreadsToFinish(Thread... threads) {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
Alon Gouldman
źródło
0

Istniejące odpowiedzi mówiły, że join()każdy wątek.

Ale istnieje kilka sposobów uzyskania tablicy / listy wątków:

  • Dodaj wątek do listy podczas tworzenia.
  • Służy ThreadGroupdo zarządzania wątkami.

Poniższy kod użyje tego ThreadGruoppodejścia. Najpierw tworzy grupę, a następnie podczas tworzenia każdego wątku określa grupę w konstruktorze, a później może uzyskać tablicę wątków za pośrednictwemThreadGroup.enumerate()


Kod

SyncBlockLearn.java

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * synchronized block - learn,
 *
 * @author eric
 * @date Apr 20, 2015 1:37:11 PM
 */
public class SyncBlockLearn {
    private static final int TD_COUNT = 5; // thread count
    private static final int ROUND_PER_THREAD = 100; // round for each thread,
    private static final long INC_DELAY = 10; // delay of each increase,

    // sync block test,
    @Test
    public void syncBlockTest() throws InterruptedException {
        Counter ct = new Counter();
        ThreadGroup tg = new ThreadGroup("runner");

        for (int i = 0; i < TD_COUNT; i++) {
            new Thread(tg, ct, "t-" + i).start();
        }

        Thread[] tArr = new Thread[TD_COUNT];
        tg.enumerate(tArr); // get threads,

        // wait all runner to finish,
        for (Thread t : tArr) {
            t.join();
        }

        System.out.printf("\nfinal count: %d\n", ct.getCount());
        Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD);
    }

    static class Counter implements Runnable {
        private final Object lkOn = new Object(); // the object to lock on,
        private int count = 0;

        @Override
        public void run() {
            System.out.printf("[%s] begin\n", Thread.currentThread().getName());

            for (int i = 0; i < ROUND_PER_THREAD; i++) {
                synchronized (lkOn) {
                    System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count);
                }
                try {
                    Thread.sleep(INC_DELAY); // wait a while,
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.printf("[%s] end\n", Thread.currentThread().getName());
        }

        public int getCount() {
            return count;
        }
    }
}

Główny wątek będzie czekał na zakończenie wszystkich wątków w grupie.

Eric Wang
źródło
-1

Użyj tego w swoim głównym wątku: while (! Executor.isTerminated ()); Umieść ten wiersz kodu po uruchomieniu wszystkich wątków z usługi wykonawczej. Spowoduje to uruchomienie głównego wątku dopiero po zakończeniu wszystkich wątków uruchomionych przez executory. Upewnij się, że wywołujesz executor.shutdown (); przed powyższą pętlą.

Maggy
źródło
Jest to aktywne oczekiwanie, które spowoduje, że procesor będzie stale wykonywał pustą pętlę. Bardzo marnotrawne.
Adam Michalik