Czy aplikacja Pandas DataFrame () używa wszystkich rdzeni?

105

Od sierpnia 2017 r. Pandas DataFame.apply () jest niestety nadal ograniczona do pracy z jednym rdzeniem, co oznacza, że ​​maszyna wielordzeniowa będzie marnować większość czasu obliczeniowego po uruchomieniu df.apply(myfunc, axis=1).

Jak można wykorzystać wszystkie rdzenie do równoległego uruchamiania aplikacji na ramce danych?

Roko Mijic
źródło

Odpowiedzi:

79

Możesz skorzystać z swifterpakietu:

pip install swifter

Działa jako wtyczka dla pand, umożliwiając ponowne użycie applyfunkcji:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Automatycznie znajdzie najbardziej efektywny sposób zrównoleglenia funkcji, bez względu na to, czy jest wektoryzowana (jak w powyższym przykładzie), czy nie.

Więcej przykładów i porównanie wydajności są dostępne na GitHub. Należy pamiętać, że pakiet jest w fazie rozwoju, więc interfejs API może ulec zmianie.

Pamiętaj również, że nie zadziała to automatycznie w przypadku kolumn typu string. Używając łańcuchów, Swifter powróci do „prostych” pand apply, które nie będą równoległe. W takim przypadku nawet wymuszenie jego użycia dasknie spowoduje poprawy wydajności i lepiej byłoby po prostu ręcznie podzielić zestaw danych i zrównoleglenie za pomocąmultiprocessing .

slhck
źródło
1
Z czystej ciekawości, czy istnieje sposób na ograniczenie liczby rdzeni używanych podczas stosowania równoległego? Mam wspólny serwer, więc jeśli złapię wszystkie 32 rdzenie, nikt nie będzie zadowolony.
Maksim Khaitovich
1
@MaximHaytovich Nie wiem. Swifter używa dask w tle, więc może respektuje te ustawienia: stackoverflow.com/a/40633117/435093 - w przeciwnym razie polecam otwarcie problemu na GitHubie. Autor jest bardzo elastyczny.
slhck
@slhck thanks! Kopnę to trochę bardziej. Wydaje się, że i tak nie działa na serwerze Windows - po prostu zawiesza się, nic nie robiąc przy zadaniu zabawki
Maksim Khaitovich
czy możesz mi pomóc w odpowiedzi na to: - stackoverflow.com/questions/53561794/ ...
ak3191
2
W przypadku smyczków po prostu dodaj w allow_dask_on_strings(enable=True)ten sposób: df.swifter.allow_dask_on_strings(enable=True).apply(some_function) Źródło: github.com/jmcarpenter2/swifter/issues/45
Sumit Sidana
104

Najprostszym sposobem jest użycie map_partitions firmy Dask . Potrzebujesz importu (będziesz musiał pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

a składnia to

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Uważam, że 30 to odpowiednia liczba partycji, jeśli masz 16 rdzeni). Dla kompletności zmierzyłem czas na mojej maszynie (16 rdzeni):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0,010668013244867325

Podanie 10- krotnego przyspieszenia przy przechodzeniu z pand dotyczy zastosowania dask na partycjach. Oczywiście, jeśli masz funkcję, którą możesz wektoryzować, powinieneś - w tym przypadku funkcja ( y*(x**2+1)) jest trywialnie wektoryzowana, ale jest wiele rzeczy, których nie można wektoryzować.

Roko Mijic
źródło
2
Dobrze wiedzieć, dziękuję za wysłanie wiadomości. Czy możesz wyjaśnić, dlaczego wybrałeś 30 partycji? Czy wydajność zmienia się po zmianie tej wartości?
Andrew L
4
@AndrewL Zakładam, że każda partycja jest obsługiwana przez oddzielny proces, a przy 16 rdzeniach zakładam, że jednocześnie może działać 16 lub 32 procesów. Wypróbowałem to i wydaje się, że wydajność poprawia się do 32 partycji, ale dalsze zwiększenie nie ma korzystnego wpływu. Zakładam, że z czterordzeniową maszyną chciałbyś mieć 8 partycji itd. Zauważ, że zauważyłem pewną poprawę między 16 a 32, więc myślę, że naprawdę chcesz 2x $ NUM_PROCESSORS
Roko Mijic
9
The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
Chodzi
6
Dla dask v0.20.0 i nowszych użyj ddata.map_partitions (lambda df: df.apply ((wiersz lambda: myfunc (* wiersz)), axis = 1)). Compute (scheduleer = 'process') lub jeden z inne opcje harmonogramu. Bieżący kod zgłasza „TypeError: Słowo kluczowe get = zostało usunięte. Zamiast tego użyj słowa kluczowego scheduleer = z nazwą żądanego programu planującego, na przykład„ wątki ”lub„ procesy ””
mork
1
Upewnij się, że zanim to zrobisz, ramka danych nie ma zduplikowanych indeksów w trakcie generowania ValueError: cannot reindex from a duplicate axis. Aby to obejść, należy usunąć zduplikowane indeksy df = df[~df.index.duplicated()]lub zresetować indeksy o df.reset_index(inplace=True).
Habib Karbasian
24

