Rozłożenie standardowego na równoległe procesy

13

Mam zadanie, które przetwarza listę plików na stdin. Czas uruchamiania programu jest znaczny, a ilość czasu, jaką zajmuje każdy plik, jest bardzo różna. Chcę odrodzić znaczną liczbę tych procesów, a następnie wysłać pracę do tych, które nie są zajęte. Istnieje kilka różnych narzędzi wiersza polecenia, które prawie robią to, co chcę, zawęziłem to do dwóch prawie działających opcji:

find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob

Problem polega na tym, że splitrobi czysty round-robin, więc jeden z procesów pozostaje w tyle i pozostaje w tyle, opóźniając zakończenie całej operacji; podczas gdy parallelchce odrodzić jeden proces na N linii lub bajtów danych wejściowych, a ja spędzam zbyt dużo czasu na narzutach związanych z uruchamianiem.

Czy istnieje coś takiego, który ponownie wykorzysta procesy i linie zasilające do tych procesów, które mają odblokowane standardowe wejścia?

BCoates
źródło
Skąd to splitpolecenie? Nazwa jest w konflikcie ze standardowym narzędziem do przetwarzania tekstu.
Gilles „SO- przestań być zły”
@Gilles, to GNU jeden: „split (GNU coreutils) 8.13” . Używanie go jako dziwnej alternatywy dla xargs prawdopodobnie nie jest zamierzonym zastosowaniem, ale jest najbliższe temu, co chciałem, znalazłem.
BCoates
2
Zastanawiałem się nad tym, a podstawowym problemem jest świadomość, że instancja myjobjest gotowa na otrzymanie większej ilości informacji. Nie ma sposobu, aby wiedzieć, że program jest gotowy do przetworzenia większej ilości danych wejściowych, wszystko, co możesz wiedzieć, to to, że gdzieś bufor (bufor potoku, bufor stdio) jest gotowy na przyjęcie większej ilości danych wejściowych. Czy możesz zorganizować wysyłanie przez program jakiegoś żądania (np. Wyświetlenie monitu), gdy będzie ono gotowe?
Gilles 'SO - przestań być zły'
Zakładając, że program nie korzysta z bufering na stdin, system plików FUSE, który reaguje na readwywołania, załatwi sprawę . To dość duże przedsięwzięcie programistyczne.
Gilles 'SO - przestań być zły'
dlaczego używasz -l 1w parallelargs? IIRC, który mówi równolegle, aby przetwarzał jeden wiersz danych wejściowych na zadanie (tj. Jedną nazwę pliku na rozwidlenie mojej pracy, więc dużo początkowego obciążenia).
cas

Odpowiedzi:

1

To nie wydaje się możliwe w tak ogólnym przypadku. Oznacza to, że masz bufor dla każdego procesu i możesz oglądać bufory z zewnątrz, aby zdecydować, gdzie umieścić następny wpis (planowanie) ... Oczywiście możesz coś napisać (lub użyć systemu wsadowego takiego jak slurm)

Ale w zależności od tego, jaki jest proces, możesz być w stanie wstępnie przetworzyć dane wejściowe. Na przykład, jeśli chcesz pobrać pliki, zaktualizować wpisy z DB lub podobnego, ale 50% z nich zostanie pominiętych (i dlatego masz dużą różnicę przetwarzania w zależności od danych wejściowych), po prostu skonfiguruj procesor wstępny weryfikuje, które wpisy potrwają długo (plik istnieje, dane zostały zmienione itp.), więc gwarantuje, że wszystko, co pochodzi z drugiej strony, zajmie dość dużo czasu. Nawet jeśli heurystyka nie jest doskonała, możesz skończyć ze znaczną poprawą. Możesz zrzucić pozostałe do pliku i przetworzyć później w ten sam sposób.

Ale to zależy od przypadku użycia.

estani
źródło
1

