wielowątkowość w Pythonie czeka na zakończenie wszystkich wątków

119

Mogłem o to zapytać w podobnym kontekście, ale nie mogłem znaleźć odpowiedzi po około 20 minutach poszukiwań, więc zapytam.

Napisałem skrypt w Pythonie (powiedzmy: scriptA.py) i skrypt (powiedzmy scriptB.py)

W skrypcie B chcę wywołać skryptA wiele razy z różnymi argumentami, za każdym razem uruchomienie trwa około godziny (jest to ogromny skrypt, robi wiele rzeczy ... nie martw się o to) i chcę mieć możliwość uruchomienia scriptA ze wszystkimi różnymi argumentami jednocześnie, ale przed kontynuowaniem muszę poczekać, aż WSZYSTKIE zostaną wykonane; mój kod:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Chcę uruchomić wszystko subprocess.call()w tym samym czasie, a następnie poczekać, aż wszystkie skończą , jak mam to zrobić?

Próbowałem użyć wątków, jak na przykładzie tutaj :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Ale nie sądzę, żeby to było słuszne.

Skąd mam wiedzieć, że wszyscy skończyli biegać przed pójściem do mojego do_finish()?

Inbar Rose
źródło

Odpowiedzi:

150

Na końcu skryptu należy użyć metody łączeniaThread obiektu.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Zatem główny wątek będzie czekać aż t1, t2i t3zakończyć działanie.

Maksim Skurydzin
źródło
5
hmmm - masz problem ze zrozumieniem czegoś, czy to najpierw nie uruchomi t1, poczekaj do jego zakończenia, a potem przejdź do t2..etc itd. jak sprawić, by to wszystko stało się naraz? nie widzę, jak to je uruchomi w tym samym czasie?
Inbar Rose
25
Wywołanie joinbloków do momentu zakończenia wykonywania wątku. I tak będziesz musiał poczekać na wszystkie wątki. Jeśli t1skończysz jako pierwszy, zaczniesz czekać t2(co może być już zakończone i natychmiast zaczniesz czekać t3). Jeśli wykonanie t1zajęło najwięcej czasu, po powrocie z niego oba t1i t2powrócą natychmiast bez blokowania.
Maksim Skurydzin
1
nie rozumiesz mojego pytania - jeśli skopiuję powyższy kod do swojego kodu - czy zadziała? czy coś mi brakuje?
Inbar Rose
2
ok rozumiem. teraz rozumiem, byłem trochę zdezorientowany, ale myślę, że rozumiem, w joinpewnym sensie dołącza bieżący proces do wątku i czeka, aż się skończy, a jeśli t2 zakończy się przed t1, to po zakończeniu t1 sprawdzi, czy t2 jest zakończony. że tak jest, a następnie sprawdź t3..etc..etc .. i tylko wtedy, gdy wszystko jest zrobione, będzie kontynuowane. niesamowite.
Inbar Rose
3
powiedzmy, że t1 trwa najdłużej, ale t2 ma wyjątek. Co się wtedy stanie? możesz złapać ten wyjątek lub sprawdzić, czy t2 zakończył się dobrze, czy nie?
Ciprian Tomoiagă
174

Umieść wątki na liście, a następnie użyj metody Join

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
źródło
1
Tak, to by działało, ale trudniej to zrozumieć. Należy zawsze starać się znaleźć równowagę między kompaktowym kodem a „czytelnością”. Pamiętaj: kod jest napisany raz, ale czytany wiele razy. Dlatego ważniejsze jest, aby było to łatwe do zrozumienia.
Aaron Digulla
2
„Wzorzec fabryczny” nie jest czymś, co mogę wyjaśnić jednym zdaniem. Google dla tego i wyszukaj stackoverflow.com. Istnieje wiele przykładów i wyjaśnień. Krótko mówiąc: piszesz kod, który buduje dla Ciebie coś złożonego. Jak w prawdziwej fabryce: składasz zamówienie i otrzymujesz gotowy produkt z powrotem.
Aaron Digulla
18
Nie podoba mi się pomysł używania funkcji rozumienia list ze względu na skutki uboczne i nie robienia niczego pożytecznego z listą wyników. Prosta pętla for byłaby czystsza, nawet gdyby rozciągała się w dwóch rzędach ...
Ioan Alexandru Cucu,
1
@Aaron DIgull Rozumiem to. Mam na myśli to, że po prostu zrobiłbym to, for x in threads: x.join()a nie używałbym Comprehantion
Ioan Alexandru Cucu
1
@IoanAlexandruCucu: Nadal zastanawiam się, czy istnieje bardziej czytelne i wydajne rozwiązanie: stackoverflow.com/questions/21428602/ ...
Aaron Digulla
29

