Nie można marynować <wpisz „instancemethod”> podczas korzystania z puliaprocesap.

218

Próbuję użyć multiprocessing„s Pool.map()funkcję podzielenie prac jednocześnie. Gdy używam następującego kodu, działa dobrze:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Jednak gdy używam go w podejściu bardziej obiektowym, to nie działa. Wyświetlany komunikat o błędzie to:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Dzieje się tak, gdy następujące są mój program główny:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

a oto moja someClassklasa:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Czy ktoś wie, na czym polega problem lub jak łatwo go obejść?

ventolin
źródło
4
jeśli f jest funkcją zagnieżdżoną, występuje podobny błądPicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg

Odpowiedzi:

122

Problem polega na tym, że przetwarzanie wieloprocesorowe musi zalewać procesy między procesami, a powiązane metody nie są możliwe do wyboru. Obejściem problemu (czy uważasz to za „łatwe” czy nie ;-) jest dodanie infrastruktury do programu, aby umożliwić wytrawianie takich metod, rejestrując je za pomocą standardowej metody bibliotecznej copy_reg .

Na przykład wkład Stevena Betharda w ten wątek (pod koniec wątku) pokazuje jedno doskonale wykonalne podejście, które pozwala na wytrawianie / usuwanie zbędnych metod copy_reg.

Alex Martelli
źródło
To wspaniale, dziękuję. Wygląda na to, że jakoś się posunął: przy użyciu kodu z pastebin.ca/1693348 otrzymuję teraz RuntimeError: przekroczona maksymalna głębokość rekurencji. Rozejrzałem się i jeden post na forum zalecił zwiększenie maksymalnej głębokości do 1500 (z domyślnego 1000), ale nie miałem tam radości. Szczerze mówiąc, nie widzę, która część (przynajmniej mojego kodu) mogłaby się wymknąć spod kontroli, chyba że z jakiegoś powodu kod wytrawia się i rozprasza w pętli, z powodu drobnych zmian, które wprowadziłem w celu wprowadzenia Kod Stevena OO'd?
ventolin
1
Twoje _pickle_methodzwroty self._unpickle_method, metoda związana; więc oczywiście marynata próbuje teraz marynować TO - i robi to tak, jak to kazałeś: dzwoniąc _pickle_methodrekurencyjnie. Tj. OOWprowadzając kod w ten sposób, nieuchronnie wprowadziłeś nieskończoną rekurencję. Sugeruję powrót do kodu Stevena (i nie oddawanie czci przy ołtarzu OO, gdy nie jest to właściwe: wiele rzeczy w Pythonie najlepiej zrobić w bardziej funkcjonalny sposób, i to jest jeden).
Alex Martelli
8
dla leniwych
Jan
15
Dla super super leniwych , zobacz jedyną odpowiedź, która zadała sobie trud, aby opublikować rzeczywisty niepoprawny kod ...
Cerin
2
Innym sposobem na naprawienie / obejście problemu wytrawiania jest użycie kopru, patrz moja odpowiedź stackoverflow.com/questions/8804830/…
rocksportrocker
74

Wszystkie te rozwiązania są brzydkie, ponieważ wieloprocesowość i wytrawianie są zepsute i ograniczone, chyba że wyjdziesz poza standardową bibliotekę.

Jeśli używasz rozwidlenia multiprocessingwywoływanych pathos.multiprocesssing, możesz bezpośrednio używać klas i metod klasowych w mapfunkcjach wieloprocesowych . Jest tak, ponieważ dilljest używany zamiast picklelub cPicklei dillmoże serializować prawie wszystko w Pythonie.

pathos.multiprocessingzapewnia także funkcję mapy asynchronicznej… i może mapdziałać z wieloma argumentami (np. map(math.pow, [1,2,3], [4,5,6]))

Zobacz: Co wieloprocesowość i koperek mogą zrobić razem?

i: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Mówiąc wprost, możesz zrobić dokładnie to, co chciałeś, i możesz to zrobić z tłumacza, jeśli chcesz.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Pobierz kod tutaj: https://github.com/uqfoundation/pathos