Nie, nie ma ogólnego rozwiązania. Twój dyspozytor musi wiedzieć, kiedy każdy program jest gotowy do odczytania innej linii, i nie ma standardu, o którym wiem, który na to pozwala. Wszystko, co możesz zrobić, to umieścić linię na STDOUT i poczekać, aż coś zużyje; producent nie może w dobry sposób stwierdzić, czy następny konsument jest gotowy, czy nie.

dannysauer
źródło
0

Nie wydaje mi się W moim ulubionym magazynie był kiedyś artykuł na temat programowania bash, który zrobił to, co chciałeś. Jestem skłonny uwierzyć, że gdyby istniały narzędzia do tego, wspomniałyby o nich. Więc chcesz coś w stylu:

set -m # enable job control
max_processes=8
concurrent_processes=0

child_has_ended() { concurrent_processes=$((concurrent_processes - 1)) }

trap child_has_ended SIGCHLD # that's magic calling our bash function when a child processes ends

for i in $(find . -type f)
do
  # don't do anything while there are max_processes running
  while [ ${concurrent_processes} -ge ${max_processes}]; do sleep 0.5; done 
  # increase the counter
  concurrent_processes=$((concurrent_processes + 1))
  # start a child process to actually deal with one file
  /path/to/script/to/handle/one/file $i &
done

Oczywiście możesz zmienić wywołanie na faktycznie działający skrypt według własnych upodobań. Magazyn, o którym wspominałem, początkowo zajmuje się zakładaniem rur i rozpoczynaniem wątków roboczych. Sprawdź mkfifoto, ale ta trasa jest o wiele bardziej skomplikowana, ponieważ procesy robocze muszą sygnalizować procesowi nadrzędnemu, że są gotowi na przyjęcie większej ilości danych. Potrzebujesz więc jednej piątki na każdy proces roboczy, aby wysłać dane, i jednej piątki na proces główny, aby odbierać rzeczy od robotników.

OŚWIADCZENIE Napisałem ten skrypt z góry głowy. Może mieć pewne problemy ze składnią.

Bananguin
źródło
1
To nie wydaje się spełniać wymagań: uruchamiasz inną instancję programu dla każdego elementu.
Gilles „SO- przestań być zły”
Zazwyczaj zaleca się stosowanie find . -type f | while read izamiast for i in $(find . -type f).
0

W przypadku GNU Parallel rozmiar bloku można ustawić za pomocą opcji --block. Wymaga to jednak wystarczającej ilości pamięci, aby utrzymać 1 blok w pamięci dla każdego z uruchomionych procesów.

Rozumiem, że nie jest to dokładnie to, czego szukasz, ale na razie może to być akceptowalne obejście.

Jeśli Twoje zadania trwają średnio w tym samym czasie, możesz użyć mbuffera:

find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
Ole Tange
źródło
0

Spróbuj tego:

mkfifo dla każdego procesu.

Następnie trzymaj tail -f | myjobsię każdego piątki.

Na przykład konfigurowanie pracowników (procesy Myjob)

mkdir /tmp/jobs
for X in 1 2 3 4
do
   mkfifo pipe$X
   tail -f pipe$X | myjob &
   jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done

W zależności od aplikacji (myjob) możesz używać zadań -s do znalezienia zatrzymanych zadań. W przeciwnym razie wyświetl listę procesów posortowanych według procesora i wybierz ten, który zużywa najmniej zasobów. Of sam raport zadania, np. Ustawiając flagę w systemie plików, gdy chce więcej pracy.

Zakładając, że zadanie zostanie zatrzymane podczas oczekiwania na dane wejściowe, użyj

jobs -sl na przykład znaleźć numer zatrzymanego zadania i przypisać mu pracę

grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
   cat workset > $PIPE
done

Testowałem to z

garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes

Muszę przyznać, że właśnie wymyśliłem so ymmv.

Johan
źródło
0

Naprawdę potrzebny jest do tego jakiś mechanizm kolejki.