pandarallelzamiast tego możesz spróbować : Proste i wydajne narzędzie do zrównoleglania operacji pand na wszystkich procesorach (w systemie Linux i macOS)

  • Zrównoleglenie ma koszt (tworzenie nowych procesów, wysyłanie danych przez pamięć współdzieloną itp.), Więc zrównoleglenie jest skuteczne tylko wtedy, gdy ilość obliczeń do zrównoleglenia jest wystarczająco wysoka. W przypadku bardzo małej ilości danych użycie paralezacji nie zawsze jest tego warte.
  • Zastosowane funkcje NIE powinny być funkcjami lambda.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

zobacz https://github.com/nalepae/pandarallel

G_KOBELIEF
źródło
cześć, nie mogę rozwiązać jednego problemu, używając pandarallel pojawia się błąd: AttributeError: Can't pickle local object „Preparat_worker. <locals> .closure. <locals> .wrapper”. Czy możesz mi w tym pomóc?
Alex Cam
@Alex Sry Nie jestem programistą tego modułu. Jak wyglądają Twoje kody? Możesz spróbować zadeklarować swoje „funkcje wewnętrzne” jako globalne? (tylko zgadnij)
G_KOBELIEF
@AlexCam Twoja funkcja powinna być zdefiniowana poza inną funkcją, aby Python mógł ją wykorzystać do przetwarzania wieloprocesowego
Kenan
1
@G_KOBELIEF W Pythonie> 3.6 możemy użyć funkcji lambda z pandaparallel
user110244
18

Jeśli chcesz pozostać w natywnym Pythonie:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

zastosuje funkcję frównolegle do kolumny colramki danychdf

Olivier Cruchant
źródło
Podążając za takim podejściem, dostałem ValueError: Length of values does not match length of indexod __setitem__in pandas/core/frame.py. Nie jestem pewien, czy zrobiłem coś źle lub czy przypisywanie do df['newcol']nie jest bezpieczne dla wątków.
Rattle
2
Możesz zapisać pool.map na pośredniej liście temp_result, aby umożliwić sprawdzenie, czy długość jest zgodna z df, a następnie wykonać df ['newcol'] = temp_result?
Olivier Cruchant
masz na myśli tworzenie nowej kolumny? czego byś użył?
Olivier Cruchant
tak, przypisując wynik mapy do nowej kolumny ramki danych. Czy map nie zwraca listy wyników każdego fragmentu wysłanego do funkcji f? Więc co się stanie, gdy przypiszesz to do kolumny „newcol”? Korzystanie z Pand i Pythona 3
Mina
W rzeczywistości działa naprawdę płynnie! Próbowałeś tego? Tworzy listę o tej samej długości df, w tym samym porządku, w jakim zostało wysłane. Dosłownie równolegle c2 = f (c1). Nie ma prostszego sposobu na wieloprocesowość w Pythonie. Pod względem wydajności wydaje się, że Ray może również robić dobre rzeczy (w kierunku datascience.com/ ... ), ale nie jest tak dojrzały, a instalacja nie zawsze przebiega gładko z mojego doświadczenia
Olivier Cruchant
2

Oto przykład transformatora bazowego sklearn, w którym zastosowanie pandy jest równoległe

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

aby uzyskać więcej informacji, zobacz https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Maxim Balatsko
źródło
0

Aby użyć wszystkich rdzeni (fizycznych lub logicznych), możesz spróbować mapplyalternatywnie dla swifteri pandarallel.

Możesz ustawić liczbę rdzeni (i zachowanie fragmentacji) po uruchomieniu:

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

Domyślnie ( n_workers=-1) pakiet wykorzystuje wszystkie fizyczne procesory dostępne w systemie. Jeśli twój system używa hiperwątkowości (zwykle pojawia się dwukrotnie więcej fizycznych procesorów), mapplypojawi się jeden dodatkowy pracownik , który nada priorytet puli wieloprocesowej nad innymi procesami w systemie.

W zależności od definicji all your coresmożesz również zamiast tego użyć wszystkich rdzeni logicznych (uwaga, w ten sposób procesy związane z procesorem będą walczyć o fizyczne procesory, co może spowolnić działanie):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
ddelange
źródło