Kiedy należy użyć TaskCompletionSource <T>?

199

AFAIK, wszystko, co wie, to to, że w pewnym momencie wywoływana jest jego metoda SetResultlub SetExceptionmetoda w celu uzupełnienia Task<T>ekspozycji poprzez swoją Taskwłaściwość.

Innymi słowy, działa jako producent dla Task<TResult>i jego ukończenia.

Widziałem tutaj przykład:

Jeśli potrzebuję sposobu, aby wykonać Func asynchronicznie i mam zadanie reprezentujące tę operację.

public static Task<T> RunAsync<T>(Func<T> function) 
{ 
    if (function == null) throw new ArgumentNullException(“function”); 
    var tcs = new TaskCompletionSource<T>(); 
    ThreadPool.QueueUserWorkItem(_ => 
    { 
        try 
        {  
            T result = function(); 
            tcs.SetResult(result);  
        } 
        catch(Exception exc) { tcs.SetException(exc); } 
    }); 
    return tcs.Task; 
}

Które mogłyby zostać wykorzystane * jeśli nie mam Task.Factory.StartNew- Ale ja nie mam Task.Factory.StartNew.

Pytanie:

Może ktoś proszę wyjaśnić na przykładzie scenariusza związanego bezpośrednio do TaskCompletionSource i nie do hipotetycznej sytuacji, w której nie mam Task.Factory.StartNew?

Royi Namir
źródło
5
TaskCompletionSource służy głównie do owijania interfejsu API asynchronicznego opartego na zdarzeniach za pomocą Task bez tworzenia nowych wątków.
Arvis

Odpowiedzi:

230

Najczęściej go używam, gdy dostępny jest tylko interfejs API oparty na zdarzeniu ( na przykład gniazda Windows Phone 8 ):

public Task<Args> SomeApiWrapper()
{
    TaskCompletionSource<Args> tcs = new TaskCompletionSource<Args>(); 

    var obj = new SomeApi();

    // will get raised, when the work is done
    obj.Done += (args) => 
    {
        // this will notify the caller 
        // of the SomeApiWrapper that 
        // the task just completed
        tcs.SetResult(args);
    }

    // start the work
    obj.Do();

    return tcs.Task;
}

Jest to szczególnie przydatne, gdy jest używane razem ze asyncsłowem kluczowym C # 5 .

GameScripting
źródło
4
czy możesz napisać słowami, co tu widzimy? czy to tak, jakby SomeApiWrappergdzieś na to czekało, dopóki wydawca nie podniesie zdarzenia, które spowoduje zakończenie tego zadania?
Royi Namir
spójrz na link, który właśnie dodałem
GameScripting
6
Po prostu aktualizacja, Microsoft wydał Microsoft.Bcl.Asyncpakiet na NuGet, który pozwala na async/awaitsłowa kluczowe w projektach .NET 4.0 (zalecane jest VS2012 i wyższe).
Erik
1
@ Fran_gg7 możesz użyć CancellationToken, patrz msdn.microsoft.com/en-us/library/dd997396(v=vs.110).aspx lub jako nowe pytanie tutaj na stackoverflow
GameScripting
1
Problem z tą implementacją polega na tym, że generuje to wyciek pamięci, ponieważ zdarzenie nigdy nie jest zwalniane z obj.Done
Walter Vehoeven
78

Z moich doświadczeń TaskCompletionSourcedoskonale nadaje się do zawijania starych wzorów asynchronicznych do nowoczesnego async/awaitwzoru.

Najbardziej korzystnym przykładem, jaki mogę wymyślić, jest praca z nim Socket. Ma starą APM i wzory PW, ale nie te awaitable Taskmetody, które TcpListeneri TcpClientmieć.

Osobiście mam kilka problemów z NetworkStreamklasą i wolę surowe Socket. Ponieważ uwielbiam ten async/awaitwzór, stworzyłem klasę rozszerzeń, SocketExtenderktóra tworzy kilka metod rozszerzenia Socket.

