Wskaźnik postępu podczas operacji pand

158

Regularnie wykonuję operacje na pandach na ramkach danych w ponad 15 milionach wierszy i chciałbym mieć dostęp do wskaźnika postępu dla poszczególnych operacji.

Czy istnieje tekstowy wskaźnik postępu dla operacji pandy podziel-zastosuj-połącz?

Na przykład w czymś takim:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

gdzie feature_rollupjest dość skomplikowaną funkcją, która pobiera wiele kolumn DF i tworzy nowe kolumny użytkowników różnymi metodami. Te operacje mogą zająć trochę czasu w przypadku dużych ramek danych, więc chciałbym wiedzieć, czy możliwe jest wyświetlenie tekstu w notatniku iPython, który aktualizuje mnie o postępie.

Do tej pory wypróbowałem wskaźniki postępu pętli kanonicznej dla Pythona, ale nie wchodzą one w interakcję z pandami w żaden znaczący sposób.

Mam nadzieję, że jest coś, co przeoczyłem w bibliotece / dokumentacji pand, co pozwala poznać postęp kombajnu z podziałem na aplikacje. Prosta implementacja może spojrzeć na całkowitą liczbę podzbiorów ramek danych, na których działa applyfunkcja i zgłosić postęp jako ukończony ułamek tych podzbiorów.

Czy to może coś, co trzeba dodać do biblioteki?

cwharland
źródło
czy wykonałeś% prun (profil) na kodzie? czasami możesz wykonać operacje na całej klatce przed złożeniem wniosku, aby wyeliminować wąskie gardła
Jeff,
@Jeff: jasne, zrobiłem to wcześniej, aby wycisnąć z tego każdy kawałek wydajności. Problem naprawdę sprowadza się do granicy pseudo zmniejszania mapy, nad którą pracuję, ponieważ wiersze są w dziesiątkach milionów, więc nie spodziewam się, że super prędkość wzrośnie tylko po to, aby uzyskać informacje zwrotne na temat postępu.
cwharland
Rozważ cytonizację: pandas.pydata.org/pandas-docs/dev/ ...
Andy Hayden
@AndyHayden - Jak skomentowałem twoją odpowiedź, twoja implementacja jest całkiem dobra i dodaje trochę czasu do całej pracy. Zcytonizowałem również trzy operacje w ramach pakietu zbiorczego funkcji, które odzyskały cały czas poświęcony teraz na raportowanie postępu. Więc w końcu założę się, że będę miał paski postępu ze skróceniem całkowitego czasu przetwarzania, jeśli wykonam cython na całej funkcji.
cwharland

Odpowiedzi:

277

Ze względu na popularne zapotrzebowanie tqdmdodano obsługę pandas. W przeciwieństwie do innych odpowiedzi, nie spowolni to zauważalnie pand - oto przykład dla DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

Jeśli jesteś zainteresowany tym, jak to działa (i jak zmodyfikować to dla własnych wywołań zwrotnych), zobacz przykłady na github , pełną dokumentację na pypi lub zaimportuj moduł i uruchom help(tqdm).

EDYTOWAĆ


Aby bezpośrednio odpowiedzieć na oryginalne pytanie, zamień:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

z:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Uwaga: tqdm <= v4.8 : W przypadku wersji tqdm poniżej 4.8 zamiast tego tqdm.pandas()trzeba było:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
źródło
5
tqdmzostał stworzony dla zwykłych iterable pierwotnie: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passObsługa pand była niedawnym
hackiem, który
6
Przy okazji, jeśli używasz notebooków Jupyter, możesz również użyć tqdm_notebooks, aby uzyskać ładniejszy pasek. Razem z pandami musisz obecnie utworzyć instancję, tak jak from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) tutaj
grinsbaeckchen
2
Od wersji 4.8.1 - zamiast tego użyj tqdm.pandas (). github.com/tqdm/tqdm/commit/ ...
mork
1
Dzięki, @mork ma rację. Pracujemy (powoli) w kierunku tqdmwersji 5, która sprawia, że ​​rzeczy są bardziej zmodularyzowane.
casper.dcl
1
Najnowsze zalecenia dotyczące składni można znaleźć w dokumentacji tqdm Pandas tutaj: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Aby poprawić odpowiedź Jeffa (i mieć to jako funkcję wielokrotnego użytku).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Uwaga: zastosowanie aktualizacji procentowych postępu w linii . Jeśli twoja funkcja ma standardowe ustawienia, to nie zadziała.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Jak zwykle możesz dodać to do swoich obiektów grupowania jako metodę:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Jak wspomniano w komentarzach, nie jest to funkcja, którą podstawowe pandy byłyby zainteresowane wdrożeniem. Ale python pozwala ci tworzyć je dla wielu obiektów / metod pand (zrobienie tego wymagałoby sporo pracy ... chociaż powinieneś być w stanie uogólnić to podejście).

