Opracowujemy program, który odbiera i przekazuje dalej „wiadomości”, zachowując tymczasową historię tych wiadomości, aby na żądanie mógł przekazać historię wiadomości. Wiadomości są identyfikowane numerycznie, zwykle mają rozmiar około 1 kilobajta i musimy przechowywać setki tysięcy takich wiadomości.
Chcemy zoptymalizować ten program pod kątem opóźnienia: czas między wysłaniem a odebraniem wiadomości musi być poniżej 10 milisekund.
Program został napisany w języku Haskell i skompilowany za pomocą GHC. Okazało się jednak, że przerwy w usuwaniu elementów bezużytecznych są zbyt długie, aby sprostać naszym wymaganiom dotyczącym opóźnienia: ponad 100 milisekund w naszym programie w świecie rzeczywistym.
Poniższy program jest uproszczoną wersją naszej aplikacji. Używa Data.Map.Strict
do przechowywania wiadomości. Wiadomości są ByteString
identyfikowane przez Int
. 1 000 000 wiadomości jest wstawianych w rosnącej kolejności numerycznej, a najstarsze wiadomości są stale usuwane, aby historia obejmowała maksymalnie 200 000 wiadomości.
module Main (main) where
import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map
data Msg = Msg !Int !ByteString.ByteString
type Chan = Map.Map Int ByteString.ByteString
message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))
pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if 200000 < Map.size inserted
then Map.deleteMin inserted
else inserted
main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
Skompilowaliśmy i uruchomiliśmy ten program przy użyciu:
$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 6558 colls, 0 par 0.238s 0.280s 0.0000s 0.0012s
Gen 1 14 colls, 0 par 0.179s 0.250s 0.0179s 0.0515s
INIT time 0.000s ( 0.000s elapsed)
MUT time 0.652s ( 0.745s elapsed)
GC time 0.417s ( 0.530s elapsed)
EXIT time 0.010s ( 0.052s elapsed)
Total time 1.079s ( 1.326s elapsed)
%GC time 38.6% (40.0% elapsed)
Alloc rate 4,780,213,353 bytes per MUT second
Productivity 61.4% of total user, 49.9% of total elapsed
Ważną metryką jest tutaj „maksymalna przerwa” wynosząca 0,0515 s lub 51 milisekund. Chcemy to zredukować przynajmniej o rząd wielkości.
Eksperymenty pokazują, że długość pauzy w GC jest określana przez liczbę wiadomości w historii. Zależność jest z grubsza liniowa lub być może superliniowa. Poniższa tabela przedstawia tę relację. ( Możesz zobaczyć nasze testy porównawcze tutaj , a niektóre wykresy tutaj ).
msgs history length max GC pause (ms)
=================== =================
12500 3
25000 6
50000 13
100000 30
200000 56
400000 104
800000 199
1600000 487
3200000 1957
6400000 5378
Eksperymentowaliśmy z kilkoma innymi zmiennymi, aby dowiedzieć się, czy mogą one zmniejszyć to opóźnienie, z których żadna nie robi dużej różnicy. Wśród tych nieistotnych zmiennych są: optymalizacja ( -O
, -O2
); Opcje RTS GC ( -G
, -H
, -A
, -c
), liczba rdzeni ( -N
), różne struktury danych ( Data.Sequence
), rozmiar wiadomości, a ilość generowanych śmieci krótkotrwały. Decydującym czynnikiem jest liczba wiadomości w historii.
Nasza teoria robocza mówi, że przerwy są liniowe w liczbie komunikatów, ponieważ każdy cykl GC musi przejść przez całą dostępną pamięć roboczą i skopiować ją, co jest wyraźnie operacjami liniowymi.
Pytania:
- Czy ta teoria czasu liniowego jest poprawna? Czy długość przerw GC można wyrazić w ten prosty sposób, czy rzeczywistość jest bardziej złożona?
- Jeśli pauza GC jest liniowa w pamięci roboczej, czy istnieje sposób na zredukowanie stałych czynników?
- Czy są jakieś opcje przyrostowego GC lub coś podobnego? Możemy zobaczyć tylko artykuły naukowe. Jesteśmy bardzo chętni do wymiany przepustowości na mniejsze opóźnienia.
- Czy istnieją sposoby na „partycjonowanie” pamięci na mniejsze cykle GC, inne niż dzielenie na wiele procesów?
źródło
COntrol.Concurrent.Chan
na przykład nie używasz ? Mutowalne obiekty zmieniają równanie)? Proponuję zacząć od upewnienia się, że wiesz, jakie śmieci generujesz i robisz ich jak najmniej (np. Upewnij się, że fuzja się dzieje, spróbuj-funbox-strict
). Może spróbuj użyć biblioteki strumieniowej (iostreams, pipe, conduit, streaming) i dzwonićperformGC
bezpośrednio w częstszych odstępach czasu.MutableByteArray
; GC w ogóle nie będzie w tym przypadku zaangażowany)Odpowiedzi:
W rzeczywistości robisz całkiem nieźle, mając czas przerwy 51 ms z ponad 200 MB danych na żywo. System, nad którym pracuję, ma dłuższy maksymalny czas pauzy z połową ilości danych na żywo.
Twoje założenie jest poprawne, główny czas przerwy w GC jest wprost proporcjonalny do ilości danych w czasie rzeczywistym i niestety nie ma sposobu na obejście tego z GHC w obecnym stanie. W przeszłości eksperymentowaliśmy z przyrostowym GC, ale był to projekt badawczy i nie osiągnęliśmy poziomu dojrzałości potrzebnego do złożenia go w uwolnionym GHC.
Mamy nadzieję, że pomoże w tym w przyszłości w kompaktowych regionach: https://phabricator.haskell.org/D1264 . Jest to rodzaj ręcznego zarządzania pamięcią, w którym kompaktujesz strukturę w stercie, a GC nie musi przez nią przechodzić. Działa najlepiej w przypadku danych o długiej żywotności, ale być może będzie wystarczająco dobry, aby używać go do pojedynczych wiadomości w Twoim ustawieniu. Chcemy mieć to w GHC 8.2.0.
Jeśli jesteś w środowisku rozproszonym i masz jakiś system równoważenia obciążenia, możesz skorzystać ze sztuczek, aby uniknąć uderzenia pauzy, w zasadzie upewnij się, że moduł równoważenia obciążenia nie wysyła żądań do maszyn, które mają zamiar wykonaj główny GC i oczywiście upewnij się, że maszyna nadal kończy GC, nawet jeśli nie otrzymuje żądań.
źródło
performGC
? (2) Dlaczego zagęszczanie z użyciem-c
działa gorzej - przypuszczamy, że nie znajduje wielu rzeczy, które może pozostawić na miejscu? (3) Czy jest więcej szczegółów na temat kompaktów? Brzmi bardzo interesująco, ale niestety to trochę za daleko w przyszłości, abyśmy mogli to rozważyć.Wypróbowałem Twój fragment kodu z podejściem buforowania pierścienia, używając
IOVector
jako podstawowej struktury danych. W moim systemie (GHC 7.10.3, te same opcje kompilacji) spowodowało to skrócenie maksymalnego czasu (metryki, którą wspomniałeś w swoim OP) o ~ 22%.NB. Zrobiłem tutaj dwa założenia:
Z dodatkowymi
Int
parametrami i arytmetyką (na przykład gdy messageId są resetowane do 0 lubminBound
) powinno być łatwo określić, czy dana wiadomość jest nadal w historii i pobrać ją z odpowiedniego indeksu w buforze pierścieniowym.Dla przyjemności testowania:
import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteString as ByteString import qualified Data.Map.Strict as Map import qualified Data.Vector.Mutable as Vector data Msg = Msg !Int !ByteString.ByteString type Chan = Map.Map Int ByteString.ByteString data Chan2 = Chan2 { next :: !Int , maxId :: !Int , ringBuffer :: !(Vector.IOVector ByteString.ByteString) } chanSize :: Int chanSize = 200000 message :: Int -> Msg message n = Msg n (ByteString.replicate 1024 (fromIntegral n)) newChan2 :: IO Chan2 newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize pushMsg2 :: Chan2 -> Msg -> IO Chan2 pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) = let ix' = if ix == chanSize then 0 else ix + 1 in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store) pushMsg :: Chan -> Msg -> IO Chan pushMsg chan (Msg msgId msgContent) = Exception.evaluate $ let inserted = Map.insert msgId msgContent chan in if chanSize < Map.size inserted then Map.deleteMin inserted else inserted main, main1, main2 :: IO () main = main2 main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000]) main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
źródło
IOVector
i (niezmienne, GC'd) w każdym indeksie. Obecnie badamy opcje ponownego wdrożenia przy użyciu zmiennych struktur. Prawdopodobnie będzie podobny do twojego systemu bufora pierścieniowego. Ale przenosimy to całkowicie poza przestrzeń pamięci Haskell, aby wykonać własne ręczne zarządzanie pamięcią.Muszę się zgodzić z innymi - jeśli masz trudne ograniczenia w czasie rzeczywistym, używanie języka GC nie jest idealne.
Możesz jednak rozważyć eksperymentowanie z innymi dostępnymi strukturami danych zamiast tylko Data.Map.
Przepisałem go za pomocą Data.Sequence i otrzymałem kilka obiecujących ulepszeń:
msgs history length max GC pause (ms) =================== ================= 12500 0.7 25000 1.4 50000 2.8 100000 5.4 200000 10.9 400000 21.8 800000 46 1600000 87 3200000 175 6400000 350
Mimo że optymalizujesz pod kątem opóźnienia, zauważyłem, że poprawiły się również inne wskaźniki. W przypadku 200000 czas wykonania spada z 1,5 s do 0,2 s, a całkowite użycie pamięci spada z 600 MB do 27 MB.
Powinienem zauważyć, że oszukałem, poprawiając projekt:
Int
zMsg
, więc nie jest w dwóch miejscach.Int
s doByteString
s, użyłem aSequence
zByteString
s iInt
myślę, że zamiast jednej na wiadomość, można to zrobić z jednąInt
dla całościSequence
. Zakładając, że wiadomości nie mogą zostać ponownie uporządkowane, możesz użyć pojedynczego przesunięcia, aby przetłumaczyć wiadomość, którą chcesz, na miejsce, w którym znajduje się w kolejce.(Dodałem dodatkową funkcję,
getMsg
aby to zademonstrować.){-# LANGUAGE BangPatterns #-} import qualified Control.Exception as Exception import qualified Control.Monad as Monad import qualified Data.ByteString as ByteString import Data.Sequence as S newtype Msg = Msg ByteString.ByteString data Chan = Chan Int (Seq ByteString.ByteString) message :: Int -> Msg message n = Msg (ByteString.replicate 1024 (fromIntegral n)) maxSize :: Int maxSize = 200000 pushMsg :: Chan -> Msg -> IO Chan pushMsg (Chan !offset sq) (Msg msgContent) = Exception.evaluate $ let newSize = 1 + S.length sq newSq = sq |> msgContent in if newSize <= maxSize then Chan offset newSq else case S.viewl newSq of (_ :< newSq') -> Chan (offset+1) newSq' S.EmptyL -> error "Can't happen" getMsg :: Chan -> Int -> Maybe Msg getMsg (Chan offset sq) i_ = getMsg' (i_ - offset) where getMsg' i | i < 0 = Nothing | i >= S.length sq = Nothing | otherwise = Just (Msg (S.index sq i)) main :: IO () main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
źródło
Data.Sequence
- przetestowaliśmy to i stwierdziliśmy, że jest gorsze niż Data. Nie jestem pewien, jaka była różnica, więc będę musiał zbadać ...Jak wspomniano w innych odpowiedziach, moduł odśmiecania pamięci w GHC przetwarza dane na żywo, co oznacza, że im dłużej przechowywane są dane w pamięci, tym dłuższe będą przerwy GC.
GHC 8.2
Aby częściowo rozwiązać ten problem, w GHC-8.2 wprowadzono funkcję zwaną regionami zwartymi . Jest to zarówno funkcja systemu uruchomieniowego GHC, jak i biblioteka udostępniająca wygodny interfejs do pracy. Kompaktowa funkcja regionów umożliwia umieszczanie danych w oddzielnym miejscu w pamięci, a GC nie będzie ich przechodzić podczas fazy zbierania elementów bezużytecznych. Więc jeśli masz dużą strukturę, którą chcesz zachować w pamięci, rozważ użycie zwartych regionów. Jednak sam zwarty region nie ma wewnątrz mini garbage collectora , działa lepiej w przypadku struktur danych tylko do dołączania , a nie w
HashMap
przypadku, gdy chcesz również usunąć rzeczy. Chociaż możesz pokonać ten problem. Szczegółowe informacje można znaleźć w następującym poście na blogu:GHC 8.10
Co więcej, od GHC-8.10 zaimplementowano nowy algorytm przyrostowego garbage collector o małym opóźnieniu . Jest to alternatywny algorytm GC, który nie jest domyślnie włączony, ale możesz się do niego włączyć, jeśli chcesz. Możesz więc zmienić domyślny GC na nowszy, aby automatycznie uzyskać funkcje dostarczane przez regiony kompaktowe bez konieczności ręcznego owijania i rozpakowywania. Jednak nowy GC nie jest srebrną kulą i nie rozwiązuje wszystkich problemów w sposób automagiczny i ma swoje kompromisy. Testy porównawcze nowego GC znajdują się w następującym repozytorium GitHub:
źródło
Cóż, w GC znalazłeś ograniczenie języków: nie nadają się do hardkorowych systemów czasu rzeczywistego.
Masz 2 opcje:
1. Zwiększ rozmiar sterty i użyj dwupoziomowego systemu buforowania, najstarsze wiadomości są wysyłane na dysk, a najnowsze wiadomości przechowujesz w pamięci, możesz to zrobić za pomocą stronicowania systemu operacyjnego. Problem z tym rozwiązaniem polega jednak na tym, że stronicowanie może być kosztowne w zależności od możliwości odczytu używanej dodatkowej jednostki pamięci.
2. Zaprogramuj to rozwiązanie za pomocą 'C' i połącz je z FFI w haskell. W ten sposób możesz samodzielnie zarządzać pamięcią. To byłaby najlepsza opcja, ponieważ możesz samodzielnie kontrolować potrzebną pamięć.
źródło