Jak korzystać z kolejki wieloprocesowej w Pythonie?

99

Mam wiele problemów ze zrozumieniem, jak kolejka wieloprocesorowa działa w Pythonie i jak ją zaimplementować. Powiedzmy, że mam dwa moduły Pythona, które uzyskują dostęp do danych z udostępnionego pliku, nazwijmy te dwa moduły pisarzem i czytnikiem. Mój plan polega na tym, aby zarówno czytający, jak i piszący umieścili żądania w dwóch oddzielnych kolejkach wieloprocesorowych, a następnie trzeci proces zapętla te żądania i wykonuje je jako takie.

Moim głównym problemem jest to, że naprawdę nie wiem, jak poprawnie zaimplementować multiprocessing.queue, nie możesz tak naprawdę utworzyć wystąpienia obiektu dla każdego procesu, ponieważ będą to oddzielne kolejki, jak upewnić się, że wszystkie procesy odnoszą się do wspólnej kolejki (lub w tym przypadku kolejki)

ukłucie
źródło
4
przekazać kolejki do każdej klasy procesu jako parametr podczas tworzenia ich wystąpienia w procesie nadrzędnym.
Joel Cornett,

Odpowiedzi:

125

Moim głównym problemem jest to, że naprawdę nie wiem, jak poprawnie zaimplementować multiprocessing.queue, nie możesz tak naprawdę utworzyć wystąpienia obiektu dla każdego procesu, ponieważ będą to oddzielne kolejki, jak upewnić się, że wszystkie procesy odnoszą się do wspólnej kolejki (lub w tym przypadku kolejki)

To jest prosty przykład czytelnika i pisarza współużytkujących jedną kolejkę… Autor wysyła do czytelnika zbiór liczb całkowitych; kiedy piszącemu skończą się liczby, wysyła „DONE”, co pozwala czytelnikowi wyrwać się z pętli odczytu.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))
Mike Pennington
źródło
14
Świetny przykład. Tak jak dodatkowa informacja, aby rozwiązać problem z operatorem PO… Ten przykład pokazuje, że współdzielona kolejka musi pochodzić z procesu głównego, który jest następnie przekazywany do wszystkich jego podprocesów. Aby dwa zupełnie niezwiązane ze sobą procesy mogły współdzielić dane, muszą komunikować się przez jakieś centralne lub powiązane urządzenie sieciowe (na przykład gniazda). Coś musi koordynować informacje.
jdi
5
fajny przykład .. Jestem też nowy w tym temacie .. jeśli mam wiele procesów uruchamiających tę samą funkcję docelową (z różnymi argumentami), jak się upewnić, że nie kolidują one podczas umieszczania danych w kolejce .. jest konieczna blokada ?
WYSIWYG
@bharat_iyengar Z dokumentacji modułu wieloprocesorowego wynika, że ​​kolejka jest implementowana przy użyciu kilku blokad / semaforów. Więc kiedy używasz metod get () i put (object) Queue, kolejka będzie blokować, jeśli inny proces / wątek próbuje pobrać lub umieścić coś w kolejce. Nie musisz więc martwić się o ręczne zablokowanie go.
almel
1
Wyraźne warunki zatrzymania są lepsze niż niejawne warunki zatrzymania
Mike Pennington
2
Qsize może spaść do zera, jeśli czytelnicy kolejki przekroczą szybkość autora kolejki
Mike Pennington
8

w " from queue import Queue" nie ma wywoływanego modułu queue, zamiast tego multiprocessingnależy go użyć. Dlatego powinien wyglądać jak „ from multiprocessing import Queue

Drelich
źródło
11
Chociaż późno, używanie multiprocessing.Queuejest poprawne. Normalny Queue.Queuejest używany dla wątków Pythona . Podczas próby użycia Queue.Queuez przetwarzaniem wieloprocesowym kopie obiektu Queue zostaną utworzone w każdym procesie podrzędnym, a procesy potomne nigdy nie zostaną zaktualizowane. Zasadniczo Queue.Queuedziała przy użyciu globalnego obiektu udostępnionego i multiprocessing.Queuedziała przy użyciu protokołu IPC. Zobacz: stackoverflow.com/questions/925100/…
Michael Guffre
6

Oto martwe proste użycie multiprocessing.Queuei, multiprocessing.Processktóre umożliwia wywołującym wysyłanie „zdarzenia” wraz z argumentami do oddzielnego procesu, który wysyła zdarzenie do metody „do_” w procesie. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Stosowanie:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

sendDzieje w procesie macierzystego, do_*dzieje się w procesie potomnym.

Pominąłem obsługę wyjątków, która oczywiście przerwałaby pętlę uruchamiania i zakończyła proces potomny. Możesz również dostosować go, nadpisując, runaby kontrolować blokowanie lub cokolwiek innego.

Jest to naprawdę przydatne tylko w sytuacjach, gdy masz pojedynczy proces roboczy, ale myślę, że jest to odpowiednia odpowiedź na to pytanie, aby zademonstrować typowy scenariusz z nieco bardziej zorientowaniem obiektowym.

Joe Holloway
źródło
1
Znakomita odpowiedź! Dziękuję Ci. +50 :)
kmiklas
6

Przyjrzałem się wielu odpowiedziom na temat przepełnienia stosu i Internetu, próbując skonfigurować sposób wykonywania przetwarzania wieloprocesowego przy użyciu kolejek do przekazywania dużych ramek danych pand. Wydawało mi się, że każda odpowiedź była powtórzeniem tego samego rodzaju rozwiązań bez uwzględnienia mnogości skrajnych przypadków, na które z pewnością można się natknąć podczas wykonywania takich obliczeń. Problem w tym, że w tym samym czasie dzieje się wiele rzeczy. Liczba zadań, liczba pracowników, czas trwania każdego zadania i możliwe wyjątki podczas wykonywania zadania. Wszystko to utrudnia synchronizację, a większość odpowiedzi nie dotyczy tego, jak możesz się do tego zabrać. To jest moje spojrzenie po kilkugodzinnych zabawach, mam nadzieję, że będzie to wystarczająco ogólne, aby większość ludzi uznało to za przydatne.

