Czy istnieje prosty sposób na równoległe uruchamianie pandas.DataFrame.isin?

25

Mam program do modelowania i oceniania, który intensywnie wykorzystuje DataFrame.isinfunkcję pand, przeszukując listy „podobnych” rejestrów Facebooka dla poszczególnych użytkowników dla każdej z kilku tysięcy konkretnych stron. Jest to najbardziej czasochłonna część programu, bardziej niż modelowanie lub ocenianie elementów, po prostu dlatego, że działa tylko na jednym rdzeniu, podczas gdy reszta działa na kilkudziesięciu jednocześnie.

Chociaż wiem, że mogę ręcznie rozbić ramkę danych na części i równolegle uruchomić tę operację, czy istnieje jakikolwiek prosty sposób, aby to zrobić automatycznie? Innymi słowy, czy istnieje jakikolwiek pakiet, który rozpozna, że ​​prowadzę łatwą do delegowania operację i automatycznie ją rozpowszechnię? Być może to wymaga zbyt wiele, ale w przeszłości byłem wystarczająco zaskoczony tym, co jest już dostępne w Pythonie, więc uważam, że warto o to zapytać.

Wszelkie inne sugestie dotyczące tego, jak można to osiągnąć (nawet jeśli nie za pomocą jakiejś magicznej paczki jednorożca!) Również będą mile widziane. Głównie po prostu próbuję znaleźć sposób na zmniejszenie ilości 15-20 minut na cykl bez poświęcania równej ilości czasu na kodowanie rozwiązania.

Therriault
źródło
Jak duża jest twoja lista wartości? Czy próbowałeś przekazać go jako zestaw? Jeśli chodzi o paralelizm, możesz być zainteresowany Joblib. Jest łatwy w użyciu i może przyspieszyć obliczenia. Używaj go z dużymi porcjami danych.
oao
Inną opcją jest przeformułowanie problemu jako połączenie. Połączenia są znacznie szybsze w Pandach stackoverflow.com/questions/23945493/...
Brian Spiering
Jeszcze inną opcją jest użycie np.in1d, który jest również szybszy stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Odpowiedzi:

8

Niestety, równoległość nie jest jeszcze zaimplementowana w pandach. Możesz dołączyć do tego wydania github, jeśli chcesz uczestniczyć w rozwoju tej funkcji.

Nie znam żadnego „magicznego pakietu jednorożca” do tego celu, więc najlepszą rzeczą będzie napisanie własnego rozwiązania. Ale jeśli nadal nie chcesz poświęcać na to czasu i chcesz nauczyć się czegoś nowego - możesz wypróbować dwie metody wbudowane w MongoDB (redukcja mapy i szkielet ag). Zobacz mongodb_agg_framework .

Stanpol
źródło
6

Myślę, że najlepszym rozwiązaniem byłaby rozeta . Uważam to za niezwykle przydatne i łatwe. Sprawdź metody pand .

Możesz dostać to przez pip .

dmvianna
źródło
Polecam zdobycie rosetty, przechodząc bezpośrednio do GitHub. To gwarantuje, że otrzymasz najnowszą wersję. github.com/columbia-applied-data-science/rosetta
Ian Langmore
0

Jest bardziej powszechna wersja tego pytania dotyczące parallelization na pandy mają zastosowanie funkcji - tak to jest orzeźwiający pytanie :)

Po pierwsze , chciałbym wspomnieć o szybszym, ponieważ poprosiłeś o rozwiązanie „spakowane”, i pojawia się ono na większości pytań SO dotyczących równoległości pand.

Ale .. nadal chciałbym udostępnić dla niego mój osobisty kod źródłowy, ponieważ po kilku latach pracy z DataFrame nigdy nie znalazłem rozwiązania w 100% zrównoleglonego (głównie dla funkcji Apply) i zawsze musiałem wracać po mój „ ręczny ”.

Dzięki tobie sprawiłem, że bardziej ogólna jest obsługa dowolnej (teoretycznie) metody DataFrame według jej nazwy (abyś nie musiał zachowywać wersji dla isin, aplikować itp.).

Przetestowałem to na funkcjach „isin”, „Apply” i „isna”, używając zarówno Pythona 2.7, jak i 3.6. Ma mniej niż 20 linii i postępowałem zgodnie z konwencją nazewnictwa pand, taką jak „podzbiór” i „njobs”.

Dodałem również porównanie czasu z równoważnym kodem dask dla „isin” i wydaje się ~ X2 razy wolniejsze niż ta treść.

Zawiera 2 funkcje:

df_multi_core - to ten, do którego dzwonisz. Akceptuje:

  1. Twój obiekt df
  2. Nazwa funkcji, którą chcesz wywołać
  3. Podzbiór kolumn, na których można wykonać funkcję (pomaga skrócić czas / pamięć)
  4. Liczba zadań do uruchomienia równoległego (-1 lub pominięcie dla wszystkich rdzeni)
  5. Wszelkie inne kwargsy, które akceptuje funkcja df (np. „Oś”)

_df_split - jest to wewnętrzna funkcja pomocnicza, która musi być globalnie umieszczona w uruchomionym module (Pool.map jest „zależna od położenia”), w przeciwnym razie zlokalizowałbym ją wewnętrznie.

Oto kod z mojego GIST (dodam Więcej Pandy testy funkcyjne tam):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Poniżej znajduje się kod testowy dla równoległego isin , porównujący natywną, wielordzeniową wydajność gist i dask. Na maszynie I7 z 8 rdzeniami fizycznymi miałem około X4 razy przyspieszenie. Bardzo chciałbym usłyszeć, co masz na swoich prawdziwych danych!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
Mork
źródło
@Therriault Dodałem porównanie dask z isin- wydaje się, że fragment kodu jest najskuteczniejszy z 'isin' - ~ X1,75 razy szybciej niż dask (w porównaniu z applyfunkcją, która jest tylko 5% szybsza niż dask)
mork