Większość operacji RDD jest leniwych. Pomyśl o RDD jako opisie serii operacji. RDD to nie dane. Więc ta linia:
val textFile = sc.textFile("/user/emp.txt")
To nic nie robi. Tworzy RDD, który mówi „będziemy musieli załadować ten plik”. W tym momencie plik nie jest załadowany.
Operacje RDD, które wymagają obserwacji zawartości danych, nie mogą być leniwe. (Nazywa się to akcjami ). Przykład: RDD.count
aby podać liczbę wierszy w pliku, plik musi zostać odczytany. Więc jeśli piszesz textFile.count
, w tym momencie plik zostanie odczytany, wiersze zostaną policzone, a liczba zostanie zwrócona.
A jeśli zadzwonisz textFile.count
ponownie? To samo: plik zostanie ponownie odczytany i policzony. Nic nie jest przechowywane. RDD to nie dane.
Więc co robi RDD.cache
? Jeśli dodasz textFile.cache
do powyższego kodu:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
To nic nie robi. RDD.cache
jest również leniwą operacją. Plik nadal nie jest odczytywany. Ale teraz RDD mówi „przeczytaj ten plik i buforuj zawartość”. Jeśli następnie uruchomisz textFile.count
go po raz pierwszy, plik zostanie załadowany, zbuforowany i policzony. Jeśli zadzwonisz textFile.count
po raz drugi, operacja użyje pamięci podręcznej. Po prostu pobierze dane z pamięci podręcznej i policzy linie.
Zachowanie pamięci podręcznej zależy od dostępnej pamięci. Jeśli na przykład plik nie mieści się w pamięci, textFile.count
powróci do zwykłego zachowania i ponownie odczyta plik.
perisist
i wybranie opcji przechowywania, która umożliwia rozlewanie danych z pamięci podręcznej na dysk.Myślę, że pytanie byłoby lepiej sformułowane jako:
Kiedy musimy wywołać pamięć podręczną lub utrwalić na RDD?
Procesy Spark są leniwe, to znaczy nic się nie wydarzy, dopóki nie będzie to wymagane. Aby szybko odpowiedzieć na pytanie, po
val textFile = sc.textFile("/user/emp.txt")
wydaniu danych nic się nie dzieje, tylkoHadoopRDD
konstruuje się plik, wykorzystując plik jako źródło.Powiedzmy, że trochę przekształcamy te dane:
Znowu nic się nie dzieje z danymi. Teraz jest nowy RDD,
wordsRDD
który zawiera odniesienietestFile
i funkcję do zastosowania w razie potrzeby.Tylko wtedy, gdy akcja jest wywoływana na RDD, jak
wordsRDD.count
łańcuch RDD, nazywany rodowodem, zostanie wykonany. Oznacza to, że dane podzielone na partycje zostaną załadowane przez moduły wykonawcze klastra Spark,flatMap
funkcja zostanie zastosowana, a wynik zostanie obliczony.Linii liniowej, takiej jak w tym przykładzie,
cache()
nie potrzeba. Dane zostaną załadowane do executorów, wszystkie transformacje zostaną zastosowane i na konieccount
zostanie obliczony, wszystko w pamięci - jeśli dane mieszczą się w pamięci.cache
jest przydatne, gdy linia rodowa RDD się rozgałęzia. Powiedzmy, że chcesz przefiltrować słowa z poprzedniego przykładu do liczby słów pozytywnych i negatywnych. Możesz zrobić to tak:Tutaj każda gałąź ponownie ładuje dane. Dodanie wyraźnej
cache
instrukcji zapewni, że przetwarzanie wykonane wcześniej zostanie zachowane i ponownie użyte. Praca będzie wyglądać następująco:Z tego powodu
cache
mówi się, że „przerywa linię”, ponieważ tworzy punkt kontrolny, który można ponownie wykorzystać do dalszego przetwarzania.Praktyczna zasada: użyj,
cache
gdy linia rodowa twojego RDD rozgałęzia się lub gdy RDD jest używany wiele razy, jak w pętli.źródło
spark.storage.memoryFraction
. Jeśli chodzi o to, który executor ma jakie dane, RDD będzie śledził swoje partycje, które są rozprowadzane na modułach wykonawczych.cache
niepersist
mogę złamać rodowodu .Czy musimy jawnie wywoływać „pamięć podręczną” lub „utrwalać”, aby przechowywać dane RDD w pamięci?
Tak, tylko w razie potrzeby.
Dane RDD przechowywane w sposób rozproszony domyślnie w pamięci?
Nie!
A oto powody, dla których:
Spark obsługuje dwa typy współdzielonych zmiennych: zmienne emisji, których można używać do buforowania wartości w pamięci we wszystkich węzłach, oraz akumulatory, czyli zmienne, do których można tylko „dodawać”, takie jak liczniki i sumy.
RDD obsługują dwa typy operacji: transformacje, które tworzą nowy zbiór danych z istniejącego oraz akcje, które zwracają wartość do programu sterownika po wykonaniu obliczeń na zbiorze danych. Na przykład map to transformacja, która przekazuje każdy element zestawu danych przez funkcję i zwraca nowy RDD reprezentujący wyniki. Z drugiej strony, redukuj jest akcją, która agreguje wszystkie elementy RDD za pomocą jakiejś funkcji i zwraca wynik końcowy do programu sterownika (chociaż istnieje również równoległeredredByKey, które zwraca rozproszony zestaw danych).
Wszystkie transformacje w Spark są leniwe, ponieważ nie obliczają od razu swoich wyników. Zamiast tego po prostu zapamiętują transformacje zastosowane do jakiegoś podstawowego zbioru danych (np. Pliku). Transformacje są obliczane tylko wtedy, gdy akcja wymaga zwrócenia wyniku do programu sterownika. Ten projekt umożliwia wydajniejsze działanie platformy Spark - na przykład możemy zdać sobie sprawę, że zestaw danych utworzony za pomocą mapy zostanie użyty w celu zmniejszenia i zwrócenia tylko wyniku redukcji do sterownika, a nie większego zmapowanego zestawu danych.
Domyślnie każdy przekształcony RDD może zostać przeliczony za każdym razem, gdy wykonujesz na nim akcję. Jednak możesz również utrwalić RDD w pamięci za pomocą metody utrwalania (lub pamięci podręcznej), w takim przypadku Spark zachowa elementy w klastrze, aby uzyskać znacznie szybszy dostęp podczas następnego zapytania. Dostępna jest również obsługa utrwalania RDD na dysku lub replikacji w wielu węzłach.
Aby uzyskać więcej informacji, zapoznaj się z przewodnikiem programowania Spark .
źródło
Poniżej znajdują się trzy sytuacje, w których należy buforować swoje RDD:
źródło
Dodanie kolejnego powodu, aby dodać (lub tymczasowo dodać)
cache
wywołanie metody.w przypadku problemów z pamięcią debugowania
ze
cache
sposobu, iskra da debugowania informacje dotyczące wielkości RDD. więc w zintegrowanym interfejsie użytkownika Spark otrzymasz informacje o zużyciu pamięci RDD. co okazało się bardzo pomocne w diagnozowaniu problemów z pamięcią.źródło