Wszystkie te metody wykorzystują TaskCompletionSource<T>do owijania wywołań asynchronicznych:

    public static Task<Socket> AcceptAsync(this Socket socket)
    {
        if (socket == null)
            throw new ArgumentNullException("socket");

        var tcs = new TaskCompletionSource<Socket>();

        socket.BeginAccept(asyncResult =>
        {
            try
            {
                var s = asyncResult.AsyncState as Socket;
                var client = s.EndAccept(asyncResult);

                tcs.SetResult(client);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }

        }, socket);

        return tcs.Task;
    }

I przekazać socketdo BeginAcceptmetody, tak aby uzyskać niewielki wzrost wydajności z kompilator nie trzeba podnosić lokalny parametr.

Potem piękno tego wszystkiego:

 var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 listener.Bind(new IPEndPoint(IPAddress.Loopback, 2610));
 listener.Listen(10);

 var client = await listener.AcceptAsync();
Erik
źródło
1
Dlaczego Task.Factory.StartNew nie działałoby tutaj?
Tola Odejayi
23
@Tola Ponieważ utworzyłoby to nowe zadanie działające w wątku puli wątków, ale powyższy kod wykorzystuje wątek zakończenia we / wy rozpoczęty przez BeginAccept, iow: nie uruchamia nowego wątku.
Frans Bouma,
4
Dzięki, Frans-Bouma. Więc TaskCompletionSource to przydatny sposób konwersji kodu, który używa instrukcji Begin ... End ... w zadaniu?
Tola Odejayi
3
@TolaOdejayi Trochę spóźniona odpowiedź, ale tak, to jeden z głównych przypadków użycia, jakie dla niej znalazłem. Działa wspaniale w przypadku tego przejścia kodu.
Erik
4
Spójrz na TaskFactory <TResult> .FromAsync, aby zawinąć Begin.. End...instrukcje.
MicBig,
37

Dla mnie klasycznym scenariuszem użycia TaskCompletionSourcejest sytuacja, w której moja metoda niekoniecznie musi wykonywać czasochłonną operację. Pozwala nam to wybrać konkretne przypadki, w których chcielibyśmy użyć nowego wątku.

Dobrym przykładem tego jest użycie pamięci podręcznej. Możesz mieć GetResourceAsyncmetodę, która szuka w pamięci podręcznej żądanego zasobu i zwraca natychmiast (bez użycia nowego wątku, przy użyciu TaskCompletionSource), jeśli zasób został znaleziony. Tylko jeśli zasób nie został znaleziony, chcielibyśmy użyć nowego wątku i pobrać go za pomocą Task.Run().

Przykład kodu można zobaczyć tutaj: Jak warunkowo uruchomić kod asynchronicznie za pomocą zadań

Adi Lester
źródło
Widziałem twoje pytanie i odpowiedź. (spójrz na mój komentarz do odpowiedzi) .... :-) i rzeczywiście jest to pytanie edukacyjne i odpowiedź.
Royi Namir,
11
W rzeczywistości nie jest to sytuacja, w której potrzebny jest TCS. Możesz tego po prostu użyć Task.FromResult. Oczywiście, jeśli używasz 4.0 i nie masz Task.FromResulttego, do czego użyjesz TCS, to napisz własne FromResult .
Servy
@Servy Task.FromResultjest dostępny tylko od .NET 4.5. Wcześniej był to sposób na osiągnięcie tego zachowania.
Adi Lester
@AdiLester Twoja odpowiedź odnosi się Task.Run, wskazując, że jest w wersji 4.5+. I mój poprzedni komentarz konkretnie dotyczył .NET 4.0.
Servy
@Servy Nie wszyscy czytający tę odpowiedź są kierowani na .NET 4.5+. Uważam, że jest to dobra i poprawna odpowiedź, która pomaga ludziom zadać pytanie OP (które przy okazji jest oznaczone .NET-4.0). Tak czy inaczej, głosowanie w dół wydaje mi się trochę za duże, ale jeśli naprawdę uważasz, że zasługuje na głosowanie, to idź dalej.
Adi Lester
25

