Obiekty pamięci współdzielonej w przetwarzaniu wieloprocesowym

124

Załóżmy, że mam dużą tablicę numpy w pamięci, mam funkcję, funcktóra przyjmuje tę gigantyczną tablicę jako dane wejściowe (wraz z kilkoma innymi parametrami). funcz różnymi parametrami mogą działać równolegle. Na przykład:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Jeśli użyję biblioteki wieloprocesorowej, ta olbrzymia tablica zostanie skopiowana wiele razy do różnych procesów.

Czy istnieje sposób, aby różne procesy korzystały z tej samej tablicy? Ten obiekt tablicy jest tylko do odczytu i nigdy nie będzie modyfikowany.

Co jest bardziej skomplikowane, jeśli arr nie jest tablicą, ale dowolnym obiektem w języku Python, czy istnieje sposób na udostępnienie go?

[ZMIENIONO]

Przeczytałem odpowiedź, ale nadal jestem trochę zdezorientowany. Ponieważ fork () jest funkcją kopiowania przy zapisie, nie powinniśmy wywoływać żadnych dodatkowych kosztów podczas tworzenia nowych procesów w wieloprocesorowej bibliotece Pythona. Ale poniższy kod sugeruje, że istnieje ogromny narzut:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

wyjście (a przy okazji, koszt rośnie wraz ze wzrostem rozmiaru tablicy, więc podejrzewam, że nadal istnieje narzut związany z kopiowaniem pamięci):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Dlaczego jest tak duże obciążenie, jeśli nie skopiowaliśmy tablicy? A jaką część oszczędza mi pamięć współdzielona?

Wendeta
źródło
Spojrzałeś na dokumenty , prawda?
Lew Lewicki
@FrancisAvila czy istnieje sposób na udostępnianie nie tylko tablicy, ale także dowolnych obiektów Pythona?
Vendetta
1
@LevLevitsky Muszę zapytać, czy istnieje sposób na udostępnienie nie tylko tablicy, ale także dowolnych obiektów Pythona?
Vendetta
2
Ta odpowiedź ładnie wyjaśnia, dlaczego nie można udostępniać dowolnych obiektów Pythona.
Janne Karila

Odpowiedzi:

121

Jeśli używasz systemu operacyjnego, który używa fork()semantyki kopiowania przy zapisie (jak każdy zwykły unix), to dopóki nie zmienisz struktury danych, będzie on dostępny dla wszystkich procesów potomnych bez zajmowania dodatkowej pamięci. Nie będziesz musiał robić nic specjalnego (z wyjątkiem absolutnej pewności, że nie zmieniasz obiektu).

Najbardziej wydajną rzeczą , jaką możesz zrobić dla swojego problemu, byłoby spakowanie tablicy w wydajną strukturę tablicową (przy użyciu numpylub array), umieszczenie jej w pamięci współdzielonej, zawinięcie jej multiprocessing.Arrayi przekazanie jej do funkcji. Ta odpowiedź pokazuje, jak to zrobić .

Jeśli chcesz udostępnić obiekt z możliwością zapisu , musisz go otoczyć jakąś synchronizacją lub blokowaniem. multiprocessingudostępnia dwie metody, aby to zrobić : jedną z wykorzystaniem pamięci współdzielonej (odpowiedniej dla prostych wartości, tablic lub typów ctypów) lub Managerproxy, gdzie jeden proces przechowuje pamięć, a menedżer decyduje o dostępie do niej z innych procesów (nawet przez sieć).

ManagerPodejście może być wykorzystywane z dowolną Pythona obiektów, ale będzie mniejsza niż równoważne im wspólnej pamięci, ponieważ przedmioty muszą być serializowane / rozszeregować i wysyłane tylko procesów.

W Pythonie dostępnych jest wiele bibliotek przetwarzania równoległego i metod . multiprocessingto doskonała i wszechstronna biblioteka, ale jeśli masz specjalne potrzeby, być może jedno z pozostałych podejść może być lepsze.

