Czy byłoby możliwe utworzenie puli Pythona, która nie jest demonem? Chcę, aby pula mogła wywołać funkcję zawierającą inną pulę w środku.
Chcę tego, ponieważ procesy demona nie mogą tworzyć procesu. W szczególności spowoduje to błąd:
AssertionError: daemonic processes are not allowed to have children
Na przykład rozważmy scenariusz, w którym function_a
jest uruchomiona pula, function_b
która ma uruchomioną pulę function_c
. Ten łańcuch funkcji nie powiedzie się, ponieważ function_b
jest uruchamiany w procesie demona, a procesy demona nie mogą tworzyć procesów.
python
multiprocessing
pool
Maks
źródło
źródło
I want a pool to be able to call a function that has another pool inside
i jak to wpływa na fakt, że pracownicy są zdemonizowani.AssertionError: daemonic processes are not allowed to have children
Odpowiedzi:
multiprocessing.pool.Pool
Klasa tworzy procesy pracownik w jego__init__
metodzie, sprawia im demoniczny i uruchamia je, a to nie jest możliwe, aby ponownie ustawić swójdaemon
atrybutFalse
, zanim zostaną uruchomione (i potem nie wolno już). Możesz jednak utworzyć własną podklasęmultiprocesing.pool.Pool
(multiprocessing.Pool
jest to tylko funkcja opakowująca) i zastąpić własnąmultiprocessing.Process
podklasę, która jest zawsze nie-demoniczna, do wykorzystania w procesach roboczych.Oto pełny przykład, jak to zrobić. Ważnymi częściami są dwie klasy
NoDaemonProcess
iMyPool
na górze oraz do wywołaniapool.close()
ipool.join()
naMyPool
końcu Twojej instancji.#!/usr/bin/env python # -*- coding: UTF-8 -*- import multiprocessing # We must import this explicitly, it is not imported by the top-level # multiprocessing module. import multiprocessing.pool import time from random import randint class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class MyPool(multiprocessing.pool.Pool): Process = NoDaemonProcess def sleepawhile(t): print("Sleeping %i seconds..." % t) time.sleep(t) return t def work(num_procs): print("Creating %i (daemon) workers and jobs in child." % num_procs) pool = multiprocessing.Pool(num_procs) result = pool.map(sleepawhile, [randint(1, 5) for x in range(num_procs)]) # The following is not really needed, since the (daemon) workers of the # child's pool are killed when the child is terminated, but it's good # practice to cleanup after ourselves anyway. pool.close() pool.join() return result def test(): print("Creating 5 (non-daemon) workers and jobs in main process.") pool = MyPool(5) result = pool.map(work, [randint(1, 5) for x in range(5)]) pool.close() pool.join() print(result) if __name__ == '__main__': test()
źródło
multiprocessing.freeze_support()
MyPool
zamiast domyślnegoPool
? Innymi słowy, jakie koszty ponoszę w zamian za elastyczność uruchamiania procesów potomnych? (Gdyby nie było kosztów, przypuszczalnie standardPool
używałby procesów innych niż demoniczne).Pool
klasa została gruntownie zmieniona, więcProcess
nie jest już prostym atrybutem, ale metodą, która zwraca instancję procesu, którą pobiera z kontekstu . Próbowałem nadpisać tę metodę, aby zwrócićNoDaemonPool
wystąpienie, ale powoduje to wyjątek,AssertionError: daemonic processes are not allowed to have children
gdy używana jest pula.Musiałem zastosować nie-demoniczną pulę w Pythonie 3.7 i ostatecznie dostosowałem kod zamieszczony w zaakceptowanej odpowiedzi. Poniżej znajduje się fragment, który tworzy nie-demoniczną pulę:
import multiprocessing.pool class NoDaemonProcess(multiprocessing.Process): @property def daemon(self): return False @daemon.setter def daemon(self, value): pass class NoDaemonContext(type(multiprocessing.get_context())): Process = NoDaemonProcess # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool # because the latter is only a wrapper function, not a proper class. class NestablePool(multiprocessing.pool.Pool): def __init__(self, *args, **kwargs): kwargs['context'] = NoDaemonContext() super(NestablePool, self).__init__(*args, **kwargs)
Ponieważ obecna implementacja
multiprocessing
została gruntownie zreformowana, aby była oparta na kontekstach, musimy zapewnićNoDaemonContext
klasę, która maNoDaemonProcess
atrybut as.NestablePool
użyje wtedy tego kontekstu zamiast domyślnego.To powiedziawszy, powinienem ostrzec, że istnieją co najmniej dwa zastrzeżenia dotyczące tego podejścia:
multiprocessing
pakietu i dlatego może się zepsuć w dowolnym momencie.multiprocessing
których tak trudno jest używać procesów innych niż demoniczne, z których wiele zostało wyjaśnionych tutaj . Moim zdaniem najbardziej przekonujące jest:źródło
Wieloprocesorowe moduł ma piękny interfejs do korzystania z basenów z procesów lub wątków. W zależności od twojego obecnego przypadku użycia, możesz rozważyć użycie
multiprocessing.pool.ThreadPool
dla swojej puli zewnętrznej, co spowoduje powstanie wątków (które pozwolą na uruchamianie procesów od wewnątrz), w przeciwieństwie do procesów.Może to być ograniczone przez GIL, ale w moim konkretnym przypadku (testowałem oba) , czas uruchamiania procesów z zewnątrz, jakie zostały tutaj
Pool
utworzone , znacznie przewyższał rozwiązanieThreadPool
.To naprawdę łatwe do wymiany
Processes
dlaThreads
. Przeczytaj więcej o tym, jak korzystać zThreadPool
rozwiązania tutaj lub tutaj .źródło
W niektórych wersjach Pythona zastępujących standardowe Pool zwyczaju może podnieść błąd:
AssertionError: group argument must be None for now
.Tutaj znalazłem rozwiązanie, które może pomóc:
class NoDaemonProcess(multiprocessing.Process): # make 'daemon' attribute always return False @property def daemon(self): return False @daemon.setter def daemon(self, val): pass class NoDaemonProcessPool(multiprocessing.pool.Pool): def Process(self, *args, **kwds): proc = super(NoDaemonProcessPool, self).Process(*args, **kwds) proc.__class__ = NoDaemonProcess return proc
źródło
concurrent.futures.ProcessPoolExecutor
nie ma tego ograniczenia. Może mieć zagnieżdżoną pulę procesów bez żadnego problemu:from concurrent.futures import ProcessPoolExecutor as Pool from itertools import repeat from multiprocessing import current_process import time def pid(): return current_process().pid def _square(i): # Runs in inner_pool square = i ** 2 time.sleep(i / 10) print(f'{pid()=} {i=} {square=}') return square def _sum_squares(i, j): # Runs in outer_pool with Pool(max_workers=2) as inner_pool: squares = inner_pool.map(_square, (i, j)) sum_squares = sum(squares) time.sleep(sum_squares ** .5) print(f'{pid()=}, {i=}, {j=} {sum_squares=}') return sum_squares def main(): with Pool(max_workers=3) as outer_pool: for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)): print(f'{pid()=} {sum_squares=}') if __name__ == "__main__": main()
Powyższy kod demonstracyjny został przetestowany w Pythonie 3.8.
Ograniczeniem jest
ProcessPoolExecutor
jednak to, że nie mamaxtasksperchild
. Jeśli tego potrzebujesz, zamiast tego rozważ odpowiedź Massimiliano .Kredyt: odpowiedź jfs
źródło
multiprocessing.Pool
wewnątrz aProcessPoolExecutor.Pool
jest również możliwe!Problem, który napotkałem, polegał na próbie zaimportowania globali między modułami, powodując, że wiersz ProcessPool () był wielokrotnie oceniany.
globals.py
from processing import Manager, Lock from pathos.multiprocessing import ProcessPool from pathos.threading import ThreadPool class SingletonMeta(type): def __new__(cls, name, bases, dict): dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self return super(SingletonMeta, cls).__new__(cls, name, bases, dict) def __init__(cls, name, bases, dict): super(SingletonMeta, cls).__init__(name, bases, dict) cls.instance = None def __call__(cls,*args,**kw): if cls.instance is None: cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) return cls.instance def __deepcopy__(self, item): return item.__class__.instance class Globals(object): __metaclass__ = SingletonMeta """ This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children The root cause is that importing this file from different modules causes this file to be reevalutated each time, thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug """ def __init__(self): print "%s::__init__()" % (self.__class__.__name__) self.shared_manager = Manager() self.shared_process_pool = ProcessPool() self.shared_thread_pool = ThreadPool() self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
Następnie bezpiecznie zaimportuj z innego miejsca w kodzie
from globals import Globals Globals().shared_manager Globals().shared_process_pool Globals().shared_thread_pool Globals().shared_lock
Napisałem tutaj bardziej rozszerzoną klasę opakowującą
pathos.multiprocessing
:Na marginesie, jeśli Twój przypadek użycia wymaga tylko asynchronicznej mapy wieloprocesowej jako optymalizacji wydajności, to joblib będzie zarządzać wszystkimi pulami procesów w tle i zezwoli na tę bardzo prostą składnię:
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
źródło
Widziałem ludzi zajmujących się tym problemem, używając
celery
rozwidleniamultiprocessing
zwanego bilardem (wieloprocesorowe rozszerzenia puli), które pozwala procesom demonicznym na tworzenie dzieci. Aby obejść ten problem, wystarczy wymienićmultiprocessing
moduł na:import billiard as multiprocessing
źródło
Stanowi to obejście problemu, gdy błąd jest pozornie fałszywie dodatni. Jak również zauważył James , może się to zdarzyć w przypadku niezamierzonego importu z procesu demonicznego.
Na przykład, jeśli masz następujący prosty kod,
WORKER_POOL
można go nieumyślnie zaimportować od pracownika, co prowadzi do błędu.import multiprocessing WORKER_POOL = multiprocessing.Pool()
Prostym, ale niezawodnym sposobem obejścia tego problemu jest:
import multiprocessing import multiprocessing.pool class MyClass: @property def worker_pool(self) -> multiprocessing.pool.Pool: # Ref: https://stackoverflow.com/a/63984747/ try: return self._worker_pool # type: ignore except AttributeError: # pylint: disable=protected-access self.__class__._worker_pool = multiprocessing.Pool() # type: ignore return self.__class__._worker_pool # type: ignore # pylint: enable=protected-access
W powyższym obejściu
MyClass.worker_pool
można użyć bez błędu. Jeśli uważasz, że to podejście można ulepszyć, daj mi znać.źródło