Ciekawe pytanie, spędziłem trochę czasu przyglądając się kodowi w poszukiwaniu szczegółów i oto moje przemyślenia. Podziały są obsługiwane przez klienta InputFormat.getSplits
, więc spojrzenie na FileInputFormat zawiera następujące informacje:
- Dla każdego pliku wejściowego pobierz długość pliku, rozmiar bloku i oblicz rozmiar podziału,
max(minSize, min(maxSize, blockSize))
gdzie maxSize
odpowiada mapred.max.split.size
i minSize
jest mapred.min.split.size
.
Podziel plik na różne FileSplit
na podstawie rozmiaru podziału obliczonego powyżej. Ważne jest tutaj to, że każdy FileSplit
jest inicjowany start
parametrem odpowiadającym przesunięciu w pliku wejściowym . W tym momencie nadal nie ma obsługi linii. Odpowiednia część kodu wygląda następująco:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Po tym, jeśli spojrzysz na to, LineRecordReader
co jest zdefiniowane przez TextInputFormat
, to tam są obsługiwane linie:
- Kiedy inicjalizujesz swój
LineRecordReader
, próbuje utworzyć instancję, LineReader
która jest abstrakcją, aby móc odczytać wiersze FSDataInputStream
. Istnieją 2 przypadki:
- Jeśli jest
CompressionCodec
zdefiniowany, to ten kodek jest odpowiedzialny za obsługę granic. Prawdopodobnie nie dotyczy Twojego pytania.
Jeśli jednak nie ma kodeka, to jest to, co jest interesujące: jeśli start
twój InputSplit
jest inny niż 0, cofasz się o 1 znak, a następnie pomijasz pierwszą napotkaną linię, oznaczoną \ n lub \ r \ n (Windows) ! Cofnięcie jest ważne, ponieważ w przypadku, gdy granice linii są takie same, jak granice podzielone, zapewnia to, że nie pominiesz prawidłowego wiersza. Oto odpowiedni kod:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Ponieważ podziały są obliczane w kliencie, elementy mapujące nie muszą działać po kolei, każdy program mapujący już wie, czy musi odrzucić pierwszą linię, czy nie.
Więc w zasadzie, jeśli masz 2 wiersze każdego 100 MB w tym samym pliku i dla uproszczenia, powiedzmy, że rozmiar podziału wynosi 64 MB. Następnie, kiedy zostaną obliczone podziały wejściowe, będziemy mieli następujący scenariusz:
- Podział 1 zawierający ścieżkę i hosty do tego bloku. Zainicjowany na początku 200-200 = 0 Mb, długość 64 Mb.
- Podział 2 zainicjowany na początku 200-200 + 64 = 64 Mb, długość 64 Mb.
- Podział 3 zainicjowany na początku 200-200 + 128 = 128 Mb, długość 64 Mb.
- Podział 4 zainicjowany na początku 200-200 + 192 = 192 Mb, długość 8 Mb.
- Mapper A przetworzy podział 1, początek to 0, więc nie pomijaj pierwszego wiersza i odczytuje cały wiersz, który przekracza limit 64 MB, więc wymaga zdalnego odczytu.
- Mapper B przetworzy podział 2, początek to! = 0, więc pomiń pierwszą linię po 64 Mb-1 bajt, co odpowiada końcowi linii 1 przy 100 Mb, która nadal jest w części 2, mamy 28 Mb linii w części 2, więc zdalny odczyt pozostałych 72Mb.
- Mapper C przetworzy podział 3, początek to! = 0, więc pomiń pierwszą linię po 128 Mb-1 bajt, co odpowiada końcowi linii 2 na 200 Mb, czyli końcu pliku, więc nie rób nic.
- Mapper D jest taki sam jak mapper C, z wyjątkiem tego, że szuka nowej linii po 192 Mb-1 bajcie.
LineReader.readLine
funkcji, nie sądzę, że jest to istotne dla twojego pytania, ale w razie potrzeby mogę dodać więcej szczegółów.\r\n, \n
reprezentuje obcięcie rekordu)?Algorytm Map Reduce nie działa na fizycznych blokach pliku. Działa na logicznych podziałach wejść. Podział danych wejściowych zależy od miejsca zapisu rekordu. Rekord może obejmować dwóch Mapperów.
Sposób, w jaki HDFS został skonfigurowany, dzieli bardzo duże pliki na duże bloki (na przykład mierzące 128 MB) i przechowuje trzy kopie tych bloków w różnych węzłach w klastrze.
HDFS nie ma świadomości zawartości tych plików. Rekord mógł zostać rozpoczęty w Bloku-a, ale koniec tego rekordu może znajdować się w Bloku-b .
Aby rozwiązać ten problem, Hadoop używa logicznej reprezentacji danych przechowywanych w blokach plików, zwanych podziałami danych wejściowych. Kiedy klient zadania MapReduce oblicza podziały wejściowe, ustala , gdzie zaczyna się pierwszy cały rekord w bloku i gdzie kończy się ostatni rekord w bloku .
Kluczowy punkt:
W przypadkach, gdy ostatni rekord w bloku jest niekompletny, podział danych wejściowych obejmuje informacje o lokalizacji dla następnego bloku i przesunięcie bajtów danych potrzebnych do uzupełnienia rekordu.
Spójrz na poniższy diagram.
Zapoznaj się z tym artykułem i powiązanym pytaniem SE: O dzieleniu plików Hadoop / HDFS
Więcej szczegółów można znaleźć w dokumentacji
Struktura Map-Reduce opiera się na InputFormat zadania w celu:
InputSplit[] getSplits(JobConf job,int numSplits
) to API, które zajmuje się tymi rzeczami.FileInputFormat , który rozszerza
InputFormat
zaimplementowanągetSplits
metodę (). Zapoznaj się z elementami wewnętrznymi tej metody w grepcodeźródło
Widzę to następująco: InputFormat jest odpowiedzialny za podzielenie danych na logiczne podziały, biorąc pod uwagę charakter danych.
Nic nie stoi na przeszkodzie, aby to zrobić, chociaż może to znacznie zwiększyć opóźnienie zadania - cała logika i czytanie wokół pożądanych granic wielkości podziału będą miały miejsce w module śledzącym zadania.
Najprostszym formatem wejściowym obsługującym rekordy jest TextInputFormat. Działa w następujący sposób (o ile zrozumiałem z kodu) - format wejściowy tworzy podziały według rozmiaru, niezależnie od linii, ale LineRecordReader zawsze:
a) Pomiń pierwszą linię w podziale (lub jej część), jeśli nie jest pierwszy podział
b) Przeczytaj jedną linię na końcu za granicą podziału (jeśli dane są dostępne, więc nie jest to ostatni podział).
źródło
Skip first line in the split (or part of it), if it is not the first split
- jeśli pierwszy rekord w bloku innym niż pierwszy jest kompletny, nie wiadomo, jak ta logika zadziała.Z tego, co zrozumiałem, po
FileSplit
zainicjowaniu pierwszego bloku wywoływany jest domyślny konstruktor. Dlatego początkowo wartości początku i długości wynoszą zero. Pod koniec przetwarzania pierwszego bloku, jeśli ostatnia linia jest niekompletna, to wartość długości będzie większa niż długość podziału i odczyta również pierwszą linię następnego bloku. W związku z tym wartość początku dla pierwszego bloku będzie większa od zera i pod tym warunkiemLineRecordReader
pominie pierwszą linię drugiego bloku. (Patrz źródło )W przypadku, gdy ostatni wiersz pierwszego bloku jest kompletny, to wartość długości będzie równa długości pierwszego bloku, a wartość początku drugiego bloku będzie równa zero. W takim przypadku
LineRecordReader
nie pominie pierwszej linii i przeczyta drugi blok od początku.Ma sens?
źródło
Z kodu źródłowego hadoopa konstruktora LineRecordReader.java: znajduję kilka komentarzy:
z tego, jak sądzę, hadoop przeczyta jedną dodatkową linię dla każdego podziału (na końcu bieżącego podziału, przeczyta następną linię w następnym podziale), a jeśli nie pierwszy podział, pierwsza linia zostanie odrzucona. aby żaden zapis linii nie został utracony ani niekompletny
źródło
Twórcy map nie muszą się komunikować. Bloki plików są w formacie HDFS i może bieżący program odwzorowujący (RecordReader) może odczytać blok zawierający pozostałą część wiersza. Dzieje się to za kulisami.
źródło