Francis Avila
źródło
25
Uwaga: w Pythonie fork () w rzeczywistości oznacza kopiowanie przy dostępie (ponieważ samo uzyskanie dostępu do obiektu zmieni jego liczbę ref).
Fabio Zadrozny
3
@FabioZadrozny Czy faktycznie skopiowałby cały obiekt, czy tylko stronę pamięci zawierającą jego refcount?
zigg,
5
AFAIK, tylko strona pamięci zawierająca refcount (czyli 4kb na każdy dostęp do obiektu).
Fabio Zadrozny
1
@max Użyj zamknięcia. Funkcja podana do apply_asyncpowinna odwoływać się do współdzielonego obiektu bezpośrednio w zakresie zamiast poprzez swoje argumenty.
Francis Avila
3
@FrancisAvila Jak używasz zamknięcia? Czy funkcja, którą dajesz apply_async nie powinna być możliwa do wybrania? Czy jest to tylko ograniczenie map_async?
GermanK
17

Napotkałem ten sam problem i napisałem małą klasę narzędzi pamięci współdzielonej, aby go obejść.

Używam multiprocessing.RawArray(bez blokady), a także dostęp do tablic nie jest w ogóle zsynchronizowany (bez blokady), uważaj, aby nie strzelić sobie nogami.

Dzięki temu rozwiązaniu uzyskuję około 3-krotne przyspieszenie na czterordzeniowym i7.

Oto kod: Zapraszam do korzystania i ulepszania go oraz zgłaszania wszelkich błędów.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
źródło
Właśnie zdałem sobie sprawę, że musisz skonfigurować tablice pamięci współdzielonej, zanim utworzysz pulę wieloprocesorową, nie wiem jeszcze dlaczego, ale na pewno nie będzie działać w drugą stronę.
martin.preinfalk
powodem jest to, że pula wieloprocesorowa wywołuje fork (), gdy tworzona jest instancja puli, więc nic po tym nie uzyska dostępu do wskaźnika do żadnej współdzielonej pamięci utworzonej później.
Xiv
Kiedy wypróbowałem ten kod pod py35, dostałem wyjątek w multiprocessing.sharedctypes.py, więc myślę, że ten kod jest przeznaczony tylko dla py2.
Dr. Hillier Dániel
11

To jest zamierzony przypadek użycia Ray , który jest biblioteką równoległego i rozproszonego Pythona. Pod maską serializuje obiekty przy użyciu układu danych Apache Arrow (który jest formatem zerowej kopii) i przechowuje je w magazynie obiektów pamięci współdzielonej, dzięki czemu można uzyskać do nich dostęp przez wiele procesów bez tworzenia kopii.

Kod wyglądałby następująco.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Jeśli nie wywołasz, ray.puttablica nadal będzie przechowywana w pamięci współdzielonej, ale zostanie to zrobione raz na wywołanie func, co nie jest tym, czego chcesz.

Zauważ, że zadziała to nie tylko w przypadku tablic, ale także w przypadku obiektów, które zawierają tablice , np. Słowniki mapujące ints do tablic, jak poniżej.

Możesz porównać wydajność serializacji w Ray i pickle, uruchamiając następujące polecenie w IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serializacja z Rayem jest tylko trochę szybsza niż pikle, ale deserializacja jest 1000x szybsza ze względu na użycie pamięci współdzielonej (ta liczba będzie oczywiście zależała od obiektu).

Zobacz dokumentację Ray . Możesz przeczytać więcej o szybkiej serializacji za pomocą Ray and Arrow . Uwaga Jestem jednym z programistów Ray.

Robert Nishihara
źródło
1
Ray brzmi dobrze! Ale próbowałem wcześniej korzystać z tej biblioteki, ale niestety właśnie zdałem sobie sprawę, że Ray nie obsługuje okien. Mam nadzieję, że jak najszybciej będziecie wspierać Windows. Dziękuję programistom!
Hzzkygcs
6

Jak wspomniał Robert Nishihara, Apache Arrow sprawia, że ​​jest to łatwe, szczególnie dzięki przechowywaniu obiektów w pamięci Plazmy, na którym zbudowany jest Ray.

Stworzyłem plazmę mózgową specjalnie z tego powodu - szybkie ładowanie i przeładowywanie dużych obiektów w aplikacji Flask. Jest to przestrzeń nazw obiektów pamięci współużytkowanej dla obiektów picklemożliwych do serializacji Apache Arrow, w tym 'd bytestring'ów generowanych przezpickle.dumps(...) .

Kluczową różnicą w stosunku do Apache Ray i Plasma jest to, że śledzi identyfikatory obiektów za Ciebie. Wszystkie procesy, wątki lub programy działające lokalnie mogą współużytkować wartości zmiennych, wywołując nazwę z dowolnego Brainobiektu.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
źródło