Załóżmy, że mam kilka przyszłości i muszę poczekać, aż którakolwiek z nich zawiedzie lub wszystkie odniosą sukces.
Na przykład niech istnieją 3 futures: f1
, f2
, f3
.
Jeśli się
f1
powiedzie if2
zawiedzie, nie czekam naf3
(i zwracam błąd klientowi).Jeśli
f2
zawiedzie podczasf1
if3
nadal działają, nie czekam na nie (i zwracam błąd )Jeśli się
f1
powiedzie, a potem sięf2
powiedzie, nadal czekamf3
.
Jak byś to zaimplementował?
scala
concurrency
future
Michał
źródło
źródło
Odpowiedzi:
Zamiast tego możesz użyć zrozumienia w następujący sposób:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
W tym przykładzie futures 1, 2 i 3 są uruchamiane równolegle. Następnie, w celu zrozumienia, czekamy, aż dostępne będą wyniki 1, a następnie 2 i 3. Jeśli 1 lub 2 zawiedzie, nie będziemy już czekać na 3. Jeśli wszystkie 3 się powiedzie,
aggFut
val będzie trzymał krotkę z 3 miejscami, odpowiadającymi wynikom 3 futures.Teraz, jeśli potrzebujesz zachowania, w którym chcesz przestać czekać, jeśli powiedz, że fut2 zawodzi jako pierwszy, sprawy stają się trochę trudniejsze. W powyższym przykładzie musisz poczekać, aż fut1 się zakończy, zanim zorientujesz się, że fut2 się nie powiodło. Aby rozwiązać ten problem, możesz spróbować czegoś takiego:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Teraz działa to poprawnie, ale problem polega na tym, że wiemy, które
Future
z nich usunąć zMap
pomyślnego ukończenia. Tak długo, jak masz jakiś sposób na poprawne skorelowanie wyniku z Przyszłością, która zrodziła ten wynik, wtedy coś takiego działa. Po prostu rekurencyjnie usuwa ukończone kontrakty futures z mapy, a następnie wywołujeFuture.firstCompletedOf
pozostałe,Futures
dopóki ich nie ma, zbierając wyniki po drodze. To nie jest ładne, ale jeśli naprawdę potrzebujesz zachowania, o którym mówisz, to lub coś podobnego może zadziałać.źródło
fut2
wcześniej się nie udafut1
? Czyfut1
w takim razie nadal będziemy czekać ? Jeśli będziemy, to nie jest dokładnie to, czego chcę.onFailure
obsługi zafut2
szybko się nie uda, aonSuccess
naaggFut
sukces uchwytu. Sukces naaggFut
impliesfut2
zakończył się pomyślnie, więc masz tylko jednego z wywoływanych programów obsługi.Możesz użyć obietnicy i wysłać do niej albo pierwszą porażkę, albo ostatni zakończony, zagregowany sukces:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Następnie możesz to zrobić
Await
,Future
jeśli chcesz zablokować, lub po prostumap
zrobić coś innego.Różnica w przypadku dla zrozumienia polega na tym, że tutaj otrzymujesz błąd pierwszego niepowodzenia, podczas gdy w przypadku zrozumienia otrzymujesz pierwszy błąd w kolejności przechodzenia kolekcji wejściowej (nawet jeśli pierwszy zawiódł). Na przykład:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
I:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
źródło
Oto rozwiązanie bez użycia aktorów.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
źródło
Możesz to zrobić tylko z futures. Oto jedna implementacja. Zauważ, że nie spowoduje to wcześniejszego zakończenia wykonywania! W takim przypadku musisz zrobić coś bardziej wyrafinowanego (i prawdopodobnie samodzielnie zaimplementować przerwę). Ale jeśli po prostu nie chcesz czekać na coś, co nie zadziała, kluczem jest czekanie na zakończenie pierwszej rzeczy i zatrzymanie się, gdy nic nie zostanie lub trafisz w wyjątek:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Oto przykład tego w akcji, gdy wszystko działa dobrze:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Ale kiedy coś pójdzie nie tak:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
źródło
W tym celu użyłbym aktora Akka. W przeciwieństwie do zrozumienia, zawodzi, gdy tylko zawodzi któraś z przyszłości, więc jest nieco bardziej wydajna w tym sensie.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Następnie stwórz aktora, wyślij do niego wiadomość (aby wiedział, do kogo wysłać odpowiedź) i poczekaj na odpowiedź.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
źródło
Odpowiedź na to pytanie została udzielona, ale publikuję moje rozwiązanie klasy wartości (klasy wartości zostały dodane w 2.10), ponieważ nie ma tutaj żadnego. Nie krępuj się krytykować.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture to niepotrzebne opakowanie Future, które zmienia domyślną mapę przyszłości / flatMap z `` wykonaj to-to-tam '' na połączenie wszystkiego i niepowodzenia. Stosowanie:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
W powyższym przykładzie, f1, f2 i f3 będą działać jednocześnie, a jeśli jakikolwiek błąd w dowolnej kolejności, przyszłość krotki zakończy się natychmiastową porażką.
źródło
Możesz sprawdzić Future API Twittera. W szczególności metoda Future.collect. Robi dokładnie to, co chcesz: https://twitter.github.io/scala_school/finagle.html
Kod źródłowy Future.scala jest dostępny tutaj: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
źródło
Możesz użyć tego:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
źródło