W jaki sposób Hadoop przetwarza rekordy podzielone na granice bloków?

119

Według Hadoop - The Definitive Guide

Rekordy logiczne zdefiniowane przez FileInputFormats zwykle nie pasują dokładnie do bloków HDFS. Na przykład rekordy logiczne TextInputFormat to linie, które częściej przekraczają granice HDFS. Nie ma to wpływu na funkcjonowanie twojego programu - na przykład linie nie są pomijane ani przerywane - ale warto o tym wiedzieć, ponieważ oznacza to, że mapy lokalne (to znaczy mapy, które działają na tym samym hoście, co ich dane wejściowe) wykona kilka odczytów zdalnych. Niewielki narzut, jaki to powoduje, zwykle nie jest znaczący.

Załóżmy, że wiersz rekordu jest podzielony na dwa bloki (b1 i b2). Mapper przetwarzający pierwszy blok (b1) zauważy, że ostatnia linia nie ma separatora EOL i pobiera pozostałą część linii z następnego bloku danych (b2).

W jaki sposób program odwzorowujący przetwarzający drugi blok (b2) ustala, że ​​pierwszy rekord jest niekompletny i powinien przetwarzać, zaczynając od drugiego rekordu w bloku (b2)?

Praveen Sripati
źródło

Odpowiedzi:

160

Ciekawe pytanie, spędziłem trochę czasu przyglądając się kodowi w poszukiwaniu szczegółów i oto moje przemyślenia. Podziały są obsługiwane przez klienta InputFormat.getSplits, więc spojrzenie na FileInputFormat zawiera następujące informacje:

  • Dla każdego pliku wejściowego pobierz długość pliku, rozmiar bloku i oblicz rozmiar podziału, max(minSize, min(maxSize, blockSize))gdzie maxSizeodpowiada mapred.max.split.sizei minSizejest mapred.min.split.size.
  • Podziel plik na różne FileSplitna podstawie rozmiaru podziału obliczonego powyżej. Ważne jest tutaj to, że każdy FileSplitjest inicjowany startparametrem odpowiadającym przesunięciu w pliku wejściowym . W tym momencie nadal nie ma obsługi linii. Odpowiednia część kodu wygląda następująco:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Po tym, jeśli spojrzysz na to, LineRecordReaderco jest zdefiniowane przez TextInputFormat, to tam są obsługiwane linie:

  • Kiedy inicjalizujesz swój LineRecordReader, próbuje utworzyć instancję, LineReaderktóra jest abstrakcją, aby móc odczytać wiersze FSDataInputStream. Istnieją 2 przypadki:
  • Jeśli jest CompressionCodeczdefiniowany, to ten kodek jest odpowiedzialny za obsługę granic. Prawdopodobnie nie dotyczy Twojego pytania.
  • Jeśli jednak nie ma kodeka, to jest to, co jest interesujące: jeśli starttwój InputSplitjest inny niż 0, cofasz się o 1 znak, a następnie pomijasz pierwszą napotkaną linię, oznaczoną \ n lub \ r \ n (Windows) ! Cofnięcie jest ważne, ponieważ w przypadku, gdy granice linii są takie same, jak granice podzielone, zapewnia to, że nie pominiesz prawidłowego wiersza. Oto odpowiedni kod:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Ponieważ podziały są obliczane w kliencie, elementy mapujące nie muszą działać po kolei, każdy program mapujący już wie, czy musi odrzucić pierwszą linię, czy nie.