Kilka przemyśleń przed przykładami kodowania. Ponieważ queue.Emptylub queue.qsize()jakakolwiek inna podobna metoda jest zawodna w kontroli przepływu, każdy podobny kod

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

jest fałszywy. To zabije pracownika, nawet jeśli kilka milisekund później w kolejce pojawi się kolejne zadanie. Pracownik nie wyzdrowieje i po chwili WSZYSCY pracownicy znikną, ponieważ przypadkowo znajdą kolejkę chwilowo pustą. W rezultacie główna funkcja wieloprocesorowa (ta z funkcją join () w procesach) powróci bez ukończenia wszystkich zadań. Miły. Powodzenia w debugowaniu, jeśli masz tysiące zadań, a kilku brakuje.

Drugą kwestią jest użycie wartości wartowniczych. Wiele osób zasugerowało dodanie wartości wartowniczej do kolejki, aby oznaczyć koniec kolejki. Ale aby oznaczyć to komu dokładnie? Jeśli jest N pracowników, zakładając, że N to liczba dostępnych rdzeni, które można oddać lub wziąć, wtedy pojedyncza wartość wartownika będzie oznaczać koniec kolejki tylko do jednego pracownika. Wszyscy pozostali pracownicy będą siedzieć, czekając na więcej pracy, gdy jej już nie ma. Typowe przykłady, które widziałem, to

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

Jeden pracownik otrzyma wartość wartowniczą, podczas gdy reszta będzie czekać w nieskończoność. Żaden post, na który natknąłem się, nie wspomniał, że musisz przesłać wartość wartowniczą do kolejki CO NAJMNIEJ tyle razy, ile masz pracowników, aby WSZYSCY je otrzymali.

Drugą kwestią jest obsługa wyjątków podczas wykonywania zadań. Ponownie należy je złapać i zarządzać. Co więcej, jeśli masz completed_taskskolejkę, powinieneś niezależnie obliczyć w sposób deterministyczny, ile elementów jest w kolejce, zanim zdecydujesz, że zadanie zostało wykonane. Ponownie poleganie na rozmiarach kolejek jest skazane na niepowodzenie i zwraca nieoczekiwane wyniki.

W poniższym przykładzie par_proc()funkcja otrzyma listę zadań zawierającą funkcje, z którymi te zadania powinny być wykonywane, wraz z nazwanymi argumentami i wartościami.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

A oto test do uruchomienia powyższego kodu

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

plus jeszcze jeden z kilkoma wyjątkami

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Mam nadzieję, że to jest pomocne.

Nick B.
źródło
2

Wdrożyliśmy dwie wersje tego, jedną prostą wielu wątków basen, który można wykonać wiele rodzajów callables, co czyni nasze życie o wiele łatwiejsze i drugą wersję, która wykorzystuje procesy , które jest mniej elastyczny pod względem callables i wymaga i dodatkowych połączeń do koperkiem.

Ustawienie wartości true dla frozen_pool spowoduje zawieszenie wykonania do momentu wywołania finish_pool_queue w którejkolwiek z klas.

Wersja wątku:

'''
Created on Nov 4, 2019

@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os

class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []

        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue

            item = self.queue.get()
            if item is None:
                break

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()

            except Exception as e:
                self.lock.acquire()
                print e
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self):
        self.frozen_pool = False

        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)

        self.queue.join()

        for i in range(self.queue_threads):
            self.queue.put(None)

        for t in self.threads:
            t.join()

        del self.threads[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]

Wersja procesu:

  '''
Created on Nov 4, 2019

@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc

class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []

        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)

        print 'Processes', self.queue_processes

    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue

            item_pickled = self.queue.get()

            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break

            item = dill.loads(item_pickled)

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result

            except Exception as e:
                print e
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False

        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))

            if callable:
                callable()

            sleep(5)

        for i in range(self.queue_processes):
            self.queue.put(None)

        self.queue.join()
        self.queue.close()

        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()

        with ignore_exception: del self.processes[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]
def test(eg):
        print 'EG', eg

Zadzwoń z:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

lub

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
Kevin Parker
źródło
0

Zrobiłem prosty i ogólny przykład pokazujący przekazywanie wiadomości przez kolejkę między dwoma niezależnymi programami. Nie odpowiada bezpośrednio na pytanie PO, ale powinno być wystarczająco jasne, wskazując koncepcję.

Serwer:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]


q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')


def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)


def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue


async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise


if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Klient:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


delattr(QueueManager, 'get_queue')


def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')


def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)


if __name__ == '__main__':
    main()

Stosowanie

Serwer:

$ python3 multiprocessing-queue-manager-server.py N

Nto liczba całkowita wskazująca, ile serwerów należy utworzyć. Skopiuj jedno z danych <server-address-N>wyjściowych serwera i ustaw je jako pierwszy argument każdego z nich multiprocessing-queue-manager-client.py.

Klient:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Wynik

Serwer:

Client 1: <item> from <server-address-1>

Streszczenie: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD : utworzono pakiet tutaj .

Serwer:

import ipcq


with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server:
    server.get_queue().get()

Klient:

import ipcq


client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT)
client.get_queue().put('a message')

wprowadź opis obrazu tutaj

changyuheng
źródło