Jak zrównoleglić prostą pętlę Pythona?

255

Jest to prawdopodobnie trywialne pytanie, ale jak zrównoleglić następującą pętlę w pythonie?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Wiem, jak uruchomić pojedyncze wątki w Pythonie, ale nie wiem, jak „zbierać” wyniki.

Wiele procesów byłoby również w porządku - cokolwiek jest najłatwiejsze w tym przypadku. Używam obecnie Linuksa, ale kod powinien również działać w systemach Windows i Mac.

Jaki jest najprostszy sposób na zrównoleglenie tego kodu?

ja sam
źródło

Odpowiedzi:

191

Używanie wielu wątków w CPython nie zapewni lepszej wydajności kodu czysto Python ze względu na globalną blokadę interpretera (GIL). Proponuję multiprocessingzamiast tego użyć modułu:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Pamiętaj, że to nie zadziała w interaktywnym tłumaczu.

Aby uniknąć zwykłego FUD wokół GIL: W każdym razie użycie wątków w tym przykładzie nie byłoby korzystne. Ty chcesz tutaj, a nie używać nici procesów, ponieważ uniknąć całą masę problemów.

Sven Marnach
źródło
46
Skoro jest to wybrana odpowiedź, czy można mieć bardziej kompleksowy przykład? Jakie są argumenty calc_stuff?
Eduardo Pignatelli,
2
@EduardoPignatelli Proszę przeczytać dokumentację multiprocessingmodułu, aby uzyskać bardziej wyczerpujące przykłady. Pool.map()w zasadzie działa jak map(), ale równolegle.
Sven Marnach,
3
Czy istnieje sposób, aby po prostu dodać pasek ładowania tqdm do tej struktury kodu? Użyłem tqdm (pool.imap (calc_stuff, range (0, 10 * offset, offset))), ale nie otrzymuję pełnej grafiki paska ładowania.
user8188120,
@ user8188120 Nigdy wcześniej nie słyszałem o tqdm, więc przepraszam, nie mogę nic na to poradzić.
Sven Marnach,
Aby wyświetlić pasek ładowania tqdm, zobacz to pytanie: stackoverflow.com/questions/41920124/...
Johannes
66

W celu zrównoleglenia prostej pętli for, joblib wnosi wiele wartości do surowego wykorzystania wieloprocesowości. Nie tylko krótka składnia, ale także takie rzeczy, jak przezroczyste wiązanie iteracji, gdy są one bardzo szybkie (aby usunąć narzut) lub przechwytywanie śledzenia procesu potomnego w celu lepszego raportowania błędów.

Oświadczenie: Jestem oryginalnym autorem joblib.

Gael Varoquaux
źródło
1
Próbowałem joblib z jupyter, to nie działa. Po wywołaniu opóźnionym równolegle strona przestała działać.
Jie
1
Cześć, mam problem z korzystaniem z joblib ( stackoverflow.com/questions/52166572/... ), czy masz jakieś pojęcie, co może być przyczyną? Dziękuję bardzo.
Ting Sun,
Wydaje się, że chcę spróbować! Czy można go używać z podwójną pętlą, np. Dla i w zakresie (10): dla j w zakresie (20)
CutePoison
51

Jaki jest najprostszy sposób na zrównoleglenie tego kodu?

Naprawdę podoba mi się concurrent.futuresto, dostępne w Python3 od wersji 3.2 - i poprzez backport do wersji 2.6 i 2.7 na PyPi .

Możesz używać wątków lub procesów i używać dokładnie tego samego interfejsu.

Wieloprocesowe

Umieść to w pliku - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

A oto wynik:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Wielowątkowość

Teraz zmienia ProcessPoolExecutorsię ThreadPoolExecutor, i ponownie uruchomić moduł:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Teraz wykonałeś zarówno wielowątkowość, jak i wieloprocesowość!

Uwaga na temat wydajności i korzystania z obu razem.

Próbkowanie jest zdecydowanie zbyt małe, aby porównać wyniki.

Podejrzewam jednak, że wielowątkowość będzie szybsza niż wieloprocesowość w ogóle, szczególnie w systemie Windows, ponieważ system Windows nie obsługuje rozwidlania, więc uruchomienie każdego nowego procesu wymaga czasu. W systemie Linux lub Mac będą prawdopodobnie bliżej.