Więc w zasadzie, jeśli masz 2 wiersze każdego 100 MB w tym samym pliku i dla uproszczenia, powiedzmy, że rozmiar podziału wynosi 64 MB. Następnie, kiedy zostaną obliczone podziały wejściowe, będziemy mieli następujący scenariusz:

  • Podział 1 zawierający ścieżkę i hosty do tego bloku. Zainicjowany na początku 200-200 = 0 Mb, długość 64 Mb.
  • Podział 2 zainicjowany na początku 200-200 + 64 = 64 Mb, długość 64 Mb.
  • Podział 3 zainicjowany na początku 200-200 + 128 = 128 Mb, długość 64 Mb.
  • Podział 4 zainicjowany na początku 200-200 + 192 = 192 Mb, długość 8 Mb.
  • Mapper A przetworzy podział 1, początek to 0, więc nie pomijaj pierwszego wiersza i odczytuje cały wiersz, który przekracza limit 64 MB, więc wymaga zdalnego odczytu.
  • Mapper B przetworzy podział 2, początek to! = 0, więc pomiń pierwszą linię po 64 Mb-1 bajt, co odpowiada końcowi linii 1 przy 100 Mb, która nadal jest w części 2, mamy 28 Mb linii w części 2, więc zdalny odczyt pozostałych 72Mb.
  • Mapper C przetworzy podział 3, początek to! = 0, więc pomiń pierwszą linię po 128 Mb-1 bajt, co odpowiada końcowi linii 2 na 200 Mb, czyli końcu pliku, więc nie rób nic.
  • Mapper D jest taki sam jak mapper C, z wyjątkiem tego, że szuka nowej linii po 192 Mb-1 bajcie.
Charles Menguy
źródło
Warto również wspomnieć, @PraveenSripati, że skrajne przypadki, w których granica znajdowałaby się na poziomie \ r podczas powrotu \ r \ n, są obsługiwane w LineReader.readLinefunkcji, nie sądzę, że jest to istotne dla twojego pytania, ale w razie potrzeby mogę dodać więcej szczegółów.
Charles Menguy
Załóżmy, że istnieją dwie linie z dokładnością do 64 MB na wejściu, więc InputSplits zachodzi dokładnie na granicach linii. Czy więc mapper zawsze zignoruje linię w drugim bloku, ponieważ start! = 0.
Praveen Sripati,
6
@PraveenSripati W takim przypadku drugi program odwzorowujący zobaczy początek! = 0, więc cofnij 1 znak, który przeniesie Cię z powrotem tuż przed \ n pierwszej linii, a następnie przeskoczy do następnego \ n. Więc pominie pierwszą linię, ale przetworzy drugą linię zgodnie z oczekiwaniami.
Charles Menguy
@CharlesMenguy czy to możliwe, że pierwsza linia pliku została w jakiś sposób pominięta? Konkretnie, mam pierwszą linię z kluczem = 1 i wartością a, a następnie gdzieś w pliku są jeszcze dwie linie z tym samym kluczem, klucz = 1, val = b i klucz = 1, val = c. Rzecz w tym, że mój reduktor pobiera {1, [b, c]} i {1, [a]} zamiast {1, [a, b, c]}. Nie dzieje się tak, jeśli dodam nową linię na początku mojego pliku. Jaki może być powód, sir?
Kobe-Wan Kenobi
@CharlesMenguy A co, jeśli plik na HDFS jest plikiem binarnym (w przeciwieństwie do pliku tekstowego, który \r\n, \nreprezentuje obcięcie rekordu)?
CᴴᴀZ
17

Algorytm Map Reduce nie działa na fizycznych blokach pliku. Działa na logicznych podziałach wejść. Podział danych wejściowych zależy od miejsca zapisu rekordu. Rekord może obejmować dwóch Mapperów.

Sposób, w jaki HDFS został skonfigurowany, dzieli bardzo duże pliki na duże bloki (na przykład mierzące 128 MB) i przechowuje trzy kopie tych bloków w różnych węzłach w klastrze.

HDFS nie ma świadomości zawartości tych plików. Rekord mógł zostać rozpoczęty w Bloku-a, ale koniec tego rekordu może znajdować się w Bloku-b .

Aby rozwiązać ten problem, Hadoop używa logicznej reprezentacji danych przechowywanych w blokach plików, zwanych podziałami danych wejściowych. Kiedy klient zadania MapReduce oblicza podziały wejściowe, ustala , gdzie zaczyna się pierwszy cały rekord w bloku i gdzie kończy się ostatni rekord w bloku .

Kluczowy punkt:

W przypadkach, gdy ostatni rekord w bloku jest niekompletny, podział danych wejściowych obejmuje informacje o lokalizacji dla następnego bloku i przesunięcie bajtów danych potrzebnych do uzupełnienia rekordu.

Spójrz na poniższy diagram.

wprowadź opis obrazu tutaj

Zapoznaj się z tym artykułem i powiązanym pytaniem SE: O dzieleniu plików Hadoop / HDFS

