Jak mogę zapobiec synchronicznym kontynuacjom zadania?

83

Mam kod biblioteki (sieci gniazd), który zapewnia Taskoparty na interfejsie API dla oczekujących odpowiedzi na żądania, w oparciu o TaskCompletionSource<T>. Jednak w TPL występuje irytacja polegająca na tym, że wydaje się niemożliwe zapobiec synchronicznym kontynuacjom. Co ja lubię być w stanie zrobić, to albo:

  • powiedz, TaskCompletionSource<T>że nie powinno pozwalać dzwoniącym na dołączanie się za pomocą TaskContinuationOptions.ExecuteSynchronouslylub
  • ustaw wynik ( SetResult/ TrySetResult) w sposób, który określa, że TaskContinuationOptions.ExecuteSynchronouslypowinien być ignorowany, używając zamiast tego puli

W szczególności problem, który mam, polega na tym, że przychodzące dane są przetwarzane przez dedykowany czytnik i jeśli dzwoniący może się przyłączyć TaskContinuationOptions.ExecuteSynchronously, może zablokować czytnik (co dotyczy nie tylko ich). Wcześniej obejrzałem to przez hakera, który wykrywa, czy są obecne jakieś kontynuacje, a jeśli tak, wypycha ukończenie do ThreadPool, jednak ma to znaczący wpływ, jeśli dzwoniący wypełnił swoją kolejkę pracy, ponieważ zakończenie nie zostanie przetworzone w odpowiednim czasie. Jeśli używają Task.Wait()(lub czegoś podobnego), będą w zasadzie zablokować się. Dlatego też czytelnik znajduje się w dedykowanym wątku, a nie korzysta z pracowników.

Więc; zanim spróbuję zganić zespół TPL: czy brakuje mi opcji?

Kluczowe punkty:

  • Nie chcę, aby rozmówcy zewnętrzni mogli przejąć mój wątek
  • Nie mogę użyć ThreadPoolimplementacji, ponieważ musi działać, gdy pula jest nasycona

Poniższy przykład daje wynik (kolejność może się różnić w zależności od czasu):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Problem w tym, że przypadkowemu dzwoniącemu udało się uzyskać kontynuację w "Głównym wątku". W prawdziwym kodzie byłoby to zakłóceniem podstawowego czytelnika; złe rzeczy!

Kod:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
Marc Gravell
źródło
2
Spróbowałbym owinąć TaskCompletionSourcewłasnym API, aby zapobiec bezpośredniemu wywoływaniu ContinueWith, ponieważ ani nie TaskCompletionSource, ani Tasknie nadają się dobrze do dziedziczenia po nich.
Dennis
1
@Dennis, żeby było jasne, w rzeczywistości Taskjest to ujawnione, a nie TaskCompletionSource. To (wystawienie innego API) jest technicznie opcją, ale jest to dość ekstremalna rzecz, aby to zrobić ... Nie jestem pewien, czy to uzasadnia
Marc Gravell
2
@MattH niezupełnie - po prostu przeformułowuje pytanie: albo używasz ThreadPooldo tego celu (o którym już wspomniałem - powoduje to problemy), albo masz dedykowany wątek „oczekujących kontynuacji”, a następnie (kontynuacje z ExecuteSynchronouslyokreślonymi) mogą przejąć ten jeden zamiast tego - co powoduje dokładnie ten sam problem, ponieważ oznacza to, że kontynuacje innych wiadomości mogą zostać zablokowane, co ponownie wpływa na wielu dzwoniących
Marc Gravell
3
@Andrey, że (to działa tak, jakby wszystkie wywołujące używały ContinueWith bez exec-sync) jest dokładnie tym, co chcę osiągnąć. Problem polega na tym, że jeśli moja biblioteka wręczy komuś zadanie, może zrobić coś bardzo niepożądanego: mogą przerwać mojemu czytnikowi (niewskazane) użycie exec-sync. Jest to niezwykle niebezpieczne, dlatego chciałbym zapobiec przedostawaniu się go do wnętrza biblioteki .
Marc Gravell
2
@Andrey ponieważ a: wiele zadań nigdy nie jest kontynuowanych w pierwszej kolejności (szczególnie podczas pracy zbiorczej) - wymusiłoby to każde zadanie, b: nawet te, które miałyby kontynuację, mają teraz znacznie większą złożoność, koszty ogólne i operacje robotnicze. To ma znaczenie.
Marc Gravell

