Aby mój kod był bardziej „pythonowy” i szybszy, używam funkcji „multiprocessing” i funkcji map do wysyłania a) funkcji i b) zakresu iteracji.
Wszczepione rozwiązanie (tj. Wywołanie tqdm bezpośrednio z zakresu tqdm.tqdm (zakres (0, 30)) nie działa z przetwarzaniem wieloprocesowym (zgodnie z poniższym kodem).
Pasek postępu jest wyświetlany od 0 do 100% (kiedy Python czyta kod?), Ale nie wskazuje faktycznego postępu funkcji mapy.
Jak wyświetlić pasek postępu, który wskazuje, na którym etapie znajduje się funkcja „mapa”?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
Każda pomoc lub sugestie są mile widziane ...
.starmap()
: Oto łatka doPool
dodawania.istarmap()
, która również będzie działaćtqdm
.Odpowiedzi:
Użyj imap zamiast map, co zwraca iterator przetwarzanych wartości.
from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(2) as p: r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
źródło
starmap()
?for i in tqdm.tqdm(...): pass
może być prostsze niżlist(tqdm.tqdm)
chunk_size
dlap.imap
. Czy możnatqdm
aktualizować każdą iterację zamiast każdego fragmentu?Znalezione rozwiązanie: bądź ostrożny! Ze względu na przetwarzanie wieloprocesowe czas oszacowania (iteracja na pętlę, czas całkowity itp.) Może być niestabilny, ale pasek postępu działa idealnie.
Uwaga: Menedżer kontekstu dla puli jest dostępny tylko od wersji Python 3.3
from multiprocessing import Pool import time from tqdm import * def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(processes=2) as p: max_ = 30 with tqdm(total=max_) as pbar: for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))): pbar.update()
źródło
pbar.close()
nie jest wymagane, zostanie zamknięte automatycznie po zakończeniuwith
tqdm
potrzebne jest tutaj drugie / wewnętrzne wezwanie?starmap()
?imap_unordered
jest tutaj kluczowy, daje najlepszą wydajność i najlepsze oceny paska postępu.Możesz użyć
p_tqdm
zamiast tego.https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = p_map(_foo, list(range(0, 30)))
źródło
pip install
. To zastępuje tqdm dla większości moich potrzebp_tqdm
jest ograniczone domultiprocessing.Pool
, niedostępne dla wątkówPrzepraszamy za spóźnienie, ale jeśli potrzebujesz tylko współbieżnej mapy, najnowsza wersja (
tqdm>=4.42.0
) ma teraz wbudowane:from tqdm.contrib.concurrent import process_map # or thread_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = process_map(_foo, range(0, 30), max_workers=2)
Źródła: https://tqdm.github.io/docs/contrib.concurrent/ i https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
źródło
HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))
Jupyterna podstawie odpowiedzi Xavi Martíneza napisałem funkcję
imap_unordered_bar
. Można go używać w taki sam sposób, jakimap_unordered
z tą różnicą, że wyświetlany jest pasek przetwarzania.from multiprocessing import Pool import time from tqdm import * def imap_unordered_bar(func, args, n_processes = 2): p = Pool(n_processes) res_list = [] with tqdm(total = len(args)) as pbar: for i, res in tqdm(enumerate(p.imap_unordered(func, args))): pbar.update() res_list.append(res) pbar.close() p.close() p.join() return res_list def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': result = imap_unordered_bar(_foo, range(5))
źródło
Oto moja opinia na temat sytuacji, w których musisz uzyskać wyniki z równoległych funkcji wykonujących. Ta funkcja robi kilka rzeczy (jest jeszcze jeden mój post, który wyjaśnia to dokładniej), ale kluczową kwestią jest to, że istnieje kolejka oczekujących zadań i kolejka zadań zakończonych. Gdy pracownicy wykonują każde zadanie w kolejce oczekującej, dodają wyniki do kolejki zadań zakończonych. Możesz zawinąć czek do kolejki ukończonych zadań za pomocą paska postępu tqdm. Nie umieszczam tutaj implementacji funkcji do_work (), nie ma to znaczenia, ponieważ przesłanie tutaj ma na celu monitorowanie kolejki wykonanych zadań i aktualizowanie paska postępu za każdym razem, gdy jest wynik.
def par_proc(job_list, num_cpus=None, verbose=False): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Set the number of workers here num_workers = min(num_cpus, num_tasks) # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 with tqdm(total=num_tasks) as bar: while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 bar.update(completed_tasks_counter) for p in processes: p.join() return results
źródło
import multiprocessing as mp import tqdm some_iterable = ... def some_func(): # your logic ... if __name__ == '__main__': with mp.Pool(mp.cpu_count()-2) as p: list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
źródło
Takie podejście jest proste i działa.
from multiprocessing.pool import ThreadPool import time from tqdm import tqdm def job(): time.sleep(1) pbar.update() pool = ThreadPool(5) with tqdm(total=100) as pbar: for i in range(100): pool.apply_async(job) pool.close() pool.join()
źródło