Przetwarzanie pojedynczego pliku z wielu procesów

82

Mam pojedynczy duży plik tekstowy, w którym chcę przetworzyć każdą linię (wykonać pewne operacje) i przechowywać je w bazie danych. Ponieważ pojedynczy prosty program trwa zbyt długo, chcę, aby był wykonywany za pomocą wielu procesów lub wątków. Każdy wątek / proces powinien odczytywać RÓŻNE dane (różne wiersze) z tego pojedynczego pliku i wykonywać pewne operacje na ich fragmencie danych (wierszach) i umieszczać je w bazie danych tak, aby na końcu mieć wszystkie przetworzone dane i moje baza danych jest zrzucana wraz z potrzebnymi mi danymi.

Ale nie jestem w stanie zrozumieć, jak do tego podejść.

pranavk
źródło
3
Miłe pytanie. Miałem też tę wątpliwość. Chociaż wybrałem opcję podzielenia pliku na mniejsze pliki :)
Sushant Gupta

Odpowiedzi:

109

To, czego szukasz, to wzorzec Producent / Konsument

Podstawowy przykład gwintowania

Oto podstawowy przykład użycia modułu wątkowego (zamiast wieloprocesorowego)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

Nie udostępniasz obiektu pliku wątkom. Pracowałbyś dla nich, dostarczając do kolejki wiersze danych. Następnie każdy wątek pobierałby wiersz, przetwarzał go, a następnie zwracał do kolejki.

W module wieloprocesorowym wbudowano kilka bardziej zaawansowanych funkcji udostępniania danych, takich jak listy i specjalny rodzaj kolejki . Istnieją pewne kompromisy związane z używaniem przetwarzania wieloprocesowego w porównaniu z wątkami i zależy to od tego, czy Twoja praca jest związana z procesorem czy we / wy.

Przykład podstawowego przetwarzania wieloprocesowego

Oto naprawdę podstawowy przykład puli wieloprocesowej

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

Pula to wygodny obiekt zarządzający własnymi procesami. Ponieważ otwarty plik może iterować po swoich wierszach, możesz przekazać go do programu pool.map(), który zapętli go i dostarczy wiersze do funkcji roboczej. Map blokuje i zwraca cały wynik po zakończeniu. Należy pamiętać, że jest to zbyt uproszczony przykład i że pool.map()przed wykonaniem pracy odczyta cały plik do pamięci na raz. Jeśli spodziewasz się dużych plików, pamiętaj o tym. Istnieją bardziej zaawansowane sposoby projektowania konfiguracji producenta / konsumenta.

Ręczna „pula” z limitem i ponownym sortowaniem linii

To jest ręczny przykład mapy Pool.map , ale zamiast zużywać całą iterację za jednym razem, możesz ustawić rozmiar kolejki, aby podawać ją tylko kawałek po kawałku tak szybko, jak to możliwe. Dodałem również numery linii, abyś mógł je śledzić i odnosić się do nich, jeśli chcesz, później.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
jdi
źródło
1
To dobrze, ale co się stanie, jeśli przetwarzanie jest ograniczone we / wy? W takim przypadku paralelizm może spowolnić, a nie przyspieszyć. Przeszukiwania na pojedynczej ścieżce dysku są znacznie szybsze niż przeszukiwania międzyścieżkowe, a wykonywanie operacji we / wy równolegle ma tendencję do wprowadzania przeszukiwań międzyścieżkowych do tego, co w innym przypadku byłoby sekwencyjnym obciążeniem we / wy. Aby uzyskać pewne korzyści z równoległego wejścia / wyjścia, czasami bardzo pomaga użycie lustra RAID.
user1277476
2
@ jwillis0720 - Jasne. (None,) * num_workerstworzy krotkę wartości None równych wielkości liczby pracowników. Będą to wartości wartownicze, które mówią każdemu wątkowi, aby zakończył pracę, ponieważ nie ma już pracy. itertools.chainFunkcja pozwala ci umieścić wiele sekwencji razem w jednym wirtualnym sekwencji bez konieczności kopiowania czegokolwiek. Otrzymujemy więc to, że najpierw przechodzi przez linie w pliku, a następnie wartości None.
jdi
2
To lepiej wyjaśnione niż mój profesor, bardzo ładne +1.
lycuid
1
@ ℕʘʘḆḽḘ, nieco zredagowałem tekst, aby był bardziej przejrzysty. Wyjaśnia teraz, że środkowy przykład wyrzuci wszystkie dane pliku na raz do pamięci, co może być problemem, jeśli plik jest większy niż ilość pamięci RAM, którą obecnie masz dostępną. Następnie pokazuję w trzecim przykładzie, jak przejść wiersz po wierszu, aby nie zużywać całego pliku naraz.
jdi
1
@ ℕʘʘḆḽḘ przeczytaj dokumentację dla pool.Map (). Mówi, że podzieli iterowalne na kawałki i przekaże je pracownikom. W efekcie zajmie to wszystkie wiersze w pamięci. Tak, iterowanie po jednym wierszu na raz jest wydajne pod względem pamięci, ale jeśli w końcu zatrzymasz wszystkie te wiersze w pamięci, wrócisz do czytania całego pliku.
jdi
9

Oto naprawdę głupi przykład, który wymyśliłem:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

Najtrudniejsze tutaj jest upewnienie się, że podzieliliśmy plik na znaki nowej linii, aby nie przegapić żadnych wierszy (lub odczytać tylko częściowe wiersze). Następnie każdy proces odczytuje swoją część pliku i zwraca obiekt, który może zostać umieszczony w bazie danych przez główny wątek. Oczywiście może być konieczne nawet wykonanie tej części fragmentami, aby nie trzeba było przechowywać wszystkich informacji w pamięci naraz. (jest to dość łatwe do osiągnięcia - wystarczy podzielić listę „argumentów” na X części i wywołać pool.map(wrapper,chunk) - patrz tutaj )

mgilson
źródło
-3

dobrze podziel pojedynczy duży plik na wiele mniejszych plików i przetwórz każdy z nich w osobnych wątkach.

Tanu
źródło
to nie jest to, czego chce OP !! ale tylko dla pomysłu ... nieźle.
DRPK