Chcę odczytać kilka plików tekstowych z lokalizacji hdfs i wykonać mapowanie w iteracji za pomocą Spark.
JavaRDD<String> records = ctx.textFile(args[1], 1);
jest w stanie odczytać tylko jeden plik na raz.
Chcę odczytać więcej niż jeden plik i przetworzyć je jako pojedynczy RDD. W jaki sposób?
apache-spark
użytkownik3705662
źródło
źródło
Path
opcje.sc.wholeTextFiles
przydaje się w przypadku danych, które nie są rozdzielanesc.textFile(multipleCommaSeparatedDirs,320)
że prowadzi on do19430
całkowitej liczby zadań zamiast320
... zachowuje się tak,union
co prowadzi również do szalonej liczby zadań z bardzo niskiej równoległościwholeTextFiles
. Jaki jest twój przypadek użycia? Mogę wymyślić obejście, pod warunkiem że używasz tej samej liczby partycji co pliki ...Użyj
union
w następujący sposób:Następnie
bigRdd
jest RDD ze wszystkimi plikami.źródło
Możesz użyć pojedynczego wywołania textFile, aby odczytać wiele plików. Scala:
źródło
sc.textFile(files.mkString(","))
Możesz tego użyć
Najpierw możesz zdobyć bufor / listę ścieżek S3:
Teraz przekaż ten obiekt List do następującego fragmentu kodu, uwaga: sc jest obiektem SQLContext
Teraz masz ostateczną Unified RDD, tj. Df
Opcjonalnie, a także można podzielić na partycje w jednym BigRDD
Ponowne partycjonowanie zawsze działa: D
źródło
W PySpark znalazłem dodatkowy użyteczny sposób na parsowanie plików. Być może w Scali jest odpowiednik, ale nie czuję się wystarczająco dobrze wymyślając tłumaczenie robocze. W rzeczywistości jest to wywołanie textFile z dodanymi etykietami (w poniższym przykładzie klucz = nazwa pliku, wartość = 1 linia z pliku).
„Oznaczony” plik tekstowy
Wejście:
wyjście: tablica z każdym wpisem zawierającym krotkę przy użyciu nazwa-pliku-klucza i wartość = każdy wiersz pliku. (Technicznie, korzystając z tej metody, możesz również użyć innego klucza oprócz nazwy ścieżki do pliku - być może reprezentacji mieszającej, aby zaoszczędzić na pamięci). to znaczy.
Możesz także połączyć ponownie jako listę linii:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Lub połącz ponownie całe pliki z powrotem w pojedyncze ciągi znaków (w tym przykładzie wynik jest taki sam, jak wynik z całych plików tekstowych, ale z ciągiem „file:” usuniętym ze ścieżki pliku).
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
źródło
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
dostałem błąd tjTypeError: 'PipelinedRDD' object is not iterable
. Rozumiem, że ta linia tworzy RDD, który jest niezmienny, więc zastanawiałem się, jak udało ci się dołączyć go do innej zmiennej?możesz użyć
tutaj otrzymasz ścieżkę do pliku i zawartość tego pliku. dzięki czemu można wykonać dowolną akcję całego pliku w tym samym czasie, co oszczędza koszty ogólne
źródło
Wszystkie odpowiedzi są poprawne z
sc.textFile
Zastanawiałem się, dlaczego nie
wholeTextFiles
Na przykład w tym przypadku ...jednym ograniczeniem jest to, że musimy ładować małe pliki, w przeciwnym razie wydajność będzie niska i może prowadzić do OOM.
Uwaga :
Dalsze informacje do odwiedzenia
źródło
sc.wholeTextFiles(folder).flatMap...
Dostępne jest proste, czyste rozwiązanie. Użyj metody wholeTextFiles (). Spowoduje to pobranie katalogu i utworzenie pary klucz-wartość. Zwrócony RDD będzie parą RDD. Znajdź poniżej opis z dokumentacji Spark :
źródło
WYPRÓBUJ TEN interfejs używany do zapisywania DataFrame w zewnętrznych systemach pamięci masowej (np. Systemach plików, sklepach z kluczowymi wartościami itp.). Użyj DataFrame.write (), aby uzyskać do tego dostęp.
Nowości w wersji 1.4.
csv (ścieżka, tryb = Brak, kompresja = Brak, sep = Brak, cytat = Brak, escape = Brak, nagłówek = Brak, nullValue = Brak, escapeQuotes = Brak, quoteAll = Brak, dateFormat = Brak, timestampFormat = Brak) Zapisuje zawartość DataFrame w formacie CSV pod określoną ścieżką.
Parametry: ścieżka - ścieżka w dowolnym trybie systemu plików obsługiwanym przez Hadoop - określa zachowanie operacji składowania, gdy dane już istnieją.
append: Dołącz zawartość DataFrame do istniejących danych. nadpisz: nadpisz istniejące dane. ignore: cicho zignoruj tę operację, jeśli dane już istnieją. błąd (przypadek domyślny): Zgłaszaj wyjątek, jeśli dane już istnieją. kompresja - kodek kompresji używany podczas zapisywania do pliku. Może to być jedna ze znanych krótkich nazw bez rozróżniania wielkości liter (none, bzip2, gzip, lz4, snappy i deflate). sep - ustawia pojedynczy znak jako separator dla każdego pola i wartości. Jeśli ustawiono Brak, używa wartości domyślnej,,. quote - ustawia pojedynczy znak używany do zmiany wartości cytowanych, w których separator może być częścią wartości. Jeśli ustawiony jest None, używa wartości domyślnej, „. Jeśli chcesz wyłączyć cytaty, musisz ustawić pusty ciąg. Escape - ustawia pojedynczy znak używany do zmiany znaczenia cytatów wewnątrz już cytowanej wartości. Jeśli None jest ustawiony , używa wartości domyślnej, \ escapeQuotes - Flaga wskazująca, czy wartości zawierające cudzysłowy zawsze powinny być ujęte w cudzysłowy. Jeśli ustawiono Brak, używa domyślnej wartości true, unikając wszystkich wartości zawierających znak cudzysłowu. quoteAll - Flaga wskazująca, czy wszystkie wartości powinny być zawsze ujęte w cudzysłów. Jeśli ustawiono Brak, używa wartości domyślnej false, a jedynie wartości specjalne zawierające znak cudzysłowu. nagłówek - zapisuje nazwy kolumn jako pierwszy wiersz. Jeśli ustawiono None, używa wartości domyślnej, false. nullValue - ustawia ciąg reprezentujący wartość zerową. Jeśli ustawiono Brak, używa wartości domyślnej, pusty ciąg. dateFormat - ustawia ciąg wskazujący format daty. Niestandardowe formaty dat są zgodne z formatami java.text.SimpleDateFormat. Dotyczy to typu daty. Jeśli ustawiono Brak, używa wartości domyślnej rrrr-MM-dd. timestampFormat - ustawia ciąg znaków wskazujący format znacznika czasu. Niestandardowe formaty dat są zgodne z formatami java.text.SimpleDateFormat. Dotyczy to typu znacznika czasu. Jeśli ustawiony jest None, używa wartości domyślnej rrrr-MM-dd'T'HH: mm: ss.SSSZZ.
źródło
źródło