W Pythonie3, od Pythona 3.2, jest nowe podejście do osiągnięcia tego samego wyniku, który osobiście wolę od tradycyjnego tworzenia wątku / start / dołączanie, pakiet concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Użycie ThreadPoolExecutorkodu wyglądałoby tak:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

Wynik poprzedniego kodu jest podobny do:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Jedną z zalet jest to, że można kontrolować przepływność, ustawiając maksymalną liczbę równoczesnych pracowników.

Roberto
źródło
ale jak możesz stwierdzić, że wszystkie wątki w puli wątków się skończyły?
Prime By Design,
1
Jak widać w przykładzie, kod po withinstrukcji jest wykonywany po zakończeniu wszystkich zadań.
Roberto
to nie działa. Spróbuj zrobić coś naprawdę długiego w wątkach. Twoja instrukcja print zostanie wykonana przed zakończeniem wątku
Pranalee
@Pranalee, Ten kod działa, zaktualizowałem kod, aby dodać wiersze wyjściowe. Nie możesz zobaczyć „Wszystkie zadania ...” przed zakończeniem wszystkich wątków. Tak właśnie withdziała instrukcja w tym przypadku. W każdym razie, zawsze możesz otworzyć nowe pytanie w SO i wysłać swój kod, abyśmy mogli Ci pomóc dowiedzieć się, co się dzieje w Twojej sprawie.
Roberto,
@PrimeByDesign możesz użyć concurrent.futures.waitfunkcji, prawdziwy przykład możesz zobaczyć tutaj Oficjalna dokumentacja: docs.python.org/3/library/ ...
Alexander Fortin
28

Preferuję rozumienie list na podstawie listy wejściowej:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
źródło
Zaznaczona odpowiedź dobrze wyjaśnia, ale ta jest krótsza i nie wymaga brzydkich powtórzeń. Po prostu ładna odpowiedź. :)
tleb
Zrozumienie listy tylko dla skutków ubocznych jest zwykle amortyzowane *. Ale w tym przypadku wydaje się, że to dobry pomysł. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@VinayakKaniyarakkal for t in threads:t.start()czy nie jest lepiej?
SmartManoj
5

Możesz mieć klasę podobną do poniżej, z której możesz dodać 'n' liczbę funkcji lub skryptów console_scripts, które chcesz wykonać równolegle, i rozpocząć wykonywanie i poczekać na zakończenie wszystkich zadań.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
źródło
To jest wieloprocesowość. Pytanie dotyczyło docs.python.org/3/library/threading.html
Rustam A.,
3

Z threading dokumentacji modułu

Istnieje obiekt „wątek główny”; odpowiada to początkowemu wątkowi kontroli w programie w języku Python. To nie jest wątek demona.

Istnieje możliwość, że zostaną utworzone „fikcyjne obiekty wątku”. Są to obiekty wątków odpowiadające „obcym wątkom”, które są wątkami kontroli uruchomionymi poza modułem wątkowym, na przykład bezpośrednio z kodu C. Fikcyjne obiekty wątku mają ograniczoną funkcjonalność; są zawsze uważani za żywych i demonicznych i nie można ich join()edytować. Nigdy nie są usuwane, ponieważ niemożliwe jest wykrycie zakończenia obcych wątków.

Tak więc, aby uchwycić te dwa przypadki, gdy nie jesteś zainteresowany utrzymywaniem listy utworzonych wątków:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Po czym:

>>> print(data)
[0, 4, 12, 40]
berna1111
źródło
2

Może coś takiego

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
źródło
Wypróbowałem ten kod, ale nie jestem pewien, czy działa, ponieważ została wydrukowana ostatnia instrukcja mojego kodu, która była po tej pętli for i nadal proces nie został zakończony.
Omkar
1

Właśnie natknąłem się na ten sam problem, w którym musiałem poczekać na wszystkie wątki, które zostały utworzone za pomocą pętli for. Właśnie wypróbowałem następujący fragment kodu Może nie jest to idealne rozwiązanie, ale pomyślałem, że będzie to proste rozwiązanie testować:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
źródło