Kiedy biegnę coś takiego:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
to działa dobrze. Jednak umieszczając to jako funkcję klasy:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Daje mi następujący błąd:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Widziałem post Alexa Martelliego dotyczący tego samego problemu, ale nie był wystarczająco jasny.
python
multiprocessing
pickle
Mermoz
źródło
źródło
IPython.Parallel
środku, ale tam można było obejść ten problem, popychając obiekty do węzłów. Obejście tego problemu z przetwarzaniem wieloprocesowym wydaje się dość denerwujące.calculate
jest picklable, a więc wydaje się, że może to zostać rozwiązane przez: 1) tworzenia obiektów funkcji z konstruktora kopie nadcalculate
przykład, a następnie 2) przechodzącego wystąpienie tego obiektu funkcyjnej,Pool
jestmap
sposobem. Nie?multiprocessing
modułu wynikają z jego celu, jakim jest implementacja międzyplatformowa, oraz brakufork(2)
podobnego wywołania systemowego w systemie Windows. Jeśli nie zależy Ci na obsłudze Win32, może istnieć prostsze obejście oparte na procesach. Lub jeśli jesteś przygotowany do korzystania wątki zamiast procesów, można zastąpićfrom multiprocessing import Pool
zfrom multiprocessing.pool import ThreadPool as Pool
.Odpowiedzi:
Denerwowały mnie również ograniczenia dotyczące tego, jakie funkcje może akceptować pool.map. Napisałem następujące informacje, aby to obejść. Wydaje się, że działa, nawet przy rekurencyjnym użyciu parmap.
źródło
Nie mogłem użyć dotychczas opublikowanych kodów, ponieważ kody używające "multiprocessing.Pool" nie działają z wyrażeniami lambda, a kody nie używające "multiprocessing.Pool" generują tyle procesów, ile jest elementów pracy.
Dostosowałem kod tak, aby tworzył predefiniowaną liczbę pracowników i iterował listę wejściową tylko wtedy, gdy istnieje bezczynny pracownik. Włączyłem również tryb "demona" dla pracowników. Ctrl-c działa zgodnie z oczekiwaniami.
źródło
parmap
funkcją?(None, None)
jako ostatni element wskazujefun
, że osiągnął koniec sekwencji elementów dla każdego procesu.Wieloprocesowe przetwarzanie i wytrawianie jest zepsute i ograniczone, chyba że wyskoczysz poza standardową bibliotekę.
Jeśli używasz rozwidlenia
multiprocessing
wywołanegopathos.multiprocesssing
, możesz bezpośrednio używać klas i metod klas wmap
funkcjach wieloprocesorowych . Dzieje się tak, ponieważdill
jest używany zamiastpickle
lubcPickle
idill
może serializować prawie wszystko w Pythonie.pathos.multiprocessing
zapewnia również asynchroniczną funkcję mapy… i możemap
działać z wieloma argumentami (np.map(math.pow, [1,2,3], [4,5,6])
)Zobacz dyskusje: Co może zrobić wieloprocesorowość i koperek?
i: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Obsługuje nawet kod, który napisałeś na początku, bez modyfikacji i od interpretera. Po co cokolwiek, co jest bardziej delikatne i specyficzne dla pojedynczego przypadku?
Pobierz kod tutaj: https://github.com/uqfoundation/pathos
I żeby pokazać, co potrafi:
źródło
amap
), która umożliwia korzystanie z pasków postępu i innego programowania asynchronicznego.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
co jest kompatybilne w 2/3. Zobacz stackoverflow.com/questions/27873093/… i pypi.python.org/pypi/multiprocess .pathos
doczekał się nowej stabilnej wersji i jest również kompatybilny z 2.x i 3.x.O ile wiem, obecnie nie ma rozwiązania twojego problemu: funkcja, którą dajesz,
map()
musi być dostępna poprzez import twojego modułu. Dlatego działa kod Roberta: funkcjęf()
można uzyskać, importując następujący kod:Właściwie dodałem sekcję „main”, ponieważ jest ona zgodna z zaleceniami dla platformy Windows („Upewnij się, że moduł główny można bezpiecznie zaimportować przez nowy interpreter Pythona bez powodowania niezamierzonych efektów ubocznych”).
Dodałem również wielką literę przed
Calculate
, aby zastosować się do PEP 8 . :)źródło
Rozwiązanie mrule jest poprawne, ale zawiera błąd: jeśli dziecko odeśle dużą ilość danych, może wypełnić bufor potoku, blokując bufor dziecka
pipe.send()
, podczas gdy rodzic czeka, aż dziecko wyjdziepipe.join()
. Rozwiązaniem jest odczytanie danych dziecka przedjoin()
nim. Ponadto dziecko powinno zamknąć koniec rury rodzica, aby zapobiec zakleszczeniu. Poniższy kod rozwiązuje ten problem. Należy również pamiętać, żeparmap
tworzy to jeden proces na element wX
. Bardziej zaawansowanym rozwiązaniem jestmultiprocessing.cpu_count()
podzielenieX
na kilka fragmentów, a następnie scalenie wyników przed zwróceniem. Zostawiam to czytelnikowi jako ćwiczenie, aby nie zepsuć zwięzłości ładnej odpowiedzi muła. ;)źródło
OSError: [Errno 24] Too many open files
. Myślę, że muszą istnieć jakieś ograniczenia liczby procesów, aby działał poprawnie ...Ja też się z tym zmagałem. Miałem funkcje jako członkowie danych klasy, jako uproszczony przykład:
Musiałem użyć funkcji self.f w wywołaniu Pool.map () z tej samej klasy, a self.f nie przyjęło krotki jako argumentu. Ponieważ ta funkcja została osadzona w klasie, nie było dla mnie jasne, jak napisać typ opakowania, które sugerowały inne odpowiedzi.
Rozwiązałem ten problem, używając innego opakowania, które przyjmuje krotkę / listę, gdzie pierwszym elementem jest funkcja, a pozostałe elementy to argumenty tej funkcji, zwanej eval_func_tuple (f_args). Używając tego, problematyczna linia może zostać zastąpiona przez return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Oto pełny kod:
Plik: util.py
Plik: main.py
Uruchomienie main.py da [11, 22, 33]. Możesz to poprawić, na przykład eval_func_tuple można również zmodyfikować tak, aby przyjmował argumenty słów kluczowych.
Z drugiej strony, w innych odpowiedziach, funkcja „parmap” może być bardziej wydajna w przypadku większej liczby Procesów niż liczba dostępnych procesorów. Kopiuję edytowaną wersję poniżej. To jest mój pierwszy post i nie byłem pewien, czy powinienem bezpośrednio edytować oryginalną odpowiedź. Zmieniłem też nazwy niektórych zmiennych.
źródło
Wziąłem odpowiedź klausa se i aganders3 i stworzyłem udokumentowany moduł, który jest bardziej czytelny i przechowuje w jednym pliku. Możesz go po prostu dodać do swojego projektu. Ma nawet opcjonalny pasek postępu!
EDYCJA : Dodano sugestię @ alexander-mcfarlane i funkcję testową
źródło
join()
w tym samym czasie, a na wyświetlaczu pojawi się błysk100%
zakończeniatqdm
. Jedyny przypadek, w którym będzie to przydatne, to sytuacja, gdy każdy procesor ma tendencyjne obciążenie pracątqdm()
aby zawinąć linię:result = [q_out.get() for _ in tqdm(sent)]
i działa o wiele lepiej - wielki wysiłek, ale naprawdę to doceniam, więc +1Wiem, że zadawano to ponad 6 lat temu, ale chciałem tylko dodać moje rozwiązanie, ponieważ niektóre z powyższych sugestii wydają się strasznie skomplikowane, ale moje rozwiązanie było w rzeczywistości bardzo proste.
Wszystko, co musiałem zrobić, to zawinąć wywołanie pool.map () do funkcji pomocniczej. Przekazanie obiektu klasy wraz z argumentami dla metody jako krotki, która wyglądała trochę tak.
źródło
Funkcje zdefiniowane w klasach (nawet w funkcjach wewnątrz klas) tak naprawdę nie trawią. Jednak to działa:
źródło
Wiem, że to pytanie padło 8 lat i 10 miesięcy temu, ale chcę Wam przedstawić moje rozwiązanie:
Musisz tylko przekształcić swoją klasę w metodę statyczną. Ale jest to również możliwe z metodą klasową:
Przetestowano w Pythonie 3.7.3
źródło
Zmodyfikowałem metodę klaus se, ponieważ podczas pracy z małymi listami zawieszała się, gdy liczba pozycji wynosiła ~ 1000 lub więcej. Zamiast przesuwać zadania pojedynczo z
None
warunkiem zatrzymania, ładuję kolejkę wejściową naraz i po prostu pozwalam procesom miażdżyć ją, aż będzie pusta.Edycja: niestety teraz napotykam ten błąd w moim systemie: Limit maksymalnego rozmiaru kolejki wieloprocesowej wynosi 32767 , mam nadzieję, że obejścia tam pomogą.
źródło
Możesz uruchomić kod bez żadnych problemów, jeśli w jakiś sposób ręcznie zignorujesz
Pool
obiekt z listy obiektów w klasie, ponieważ nie jest on wpickle
stanie, jak mówi błąd. Możesz to zrobić za pomocą__getstate__
funkcji (patrz również tutaj ) w następujący sposób.Pool
Obiekt będzie starał się znaleźć__getstate__
i__setstate__
funkcje i wykonać je, jeśli uzna to po uruchomieniumap
,map_async
itp:Następnie wykonaj:
da ci wynik:
Przetestowałem powyższy kod w Pythonie 3.xi działa.
źródło
Nie jestem pewien, czy przyjęto takie podejście, ale obejście, którego używam, jest:
Wynik powinien być:
źródło
Istnieje możliwość, że zechcesz zastosować tę funkcję dla każdej innej instancji klasy. Oto rozwiązanie również na to
źródło
Oto moje rozwiązanie, które moim zdaniem jest nieco mniej hakerskie niż większość innych tutaj. Jest podobny do odpowiedzi Nightowl.
źródło
Od http://www.rueckstiess.net/research/snippets/show/ca1d7d90 i http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Możemy utworzyć funkcję zewnętrzną i zapełnić ją obiektem klasy self:
LUB bez Joblib:
źródło
Może to nie jest zbyt dobre rozwiązanie, ale w moim przypadku rozwiązuję to w ten sposób.
Musiałem przejść
self
do mojej funkcji, ponieważ muszę uzyskać dostęp do atrybutów i funkcji mojej klasy za pośrednictwem tej funkcji. To działa dla mnie. Zawsze mile widziane są poprawki i sugestie.źródło
Oto szablon, który napisałem dla korzystania z wieloprocesorowej puli w python3, w szczególności python3.7.7 został użyty do uruchomienia testów. Moje najszybsze biegi uzyskałem przy użyciu
imap_unordered
. Po prostu podłącz swój scenariusz i wypróbuj go. Możesz użyćtimeit
lub po prostutime.time()
dowiedzieć się, co jest dla Ciebie najlepsze.W powyższym scenariuszu
imap_unordered
faktycznie wydaje się działać najgorzej dla mnie. Wypróbuj swoją obudowę i porównaj ją na maszynie, na której planujesz ją uruchomić. Przeczytaj także o pulach procesów . Twoje zdrowie!źródło