Równoległe tworzenie pętli for w Pythonie

35

Czy są jakieś narzędzia w Pythonie, które są podobne do parla Matlaba? Znalazłem ten wątek , ale ma on cztery lata. Myślałem, że może ktoś tutaj może mieć nowsze doświadczenia.

Oto przykład tego, co chciałbym zrównoważyć:

X = np.random.normal(size=(10, 3))
F = np.zeros((10, ))
for i in range(10):
    F[i] = my_function(X[i,:])

gdzie my_functionprzyjmuje ndarrayrozmiar (1,3)i zwraca skalar.

Przynajmniej chciałbym używać wielu rdzeni jednocześnie --- jak parfor. Innymi słowy, załóżmy, że mamy do czynienia z systemem pamięci wspólnej z rdzeniami od 8 do 16.

Paul G. Constantine
źródło
Wiele wyników w Google. Wydaje się to dość proste: blog.dominodatalab.com/simple-parallelization quora.com/What-is-the-Python-equivalent-of-MATLABs-parfor
Doug Lipinski
Dzięki, @ doug-lipiński. Te przykłady, podobnie jak inne, które znalazłem podczas googlowania, mają pewne trywialne obliczenia oparte na indeksie iteracji. I zawsze twierdzą, że kod jest „niezwykle łatwy”. Mój przykład definiuje tablice (alokuje pamięć) poza pętlą for. Wszystko w porządku, robię to w inny sposób; tak właśnie robię w Matlabie. Trudną częścią, która zdaje się znosić te przykłady, jest włączenie części danej tablicy do funkcji wewnątrz pętli.
Paul G. Constantine

Odpowiedzi:

19

Joblib robi to, co chcesz. Podstawowy wzorzec użytkowania to:

from joblib import Parallel, delayed

def myfun(arg):
     do_stuff
     return result

results = Parallel(n_jobs=-1, verbose=verbosity_level, backend="threading")(
             map(delayed(myfun), arg_instances))

gdzie arg_instancesjest listą wartości, dla których myfunjest obliczana równolegle. Głównym ograniczeniem jest to, że myfunmusi to być funkcja najwyższego poziomu. backendParametr może być "threading"albo "multiprocessing".

Możesz przekazać dodatkowe wspólne parametry do funkcji równoległej. Treść myfunmoże również odnosić się do zainicjowanych zmiennych globalnych, wartości, które będą dostępne dla dzieci.

Argumenty i wyniki mogą być praktycznie wszystkim z backendem wątków, ale wyniki muszą być możliwe do serializacji z backendem wieloprocesowym.


Dask oferuje również podobną funkcjonalność. Może być bardziej wskazane, jeśli pracujesz z danymi podstawowymi lub próbujesz zrównoleglić bardziej złożone obliczenia.

Daniel Mahler
źródło
Widzę zerową wartość dodaną do korzystania z baterii, w tym do wielokrotnego przetwarzania. Założę się, że Joblib używa go pod maską.
Xavier Combelle
1
Należy wspomnieć, że joblib nie jest magią, threadingbackend cierpi z powodu wąskiego gardła GIL, a multiprocessingbackend wiąże się z dużym obciążeniem z powodu serializacji wszystkich parametrów i wartości zwracanych. Zobacz tę odpowiedź, aby poznać szczegóły niskiego poziomu przetwarzania równoległego w Pythonie.
Jakub Klinkovský
Nie mogę znaleźć kombinacji złożoności funkcji i liczby iteracji, dla których joblib byłby szybszy niż pętla for. Dla mnie ma tę samą prędkość, jeśli n_jobs = 1, i jest znacznie wolniejszy we wszystkich innych przypadkach
Aleksejs Fomins
@AleksejsFomins Równoległość oparta na wątkach nie pomoże kodowi, który nie wydaje GIL, ale znacząca liczba tak, szczególnie w zakresie analizy danych lub bibliotek numerycznych. W przeciwnym razie potrzebujesz mutiprocessingu, Jobli obsługuje oba. Moduł wieloprocesowy ma teraz również równoległość map, z której można korzystać bezpośrednio. Również jeśli użyjesz numpy skompilowanej przez mkl, automatycznie zrównoleglą operacje wektoryzowane bez robienia czegokolwiek. Numpy w Ananconda ma domyślnie włączone mkl. Nie ma jednak uniwersalnego rozwiązania. Joblib ma bardzo małe zamieszanie i w 2015 roku było mniej okazji.
Daniel Mahler
Dzięki za radę. Pamiętam, że próbowałem wcześniej wielu procesów, a nawet napisałem kilka postów, ponieważ nie skalowało się tak, jak się spodziewałem. Może powinienem spojrzeć jeszcze raz
Aleksejs Fomins,
9

