(Dlaczego) musimy wywołać pamięć podręczną lub pozostać na RDD

171

Kiedy elastyczny rozproszony zestaw danych (RDD) jest tworzony z pliku tekstowego lub kolekcji (lub z innego RDD), czy musimy jawnie wywoływać „pamięć podręczną” lub „utrwalanie”, aby przechowywać dane RDD w pamięci? A może dane RDD są domyślnie przechowywane w pamięci w sposób rozproszony?

val textFile = sc.textFile("/user/emp.txt")

Zgodnie z moim rozumieniem, po powyższym kroku textFile jest RDD i jest dostępny we wszystkich / niektórych pamięci węzła.

Jeśli tak, dlaczego w takim razie musimy wywoływać „cache” lub „persist” w RDD textFile?

Ramana
źródło

Odpowiedzi:

300

Większość operacji RDD jest leniwych. Pomyśl o RDD jako opisie serii operacji. RDD to nie dane. Więc ta linia:

val textFile = sc.textFile("/user/emp.txt")

To nic nie robi. Tworzy RDD, który mówi „będziemy musieli załadować ten plik”. W tym momencie plik nie jest załadowany.

Operacje RDD, które wymagają obserwacji zawartości danych, nie mogą być leniwe. (Nazywa się to akcjami ). Przykład: RDD.countaby podać liczbę wierszy w pliku, plik musi zostać odczytany. Więc jeśli piszesz textFile.count, w tym momencie plik zostanie odczytany, wiersze zostaną policzone, a liczba zostanie zwrócona.

A jeśli zadzwonisz textFile.countponownie? To samo: plik zostanie ponownie odczytany i policzony. Nic nie jest przechowywane. RDD to nie dane.

Więc co robi RDD.cache? Jeśli dodasz textFile.cachedo powyższego kodu:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

To nic nie robi. RDD.cachejest również leniwą operacją. Plik nadal nie jest odczytywany. Ale teraz RDD mówi „przeczytaj ten plik i buforuj zawartość”. Jeśli następnie uruchomisz textFile.countgo po raz pierwszy, plik zostanie załadowany, zbuforowany i policzony. Jeśli zadzwonisz textFile.countpo raz drugi, operacja użyje pamięci podręcznej. Po prostu pobierze dane z pamięci podręcznej i policzy linie.

Zachowanie pamięci podręcznej zależy od dostępnej pamięci. Jeśli na przykład plik nie mieści się w pamięci, textFile.countpowróci do zwykłego zachowania i ponownie odczyta plik.

