Użyj tablicy numpy w pamięci współdzielonej do przetwarzania wieloprocesowego

112

Chciałbym użyć tablicy numpy w pamięci współdzielonej do użytku z modułem wieloprocesorowym. Trudność polega na używaniu go jako tablicy numpy, a nie tylko jako tablicy ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Daje to wyniki takie jak:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

Dostęp do tablicy można uzyskać w sposób ctypes, np arr[i] . to sens. Jednak nie jest to tablica numpy i nie mogę wykonywać takich operacji, jak -1*arr, lub arr.sum(). Przypuszczam, że rozwiązaniem byłoby przekonwertowanie tablicy ctypes na tablicę numpy. Jednak (poza tym, że nie mogę tego zrobić), nie sądzę, aby był on już udostępniany.

Wydaje się, że byłoby standardowe rozwiązanie tego, co musi być powszechnym problemem.

Ian Langmore
źródło
1
To nie to samo co ten? stackoverflow.com/questions/5033799/…
pygabriel
1
To nie jest to samo pytanie. Powiązane pytanie dotyczy subprocessraczej niż multiprocessing.
Andrew

Odpowiedzi:

83

Aby dodać do odpowiedzi @ unutbu (już niedostępnych) i @Henry Gomersall. Możesz użyć shared_arr.get_lock()do zsynchronizowania dostępu w razie potrzeby:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Przykład

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Jeśli nie potrzebujesz zsynchronizowanego dostępu lub tworzysz własne blokady, mp.Array()jest to niepotrzebne. Możesz użyć mp.sharedctypes.RawArrayw tym przypadku.

jfs
źródło
2
Piękna odpowiedź! Jeśli chcę mieć więcej niż jedną dzieloną tablicę, każdą z osobno zamykaną, ale z liczbą tablic określoną w czasie wykonywania, czy jest to proste rozszerzenie tego, co tutaj zrobiłeś?
Andrew
3
@Andrew: tablice współużytkowane powinny być tworzone przed spawnowaniem procesów potomnych.
jfs
Dobra uwaga na temat kolejności operacji. Miałem jednak na myśli to: utwórz określoną przez użytkownika liczbę współdzielonych tablic, a następnie uruchom kilka procesów potomnych. Czy to proste?
Andrew
1
@ Chicony: nie możesz zmienić rozmiaru tablicy. Potraktuj to jako wspólny blok pamięci, który musiał zostać przydzielony przed uruchomieniem procesów potomnych. Nie musisz używać wszystkich pamięci np, można przejść countdo numpy.frombuffer(). Możesz spróbować zrobić to na niższym poziomie, używając mmaplub czegoś podobnego posix_ipcbezpośrednio do zaimplementowania skalowalnego analogu RawArray (może to obejmować kopiowanie podczas zmiany rozmiaru) (lub poszukaj istniejącej biblioteki). Lub jeśli twoje zadanie na to pozwala: skopiuj dane w częściach (jeśli nie potrzebujesz wszystkich naraz). „Jak zmienić rozmiar pamięci współdzielonej” to osobne pytanie.
jfs
1
@umopapisdn: Pool()określa liczbę procesów (domyślnie używana jest liczba dostępnych rdzeni procesora). Mto liczba wywołań f()funkcji.
jfs
21

Z Arrayobiektem jest get_obj()skojarzona metoda, która zwraca tablicę ctypes, która przedstawia interfejs bufora. Myślę, że następujące elementy powinny działać ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Po uruchomieniu wypisuje pierwszy element z aobecnie 10.0, pokazując ai bsą tylko dwoma widokami w tej samej pamięci.

Aby upewnić się, że nadal jest bezpieczny dla wielu procesorów, uważam, że będziesz musiał użyć acquirerelease metod i istniejących na Arrayobiekcie a, a także jego wbudowanego zamka, aby upewnić się, że wszystko jest bezpiecznie dostępne (chociaż nie jestem ekspertem w moduł wieloprocesorowy).

Henry Gomersall
źródło
nie zadziała bez synchronizacji, jak @unutbu pokazał w swojej (teraz usuniętej) odpowiedzi.
jfs
1
Przypuszczalnie, jeśli chcesz tylko uzyskać dostęp do przetwarzania końcowego tablicy, można to zrobić czysto, nie martwiąc się o problemy ze współbieżnością i blokowanie?
Henry Gomersall
w tym przypadku nie potrzebujesz mp.Array.
jfs
1
Kod przetwarzający może wymagać zablokowanych tablic, ale interpretacja danych po przetworzeniu nie musi koniecznie. Myślę, że wynika to ze zrozumienia, na czym dokładnie polega problem. Oczywiście jednoczesny dostęp do udostępnionych danych będzie wymagał pewnej ochrony, co moim zdaniem będzie oczywiste!
Henry Gomersall
16