To, czego szukasz, to Numba , która może automatycznie zrównoleglać pętlę for. Z ich dokumentacji

from numba import jit, prange

@jit
def parallel_sum(A):
    sum = 0.0
    for i in prange(A.shape[0]):
        sum += A[i]

    return sum
LKlevin
źródło
8

Bez zakładania czegoś specjalnego przy my_functionwyborze multiprocessing.Pool().map()można zgadywać równolegle do tak prostych pętli. joblib, dask,mpi Obliczenia lubnumba jak zaproponowano w innych odpowiedzi wygląda nie przynosząc żadnych korzyści dla takich przypadków użycia i dodawać niepotrzebnych zależności (podsumować są przesadą). Używanie wątków, jak zaproponowano w innej odpowiedzi, raczej nie będzie dobrym rozwiązaniem, ponieważ musisz być blisko interakcji GIL z twoim kodem lub twój kod powinien wykonywać głównie operacje wejścia / wyjścia.

To powiedziawszy numbamoże być dobrym pomysłem, aby przyspieszyć sekwencyjny czysty kod Pythona, ale czuję, że jest to poza zakresem pytania.

import multiprocessing
import numpy as np

if __name__ == "__main__":
   #the previous line is necessary under windows to not execute 
   # main module on each child under windows

   X = np.random.normal(size=(10, 3))
   F = np.zeros((10, ))

   pool = multiprocessing.Pool(processes=16)
   # if number of processes is not specified, it uses the number of core
   F[:] = pool.map(my_function, (X[i,:] for i in range(10)) )