Czy możliwe jest, aby zadania odczytywały dane wejściowe z kolejki, takie jak kolejka komunikatów SYSV, a następnie programy działały równolegle po prostu wypychając wartości do kolejki?

Inną możliwością jest użycie katalogów dla kolejki, takich jak to:

  1. wynik wyszukiwania tworzy dowiązanie symboliczne do każdego pliku do przetworzenia w katalogu, pending
  2. każdy proces zadania wykonuje mvpierwszy plik, który widzi w katalogu do katalogu rodzeństwa o pendingnazwie inprogress.
  3. jeśli zadanie pomyślnie przeniesie plik, wykonuje przetwarzanie; w przeciwnym razie wraca do znalezienia i przeniesienia innej nazwy plikupending
popiół
źródło
0

objaśniając odpowiedź @ ash, możesz użyć kolejki komunikatów SYSV do rozpowszechnienia pracy. Jeśli nie chcesz pisać własnego programu w C, istnieje narzędzie o nazwie, ipcmdktóre może pomóc. Oto, co zestawiłem, aby przekazać dane wyjściowe find $DIRECTORY -type fdo $PARALLELliczby procesów:

set -o errexit
set -o nounset

export IPCMD_MSQID=$(ipcmd msgget)

DIRECTORY=$1
PARALLEL=$2

# clean up message queue on exit
trap 'ipcrm -q $IPCMD_MSQID' EXIT

for i in $(seq $PARALLEL); do
   {
      while true
      do
          message=$(ipcmd msgrcv) || exit
          [ -f $message ] || break
          sleep $((RANDOM/3000))
      done
   } &
done

find "$DIRECTORY" -type f | xargs ipcmd msgsnd

for i in $(seq $PARALLEL); do
   ipcmd msgsnd "/dev/null/bar"
done
wait

Oto przebieg próbny:

$ for i in $(seq 20 10 100) ; do time parallel.sh /usr/lib/ $i ; done
parallel.sh /usr/lib/ $i  0.30s user 0.67s system 0% cpu 1:57.23 total
parallel.sh /usr/lib/ $i  0.28s user 0.69s system 1% cpu 1:09.58 total
parallel.sh /usr/lib/ $i  0.19s user 0.80s system 1% cpu 1:05.29 total
parallel.sh /usr/lib/ $i  0.29s user 0.73s system 2% cpu 44.417 total
parallel.sh /usr/lib/ $i  0.25s user 0.80s system 2% cpu 37.353 total
parallel.sh /usr/lib/ $i  0.21s user 0.85s system 3% cpu 32.354 total
parallel.sh /usr/lib/ $i  0.30s user 0.82s system 3% cpu 28.542 total
parallel.sh /usr/lib/ $i  0.27s user 0.88s system 3% cpu 30.219 total
parallel.sh /usr/lib/ $i  0.34s user 0.84s system 4% cpu 26.535 total
kouk
źródło
0

O ile nie można oszacować, jak długo dany plik wejściowy będzie przetwarzany, a procesy robocze nie będą miały możliwości zgłoszenia się do harmonogramu (tak jak ma to miejsce w zwykłych scenariuszach obliczeń równoległych - często przez MPI ), generalnie nie ma szczęścia - albo zapłacić karę niektórym pracownikom przetwarzającym dane wejściowe dłużej niż inni (z powodu nierówności danych wejściowych), albo zapłacić karę odrodzenia jednego nowego procesu dla każdego pliku wejściowego.

Peter
źródło
0

GNU Parallel zmieniło się w ciągu ostatnich 7 lat. Więc dzisiaj może to zrobić:

Ten przykład pokazuje, że procesom 11 i 10 podano więcej bloków niż procesom 4 i 5, ponieważ 4 i 5 czytają wolniej:

seq 1000000 |
  parallel -j8 --tag --roundrobin --pipe --block 1k 'pv -qL {}0000 | wc' ::: 11 4 5 6 9 8 7 10
Ole Tange
źródło