przetwarzanie wieloprocesowe: współdzielenie dużego obiektu tylko do odczytu między procesami?

107

Czy procesy potomne powstały w wyniku współdzielonych obiektów wieloprocesowych utworzonych wcześniej w programie?

Mam następującą konfigurację:

do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load('file.bin')
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))

Ładuję duży obiekt do pamięci, a następnie tworzę pulę pracowników, którzy muszą z niego korzystać. Duży obiekt jest dostępny tylko do odczytu, nie muszę przekazywać jego modyfikacji między procesami.

Moje pytanie brzmi: czy duży obiekt jest ładowany do pamięci współdzielonej, tak jak by to było, gdybym uruchomił proces w unix / c, czy też każdy proces ładuje własną kopię dużego obiektu?

Aktualizacja: aby wyjaśnić dalej - big_lookup_object jest współdzielonym obiektem wyszukiwania. Nie muszę tego dzielić i przetwarzać osobno. Muszę zachować jedną kopię. Praca, której potrzebuję, aby go podzielić, polega na czytaniu wielu innych dużych plików i sprawdzaniu elementów w tych dużych plikach względem obiektu wyszukiwania.

Dalsza aktualizacja: baza danych jest dobrym rozwiązaniem, memcached może być lepszym rozwiązaniem, a plik na dysku (półka lub dbm) może być jeszcze lepszy. W tym pytaniu byłem szczególnie zainteresowany rozwiązaniem w pamięci. Jako ostateczne rozwiązanie użyję hadoopa, ale chciałem sprawdzić, czy mogę również mieć lokalną wersję w pamięci.

Parand
źródło
twój kod tak, jak napisano, będzie wywoływał marshal.loadrodzica i każde dziecko (każdy proces importuje moduł).
jfs
Masz rację, poprawione.
Parand
W przypadku „lokalnej pamięci” i jeśli chcesz uniknąć kopiowania, przydatne mogą być następujące informacje docs.python.org/library/ ...
jfs
udział nie. spawnowane procesy (na przykład fork lub exec) są dokładnym duplikatem procesu wywołującego ... ale w innej pamięci. Aby jeden proces mógł komunikować się z innym, potrzebna jest komunikacja międzyprocesowa lub odczyt / zapis IPC w jakiejś współdzielonej lokalizacji pamięci.
ron

Odpowiedzi:

50

„Czy procesy potomne powstały w wyniku współdzielonych obiektów wieloprocesowych utworzonych wcześniej w programie?”

Nie (python przed 3.8) i tak w 3.8 ( https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory )

Procesy mają niezależną przestrzeń pamięci.

Rozwiązanie 1

Aby jak najlepiej wykorzystać dużą strukturę z dużą liczbą pracowników, zrób to.

  1. Zapisz każdego pracownika jako „filtr” - odczytuje wyniki pośrednie ze standardowego wejścia, działa, zapisuje wyniki pośrednie na standardowe wyjście.

  2. Połącz wszystkich pracowników jako rurociąg:

    process1 <source | process2 | process3 | ... | processn >result

Każdy proces czyta, pracuje i pisze.

Jest to niezwykle wydajne, ponieważ wszystkie procesy działają jednocześnie. Zapisy i odczyty przechodzą bezpośrednio przez bufory współdzielone między procesami.


Rozwiązanie 2

W niektórych przypadkach masz bardziej złożoną strukturę - często strukturę „rozłożoną”. W takim przypadku masz rodzica z kilkoma dziećmi.

  1. Rodzic otwiera dane źródłowe. Rodzic rozwidla wiele dzieci.

  2. Rodzic czyta źródło, wypuszcza części źródła do każdego współbieżnie działającego dziecka.

  3. Kiedy rodzic dojdzie do końca, zamknij rurkę. Dziecko otrzymuje koniec pliku i kończy normalnie.

Części dziecięce są przyjemne w pisaniu, ponieważ każde dziecko po prostu czyta sys.stdin.

Rodzic ma trochę wymyślnej pracy nóg przy odradzaniu wszystkich dzieci i prawidłowym utrzymywaniu rur, ale nie jest tak źle.

