Skrócenie czasu przerwy w usuwaniu elementów bezużytecznych w programie Haskell

132

Opracowujemy program, który odbiera i przekazuje dalej „wiadomości”, zachowując tymczasową historię tych wiadomości, aby na żądanie mógł przekazać historię wiadomości. Wiadomości są identyfikowane numerycznie, zwykle mają rozmiar około 1 kilobajta i musimy przechowywać setki tysięcy takich wiadomości.

Chcemy zoptymalizować ten program pod kątem opóźnienia: czas między wysłaniem a odebraniem wiadomości musi być poniżej 10 milisekund.

Program został napisany w języku Haskell i skompilowany za pomocą GHC. Okazało się jednak, że przerwy w usuwaniu elementów bezużytecznych są zbyt długie, aby sprostać naszym wymaganiom dotyczącym opóźnienia: ponad 100 milisekund w naszym programie w świecie rzeczywistym.

Poniższy program jest uproszczoną wersją naszej aplikacji. Używa Data.Map.Strictdo przechowywania wiadomości. Wiadomości są ByteStringidentyfikowane przez Int. 1 000 000 wiadomości jest wstawianych w rosnącej kolejności numerycznej, a najstarsze wiadomości są stale usuwane, aby historia obejmowała maksymalnie 200 000 wiadomości.

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

Skompilowaliśmy i uruchomiliśmy ten program przy użyciu:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

Ważną metryką jest tutaj „maksymalna przerwa” wynosząca 0,0515 s lub 51 milisekund. Chcemy to zredukować przynajmniej o rząd wielkości.

Eksperymenty pokazują, że długość pauzy w GC jest określana przez liczbę wiadomości w historii. Zależność jest z grubsza liniowa lub być może superliniowa. Poniższa tabela przedstawia tę relację. ( Możesz zobaczyć nasze testy porównawcze tutaj , a niektóre wykresy tutaj ).

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

Eksperymentowaliśmy z kilkoma innymi zmiennymi, aby dowiedzieć się, czy mogą one zmniejszyć to opóźnienie, z których żadna nie robi dużej różnicy. Wśród tych nieistotnych zmiennych są: optymalizacja ( -O, -O2); Opcje RTS GC ( -G, -H, -A, -c), liczba rdzeni ( -N), różne struktury danych ( Data.Sequence), rozmiar wiadomości, a ilość generowanych śmieci krótkotrwały. Decydującym czynnikiem jest liczba wiadomości w historii.

Nasza teoria robocza mówi, że przerwy są liniowe w liczbie komunikatów, ponieważ każdy cykl GC musi przejść przez całą dostępną pamięć roboczą i skopiować ją, co jest wyraźnie operacjami liniowymi.

Pytania:

  • Czy ta teoria czasu liniowego jest poprawna? Czy długość przerw GC można wyrazić w ten prosty sposób, czy rzeczywistość jest bardziej złożona?
  • Jeśli pauza GC jest liniowa w pamięci roboczej, czy istnieje sposób na zredukowanie stałych czynników?
  • Czy są jakieś opcje przyrostowego GC lub coś podobnego? Możemy zobaczyć tylko artykuły naukowe. Jesteśmy bardzo chętni do wymiany przepustowości na mniejsze opóźnienia.
  • Czy istnieją sposoby na „partycjonowanie” pamięci na mniejsze cykle GC, inne niż dzielenie na wiele procesów?
jameshfisher
źródło
1
@Bakuriu: racja, ale 10 ms powinno być osiągalne w prawie każdym nowoczesnym systemie operacyjnym bez żadnych poprawek. Kiedy uruchamiam uproszczone programy w C, nawet na moim starym Raspberry pi, z łatwością osiągają opóźnienia w zakresie 5 ms lub przynajmniej niezawodnie około 15 ms.
leftaroundokoło
3
Czy jesteś pewien, że Twój przypadek testowy jest przydatny (jak COntrol.Concurrent.Channa przykład nie używasz ? Mutowalne obiekty zmieniają równanie)? Proponuję zacząć od upewnienia się, że wiesz, jakie śmieci generujesz i robisz ich jak najmniej (np. Upewnij się, że fuzja się dzieje, spróbuj -funbox-strict). Może spróbuj użyć biblioteki strumieniowej (iostreams, pipe, conduit, streaming) i dzwonić performGCbezpośrednio w częstszych odstępach czasu.
jberryman
6
Jeśli to, co próbujesz osiągnąć, można zrobić w stałej przestrzeni, zacznij od próby osiągnięcia tego (np. Może bufor pierścieniowy z MutableByteArray; GC w ogóle nie będzie w tym przypadku zaangażowany)
jberryman
1
Tym, którzy sugerują zmienne struktury i dbają o stworzenie minimalnej ilości śmieci, zauważ, że to zachowywany rozmiar, a nie ilość zebranych śmieci, wydaje się dyktować czas pauzy. Wymuszanie częstszego zbierania danych skutkuje większą liczbą przerw o mniej więcej tej samej długości. Edycja: Zmienne struktury poza stertą mogą być interesujące, ale w wielu przypadkach praca z nimi nie jest tak przyjemna!
mike
6
Opis ten z pewnością sugeruje, że czas GC będzie liniowy względem wielkości pryzmy dla wszystkich pokoleń, a ważnymi czynnikami są rozmiar zachowanych obiektów (do kopiowania) i liczba istniejących do nich wskaźników (do oczyszczania): ghc.haskell. org / trac / ghc / wiki / Commentary / Rts / Storage / GC / Copying
mike

