Łączenie wielu zadań MapReduce w łańcuchy na platformie Hadoop

124

W wielu rzeczywistych sytuacjach, w których stosujesz MapReduce, ostateczne algorytmy kończą się na kilku krokach MapReduce.

tj. Map1, Reduce1, Map2, Reduce2 i tak dalej.

Mamy więc dane wyjściowe z ostatniej redukcji, które są potrzebne jako dane wejściowe dla następnej mapy.

Dane pośrednie to coś, czego (ogólnie) nie chcesz zachować po pomyślnym zakończeniu potoku. Ponieważ te dane pośrednie są na ogół pewną strukturą danych (jak „mapa” lub „zbiór”), ​​nie chcesz wkładać zbyt wiele wysiłku w pisanie i odczytywanie tych par klucz-wartość.

Jaki jest zalecany sposób robienia tego w Hadoop?

Czy istnieje (prosty) przykład, który pokazuje, jak prawidłowo obsługiwać te dane pośrednie, łącznie z późniejszym czyszczeniem?

Niels Basjes
źródło
2
przy użyciu którego frameworku mapreduce?
skaffman
1
Zredagowałem pytanie, aby wyjaśnić, że mówię o Hadoop.
Niels Basjes
Poleciłbym do tego klejnot świniopasów: github.com/Ganglion/swineherd best, Tobias
Tobias

Odpowiedzi:

57

Myślę, że ten samouczek w sieci programistów Yahoo pomoże ci w tym: Chaining Jobs

Używasz JobClient.runJob(). Ścieżka wyjściowa danych z pierwszego zadania staje się ścieżką wejściową do drugiego zadania. Muszą one być przekazane jako argumenty do zadań z odpowiednim kodem, aby je przeanalizować i ustawić parametry zadania.

Myślę, że powyższa metoda może być jednak sposobem, w jaki zrobił to teraz starszy mapowany interfejs API, ale nadal powinien działać. Podobna metoda będzie dostępna w nowym API mapreduce, ale nie jestem pewien, co to jest.

Jeśli chodzi o usuwanie danych pośrednich po zakończeniu zadania, możesz to zrobić w swoim kodzie. Sposób, w jaki zrobiłem to wcześniej, to użycie czegoś takiego:

FileSystem.delete(Path f, boolean recursive);

Gdzie ścieżka to lokalizacja danych w HDFS. Musisz się upewnić, że usuniesz te dane tylko wtedy, gdy żadne inne zadanie ich nie wymaga.

Binary Nerd
źródło
3
Dzięki za link do samouczka Yahoo. Chaining Jobs jest rzeczywiście tym, czego chcesz, jeśli obie są w tym samym biegu. Szukałem tego, co najłatwiej zrobić, jeśli chcesz mieć możliwość ich osobnego uruchamiania. We wspomnianym samouczku znalazłem SequenceFileOutputFormat "Zapisuje pliki binarne odpowiednie do wczytywania do kolejnych zadań MapReduce" oraz pasujący SequenceFileInputFormat, dzięki czemu wszystko jest bardzo łatwe. Dzięki.
Niels Basjes
20

Można to zrobić na wiele sposobów.

(1) Praca kaskadowa

Utwórz obiekt JobConf „job1” dla pierwszego zadania i ustaw wszystkie parametry z „input” jako katalogiem wejściowym i „temp” jako katalogiem wyjściowym. Wykonaj to zadanie:

JobClient.run(job1).

Bezpośrednio pod nim utwórz obiekt JobConf „job2” dla drugiego zadania i ustaw wszystkie parametry z „temp” jako katalogiem wejściowym i „output” jako katalogiem wyjściowym. Wykonaj to zadanie:

JobClient.run(job2).

(2) Utwórz dwa obiekty JobConf i ustaw w nich wszystkie parametry, tak jak (1), z wyjątkiem tego, że nie używasz JobClient.run.

Następnie utwórz dwa obiekty Job z parametrami jobconfs:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

Używając obiektu jobControl, określasz zależności zadań, a następnie uruchamiasz zadania:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Jeśli potrzebujesz struktury podobnej do Map + | Zmniejsz | Map *, możesz użyć klas ChainMapper i ChainReducer, które są dostarczane z usługą Hadoop w wersji 0.19 i nowszych.

user381928
źródło
7

Można to zrobić na wiele sposobów. Skupię się na dwóch.