Daniel Darabos
źródło
4
Cześć Daniel, - kiedy dzwonisz do pamięci podręcznej, czy to oznacza, że ​​RDD nie jest ładowany ponownie ze źródła (np. Pliku tekstowego) - skąd możesz mieć pewność, że dane z pliku tekstowego są najnowsze, gdy są buforowane? (czy Spark to wymyśla, czy jest to ręczna operacja okresowego unpersist (), aby zapewnić, że dane źródłowe zostaną ponownie
obliczone
także - jeśli musisz okresowo przestać działać, - jeśli masz rdd, który jest buforowany, zależny od innego RDD, który jest buforowany, czy musisz cofnąć oba RDD, aby zobaczyć ponownie obliczone wyniki?
andrew.butkus
21
Spark po prostu zakłada, że ​​plik nigdy się nie zmieni. Odczytuje plik w dowolnym momencie i może ponownie odczytać jego części w razie potrzeby później. (Np. Jeśli część danych została wypchnięta z pamięci podręcznej). Więc lepiej utrzymuj niezmienione pliki! Po prostu utwórz nowy plik z nową nazwą, gdy masz nowe dane, a następnie załaduj go jako nowy RDD. Jeśli ciągle otrzymujesz nowe dane, zajrzyj do przesyłania strumieniowego Spark.
Daniel Darabos,
10
Tak. RDD są niezmienne, więc każdy RDD zakłada, że ​​jego zależności są również niezmienne. Spark Streaming umożliwia skonfigurowanie takich drzew, które działają na strumieniu zmian. Ale jeszcze prostszym rozwiązaniem jest zbudowanie drzewa w funkcji, która jako parametr przyjmuje nazwę pliku. Następnie po prostu wywołaj funkcję dla nowego pliku i poof, masz nowe drzewo obliczeń.
Daniel Darabos,
1
@Humoyun: Na karcie Storage w Spark UI możesz zobaczyć, ile każdego RDD jest buforowane. Dane mogą być tak duże, że tylko 40% z nich mieści się w całkowitej pamięci, którą masz do buforowania. Jedną z opcji w tym przypadku jest użycie perisisti wybranie opcji przechowywania, która umożliwia rozlewanie danych z pamięci podręcznej na dysk.
Daniel Darabos,
197

Myślę, że pytanie byłoby lepiej sformułowane jako:

Kiedy musimy wywołać pamięć podręczną lub utrwalić na RDD?

Procesy Spark są leniwe, to znaczy nic się nie wydarzy, dopóki nie będzie to wymagane. Aby szybko odpowiedzieć na pytanie, po val textFile = sc.textFile("/user/emp.txt")wydaniu danych nic się nie dzieje, tylko HadoopRDDkonstruuje się plik, wykorzystując plik jako źródło.

Powiedzmy, że trochę przekształcamy te dane:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Znowu nic się nie dzieje z danymi. Teraz jest nowy RDD, wordsRDDktóry zawiera odniesienie testFilei funkcję do zastosowania w razie potrzeby.

Tylko wtedy, gdy akcja jest wywoływana na RDD, jak wordsRDD.countłańcuch RDD, nazywany rodowodem, zostanie wykonany. Oznacza to, że dane podzielone na partycje zostaną załadowane przez moduły wykonawcze klastra Spark, flatMapfunkcja zostanie zastosowana, a wynik zostanie obliczony.

Linii liniowej, takiej jak w tym przykładzie, cache()nie potrzeba. Dane zostaną załadowane do executorów, wszystkie transformacje zostaną zastosowane i na koniec countzostanie obliczony, wszystko w pamięci - jeśli dane mieszczą się w pamięci.

cachejest przydatne, gdy linia rodowa RDD się rozgałęzia. Powiedzmy, że chcesz przefiltrować słowa z poprzedniego przykładu do liczby słów pozytywnych i negatywnych. Możesz zrobić to tak:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Tutaj każda gałąź ponownie ładuje dane. Dodanie wyraźnej cacheinstrukcji zapewni, że przetwarzanie wykonane wcześniej zostanie zachowane i ponownie użyte. Praca będzie wyglądać następująco:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Z tego powodu cachemówi się, że „przerywa linię”, ponieważ tworzy punkt kontrolny, który można ponownie wykorzystać do dalszego przetwarzania.

Praktyczna zasada: użyj, cachegdy linia rodowa twojego RDD rozgałęzia się lub gdy RDD jest używany wiele razy, jak w pętli.

maasg
źródło
1
Niesamowite. Dzięki. Jeszcze jedno powiązane pytanie. Kiedy będziemy buforować lub utrwalać, dane będą przechowywane w pamięci modułu wykonawczego lub w pamięci węzła roboczego. Jeśli jest to pamięć modułu wykonawczego, sposób, w jaki Spark identyfikuje, który moduł wykonawczy ma dane.
Ramana
1
@RamanaUppala używana jest pamięć executora. Część pamięci executora używanej do buforowania jest kontrolowana przez config spark.storage.memoryFraction. Jeśli chodzi o to, który executor ma jakie dane, RDD będzie śledził swoje partycje, które są rozprowadzane na modułach wykonawczych.
maasg
5
@maasg Popraw mnie, jeśli się mylę, ale ani cachenie persist mogę złamać rodowodu .
zero323
Gdzie byłyby przechowywane wyrazyRDD, gdybyśmy nie mieli instrukcji .cache () w powyższym przykładzie?
sun_dare
co się stanie, jeśli przed policzeniem dwóch połączymy dwie gałęzie z powrotem w jedną rdd i policzymy? w tym przypadku, czy pamięć podręczna jest korzystna?
Xiawei Zhang
30

Czy musimy jawnie wywoływać „pamięć podręczną” lub „utrwalać”, aby przechowywać dane RDD w pamięci?

Tak, tylko w razie potrzeby.

Dane RDD przechowywane w sposób rozproszony domyślnie w pamięci?

Nie!

A oto powody, dla których:

  • Spark obsługuje dwa typy współdzielonych zmiennych: zmienne emisji, których można używać do buforowania wartości w pamięci we wszystkich węzłach, oraz akumulatory, czyli zmienne, do których można tylko „dodawać”, takie jak liczniki i sumy.

  • RDD obsługują dwa typy operacji: transformacje, które tworzą nowy zbiór danych z istniejącego oraz akcje, które zwracają wartość do programu sterownika po wykonaniu obliczeń na zbiorze danych. Na przykład map to transformacja, która przekazuje każdy element zestawu danych przez funkcję i zwraca nowy RDD reprezentujący wyniki. Z drugiej strony, redukuj jest akcją, która agreguje wszystkie elementy RDD za pomocą jakiejś funkcji i zwraca wynik końcowy do programu sterownika (chociaż istnieje również równoległeredredByKey, które zwraca rozproszony zestaw danych).

  • Wszystkie transformacje w Spark są leniwe, ponieważ nie obliczają od razu swoich wyników. Zamiast tego po prostu zapamiętują transformacje zastosowane do jakiegoś podstawowego zbioru danych (np. Pliku). Transformacje są obliczane tylko wtedy, gdy akcja wymaga zwrócenia wyniku do programu sterownika. Ten projekt umożliwia wydajniejsze działanie platformy Spark - na przykład możemy zdać sobie sprawę, że zestaw danych utworzony za pomocą mapy zostanie użyty w celu zmniejszenia i zwrócenia tylko wyniku redukcji do sterownika, a nie większego zmapowanego zestawu danych.

  • Domyślnie każdy przekształcony RDD może zostać przeliczony za każdym razem, gdy wykonujesz na nim akcję. Jednak możesz również utrwalić RDD w pamięci za pomocą metody utrwalania (lub pamięci podręcznej), w takim przypadku Spark zachowa elementy w klastrze, aby uzyskać znacznie szybszy dostęp podczas następnego zapytania. Dostępna jest również obsługa utrwalania RDD na dysku lub replikacji w wielu węzłach.

Aby uzyskać więcej informacji, zapoznaj się z przewodnikiem programowania Spark .

eliasah
źródło
1
To nie odpowiadało na moje pytanie.
Ramana
Co na to nie odpowiada?
eliasah
1
kiedy dane RDD są domyślnie przechowywane w pamięci, dlaczego musimy wywoływać Cache lub Persist?
Ramana
RDD nie są domyślnie przechowywane w pamięci, więc utrwalanie RDD sprawia, że ​​Spark wykonuje transformację szybciej w klastrze
eliasah
2
To dobra odpowiedź, nie wiem, dlaczego została odrzucona. Jest to odpowiedź odgórna, wyjaśniająca, jak działają RDD na podstawie koncepcji wysokiego poziomu. Dodałem kolejną odpowiedź oddolną: zaczynając od „co robi ta linia”. Może łatwiej jest śledzić kogoś, kto dopiero zaczyna przygodę ze Sparkiem.
Daniel Darabos
11

Poniżej znajdują się trzy sytuacje, w których należy buforować swoje RDD:

używając RDD wiele razy

wykonywanie wielu czynności na tym samym RDD

dla długich łańcuchów (lub bardzo kosztownych) przekształceń

rileyss
źródło
7

Dodanie kolejnego powodu, aby dodać (lub tymczasowo dodać) cachewywołanie metody.

w przypadku problemów z pamięcią debugowania

ze cachesposobu, iskra da debugowania informacje dotyczące wielkości RDD. więc w zintegrowanym interfejsie użytkownika Spark otrzymasz informacje o zużyciu pamięci RDD. co okazało się bardzo pomocne w diagnozowaniu problemów z pamięcią.

cynk
źródło