Mam do czynienia z dość dużą Pandas DataFrame - mój zestaw danych przypomina następującą df
konfigurację:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Uwaga: W celu uzyskania minimalnego przykładu można go łatwo podzielić (na przykład za pomocą df.loc[df['time'] <= 400, :]
), ale ponieważ i tak symuluję dane, pomyślałem, że oryginalny rozmiar da lepszy przegląd.
Dla każdej grupy zdefiniowanej przez ['measurement_id', 'time', 'group']
muszę wywołać następującą funkcję:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Aby poprawić wydajność, wypróbowałem dwa podejścia:
Pakiet Pandarallel
Pierwsze podejście polegało na zrównolegleniu obliczeń za pomocą pandarallel
pakietu:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Wydaje się to jednak nieoptymalne, ponieważ zużywa dużo pamięci RAM i nie wszystkie rdzenie są używane w obliczeniach (nawet pomimo wyraźnej liczby rdzeni w pandarallel.initialize()
metodzie). Czasami obliczenia są przerywane różnymi błędami, chociaż nie miałem okazji znaleźć przyczyny tego (być może brak pamięci RAM?).
PySpark Pandas UDF
Dałem też Spark Pandas UDF, chociaż jestem zupełnie nowy w Spark. Oto moja próba:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Niestety wydajność również była niezadowalająca, a z tego, co przeczytałem na ten temat, może to być tylko ciężar korzystania z funkcji UDF napisanej w Pythonie i związana z tym potrzeba konwersji wszystkich obiektów Python na obiekty Spark i odwrotnie.
Oto moje pytania:
- Czy jedno z moich podejść może zostać dostosowane, aby wyeliminować ewentualne wąskie gardła i poprawić wydajność? (np. konfiguracja PySpark, dostosowywanie nieoptymalnych operacji itp.)
- Czy są jakieś lepsze alternatywy? Jak różnią się one od dostarczonych rozwiązań pod względem wydajności?
dask
((więc mój komentarz to tylko rada do badań.Odpowiedzi:
+1
do wymieniania instalacyjny dodatku na koszty ogólne dla obu strategii obliczeniowej. To zawsze stanowi próg rentowności, tylko wtedy, gdy brak[SERIAL]
strategii może osiągnąć jakąkolwiek korzystną radość z pewnego[TIME]
przyspieszenia -Domain (jeszcze, jeśli inne, zwykle[SPACE]
-Domain koszty pozwalają lub pozostają wykonalne - tak, RAM. ... istnienie takiego urządzenia, budżetu i innych podobnych ograniczeń w świecie rzeczywistym i dostęp do nich)Po pierwsze,
kontrola przed lotem, przed startem
. Nowe, ściśle określone sformułowanie Prawa Amdahla jest obecnie w stanie uwzględnić oba te dodatkowe
pSO + pTO
koszty ogólne i odzwierciedla je w przewidywaniu osiągalnych poziomów Przyspieszenia, w tym progu rentowności punkt, od którego może stać się sensowne (w sensie kosztów / efektu, w sensie wydajności) równoległe.To jednak nie
jest nasz główny problem . To jest następne:
Następnie,
biorąc pod uwagę koszty obliczeniowe związane z
SpectralClustering()
użyciem jądra Radial Boltzmann Function,~ exp( -gamma * distance( data, data )**2 )
wydaje się, że nie ma żadnych postępów w podzialedata
-object na dowolną liczbę rozłącznych jednostek roboczych, ponieważ elementdistance( data, data )
z definicji ma jedynie odwiedź wszystkiedata
-elementy (zob. koszty komunikacyjne{ process | node }
topologii dystrybuowanych z dowolną do dowolnej wartości są, z oczywistych powodów, strasznie złe, jeśli nie najgorsze przypadki użycia do{ process | node }
dystrybucji rozproszonej, jeśli nie proste anty-wzorce (z wyjątkiem niektórych rzeczywiście tajemnych, pozbawionych pamięci / bezstanowych, a jednak obliczeniowych struktur).W przypadku pedantycznych analityków tak - dodaj do tego (i możemy już powiedzieć, że zły stan) koszty - znowu - przetwarzania typu k-oznacza -dowolny , w
O( N^( 1 + 5 * 5 ) )
tym przypadkuN ~ len( data ) ~ 1.12E6+
, jest to bardzo sprzeczne z naszą wolą posiadania inteligentne i szybkie przetwarzanie.Więc co?
Natomiast koszty instalacji nie są zaniedbane, że zwiększone koszty komunikacji będzie prawie na pewno wyłączyć wszystkie ulepszenia z użyciem wyżej naszkicowanych próby przenieść z wyłącznie przy pomocy
[SERIAL]
strumienia procesowego w jakiejś formie tylko -[CONCURRENT]
lub prawdziwie-[PARALLEL]
orkiestracji niektórych-podjednostek pracy , ze względu na zwiększone koszty ogólne związane z koniecznością implementacji (para tandemowa) topologii przekazywania dowolnej wartości.Gdyby nie dla nich?
Cóż, to brzmi jak oksymoron Informatyki - nawet gdyby to było możliwe, koszty dowolnych wcześniej obliczonych odległości (co
[TIME]
zabrałoby te ogromne koszty Domeny „wcześniej” (Gdzie? Jak? Czy są jakieś inne, nieuniknione opóźnienie, pozwalające na ewentualne maskowanie opóźnień przez niektóre (nieznane do tej pory) przyrostowe narastanie kompletnej w przyszłości macierzy odległości typu dowolny w dowolnym miejscu?)), ale przestawiłoby te zasadniczo obecne koszty na inną lokalizację w[TIME]
- i[SPACE]
-Domains, nie zmniejszaj ich.Jak dotąd zdaję sobie sprawę, że jedynym rozwiązaniem jest próba przeformułowania problemu na inny, sformułowany przez QUBO, problematyczny sposób postępowania (zob .: Q uantum- U nieskrępowany- B inary- O ptimizacja , dobrą wiadomością jest to, że istnieją do tego narzędzia, baza wiedzy z pierwszej ręki oraz praktyczne doświadczenie w rozwiązywaniu problemów.
Wydajność zapiera dech w piersiach - problem sformułowany przez QUBO ma obiecujący
O(1)
(!) Solver w stałym czasie (w[TIME]
-Domain) i nieco ograniczony w[SPACE]
-Domain (gdzie niedawno ogłoszone sztuczki LLNL mogą pomóc uniknąć tego świata fizycznego, bieżącej implementacji QPU, ograniczenia problemu rozmiary).źródło
To nie jest odpowiedź, ale ...
Jeśli uciekniesz
(tj. tylko z Pandami), zauważysz, że już używasz kilku rdzeni. Wynika to z tego, że domyślnie
sklearn
korzystajoblib
z pracy równoległej. Państwo może zamienić się harmonogramu na rzecz dask i być może uzyskać większą wydajność na dzielenie danych pomiędzy wątkami, ale tak długo, jak praca, którą wykonujesz jest CPU-bound tak, nie będzie nic nie można zrobić, aby ją przyspieszyć.Krótko mówiąc, jest to problem z algorytmem: dowiedz się, co naprawdę musisz obliczyć, zanim spróbujesz rozważyć inne ramy obliczeniowe.
źródło
joblib
odrodzone procesy , które nie mają nic wspólnego z wątkami, tym mniej z udostępnianiem? Dziękuję za miłe wyjaśnienie argumentów.Nie jestem ekspertem od
Dask
, ale jako podstawowy kod podaję następujący kod:źródło