Pula wątków podobna do puli wieloprocesowej?

347

Czy istnieje klasa Pool dla wątków roboczych , podobna do klasy Pool modułu wieloprocesowego ?

Podoba mi się na przykład prosty sposób na zrównoleglenie funkcji mapy

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

chciałbym to jednak zrobić bez konieczności tworzenia nowych procesów.

Wiem o GIL. Jednak w moim przypadku użycia funkcja będzie funkcją C związaną z IO, dla której opakowanie Pythona zwolni GIL przed faktycznym wywołaniem funkcji.

Czy muszę pisać własną pulę wątków?

Jaskółka oknówka
źródło
Oto coś, co wygląda obiecująco w książce kucharskiej Python: Przepis 576519: Pula wątków z tym samym interfejsem API co przetwarzanie (multi).
Pula
1
W dzisiejszych czasach jest to wbudowane w: from multiprocessing.pool import ThreadPool.
martineau,
Czy możesz to rozwinąć I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.?
mrgloom

Odpowiedzi:

448

Właśnie dowiedziałem się, że faktycznie jest wątek oparte interfejs Basen w multiprocessingmodule, jednak jest nieco ukryte i nie są odpowiednio udokumentowane.

Można go zaimportować za pośrednictwem

from multiprocessing.pool import ThreadPool

Zaimplementowano go za pomocą obojętnej klasy Process owijającej wątek python. Wątek oparte klasy procesu można znaleźć w multiprocessing.dummyktórych jest mowa w skrócie w Dokumentach . Ten fałszywy moduł podobno zapewnia cały interfejs wieloprocesowy oparty na wątkach.

Jaskółka oknówka
źródło
5
To cudownie. Miałem problem z tworzeniem ThreadPools poza głównym wątkiem, możesz jednak użyć ich z wątku podrzędnego po utworzeniu. Włożyłem
Olson
82
Nie rozumiem, dlaczego ta klasa nie ma dokumentacji. Takie klasy pomocnicze są obecnie tak ważne.
Wernight
18
@Wernight: nie jest publicznie przede wszystkim dlatego, że nikt nie zaoferował łaty, która zapewnia (lub coś podobnego) jako gwintowanie. Narzędzie wątku, w tym dokumentację i testy. Byłoby rzeczywiście dobrą baterią do włączenia do standardowej biblioteki, ale nie stanie się tak, jeśli nikt tego nie napisze. Jedną z zalet tej istniejącej implementacji w procesach wieloprocesowych jest to, że powinna ona znacznie ułatwić pisanie takich łatek do wątków ( docs.python.org/devguide )
ncoghlan
3
@ daniel.gindi: multiprocessing.dummy.Pool/ multiprocessing.pool.ThreadPoolsą tym samym i oba są pulami wątków. Naśladują interfejs puli procesów, ale są całkowicie zaimplementowane pod kątem wątków. Zapoznaj się ponownie z dokumentami, masz to wstecz.
ShadowRanger
9
@ daniel.gindi: Czytaj dalej : „ multiprocessing.dummyreplikuje interfejs API, multiprocessingale nie jest niczym więcej niż opakowaniem wokół threadingmodułu”. multiprocessingogólnie rzecz biorąc dotyczy procesów, ale aby umożliwić przełączanie między procesami i wątkami, (przeważnie) zreplikowali multiprocessinginterfejs API multiprocessing.dummy, ale wsparli go wątkami, a nie procesami. Celem jest umożliwienie import multiprocessing.dummy as multiprocessingzmiany kodu opartego na procesie na oparty na wątkach.
ShadowRanger
236

W Pythonie 3 możesz używać concurrent.futures.ThreadPoolExecutor, tj .:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Zobacz dokumentację, aby uzyskać więcej informacji i przykładów.

Adrian Adamiak
źródło
6
w celu wykorzystania Moduł przeniesione futures, uruchomsudo pip install futures
Yair
jest to najbardziej wydajny i najszybszy sposób na przetwarzanie wielokrotne
Haritsinh Gohil
2
Jaka jest różnica między używaniem ThreadPoolExecutora multiprocessing.dummy.Pool?
Jay
63

Tak i wydaje się, że ma (mniej więcej) ten sam interfejs API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
opłaty wojenne
źródło
9
Ścieżka importu dla ThreadPooljest inna niż Pool. Prawidłowy import to from multiprocessing.pool import ThreadPool.
Marigold,
2
O dziwo, nie jest to udokumentowane API, a multiprocessing.pool jest tylko krótko wspomniany jako zapewniający AsyncResult. Ale jest dostępny w wersjach 2.xi 3.x.
Marvin
2
Właśnie tego szukałem. To tylko jedna linia importu i mała zmiana w mojej istniejącej linii puli i działa idealnie.
Danegraphics,
39

