przetwarzanie wieloprocesowe: jak udostępnić dyktando wielu procesom?

113

Program, który tworzy kilka procesów, które działają w kolejce, którą Qmożna połączyć , i może ostatecznie manipulować słownikiem globalnym w Dcelu przechowywania wyników. (aby każdy proces podrzędny mógł Dprzechowywać swój wynik, a także zobaczyć, jakie wyniki wytwarzają inne procesy podrzędne)

Jeśli drukuję słownik D w procesie potomnym, widzę modyfikacje, które zostały w nim wykonane (tj. Na D). Ale gdy główny proces dołącza do Q, jeśli drukuję D, jest to pusty dykt!

Rozumiem, że jest to problem z synchronizacją / blokadą. Czy ktoś może mi powiedzieć, co się tutaj dzieje i jak mogę zsynchronizować dostęp do D?

dop
źródło
1
Nie działa to zgodnie z oczekiwaniami, przynajmniej na Pythonie 3.7.2 używającym osx 10.14.4 Dict nie jest zsynchronizowany, a jego zawartość jest przepisywana przez inne procesy. Jednak <code> multiprocessing.Manager (). List () </code> działa zgodnie z oczekiwaniami.
Andrew Druchenko

Odpowiedzi:

162

Ogólna odpowiedź dotyczy użycia Managerprzedmiotu. Na podstawie dokumentacji:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Wynik:

$ python mul.py 
{1: '111', '2': 6}
nadawca
źródło
4
Dzięki nadawcy. Rzeczywiście, D = multiprocessing.Manager (). Dict () rozwiązuje mój problem. Używałem D = dict ().
dop
3
@LorenzoBelli, jeśli pytasz, czy dostęp do menedżera jest zsynchronizowany, myślę, że odpowiedź brzmi: tak. multiprocessing.Manager()zwraca instancjęSyncManager , której nazwa sugeruje tyle!
senderle
@senderle Chcę udostępnić numpy losowy stan procesu nadrzędnego procesowi podrzędnemu. Próbowałem użyć, Managerale nadal nie mam szczęścia. Czy mógłbyś spojrzeć na moje pytanie tutaj i sprawdzić, czy możesz zaoferować rozwiązanie? Nadal mogę uzyskać różne liczby losowe, jeśli robię to za np.random.seed(None)każdym razem, gdy generuję liczbę losową, ale nie pozwala mi to na użycie losowego stanu procesu nadrzędnego, czego nie chcę. Każda pomoc jest mile widziana.
Amir
1
@RadioControlled z przyjemnością pisze aktualizację, ale w skrócie, chociaż nie sądzę, abyś mógł to zrobić bezpośrednio, możesz łatwo utworzyć nowy zarządzany dykt z tymi samymi kluczami i wartościami i użyć go zamiast oryginału. Czy to jest odpowiednie dla twojego przypadku?
nadawca
1
@senderle, właśnie to zrobiłem. Więc odpowiedź byłaby taka, że ​​musiałbyś to zrobić.
Sterowanie radiowe
25

wieloprocesorowość to nie wątkowanie. Każdy proces potomny otrzyma kopię pamięci procesu głównego. Ogólnie stan jest udostępniany za pośrednictwem komunikacji (potoki / gniazda), sygnałów lub pamięci współdzielonej.

Wieloprocesowość udostępnia pewne abstrakcje dla twojego przypadku użycia - stan współdzielony, który jest traktowany jako lokalny przez użycie serwerów proxy lub pamięci współdzielonej: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Odpowiednie sekcje:

Jeremy Brown
źródło
1
Wielkie dzięki. Doprowadziłeś mnie do rozwiązania / a: multiprocessing.Manager (). Dict ().
dop
Czy ktoś może wyjaśnić, co oznacza stwierdzenie „Każdy proces podrzędny otrzyma kopię pamięci procesu głównego”.
Itsme2003,
@ Itsme2003 domyślnie spawnowany proces nie ma dostępu do pamięci procesu nadrzędnego (jest to jedna z kluczowych różnic w wątkach). Więc gdy proces potrzebuje obiektu procesu macierzystego, musi utworzyć jego kopię (zamiast pobierać odniesienie do rzeczywistego obiektu). Powyższa odpowiedź wyjaśnia, jak współdzielić obiekty między procesami.
Niklas Mertsch
Ponieważ jest to często błędne: dopóki nie zmodyfikujesz obiektu, przynajmniej w zwykłej konfiguracji Linuksa, obiekt zostanie faktycznie zapisany w pamięci tylko raz. Zostanie skopiowany, gdy tylko zostanie zmieniony. Może to być bardzo ważne, jeśli chcesz zaoszczędzić pamięć i nie modyfikować obiektu.
Kontrolowane radiowo
16

Chciałbym podzielić się własną pracą, która jest szybsza niż dykt Manager i jest prostsza i bardziej stabilna niż biblioteka pyshmht, która zużywa mnóstwo pamięci i nie działa w systemie Mac OS. Chociaż mój dykt działa tylko dla zwykłych strun i jest obecnie niezmienny. Używam implementacji sondowania liniowego i przechowuję klucze i pary wartości w oddzielnym bloku pamięci po tabeli.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

Wyniki wydajności mojego laptopa to:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

prosty przykład użycia:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
alyaxey
źródło
14
Github? Dokumentacja? jak możemy używać tego narzędzia?
Pavlos Panteliadis
10

Oprócz @ senderle tutaj, niektórzy mogą również zastanawiać się, jak korzystać z funkcjonalności multiprocessing.Pool.

Fajną rzeczą jest to, że .Pool()w managerinstancji istnieje metoda, która naśladuje wszystkie znane API najwyższego poziomu multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Wynik:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

To jest nieco inny przykład, w którym każdy proces po prostu rejestruje swój identyfikator procesu w DictProxyobiekcie globalnym d.

Brad Solomon
źródło
3

Może możesz spróbować pyshmht , współdzielenie rozszerzenia tablicy mieszania opartego na pamięci dla Pythona.

Ogłoszenie

  1. Nie jest w pełni przetestowany, tylko w celach informacyjnych.

  2. Obecnie brakuje mu mechanizmów blokowania / sem do przetwarzania wieloprocesowego.

felix021
źródło