Jak uzyskać wartość zwracaną z wątku w pythonie?

342

Poniższa funkcja foozwraca ciąg znaków 'foo'. Jak mogę uzyskać wartość 'foo'zwracaną z celu wątku?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

Przedstawiony powyżej „oczywisty sposób na zrobienie tego” nie działa: thread.join()powrócił None.

wim
źródło

Odpowiedzi:

37

W Python 3.2+ concurrent.futuresmoduł stdlib zapewnia interfejs API wyższego poziomu threading, w tym przekazywanie wartości zwracanych lub wyjątków z wątku roboczego z powrotem do głównego wątku:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)
Ramarao Amara
źródło
Dla tych, którzy zastanawiają się, można to zrobić za pomocą listy wątków. futures = [executor.submit(foo, param) for param in param_list]Kolejność zostanie utrzymana, a zamknięcie withumożliwi zbieranie wyników. [f.result() for f in futures]
jayreed1
273

FWIW, multiprocessingmoduł ma ładny interfejs do tego przy użyciu Poolklasy. A jeśli chcesz pozostać przy wątkach, a nie procesach, możesz po prostu użyć tej multiprocessing.pool.ThreadPoolklasy jako zamiennika.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
Jake Biesinger
źródło
50
@JakeBiesinger Chodzi mi o to, że szukałem odpowiedzi, jak uzyskać odpowiedź z Wątku, przybyłem tutaj i zaakceptowana odpowiedź nie odpowiada na zadane pytanie. Różnicuję wątki i procesy. Wiem o Global Interpreter Lock, ale pracuję nad problemem związanym z We / Wy, więc wątki są w porządku, nie potrzebuję procesów. Inne odpowiedzi tutaj lepiej odpowiedzieć na pytanie.
omikron
7
@omikron Ale wątki w python nie zwracają odpowiedzi, chyba że użyjesz podklasy, która włącza tę funkcjonalność. Spośród możliwych podklas, ThreadPools to świetny wybór (wybierz # wątków, użyj map / Apply w / sync / async). Mimo że są importowane multiprocess, nie mają one nic wspólnego z procesami.
Jake Biesinger
4
@JakeBiesinger Oh, jestem ślepy. Przepraszam za moje niepotrzebne komentarze. Masz rację. Właśnie założyłem, że wieloprocesowość = procesy.
omikron
12
Nie zapomnij ustawić processes=1więcej niż jednego, jeśli masz więcej wątków!
iman
4
Problem z wieloprocesowością i pulą wątków polega na tym, że konfiguracja i uruchamianie wątków jest znacznie wolniejsze w porównaniu z podstawową biblioteką wątków. Jest świetny do rozpoczynania wątków długo działających, ale pokonuje cel, gdy trzeba rozpocząć wiele wątków krótko działających. Rozwiązanie użycia „wątków” i „kolejki” udokumentowane w innych odpowiedziach tutaj jest moim zdaniem lepszą alternatywą dla tego drugiego przypadku użycia.
Yves Dorfsman,
242

Jednym ze sposobów, które widziałem, jest przekazanie modyfikowalnego obiektu, takiego jak lista lub słownik, do konstruktora wątku wraz z indeksem lub innym identyfikatorem. Wątek może następnie zapisać wyniki w dedykowanym gnieździe w tym obiekcie. Na przykład:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Jeśli naprawdę chcesz join()zwrócić zwracaną wartość wywoływanej funkcji, możesz to zrobić za pomocą Threadpodklasy, takiej jak poniżej:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

To staje się trochę owłosione z powodu zmiany nazwy i uzyskuje dostęp do „prywatnych” struktur danych specyficznych dla Threadimplementacji… ale działa.

Dla python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
kindall
źródło
37
super, dzięki za przykład! Zastanawiam się, dlaczego Wątek nie został zaimplementowany z obsługą wartości zwracanej, wydaje się to dość oczywistą rzeczą do obsługi.
wim
16
Myślę, że to powinna być zaakceptowana odpowiedź - OP poprosił o threading, a nie inną bibliotekę do wypróbowania, a ograniczenie wielkości puli wprowadza dodatkowy potencjalny problem, który miał miejsce w moim przypadku.
domoarigato,
10
Świetny dowcip o pociągu.
meawoppl
7
Na python3 zwraca to TypeError: __init__() takes from 1 to 6 positional arguments but 7 were given. Jakiś sposób to naprawić?
GuySoft,
2
Ostrzeżenie dla każdego, kto skusi się na zrobienie drugiego z nich ( _Thread__targetrzecz). Sprawisz, że każdy, kto będzie próbował przenieść Twój kod do Pythona 3, będzie cię nienawidził, dopóki nie zorientuje się, co zrobiłeś (z powodu używania nieudokumentowanych funkcji, które zmieniły się między 2 a 3). Dobrze udokumentuj swój kod.
Ben Taylor,
84

Odpowiedź Jake'a jest dobra, ale jeśli nie chcesz korzystać z puli wątków (nie wiesz, ile wątków będziesz potrzebować, ale utwórz je w razie potrzeby), dobrym sposobem na przesyłanie informacji między wątkami jest wbudowane Klasa Queue.Queue , ponieważ zapewnia bezpieczeństwo wątków.

Utworzyłem następujący dekorator, aby działał podobnie do puli wątków:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Następnie używasz go jako:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Udekorowana funkcja tworzy nowy wątek za każdym razem, gdy jest wywoływana, i zwraca obiekt Thread zawierający kolejkę, która otrzyma wynik.

AKTUALIZACJA

Minęło sporo czasu, odkąd opublikowałem tę odpowiedź, ale wciąż uzyskuje widoki, więc pomyślałem, że zaktualizuję ją, aby odzwierciedlić sposób, w jaki to robię w nowszych wersjach Pythona:

Dodano concurrent.futuresmoduł Python 3.2, który zapewnia interfejs wysokiego poziomu do zadań równoległych. Zapewnia ThreadPoolExecutori ProcessPoolExecutor, dzięki czemu można używać puli wątków lub procesów z tym samym interfejsem API.

Jedną z zalet tego interfejsu API jest to, że przekazanie zadania do obiektu Executorzwraca Futureobiekt, który zostanie uzupełniony o wartość zwracaną przez użytkownika.

To sprawia, że ​​dołączanie queueobiektu nie jest konieczne, co znacznie upraszcza dekorator:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Spowoduje to użycie domyślnego modułu wykonującego pulę wątków, jeśli nie zostanie przekazany.

Użycie jest bardzo podobne do wcześniejszego:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Jeśli używasz Python 3.4+, jeden naprawdę miłą cechę tą metodą (i Przyszłość obiektów w ogóle) jest to, że przyszłość może być zwrócony zawinięte do przekształcić go asyncio.Futurez asyncio.wrap_future. To sprawia, że ​​łatwo współpracuje z coroutines:

result = await asyncio.wrap_future(long_task(10))

Jeśli nie potrzebujesz dostępu do concurrent.Futureobiektu bazowego , możesz dołączyć zawijanie do dekoratora:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Następnie, ilekroć chcesz zepchnąć intensywny procesor lub blokować kod z wątku pętli zdarzeń, możesz umieścić go w ozdobnej funkcji:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
bj0
źródło
Nie mogę tego zmusić do działania; AttributeError: 'module' object has no attribute 'Lock'Pojawia się błąd stwierdzający, że wydaje się, że pochodzi z linii y = long_task(10)... myśli?
sadmicrowave
1
Kod nie używa jawnie blokady, więc problem może znajdować się gdzie indziej w kodzie. Możesz opublikować nowe pytanie SO na ten temat
bj0
Dlaczego wynik_kolejki jest atrybutem instancji? Czy byłoby lepiej, gdyby był to atrybut klasy, aby użytkownicy nie musieli wiedzieć, jak wywoływać wynik_kolejkę przy użyciu @wątku, co nie jest jednoznaczne i dwuznaczne?
nonbot
@ t88, nie jestem pewien, co masz na myśli, potrzebujesz dostępu do wyniku, co oznacza, że ​​musisz wiedzieć, do kogo zadzwonić. Jeśli chcesz, aby było to coś innego, możesz podklasować wątek i robić, co chcesz (to było proste rozwiązanie). Powodem, dla którego kolejka musi być dołączona do wątku, jest to, że wiele wywołań / funkcji ma swoje własne kolejki
bj0
1
To jest genialne! Dziękuję Ci bardzo.
Ganesh Kathiresan
53

Inne rozwiązanie, które nie wymaga zmiany istniejącego kodu:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Można go również łatwo dostosować do środowiska wielowątkowego:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
Arik
źródło
t = Wątek (cel = lambda q, arg1: q.put (foo (arg1)), args = (que, 'world!')) co robi q.put tutaj, co robi Queue.Queue ()
vijay shanker
6
W twoim rodzinnym mieście powinna znajdować się twoja statua, dziękuję!
Onilol,
3
@Onilol - Dziękuję bardzo. Twój komentarz jest właśnie powodem, dla którego to robię :)
Arik
4
W przypadku Python3 należy zmienić na from queue import Queue.
Gino Mempin
1
Wydaje się, że jest to najmniej zakłócająca metoda (nie ma potrzeby radykalnej restrukturyzacji pierwotnego kodu źródłowego), aby umożliwić powrót wartości zwracanej do głównego wątku.
Fanchen Bao,
24

