Efektywne stosowanie funkcji równolegle do zgrupowanych pand DataFrame

89

Często muszę zastosować funkcję do grup o bardzo dużych DataFrame(mieszanych typach danych) i chciałbym skorzystać z wielu rdzeni.

Mogę utworzyć iterator z grup i użyć modułu wieloprocesorowego, ale nie jest to wydajne, ponieważ każda grupa i wyniki funkcji muszą być marynowane do przesyłania wiadomości między procesami.

Czy jest jakiś sposób, aby uniknąć wytrawiania lub nawet DataFramecałkowitego kopiowania ? Wygląda na to, że funkcje pamięci współdzielonej modułów wieloprocesorowych są ograniczone do numpytablic. Są jakieś inne opcje?

user2303
źródło
O ile wiem, nie ma możliwości udostępniania dowolnych obiektów. Zastanawiam się, czy wytrawianie zajmuje o wiele więcej czasu, niż korzyści wynikające z wieloprocesowości. Może powinieneś poszukać możliwości tworzenia większych pakietów roboczych dla każdego procesu, aby skrócić względny czas trawienia. Inną możliwością byłoby użycie przetwarzania wieloprocesowego podczas tworzenia grup.
Sebastian Werk
3
Robię coś takiego, ale używam UWSGI, Flask i preforking: ładuję ramkę danych pandy do procesu, rozwidlam ją x razy (czyniąc ją obiektem pamięci współdzielonej), a następnie wywołuję te procesy z innego procesu w Pythonie, w którym konkluduję wyniki. atm, używam JSON jako procesu komunikacji, ale to nadchodzi (ale nadal wysoce eksperymentalne): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Nawiasem mówiąc, czy kiedykolwiek spojrzałeś na HDF5 z kawałkami? (HDF5 nie jest zapisywany do równoczesnego zapisu, ale możesz także zapisać do oddzielnych plików i na koniec
połączyć
7
to będzie skierowane do wersji 0.14, zobacz ten numer: github.com/pydata/pandas/issues/5751
Jeff
4
@Jeff został przesunięty do 0,15 = (
pyCthon

Odpowiedzi:

12

Z powyższych komentarzy wynika, że ​​jest to planowane od pandasjakiegoś czasu (jest też ciekawie wyglądający rosettaprojekt, który właśnie zauważyłem).

Jednak dopóki wszystkie funkcje równoległe nie zostaną włączone pandas, zauważyłem, że bardzo łatwo jest pisać wydajne i nie kopiujące pamięci równoległe rozszerzenia pandasbezpośrednio przy użyciu cython+ OpenMP i C ++.

Oto krótki przykład pisania równoległej sumy grupowej, której użycie wygląda mniej więcej tak:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

a wynik to:

     sum
key     
0      6
1      11
2      4

Uwaga Bez wątpienia funkcjonalność tego prostego przykładu będzie ostatecznie częścią pandas. Jednak niektóre rzeczy będą bardziej naturalne do zrównoleglenia w C ++ przez jakiś czas i ważne jest, aby być świadomym, jak łatwo jest to połączyć pandas.


Aby to zrobić, napisałem proste rozszerzenie pliku o jednym źródle, którego kod następuje.

Zaczyna się od importu i definicji typów

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

C ++ unordered_map służy do sumowania przez pojedynczy wątek, a vectordo sumowania przez wszystkie wątki.

Teraz do funkcji sum. Zaczyna się od wpisanych widoków pamięci dla szybkiego dostępu:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

Funkcja kontynuuje, dzieląc pół-równo na wątki (tutaj zakodowane na stałe do 4), a każdy wątek sumuje wpisy w swoim zakresie:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Po zakończeniu wątków funkcja scala wszystkie wyniki (z różnych zakresów) w jeden unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Pozostało tylko utworzyć DataFramei zwrócić wyniki:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
źródło