W tym wpisie na blogu Levi Botelho opisuje, jak użyć TaskCompletionSourcedo napisania asynchronicznego opakowania dla procesu, abyś mógł go uruchomić i poczekać na jego zakończenie.

public static Task RunProcessAsync(string processPath)
{
    var tcs = new TaskCompletionSource<object>();
    var process = new Process
    {
        EnableRaisingEvents = true,
        StartInfo = new ProcessStartInfo(processPath)
        {
            RedirectStandardError = true,
            UseShellExecute = false
        }
    };
    process.Exited += (sender, args) =>
    {
        if (process.ExitCode != 0)
        {
            var errorMessage = process.StandardError.ReadToEnd();
            tcs.SetException(new InvalidOperationException("The process did not exit correctly. " +
                "The corresponding error message was: " + errorMessage));
        }
        else
        {
            tcs.SetResult(null);
        }
        process.Dispose();
    };
    process.Start();
    return tcs.Task;
}

i jego użycie

await RunProcessAsync("myexecutable.exe");
Sarin
źródło
14

Wygląda na to nikt nie wspomniał, ale myślę, że testy jednostkowe mogą być uznane za prawdziwe życie wystarczy.

Uważam TaskCompletionSourcebyć przydatna, gdy wyśmianie zależność z metody asynchronicznej.

W aktualnie testowanym programie:

public interface IEntityFacade
{
  Task<Entity> GetByIdAsync(string id);
}

W testach jednostkowych:

// set up mock dependency (here with NSubstitute)

TaskCompletionSource<Entity> queryTaskDriver = new TaskCompletionSource<Entity>();

IEntityFacade entityFacade = Substitute.For<IEntityFacade>();

entityFacade.GetByIdAsync(Arg.Any<string>()).Returns(queryTaskDriver.Task);

// later on, in the "Act" phase

private void When_Task_Completes_Successfully()
{
  queryTaskDriver.SetResult(someExpectedEntity);
  // ...
}

private void When_Task_Gives_Error()
{
  queryTaskDriver.SetException(someExpectedException);
  // ...
}

W końcu to użycie TaskCompletionSource wydaje się kolejnym przypadkiem „obiektu Task, który nie wykonuje kodu”.

superjos
źródło
11

TaskCompletionSource służy do tworzenia obiektów zadań , które nie wykonują kodu. W rzeczywistych scenariuszach TaskCompletionSource jest idealny do operacji związanych z operacjami we / wy. W ten sposób uzyskasz wszystkie korzyści z zadań (np. Zwracane wartości, kontynuacje itp.) Bez blokowania wątku na czas trwania operacji. Jeśli twoja „funkcja” jest operacją związaną z We / Wy, nie jest zalecane blokowanie wątku za pomocą nowego zadania . Zamiast tego za pomocą TaskCompletionSource można utworzyć zadanie podrzędne, aby wskazać, kiedy zakończy się operacja związana z operacjami we / wy lub wystąpi błąd.

v1p3r
źródło
5

W tym poście z bloga „Parallel Programming with .NET” znajduje się przykład z prawdziwego świata . Naprawdę powinieneś to przeczytać, ale i tak jest to streszczenie.

W blogu przedstawiono dwie implementacje:

„fabryczna metoda tworzenia„ opóźnionych ”zadań, które nie zostaną zaplanowane, dopóki nie upłynie limit czasu dostarczony przez użytkownika.”

Pierwsza pokazana implementacja oparta jest na Task<>dwóch głównych wadach i ma dwie wady. Drugi post dotyczący implementacji pozwala na ich złagodzenie za pomocą TaskCompletionSource<>.

Oto druga implementacja:

