Mam skrypt, który pomyślnie wykonuje zestaw zadań z puli wieloprocesorowej z imap_unordered()
wywołaniem:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Jednak mój num_tasks
wynosi około 250 000, więc join()
blokuje główny wątek na około 10 sekund i chciałbym móc stopniowo wyświetlać echo do wiersza poleceń, aby pokazać, że główny proces nie jest zablokowany. Coś jak:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Czy istnieje metoda dla obiektu wynikowego lub samej puli, która wskazuje liczbę pozostałych zadań? Próbowałem użyć multiprocessing.Value
obiektu jako licznika ( do_work
wywołuje counter.value += 1
akcję po wykonaniu swojego zadania), ale licznik osiąga tylko ~ 85% całkowitej wartości przed zatrzymaniem zwiększania.
python
multiprocessing
Północ, Błyskawica
źródło
źródło
def do_word(*a): time.sleep(.1)
jako przykładu. Jeśli to nie zadziała, stwórz kompletny minimalny kod, który demonstruje Twój problem: opisz za pomocą słów, czego się spodziewasz i co się stanie, wspomnij, jak uruchamiasz skrypt Pythona, jaki jest Twój system operacyjny, wersja Pythona i opublikuj jako nowe pytanie .Pool.map()
. Nie zdawałem sobie sprawy, że tylkoimap()
iimap_unordered()
działam w ten sposób - w dokumentacji po prostu jest napisane „Leniwiejsza wersja map ()”, ale tak naprawdę oznacza, że „bazowy iterator zwraca wyniki, gdy przychodzą”.imap_unordered()
. Problem Hanana jest prawdopodobnie spowodowanysys.stderr.write('\r..')
(nadpisaniem tej samej linii, aby pokazać postęp).Mój osobisty ulubiony - daje ci ładny mały pasek postępu i szacowany czas ukończenia, podczas gdy rzeczy działają i zatwierdzają się równolegle.
źródło
pip install tqdm
Odkryłem, że praca była już wykonana, zanim spróbowałem sprawdzić postęp. To właśnie zadziałało dla mnie przy użyciu tqdm .
pip install tqdm
Powinno to działać ze wszystkimi odmianami przetwarzania wieloprocesowego, niezależnie od tego, czy blokują, czy nie.
źródło
Znalazłem odpowiedzi z siebie trochę więcej kopanie: Biorąc okiem na
__dict__
tegoimap_unordered
obiektu wynikowego, I okazało się, że ma_index
atrybut przyrosty z każdym zakończeniu zadania. Więc to działa dla logowania, zapakowane wwhile
pętlę:Jednak okazało się, że zamiana
imap_unordered
na amap_async
spowodowała znacznie szybsze wykonanie, chociaż obiekt wynikowy jest nieco inny. Zamiast tego obiekt result frommap_async
ma_number_left
atrybut iready()
metodę:źródło
rs
jest znana i jest trochę spóźniona, czy nie?rs
zostały już uruchomione inne wątki.rs
żadnej pętli, nowicjusz przetwarzam wieloprocesorowo i to by pomogło. Dziękuję Ci bardzo.python 3.5
rozwiązaniu wykorzystującym_number_left
nie działa._number_left
reprezentuje fragmenty, które pozostają do przetworzenia. Na przykład, jeśli chcę, aby 50 elementów było przekazywanych równolegle do mojej funkcji, to dla puli wątków z 3 procesami_map_async()
tworzy 10 fragmentów po 5 elementów każdy._number_left
następnie przedstawia, ile z tych fragmentów zostało ukończonych.Wiem, że to dość stare pytanie, ale oto, co robię, gdy chcę śledzić postęp puli zadań w Pythonie.
Zasadniczo używasz apply_async z callbakiem (w tym przypadku jest to dodanie zwróconej wartości do listy), więc nie musisz czekać, aby zrobić coś innego. Następnie w ramach pętli while sprawdzasz postęp pracy. W tym przypadku dodałem widżet, aby wyglądał ładniej.
Wyjście:
Mam nadzieję, że to pomoże.
źródło
[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
dla(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Zgodnie z sugestią Tima możesz użyć
tqdm
iimap
rozwiązać ten problem. Właśnie natknąłem się na ten problem i ulepszyłemimap_unordered
rozwiązanie, aby uzyskać dostęp do wyników mapowania. Oto jak to działa:Jeśli nie interesują Cię wartości zwracane z Twoich zadań, nie musisz przypisywać listy do żadnej zmiennej.
źródło
dla każdego, kto szuka prostego rozwiązania współpracującego z
Pool.apply_async()
:źródło
Stworzyłem własną klasę, aby wydrukować postęp. Maby to pomaga:
źródło
Wypróbuj to proste podejście oparte na kolejkach, które można również zastosować w przypadku puli. Pamiętaj, że wydrukowanie czegokolwiek po zainicjowaniu paska postępu spowoduje jego przesunięcie, przynajmniej dla tego konkretnego paska postępu. (Postęp PyPI 1.5)
źródło