Zastosowanie funkcji Python do zgrupowanej Pandas DataFrame - jakie jest najbardziej efektywne podejście do przyspieszenia obliczeń?

9

Mam do czynienia z dość dużą Pandas DataFrame - mój zestaw danych przypomina następującą dfkonfigurację:

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Uwaga: W celu uzyskania minimalnego przykładu można go łatwo podzielić (na przykład za pomocą df.loc[df['time'] <= 400, :]), ale ponieważ i tak symuluję dane, pomyślałem, że oryginalny rozmiar da lepszy przegląd.

Dla każdej grupy zdefiniowanej przez ['measurement_id', 'time', 'group']muszę wywołać następującą funkcję:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

Aby poprawić wydajność, wypróbowałem dwa podejścia:

Pakiet Pandarallel

Pierwsze podejście polegało na zrównolegleniu obliczeń za pomocą pandarallelpakietu:

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

Wydaje się to jednak nieoptymalne, ponieważ zużywa dużo pamięci RAM i nie wszystkie rdzenie są używane w obliczeniach (nawet pomimo wyraźnej liczby rdzeni w pandarallel.initialize()metodzie). Czasami obliczenia są przerywane różnymi błędami, chociaż nie miałem okazji znaleźć przyczyny tego (być może brak pamięci RAM?).

PySpark Pandas UDF

Dałem też Spark Pandas UDF, chociaż jestem zupełnie nowy w Spark. Oto moja próba:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

Niestety wydajność również była niezadowalająca, a z tego, co przeczytałem na ten temat, może to być tylko ciężar korzystania z funkcji UDF napisanej w Pythonie i związana z tym potrzeba konwersji wszystkich obiektów Python na obiekty Spark i odwrotnie.

Oto moje pytania:

  1. Czy jedno z moich podejść może zostać dostosowane, aby wyeliminować ewentualne wąskie gardła i poprawić wydajność? (np. konfiguracja PySpark, dostosowywanie nieoptymalnych operacji itp.)
  2. Czy są jakieś lepsze alternatywy? Jak różnią się one od dostarczonych rozwiązań pod względem wydajności?