public static Task StartNewDelayed(int millisecondsDelay, Action action)
{
    // Validate arguments
    if (millisecondsDelay < 0)
        throw new ArgumentOutOfRangeException("millisecondsDelay");
    if (action == null) throw new ArgumentNullException("action");

    // Create a trigger used to start the task
    var tcs = new TaskCompletionSource<object>();

    // Start a timer that will trigger it
    var timer = new Timer(
        _ => tcs.SetResult(null), null, millisecondsDelay, Timeout.Infinite);

    // Create and return a task that will be scheduled when the trigger fires.
    return tcs.Task.ContinueWith(_ =>
    {
        timer.Dispose();
        action();
    });
}
Urig
źródło
lepiej byłoby użyć funkcji czekaj na tcs.Task, a następnie użyj akcji () po
Royi Namir
5
ponieważ wracasz do kontekstu, w którym opuściłeś, gdzie Continuewith nie zachowuje kontekstu. (nie domyślnie) również, jeśli następna instrukcja w akcji () powoduje wyjątek, trudno byłoby go złapać tam, gdzie użycie funkcji wyczekiwania wyświetli jako zwykły wyjątek.
Royi Namir
3
Dlaczego nie tylko await Task.Delay(millisecondsDelay); action(); return;lub (w .Net 4.0)return Task.Delay(millisecondsDelay).ContinueWith( _ => action() );
sgnsajgon
@sgnsajgon, które z pewnością łatwiej byłoby czytać i utrzymywać
JwJosefy
@JwJosefy W rzeczywistości metoda Task.Delay może być zaimplementowana przy użyciu TaskCompletionSource , podobnie jak powyższy kod. Prawdziwa implementacja jest tutaj: Task.cs
sgnsajgon
4

Może to upraszczać rzeczy, ale źródło TaskCompletion pozwala oczekiwać na zdarzenie. Ponieważ tcs.SetResult jest ustawiany tylko po wystąpieniu zdarzenia, dzwoniący może oczekiwać na zadanie.

Obejrzyj ten film, aby uzyskać więcej informacji:

http://channel9.msdn.com/Series/Three-Essential-Tips-for-Async/Lucian03-TipsForAsyncThreadsAndDatabinding

nmishr
źródło
1
W tym miejscu umieść odpowiedni kod lub dokumentację, ponieważ linki mogą się zmieniać w czasie i sprawić, że ta odpowiedź nie będzie miała znaczenia.
rfornal
3

Rzeczywisty scenariusz, w którym korzystałem, TaskCompletionSourcedotyczy implementacji kolejki pobierania. W moim przypadku, jeśli użytkownik rozpocznie 100 pobrań, nie chcę uruchamiać ich wszystkich jednocześnie, więc zamiast zwracać zlecenie strate, zwracam zadanie dołączone TaskCompletionSource. Po zakończeniu pobierania wątek, który działa, kolejka wykonuje zadanie.

Kluczową koncepcją jest to, że rozłączam się, gdy klient prosi o uruchomienie zadania od momentu, w którym faktycznie się zaczyna. W tym przypadku, ponieważ nie chcę, aby klient miał do czynienia z zarządzaniem zasobami.

Zauważ, że możesz używać asynchronizacji / oczekiwania na .NET 4, o ile używasz kompilatora C # 5 (VS 2012+), patrz tutaj po więcej szczegółów.

Yaur
źródło
0

Zwykłem TaskCompletionSourceuruchamiać zadanie, dopóki nie zostanie anulowane. W tym przypadku jest to subskrybent ServiceBus, który normalnie chcę uruchamiać tak długo, jak długo działa aplikacja.

public async Task RunUntilCancellation(
    CancellationToken cancellationToken,
    Func<Task> onCancel)
{
    var doneReceiving = new TaskCompletionSource<bool>();

    cancellationToken.Register(
        async () =>
        {
            await onCancel();
            doneReceiving.SetResult(true); // Signal to quit message listener
        });

    await doneReceiving.Task.ConfigureAwait(false); // Listen until quit signal is received.
}
Johan Gov
źródło
1
Nie trzeba używać „asynchronizacji” z „TaskCompletionSource”, ponieważ już utworzył zadanie
Mandeep Janjua,