Często muszę zastosować funkcję do grup o bardzo dużych DataFrame
(mieszanych typach danych) i chciałbym skorzystać z wielu rdzeni.
Mogę utworzyć iterator z grup i użyć modułu wieloprocesorowego, ale nie jest to wydajne, ponieważ każda grupa i wyniki funkcji muszą być marynowane do przesyłania wiadomości między procesami.
Czy jest jakiś sposób, aby uniknąć wytrawiania lub nawet DataFrame
całkowitego kopiowania ? Wygląda na to, że funkcje pamięci współdzielonej modułów wieloprocesorowych są ograniczone do numpy
tablic. Są jakieś inne opcje?
python
pandas
multiprocessing
shared-memory
user2303
źródło
źródło
Odpowiedzi:
Z powyższych komentarzy wynika, że jest to planowane od
pandas
jakiegoś czasu (jest też ciekawie wyglądającyrosetta
projekt, który właśnie zauważyłem).Jednak dopóki wszystkie funkcje równoległe nie zostaną włączone
pandas
, zauważyłem, że bardzo łatwo jest pisać wydajne i nie kopiujące pamięci równoległe rozszerzeniapandas
bezpośrednio przy użyciucython
+ OpenMP i C ++.Oto krótki przykład pisania równoległej sumy grupowej, której użycie wygląda mniej więcej tak:
import pandas as pd import para_group_demo df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) print para_group_demo.sum(df.a, df.b)
a wynik to:
sum key 0 6 1 11 2 4
Uwaga Bez wątpienia funkcjonalność tego prostego przykładu będzie ostatecznie częścią
pandas
. Jednak niektóre rzeczy będą bardziej naturalne do zrównoleglenia w C ++ przez jakiś czas i ważne jest, aby być świadomym, jak łatwo jest to połączyćpandas
.Aby to zrobić, napisałem proste rozszerzenie pliku o jednym źródle, którego kod następuje.
Zaczyna się od importu i definicji typów
from libc.stdint cimport int64_t, uint64_t from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map cimport cython from cython.operator cimport dereference as deref, preincrement as inc from cython.parallel import prange import pandas as pd ctypedef unordered_map[int64_t, uint64_t] counts_t ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t ctypedef vector[counts_t] counts_vec_t
C ++
unordered_map
służy do sumowania przez pojedynczy wątek, avector
do sumowania przez wszystkie wątki.Teraz do funkcji
sum
. Zaczyna się od wpisanych widoków pamięci dla szybkiego dostępu:def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
Funkcja kontynuuje, dzieląc pół-równo na wątki (tutaj zakodowane na stałe do 4), a każdy wątek sumuje wpisy w swoim zakresie:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
Po zakończeniu wątków funkcja scala wszystkie wyniki (z różnych zakresów) w jeden
unordered_map
:cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
Pozostało tylko utworzyć
DataFrame
i zwrócić wyniki:key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df
źródło