Możesz skorzystać z swifter
pakietu:
pip install swifter
Działa jako wtyczka dla pand, umożliwiając ponowne użycie apply
funkcji:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Automatycznie znajdzie najbardziej efektywny sposób zrównoleglenia funkcji, bez względu na to, czy jest wektoryzowana (jak w powyższym przykładzie), czy nie.
Więcej przykładów i porównanie wydajności są dostępne na GitHub. Należy pamiętać, że pakiet jest w fazie rozwoju, więc interfejs API może ulec zmianie.
Pamiętaj również, że nie zadziała to automatycznie w przypadku kolumn typu string. Używając łańcuchów, Swifter powróci do „prostych” pand apply
, które nie będą równoległe. W takim przypadku nawet wymuszenie jego użycia dask
nie spowoduje poprawy wydajności i lepiej byłoby po prostu ręcznie podzielić zestaw danych i zrównoleglenie za pomocąmultiprocessing
.
allow_dask_on_strings(enable=True)
ten sposób:df.swifter.allow_dask_on_strings(enable=True).apply(some_function)
Źródło: github.com/jmcarpenter2/swifter/issues/45Najprostszym sposobem jest użycie map_partitions firmy Dask . Potrzebujesz importu (będziesz musiał
pip install dask
):import pandas as pd import dask.dataframe as dd from dask.multiprocessing import get
a składnia to
data = <your_pandas_dataframe> ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y,z, ...): return <whatever> res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(Uważam, że 30 to odpowiednia liczba partycji, jeśli masz 16 rdzeni). Dla kompletności zmierzyłem czas na mojej maszynie (16 rdzeni):
data = pd.DataFrame() data['col1'] = np.random.normal(size = 1500000) data['col2'] = np.random.normal(size = 1500000) ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y): return y*(x**2+1) def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) def pandas_apply(): return apply_myfunc_to_DF(data) def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) def vectorized(): return myfunc(data['col1'], data['col2'] ) t_pds = timeit.Timer(lambda: pandas_apply()) print(t_pds.timeit(number=1))
t_dsk = timeit.Timer(lambda: dask_apply()) print(t_dsk.timeit(number=1))
t_vec = timeit.Timer(lambda: vectorized()) print(t_vec.timeit(number=1))
Podanie 10- krotnego przyspieszenia przy przechodzeniu z pand dotyczy zastosowania dask na partycjach. Oczywiście, jeśli masz funkcję, którą możesz wektoryzować, powinieneś - w tym przypadku funkcja (
y*(x**2+1)
) jest trywialnie wektoryzowana, ale jest wiele rzeczy, których nie można wektoryzować.źródło
The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
ValueError: cannot reindex from a duplicate axis
. Aby to obejść, należy usunąć zduplikowane indeksydf = df[~df.index.duplicated()]
lub zresetować indeksy odf.reset_index(inplace=True)
.pandarallel
zamiast tego możesz spróbować : Proste i wydajne narzędzie do zrównoleglania operacji pand na wszystkich procesorach (w systemie Linux i macOS)from pandarallel import pandarallel from math import sin pandarallel.initialize() # FORBIDDEN df.parallel_apply(lambda x: sin(x**2), axis=1) # ALLOWED def func(x): return sin(x**2) df.parallel_apply(func, axis=1)
zobacz https://github.com/nalepae/pandarallel
źródło
Jeśli chcesz pozostać w natywnym Pythonie:
import multiprocessing as mp with mp.Pool(mp.cpu_count()) as pool: df['newcol'] = pool.map(f, df['col'])
zastosuje funkcję
f
równolegle do kolumnycol
ramki danychdf
źródło
ValueError: Length of values does not match length of index
od__setitem__
inpandas/core/frame.py
. Nie jestem pewien, czy zrobiłem coś źle lub czy przypisywanie dodf['newcol']
nie jest bezpieczne dla wątków.Oto przykład transformatora bazowego sklearn, w którym zastosowanie pandy jest równoległe
import multiprocessing as mp from sklearn.base import TransformerMixin, BaseEstimator class ParllelTransformer(BaseEstimator, TransformerMixin): def __init__(self, n_jobs=1): """ n_jobs - parallel jobs to run """ self.variety = variety self.user_abbrevs = user_abbrevs self.n_jobs = n_jobs def fit(self, X, y=None): return self def transform(self, X, *_): X_copy = X.copy() cores = mp.cpu_count() partitions = 1 if self.n_jobs <= -1: partitions = cores elif self.n_jobs <= 0: partitions = 1 else: partitions = min(self.n_jobs, cores) if partitions == 1: # transform sequentially return X_copy.apply(self._transform_one) # splitting data into batches data_split = np.array_split(X_copy, partitions) pool = mp.Pool(cores) # Here reduce function - concationation of transformed batches data = pd.concat( pool.map(self._preprocess_part, data_split) ) pool.close() pool.join() return data def _transform_part(self, df_part): return df_part.apply(self._transform_one) def _transform_one(self, line): # some kind of transformations here return line
aby uzyskać więcej informacji, zobacz https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8
źródło
Aby użyć wszystkich rdzeni (fizycznych lub logicznych), możesz spróbować
mapply
alternatywnie dlaswifter
ipandarallel
.Możesz ustawić liczbę rdzeni (i zachowanie fragmentacji) po uruchomieniu:
import pandas as pd import mapply mapply.init(n_workers=-1) ... df.mapply(myfunc, axis=1)
Domyślnie (
n_workers=-1
) pakiet wykorzystuje wszystkie fizyczne procesory dostępne w systemie. Jeśli twój system używa hiperwątkowości (zwykle pojawia się dwukrotnie więcej fizycznych procesorów),mapply
pojawi się jeden dodatkowy pracownik , który nada priorytet puli wieloprocesowej nad innymi procesami w systemie.W zależności od definicji
all your cores
możesz również zamiast tego użyć wszystkich rdzeni logicznych (uwaga, w ten sposób procesy związane z procesorem będą walczyć o fizyczne procesory, co może spowolnić działanie):import multiprocessing n_workers = multiprocessing.cpu_count() # or more explicit import psutil n_workers = psutil.cpu_count(logical=True)
źródło