Wieloprocesowe przetwarzanie w Pythonie PicklingError: Can't pickle <type 'function'>

243

Przykro mi, że nie mogę odtworzyć błędu na prostszym przykładzie, a mój kod jest zbyt skomplikowany, aby go opublikować. Jeśli uruchomię program w powłoce IPython zamiast zwykłego Pythona, wszystko zadziała dobrze.

Przejrzałem kilka wcześniejszych notatek na ten temat. Wszystkie zostały spowodowane przez użycie puli do wywołania funkcji zdefiniowanej w funkcji klasy. Ale tak nie jest w moim przypadku.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Byłbym wdzięczny za wszelką pomoc.

Aktualizacja : Funkcja I marynowana jest zdefiniowana na najwyższym poziomie modułu. Chociaż wywołuje funkcję, która zawiera funkcję zagnieżdżoną. tzn. f()wywołuje g()wywołania, h()które mają funkcję zagnieżdżoną i(), a ja dzwonię pool.apply_async(f). f(), g(), h()Są określone na poziomie górnym. Próbowałem prostszego przykładu z tym wzorem i to działa.

Wendeta
źródło
3
Odpowiedź najwyższego poziomu / zaakceptowana jest dobra, ale może to oznaczać konieczność przebudowania kodu, co może być bolesne. Poleciłbym każdemu, kto ma ten problem, aby również przeczytał dodatkowe odpowiedzi wykorzystujące dilli pathos. Jednak nie mam szczęścia do żadnego z rozwiązań podczas pracy z vtkobjects :( Ktoś zdążył uruchomić kod python w równoległym przetwarzaniu vtkPolyData?
Chris

Odpowiedzi:

305

Oto lista tego, co można marynować . W szczególności funkcje można wybierać tylko wtedy, gdy są zdefiniowane na najwyższym poziomie modułu.

Ten fragment kodu:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

zwraca błąd prawie identyczny z tym, który opublikowałeś:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Problem polega na tym, że poolwszystkie metody używają a mp.SimpleQueuedo przekazywania zadań do procesów roboczych. Wszystko, co przechodzi, mp.SimpleQueuemusi być możliwe do odebrania i foo.worknie jest możliwe do odebrania, ponieważ nie jest zdefiniowane na najwyższym poziomie modułu.

Można to naprawić, definiując funkcję na najwyższym poziomie, która wywołuje foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Zauważ, że foojest do odebrania, ponieważ Foojest zdefiniowany na najwyższym poziomie i foo.__dict__można go odebrać.

unutbu
źródło
2
Dzięki za odpowiedź. Zaktualizowałem swoje pytanie. Ale nie sądzę, że to jest przyczyna
Vendetta,
7
Aby uzyskać błąd PicklingError, w kolejce należy umieścić coś, czego nie można odebrać. Może to być funkcja lub jej argumenty. Aby dowiedzieć się więcej na temat problemu, proponuję wykonać kopię programu i rozpocząć parowanie, dzięki czemu będzie on coraz prostszy, za każdym razem ponownie uruchamiając program, aby sprawdzić, czy problem nadal występuje. Kiedy stanie się to naprawdę proste, albo sam odkryjesz problem, albo będziesz mieć coś, co możesz tutaj opublikować.
unutbu
3
Ponadto: jeśli zdefiniujesz funkcję na najwyższym poziomie modułu, ale jest ona dekorowana, wówczas odniesienie będzie do wyjścia dekoratora i i tak otrzymasz ten błąd.
bobpoekert
5
Dopiero późno o 5 lat, ale właśnie na to wpadłem. Okazuje się, że „najwyższy poziom” należy traktować bardziej dosłownie niż zwykle: wydaje mi się, że definicja funkcji musi poprzedzać inicjalizację puli (tj. pool = Pool()Wiersz tutaj ). Nie spodziewałem się tego, i to może być powód, dla którego problem OP nadal występował.
Andras Deak
4
W szczególności funkcje można wybierać tylko wtedy, gdy są zdefiniowane na najwyższym poziomie modułu. Wydaje się, że wynik zastosowania functool.partialdo funkcji najwyższego poziomu jest również możliwy do marynowania, nawet jeśli jest zdefiniowany w innej funkcji.
user1071847,
96

Użyłbym pathos.multiprocesssingzamiast multiprocessing. pathos.multiprocessingjest rozwidleniem multiprocessingtych zastosowań dill. dillpotrafi serializować prawie wszystko w Pythonie, więc możesz wysłać o wiele więcej równolegle. pathosWidelec posiada również możliwość pracy bezpośrednio z wieloma funkcjami argumentów, ile potrzeba do metod klasy.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Pobierz pathos(i jeśli chcesz dill) tutaj: https://github.com/uqfoundation

Mike McKerns
źródło
5
pracował przysmak. Dla wszystkich innych zainstalowałem obie biblioteki przez: sudo pip install git+https://github.com/uqfoundation/dill.git@masterisudo pip install git+https://github.com/uqfoundation/pathos.git@master
Alexander McFarlane
5
@AlexanderMcFarlane Nie instalowałbym pakietów Pythona sudo(szczególnie ze źródeł zewnętrznych, takich jak github). Zamiast tego polecam uruchomić:pip install --user git+...
Chris,
Korzystanie po prostu pip install pathosnie działa niestety i daje następujący komunikat:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
11
pip install pathosteraz działa i pathosjest kompatybilny z Python 3.
Mike McKerns,
3
@DanielGoldfarb: multiprocessjest rozwidleniem multiprocessinggdzie dillzastąpił picklew kilku miejscach w kodzie ... ale w zasadzie to wszystko. pathoszapewnia dodatkowe warstwy API, multiprocessa także ma dodatkowe zaplecza. Ale to jest sedno tego.
Mike McKerns,
29

Jak powiedzieli inni, multiprocessingmogą przenosić tylko obiekty Pythona do procesów roboczych, które mogą być marynowane. Jeśli nie możesz reorganizować kodu zgodnie z opisem unutbu, możesz skorzystać z dillrozszerzonych funkcji wytrawiania / odznaczania do przesyłania danych (szczególnie danych kodu), jak pokazano poniżej.

To rozwiązanie wymaga jedynie instalacji dilli żadnych innych bibliotek, ponieważ pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
rocksportrocker
źródło
6
Jestem autorem dilli pathosautorem… a jeśli masz rację, czy nie jest to o wiele ładniejsze, czystsze i bardziej elastyczne w użyciu, pathostak jak w mojej odpowiedzi? A może jestem trochę stronniczy…
Mike McKerns
4
Nie wiedziałem o stanie pathosw momencie pisania i chciałem przedstawić rozwiązanie, które jest bardzo bliskie odpowiedzi. Teraz, gdy widziałem twoje rozwiązanie, zgadzam się, że to jest właściwa droga.
rocksportrocker
Przeczytałem twoje rozwiązanie i pomyślałem: Doh… I didn't even think of doing it like that. To było trochę fajne.
Mike McKerns,
4
Dzięki za wysłanie, zastosowałem to podejście do podsycania / nieumieszczania argumentów, których nie można było marynować: stackoverflow.com/questions/27883574/…
jazzblue
@rocksportrocker. Czytam ten przykład i nie mogę zrozumieć, dlaczego istnieje wyraźna forpętla. Normalnie widziałbym, jak równoległa procedura pobiera listę i zwraca listę bez pętli.
user1700890
20

Odkryłem, że mogę również wygenerować dokładnie ten błąd na doskonale działającym fragmencie kodu, próbując użyć na nim profilera.

Pamiętaj, że było to w systemie Windows (gdzie rozwidlenie jest nieco mniej eleganckie).

Biegłem:

python -m profile -o output.pstats <script> 

I stwierdził, że usunięcie profilowania usunęło błąd, a umieszczenie profilowania przywróciło go. Doprowadzał mnie też do szału, ponieważ wiedziałem, że kod działa. Sprawdzałem, czy coś zaktualizowało pool.py ... a potem poczułem tonięcie i wyeliminowałem profilowanie i to wszystko.

Zamieszczam tutaj archiwum, na wypadek gdyby ktoś wpadł na nie.

Ezekiel Kruglick
źródło
3
WOW, dzięki za wzmiankę! Doprowadzało mnie to do szału przez ostatnią godzinę; Próbowałem wszystkiego na bardzo prostym przykładzie - nic nie działało. Ale miałem również profiler przebiegający przez mój plik wsadowy :(
tim
1
Och, nie mogę ci wystarczająco podziękować. Brzmi to jednak głupio, ponieważ jest tak nieoczekiwane. Myślę, że należy o tym wspomnieć w dokumentacji. Wszystko, co miałem, to instrukcja importu pdb, a prosta funkcja najwyższego poziomu z tylko a passnie była „łatwa”.
0xc0de
10

Kiedy pojawia się ten problem, multiprocessingprostym rozwiązaniem jest przejście z Poolna ThreadPool. Można to zrobić bez zmiany kodu oprócz importu

from multiprocessing.pool import ThreadPool as Pool

Działa to, ponieważ ThreadPool dzieli pamięć z głównym wątkiem, zamiast tworzyć nowy proces - oznacza to, że wytrawianie nie jest wymagane.

Wadą tej metody jest to, że Python nie jest najlepszym językiem do obsługi wątków - używa czegoś o nazwie Global Interpreter Lock, aby zachować bezpieczeństwo wątków, co może spowolnić niektóre przypadki użycia tutaj. Jeśli jednak przede wszystkim wchodzisz w interakcję z innymi systemami (uruchamianie poleceń HTTP, rozmawianie z bazą danych, pisanie do systemów plików), kod prawdopodobnie nie jest związany z procesorem i nie odniesie większego trafienia. W rzeczywistości podczas pisania testów porównawczych HTTP / HTTPS stwierdziłem, że zastosowany tutaj model wątkowy ma mniejszy narzut i opóźnienia, ponieważ narzut związany z tworzeniem nowych procesów jest znacznie wyższy niż narzut związany z tworzeniem nowych wątków.

Więc jeśli przetwarzasz mnóstwo rzeczy w przestrzeni użytkownika Pythona, może to nie być najlepsza metoda.

tedivm
źródło
2
Ale wtedy używasz tylko jednego procesora (przynajmniej ze zwykłymi wersjami Pythona, które używają GIL ), co w pewnym sensie nie udaje się.
Endre Both
To naprawdę zależy od celu. Globalna blokada interpretera oznacza, że ​​tylko jedna instancja może jednocześnie uruchamiać kod Pythona, ale w przypadku działań, które mocno blokują (dostęp do systemu plików, pobieranie dużych lub wielu plików, uruchamianie kodu zewnętrznego) GIL nie stanowi problemu. W niektórych przypadkach narzut związany z otwieraniem nowych procesów (zamiast wątków) przewyższa narzut GIL.
tedivm
To prawda, dzięki. Nadal możesz chcieć zawrzeć zastrzeżenie w odpowiedzi. W dzisiejszych czasach, gdy wzrost mocy przetwarzania występuje głównie w postaci bardziej niż mocniejszych rdzeni procesora, przejście z wykonywania wielordzeniowego na jednordzeniowy jest raczej znaczącym efektem ubocznym.
Endre Both
Dobra uwaga - zaktualizowałem odpowiedź o więcej szczegółów. Chciałbym jednak zauważyć, że przejście na wielowątkowe przetwarzanie wątków nie powoduje, że Python działa tylko na jednym rdzeniu.
tedivm
4

To rozwiązanie wymaga jedynie instalacji kopru i żadnych innych bibliotek jako patosu

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Działa również w przypadku tablic numpy.

Ilia w495 Nikitin
źródło
2
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Ten błąd wystąpi również, jeśli w obiekcie modelu zostanie przekazana jakaś funkcja, która została przekazana do zadania asynchronicznego.

Upewnij się więc, że przekazane obiekty modelu nie mają wbudowanych funkcji. (W naszym przypadku używaliśmy FieldTracker()funkcji django-model-utils wewnątrz modelu do śledzenia określonego pola). Oto link do odpowiedniego problemu GitHub.

Penkey Suresh
źródło
0

Opierając się na rozwiązaniu @rocksportrocker, sensowne byłoby koperek podczas wysyłania i odzyskiwania wyników.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
powinieneś zobaczyć
źródło