Istnieją jednak pewne zastrzeżenia (które nie powinny mieć wpływu na większość aplikacji):

  • pod oknami nie ma obsługi wideł, więc interpreter z modułem głównym jest uruchamiany przy uruchomieniu każdego dziecka, więc może mieć narzut (reklama jest przyczyną if __name__ == "__main__"
  • Argumenty i wyniki funkcji my_funkcje są marynowane i rozszyfrowane, może to być zbyt duży narzut, zobacz tę odpowiedź, aby ją zmniejszyć https://stackoverflow.com/a/37072511/128629 . Sprawia również, że przedmioty, których nie można odebrać, są bezużyteczne
  • my_functionnie powinny zależeć od wspólnych stanów, takich jak komunikacja ze zmiennymi globalnymi, ponieważ stany nie są współużytkowane między procesami. funkcje czyste (funkcje w sensie matematycznym) są przykładem funkcji, które nie dzielą stanów
Xavier Combelle
źródło
6

Mam wrażenie, że parfor jest taki, że MATLAB hermetyzuje szczegóły implementacji, więc może on używać zarówno równoległości pamięci współużytkowanej (co jest potrzebne), jak i równoległości pamięci rozproszonej (jeśli korzystasz z rozproszonego serwera obliczeniowego MATLAB ).

Jeśli chcesz równoległości pamięci współużytkowanej i wykonujesz coś w rodzaju równoległej pętli zadań, prawdopodobnie potrzebujesz standardowego pakietu bibliotek wieloprocesorowych , może z ładnym front- endem , takim jak joblib , jak wspomniano w poście Douga. Standardowa biblioteka nie zniknie i jest utrzymywana, więc jest niskiego ryzyka.

Istnieją również inne opcje, takie jak równoległe możliwości Pythona i równoległe możliwości IPython . Szybki rzut oka na Parallel Python sprawia, że ​​myślę, że jest bliżej ducha parfor, ponieważ biblioteka zawiera szczegółowe dane dla rozproszonej skrzynki, ale kosztem tego jest konieczność przyjęcia ich ekosystemu. Koszt korzystania z IPython jest podobny; musisz przyjąć sposób wykonywania rzeczy przez IPython, co może, ale nie musi być tego warte.

Jeśli zależy ci na pamięci rozproszonej, polecam mpi4py . Lisandro Dalcin wykonuje świetną robotę, a mpi4py jest używany w opakowaniach PETSc Python, więc nie sądzę, że wkrótce zniknie. Podobnie jak przetwarzanie wieloprocesowe, jest to interfejs o niskim (er) poziomie równoległości niż parfor, ale taki, który prawdopodobnie potrwa przez jakiś czas.

Geoff Oxberry
źródło
Dzięki, @Geoff. Czy masz jakieś doświadczenie w pracy z tymi bibliotekami? Może spróbuję użyć mpi4py na maszynie z pamięcią współużytkowaną / procesorze wielordzeniowym.
Paul G. Constantine
@PaulGConstantine Z powodzeniem korzystałem z mpi4py; jest całkiem bezbolesny, jeśli znasz MPI. Nie korzystałem z przetwarzania wieloprocesorowego, ale poleciłem go kolegom, którzy stwierdzili, że zadziałało to dobrze. Użyłem również IPython, ale nie funkcji równoległości, więc nie mogę powiedzieć, jak dobrze działa.
Geoff Oxberry
1
Aron ma fajny samouczek mpi4py, który przygotował na kurs PyHPC w Supercomputing: github.com/pyHPC/pyhpc-tutorial
Matt Knepley
4

Zanim zacznę szukać narzędzia „czarnej skrzynki”, którego można używać do równoległego wykonywania „ogólnych” funkcji pytona, sugeruję przeanalizować, w jaki sposób my_function()można go sparaliżować ręcznie.

Najpierw porównaj czas wykonania narzutu my_function(v)do forpętli Pythona : [C] forPętle Pythona są dość wolne, więc czas spędzony w nich my_function()może być znikomy.

>>> timeit.timeit('pass', number=1000000)
0.01692986488342285
>>> timeit.timeit('for i in range(10): pass', number=1000000)
0.47521495819091797
>>> timeit.timeit('for i in xrange(10): pass', number=1000000)
0.42337894439697266

Drugie sprawdzenie, czy istnieje prosta implementacja wektorowa my_function(v), która nie wymaga pętli:F[:] = my_vector_function(X)

(Te dwa pierwsze punkty są dość trywialne, wybacz mi, jeśli wspomniałem je tutaj tylko dla kompletności.)

Po trzecie i najważniejsze, przynajmniej dla wdrożeń CPython, jest sprawdzenie, czy my_functionspędza większość nadszedł czas, wewnątrz lub na zewnątrz w Global Interpreter Lock lub GIL . Jeśli czas spędza się poza GIL, należy użyć threadingstandardowego modułu bibliotecznego . ( Oto przykład). BTW, można by pomyśleć o pisaniu my_function()jako rozszerzenie C tylko po to, aby wydać GIL.

Wreszcie, jeśli my_function()nie wyda GIL, można użyć multiprocessingmodułu .

Odnośniki: Dokumenty Pythona dotyczące jednoczesnego wykonywania i wprowadzenie numpy / scipy do przetwarzania równoległego .

Stefano M.
źródło
2

Możesz spróbować Julii. Jest dość zbliżony do Pythona i ma wiele konstrukcji MATLAB. Tłumaczenie tutaj to:

F = @parallel (vcat) for i in 1:10
    my_function(randn(3))
end

To sprawia, że ​​liczby losowe są również równoległe i po prostu łączą wyniki na końcu podczas redukcji. Korzysta z przetwarzania wieloprocesorowego (więc musisz addprocs(N)dodać procesy przed użyciem, i działa to również na wielu węzłach na HPC, jak pokazano w tym poście na blogu ).

Zamiast tego możesz również użyć pmap:

F = pmap((i)->my_function(randn(3)),1:10)

Jeśli chcesz równoległości wątków, możesz użyć Threads.@threads(ale upewnij się, że algorytm jest bezpieczny dla wątków). Przed otwarciem Julii ustaw zmienną środowiskową JULIA_NUM_THREADS, a następnie:

Ftmp = [Float64[] for i in Threads.nthreads()]
Threads.@threads for i in 1:10
    push!(Ftmp[Threads.threadid()],my_function(randn(3)))
end
F = vcat(Ftmp...)

Tutaj tworzę osobną tablicę dla każdego wątku, aby w ten sposób nie kolidowały podczas dodawania do tablicy, a następnie po prostu łączyły tablice. Wątek jest całkiem nowy, więc teraz jest proste użycie wątków, ale jestem pewien, że zostaną dodane redukcje wątków i mapy, tak jak w przypadku przetwarzania wieloprocesowego.

Chris Rackauckas
źródło
0

Polecam korzystać z biblioteki równoległej i opóźnionej funkcji Joblib. Użyj modułu „tempfile” do tworzenia tymczasowej pamięci współużytkowanej dla dużych tablic, przykłady i użycie można znaleźć tutaj https://pythonhosted.org/joblib/parallel.html

Ramkumar
źródło