Odpowiedzi:

97

W rzeczywistości robisz całkiem nieźle, mając czas przerwy 51 ms z ponad 200 MB danych na żywo. System, nad którym pracuję, ma dłuższy maksymalny czas pauzy z połową ilości danych na żywo.

Twoje założenie jest poprawne, główny czas przerwy w GC jest wprost proporcjonalny do ilości danych w czasie rzeczywistym i niestety nie ma sposobu na obejście tego z GHC w obecnym stanie. W przeszłości eksperymentowaliśmy z przyrostowym GC, ale był to projekt badawczy i nie osiągnęliśmy poziomu dojrzałości potrzebnego do złożenia go w uwolnionym GHC.

Mamy nadzieję, że pomoże w tym w przyszłości w kompaktowych regionach: https://phabricator.haskell.org/D1264 . Jest to rodzaj ręcznego zarządzania pamięcią, w którym kompaktujesz strukturę w stercie, a GC nie musi przez nią przechodzić. Działa najlepiej w przypadku danych o długiej żywotności, ale być może będzie wystarczająco dobry, aby używać go do pojedynczych wiadomości w Twoim ustawieniu. Chcemy mieć to w GHC 8.2.0.

Jeśli jesteś w środowisku rozproszonym i masz jakiś system równoważenia obciążenia, możesz skorzystać ze sztuczek, aby uniknąć uderzenia pauzy, w zasadzie upewnij się, że moduł równoważenia obciążenia nie wysyła żądań do maszyn, które mają zamiar wykonaj główny GC i oczywiście upewnij się, że maszyna nadal kończy GC, nawet jeśli nie otrzymuje żądań.

Simon Marlow
źródło
13
Cześć Simon, bardzo dziękuję za szczegółową odpowiedź! To zła wiadomość, ale dobrze jest mieć zamknięcie. Obecnie dążymy do mutowalnej implementacji będącej jedyną odpowiednią alternatywą. Kilka rzeczy, których nie rozumiemy: (1) Jakie są sztuczki związane ze schematem równoważenia obciążenia - czy obejmują one ręczne performGC? (2) Dlaczego zagęszczanie z użyciem -cdziała gorzej - przypuszczamy, że nie znajduje wielu rzeczy, które może pozostawić na miejscu? (3) Czy jest więcej szczegółów na temat kompaktów? Brzmi bardzo interesująco, ale niestety to trochę za daleko w przyszłości, abyśmy mogli to rozważyć.
jameshfisher
2
@mljrg Możesz być zainteresowany well-typed.com/blog/2019/10/nonmoving-gc-merge
Alfredo Di Napoli
9

Wypróbowałem Twój fragment kodu z podejściem buforowania pierścienia, używając IOVectorjako podstawowej struktury danych. W moim systemie (GHC 7.10.3, te same opcje kompilacji) spowodowało to skrócenie maksymalnego czasu (metryki, którą wspomniałeś w swoim OP) o ~ 22%.

NB. Zrobiłem tutaj dwa założenia:

  1. Zmienna struktura danych pasuje do problemu (wydaje mi się, że przekazywanie wiadomości i tak implikuje IO)
  2. Twoja wiadomośćId są ciągłe

Z dodatkowymi Intparametrami i arytmetyką (na przykład gdy messageId są resetowane do 0 lub minBound) powinno być łatwo określić, czy dana wiadomość jest nadal w historii i pobrać ją z odpowiedniego indeksu w buforze pierścieniowym.

