Mam pojedynczy duży plik tekstowy, w którym chcę przetworzyć każdą linię (wykonać pewne operacje) i przechowywać je w bazie danych. Ponieważ pojedynczy prosty program trwa zbyt długo, chcę, aby był wykonywany za pomocą wielu procesów lub wątków. Każdy wątek / proces powinien odczytywać RÓŻNE dane (różne wiersze) z tego pojedynczego pliku i wykonywać pewne operacje na ich fragmencie danych (wierszach) i umieszczać je w bazie danych tak, aby na końcu mieć wszystkie przetworzone dane i moje baza danych jest zrzucana wraz z potrzebnymi mi danymi.
Ale nie jestem w stanie zrozumieć, jak do tego podejść.
python
multithreading
multiprocessing
pranavk
źródło
źródło
Odpowiedzi:
To, czego szukasz, to wzorzec Producent / Konsument
Podstawowy przykład gwintowania
Oto podstawowy przykład użycia modułu wątkowego (zamiast wieloprocesorowego)
import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit()
Nie udostępniasz obiektu pliku wątkom. Pracowałbyś dla nich, dostarczając do kolejki wiersze danych. Następnie każdy wątek pobierałby wiersz, przetwarzał go, a następnie zwracał do kolejki.
W module wieloprocesorowym wbudowano kilka bardziej zaawansowanych funkcji udostępniania danych, takich jak listy i specjalny rodzaj kolejki . Istnieją pewne kompromisy związane z używaniem przetwarzania wieloprocesowego w porównaniu z wątkami i zależy to od tego, czy Twoja praca jest związana z procesorem czy we / wy.
Przykład podstawowego przetwarzania wieloprocesowego
Oto naprawdę podstawowy przykład puli wieloprocesowej
from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results
Pula to wygodny obiekt zarządzający własnymi procesami. Ponieważ otwarty plik może iterować po swoich wierszach, możesz przekazać go do programu
pool.map()
, który zapętli go i dostarczy wiersze do funkcji roboczej. Map blokuje i zwraca cały wynik po zakończeniu. Należy pamiętać, że jest to zbyt uproszczony przykład i żepool.map()
przed wykonaniem pracy odczyta cały plik do pamięci na raz. Jeśli spodziewasz się dużych plików, pamiętaj o tym. Istnieją bardziej zaawansowane sposoby projektowania konfiguracji producenta / konsumenta.Ręczna „pula” z limitem i ponownym sortowaniem linii
To jest ręczny przykład mapy Pool.map , ale zamiast zużywać całą iterację za jednym razem, możesz ustawić rozmiar kolejki, aby podawać ją tylko kawałek po kawałku tak szybko, jak to możliwe. Dodałem również numery linii, abyś mógł je śledzić i odnosić się do nich, jeśli chcesz, później.
from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results)
źródło
(None,) * num_workers
tworzy krotkę wartości None równych wielkości liczby pracowników. Będą to wartości wartownicze, które mówią każdemu wątkowi, aby zakończył pracę, ponieważ nie ma już pracy.itertools.chain
Funkcja pozwala ci umieścić wiele sekwencji razem w jednym wirtualnym sekwencji bez konieczności kopiowania czegokolwiek. Otrzymujemy więc to, że najpierw przechodzi przez linie w pliku, a następnie wartości None.Oto naprawdę głupi przykład, który wymyśliłem:
import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ...
Najtrudniejsze tutaj jest upewnienie się, że podzieliliśmy plik na znaki nowej linii, aby nie przegapić żadnych wierszy (lub odczytać tylko częściowe wiersze). Następnie każdy proces odczytuje swoją część pliku i zwraca obiekt, który może zostać umieszczony w bazie danych przez główny wątek. Oczywiście może być konieczne nawet wykonanie tej części fragmentami, aby nie trzeba było przechowywać wszystkich informacji w pamięci naraz. (jest to dość łatwe do osiągnięcia - wystarczy podzielić listę „argumentów” na X części i wywołać
pool.map(wrapper,chunk)
- patrz tutaj )źródło
dobrze podziel pojedynczy duży plik na wiele mniejszych plików i przetwórz każdy z nich w osobnych wątkach.
źródło