Mike McKerns
źródło
3
Czy możesz zaktualizować tę odpowiedź na podstawie pathos.pp, ponieważ pathos.multiprocessing już nie istnieje?
Saheel Godhane
10
Jestem pathosautorem Wersja, o której mowa, ma kilka lat. Wypróbuj wersję na github. Możesz użyć pathos.pplub github.com/uqfoundation/ppft .
Mike McKerns,
1
lub github.com/uqfoundation/pathos . @ SaheelGodhane: Nowe wydanie jest już dawno spóźnione, ale niedługo powinno zostać wydane.
Mike McKerns,
3
Najpierw pip install setuptoolswięc pip install git+https://github.com/uqfoundation/pathos.git@master. Otrzymasz odpowiednie zależności. Nowa wersja jest już prawie gotowa… teraz prawie wszystko pathosdziała również w systemie Windows i jest 3.xkompatybilna.
Mike McKerns
1
@Rika: Tak. dostępne są mapy blokujące, iteracyjne i asynchroniczne.
Mike McKerns
35

Możesz również zdefiniować __call__()metodę wewnątrz twojego someClass(), która wywołuje, someClass.go()a następnie przekazać instancję someClass()do puli. Ten obiekt jest zalewalny i działa dobrze (dla mnie) ...

dorvak
źródło
3
Jest to o wiele łatwiejsze niż technika zaproponowana przez Alexa Martellego, ale jesteś ograniczony do wysyłania tylko jednej metody na klasę do puli wieloprocesowej.
przestarzałe
6
Innym szczegółem, o którym należy pamiętać, jest to, że tylko obiekt (instancja klasy) jest trawiony, a nie sama klasa. Dlatego też, jeśli zmieniono jakiekolwiek atrybuty klasy z ich wartości domyślnych, zmiany te nie zostaną przeniesione do różnych procesów. Obejściem tego jest upewnienie się, że wszystko, czego potrzebuje twoja funkcja, jest przechowywane jako atrybut instancji.
przestarzałe
2
@dorvak, czy możesz podać prosty przykład __call__()? Myślę, że twoja odpowiedź może być czystsza - staram się zrozumieć ten błąd i po raz pierwszy przychodzę zobaczyć połączenie. Nawiasem mówiąc, także ta odpowiedź pomaga wyjaśnić, co robi wieloprocesowość: [ stackoverflow.com/a/20789937/305883]
user305883
1
Czy możesz podać przykład?
frmsaul
1
Jest nowy odpowiedź pisał (obecnie poniżej tej jednej) z przykładowym kodem do tego.
Aaron
22

Niektóre ograniczenia rozwiązania Stevena Betharda:

Po zarejestrowaniu metody klasy jako funkcji, niszczyciel klasy jest zadziwiająco wywoływany za każdym razem, gdy przetwarzanie metody jest zakończone. Więc jeśli masz 1 instancję swojej klasy, która wywołuje n razy jej metodę, członkowie mogą zniknąć między 2 uruchomieniami i możesz otrzymać komunikat malloc: *** error for object 0x...: pointer being freed was not allocated(np. Otwarty plik członka) lub pure virtual method called, terminate called without an active exception(co oznacza, że ​​czas życia obiektu członka, którego użyłem, był krótszy niż co myślałem). Mam to, gdy mam do czynienia z n większą niż wielkość puli. Oto krótki przykład:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Wynik:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__Metoda nie jest więc równoważne, ponieważ [None, ...] są odczytywane z wynikami:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Żadna z obu metod nie jest satysfakcjonująca ...

Eric H.
źródło
7
Dostać Nonez powrotem, ponieważ definicja __call__brakuje return: powinien być return self.process_obj(i).
trek
1
@Eric Otrzymywałem ten sam błąd i wypróbowałem to rozwiązanie, ale zacząłem otrzymywać nowy błąd jako „cPickle.PicklingError: Can't pickle <type 'function'>: wbudowane wyszukiwanie atrybutów .funkcja nie powiodła się”. Czy wiesz, co może być tego przyczyną?
Naman
15

Istnieje inny skrót, którego możesz użyć, chociaż może być nieefektywny w zależności od tego, co znajduje się w twojej klasie.

Jak wszyscy powiedzieli, problem polega na tym, że multiprocessingkod musi zalewać rzeczy, które wysyła do podprocesów, które uruchomił, a moduł wybierający nie stosuje metod instancji.

