Unikanie wycieków pamięci dzięki Scalaz 7 zipWithIndex / group enumeratees

106

tło

Jak wspomniano w tym pytaniu , używam iteracji Scalaz 7 do przetwarzania dużego (tj. Nieograniczonego) strumienia danych w stałej przestrzeni sterty.

Mój kod wygląda tak:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Problem

Wygląda na to, że napotkałem wyciek pamięci, ale nie jestem wystarczająco zaznajomiony ze Scalaz / FP, aby wiedzieć, czy błąd jest w Scalaz, czy w moim kodzie. Intuicyjnie oczekuję, że ten kod będzie wymagał tylko (rzędu) P- krotności Chunkspacji.

Uwaga: znalazłem podobne pytanie, w którym OutOfMemoryErrornapotkano, ale mój kod nie używa consume.

Testowanie

Przeprowadziłem kilka testów, aby spróbować wyodrębnić problem. Podsumowując, wyciek pojawia się tylko wtedy, gdy używane są oba zipWithIndexi group.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Kod do testów:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

pytania

  • Czy błąd występuje w moim kodzie?
  • Jak mogę sprawić, by to działało w stałej przestrzeni sterty?
Aaron Novstrup
źródło
6
W końcu zgłosiłem to jako problem w Scalaz .
Aaron Novstrup
1
Nie będzie to żadna zabawa, ale możesz spróbować -XX:+HeapDumpOnOutOfMemoryErrorprzeanalizować zrzut za pomocą eclipse MAT eclipse.org/mat, aby zobaczyć, który wiersz kodu zachowuje tablice.
huynhjl
10
@huynhjl FWIW, próbowałem analizować stertę zarówno za pomocą JProfilera, jak i MAT, ale nie byłem w stanie przebrnąć przez wszystkie odniesienia do anonimowych klas funkcji itp. Scala naprawdę potrzebuje dedykowanych narzędzi do tego typu rzeczy.
Aaron Novstrup
A co, jeśli nie ma wycieku, a po prostu to, co robisz, wymaga szalenie rosnącej ilości pamięci? Możesz łatwo replikować zipWithIndex bez tej konkretnej konstrukcji FP, po prostu utrzymując varlicznik w trakcie.
Ezekiel Victor
@EzekielVictor Nie jestem pewien, czy rozumiem komentarz. Sugerujesz, że dodanie pojedynczego Longindeksu na porcję zmieniłoby algorytm ze stałego na niestały obszar sterty? Wersja niezapinana wyraźnie wykorzystuje stałe miejsce na stosie, ponieważ może „przetworzyć” tyle fragmentów, na które chcesz czekać.
Aaron Novstrup

Odpowiedzi:

4

Będzie to mała pociecha dla każdego, kto utknął w starszym iterateeAPI, ale niedawno sprawdziłem, że równoważny test przechodzi w stosunku do API scalaz-stream . To jest nowszy interfejs API przetwarzania strumienia, który ma zastąpić iteratee.

Aby uzyskać kompletność, oto kod testowy:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Powinno to zadziałać z dowolną wartością nparametru (pod warunkiem, że chcesz czekać wystarczająco długo) - testowałem z 2 ^ 14 tablicami 32 MiB (czyli łącznie pół TiB pamięci przydzielonej w czasie).

Aaron Novstrup
źródło