Parris / kindall's odpowiedź join / returnodpowiedź przeniesiona do Pythona 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Uwaga: Threadklasa jest zaimplementowana inaczej w Pythonie 3.

GuySoft
źródło
1
join przyjmuje parametr limitu czasu, który należy przekazać
cz
22

Ukradłem odpowiedź życzliwą i trochę ją posprzątałem.

Kluczową częścią jest dodanie * args i ** kwargs do join () w celu obsłużenia limitu czasu

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

ZAKTUALIZOWANO ODPOWIEDŹ PONIŻEJ

To moja najpopularniejsza odpowiedź, więc postanowiłem zaktualizować kod, który będzie działał zarówno na py2, jak i py3.

Dodatkowo widzę wiele odpowiedzi na to pytanie, które pokazują brak zrozumienia odnośnie Thread.join (). Niektóre całkowicie nie radzą sobie z timeoutarg. Ale jest też przypadek narożny, o którym powinieneś wiedzieć, jeśli masz (1) funkcję docelową, która może zwrócić Nonei (2) przekazujesz również timeoutargument arg, aby dołączyć (). Zobacz „TEST 4”, aby zrozumieć ten przypadek narożny.

Klasa ThreadWithReturn, która działa z py2 i py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Niektóre przykładowe testy pokazano poniżej:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Czy potrafisz zidentyfikować przypadek, w którym możemy spotkać się z TESTEM 4?