Dla czegoś bardzo prostego i lekkiego (nieco zmodyfikowanego stąd ):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Aby obsługiwać połączenia zwrotne po zakończeniu zadania, możesz po prostu dodać połączenie zwrotne do krotki zadania.

dgorissen
źródło
jak wątki mogą się kiedykolwiek łączyć, jeśli bezwarunkowo nieskończoną pętlę?
Joseph Garvin,
@JosephGarvin Przetestowałem to, a wątki blokują się w pustej kolejce (ponieważ wywołanie Queue.get()to blokuje) aż do zakończenia programu, po czym są automatycznie kończone.
forumulator
@JosephGarvin, dobre pytanie. Queue.join()faktycznie dołączy do kolejki zadań, a nie wątków roboczych. Tak więc, gdy kolejka jest pusta, wait_completionzwraca, kończy program i wątki są zbierane przez system operacyjny.
randomir
Jeśli cały ten kod jest zawinięty w schludną funkcję, wydaje się, że nie zatrzymuje wątków, nawet gdy kolejka jest pusta i pool.wait_completion()wraca. W rezultacie nici wciąż się budują.
ubiquibacon
17

Witaj, aby użyć puli wątków w Pythonie, możesz użyć tej biblioteki:

from multiprocessing.dummy import Pool as ThreadPool

a następnie do użytku, ta biblioteka robi to tak:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Wątki to żądana liczba wątków, a zadania to lista zadań, które najczęściej są mapowane do usługi.

Manochehr Rasouli
źródło
Dzięki, to świetna sugestia! Z dokumentacji: multiprocessing.dummy replikuje interfejs API wieloprzetwarzania, ale jest niczym więcej niż opakowaniem wokół modułu wątków. Jedna korekta - myślę, że chcesz powiedzieć, że interfejs API puli jest (funkcja, iterowalna)
layser
2
Brakowało nam .close()i .join()połączeń i który powoduje .map(), aby zakończyć, zanim wszystkie wątki są zakończone. Tylko ostrzeżenie.
Anatolij Scherbakov,
8

Oto wynik, którego w końcu skończyłem. To zmodyfikowana wersja klas autorstwa dgorissen powyżej.

Plik: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Aby skorzystać z puli

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
forumulator
źródło
Adnotacja dla innych czytelników: Ten kod to Python 3 (shebang #!/usr/bin/python3)
Daniel Marschall,
Dlaczego używasz, for i, d in enumerate(delays):a następnie ignorujesz iwartość?
martineau
@martineau - prawdopodobnie relikt z rozwoju, w którym prawdopodobnie chcieli wydrukować ipodczas biegu.
n1k31t4
Dlaczego create_tasktam jest Po co to jest?
Mr
Nie mogę uwierzyć i odpowiedź z 4 głosami na SO jest sposobem na wykonanie ThreadPooling w Pythonie. Threadpool w oficjalnej dystrybucji Pythona jest nadal uszkodzony? czego mi brakuje?
Mr
2

Narzut związany z tworzeniem nowych procesów jest minimalny, zwłaszcza gdy są to tylko 4 z nich. Wątpię, aby była to największa zaleta wydajności Twojej aplikacji. Uprość to, zoptymalizuj, gdzie musisz i gdzie wskazują wyniki profilowania.

unbeli
źródło
5
Jeśli pytający jest w systemie Windows (co, jak sądzę, nie określił), myślę, że rozpad procesu może być znaczącym wydatkiem. Przynajmniej dotyczy to projektów, które ostatnio realizowałem. :-)
Brandon Rhodes
1

Nie ma wbudowanej puli opartej na wątkach. Jednak implementacja kolejki producent / konsument z Queueklasą może być bardzo szybka .

Od: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
Yann Ramin
źródło
3
Nie dotyczy to już concurrent.futuresmodułu.
Thanatos
11
Nie sądzę, żeby to już była prawda. from multiprocessing.pool import ThreadPool
Randall Hunt
0

innym sposobem może być dodanie procesu do wątkowej puli kolejek

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(0, len(list_of_files) - 1):
        a = executor.submit(loop_files2, i, list_of_files2, mt_list, temp_path, mt_dicto)
pelos
źródło