Mam kod biblioteki (sieci gniazd), który zapewnia Task
oparty na interfejsie API dla oczekujących odpowiedzi na żądania, w oparciu o TaskCompletionSource<T>
. Jednak w TPL występuje irytacja polegająca na tym, że wydaje się niemożliwe zapobiec synchronicznym kontynuacjom. Co ja lubię być w stanie zrobić, to albo:
- powiedz,
TaskCompletionSource<T>
że nie powinno pozwalać dzwoniącym na dołączanie się za pomocąTaskContinuationOptions.ExecuteSynchronously
lub - ustaw wynik (
SetResult
/TrySetResult
) w sposób, który określa, żeTaskContinuationOptions.ExecuteSynchronously
powinien być ignorowany, używając zamiast tego puli
W szczególności problem, który mam, polega na tym, że przychodzące dane są przetwarzane przez dedykowany czytnik i jeśli dzwoniący może się przyłączyć TaskContinuationOptions.ExecuteSynchronously
, może zablokować czytnik (co dotyczy nie tylko ich). Wcześniej obejrzałem to przez hakera, który wykrywa, czy są obecne jakieś kontynuacje, a jeśli tak, wypycha ukończenie do ThreadPool
, jednak ma to znaczący wpływ, jeśli dzwoniący wypełnił swoją kolejkę pracy, ponieważ zakończenie nie zostanie przetworzone w odpowiednim czasie. Jeśli używają Task.Wait()
(lub czegoś podobnego), będą w zasadzie zablokować się. Dlatego też czytelnik znajduje się w dedykowanym wątku, a nie korzysta z pracowników.
Więc; zanim spróbuję zganić zespół TPL: czy brakuje mi opcji?
Kluczowe punkty:
- Nie chcę, aby rozmówcy zewnętrzni mogli przejąć mój wątek
- Nie mogę użyć
ThreadPool
implementacji, ponieważ musi działać, gdy pula jest nasycona
Poniższy przykład daje wynik (kolejność może się różnić w zależności od czasu):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
Problem w tym, że przypadkowemu dzwoniącemu udało się uzyskać kontynuację w "Głównym wątku". W prawdziwym kodzie byłoby to zakłóceniem podstawowego czytelnika; złe rzeczy!
Kod:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
źródło
TaskCompletionSource
własnym API, aby zapobiec bezpośredniemu wywoływaniuContinueWith
, ponieważ ani nieTaskCompletionSource
, aniTask
nie nadają się dobrze do dziedziczenia po nich.Task
jest to ujawnione, a nieTaskCompletionSource
. To (wystawienie innego API) jest technicznie opcją, ale jest to dość ekstremalna rzecz, aby to zrobić ... Nie jestem pewien, czy to uzasadniaThreadPool
do tego celu (o którym już wspomniałem - powoduje to problemy), albo masz dedykowany wątek „oczekujących kontynuacji”, a następnie (kontynuacje zExecuteSynchronously
określonymi) mogą przejąć ten jeden zamiast tego - co powoduje dokładnie ten sam problem, ponieważ oznacza to, że kontynuacje innych wiadomości mogą zostać zablokowane, co ponownie wpływa na wielu dzwoniącychOdpowiedzi:
Nowość w .NET 4.6:
.NET 4.6 zawiera nowy
TaskCreationOptions
:RunContinuationsAsynchronously
.Ponieważ chcesz użyć Reflection, aby uzyskać dostęp do prywatnych pól ...
Możesz oznaczyć zadanie TCS
TASK_STATE_THREAD_WAS_ABORTED
flagą, co spowoduje, że wszystkie kontynuacje nie zostaną wstawione.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Edytować:
Zamiast używać emisji odbicia, sugeruję użycie wyrażeń. Jest to znacznie bardziej czytelne i ma tę zaletę, że jest zgodne z językiem PCL:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Bez używania odbicia:
Jeśli ktoś jest zainteresowany, wymyśliłem sposób na zrobienie tego bez Refleksji, ale jest to również trochę "brudne" i oczywiście niesie za sobą niemającą znaczenia karę za perfekcję:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
źródło
TaskCreationOptions.DoNotInline
- i nawet nie potrzebowałaby zmiany podpisu ctora naTaskCompletionSource
ILGenerator
etc: github.com/StackExchange/StackExchange.Redis/blob/master/ ...Nie sądzę, aby w TPL było coś, co zapewniałoby wyraźną kontrolę API nad
TaskCompletionSource.SetResult
kontynuacjami. Postanowiłem zachować moją początkową odpowiedź dotyczącą kontrolowania tego zachowania wasync/await
scenariuszach.Oto inne rozwiązanie, które narzuca asynchroniczność
ContinueWith
, jeślitcs.SetResult
wyzwalana kontynuacja ma miejsce w tym samym wątku, w którymSetResult
zostało wywołane:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }
Zaktualizowano w celu uwzględnienia komentarza:
Nie wiedziałem, że nie kontrolujesz dzwoniącego. Niemniej jednak, jeśli nie kontrolujesz tego, prawdopodobnie nie przekazujesz również
TaskCompletionSource
obiektu bezpośrednio do wywołującego. Logicznie rzecz biorąc, przekazałbyś jego tokenową część, tjtcs.Task
. W takim przypadku rozwiązanie może być jeszcze łatwiejsze, dodając inną metodę rozszerzenia do powyższego:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }
Posługiwać się:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);
To faktycznie działa dla obu
await
iContinueWith
( skrzypce ) i jest wolne od refleksów.źródło
A zamiast robić
var task = source.Task;
zamiast tego robisz to
var task = source.Task.ContinueWith<Int32>( x => x.Result );
Dlatego zawsze dodajesz jedną kontynuację, która zostanie wykonana asynchronicznie, a wtedy nie ma znaczenia, czy subskrybenci chcą kontynuacji w tym samym kontekście. To trochę przykre zadanie, prawda?
źródło
ContinueWith
iawait
zwykle starają się unikać (sprawdzając już kompletne etc) - a ponieważ byłoby to wymusić wszystko na pracowników, to faktycznie pogorszyłoby sytuację. To pozytywny pomysł i dziękuję za niego: ale to nie pomoże w tym scenariuszu.jeśli możesz i jesteś gotów skorzystać z refleksji, to powinno wystarczyć;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
źródło
Task.Run(() => tcs.SetResult(result))
?Zaktualizowałem , opublikowałem osobną odpowiedź, którą należy się zająć,
ContinueWith
w przeciwieństwie doawait
(ponieważContinueWith
nie dba o bieżący kontekst synchronizacji).Można użyć głupi kontekst synchronizacji narzucić asynchrony na kontynuację wywołane przez powołanie
SetResult/SetCancelled/SetException
się naTaskCompletionSource
. Uważam, że obecny kontekst synchronizacji (w momencieawait tcs.Task
) jest kryteriami, na podstawie których TPL decyduje, czy taka kontynuacja ma być synchroniczna, czy asynchroniczna.U mnie działa:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsync
jest zaimplementowany w następujący sposób:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContext
jest bardzo tani pod względem kosztów ogólnych, jakie dodaje. W rzeczywistości bardzo podobne podejście jest stosowane przy implementacji WPFDispatcher.BeginInvoke
.TPL porównuje docelowy kontekst synchronizacji w punkcie z kontekstem w
await
punkcietcs.SetResult
. Jeśli kontekst synchronizacji jest taki sam (lub nie ma kontekstu synchronizacji w obu miejscach), kontynuacja jest wywoływana bezpośrednio, synchronicznie. W przeciwnym razie jest umieszczany w kolejce przy użyciuSynchronizationContext.Post
kontekstu synchronizacji docelowej, tj. Normalnegoawait
zachowania. To podejście zawsze narzucaSynchronizationContext.Post
zachowanie (lub kontynuację wątku puli, jeśli nie ma docelowego kontekstu synchronizacji).Zaktualizowano , to nie zadziała
task.ContinueWith
, ponieważContinueWith
nie dba o bieżący kontekst synchronizacji. Działa jednak naawait task
( skrzypce ). To również działa dlaawait task.ConfigureAwait(false)
.OTOH, to podejście działa w przypadku
ContinueWith
.źródło
tcs.SetResult
wywołania. To trochę staje atomowej i bezpieczny wątku w ten sposób, ponieważ sama kontynuacja nastąpi na każdym innym wątku basenie lub na oryginalnym synchronizacji. kontekst przechwycony wawait tcs.Task
. ASynchronizationContext.SetSynchronizationContext
sam w sobie jest bardzo tani, znacznie tańszy niż sam przełącznik wątkowy.ThreadPool
. Dzięki temu rozwiązaniu TPL rzeczywiście użyjeThreadPool
, jeśli nie było synchronizacji. kontekst (lub był to podstawowy domyślny) wawait tcs.Task
. Ale to jest standardowe zachowanie OC.Podejście symulacyjne przerwania wyglądało naprawdę dobrze, ale w niektórych scenariuszach prowadziło do przejmowania wątków przez TPL .
Miałem wtedy implementację, która była podobna do sprawdzania obiektu kontynuacji , ale po prostu sprawdzałem, czy nie ma kontynuacji, ponieważ w rzeczywistości jest zbyt wiele scenariuszy, aby dany kod działał dobrze, ale oznaczało to, że nawet takie rzeczy
Task.Wait
spowodowały wyszukiwanie puli wątków.Ostatecznie, po sprawdzeniu wielu, wielu IL, jedynym bezpiecznym i użytecznym scenariuszem jest
SetOnInvokeMres
scenariusz (kontynuacja manual-reset-event-slim). Istnieje wiele innych scenariuszy:W końcu zdecydowałem się sprawdzić, czy nie ma zerowego obiektu kontynuacji; jeśli jest zerowa, w porządku (bez kontynuacji); jeśli jest różna od null, w specjalnych przypadkach sprawdź
SetOnInvokeMres
- jeśli tak jest: fine (bezpieczne wywołanie); w przeciwnym razie niech pula wątków wykona operacjęTrySetComplete
, bez mówienia zadaniu o zrobieniu czegoś specjalnego, na przykład przerwania spoofingu.Task.Wait
wykorzystujeSetOnInvokeMres
podejście, które jest konkretnym scenariuszem, którego naprawdę chcemy się postarać, aby uniknąć impasu.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
źródło