Jak wydrukować zawartość RDD?

124

Próbuję wydrukować zawartość kolekcji do konsoli Spark.

Mam typ:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]

I używam polecenia:

scala> linesWithSessionId.map(line => println(line))

Ale to jest drukowane:

res1: org.apache.spark.rdd.RDD [Unit] = zmapowanyRDD [4] na mapie w: 19

Jak mogę napisać RDD na konsoli lub zapisać go na dysku, aby móc przeglądać jego zawartość?

niebieskie niebo
źródło
1
Cześć! czy przeczytałeś komentarze do odpowiedzi, które zostały przez Ciebie przyjęte? Wydaje się to mylące
dk14
2
@ dk14 zgodził się, ponownie przypisałem zaakceptowaną odpowiedź
blue-sky
RDD są zdegradowani jako obywatele drugiej kategorii, powinieneś użyć DataFrame i showmetody.
Thomas Decaux

Odpowiedzi:

235

Jeśli chcesz wyświetlić zawartość RDD, jednym ze sposobów jest użycie collect():

myRDD.collect().foreach(println)

Nie jest to jednak dobry pomysł, skoro RDD ma miliardy linii. Użyj, take()aby wydrukować tylko kilka:

myRDD.take(n).foreach(println)
Oussama
źródło
1
Jeśli użyję foreach na RDD (który ma miliony wierszy) do zapisania zawartości w HDFS jako pojedynczego pliku, czy będzie działać bez żadnych problemów w klastrze?
Shankar,
Powodem, dla którego nie używam saveAsTextFilena RDD, jest to, że muszę zapisać zawartość RDD w więcej niż jednym pliku, dlatego używamforeach
Shankar
Jeśli chcesz zapisać w jednym pliku, możesz połączyć RDD w jedną partycję przed wywołaniem metody saveAsTextFile, ale znowu może to powodować problemy. Myślę, że najlepszą opcją jest zapisywanie w wielu plikach w HDFS, a następnie użycie hdfs dfs --getmerge w celu scalenia plików
Oussama
powiedziałeś, że jeśli użyjesz foreach na RDD, zachowa go on w pamięci RAM sterownika, czy oświadczenie jest prawidłowe? ponieważ zrozumiałem, że foreach będzie działać na każdym module roboczym [klastrze], a nie na sterowniku.
Shankar
saveAsTextFile zapisze jeden plik na partycję, czyli to, czego chcesz (wiele plików). W przeciwnym razie, jak sugeruje Oussama, możesz wykonać rdd.coalesce (1) .saveAsTextFile (), aby uzyskać jeden plik. Jeśli RDD ma zbyt mało partycji dla swoich potrzeb, można spróbować rdd.repartition (N) .saveAsTextFile ()
foghorn
49

Ta mapfunkcja jest transformacją , co oznacza, że ​​Spark nie oceni twojego RDD, dopóki nie wykonasz na nim akcji .

Aby go wydrukować, możesz użyć foreach(co jest akcją):

linesWithSessionId.foreach(println)

Aby zapisać go na dysku, możesz użyć jednej z saveAs...funkcji (nadal działań) z RDD API

