Jak skonfigurować dostrojoną pulę wątków dla kontraktów terminowych?

80

Jak duża jest pula wątków Scali dla kontraktów terminowych?

Moja aplikacja Scala zarabia wiele milionów future {}i zastanawiam się, czy jest coś, co mogę zrobić, aby je zoptymalizować, konfigurując pulę wątków.

Dziękuję Ci.

Erik Kaplun
źródło
Slick 3.0 używa własnych połączeń i pula wątków więc dlaczego musimy zapewnić niejawny executioncontext do wycieku, gdy zarządza własną puli wątków
Rahul Gulabani
1
@RahulGulabani, z książki „essential slick”:The reason is that map, flatMap methods of Action allows you to call arbitrary code when joining the actions together. Slick cannot allow that code to be run on its own execution context, because it has no way to know if you are going to tie up Slicks threads for a long time.
srzhio,

Odpowiedzi:

90

Możesz określić własny ExecutionContext, w którym będą działać Twoje futures, zamiast importować globalny niejawny ExecutionContext.

import java.util.concurrent.Executors
import scala.concurrent._

implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)

    def execute(runnable: Runnable) {
        threadPool.submit(runnable)
    }

    def reportFailure(t: Throwable) {}
}
Josh Gao
źródło
73
Świetna odpowiedź, możesz nieco zredukować standardowy schemat, używając metod pomocniczych w ExecutionContext, które pozwalają na tworzenie instancji bezpośrednio z danego Executora. Np. Implicit val ec = ExecutionContext.fromExecutor (Executors.newFixedThreadPool (10))
sksamuel
1
To prawda, że ​​wszystko jest fajne, ale czy istnieje rzeczywisty limit wątków dotyczących implicits.global? Jeśli tak, to czy jest to konfigurowalne jak akka przez application.conf?
Nick
6
@Nick tak, implicits.global to tylko 1 wątek na rdzeń procesora. Optymalne do zadań związanych z procesorem. Ale dla klasycznego blokowania IO (np. Jdbc) jest to katastrofa wydajności.
Ben Hutchison
2
Musiałem dodać wywołanie, aby zamknąć pulę wątków po użyciu tego lub program nigdy się nie kończy ... def shutdown () = threadPool.shutdown ()
justinhj
2
Dlaczego zamyka się w „normalnym” przypadku, ale nie wtedy, gdy ustawimy niejawne na coś innego?
hbogert
152

Ta odpowiedź pochodzi z monkjack, komentarz z zaakceptowanej odpowiedzi. Można jednak przegapić tę świetną odpowiedź, więc zamieszczam ją tutaj.

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

Jeśli potrzebujesz tylko zmienić liczbę puli wątków, po prostu użyj globalnego modułu wykonawczego i przekaż następujące właściwości systemowe.

-Dscala.concurrent.context.numThreads=8 -Dscala.concurrent.context.maxThreads=8
Bienvenido David
źródło
2
to jest bardziej eleganckie rozwiązanie
Ben Hutchison
Wypróbowałem oba te z wartością 5 i nadal widzę do 8 wątków działających jednocześnie.
micseydel
3

najlepszy sposób na określenie Threadpool w Scala Futures:

implicit val ec = new ExecutionContext {
      val threadPool = Executors.newFixedThreadPool(conf.getInt("5"));
      override def reportFailure(cause: Throwable): Unit = {};
      override def execute(runnable: Runnable): Unit = threadPool.submit(runnable);
      def shutdown() = threadPool.shutdown();
    }
S'chn T'gai Spock
źródło
0
class ThreadPoolExecutionContext(val executionContext: ExecutionContext)

object ThreadPoolExecutionContext {

  val executionContextProvider: ThreadPoolExecutionContext = {
    try {
      val executionContextExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(25))
      new ThreadPoolExecutionContext(executionContextExecutor)
    } catch {
      case exception: Exception => {
        Log.error("Failed to create thread pool", exception)
        throw exception
      }
    }
  }
}
VAIBHAV GOUR
źródło