Pokazywać postęp wywołania imap_unordered puli wieloprocesorowej Pythona?

96

Mam skrypt, który pomyślnie wykonuje zestaw zadań z puli wieloprocesorowej z imap_unordered()wywołaniem:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Jednak mój num_taskswynosi około 250 000, więc join()blokuje główny wątek na około 10 sekund i chciałbym móc stopniowo wyświetlać echo do wiersza poleceń, aby pokazać, że główny proces nie jest zablokowany. Coś jak:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Czy istnieje metoda dla obiektu wynikowego lub samej puli, która wskazuje liczbę pozostałych zadań? Próbowałem użyć multiprocessing.Valueobiektu jako licznika ( do_workwywołuje counter.value += 1akcję po wykonaniu swojego zadania), ale licznik osiąga tylko ~ 85% całkowitej wartości przed zatrzymaniem zwiększania.

Północ, Błyskawica
źródło

Odpowiedzi:

80

Nie ma potrzeby uzyskiwania dostępu do prywatnych atrybutów zestawu wyników:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
jfs
źródło
7
Wydruk widzę dopiero po wyjściu kodu (nie w każdej iteracji). Masz jakąś sugestię?
Hanan Shteingart
@HananShteingart: Działa dobrze w moim systemie (Ubuntu) zarówno z Pythonem 2, jak i 3. Użyłem def do_word(*a): time.sleep(.1)jako przykładu. Jeśli to nie zadziała, stwórz kompletny minimalny kod, który demonstruje Twój problem: opisz za pomocą słów, czego się spodziewasz i co się stanie, wspomnij, jak uruchamiasz skrypt Pythona, jaki jest Twój system operacyjny, wersja Pythona i opublikuj jako nowe pytanie .
jfs
14
Miałem ten sam problem, co @HananShteingart: to dlatego, że próbowałem użyć Pool.map(). Nie zdawałem sobie sprawy, że tylko imap() i imap_unordered()działam w ten sposób - w dokumentacji po prostu jest napisane „Leniwiejsza wersja map ()”, ale tak naprawdę oznacza, że ​​„bazowy iterator zwraca wyniki, gdy przychodzą”.
simonmacmullen
@simonmacmullen: zarówno pytanie, jak i moja odpowiedź imap_unordered(). Problem Hanana jest prawdopodobnie spowodowany sys.stderr.write('\r..')(nadpisaniem tej samej linii, aby pokazać postęp).
jfs
2
Również możliwe! Głównie chciałem udokumentować głupie założenie, które poczyniłem - na wypadek, gdyby ktoś inny to czytał.
simonmacmullen
94

Mój osobisty ulubiony - daje ci ładny mały pasek postępu i szacowany czas ukończenia, podczas gdy rzeczy działają i zatwierdzają się równolegle.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
Tim
źródło
64
co jeśli pula zwróci wartość?
Nickpick
11
Utworzyłem pustą listę o nazwie result przed pętlą, a następnie wewnątrz pętli po prostu wykonaj result.append (x). Wypróbowałem to z 2 procesami i użyłem imap zamiast mapy i wszystko działało tak, jak chciałem @nickpick
bs7280
2
więc mój pasek postępu iteruje do nowych wierszy zamiast przesuwać się w miejscu, masz jakiś pomysł, dlaczego tak się dzieje?
Austin
2
nie zapomnijciepip install tqdm
Mr. T
3
@ bs7280 Czy przez result.append (x) miałeś na myśli wynik.append (_)? Co to jest X?
jason
27

Odkryłem, że praca była już wykonana, zanim spróbowałem sprawdzić postęp. To właśnie zadziałało dla mnie przy użyciu tqdm .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Powinno to działać ze wszystkimi odmianami przetwarzania wieloprocesowego, niezależnie od tego, czy blokują, czy nie.