Fan-in to odwrotna konstrukcja. Wiele niezależnie działających procesów musi przeplatać swoje dane wejściowe do wspólnego procesu. Kolektor nie jest tak łatwy do napisania, ponieważ musi czytać z wielu źródeł.

Czytanie z wielu nazwanych potoków jest często wykonywane za pomocą selectmodułu, aby zobaczyć, które potoki mają oczekujące dane wejściowe.


Rozwiązanie 3

Wspólne wyszukiwanie to definicja bazy danych.

Rozwiązanie 3A - załaduj bazę danych. Pozwól pracownikom przetwarzać dane w bazie danych.

Rozwiązanie 3B - utwórz bardzo prosty serwer przy użyciu werkzeug (lub podobnego), aby zapewnić aplikacje WSGI, które odpowiadają na HTTP GET, aby pracownicy mogli wysyłać zapytania do serwera.


Rozwiązanie 4

Współdzielony obiekt systemu plików. Unix OS oferuje współdzielone obiekty pamięci. Są to tylko pliki, które są mapowane do pamięci, dzięki czemu zamiana we / wy jest wykonywana zamiast bardziej konwencjonalnych odczytów buforowanych.

Możesz to zrobić z kontekstu Pythona na kilka sposobów

  1. Napisz program startowy, który (1) rozbija twój oryginalny gigantyczny obiekt na mniejsze obiekty i (2) uruchamia pracowników, każdy z mniejszym obiektem. Mniejsze obiekty mogą być piklowanymi obiektami Pythona, aby zaoszczędzić trochę czasu na odczyt plików.

  2. Napisz program startowy, który (1) czyta twój oryginalny gigantyczny obiekt i zapisuje plik ze strukturą strony, zakodowany bajtowo, używając seekoperacji, aby zapewnić, że poszczególne sekcje są łatwe do znalezienia za pomocą prostych wyszukiwań. To właśnie robi silnik bazy danych - dzieli dane na strony, ułatwiając zlokalizowanie każdej strony za pomocą pliku seek.

    Przywołaj pracowników mających dostęp do tego dużego pliku o strukturze strony. Każdy pracownik może szukać odpowiednich części i tam wykonywać swoją pracę.

S.Lott
źródło
Moje procesy nie są tak naprawdę fitlerami; wszystkie są takie same, po prostu przetwarzają różne fragmenty danych.
Parand
Często mogą mieć strukturę filtrów. Odczytują swoje dane, wykonują swoją pracę i zapisują wyniki do późniejszego przetworzenia.
S.Lott,
Podoba mi się twoje rozwiązanie, ale co się dzieje z blokowaniem we / wy? A co, jeśli rodzic blokuje czytanie / pisanie od / do jednego ze swoich dzieci? Select powiadamia Cię, że możesz pisać, ale nie mówi, ile. To samo do czytania.
Cristian Ciupitu
To odrębne procesy - rodzice i dzieci nie przeszkadzają sobie nawzajem. Każdy bajt wytworzony na jednym końcu potoku jest natychmiast dostępny na drugim końcu do wykorzystania - potok jest współdzielonym buforem. Nie jestem pewien, co oznacza Twoje pytanie w tym kontekście.
S.Lott,
Mogę zweryfikować, co powiedział S.Lott. Potrzebowałem tych samych operacji na jednym pliku. Tak więc pierwszy pracownik uruchomił swoją funkcję w każdej linii o numerze% 2 == 0 i zapisał ją w pliku, a pozostałe linie wysłał do następnego potokowanego procesu (który był tym samym skryptem). Czas działania skrócił się o połowę. To trochę hacky, ale narzut jest znacznie mniejszy niż mapa / kupa w module wieloprocesorowym.
Vince
36

Czy procesy potomne powstały w wyniku współdzielonych obiektów wieloprocesowych utworzonych wcześniej w programie?

To zależy. W przypadku zmiennych globalnych tylko do odczytu można to często uznać za tak (oprócz zajętej pamięci), w przeciwnym razie nie powinno.

Dokumentacja multiprocessingu mówi:

Better to inherit than pickle/unpickle

W systemie Windows wiele typów z przetwarzania wieloprocesowego musi być możliwych do pobrania, aby procesy potomne mogły ich używać. Jednak generalnie należy unikać wysyłania obiektów współdzielonych do innych procesów przy użyciu potoków lub kolejek. Zamiast tego należy tak ustawić program, aby proces, który potrzebuje dostępu do współdzielonego zasobu utworzonego w innym miejscu, mógł odziedziczyć go po procesie nadrzędnym.

Explicitly pass resources to child processes

W systemie Unix proces potomny może korzystać ze współużytkowanego zasobu utworzonego w procesie nadrzędnym przy użyciu zasobu globalnego. Jednak lepiej jest przekazać obiekt jako argument do konstruktora procesu potomnego.

Oprócz uczynienia kodu (potencjalnie) kompatybilnym z Windows zapewnia to również, że dopóki proces potomny nadal żyje, obiekt nie zostanie usunięty jako śmieci w procesie nadrzędnym. Może to być ważne, jeśli jakiś zasób zostanie zwolniony, gdy obiekt zostanie usunięty w procesie nadrzędnym.

Global variables

Należy pamiętać, że jeśli kod uruchamiany w procesie potomnym próbuje uzyskać dostęp do zmiennej globalnej, wówczas wartość, którą widzi (jeśli istnieje) może nie być taka sama, jak wartość w procesie nadrzędnym w momencie wywołania metody Process.start () .

Przykład

W systemie Windows (pojedynczy procesor):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # replace `23` due to small integers share representation
z = []    # integers are immutable, let's try mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make more apparant the effect

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

Z sleep:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

Bez sleep:

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
jfs
źródło
6
Co? W jaki sposób z jest dzielony między procesy?
cbare
4
@cbare: Dobre pytanie! z w rzeczywistości nie jest współdzielony, jak pokazuje wynik ze stanem uśpienia. Wynik bez uśpienia pokazuje, że pojedynczy proces obsługuje (PID = 1148) całą pracę; to, co widzimy w ostatnim przykładzie, to wartość z dla tego pojedynczego procesu.
Eric O Lebigot
Ta odpowiedź pokazuje, że znie jest udostępniana. To w ten sposób odpowiada na pytanie: „nie, przynajmniej pod Windows zmienna nadrzędna nie jest dzielona między dzieci”.
Eric O Lebigot
@EOL: technicznie masz rację, ale w praktyce, jeśli dane są tylko do odczytu (w przeciwieństwie do zprzypadku), można je uznać za udostępnione.
jfs
Dla wyjaśnienia, instrukcja Należy pamiętać, że jeśli kod uruchamiany w procesie potomnym próbuje uzyskać dostęp do zmiennej globalnej ... w dokumentacji 2.7 odnosi się do Pythona działającego pod Windows.
user1071847
28

S.Lott ma rację. Skróty wieloprocesorowe Pythona skutecznie zapewniają oddzielny, zduplikowany fragment pamięci.

W większości systemów * nix użycie wywołania niższego poziomu os.fork()w rzeczywistości da ci pamięć typu „kopiuj przy zapisie”, o której myślisz. AFAIK, w teorii, w najbardziej uproszczonych programach można było czytać te dane bez ich powielania.

Jednak w przypadku interpretera Pythona sprawy nie są takie proste. Dane obiektu i metadane są przechowywane w tym samym segmencie pamięci, więc nawet jeśli obiekt nigdy się nie zmienia, coś w rodzaju licznika odniesienia dla zwiększanego obiektu spowoduje zapis do pamięci, a tym samym kopię. Prawie każdy program w języku Python, który robi więcej niż „print 'hello'”, spowoduje zwiększenie liczby odwołań, więc prawdopodobnie nigdy nie zdasz sobie sprawy z korzyści płynących z kopiowania przy zapisie.

Nawet gdyby komuś udało się zhakować rozwiązanie pamięci współdzielonej w Pythonie, próba skoordynowania czyszczenia pamięci między procesami byłaby prawdopodobnie dość bolesna.

