W poniższym przykładowym kodzie chciałbym odzyskać zwracaną wartość funkcji worker
. Jak mogę to zrobić? Gdzie jest przechowywana ta wartość?
Przykładowy kod:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
Wynik:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
Nie mogę znaleźć odpowiedniego atrybutu w obiektach przechowywanych w jobs
.
multiprocessing.Queue
, a nieManager
tutaj. Użycie aManager
wymaga odrodzenia całkowicie nowego procesu, który jest przesadą, kiedyQueue
to zrobi.multiprocessing.Pool.map
do przetworzenia listy elementów pracy.args=(my_function_argument, )
. Zwróć uwagę na,
przecinek tutaj! W przeciwnym razie Python będzie narzekał na „brak argumentów pozycyjnych”. Zajęło mi 10 minut, żeby się domyślić. Sprawdź także użycie ręczne (w sekcji „klasa procesu”).Myślę, że podejście sugerowane przez @sega_sai jest lepsze. Ale naprawdę potrzebuje przykładu kodu, więc oto:
Które wydrukują zwracane wartości:
Jeśli znasz
map
(wbudowany w Python 2), nie powinno to być zbyt trudne. W przeciwnym razie spójrz na link sega_Sai .Zwróć uwagę, jak mało kodu jest potrzebne. (Zwróć również uwagę na to, jak procesy są ponownie wykorzystywane).
źródło
getpid()
zwrot ma tę samą wartość? Używam Python3pool.map
zakres 1 000 000 za pomocą więcej niż 10 procesów, widzę co najwyżej dwie różne stawki.pool.apply_async
: docs.python.org/3/library/…W tym przykładzie pokazano, jak korzystać z listy procesów wieloprocesowych. Instancje potoków, aby zwrócić ciągi z dowolnej liczby procesów:
Wynik:
Rozwiązanie to wykorzystuje mniej zasobów niż multiprocessing.Queue który używa
lub multiprocessing.SimpleQueue, który używa
Bardzo pouczające jest spojrzenie na źródło każdego z tych typów.
źródło
Pipe
na proces vs. jedenQueue
dla wszystkich procesów. Nie wiem, czy to we wszystkich przypadkach będzie bardziej wydajne.Z jakiegoś powodu nie mogłem znaleźć ogólnego przykładu, jak to zrobić w
Queue
dowolnym miejscu (nawet przykłady dokumentów Pythona nie spawnują wielu procesów), więc oto, co udało mi się wykonać po 10 próbach:Queue
to blokująca, bezpieczna dla wątków kolejka, której można użyć do przechowywania wartości zwracanych z procesów potomnych. Więc musisz przekazać kolejkę do każdego procesu. Coś mniej oczywiste jest to, że trzebaget()
z kolejki przed wamijoin
przezProcess
ES lub innego kolejka zapełnia się i blokuje wszystko.Aktualizacja dla osób zorientowanych obiektowo (testowane w Pythonie 3.4):
źródło
Dla każdego, kto szuka sposobu na uzyskanie wartości za
Process
pomocąQueue
:źródło
Queue
join()
queue.put(ret)
przed rozmową telefonicznąp.start()
? W takim przypadku wątek roboczy zawiesi się naqueue.get()
zawsze. Możesz to zreplikować, kopiując mój fragment powyżej podczas komentowaniaqueue.put(ret)
.queue.get()
musi to nastąpić przedp.join()
. To działa teraz dla mnie.Wydaje się, że zamiast tego należy użyć klasy multiprocessing.Pool i metod .apply () .apply_async (), map ()
http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
źródło
Możesz użyć
exit
wbudowanego, aby ustawić kod zakończenia procesu. Można go uzyskać zexitcode
atrybutu procesu:Wynik:
źródło
Żwirowa pakiet ma ładny abstrakcji efektu dźwigni
multiprocessing.Pipe
, która sprawia, że ten dość prosty:Przykład z: https://pythonhosted.org/Pebble/#concurrent-decorators
źródło
Pomyślałem, że uproszczę najprostsze przykłady skopiowane z góry, pracując dla mnie na Py3.6. Najprostszy to
multiprocessing.Pool
:Możesz ustawić liczbę procesów w puli, używając np
Pool(processes=5)
. Domyślnie jest to jednak liczba procesorów, więc pozostaw puste dla zadań związanych z procesorem. (Zadania związane z operacjami we / wy często i tak odpowiadają wątkom, ponieważ wątki w większości czekają, więc mogą współużytkować rdzeń procesora).Pool
Stosuje również optymalizację porcji .(Należy zauważyć, że metody roboczej nie można zagnieździć w metodzie. Początkowo zdefiniowałem metodę roboczą w metodzie, która wywołuje wywołanie
pool.map
, aby zachować ją jako całość, ale potem procesy nie mogły jej zaimportować, i zgłosiło błąd „AttributeError” : Nie można marynować lokalnego obiektu outer_method..inner_method ". Więcej tutaj . Może być wewnątrz klasy.)(Doceń
'represent!'
raczej oryginalne pytanie zadane podczas drukowaniatime.sleep()
, ale bez niego pomyślałem, że jakiś kod działałby równolegle, gdy nie był).Py3
ProcessPoolExecutor
ma również dwie linie (.map
zwraca generator, więc potrzebujeszlist()
):Z prostymi
Process
es:Użyj,
SimpleQueue
jeśli wszystko czego potrzebujesz toput
iget
. Pierwsza pętla rozpoczyna wszystkie procesy, zanim druga wykonaqueue.get
wywołania blokujące . Nie wydaje mi się, żeby dzwonićp.join()
.źródło
Proste rozwiązanie:
Wynik:
źródło
Jeśli używasz języka Python 3, możesz użyć go
concurrent.futures.ProcessPoolExecutor
jako wygodnej abstrakcji:Wynik:
źródło
Zmodyfikowałem nieco odpowiedź vartec, ponieważ potrzebowałem uzyskać kody błędów z funkcji. (Dzięki vertec !!! to niesamowita sztuczka)
Można to również zrobić za pomocą,
manager.list
ale myślę, że lepiej jest mieć go w dykcie i przechowywać w nim listę. W ten sposób zachowujemy funkcję i wyniki, ponieważ nie jesteśmy pewni kolejności, w jakiej lista zostanie wypełniona.źródło