reubano
źródło
4
Myślę, że tworzy kilka wątków, a każdy wątek liczy się niezależnie
nburn42
1
Mam funkcje w ramach funkcji, które powodują błąd wytrawiania.
ojunk
21

Znalazłem odpowiedzi z siebie trochę więcej kopanie: Biorąc okiem na __dict__tego imap_unorderedobiektu wynikowego, I okazało się, że ma _indexatrybut przyrosty z każdym zakończeniu zadania. Więc to działa dla logowania, zapakowane w whilepętlę:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Jednak okazało się, że zamiana imap_unorderedna a map_asyncspowodowała znacznie szybsze wykonanie, chociaż obiekt wynikowy jest nieco inny. Zamiast tego obiekt result from map_asyncma _number_leftatrybut i ready()metodę:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
Północ, Błyskawica
źródło
3
Przetestowałem to dla Pythona 2.7.6, a rs._number_left wydaje się być liczbą pozostałych fragmentów. Więc jeśli rs._chunksize nie jest 1, to rs._number_left nie będzie liczbą pozostałych elementów listy.
Allen
Gdzie mam umieścić ten kod? Chodzi mi o to, że nie jest to wykonywane, dopóki treść nie rsjest znana i jest trochę spóźniona, czy nie?
Wakan Tanka
@WakanTanka: Pojawia się w głównym skrypcie po odłączeniu dodatkowych wątków. W moim oryginalnym przykładzie przechodzi w pętlę „while”, w której rszostały już uruchomione inne wątki.
MidnightLightning
1
Czy mógłbyś zmienić swoje pytanie i / lub odpowiedź, aby pokazać minimalny przykład roboczy. Nie widzę rsżadnej pętli, nowicjusz przetwarzam wieloprocesorowo i to by pomogło. Dziękuję Ci bardzo.
Wakan Tanka
1
Przynajmniej w python 3.5rozwiązaniu wykorzystującym _number_leftnie działa. _number_leftreprezentuje fragmenty, które pozostają do przetworzenia. Na przykład, jeśli chcę, aby 50 elementów było przekazywanych równolegle do mojej funkcji, to dla puli wątków z 3 procesami _map_async()tworzy 10 fragmentów po 5 elementów każdy. _number_leftnastępnie przedstawia, ile z tych fragmentów zostało ukończonych.
mSSM
9

Wiem, że to dość stare pytanie, ale oto, co robię, gdy chcę śledzić postęp puli zadań w Pythonie.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Zasadniczo używasz apply_async z callbakiem (w tym przypadku jest to dodanie zwróconej wartości do listy), więc nie musisz czekać, aby zrobić coś innego. Następnie w ramach pętli while sprawdzasz postęp pracy. W tym przypadku dodałem widżet, aby wyglądał ładniej.

Wyjście:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Mam nadzieję, że to pomoże.

Julien Tourille
źródło
muszę zmienić: [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]dla(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
David Przybilla
To nieprawda. Obiekt generatora nie będzie tutaj działał. Sprawdzone.
swagatam
9

Zgodnie z sugestią Tima możesz użyć tqdmi imaprozwiązać ten problem. Właśnie natknąłem się na ten problem i ulepszyłem imap_unorderedrozwiązanie, aby uzyskać dostęp do wyników mapowania. Oto jak to działa:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

Jeśli nie interesują Cię wartości zwracane z Twoich zadań, nie musisz przypisywać listy do żadnej zmiennej.

mrapacz
źródło
4

dla każdego, kto szuka prostego rozwiązania współpracującego z Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
zeawoas
źródło
3

Stworzyłem własną klasę, aby wydrukować postęp. Maby to pomaga:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
Aronstef
źródło
1

Wypróbuj to proste podejście oparte na kolejkach, które można również zastosować w przypadku puli. Pamiętaj, że wydrukowanie czegokolwiek po zainicjowaniu paska postępu spowoduje jego przesunięcie, przynajmniej dla tego konkretnego paska postępu. (Postęp PyPI 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()
Mott The Tuple
źródło