Jedna to biblioteka adnotacji Riffle ( http://github.com/cwensel/riffle ) do identyfikowania elementów zależnych i „wykonywania” ich w zależności (topologicznej) kolejności.

Lub możesz użyć Cascade (i MapReduceFlow) w Cascading ( http://www.cascading.org/ ). Przyszła wersja będzie obsługiwać adnotacje Riffle, ale teraz działa świetnie z surowymi zadaniami MR JobConf.

Wariantem tego jest nie zarządzanie zadaniami MR ręcznie, ale tworzenie aplikacji przy użyciu Cascading API. Następnie JobConf i łączenie zadań są obsługiwane wewnętrznie za pośrednictwem klas Cascading planner i Flow.

W ten sposób spędzasz czas koncentrując się na swoim problemie, a nie na mechanice zarządzania zadaniami Hadoop itp. Możesz nawet nakładać na siebie różne języki (takie jak clojure lub jruby), aby jeszcze bardziej uprościć programowanie i aplikacje. http://www.cascading.org/modules.html

cwensel
źródło
6

Zrobiłem łańcuch pracy używając obiektów JobConf jeden po drugim. Wziąłem WordCount na przykład do łączenia ofert pracy. Jedno zadanie oblicza, ile razy słowo powtórzyło się w danym wyniku. Druga praca przyjmuje wynik pierwszej pracy jako dane wejściowe i oblicza całkowitą liczbę słów w danym wejściu. Poniżej znajduje się kod, który należy umieścić w klasie kierowcy.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

Polecenie do uruchomienia tych zadań to:

bin / hadoop jar TotalWords.

Musimy podać ostateczną nazwę zadania dla polecenia. W powyższym przypadku jest to TotalWords.

psrklr
źródło
5

Możesz uruchomić łańcuch MR w sposób podany w kodzie.

UWAGA : podano tylko kod kierowcy

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

KOLEJNOŚĆ TO MAPA

( JOB1 )-> REDUCE-> ( JOB2 ) MAPA Zrobiono
to, aby posortować klucze, ale jest więcej sposobów, takich jak korzystanie z mapy drzewa.
Chcę jednak skupić się na sposobie, w jaki Zlecenia zostały połączone! !
Dziękuję Ci

Aniruddha Sinha
źródło
3

Możemy skorzystać z waitForCompletion(true)metody Job do zdefiniowania zależności pomiędzy pracą.

W moim scenariuszu miałem 3 prace, które były od siebie zależne. W klasie sterownika użyłem poniższego kodu i działa zgodnie z oczekiwaniami.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
Shivaprasad
źródło
Twoja odpowiedź dotyczy tego, jak dołączyć do tych zadań pod względem wykonania. Pierwotne pytanie dotyczyło najlepszych struktur danych. Twoja odpowiedź nie jest więc odpowiednia dla tego konkretnego pytania.
Niels Basjes
2

Nowa klasa org.apache.hadoop.mapreduce.lib.chain.ChainMapper pomaga w tym scenariuszu

Xavi
źródło
1
Odpowiedź jest dobra - ale powinieneś dodać więcej szczegółów na temat tego, co robi lub przynajmniej link do referencji API, aby ludzie mogli głosować w górę
Jeremy Hajek
ChainMapper i ChainReducer są używane, aby mieć 1 lub więcej mapperów przed Reduce i 0 lub więcej mapperów po Reduce, spec. (Mapper +) Reduce (Mapper *). Popraw mnie, jeśli się mylę, ale nie sądzę, aby to podejście powodowało szeregowe łączenie zadań, o które prosił OP.
oczkoisse
1

Chociaż istnieją złożone mechanizmy przepływu pracy Hadoop oparte na serwerze, np. Oozie, mam prostą bibliotekę Java, która umożliwia wykonywanie wielu zadań Hadoop jako przepływu pracy. Konfiguracja zadania i przepływ pracy definiujące zależność między zadaniami są konfigurowane w pliku JSON. Wszystko jest konfigurowalne zewnętrznie i nie wymaga żadnych zmian w istniejącej mapie, aby implementacja była częścią przepływu pracy.

Szczegóły można znaleźć tutaj. Kod źródłowy i jar są dostępne na githubie.

http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

Pranab
źródło
1

Myślę, że Oozie pomaga kolejnym zadaniom otrzymywać dane wejściowe bezpośrednio z poprzedniej pracy. Pozwala to uniknąć operacji we / wy wykonywanej za pomocą funkcji Jobcontrol.

stholy
źródło
1

Jeśli chcesz programowo łączyć swoje zadania w łańcuchy, zechcesz skorzystać z JobControl. Użycie jest dość proste:

JobControl jobControl = new JobControl(name);

Następnie dodaj wystąpienia ControlledJob. ControlledJob definiuje zadanie z jego zależnościami, w ten sposób automatycznie podłączając wejścia i wyjścia, aby pasowały do ​​„łańcucha” zadań.

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

uruchamia łańcuch. Będziesz chciał umieścić to w wątku mówionym. Pozwala to sprawdzić stan Twojego łańcucha podczas jego działania:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }
Erik Schmiegelow
źródło
0

Jak wspomniałeś w swoim wymaganiu, że chcesz, aby o / p z MRJob1 było i / p z MRJob2 itd., Możesz rozważyć użycie oozie przepływu pracy w tym przypadku. Możesz również rozważyć zapisanie danych pośrednich do HDFS, ponieważ będą one używane przez następne MRJob. Po zakończeniu zadania możesz wyczyścić dane pośrednie.

<start to="mr-action1"/>
<action name="mr-action1">
   <!-- action for MRJob1-->
   <!-- set output path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="mr-action2">
   <!-- action for MRJob2-->
   <!-- set input path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="success">
        <!-- action for success-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="fail">
        <!-- action for fail-->
    <ok to="end"/>
    <error to="end"/>
</action>

<end name="end"/>

Neha Kumari
źródło