Odpowiedzi:

50

Nowość w .NET 4.6:

.NET 4.6 zawiera nowy TaskCreationOptions: RunContinuationsAsynchronously.


Ponieważ chcesz użyć Reflection, aby uzyskać dostęp do prywatnych pól ...

Możesz oznaczyć zadanie TCS TASK_STATE_THREAD_WAS_ABORTEDflagą, co spowoduje, że wszystkie kontynuacje nie zostaną wstawione.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edytować:

Zamiast używać emisji odbicia, sugeruję użycie wyrażeń. Jest to znacznie bardziej czytelne i ma tę zaletę, że jest zgodne z językiem PCL:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Bez używania odbicia:

Jeśli ktoś jest zainteresowany, wymyśliłem sposób na zrobienie tego bez Refleksji, ale jest to również trochę "brudne" i oczywiście niesie za sobą niemającą znaczenia karę za perfekcję:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}
Eli Arbel
źródło
3
@MarcGravell Użyj tego, aby utworzyć pseudo-próbkę dla zespołu TPL i zgłosić żądanie zmiany dotyczące możliwości zrobienia tego za pomocą opcji konstruktora lub czegoś podobnego.
Adam Houldsworth,
1
@Adam tak, gdybyś musiał nazwać tę flagę „co to robi” zamiast „co to powoduje”, byłoby to coś w rodzaju TaskCreationOptions.DoNotInline- i nawet nie potrzebowałaby zmiany podpisu ctora naTaskCompletionSource
Marc Gravell
2
@AdamHouldsworth i nie martw się, już wysyłam im e-maile tak samo; p
Marc Gravell
1
Dla twojego zainteresowania: tutaj, zoptymalizowany przez ILGeneratoretc: github.com/StackExchange/StackExchange.Redis/blob/master/ ...
Marc Gravell
1
@Noseratio yup, sprawdziłem je - dzięki; wszystkie są w porządku IMO; Zgadzam się, że to czyste obejście, ale daje dokładnie poprawne wyniki.
Marc Gravell
9

Nie sądzę, aby w TPL było coś, co zapewniałoby wyraźną kontrolę API nad TaskCompletionSource.SetResultkontynuacjami. Postanowiłem zachować moją początkową odpowiedź dotyczącą kontrolowania tego zachowania w async/awaitscenariuszach.

Oto inne rozwiązanie, które narzuca asynchroniczność ContinueWith, jeśli tcs.SetResultwyzwalana kontynuacja ma miejsce w tym samym wątku, w którym SetResultzostało wywołane:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Zaktualizowano w celu uwzględnienia komentarza:

Nie kontroluję dzwoniącego - nie mogę zmusić ich do używania określonego wariantu kontynuowania: gdybym mógł, problem nie istniałby w pierwszej kolejności

Nie wiedziałem, że nie kontrolujesz dzwoniącego. Niemniej jednak, jeśli nie kontrolujesz tego, prawdopodobnie nie przekazujesz również TaskCompletionSourceobiektu bezpośrednio do wywołującego. Logicznie rzecz biorąc, przekazałbyś jego tokenową część, tj tcs.Task. W takim przypadku rozwiązanie może być jeszcze łatwiejsze, dodając inną metodę rozszerzenia do powyższego:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Posługiwać się:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

To faktycznie działa dla obu awaitiContinueWith ( skrzypce ) i jest wolne od refleksów.