Problem polega na tym, że oczekujemy, że metoda returnMe () zwróci None (patrz TEST 2), ale spodziewamy się również, że funkcja join () zwróci None, jeśli upłynie limit czasu.

returned is None oznacza albo:

(1) właśnie to zwrócił returnMe () lub

(2) Upłynął limit czasu dołączenia ()

Ten przykład jest trywialny, ponieważ wiemy, że giveMe () zawsze zwróci None. Ale w rzeczywistej instancji (gdzie cel może zgodnie z prawem zwrócić Brak lub coś innego) chcielibyśmy wyraźnie sprawdzić, co się stało.

Poniżej znajduje się sposób rozwiązania tego przypadku:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
użytkownik2426679
źródło
Czy znasz odpowiednik _Thread_target dla Python3? Ten atrybut nie istnieje w Python3.
GreySage
Zajrzałem do pliku threading.py, okazuje się, że jest to _target (inne atrybuty mają podobną nazwę).
GreySage
Można uniknąć dostępu do zmiennych prywatnych klasy gwintu, jeśli zapisać target, argsi kwargsargumentów init, jak zmienne składowe w swojej klasie.
Tolli,
@GreySage Zobacz moją odpowiedź, przeniosłem ten blok do python3 poniżej
GuySoft
Odpowiedź @GreySage obsługuje teraz py2 i py3
user2426679,
15

Korzystanie z kolejki:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
użytkownik341143
źródło
1
Naprawdę podoba mi się to rozwiązanie, krótkie i słodkie. Jeśli funkcja odczytuje kolejki wejściowej i dodasz do out_queue1trzeba będzie pętli nad out_queue1.get()i złapać wyjątek Queue.Empty: ret = [] ; try: ; while True; ret.append(out_queue1.get(block=False)) ; except Queue.Empty: ; pass. Średniki do symulacji przerwania linii.
sastorsl,
6