fedragon
źródło
6
Może trzeba o tym wspomnieć, collectżeby RDD można było wydrukować w konsoli.
zsxwing
1
foreachsam najpierw "zmaterializuje" RDD, a następnie uruchomi się printlnna każdym elemencie, więc collectnie jest tu naprawdę potrzebny (chociaż możesz go oczywiście użyć) ...
fedragon
5
Właściwie bez metody collect (), przed foreach, nie widzę niczego na konsoli.
Vittorio Cozzolino
3
W rzeczywistości działa całkowicie dobrze w mojej powłoce Spark, nawet w wersji 1.2.0. Ale myślę, że wiem, skąd bierze się to zamieszanie: pierwotne pytanie dotyczyło sposobu wydrukowania RDD na konsoli Spark (= powłoka), więc założyłem, że uruchomi zadanie lokalne, w którym to przypadku foreachdziała dobrze. Jeśli uruchamiasz zadanie w klastrze i chcesz wydrukować swoje rdd, powinieneś collect(jak wskazują inne komentarze i odpowiedzi), aby zostało wysłane do sterownika przed printlnwykonaniem. Używanie takezgodnie z sugestią Oussamy może być dobrym pomysłem, jeśli twój RDD jest zbyt duży.
fedragon
6
Powyższa odpowiedź jest zła. Powinieneś to zaakceptować. Zawsze nie będzie drukować na konsoli, będzie drukować na węzłach roboczych. Jeśli masz tylko jeden węzeł, każdy będzie działał. Ale jeśli masz tylko jeden węzeł, dlaczego używasz Spark? Po prostu użyj SQL awk, Grep lub czegoś znacznie prostszego. Więc myślę, że jedyną poprawną odpowiedzią jest zebranie. Jeśli kolekcja jest dla Ciebie zbyt duża i chcesz tylko pobrać próbkę, użyj funkcji pobierania lub głowy lub podobnych funkcji opisanych poniżej.
eshalev,
12

Jeśli uruchamiasz to w klastrze println, nie drukujesz z powrotem do twojego kontekstu. Musisz przynieść RDDdane do swojej sesji. Aby to zrobić, możesz zmusić go do lokalnej tablicy, a następnie wydrukować:

linesWithSessionId.toArray().foreach(line => println(line))
Noe
źródło
12

Można konwertować RDDDo DataFramepotem show()ona.

// For implicit conversion from RDD to DataFrame
import spark.implicits._

fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])

// convert to DF then show it
fruits.toDF().show()

Spowoduje to wyświetlenie 20 pierwszych wierszy danych, więc rozmiar danych nie powinien stanowić problemu.

+------+---+                                                                    
|    _1| _2|
+------+---+
| apple|  1|
|banana|  2|
|orange| 17|
+------+---+
Wesam
źródło
1
Myślę, że takimport spark.implicits._
Ryan Hartman
Z jakiej biblioteki korzystano tutaj? Nie mogę wykryć toDFani spark.implicits._w zakresie iskier.
Sergii
1

Prawdopodobnie istnieje wiele różnic architektonicznych pomiędzy myRDD.foreach(println)i myRDD.collect().foreach(println)(nie tylko „zbieraj”, ale także inne działania). Jedną z różnic, które zauważyłem, jest to myRDD.foreach(println), że dane wyjściowe będą w losowej kolejności. Na przykład: jeśli mój rdd pochodzi z pliku tekstowego, w którym każda linia ma numer, dane wyjściowe będą miały inną kolejność. Ale kiedy to zrobiłem myRDD.collect().foreach(println), porządek pozostaje taki, jak plik tekstowy.

Karan Gupta
źródło
1

W Pythonie

   linesWithSessionIdCollect = linesWithSessionId.collect()
   linesWithSessionIdCollect

Spowoduje to wydrukowanie całej zawartości RDD

Niranjan Molkeri
źródło
1
Dzięki, ale oznaczyłem to pytanie tagiem scala, a nie python
blue-sky
1
c.take(10)

a nowsza wersja Spark ładnie pokaże tabelę.

Harvey
źródło
1

Zamiast wpisywać za każdym razem, możesz;

[1] Utwórz ogólną metodę drukowania wewnątrz Spark Shell.

def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)

[2] Lub nawet lepiej, używając implicits, możesz dodać funkcję do klasy RDD, aby wydrukować jej zawartość.

implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
    def print = rdd.foreach(println)
}

Przykładowe użycie:

val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)

p(rdd) // 1
rdd.print // 2

Wynik:

2
6
4
8

Ważny

Ma to sens tylko wtedy, gdy pracujesz w trybie lokalnym i z małą ilością zestawu danych. W przeciwnym razie albo nie będzie można zobaczyć wyników na kliencie, albo zabraknie pamięci z powodu wyniku dużego zbioru danych.

noego
źródło
0

Możesz również zapisać jako plik: rdd.saveAsTextFile("alicia.txt")

Thomas Decaux
źródło
0

W składni java:

rdd.collect().forEach(line -> System.out.println(line));
ForeverLearner
źródło