Jednak zamiast wysyłać metodę instancji, możesz wysłać rzeczywistą instancję klasy oraz nazwę funkcji do wywołania, do zwykłej funkcji, która następnie używa getattrdo wywołania metody instancji, tworząc w ten sposób powiązaną metodę w Poolpodprocesie. Jest to podobne do definiowania __call__metody, z tą różnicą, że można wywołać więcej niż jedną funkcję składową.

Ukradłem kod @ EricH. Z jego odpowiedzi i trochę go opatrzyłem komentarzem (przepisałem go stąd wszystkie zmiany nazwy i takie, z jakiegoś powodu wydawało się to łatwiejsze niż wycinanie i wklejanie :-)) dla zilustrowania całej magii:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

Dane wyjściowe pokazują, że rzeczywiście, konstruktor jest wywoływany raz (w oryginalnym pid), a destruktor jest wywoływany 9 razy (raz dla każdej wykonanej kopii = 2 lub 3 razy na proces-pulę-proces, zależnie od potrzeby, plus raz w oryginale proces). Jest to często OK, tak jak w tym przypadku, ponieważ domyślny moduł wybierający tworzy kopię całej instancji i (częściowo) potajemnie wypełnia ją ponownie - w tym przypadku wykonując:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

- dlatego mimo że destruktor jest wywoływany osiem razy w trzech procesach roboczych, odlicza od 1 do 0 za każdym razem - ale oczywiście nadal możesz mieć kłopoty w ten sposób. W razie potrzeby możesz podać własne __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

na przykład w tym przypadku.

Trek
źródło
1
To zdecydowanie najlepsza odpowiedź na ten problem, ponieważ najłatwiej jest zastosować go do domyślnego zachowania niemożliwego do marynowania
Matt Taylor
12

Możesz również zdefiniować __call__()metodę wewnątrz twojego someClass(), która wywołuje, someClass.go()a następnie przekazać instancję someClass()do puli. Ten obiekt jest zalewalny i działa dobrze (dla mnie) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
Paryż
źródło
3

Rozwiązanie z powyższego parisjohn działa dobrze ze mną. Ponadto kod wygląda na przejrzysty i łatwy do zrozumienia. W moim przypadku jest kilka funkcji, które można wywołać za pomocą Pool, więc zmodyfikowałem nieco kod parisjohn. Wykonałem wywołanie, aby móc wywołać kilka funkcji, a nazwy funkcji są przekazywane w argumencie dict z go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
neobot
źródło
1

Potencjalnie trywialnym rozwiązaniem tego problemu jest przejście na używanie multiprocessing.dummy. Jest to implementacja interfejsu wieloprocesowego oparta na wątkach, która nie wydaje się mieć tego problemu w Pythonie 2.7. Nie mam tutaj dużego doświadczenia, ale ta szybka zmiana importu pozwoliła mi wywołać metodę Apply_async w metodzie klasy.

Kilka dobrych zasobów na multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

David Parks
źródło
1

W tym prostym przypadku, w którym someClass.fnie dziedziczy się żadnych danych z klasy i nie dołącza niczego do klasy, możliwym rozwiązaniem byłoby oddzielenie f, aby można je było marynować:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
mhh
źródło
1

Dlaczego nie użyć osobnego func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
źródło
1

Natknąłem się na ten sam problem, ale odkryłem, że istnieje koder JSON, którego można używać do przenoszenia tych obiektów między procesami.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Użyj tego, aby utworzyć listę:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Następnie w funkcji mapowanej użyj tego do odzyskania obiektu:

pfVmomiObj = json.loads(jsonSerialized)
Jerzy
źródło
0

Aktualizacja: na dzień pisania tego tekstu można pobrać nazwane krotki (począwszy od Pythona 2.7)

Problem w tym, że procesy potomne nie są w stanie zaimportować klasy obiektu - w tym przypadku klasy P - w przypadku projektu wielomodelowego klasa P powinna być importowana wszędzie tam, gdzie proces potomny zostanie wykorzystany

szybkim obejściem jest umożliwienie importu poprzez wpływanie na globals ()

globals()["P"] = P
rachid el kedmiri
źródło