noseratio
źródło
1
Nie kontroluję dzwoniącego - nie mogę zmusić ich do użycia określonego wariantu kontynuowania: gdybym mógł, problem nie istniałby w pierwszej kolejności
Marc Gravell
@MarcGravell, nie wiedziałem, że nie możesz kontrolować dzwoniącego. Opublikowałem aktualizację, jak sobie z tym poradzę.
noseratio
dylemat autora biblioteki; p Zwróć uwagę, że ktoś znalazł znacznie prostszy i bardziej bezpośredni sposób osiągnięcia pożądanego rezultatu
Marc Gravell
4

A zamiast robić

var task = source.Task;

zamiast tego robisz to

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Dlatego zawsze dodajesz jedną kontynuację, która zostanie wykonana asynchronicznie, a wtedy nie ma znaczenia, czy subskrybenci chcą kontynuacji w tym samym kontekście. To trochę przykre zadanie, prawda?

Ivan Zlatanov
źródło
1
To pojawiło się w komentarzach (patrz Andrey); Problem nie jest to, że zmusza wszystkie zadania, aby mieć kontynuację gdy nie mieliby inaczej, co jest coś, że zarówno ContinueWithi awaitzwykle starają się unikać (sprawdzając już kompletne etc) - a ponieważ byłoby to wymusić wszystko na pracowników, to faktycznie pogorszyłoby sytuację. To pozytywny pomysł i dziękuję za niego: ale to nie pomoże w tym scenariuszu.
Marc Gravell
3

jeśli możesz i jesteś gotów skorzystać z refleksji, to powinno wystarczyć;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
Fredou
źródło
Ten hack może po prostu przestać działać w następnej wersji Framework.
noseratio
@Noseratio, to prawda, ale teraz działa i mogą również zaimplementować właściwy sposób, aby to zrobić w następnej wersji
Fredou
Ale dlaczego miałbyś tego potrzebować, skoro po prostu możesz to zrobić Task.Run(() => tcs.SetResult(result))?
noseratio
@Noseratio, nie wiem, zadaj to pytanie Markowi :-), po prostu usuwam flagę TaskContinuationOptions.ExecuteSynchronously na wszystkich zadaniach połączonych z TaskCompletionSource, które upewniają się, że wszyscy używają puli wątków zamiast głównego wątku
Fredou
Hack m_continuationObject jest w rzeczywistości kodem, którego już używam do identyfikowania potencjalnie problematycznych zadań - więc nie jest to poza rozważeniem. Ciekawe, dzięki. Jak dotąd jest to najbardziej użyteczna opcja.
Marc Gravell
3

Zaktualizowałem , opublikowałem osobną odpowiedź, którą należy się zająć, ContinueWithw przeciwieństwie do await(ponieważ ContinueWithnie dba o bieżący kontekst synchronizacji).

Można użyć głupi kontekst synchronizacji narzucić asynchrony na kontynuację wywołane przez powołanie SetResult/SetCancelled/SetExceptionsię na TaskCompletionSource. Uważam, że obecny kontekst synchronizacji (w momencie await tcs.Task) jest kryteriami, na podstawie których TPL decyduje, czy taka kontynuacja ma być synchroniczna, czy asynchroniczna.

U mnie działa:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync jest zaimplementowany w następujący sposób:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext jest bardzo tani pod względem kosztów ogólnych, jakie dodaje. W rzeczywistości bardzo podobne podejście jest stosowane przy implementacji WPFDispatcher.BeginInvoke .

TPL porównuje docelowy kontekst synchronizacji w punkcie z kontekstem w awaitpunkcie tcs.SetResult. Jeśli kontekst synchronizacji jest taki sam (lub nie ma kontekstu synchronizacji w obu miejscach), kontynuacja jest wywoływana bezpośrednio, synchronicznie. W przeciwnym razie jest umieszczany w kolejce przy użyciu SynchronizationContext.Postkontekstu synchronizacji docelowej, tj. Normalnego awaitzachowania. To podejście zawsze narzuca SynchronizationContext.Postzachowanie (lub kontynuację wątku puli, jeśli nie ma docelowego kontekstu synchronizacji).