Moim rozwiązaniem tego problemu jest zawinięcie funkcji i wątku w klasę. Nie wymaga używania pul, kolejek ani przekazywania zmiennych typu c. To również nie blokuje. Zamiast tego sprawdzasz status. Zobacz przykład, jak go używać na końcu kodu.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
Peter Lonjers
źródło
jak poradziłbyś sobie z wyjątkiem? powiedzmy, że podano funkcję add oraz int i str. czy wszystkie wątki zawiodą, czy tylko jeden zawodzi?
user1745713
4

joinzawsze zwracaj None, myślę, że powinieneś podklasę Threadobsługiwać kody powrotu i tak dalej.

Burza mózgów
źródło
4

Biorąc pod uwagę @iman komentarzu @JakeBiesinger odpowiedź mam kompozyty go mieć różną liczbę wątków:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Twoje zdrowie,

Chłopak.

Guy Avraham
źródło
2

Możesz zdefiniować zmienną powyżej zakresu funkcji wątkowej i dodać do tego wynik. (Zmodyfikowałem również kod, aby był zgodny z python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

To wraca {'world!': 'foo'}

Jeśli użyjesz wejścia funkcji jako klucza do wyników, każde unikalne wejście gwarantuje wpis w wynikach

Thijs D.
źródło
2

Używam tego opakowania, które wygodnie włącza dowolną funkcję do uruchamiania w Thread- dbając o jej wartość zwracaną lub wyjątek. Nie dodaje Queuekosztów ogólnych.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Przykłady użycia

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Uwagi na temat threadingmodułu

Wygodna obsługa zwracanych wartości i wyjątków funkcji wątkowej jest częstą potrzebą „Pythonic” i rzeczywiście powinna być już oferowana przez threadingmoduł - być może bezpośrednio w Threadklasie standardowej . ThreadPoolma o wiele za dużo narzutów na proste zadania - 3 zarządzanie wątkami, dużo biurokracji. Niestety Threadukład został pierwotnie skopiowany z Javy - co widać np. Z wciąż bezużytecznego parametru 1. (!) Konstruktora group.

kxr
źródło
pierwszy konstruktor nie jest bezużyteczny, jest tam zarezerwowany do przyszłej implementacji .. z książki kucharskiej programowania równoległego w języku Python
vijay shanker
1

Zdefiniuj cel, aby
1) wziąć argument q
2) zastąpić dowolne instrukcjereturn foo zq.put(foo); return

więc funkcja

def func(a):
    ans = a * a
    return ans

stanie się

def func(a, q):
    ans = a * a
    q.put(ans)
    return

i wtedy postępowałbyś jako taki

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

I możesz użyć dekoratorów / opakowań funkcji, aby móc używać istniejących funkcji targetbez ich modyfikowania, ale postępuj zgodnie z tym podstawowym schematem.

tscizzle
źródło
Powinno byćresults = [ans_q.get() for _ in xrange(len(threads))]
Hemant H Kumar
1

Jak wspomniano, pula wieloprocesowa jest znacznie wolniejsza niż podstawowe wątki. Korzystanie z kolejek zaproponowanych w niektórych odpowiedziach tutaj jest bardzo skuteczną alternatywą. Używam go ze słownikami, aby móc uruchomić wiele małych wątków i odzyskać wiele odpowiedzi, łącząc je ze słownikami:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
Yves Dorfsman
źródło
1

Idea GuySoft jest świetna, ale myślę, że obiekt niekoniecznie musi dziedziczyć po Thread, a start () można usunąć z interfejsu:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
pandy.song
źródło
0

Jednym z typowych rozwiązań jest zawinięcie funkcji foodekoratorem

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Wtedy cały kod może tak wyglądać

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Uwaga

Jedną ważną kwestią jest to, że zwracane wartości mogą być nieuporządkowane . (W rzeczywistości return valueniekoniecznie są one zapisywane w queue, ponieważ możesz wybrać dowolną strukturę danych bezpieczną dla wątków )

Odpowiedź777
źródło
0

Dlaczego po prostu nie użyjesz zmiennej globalnej?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
Alexey
źródło
0

Odpowiedź Kindalla w Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
SmartManoj
źródło
-2

Jeśli tylko prawda lub fałsz mają być sprawdzane na podstawie wywołania funkcji, prostszym rozwiązaniem, które znajduję, jest aktualizacja globalnej listy.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Jest to bardziej pomocne, gdy chcesz sprawdzić, czy któryś z wątków zwrócił fałszywy status, aby podjąć niezbędne działanie.

Sravya
źródło