Dokumentacja multiprocessing
modułu pokazuje, jak przekazać kolejkę do procesu uruchomionego multiprocessing.Process
. Ale jak mogę współużytkować kolejkę z asynchronicznymi procesami roboczymi, od których rozpoczęto apply_async
? Nie potrzebuję dynamicznego łączenia ani niczego innego, tylko sposób, aby pracownicy (wielokrotnie) zgłaszali swoje wyniki z powrotem do bazy.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
To nie powiedzie się z:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Rozumiem, co to oznacza, i rozumiem rady dotyczące dziedziczenia zamiast wytrawiania / usuwania wytrawiania (i wszystkie specjalne ograniczenia systemu Windows). Ale jak nie mogę przekazać kolejkę w sposób, który działa? Nie mogę znaleźć przykładu, a próbowałem kilku alternatyw, które zawiodły na różne sposoby. Prosimy o pomoc?
queue.Queue()
nie jest do tego odpowiednie?queue.Queue
został zbudowany do obsługi wątków, przy użyciu blokad w pamięci. W środowisku wieloprocesowym każdy podproces otrzymywałby własną kopięqueue.Queue()
instancji we własnej przestrzeni pamięci, ponieważ podprocesy nie współużytkują pamięci (w większości).multiprocessing.Pool
ma już udostępnioną kolejkę wyników, nie ma potrzeby dodatkowego angażowania plikuManager.Queue
.Manager.Queue
jestqueue.Queue
(kolejką wielowątkową) pod maską, zlokalizowaną na oddzielnym procesie serwerowym i ujawnioną przez serwery proxy. To dodaje dodatkowe obciążenie w porównaniu z kolejką wewnętrzną puli. W przeciwieństwie do natywnej obsługi wyników puli, wynikiManager.Queue
również nie mają gwarancji uporządkowania.Procesy robocze nie są uruchamiane
.apply_async()
, to już się dzieje podczas tworzenia instancjiPool
. Co się zaczęło, kiedy nazywająpool.apply_async()
to nowy „praca”. Procesymultiprocessing.pool.worker
robocze puli uruchamiają funkcję pod maską. Ta funkcja zajmuje się przetwarzaniem nowych "zadań" przenoszonych do wewnętrznej puliPool._inqueue
i wysyłaniem wyników z powrotem do rodzica przezPool._outqueue
. Twój określonyfunc
zostanie wykonany w ciągumultiprocessing.pool.worker
.func
musi tylkoreturn
coś, a wynik zostanie automatycznie odesłany do rodzica..apply_async()
natychmiast (asynchronicznie) zwracaAsyncResult
obiekt (alias dlaApplyResult
). Musisz wywołać.get()
(blokuje) ten obiekt, aby otrzymać rzeczywisty wynik. Inną opcją byłoby zarejestrowanie funkcji zwrotnej , która jest uruchamiana, gdy tylko wynik będzie gotowy.from multiprocessing import Pool def busy_foo(i): """Dummy function simulating cpu-bound work.""" for _ in range(int(10e6)): # do stuff pass return i if __name__ == '__main__': with Pool(4) as pool: print(pool._outqueue) # DEMO results = [pool.apply_async(busy_foo, (i,)) for i in range(10)] # `.apply_async()` immediately returns AsyncResult (ApplyResult) object print(results[0]) # DEMO results = [res.get() for res in results] print(f'result: {results}')
Przykładowe dane wyjściowe:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0> <multiprocessing.pool.ApplyResult object at 0x7fa12586da20> result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Uwaga: Określenie
timeout
parametru -parametru dla.get()
nie zatrzyma rzeczywistego przetwarzania zadania w ramach procesu roboczego, a jedynie odblokuje oczekującego rodzica, podnoszącmultiprocessing.TimeoutError
.źródło
error_callback
parametru -parametru dlaapply_async
, więc od tego czasu niewiele się zmieniło.