Zaktualizowano , to nie zadziała task.ContinueWith, ponieważ ContinueWithnie dba o bieżący kontekst synchronizacji. Działa jednak na await task( skrzypce ). To również działa dla await task.ConfigureAwait(false).

OTOH, to podejście działa w przypadku ContinueWith.

noseratio
źródło
Kuszące, ale zmiana kontekstu synchronizacji prawie na pewno wpłynęłaby na aplikację wywołującą - na przykład aplikacja internetowa lub Windows, która akurat korzysta z mojej biblioteki, nie powinna wykrywać zmiany kontekstu synchronizacji setki razy na sekundę.
Marc Gravell
@MarcGravell, zmieniam to tylko dla zakresu tcs.SetResultwywołania. To trochę staje atomowej i bezpieczny wątku w ten sposób, ponieważ sama kontynuacja nastąpi na każdym innym wątku basenie lub na oryginalnym synchronizacji. kontekst przechwycony w await tcs.Task. A SynchronizationContext.SetSynchronizationContextsam w sobie jest bardzo tani, znacznie tańszy niż sam przełącznik wątkowy.
noseratio
Może to jednak nie spełniać drugiego wymagania: nie używać ThreadPool. Dzięki temu rozwiązaniu TPL rzeczywiście użyje ThreadPool, jeśli nie było synchronizacji. kontekst (lub był to podstawowy domyślny) w await tcs.Task. Ale to jest standardowe zachowanie OC.
noseratio
Hmmm ... ponieważ kontekst synchronizacji jest dla wątku, może to być faktycznie wykonalne - i nie musiałbym przełączać ctx - po prostu ustaw go raz dla wątku roboczego. Będę musiał z tym zagrać
Marc Gravell
1
@Noseracja ah, prawda: nie było jasne, że kluczową kwestią było to, że byli inni . Spojrzę. Dzięki.
Marc Gravell
3

Podejście symulacyjne przerwania wyglądało naprawdę dobrze, ale w niektórych scenariuszach prowadziło do przejmowania wątków przez TPL .

Miałem wtedy implementację, która była podobna do sprawdzania obiektu kontynuacji , ale po prostu sprawdzałem, czy nie ma kontynuacji, ponieważ w rzeczywistości jest zbyt wiele scenariuszy, aby dany kod działał dobrze, ale oznaczało to, że nawet takie rzeczy Task.Waitspowodowały wyszukiwanie puli wątków.

Ostatecznie, po sprawdzeniu wielu, wielu IL, jedynym bezpiecznym i użytecznym scenariuszem jest SetOnInvokeMresscenariusz (kontynuacja manual-reset-event-slim). Istnieje wiele innych scenariuszy:

  • niektóre nie są bezpieczne i prowadzą do przechwytywania wątków
  • reszta nie jest przydatna, ponieważ ostatecznie prowadzi do puli wątków

W końcu zdecydowałem się sprawdzić, czy nie ma zerowego obiektu kontynuacji; jeśli jest zerowa, w porządku (bez kontynuacji); jeśli jest różna od null, w specjalnych przypadkach sprawdź SetOnInvokeMres- jeśli tak jest: fine (bezpieczne wywołanie); w przeciwnym razie niech pula wątków wykona operację TrySetComplete, bez mówienia zadaniu o zrobieniu czegoś specjalnego, na przykład przerwania spoofingu. Task.Waitwykorzystuje SetOnInvokeMrespodejście, które jest konkretnym scenariuszem, którego naprawdę chcemy się postarać, aby uniknąć impasu.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
Marc Gravell
źródło