Piszę aplikację, która ma 5 wątków, które jednocześnie pobierają informacje z sieci i wypełniają 5 różnych pól w klasie bufora.
Muszę sprawdzić poprawność danych w buforze i przechowywać je w bazie danych, gdy wszystkie wątki zakończyły swoją pracę.
Jak mogę to zrobić (otrzymać powiadomienie, gdy wszystkie wątki zakończą swoją pracę)?
java
multithreading
wait
RYN
źródło
źródło
Odpowiedzi:
Podejście, które wybieram, polega na użyciu ExecutorService do zarządzania pulami wątków.
ExecutorService es = Executors.newCachedThreadPool(); for(int i=0;i<5;i++) es.execute(new Runnable() { /* your task */ }); es.shutdown(); boolean finished = es.awaitTermination(1, TimeUnit.MINUTES); // all tasks have finished or the time has been reached.
źródło
while(!es.awaitTermination(1, TimeUnit.MINUTES));
es.shutdown();
? co jeśli napiszę kod, w którym wykonałem wątek używająces.execute(runnableObj_ZipMaking);
wtry
bloku ifinally
wywołałemboolean finshed = es.awaitTermination(10, TimeUnit.MINUTES);
. Więc przypuszczam, że powinno to poczekać, aż wszystkie wątki zakończą swoją pracę lub upłynie limit czasu (cokolwiek będzie pierwsze). Czy moje założenie jest poprawne? czy telefonshutdown()
jest obowiązkowy?Możesz
join
do wątków. Łączenie blokuje się do zakończenia wątku.for (Thread thread : threads) { thread.join(); }
Zauważ, że
join
rzucaInterruptedException
. Będziesz musiał zdecydować, co zrobić, jeśli tak się stanie (np. Spróbuj anulować inne wątki, aby zapobiec niepotrzebnej pracy).źródło
t.join();
oznacza, że bieżący wątek jest blokowany do momentut
zakończenia wątku . Nie wpływa na wątekt
.Spójrz na różne rozwiązania.
join()
API zostało wprowadzone we wczesnych wersjach Javy. Kilka dobrych alternatyw jest dostępnych w tym pakiecie równoległym od wydania JDK 1.5.ExecutorService # invokeAll ()
Odnieś się do tego powiązanego pytania SE, aby zapoznać się z przykładem kodu:
Jak używać metody invokeAll (), aby cała pula wątków mogła wykonać swoje zadanie?
CountDownLatch
Zapoznaj się z tym pytaniem, aby uzyskać informacje o użyciu
CountDownLatch
Jak czekać na wątek, który odrodzi swój własny wątek?
ForkJoinPool lub newWorkStealingPool () w Egzekutorach
Wykonaj iterację wszystkich obiektów Future utworzonych po przesłaniu do
ExecutorService
źródło
Oprócz
Thread.join()
sugestii innych, java 5 wprowadziła framework wykonawczy. Tam nie pracujesz zThread
przedmiotami. Zamiast tego przesyłasz swojeCallable
lubRunnable
obiekty do executora. Istnieje specjalny moduł wykonawczy, który ma wykonywać wiele zadań i zwracać ich wyniki w niewłaściwej kolejności. To jestExecutorCompletionService
:ExecutorCompletionService executor; for (..) { executor.submit(Executors.callable(yourRunnable)); }
Następnie możesz wielokrotnie wywoływać,
take()
aż nie będzie już żadnychFuture<?>
obiektów do zwrócenia, co oznacza, że wszystkie z nich zostaną ukończone.Inną rzeczą, która może mieć znaczenie, w zależności od scenariusza, jest
CyclicBarrier
.źródło
executor.submit
zwraca aFuture<?>
. Dodałbym te futures do listy, a następnie przejrzałbym listę wzywającąget
do każdej przyszłości.Executors
np.Executors.newCachedThreadPool
(Lub podobnego)Inną możliwością jest
CountDownLatch
obiekt, który jest przydatny w prostych sytuacjach: ponieważ znasz z góry liczbę wątków, inicjalizujesz go odpowiednią liczbą i przekazujesz referencję obiektu do każdego wątku.Po zakończeniu zadania każdy wątek wywołuje,
CountDownLatch.countDown()
co zmniejsza licznik wewnętrzny. Główny wątek, po uruchomieniu wszystkich pozostałych, powinien wykonaćCountDownLatch.await()
wywołanie blokujące. Zostanie zwolniony, gdy licznik wewnętrzny osiągnie 0.Zwróć uwagę, że tym przedmiotem również
InterruptedException
można rzucić.źródło
Poczekaj / zablokuj główny wątek, aż inne wątki zakończą swoją pracę.
Jak
@Ravindra babu
powiedziano, można to osiągnąć na różne sposoby, ale pokazując przykłady.java.lang.Thread. join () Od: 1.0
public static void joiningThreads() throws InterruptedException { Thread t1 = new Thread( new LatchTask(1, null), "T1" ); Thread t2 = new Thread( new LatchTask(7, null), "T2" ); Thread t3 = new Thread( new LatchTask(5, null), "T3" ); Thread t4 = new Thread( new LatchTask(2, null), "T4" ); // Start all the threads t1.start(); t2.start(); t3.start(); t4.start(); // Wait till all threads completes t1.join(); t2.join(); t3.join(); t4.join(); }
java.util.concurrent.CountDownLatch od: 1.5
.countDown()
«Zmniejsza licznik grupy zatrzasków..await()
«Metody await blokują się, aż aktualna liczba osiągnie zero.Jeśli utworzono
latchGroupCount = 4
następniecountDown()
powinien być nazywany 4 razy, aby liczyć 0. Tak więc, żeawait()
wyda blokowania wątków.public static void latchThreads() throws InterruptedException { int latchGroupCount = 4; CountDownLatch latch = new CountDownLatch(latchGroupCount); Thread t1 = new Thread( new LatchTask(1, latch), "T1" ); Thread t2 = new Thread( new LatchTask(7, latch), "T2" ); Thread t3 = new Thread( new LatchTask(5, latch), "T3" ); Thread t4 = new Thread( new LatchTask(2, latch), "T4" ); t1.start(); t2.start(); t3.start(); t4.start(); //latch.countDown(); latch.await(); // block until latchGroupCount is 0. }
Przykładowy kod klasy Threaded
LatchTask
. Aby przetestować zastosowanie podejściajoiningThreads();
ilatchThreads();
od głównej metody.class LatchTask extends Thread { CountDownLatch latch; int iterations = 10; public LatchTask(int iterations, CountDownLatch latch) { this.iterations = iterations; this.latch = latch; } @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName + " : Started Task..."); for (int i = 0; i < iterations; i++) { System.out.println(threadName + " : " + i); MainThread_Wait_TillWorkerThreadsComplete.sleep(1); } System.out.println(threadName + " : Completed Task"); // countDown() « Decrements the count of the latch group. if(latch != null) latch.countDown(); } }
Na przykład odwołaj się do tej klasy Concurrent_ParallelNotifyies .CyclicBarrier barrier = new CyclicBarrier(3); barrier.await();
Framework wykonawczy : możemy użyć ExecutorService, aby utworzyć pulę wątków i śledzić postęp zadań asynchronicznych z Future.
submit(Runnable)
,submit(Callable)
które zwracają Future Object. Korzystając zfuture.get()
funkcji możemy zablokować główny wątek do momentu zakończenia pracy wątków roboczych.invokeAll(...)
- zwraca listę obiektów Future, poprzez które można uzyskać wyniki wykonania każdego wywoływanego.Znajdź przykład korzystania z interfejsów Runnable, Callable with Executor.
@Zobacz też
źródło
Ty robisz
for (Thread t : new Thread[] { th1, th2, th3, th4, th5 }) t.join()
Po wykonaniu tej pętli for możesz być pewien, że wszystkie wątki zakończyły swoje zadania.
źródło
Przechowuj obiekty wątku w jakiejś kolekcji (takiej jak lista lub zestaw), a następnie zapętlaj kolekcję po uruchomieniu wątków i wywołaj join () w wątkach .
źródło
W tym celu możesz użyć metody Threadf # join .
źródło
Chociaż nie dotyczy to problemu OP, jeśli jesteś zainteresowany synchronizacją (a dokładniej rendez-vous) z dokładnie jednym wątkiem, możesz użyć Exchanger
W moim przypadku musiałem wstrzymać wątek nadrzędny do czasu, aż wątek potomny coś zrobi, np. Zakończy inicjalizację. CountDownLatch również działa dobrze.
źródło
Usługa wykonawcy może służyć do zarządzania wieloma wątkami, w tym statusem i ukończeniem. Zobacz http://programmingexamples.wikidot.com/executorservice
źródło
spróbuj tego, zadziała.
Thread[] threads = new Thread[10]; List<Thread> allThreads = new ArrayList<Thread>(); for(Thread thread : threads){ if(null != thread){ if(thread.isAlive()){ allThreads.add(thread); } } } while(!allThreads.isEmpty()){ Iterator<Thread> ite = allThreads.iterator(); while(ite.hasNext()){ Thread thread = ite.next(); if(!thread.isAlive()){ ite.remove(); } } }
źródło
Miałem podobny problem i skończyłem na Java 8 parallelStream.
To bardzo proste i czytelne. W tle używa domyślnej puli złączeń widełek JVM, co oznacza, że przed kontynuowaniem będzie czekać na zakończenie wszystkich wątków. W moim przypadku było to zgrabne rozwiązanie, ponieważ był to jedyny strumień równoległy w mojej aplikacji. Jeśli masz jednocześnie uruchomionych więcej niż jeden strumień równoległy, przeczytaj poniższe łącze.
Więcej informacji na temat strumieni równoległych tutaj .
źródło
Stworzyłem małą metodę pomocniczą, aby poczekać na zakończenie kilku wątków:
public static void waitForThreadsToFinish(Thread... threads) { try { for (Thread thread : threads) { thread.join(); } } catch (InterruptedException e) { e.printStackTrace(); } }
źródło
Istniejące odpowiedzi mówiły, że
join()
każdy wątek.Ale istnieje kilka sposobów uzyskania tablicy / listy wątków:
ThreadGroup
do zarządzania wątkami.Poniższy kod użyje tego
ThreadGruop
podejścia. Najpierw tworzy grupę, a następnie podczas tworzenia każdego wątku określa grupę w konstruktorze, a później może uzyskać tablicę wątków za pośrednictwemThreadGroup.enumerate()
Kod
SyncBlockLearn.java
import org.testng.Assert; import org.testng.annotations.Test; /** * synchronized block - learn, * * @author eric * @date Apr 20, 2015 1:37:11 PM */ public class SyncBlockLearn { private static final int TD_COUNT = 5; // thread count private static final int ROUND_PER_THREAD = 100; // round for each thread, private static final long INC_DELAY = 10; // delay of each increase, // sync block test, @Test public void syncBlockTest() throws InterruptedException { Counter ct = new Counter(); ThreadGroup tg = new ThreadGroup("runner"); for (int i = 0; i < TD_COUNT; i++) { new Thread(tg, ct, "t-" + i).start(); } Thread[] tArr = new Thread[TD_COUNT]; tg.enumerate(tArr); // get threads, // wait all runner to finish, for (Thread t : tArr) { t.join(); } System.out.printf("\nfinal count: %d\n", ct.getCount()); Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD); } static class Counter implements Runnable { private final Object lkOn = new Object(); // the object to lock on, private int count = 0; @Override public void run() { System.out.printf("[%s] begin\n", Thread.currentThread().getName()); for (int i = 0; i < ROUND_PER_THREAD; i++) { synchronized (lkOn) { System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count); } try { Thread.sleep(INC_DELAY); // wait a while, } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("[%s] end\n", Thread.currentThread().getName()); } public int getCount() { return count; } } }
Główny wątek będzie czekał na zakończenie wszystkich wątków w grupie.
źródło
Użyj tego w swoim głównym wątku: while (! Executor.isTerminated ()); Umieść ten wiersz kodu po uruchomieniu wszystkich wątków z usługi wykonawczej. Spowoduje to uruchomienie głównego wątku dopiero po zakończeniu wszystkich wątków uruchomionych przez executory. Upewnij się, że wywołujesz executor.shutdown (); przed powyższą pętlą.
źródło