Kuba_
źródło
2
czy badałeś dask ?
Danila Ganchar
1
Jeszcze nie, ale dziękuję za sugestię
spróbuję
niestety nie współpracowałem dask((więc mój komentarz to tylko rada do badań.
Danila Ganchar
Przez wydajność rozumiałem czas, w którym można zakończyć obliczenia.
Kuba_

Odpowiedzi:

1

P : Czy można dostosować jedno z moich podejść, aby wyeliminować ewentualne wąskie gardła i poprawić wydajność? (Np. Konfiguracja PySpark, dostosowanie nieoptymalnych operacji itp.)

+1do wymieniania instalacyjny dodatku na koszty ogólne dla obu strategii obliczeniowej. To zawsze stanowi próg rentowności, tylko wtedy, gdy brak [SERIAL]strategii może osiągnąć jakąkolwiek korzystną radość z pewnego [TIME]przyspieszenia -Domain (jeszcze, jeśli inne, zwykle [SPACE]-Domain koszty pozwalają lub pozostają wykonalne - tak, RAM. ... istnienie takiego urządzenia, budżetu i innych podobnych ograniczeń w świecie rzeczywistym i dostęp do nich)

Po pierwsze,
kontrola przed lotem, przed startem

. Nowe, ściśle określone sformułowanie Prawa Amdahla jest obecnie w stanie uwzględnić oba te dodatkowe pSO + pTOkoszty ogólne i odzwierciedla je w przewidywaniu osiągalnych poziomów Przyspieszenia, w tym progu rentowności punkt, od którego może stać się sensowne (w sensie kosztów / efektu, w sensie wydajności) równoległe.

wprowadź opis zdjęcia tutaj

To jednak nie
jest nasz główny problem . To jest następne:

Następnie,
biorąc pod uwagę koszty obliczeniowe związane z SpectralClustering()użyciem jądra Radial Boltzmann Function, ~ exp( -gamma * distance( data, data )**2 )wydaje się, że nie ma żadnych postępów w podziale data-object na dowolną liczbę rozłącznych jednostek roboczych, ponieważ element distance( data, data )z definicji ma jedynie odwiedź wszystkie data-elementy (zob. koszty komunikacyjne { process | node }topologii dystrybuowanych z dowolną do dowolnej wartości są, z oczywistych powodów, strasznie złe, jeśli nie najgorsze przypadki użycia do { process | node }dystrybucji rozproszonej, jeśli nie proste anty-wzorce (z wyjątkiem niektórych rzeczywiście tajemnych, pozbawionych pamięci / bezstanowych, a jednak obliczeniowych struktur).

W przypadku pedantycznych analityków tak - dodaj do tego (i możemy już powiedzieć, że zły stan) koszty - znowu - przetwarzania typu k-oznacza -dowolny , w O( N^( 1 + 5 * 5 ) )tym przypadku N ~ len( data ) ~ 1.12E6+, jest to bardzo sprzeczne z naszą wolą posiadania inteligentne i szybkie przetwarzanie.

Więc co?

Natomiast koszty instalacji nie są zaniedbane, że zwiększone koszty komunikacji będzie prawie na pewno wyłączyć wszystkie ulepszenia z użyciem wyżej naszkicowanych próby przenieść z wyłącznie przy pomocy [SERIAL]strumienia procesowego w jakiejś formie tylko - [CONCURRENT]lub prawdziwie- [PARALLEL]orkiestracji niektórych-podjednostek pracy , ze względu na zwiększone koszty ogólne związane z koniecznością implementacji (para tandemowa) topologii przekazywania dowolnej wartości.

Gdyby nie dla nich?

Cóż, to brzmi jak oksymoron Informatyki - nawet gdyby to było możliwe, koszty dowolnych wcześniej obliczonych odległości (co [TIME]zabrałoby te ogromne koszty Domeny „wcześniej” (Gdzie? Jak? Czy są jakieś inne, nieuniknione opóźnienie, pozwalające na ewentualne maskowanie opóźnień przez niektóre (nieznane do tej pory) przyrostowe narastanie kompletnej w przyszłości macierzy odległości typu dowolny w dowolnym miejscu?)), ale przestawiłoby te zasadniczo obecne koszty na inną lokalizację w [TIME]- i [SPACE]-Domains, nie zmniejszaj ich.

P : „Czy są jakieś lepsze alternatywy?

Jak dotąd zdaję sobie sprawę, że jedynym rozwiązaniem jest próba przeformułowania problemu na inny, sformułowany przez QUBO, problematyczny sposób postępowania (zob .: Q uantum- U nieskrępowany- B inary- O ptimizacja , dobrą wiadomością jest to, że istnieją do tego narzędzia, baza wiedzy z pierwszej ręki oraz praktyczne doświadczenie w rozwiązywaniu problemów.

P : Jak wypada porównanie z dostarczonymi rozwiązaniami pod względem wydajności?

Wydajność zapiera dech w piersiach - problem sformułowany przez QUBO ma obiecujący O(1)(!) Solver w stałym czasie (w [TIME]-Domain) i nieco ograniczony w [SPACE]-Domain (gdzie niedawno ogłoszone sztuczki LLNL mogą pomóc uniknąć tego świata fizycznego, bieżącej implementacji QPU, ograniczenia problemu rozmiary).

użytkownik3666197
źródło
To ciekawa odpowiedź, ale wydaje się, że nie trafia w sedno - OP szkoli wiele małych modeli, a nie jednego. Zatem twoja główna obserwacja jest w większości nieistotna.
user10938362
@ user10938362 W jaki sposób twoja własność będąca przedmiotem roszczenia (szkolenie małych modeli) przekłada się na inne niż wyżej podane wskaźniki kosztów przetwarzania? Pewnie, że wiele mniejszych modeli obiecuje teoretycznie po prostu liniowo rosnącą sumę (wciąż) dużych kosztów O pojedynczego (teraz mniejszego w N, ale nie w innych czynnikach) przetwarzania, jednak musisz dodać do tego o wiele droższą sumę wszystkich koszty dodatkowe zarówno kosztów ogólnych instalacji i zakończenia, plus wszystkie dodatkowe koszty ogólne komunikacji (parametry / dane / wyniki + zazwyczaj również pary kosztów przetwarzania SER / DES na każdym etapie)
3666197
0

To nie jest odpowiedź, ale ...

Jeśli uciekniesz

df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(tj. tylko z Pandami), zauważysz, że już używasz kilku rdzeni. Wynika to z tego, że domyślnie sklearnkorzysta joblibz pracy równoległej. Państwo może zamienić się harmonogramu na rzecz dask i być może uzyskać większą wydajność na dzielenie danych pomiędzy wątkami, ale tak długo, jak praca, którą wykonujesz jest CPU-bound tak, nie będzie nic nie można zrobić, aby ją przyspieszyć.

Krótko mówiąc, jest to problem z algorytmem: dowiedz się, co naprawdę musisz obliczyć, zanim spróbujesz rozważyć inne ramy obliczeniowe.

mdurant
źródło
Czy możesz wyjaśnić, dlaczego wspominasz „… udostępnianie danych między wątkami …”, kiedy podział pracy został zorganizowany przez joblibodrodzone procesy , które nie mają nic wspólnego z wątkami, tym mniej z udostępnianiem? Dziękuję za miłe wyjaśnienie argumentów.
user3666197
Dokładnie, jboblib zwykle używa procesów, ale może alternatywnie użyć dask jako backendu, w którym możesz wybrać swoją mieszankę wątków i procesów.
mdurant
Jestem trochę nowicjuszem w dziedzinie obliczeń równoległych, ale nawet jeśli sklearn korzysta z równoległości, czy nie jest to bezużyteczne w tych ustawieniach? Mam na myśli, że operacje wykonywane przez sklearn są niezwykle proste, ponieważ każda operacja grupowania jest stosowana tylko do 10 punktów. Ponownie mogę się mylić, ale myślę, że sposób, w jaki równolegle przetwarzamy fragmenty oryginalnych danych, jest prawdziwym problemem.
Kuba_
„czy to nie jest bezużyteczne w tych ustawieniach” - cóż, zamiast 1
używamy
0

Nie jestem ekspertem od Dask, ale jako podstawowy kod podaję następujący kod:

import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
pętla
źródło