Możesz zagnieździć wiele wątków w wielu procesach, ale zaleca się, aby nie używać wielu wątków do wydzielenia wielu procesów.

Aaron Hall
źródło
czy ThreadPoolExecutor pomija ograniczenia nałożone przez GIL? również nie musisz dołączać (), aby czekać na zakończenie executorów, czy też jest to załatwiane pośrednio w menedżerze kontekstu
PirateApp
1
Nie i nie, tak dla „niejawnie załatwionego”
Aaron Hall
Z jakiegoś powodu podczas skalowania problemu wielowątkowość jest niezwykle szybka, ale wieloprocesowość powoduje powstanie wielu zablokowanych procesów (w systemie macOS). Wiesz, dlaczego to może być? Proces zawiera tylko zagnieżdżone pętle i matematykę, nic egzotycznego.
komodovaran_
@komodovaran_ Proces jest pełnym procesem w Pythonie, po jednym na każdy, podczas gdy wątek jest tylko wątkiem wykonania z własnym stosem, który dzieli proces, jego kod bajtowy i wszystko inne, co ma w pamięci z wszystkimi innymi wątkami - czy to pomaga ?
Aaron Hall
49
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Powyższe działa pięknie na moim komputerze (Ubuntu, pakiet joblib został wstępnie zainstalowany, ale można go zainstalować za pośrednictwem pip install joblib).

Zaczerpnięte z https://blog.dominodatalab.com/simple-parallelization/

tyrex
źródło
3
Próbowałem kodu, ale w moim systemie sekwencyjna wersja tego kodu zajmuje około pół minuty, a powyższa wersja równoległa zajmuje 4 minuty. Dlaczego tak?
shaifali Gupta
3
Dziękuję za odpowiedź! Myślę, że jest to najbardziej elegancki sposób na zrobienie tego w 2019 r.
Heikki Pulkkinen
2
wieloprocesowość nie działa w Pythonie 3.x, więc to nie działa dla mnie.
EngrStudent
2
@EngrStudent Nie wiesz, co rozumiesz przez „niepoprawny”. Działa dla mnie Python 3.6.x.
tyrex
@tyrex dzięki za udostępnienie! ten pakiet joblib jest świetny i ten przykład działa dla mnie. Chociaż w bardziej złożonym kontekście miałem niestety błąd. github.com/joblib/joblib/issues/949
Broker Open Food
13

Korzystanie z Ray ma wiele zalet :

  • Możesz pracować równolegle na wielu komputerach oprócz wielu rdzeni (z tym samym kodem).
  • Wydajna obsługa danych numerycznych za pośrednictwem pamięci współużytkowanej (i serializacji bez kopiowania).
  • Wysoka przepustowość zadań dzięki planowaniu rozproszonemu.
  • Odporność na awarie

W twoim przypadku możesz uruchomić Ray i zdefiniować zdalną funkcję

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

a następnie wywołać to równolegle

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Aby uruchomić ten sam przykład w klastrze, jedyną linią, która by się zmieniła, byłoby wywołanie ray.init (). Odpowiednia dokumentacja znajduje się tutaj .

Zauważ, że pomagam rozwijać Ray.

Robert Nishihara
źródło
1
Dla każdego, kto rozważa ray, może być istotne, aby wiedzieć, że natywnie nie obsługuje Windows. Możliwe są niektóre włamania, aby uruchomić go w systemie Windows przy użyciu WSL (Windows Subsystem for Linux), choć nie jest to gotowe, jeśli chcesz korzystać z systemu Windows.
OscarVanL
9

To najłatwiejszy sposób na zrobienie tego!

Możesz użyć asyncio . (Dokumentacja znajduje się tutaj ). Jest wykorzystywany jako podstawa dla wielu asynchronicznych struktur Pythona, które zapewniają wysokowydajne serwery sieciowe i sieciowe, biblioteki połączeń z bazami danych, rozproszone kolejki zadań itp. Ponadto posiada interfejsy API wysokiego i niskiego poziomu, aby poradzić sobie z każdym rodzajem problemu .

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Teraz ta funkcja będzie uruchamiana równolegle za każdym razem, gdy zostanie wywołana, bez przełączania programu głównego w stan oczekiwania. Możesz go również użyć do równoległego tworzenia pętli. Gdy wywoływana jest pętla for, pętla jest sekwencyjna, ale każda iteracja przebiega równolegle do programu głównego, gdy tylko dojdzie do interpretera. Na przykład:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