Dla przyjemności testowania:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix == chanSize then 0 else ix + 1
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
mgmeier
źródło
2
Cześć! Niezła odpowiedź. Podejrzewam, że powodem, dla którego przyspiesza to tylko o 22%, jest to, że GC nadal musi przechodzić przez wartości IOVectori (niezmienne, GC'd) w każdym indeksie. Obecnie badamy opcje ponownego wdrożenia przy użyciu zmiennych struktur. Prawdopodobnie będzie podobny do twojego systemu bufora pierścieniowego. Ale przenosimy to całkowicie poza przestrzeń pamięci Haskell, aby wykonać własne ręczne zarządzanie pamięcią.
jameshfisher
11
@jamesfisher: Właściwie stanąłem przed podobnym problemem, ale postanowiłem zachować zarządzanie memami po stronie Haskella. Rozwiązaniem był rzeczywiście bufor pierścieniowy, który zachowuje bajtową kopię oryginalnych danych w pojedynczym, ciągłym bloku pamięci, dając w ten sposób pojedynczą wartość Haskella. Spójrz na to w tym streszczeniu RingBuffer.hs . Przetestowałem to na twoim przykładowym kodzie i przyspieszyłem o około 90% krytycznej metryki. Możesz użyć kodu w dogodnym dla siebie czasie.
mgmeier
8

Muszę się zgodzić z innymi - jeśli masz trudne ograniczenia w czasie rzeczywistym, używanie języka GC nie jest idealne.

Możesz jednak rozważyć eksperymentowanie z innymi dostępnymi strukturami danych zamiast tylko Data.Map.

Przepisałem go za pomocą Data.Sequence i otrzymałem kilka obiecujących ulepszeń:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

Mimo że optymalizujesz pod kątem opóźnienia, zauważyłem, że poprawiły się również inne wskaźniki. W przypadku 200000 czas wykonania spada z 1,5 s do 0,2 s, a całkowite użycie pamięci spada z 600 MB do 27 MB.

Powinienem zauważyć, że oszukałem, poprawiając projekt:

  • Usunąłem Intz Msg, więc nie jest w dwóch miejscach.
  • Zamiast używać mapy od Ints do ByteStrings, użyłem a Sequencez ByteStrings i Intmyślę, że zamiast jednej na wiadomość, można to zrobić z jedną Intdla całości Sequence. Zakładając, że wiadomości nie mogą zostać ponownie uporządkowane, możesz użyć pojedynczego przesunięcia, aby przetłumaczyć wiadomość, którą chcesz, na miejsce, w którym znajduje się w kolejce.

(Dodałem dodatkową funkcję, getMsgaby to zademonstrować.)

{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
John H.
źródło
4
Cześć! Dzięki za odpowiedź. Twoje wyniki zdecydowanie nadal pokazują liniowe spowolnienie, ale to całkiem interesujące, że masz takie przyspieszenie Data.Sequence- przetestowaliśmy to i stwierdziliśmy, że jest gorsze niż Data. Nie jestem pewien, jaka była różnica, więc będę musiał zbadać ...
jameshfisher
8

Jak wspomniano w innych odpowiedziach, moduł odśmiecania pamięci w GHC przetwarza dane na żywo, co oznacza, że ​​im dłużej przechowywane są dane w pamięci, tym dłuższe będą przerwy GC.

GHC 8.2

Aby częściowo rozwiązać ten problem, w GHC-8.2 wprowadzono funkcję zwaną regionami zwartymi . Jest to zarówno funkcja systemu uruchomieniowego GHC, jak i biblioteka udostępniająca wygodny interfejs do pracy. Kompaktowa funkcja regionów umożliwia umieszczanie danych w oddzielnym miejscu w pamięci, a GC nie będzie ich przechodzić podczas fazy zbierania elementów bezużytecznych. Więc jeśli masz dużą strukturę, którą chcesz zachować w pamięci, rozważ użycie zwartych regionów. Jednak sam zwarty region nie ma wewnątrz mini garbage collectora , działa lepiej w przypadku struktur danych tylko do dołączania , a nie w HashMapprzypadku, gdy chcesz również usunąć rzeczy. Chociaż możesz pokonać ten problem. Szczegółowe informacje można znaleźć w następującym poście na blogu:

GHC 8.10

Co więcej, od GHC-8.10 zaimplementowano nowy algorytm przyrostowego garbage collector o małym opóźnieniu . Jest to alternatywny algorytm GC, który nie jest domyślnie włączony, ale możesz się do niego włączyć, jeśli chcesz. Możesz więc zmienić domyślny GC na nowszy, aby automatycznie uzyskać funkcje dostarczane przez regiony kompaktowe bez konieczności ręcznego owijania i rozpakowywania. Jednak nowy GC nie jest srebrną kulą i nie rozwiązuje wszystkich problemów w sposób automagiczny i ma swoje kompromisy. Testy porównawcze nowego GC znajdują się w następującym repozytorium GitHub:

Shersh
źródło
3

Cóż, w GC znalazłeś ograniczenie języków: nie nadają się do hardkorowych systemów czasu rzeczywistego.

Masz 2 opcje:

1. Zwiększ rozmiar sterty i użyj dwupoziomowego systemu buforowania, najstarsze wiadomości są wysyłane na dysk, a najnowsze wiadomości przechowujesz w pamięci, możesz to zrobić za pomocą stronicowania systemu operacyjnego. Problem z tym rozwiązaniem polega jednak na tym, że stronicowanie może być kosztowne w zależności od możliwości odczytu używanej dodatkowej jednostki pamięci.

2. Zaprogramuj to rozwiązanie za pomocą 'C' i połącz je z FFI w haskell. W ten sposób możesz samodzielnie zarządzać pamięcią. To byłaby najlepsza opcja, ponieważ możesz samodzielnie kontrolować potrzebną pamięć.

Fernando Andreas Sahmkow Beico
źródło
1
Cześć Fernando. Dzięki za to. Nasz system działa tylko w „miękkim” czasie rzeczywistym, ale w naszym przypadku okazało się, że GC jest zbyt karzący, nawet w przypadku miękkiego czasu rzeczywistego. Zdecydowanie skłaniamy się ku rozwiązaniu nr 2.
jameshfisher