Więcej szczegółów można znaleźć w dokumentacji

Struktura Map-Reduce opiera się na InputFormat zadania w celu:

  1. Sprawdź poprawność specyfikacji wejściowej zadania.
  2. Podziel plik (i) wejściowe na logiczne podziały InputSplits, z których każdy jest następnie przypisywany do indywidualnego Mappera.
  3. Każdy InputSplit jest następnie przypisywany do indywidualnego Mappera do przetwarzania. Podział może być krotką . InputSplit[] getSplits(JobConf job,int numSplits) to API, które zajmuje się tymi rzeczami.

FileInputFormat , który rozszerza InputFormatzaimplementowaną getSplitsmetodę (). Zapoznaj się z elementami wewnętrznymi tej metody w grepcode

Ravindra babu
źródło
7

Widzę to następująco: InputFormat jest odpowiedzialny za podzielenie danych na logiczne podziały, biorąc pod uwagę charakter danych.
Nic nie stoi na przeszkodzie, aby to zrobić, chociaż może to znacznie zwiększyć opóźnienie zadania - cała logika i czytanie wokół pożądanych granic wielkości podziału będą miały miejsce w module śledzącym zadania.
Najprostszym formatem wejściowym obsługującym rekordy jest TextInputFormat. Działa w następujący sposób (o ile zrozumiałem z kodu) - format wejściowy tworzy podziały według rozmiaru, niezależnie od linii, ale LineRecordReader zawsze:
a) Pomiń pierwszą linię w podziale (lub jej część), jeśli nie jest pierwszy podział
b) Przeczytaj jedną linię na końcu za granicą podziału (jeśli dane są dostępne, więc nie jest to ostatni podział).

David Gruzman
źródło
Skip first line in the split (or part of it), if it is not the first split- jeśli pierwszy rekord w bloku innym niż pierwszy jest kompletny, nie wiadomo, jak ta logika zadziała.
Praveen Sripati
O ile widzę kod - każdy podział czyta co ma + następna linia. Więc jeśli koniec linii nie znajduje się na granicy bloku - jest w porządku. Jak dokładnie obsłużono przypadek, gdy koniec wiersza znajduje się dokładnie na granicy bloku - trzeba to zrozumieć - przeczytam kod trochę więcej
David Gruzman
3

Z tego, co zrozumiałem, po FileSplitzainicjowaniu pierwszego bloku wywoływany jest domyślny konstruktor. Dlatego początkowo wartości początku i długości wynoszą zero. Pod koniec przetwarzania pierwszego bloku, jeśli ostatnia linia jest niekompletna, to wartość długości będzie większa niż długość podziału i odczyta również pierwszą linię następnego bloku. W związku z tym wartość początku dla pierwszego bloku będzie większa od zera i pod tym warunkiem LineRecordReaderpominie pierwszą linię drugiego bloku. (Patrz źródło )

W przypadku, gdy ostatni wiersz pierwszego bloku jest kompletny, to wartość długości będzie równa długości pierwszego bloku, a wartość początku drugiego bloku będzie równa zero. W takim przypadku LineRecordReadernie pominie pierwszej linii i przeczyta drugi blok od początku.

Ma sens?

aa8y
źródło
2
W tym scenariuszu osoby odwzorowujące muszą komunikować się ze sobą i przetwarzać bloki po kolei, gdy ostatnia linia w danym bloku nie jest kompletna. Nie jestem pewien, czy tak to działa.
Praveen Sripati
1

Z kodu źródłowego hadoopa konstruktora LineRecordReader.java: znajduję kilka komentarzy:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

z tego, jak sądzę, hadoop przeczyta jedną dodatkową linię dla każdego podziału (na końcu bieżącego podziału, przeczyta następną linię w następnym podziale), a jeśli nie pierwszy podział, pierwsza linia zostanie odrzucona. aby żaden zapis linii nie został utracony ani niekompletny

Shenghai.Geng
źródło
0

Twórcy map nie muszą się komunikować. Bloki plików są w formacie HDFS i może bieżący program odwzorowujący (RecordReader) może odczytać blok zawierający pozostałą część wiersza. Dzieje się to za kulisami.

user3507308
źródło