Chociaż już podane odpowiedzi są dobre, istnieje znacznie łatwiejsze rozwiązanie tego problemu, pod warunkiem spełnienia dwóch warunków:

  1. Jesteś na serwerze zgodnym z POSIX systemu operacyjnego (np. Linux, Mac OSX); i
  2. Twoje procesy podrzędne wymagają dostępu tylko do odczytu do udostępnionej tablicy.

W tym przypadku nie musisz majstrować przy jawnym udostępnianiu zmiennych, ponieważ procesy potomne zostaną utworzone za pomocą rozwidlenia. Rozwidlone dziecko automatycznie dzieli przestrzeń pamięci rodzica. W kontekście wieloprocesorowości Pythona oznacza to, że udostępnia on wszystkie zmienne na poziomie modułu ; zwróć uwagę, że nie dotyczy to argumentów, które jawnie przekazujesz swoim procesom potomnym lub funkcjom, które wywołujesz w plikumultiprocessing.Pool .

Prosty przykład:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
EelkeSpaak
źródło
3
+1 Naprawdę cenne informacje. Czy możesz wyjaśnić, dlaczego współdzielone są tylko zmienne na poziomie modułu? Dlaczego lokalne zmienne nie są częścią przestrzeni pamięci rodzica? Np. Dlaczego to nie zadziała, jeśli mam funkcję F z lokalną zmienną V i funkcję G wewnątrz F, która odwołuje się do V?
Coffee_Table
5
Ostrzeżenie: ta odpowiedź jest trochę zwodnicza. Proces potomny otrzymuje kopię stanu procesu nadrzędnego, w tym zmiennych globalnych, w momencie rozgałęzienia. Stany nie są w żaden sposób zsynchronizowane i od tego momentu będą się różnić. Technika ta może być przydatna w niektórych scenariuszach (np. Rozwidlanie procesów potomnych ad-hoc, z których każdy obsługuje migawkę procesu nadrzędnego, a następnie kończy pracę), ale jest bezużyteczna w innych (np. Długotrwałe procesy potomne, które muszą udostępniać i synchronizacja danych z procesem nadrzędnym).
David Stein
4
@EelkeSpaak: Twoje stwierdzenie - „rozwidlone dziecko automatycznie dzieli przestrzeń pamięci rodzica” - jest niepoprawne. Jeśli mam proces potomny, który chce monitorować stan procesu nadrzędnego w sposób ściśle tylko do odczytu, rozwidlanie mnie tam nie doprowadzi: dziecko widzi tylko migawkę stanu nadrzędnego w momencie rozwidlenia. W rzeczywistości właśnie to próbowałem zrobić (podążając za twoją odpowiedzią), kiedy odkryłem to ograniczenie. Stąd dopisek do Twojej odpowiedzi. Krótko mówiąc: stan nadrzędny nie jest „udostępniany”, lecz jedynie kopiowany do dziecka. To nie jest „dzielenie się” w zwykłym sensie.
David Stein
2
Czy mylę się, myśląc, że jest to sytuacja polegająca na kopiowaniu przy zapisie, przynajmniej w systemach POSIX? Oznacza to, że myślę, że po rozwidleniu pamięć jest udostępniana do momentu zapisania nowych danych, w którym to momencie tworzona jest kopia. Więc tak, to prawda, że ​​dane nie są dokładnie „udostępniane”, ale mogą zapewnić potencjalnie ogromny wzrost wydajności. Jeśli twój proces jest tylko do odczytu, nie będzie kosztów kopiowania! Czy dobrze zrozumiałem punkt?
senderle
2
@senderle Tak, dokładnie to miałem na myśli! Stąd mój punkt (2) w odpowiedzi na temat dostępu tylko do odczytu.
EelkeSpaak
11

Napisałem mały moduł Pythona, który używa pamięci współdzielonej POSIX do udostępniania tablic numpy między interpreterami Pythona. Może uznasz to za przydatne.

https://pypi.python.org/pypi/SharedArray

Oto jak to działa:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
mata
źródło
8

Możesz skorzystać z sharedmemmodułu: https://bitbucket.org/cleemesser/numpy-sharedmem

Oto twój oryginalny kod, tym razem wykorzystujący pamięć współdzieloną, która zachowuje się jak tablica NumPy (zwróć uwagę na dodatkową ostatnią instrukcję wywołującą funkcję NumPy sum()):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
Velimir Mlaker
źródło
1
Uwaga: to nie jest już rozwijane i nie wydaje się działać na linuxie github.com/sturlamolden/sharedmem-numpy/issues/4
AD
numpy-sharedmem może nie być w fazie rozwoju, ale nadal działa na Linuksie, sprawdź github.com/vmlaker/benchmark-sharedmem .
Velimir Mlaker