Andy Hayden
źródło
Mówię „sporo pracy”, ale prawdopodobnie mógłbyś przepisać całą tę funkcję jako (bardziej ogólny) dekorator.
Andy Hayden
Dzięki za rozwinięcie postu Jeffa. Zaimplementowałem oba, a spowolnienie dla każdego jest dość minimalne (dodałem w sumie 1,1 minuty do operacji, która zajęła 27 minut). W ten sposób mogę zobaczyć postęp i biorąc pod uwagę doraźny charakter tych operacji, uważam, że jest to dopuszczalne spowolnienie.
cwharland
Świetnie, cieszę się, że pomogło. Zaskoczyło mnie spowolnienie (kiedy próbowałem na przykładzie), spodziewałem się, że będzie dużo gorzej.
Andy Hayden
1
Aby jeszcze bardziej zwiększyć wydajność opublikowanych metod, byłem leniwy w kwestii importu danych (pandy są po prostu zbyt dobre w radzeniu sobie z niechlujnym csv !!), a kilka moich wpisów (~ 1%) całkowicie wypuściło wstawki (myślę, że całość rekordy wstawiane w pojedyncze pola). Wyeliminowanie ich powoduje ogromne przyspieszenie w zestawianiu funkcji, ponieważ nie było niejasności co do tego, co należy zrobić podczas operacji rozdzielania aplikacji i łączenia.
cwharland
1
Mam 8 minut ... ale dodałem coś do pakietu zbiorczego funkcji (więcej funkcji -> lepszy AUC!). Te 8 minut przypada na fragment (obecnie w sumie dwa fragmenty) z każdym fragmentem w sąsiedztwie 12 milionów wierszy. Więc tak ... 16 minut na wykonanie ogromnych operacji na 24 milionach wierszy przy użyciu HDFStore (a pakiet funkcji nltk zawiera). Całkiem dobry. Miejmy nadzieję, że Internet nie ocenia mnie na podstawie początkowej ignorancji lub ambiwalencji wobec pomieszanych wstawek =)
cwharland
11

Jeśli potrzebujesz pomocy, jak używać tego w notatniku Jupyter / ipython, tak jak ja, oto pomocny przewodnik i źródło odpowiedniego artykułu :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Zwróć uwagę na podkreślenie w instrukcji importu dla _tqdm_notebook. Jak wspomniano w przywołanym artykule, rozwój jest na późnym etapie beta.

Victor Vulovic
źródło
8

Dla każdego, kto chce zastosować tqdm w swoim niestandardowym kodzie równoległego pandy.

(Przez lata próbowałem zrównoleglenie niektórych bibliotek, ale nigdy nie znalazłem rozwiązania umożliwiającego zrównoleglenie w 100%, głównie dla funkcji stosującej, i zawsze musiałem wracać po mój „ręczny” kod).

df_multi_core - to ten, do którego dzwonisz. Akceptuje:

  1. Twój obiekt df
  2. Nazwa funkcji, którą chcesz wywołać
  3. Podzbiór kolumn, na których można wykonać funkcję (pomaga skrócić czas / pamięć)
  4. Liczba zadań do równoległego uruchomienia (-1 lub pominięte dla wszystkich rdzeni)
  5. Wszelkie inne kwargi, które akceptuje funkcja df (np. „Axis”)

_df_split - jest to wewnętrzna funkcja pomocnicza, która musi być umieszczona globalnie względem działającego modułu (Pool.map jest „zależna od miejsca”), w przeciwnym razie zlokalizowałbym ją wewnętrznie.

oto kod z mojego sedna (dodam tam więcej testów funkcji pand):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Poniżej znajduje się kod testowy dla równoległego zastosowania z tqdm „progress_apply”.

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

W danych wyjściowych można zobaczyć 1 pasek postępu do pracy bez równoległości i paski postępu na rdzeń podczas pracy z równoległością. Występuje niewielki problem i czasami reszta rdzeni pojawia się od razu, ale nawet wtedy myślę, że jest to przydatne, ponieważ otrzymujesz statystyki postępu na rdzeń (it / s i całkowite rekordy, na przykład)

wprowadź opis obrazu tutaj

Dziękuję @abcdaa za tę wspaniałą bibliotekę!

mork
źródło
1
Dzięki @mork - dodaj do github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar lub utwórz nową stronę na github.com/tqdm/tqdm/wiki
. dcl
Dzięki, ale musiałem zmienić tę część: z try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)powodu wyjątku KeyError zamiast ValueError, zmień na Exception, aby obsłużyć wszystkie przypadki.
Marius
Dzięki @mork - ta odpowiedź powinna być wyższa.
Andy
5

Możesz to łatwo zrobić za pomocą dekoratora

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

następnie po prostu użyj funkcji modified_function (i zmień, kiedy chcesz ją wydrukować)

Jeff
źródło
1
Oczywiste ostrzeżenie spowolni twoją funkcję! Możesz nawet zaktualizować go za pomocą stackoverflow.com/questions/5426546/ ... np. Count / len jako procent.
Andy Hayden
tak - będziesz miał porządek (liczbę grup), więc w zależności od wąskiego gardła może to mieć znaczenie
Jeff
być może intuicyjną rzeczą do zrobienia jest zawinięcie tego w logged_apply(g, func)funkcję, w której miałbyś dostęp do zamówienia i mógłbyś logować od początku.
Andy Hayden
Zrobiłem powyższe w mojej odpowiedzi, również bezczelną aktualizację procentową. Właściwie nie mogłem sprawić, żeby twój działał ... Myślę, że z okładami. Jeśli używasz go do aplikacji, i tak nie jest to takie ważne.
Andy Hayden
1

Zmieniłem odpowiedź Jeffa, dodając sumę, abyś mógł śledzić postęp i zmienną, aby po prostu wydrukować każdą iterację X (to faktycznie znacznie poprawia wydajność, jeśli "print_at" jest rozsądnie wysoki)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

funkcja clear_output () pochodzi z

from IPython.core.display import clear_output

jeśli nie na IPythonie, odpowiedź Andy'ego Haydena robi to bez niej

Filipe Silva
źródło