Mam wiele problemów ze zrozumieniem, jak kolejka wieloprocesorowa działa w Pythonie i jak ją zaimplementować. Powiedzmy, że mam dwa moduły Pythona, które uzyskują dostęp do danych z udostępnionego pliku, nazwijmy te dwa moduły pisarzem i czytnikiem. Mój plan polega na tym, aby zarówno czytający, jak i piszący umieścili żądania w dwóch oddzielnych kolejkach wieloprocesorowych, a następnie trzeci proces zapętla te żądania i wykonuje je jako takie.
Moim głównym problemem jest to, że naprawdę nie wiem, jak poprawnie zaimplementować multiprocessing.queue, nie możesz tak naprawdę utworzyć wystąpienia obiektu dla każdego procesu, ponieważ będą to oddzielne kolejki, jak upewnić się, że wszystkie procesy odnoszą się do wspólnej kolejki (lub w tym przypadku kolejki)
źródło
Odpowiedzi:
To jest prosty przykład czytelnika i pisarza współużytkujących jedną kolejkę… Autor wysyła do czytelnika zbiór liczb całkowitych; kiedy piszącemu skończą się liczby, wysyła „DONE”, co pozwala czytelnikowi wyrwać się z pętli odczytu.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
źródło
w "
from queue import Queue
" nie ma wywoływanego modułuqueue
, zamiast tegomultiprocessing
należy go użyć. Dlatego powinien wyglądać jak „from multiprocessing import Queue
”źródło
multiprocessing.Queue
jest poprawne. NormalnyQueue.Queue
jest używany dla wątków Pythona . Podczas próby użyciaQueue.Queue
z przetwarzaniem wieloprocesowym kopie obiektu Queue zostaną utworzone w każdym procesie podrzędnym, a procesy potomne nigdy nie zostaną zaktualizowane. ZasadniczoQueue.Queue
działa przy użyciu globalnego obiektu udostępnionego imultiprocessing.Queue
działa przy użyciu protokołu IPC. Zobacz: stackoverflow.com/questions/925100/…Oto martwe proste użycie
multiprocessing.Queue
i,multiprocessing.Process
które umożliwia wywołującym wysyłanie „zdarzenia” wraz z argumentami do oddzielnego procesu, który wysyła zdarzenie do metody „do_” w procesie. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Stosowanie:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
send
Dzieje w procesie macierzystego,do_*
dzieje się w procesie potomnym.Pominąłem obsługę wyjątków, która oczywiście przerwałaby pętlę uruchamiania i zakończyła proces potomny. Możesz również dostosować go, nadpisując,
run
aby kontrolować blokowanie lub cokolwiek innego.Jest to naprawdę przydatne tylko w sytuacjach, gdy masz pojedynczy proces roboczy, ale myślę, że jest to odpowiednia odpowiedź na to pytanie, aby zademonstrować typowy scenariusz z nieco bardziej zorientowaniem obiektowym.
źródło
Przyjrzałem się wielu odpowiedziom na temat przepełnienia stosu i Internetu, próbując skonfigurować sposób wykonywania przetwarzania wieloprocesowego przy użyciu kolejek do przekazywania dużych ramek danych pand. Wydawało mi się, że każda odpowiedź była powtórzeniem tego samego rodzaju rozwiązań bez uwzględnienia mnogości skrajnych przypadków, na które z pewnością można się natknąć podczas wykonywania takich obliczeń. Problem w tym, że w tym samym czasie dzieje się wiele rzeczy. Liczba zadań, liczba pracowników, czas trwania każdego zadania i możliwe wyjątki podczas wykonywania zadania. Wszystko to utrudnia synchronizację, a większość odpowiedzi nie dotyczy tego, jak możesz się do tego zabrać. To jest moje spojrzenie po kilkugodzinnych zabawach, mam nadzieję, że będzie to wystarczająco ogólne, aby większość ludzi uznało to za przydatne.
Kilka przemyśleń przed przykładami kodowania. Ponieważ
queue.Empty
lubqueue.qsize()
jakakolwiek inna podobna metoda jest zawodna w kontroli przepływu, każdy podobny kodwhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
jest fałszywy. To zabije pracownika, nawet jeśli kilka milisekund później w kolejce pojawi się kolejne zadanie. Pracownik nie wyzdrowieje i po chwili WSZYSCY pracownicy znikną, ponieważ przypadkowo znajdą kolejkę chwilowo pustą. W rezultacie główna funkcja wieloprocesorowa (ta z funkcją join () w procesach) powróci bez ukończenia wszystkich zadań. Miły. Powodzenia w debugowaniu, jeśli masz tysiące zadań, a kilku brakuje.
Drugą kwestią jest użycie wartości wartowniczych. Wiele osób zasugerowało dodanie wartości wartowniczej do kolejki, aby oznaczyć koniec kolejki. Ale aby oznaczyć to komu dokładnie? Jeśli jest N pracowników, zakładając, że N to liczba dostępnych rdzeni, które można oddać lub wziąć, wtedy pojedyncza wartość wartownika będzie oznaczać koniec kolejki tylko do jednego pracownika. Wszyscy pozostali pracownicy będą siedzieć, czekając na więcej pracy, gdy jej już nie ma. Typowe przykłady, które widziałem, to
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Jeden pracownik otrzyma wartość wartowniczą, podczas gdy reszta będzie czekać w nieskończoność. Żaden post, na który natknąłem się, nie wspomniał, że musisz przesłać wartość wartowniczą do kolejki CO NAJMNIEJ tyle razy, ile masz pracowników, aby WSZYSCY je otrzymali.
Drugą kwestią jest obsługa wyjątków podczas wykonywania zadań. Ponownie należy je złapać i zarządzać. Co więcej, jeśli masz
completed_tasks
kolejkę, powinieneś niezależnie obliczyć w sposób deterministyczny, ile elementów jest w kolejce, zanim zdecydujesz, że zadanie zostało wykonane. Ponownie poleganie na rozmiarach kolejek jest skazane na niepowodzenie i zwraca nieoczekiwane wyniki.W poniższym przykładzie
par_proc()
funkcja otrzyma listę zadań zawierającą funkcje, z którymi te zadania powinny być wykonywane, wraz z nazwanymi argumentami i wartościami.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
A oto test do uruchomienia powyższego kodu
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
plus jeszcze jeden z kilkoma wyjątkami
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Mam nadzieję, że to jest pomocne.
źródło
Wdrożyliśmy dwie wersje tego, jedną prostą wielu wątków basen, który można wykonać wiele rodzajów callables, co czyni nasze życie o wiele łatwiejsze i drugą wersję, która wykorzystuje procesy , które jest mniej elastyczny pod względem callables i wymaga i dodatkowych połączeń do koperkiem.
Ustawienie wartości true dla frozen_pool spowoduje zawieszenie wykonania do momentu wywołania finish_pool_queue w którejkolwiek z klas.
Wersja wątku:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Wersja procesu:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Zadzwoń z:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
lub
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
źródło
Zrobiłem prosty i ogólny przykład pokazujący przekazywanie wiadomości przez kolejkę między dwoma niezależnymi programami. Nie odpowiada bezpośrednio na pytanie PO, ale powinno być wystarczająco jasne, wskazując koncepcję.
Serwer:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Klient:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Stosowanie
Serwer:
N
to liczba całkowita wskazująca, ile serwerów należy utworzyć. Skopiuj jedno z danych<server-address-N>
wyjściowych serwera i ustaw je jako pierwszy argument każdego z nichmultiprocessing-queue-manager-client.py
.Klient:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Wynik
Serwer:
Client 1: <item> from <server-address-1>
Streszczenie: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : utworzono pakiet tutaj .
Serwer:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Klient:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
źródło