Python: Jak mogę równolegle uruchamiać funkcje Pythona?

109

Najpierw szukałem informacji i nie mogłem znaleźć odpowiedzi na moje pytanie. Próbuję uruchomić wiele funkcji równolegle w Pythonie.

Mam coś takiego:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Chcę wywołać func1 i func2 i uruchomić je w tym samym czasie. Funkcje nie oddziałują na siebie ani na ten sam obiekt. W tej chwili muszę poczekać na zakończenie funkcji func1 przed uruchomieniem funkcji func2. Jak mogę zrobić coś takiego jak poniżej:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Chcę mieć możliwość tworzenia obu katalogów w przybliżeniu w tym samym czasie, ponieważ co minutę liczę, ile plików jest tworzonych. Jeśli katalogu tam nie ma, wpłynie to na mój czas.

lmcadory
źródło
1
Możesz zmienić architekturę tego; jeśli liczysz pliki / foldery co minutę, tworzysz warunek wyścigu. Co powiesz na to, że każda funkcja aktualizuje licznik lub używa pliku blokującego, aby upewnić się, że okresowy proces nie aktualizuje licznika, dopóki obie funkcje nie zakończą wykonywania?

Odpowiedzi:

164

Możesz użyć threadinglub multiprocessing.

Ze względu na specyfikę CPython , threadingjest mało prawdopodobne, aby osiągnąć prawdziwą równoległość. Z tego powodu multiprocessingjest ogólnie lepszym zakładem.

Oto pełny przykład:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Mechanizm uruchamiania / dołączania procesów potomnych można łatwo zamknąć w funkcji zgodnie z runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
źródło
4
Użyłem twojego kodu, ale funkcje nadal nie uruchomiły się w tym samym czasie.
lmcadory
4
@Lamar McAdory: Proszę wyjaśnij, co dokładnie rozumiesz przez „w tym samym czasie”, być może podając konkretny przykład tego, co zrobiłeś, czego się spodziewałeś i co się faktycznie wydarzyło.
NPE
4
@Lamar: Nigdy nie możesz mieć żadnej gwarancji „dokładnie tego samego czasu”, a myślenie, że możesz, jest po prostu błędne. W zależności od tego, ile masz procesorów cpus, obciążenia maszyny, czas wielu zdarzeń na komputerze będzie miał wpływ na czas rozpoczęcia wątków / procesu. Ponadto, ponieważ procesy są uruchamiane zaraz po utworzeniu, narzut związany z tworzeniem procesu należy również obliczyć w widocznej różnicy czasu.
Martin
1
czy można uzyskać listę wyników każdej funkcji? powiedzmy, że każda funkcja zwraca inną wartość, czy wartości można dołączyć do jakiejś listy, której można użyć później? może dodanie wyniku do globalnej listy?
pelos
1
Jeśli moje funkcje przyjmują parametry, a kiedy przekazuję je podczas wywoływania ich z oddzielnych procesów, nie działają one jednocześnie. Czy możesz pomóc
user2910372
18

Można to zrobić elegancko za pomocą Ray , systemu, który umożliwia łatwe zrównoleglenie i dystrybucję kodu Pythona.

Aby zrównoleglać swój przykład, musisz zdefiniować swoje funkcje za pomocą @ray.remotedekoratora, a następnie wywołać je za pomocą .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Jeśli przekażesz ten sam argument do obu funkcji, a argument jest duży, wydajniejszym sposobem jest użycie ray.put(). Pozwala to uniknąć podwójnego serializacji dużego argumentu i utworzyć dwie kopie pamięci:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Jeśli func1()i func2()zwrócisz wyniki, musisz przepisać kod w następujący sposób:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Stosowanie Ray w porównaniu z modułem wieloprocesorowym ma wiele zalet . W szczególności ten sam kod będzie działał na pojedynczej maszynie, jak również na klastrze maszyn. Aby uzyskać więcej zalet Ray, zobacz ten powiązany post .

Ion Stoica
źródło
18

Jeśli twoje funkcje wykonują głównie operacje we / wy (i mniej pracy procesora) i masz Python 3.2+, możesz użyć ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Jeśli twoje funkcje wykonują głównie pracę procesora (i mniej pracy we / wy) i masz Python 2.6+, możesz użyć modułu wieloprocesorowego :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
David Foster
źródło
To jest dobra odpowiedź. Jak zidentyfikować na podstawie wyniku zadań związanych z we / wy przy użyciu concurrent.futures, które z nich zostało zakończone? Zasadniczo zamiast funkcji lamba, jeśli mamy normalne funkcje, jak zidentyfikować wynik odwzorowany na wywoływaną funkcję?
Tragaknight
Nieważne znalazłem sposób - zamiast tego run_cpu_tasks_in_parallel ([lambda: print ('CPU task 1 running!'), Lambda: print ('CPU task 2 running!'),]) Użyj tego - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
5

Jeśli jesteś użytkownikiem systemu Windows i korzystasz z Pythona 3, ten post pomoże ci w programowaniu równoległym w pythonie. Po uruchomieniu zwykłego programowania puli biblioteki wieloprocesorowej, otrzymasz błąd dotyczący głównej funkcji w twoim programie. Wynika to z faktu, że okna nie mają funkcji fork (). Poniższy post przedstawia rozwiązanie wspomnianego problemu.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Ponieważ używałem Pythona 3, zmieniłem program trochę w ten sposób:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Po tej funkcji powyższy kod problemu również zmienia się trochę w ten sposób:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

Otrzymałem wynik jako:

[1, 8, 27, 64, 125, 216]

Myślę, że ten post może być przydatny dla niektórych użytkowników systemu Windows.

Arun Sooraj
źródło
4

Nie ma sposobu, aby zagwarantować, że dwie funkcje zostaną zsynchronizowane ze sobą, co wydaje się być tym, co chcesz zrobić.

Najlepsze, co możesz zrobić, to podzielić funkcję na kilka kroków, a następnie poczekać, aż oba zakończą się w krytycznych punktach synchronizacji, używając Process.joinwzmianek w odpowiedzi jak @ aix.

Jest to lepsze niż to, time.sleep(10)że nie możesz zagwarantować dokładnych czasów. Z jawnym czekaniem mówisz, że funkcje muszą zostać wykonane, wykonując ten krok przed przejściem do następnego, zamiast zakładać, że zostanie to wykonane w ciągu 10 ms, co nie jest gwarantowane na podstawie tego, co jeszcze dzieje się na komputerze.

Davy8
źródło
1

Wygląda na to, że masz jedną funkcję, którą musisz wywołać dla dwóch różnych parametrów. Można to elegancko zrobić używając kombinacji concurrent.futuresi mapz Pythonem 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Teraz, jeśli twoja operacja jest związana z IO, możesz użyć ThreadPoolExecutorjako takiego:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Zwróć uwagę, w jaki sposób mapzostała użyta mapTwoja funkcja do listy argumentów.

Teraz, jeśli twoja funkcja jest związana z procesorem, możesz użyć ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Jeśli nie masz pewności, możesz po prostu wypróbować oba i sprawdzić, który z nich daje lepsze wyniki.

Wreszcie, jeśli chcesz wydrukować wyniki, możesz po prostu zrobić to:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
źródło