Daje to następujące wyniki:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
Użytkownik 5
źródło
Myślę, że jest literówka wrapped()i powinna być **kwargszamiast*kwargs
jakub-olczyk
Ups! Mój błąd. Poprawione!
Użytkownik5
6

dlaczego nie używasz wątków i jednego muteksu do ochrony jednej globalnej listy?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

pamiętaj, że będziesz tak szybki jak najwolniejszy wątek

jackdoe
źródło
2
Wiem, że jest to bardzo stara odpowiedź, więc nie jest łatwo zdobyć losowe głosowanie znikąd. Poparłem tylko głosowanie, ponieważ wątki niczego nie zrównoleglą. Wątki w Pythonie są powiązane tylko z jednym wątkiem wykonującym się na interpreterze z powodu globalnej blokady interpretera, więc obsługują programowanie współbieżne, ale nie równoległe, jak wymaga OP.
skrrgwasme
3
@skrrgwasme Wiem, że o tym wiesz, ale kiedy użyjesz słów „nic nie zrównoleglą”, może to wprowadzić w błąd czytelników. Jeśli operacje zajmują dużo czasu, ponieważ są związane z IO lub śpią w oczekiwaniu na zdarzenie, wówczas interpreter może uruchomić inne wątki, co spowoduje wzrost prędkości, na który ludzie liczą w takich przypadkach. Skrrgwasme naprawdę wpływa na tylko wątki związane z procesorem.
Jonathan Hartley,
5

Uważam, że joblibjest ze mną bardzo przydatny. Zobacz następujący przykład:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1: użyj wszystkich dostępnych rdzeni

miuxu
źródło
14
Wiesz, lepiej sprawdzić już istniejące odpowiedzi przed opublikowaniem własnych. Ta odpowiedź proponuje również użycie joblib.
sanyash
2

Powiedzmy, że mamy funkcję asynchroniczną

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

To musi być uruchomione na dużej tablicy. Niektóre atrybuty są przekazywane do programu, a niektóre są używane z właściwości elementu słownika w tablicy.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))
Amit Teli
źródło
1

Zerknij na to;

http://docs.python.org/library/queue.html

To może nie być właściwy sposób, ale zrobiłbym coś takiego;

Aktualny kod;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Mam nadzieję, że to pomaga.

MerreM
źródło
1

Może to być przydatne podczas wdrażania przetwarzania wieloprocesowego i przetwarzania równoległego / rozproszonego w języku Python.

Samouczek YouTube na temat korzystania z pakietu Techila

Techila to pośrednie oprogramowanie do przetwarzania rozproszonego, które integruje się bezpośrednio z Pythonem za pomocą pakietu techila. Funkcja brzoskwini w pakiecie może być użyteczna w równoległych strukturach pętli. (Poniższy fragment kodu pochodzi z forów społeczności Techila )

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
Trójnik
źródło
1
Chociaż ten link może odpowiedzieć na pytanie, lepiej jest dołączyć tutaj istotne części odpowiedzi i podać link w celach informacyjnych. Odpowiedzi zawierające tylko łącze mogą stać się nieprawidłowe, jeśli połączona strona ulegnie zmianie.
SL Barth - Przywróć Monikę
2
@SLBarth dziękuję za opinię. Do odpowiedzi dodałem mały przykładowy kod.
TEe
1

dzięki @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'
Felipe de Macêdo
źródło
2
-1. To jest odpowiedź tylko na kod. Sugeruję dodanie objaśnienia, które mówi czytelnikom, co robi napisany przez Ciebie kod, i być może gdzie mogą znaleźć dodatkowe informacje.
starbeamrainbowlabs,
-1

bardzo prosty przykład przetwarzania równoległego to

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()
Adil Warsi
źródło
3
Tutaj nie ma równoległości w pętli for, po prostu spawnujesz proces, który uruchamia całą pętlę; to nie jest zamierzenie PO.
facuq