Martwy prosty przykład użycia kolejki wieloprocesowej, puli i blokowania

91

Próbowałem przeczytać dokumentację pod adresem http://docs.python.org/dev/library/multiprocessing.html, ale wciąż mam problemy z kolejką, pulą i blokowaniem wieloprocesorowym. I na razie udało mi się zbudować poniższy przykład.

Jeśli chodzi o kolejkę i pulę, nie jestem pewien, czy dobrze zrozumiałem koncepcję, więc popraw mnie, jeśli się mylę. To, co próbuję osiągnąć, to przetworzyć 2 żądania naraz (lista danych ma 8 w tym przykładzie), więc czego powinienem użyć? Pula, aby utworzyć 2 procesy, które mogą obsługiwać dwie różne kolejki (maksymalnie 2), czy powinienem po prostu użyć kolejki do przetwarzania 2 danych wejściowych za każdym razem? Blokada polegałaby na poprawnym wydrukowaniu wyjść.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
thclpr
źródło

Odpowiedzi:

129

Najlepszym rozwiązaniem Twojego problemu jest użycie pliku Pool. Używanie Queues i posiadanie oddzielnej funkcji „karmienia z kolejki” jest prawdopodobnie przesadą.

Oto nieco zmieniona wersja twojego programu, tym razem z tylko 2 procesami umieszczonymi w pliku Pool. Uważam, że jest to najłatwiejsza droga, przy minimalnych zmianach w oryginalnym kodzie:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Zwróć uwagę, że mp_worker()funkcja przyjmuje teraz pojedynczy argument (krotkę dwóch poprzednich argumentów), ponieważ map()dzieli dane wejściowe na podlisty, z których każda jest podawana jako pojedynczy argument funkcji roboczej.

Wynik:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

Edytuj zgodnie z komentarzem @Thales poniżej:

Jeśli chcesz mieć „blokadę dla każdego limitu puli”, aby procesy działały w tandemach, ala:

Czekam B czekam | Gotowe, B gotowe | C czekam, D czekam | C gotowe, D gotowe | ...

następnie zmień funkcję obsługi, aby uruchamiała pule (z 2 procesów) dla każdej pary danych:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Teraz twój wynik to:

 Processs a Waiting 2 seconds
 Processs b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Processs c Waiting 6 seconds
 Processs d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Processs e Waiting 1 seconds
 Processs f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Processs g Waiting 5 seconds
 Processs h Waiting 7 seconds
 Process g  DONE
 Process h  DONE
Velimir Mlaker
źródło
Dziękuję za prosty i bezpośredni przykład, jak to zrobić, ale jak mogę zastosować blokadę dla każdego limitu puli? To znaczy, jeśli wykonasz kod, chciałbym zobaczyć coś w stylu „A czeka B czeka | A gotowe, b gotowe | C czeka, D czeka | C gotowe, D gotowe”
thclpr
2
Innymi słowy, nie chcesz, aby C zaczynał się, dopóki A i B nie zostaną zakończone?
Velimir Mlaker
Dokładnie, mogę to zrobić przy użyciu przetwarzania wieloprocesowego, ale nie wiem, jak to zrobić za pomocą puli
thclpr
Dziękuję bardzo, działaj zgodnie z przeznaczeniem, ale w funkcji mp_handler odwołujesz się do zmiennej data zamiast var1 :)
thclpr
Okej, dzięki, var1całkowicie usunąłem , odnosząc się datazamiast tego do globalnego .
Velimir Mlaker
8

To może nie być w 100% związane z pytaniem, ale podczas moich poszukiwań przykładu użycia przetwarzania wieloprocesowego z kolejką pojawia się to jako pierwsze w Google.

Jest to podstawowa klasa przykładowa, w której można tworzyć instancje i umieszczać elementy w kolejce oraz czekać, aż kolejka zostanie zakończona. To wszystko, czego potrzebowałem.

from multiprocessing import JoinableQueue
from multiprocessing.context import Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
linqu
źródło
2
Jakie są item1i item2? Czy są to jakieś zadania lub funkcje, które będą wykonywane w dwóch różnych procesach?
Zelphir Kaltstahl
2
tak, są to zadania lub parametry wejściowe, które są przetwarzane równolegle.
linqu
8

Oto moje osobiste podejście do tego tematu:

Streszczenie tutaj (mile widziane prośby o ściągnięcie!): Https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...



    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]


    try:
        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
    pool.close()

if __name__ == '__main__':
    main()
ThorSummoner
źródło
1
Nie jestem do końca pewien, czy .map_async () jest w jakikolwiek sposób lepsze niż .map ().
ThorSummoner
3
Argument do get()to limit czasu, nie ma nic wspólnego z liczbą uruchomionych zadań.
mata
@mata, więc czy to ma być używane w pętli odpytywania? .get(timeout=1)? i czy można po prostu powiedzieć, .get()aby uzyskać pełną listę?
ThorSummoner
Tak, .get()czeka w nieskończoność, aż wszystkie wyniki będą dostępne i zwraca listę wyników. Możesz użyć pętli odpytywania, aby sprawdzić, czy są dostępne wyniki pogodowe, lub możesz przekazać funkcję zwrotną w map_async()wywołaniu, która zostanie następnie wywołana dla każdego wyniku, gdy stanie się dostępny.
mata
2

Dla wszystkich korzystających z edytorów takich jak Komodo Edit (win10) dodaj sys.stdout.flush():

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

lub jako pierwsza linia do:

    if __name__ == '__main__':
       sys.stdout.flush()

Pomaga to zobaczyć, co się dzieje podczas wykonywania skryptu; zamiast patrzeć na czarne pole wiersza poleceń.

ZF007
źródło
1

Oto przykład z mojego kodu (dla puli wątkowej, ale po prostu zmień nazwę klasy, a będziesz mieć pulę procesów):

def execute_run(rp): 
   ... do something 

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.join()

Gruntownie:

  • pool = ThreadPoolExecutor(6) tworzy pulę na 6 wątków
  • Następnie masz kilka for, które dodają zadania do puli
  • pool.submit(execute_run, rp) dodaje zadanie do puli, pierwszy arogument to funkcja wywoływana w wątku / procesie, reszta argumentów jest przekazywana do wywoływanej funkcji.
  • pool.join czeka, aż wszystkie zadania zostaną wykonane.
jb.
źródło
2
Zauważ, że używasz concurrent.futures, ale OP pyta o multiprocessingi Python 2.7.
Tim Peters