Jarret Hardie
źródło
3
W takim przypadku skopiowany zostanie tylko obszar pamięci liczby referencyjnej, niekoniecznie duże dane tylko do odczytu, prawda?
kawing-chiu
7

Jeśli pracujesz pod Uniksem, mogą dzielić ten sam obiekt, ze względu na to, jak działa fork (tj. Procesy potomne mają oddzielną pamięć, ale jest to kopiowanie przy zapisie, więc może być współdzielone, o ile nikt go nie modyfikuje). Wypróbowałem następujące:

import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

i otrzymałem następujący wynik:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

Oczywiście nie dowodzi to, że kopia nie została utworzona, ale powinieneś być w stanie to sprawdzić w swojej sytuacji, patrząc na wynik, psaby zobaczyć, ile rzeczywistej pamięci używa każdy podproces.

Jacob Gabrielson
źródło
2
A co ze śmieciem? Co się dzieje, gdy działa? Czy układ pamięci się nie zmienia?
Cristian Ciupitu
To ważna obawa. To, czy wpłynie to na Paranda, będzie zależeć od tego, w jaki sposób wykorzystuje to wszystko i jak niezawodny musi być ten kod. Gdyby to nie zadziałało, poleciłbym użycie modułu mmap dla większej kontroli (zakładając, że chce trzymać się tego podstawowego podejścia).
Jacob Gabrielson
Opublikowałem aktualizację Twojego przykładu: stackoverflow.com/questions/659865/ ...
jfs
@JacobGabrielson: Kopia została utworzona. Pierwotne pytanie dotyczy tego, czy kopia jest wykonana.
abhinavkulkarni
3

Różne procesy mają różną przestrzeń adresową. Jak uruchamianie różnych instancji tłumacza. Do tego służy IPC (komunikacja międzyprocesowa).

W tym celu można użyć kolejek lub potoków. Możesz także użyć rpc over tcp, jeśli chcesz później rozprowadzić procesy w sieci.

http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes

Wasyl
źródło
2
Nie sądzę, aby IPC był do tego odpowiedni; są to dane tylko do odczytu, do których każdy potrzebuje dostępu. Nie ma sensu przekazywać go między procesami; w najgorszym przypadku każdy może przeczytać własną kopię. Próbuję oszczędzać pamięć, nie mając osobnej kopii w każdym procesie.
Parand
Możesz mieć proces główny delegujący fragmenty danych do pracy do innych procesów podrzędnych. Albo niewolnicy mogą prosić o dane, albo mogą przesyłać dane. W ten sposób nie każdy proces będzie miał kopię całego obiektu.
Vasil
1
@Vasil: A co, jeśli każdy proces potrzebuje całego zestawu danych i wykonuje na nim po prostu inną operację?
Będzie
1

Nie jest to bezpośrednio związane z przetwarzaniem wieloprocesowym jako takim, ale z twojego przykładu wydaje się, że możesz po prostu użyć modułu półki lub czegoś podobnego. Czy „big_lookup_object” naprawdę musi być całkowicie w pamięci?


źródło
Słuszna uwaga, nie porównałem bezpośrednio wydajności na dysku i w pamięci. Zakładałem, że będzie duża różnica, ale tak naprawdę nie testowałem.
Parand
1

Nie, ale możesz załadować swoje dane jako proces podrzędny i pozwolić mu na udostępnianie swoich danych innym dzieciom. patrz poniżej.

import time
import multiprocessing

def load_data( queue_load, n_processes )

    ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access. 
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data( queue_data, queue_load ):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep seperate.  
    a,b = queue_load.get()
    """

    ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target = load_data,
            args=(queue_load, n_processes))

            processes.append(p)
            p.start()   

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target = work_with_data,
        args=(queue_data, queue_load))

        processes.append(p)
        p.start()

    for i in range(n_processes)
        new_data = queue_data.get()
        stored_data.append(new_data)    

    for p in processes:
        p.join()
    print(processes)    
Mott The Tuple
źródło
-4

W przypadku platformy Linux / Unix / MacOS forkmap jest szybkim rozwiązaniem.

Maxim Imakaev
źródło