Próbowałem przeczytać dokumentację pod adresem http://docs.python.org/dev/library/multiprocessing.html, ale wciąż mam problemy z kolejką, pulą i blokowaniem wieloprocesorowym. I na razie udało mi się zbudować poniższy przykład.
Jeśli chodzi o kolejkę i pulę, nie jestem pewien, czy dobrze zrozumiałem koncepcję, więc popraw mnie, jeśli się mylę. To, co próbuję osiągnąć, to przetworzyć 2 żądania naraz (lista danych ma 8 w tym przykładzie), więc czego powinienem użyć? Pula, aby utworzyć 2 procesy, które mogą obsługiwać dwie różne kolejki (maksymalnie 2), czy powinienem po prostu użyć kolejki do przetwarzania 2 danych wejściowych za każdym razem? Blokada polegałaby na poprawnym wydrukowaniu wyjść.
import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
for indata in var1:
p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
p.start()
def mp_worker(inputs, the_time):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
if __name__ == '__main__':
mp_handler(data)
źródło
var1
całkowicie usunąłem , odnosząc siędata
zamiast tego do globalnego .To może nie być w 100% związane z pytaniem, ale podczas moich poszukiwań przykładu użycia przetwarzania wieloprocesowego z kolejką pojawia się to jako pierwsze w Google.
Jest to podstawowa klasa przykładowa, w której można tworzyć instancje i umieszczać elementy w kolejce oraz czekać, aż kolejka zostanie zakończona. To wszystko, czego potrzebowałem.
from multiprocessing import JoinableQueue from multiprocessing.context import Process class Renderer: queue = None def __init__(self, nb_workers=2): self.queue = JoinableQueue() self.processes = [Process(target=self.upload) for i in range(nb_workers)] for p in self.processes: p.start() def render(self, item): self.queue.put(item) def upload(self): while True: item = self.queue.get() if item is None: break # process your item here self.queue.task_done() def terminate(self): """ wait until queue is empty and terminate processes """ self.queue.join() for p in self.processes: p.terminate() r = Renderer() r.render(item1) r.render(item2) r.terminate()
źródło
item1
iitem2
? Czy są to jakieś zadania lub funkcje, które będą wykonywane w dwóch różnych procesach?Oto moje osobiste podejście do tego tematu:
Streszczenie tutaj (mile widziane prośby o ściągnięcie!): Https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing import sys THREADS = 3 # Used to prevent multiple threads from mixing thier output GLOBALLOCK = multiprocessing.Lock() def func_worker(args): """This function will be called by each thread. This function can not be a class method. """ # Expand list of args into named args. str1, str2 = args del args # Work # ... # Serial-only Portion GLOBALLOCK.acquire() print(str1) print(str2) GLOBALLOCK.release() def main(argp=None): """Multiprocessing Spawn Example """ # Create the number of threads you want pool = multiprocessing.Pool(THREADS) # Define two jobs, each with two args. func_args = [ ('Hello', 'World',), ('Goodbye', 'World',), ] try: # Spawn up to 9999999 jobs, I think this is the maximum possible. # I do not know what happens if you exceed this. pool.map_async(func_worker, func_args).get(9999999) except KeyboardInterrupt: # Allow ^C to interrupt from any thread. sys.stdout.write('\033[0m') sys.stdout.write('User Interupt\n') pool.close() if __name__ == '__main__': main()
źródło
get()
to limit czasu, nie ma nic wspólnego z liczbą uruchomionych zadań..get(timeout=1)
? i czy można po prostu powiedzieć,.get()
aby uzyskać pełną listę?.get()
czeka w nieskończoność, aż wszystkie wyniki będą dostępne i zwraca listę wyników. Możesz użyć pętli odpytywania, aby sprawdzić, czy są dostępne wyniki pogodowe, lub możesz przekazać funkcję zwrotną wmap_async()
wywołaniu, która zostanie następnie wywołana dla każdego wyniku, gdy stanie się dostępny.Dla wszystkich korzystających z edytorów takich jak Komodo Edit (win10) dodaj
sys.stdout.flush()
:def mp_worker((inputs, the_time)): print " Process %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs sys.stdout.flush()
lub jako pierwsza linia do:
if __name__ == '__main__': sys.stdout.flush()
Pomaga to zobaczyć, co się dzieje podczas wykonywania skryptu; zamiast patrzeć na czarne pole wiersza poleceń.
źródło
Oto przykład z mojego kodu (dla puli wątkowej, ale po prostu zmień nazwę klasy, a będziesz mieć pulę procesów):
def execute_run(rp): ... do something pool = ThreadPoolExecutor(6) for mat in TESTED_MATERIAL: for en in TESTED_ENERGIES: for ecut in TESTED_E_CUT: rp = RunParams( simulations, DEST_DIR, PARTICLE, mat, 960, 0.125, ecut, en ) pool.submit(execute_run, rp) pool.join()
Gruntownie:
pool = ThreadPoolExecutor(6)
tworzy pulę na 6 wątkówpool.submit(execute_run, rp)
dodaje zadanie do puli, pierwszy arogument to funkcja wywoływana w wątku / procesie, reszta argumentów jest przekazywana do wywoływanej funkcji.pool.join
czeka, aż wszystkie zadania zostaną wykonane.źródło
concurrent.futures
, ale OP pyta omultiprocessing
i Python 2.7.