Właściwy sposób na realizację niekończącego się zadania. (Liczniki czasu a zadanie)

92

Tak więc moja aplikacja musi wykonywać akcję prawie w sposób ciągły (z przerwą około 10 sekund między każdym uruchomieniem) tak długo, jak działa lub żądane jest anulowanie. Praca, którą musi wykonać, może zająć do 30 sekund.

Czy lepiej jest użyć System.Timers.Timer i użyć funkcji AutoReset, aby upewnić się, że nie wykona on akcji przed zakończeniem poprzedniego „tiku”.

A może powinienem używać ogólnego zadania w trybie LongRunning z tokenem anulowania i mieć w nim regularną nieskończoną pętlę while wywołującą akcję wykonującą pracę z 10-sekundowym wątkiem. Jeśli chodzi o model async / await, nie jestem pewien, czy byłby tutaj odpowiedni, ponieważ nie mam żadnych wartości zwracanych z pracy.

CancellationTokenSource wtoken;
Task task;

void StopWork()
{
    wtoken.Cancel();

    try 
    {
        task.Wait();
    } catch(AggregateException) { }
}

void StartWork()
{
    wtoken = new CancellationTokenSource();

    task = Task.Factory.StartNew(() =>
    {
        while (true)
        {
            wtoken.Token.ThrowIfCancellationRequested();
            DoWork();
            Thread.Sleep(10000);
        }
    }, wtoken, TaskCreationOptions.LongRunning);
}

void DoWork()
{
    // Some work that takes up to 30 seconds but isn't returning anything.
}

lub po prostu użyj prostego licznika czasu podczas używania właściwości AutoReset i wywołaj .Stop (), aby go anulować?

Josh
źródło
Zadanie wydaje się przesadą, biorąc pod uwagę to, co próbujesz osiągnąć. en.wikipedia.org/wiki/KISS_principle . Zatrzymaj licznik czasu na początku OnTick (), sprawdź bool, aby zobaczyć, czy powinieneś coś robić, nie, pracuj, uruchom ponownie Timer, kiedy skończysz.
Mike Trusov

Odpowiedzi:

94

Użyłbym OC dataflow za to (ponieważ używasz .NET 4.5 i wykorzystuje Taskwewnętrznie). Możesz łatwo utworzyć, ActionBlock<TInput>który wysyła elementy do siebie po przetworzeniu jego akcji i odczekaniu odpowiedniej ilości czasu.

Najpierw stwórz fabrykę, która stworzy Twoje niekończące się zadanie:

ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Action<DateTimeOffset> action, CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action.
        action(now);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here.  As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions { 
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

Wybrałam ActionBlock<TInput>wziąć DateTimeOffsetstrukturę ; musisz przekazać parametr typu i równie dobrze może on przekazać jakiś użyteczny stan (możesz zmienić naturę stanu, jeśli chcesz).

Należy również pamiętać, że ActionBlock<TInput>domyślnie przetwarza tylko jeden element naraz, więc masz gwarancję, że zostanie przetworzona tylko jedna akcja (co oznacza, że ​​nie będziesz musiał zajmować się ponownym wejściem, gdy wywoła Postmetodę rozszerzenia z powrotem).

Przekazałem również CancellationTokenstrukturę zarówno do konstruktora metody, jak ActionBlock<TInput>i do wywołania Task.Delaymetody ; jeśli proces zostanie anulowany, anulowanie nastąpi przy pierwszej możliwej okazji.

Stamtąd można łatwo refaktoryzować kod w celu przechowywania ITargetBlock<DateTimeoffset>interfejsu zaimplementowanego przez ActionBlock<TInput>(jest to abstrakcja wyższego poziomu reprezentująca bloki, które są konsumentami, i chcesz mieć możliwość wyzwalania zużycia przez wywołanie Postmetody rozszerzenia):

CancellationTokenSource wtoken;
ActionBlock<DateTimeOffset> task;

Twoja StartWorkmetoda:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask(now => DoWork(), wtoken.Token);

    // Start the task.  Post the time.
    task.Post(DateTimeOffset.Now);
}

A potem twoja StopWorkmetoda:

void StopWork()
{
    // CancellationTokenSource implements IDisposable.
    using (wtoken)
    {
        // Cancel.  This will cancel the task.
        wtoken.Cancel();
    }

    // Set everything to null, since the references
    // are on the class level and keeping them around
    // is holding onto invalid state.
    wtoken = null;
    task = null;
}

Dlaczego miałbyś chcieć tutaj używać TPL Dataflow? Kilka powodów:

Rozdzielenie obaw

CreateNeverEndingTaskMetoda jest fabryka, która tworzy swoje „usługi” że tak powiem. Kontrolujesz, kiedy się uruchamia i zatrzymuje, i jest całkowicie niezależny. Nie musisz przeplatać kontroli stanu licznika czasu z innymi aspektami kodu. Po prostu tworzysz blok, uruchamiasz go i zatrzymujesz, gdy skończysz.

Bardziej efektywne wykorzystanie wątków / zadań / zasobów

Domyślny harmonogram dla bloków w przepływie danych TPL jest taki sam dla a Task, czyli puli wątków. Używając ActionBlock<TInput>do przetwarzania swojej akcji, a także wywołania Task.Delay, dajesz kontrolę nad wątkiem, którego używałeś, gdy tak naprawdę nic nie robisz. To prawda, że ​​faktycznie prowadzi to do pewnego narzutu, gdy odradzasz nowy Task, który przetworzy kontynuację, ale to powinno być małe, biorąc pod uwagę, że nie przetwarzasz tego w ciasnej pętli (czekasz dziesięć sekund między wywołaniami).

Jeśli DoWorkfaktycznie można uczynić funkcję oczekującą (a mianowicie, że zwraca a Task), możesz (prawdopodobnie) zoptymalizować to jeszcze bardziej, dostosowując powyższą metodę fabryki, aby przyjmowała a Func<DateTimeOffset, CancellationToken, Task>zamiast an Action<DateTimeOffset>, jak na przykład:

ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Func<DateTimeOffset, CancellationToken, Task> action, 
    CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action.  Wait on the result.
        await action(now, cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here.  As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Same as above.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions { 
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

Oczywiście dobrą praktyką byłoby przeplatanie całej CancellationTokenmetody (jeśli ją akceptuje), co jest zrobione tutaj.

Oznacza to, że miałbyś wtedy DoWorkAsyncmetodę z następującym podpisem:

Task DoWorkAsync(CancellationToken cancellationToken);

Musiałbyś zmienić (tylko nieznacznie i nie wykrwawiasz tutaj oddzielenia obaw) StartWorkmetodę rozliczania nowego podpisu przekazanego do CreateNeverEndingTaskmetody, na przykład:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask((now, ct) => DoWorkAsync(ct), wtoken.Token);

    // Start the task.  Post the time.
    task.Post(DateTimeOffset.Now, wtoken.Token);
}
casperOne
źródło
Witam, próbuję tej implementacji, ale mam problemy. Jeśli mój DoWork nie przyjmuje argumentu, task = CreateNeverEndingTask (now => DoWork (), wtoken.Token); daje mi błąd kompilacji (niezgodność typów). Z drugiej strony, jeśli mój DoWork przyjmuje parametr DateTimeOffset, ta sama linia daje mi inny błąd kompilacji, informując mnie, że żadne przeciążenie DoWork nie przyjmuje 0 argumentów. Czy mógłbyś mi pomóc to rozgryźć?
Bovaz
1
Właściwie rozwiązałem swój problem, dodając rzutowanie do wiersza, w którym przypisuję zadanie i przekazując parametr do DoWork: task = (ActionBlock <DateTimeOffset>) CreateNeverEndingTask (now => DoWork (teraz), wtoken.Token);
Bovaz
Można również zmienić typ zadania „ActionBlock <DateTimeOffset>;” do zadania ITargetBlock <DateTimeOffset>;
XOR
1
Uważam, że może to przydzielić pamięć na zawsze, co ostatecznie doprowadzi do przepełnienia.
Nate Gardner,
@NateGardner W której części?
casperOne
75

Uważam, że nowy interfejs oparty na zadaniach jest bardzo prosty w robieniu takich rzeczy - nawet łatwiejszy niż używanie klasy Timer.

Jest kilka drobnych poprawek, które możesz wprowadzić do swojego przykładu. Zamiast:

task = Task.Factory.StartNew(() =>
{
    while (true)
    {
        wtoken.Token.ThrowIfCancellationRequested();
        DoWork();
        Thread.Sleep(10000);
    }
}, wtoken, TaskCreationOptions.LongRunning);

Możesz to zrobić:

task = Task.Run(async () =>  // <- marked async
{
    while (true)
    {
        DoWork();
        await Task.Delay(10000, wtoken.Token); // <- await with cancellation
    }
}, wtoken.Token);

W ten sposób anulowanie nastąpi natychmiastowo, jeśli znajduje się w środku Task.Delay, zamiast czekać na Thread.Sleepzakończenie.

Ponadto użycie Task.Delayover Thread.Sleepoznacza, że ​​nie wiążesz nici, nie robiąc nic przez czas snu.

Jeśli możesz, możesz również DoWork()zaakceptować token anulowania, a anulowanie będzie znacznie szybsze.

Porge
źródło
1
Zobacz, jakie zadanie otrzymasz, jeśli użyjesz async lambda jako parametru Task.Factory.StartNew - blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx When you do task.Wait ( ); po prośbie o anulowanie będziesz czekał na nieprawidłowe zadanie.
Lukas Pirkl,
Tak, to właściwie powinno być Task.Run teraz, które ma poprawne przeciążenie.
porges
Według http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx wygląda na to, że Task.Runużywa puli wątków, więc twój przykład używając Task.Runzamiast Task.Factory.StartNewwith TaskCreationOptions.LongRunningnie robi dokładnie tego samego - gdybym potrzebował zadania, aby skorzystać z LongRunningopcji, czy nie byłbym w stanie użyć tego Task.Run, co pokazałeś, czy czegoś mi brakuje?
Jeff
@Lumirris: Celem async / await jest uniknięcie wiązania wątku przez cały czas jego wykonywania (tutaj, podczas wywołania Delay zadanie nie używa wątku). Tak więc używanie LongRunningjest w pewnym sensie niezgodne z celem, jakim jest nie wiązanie wątków. Jeśli chcesz zagwarantować działanie na własnym wątku, możesz go użyć, ale tutaj będziesz rozpoczynać wątek, który śpi przez większość czasu. Jaki jest przypadek użycia?
porges
@Porges Point zajęte. Moim przypadkiem użycia byłoby zadanie uruchamiające nieskończoną pętlę, w której każda iteracja wykonywałaby kawałek pracy i „relaksowała się” przez 2 sekundy przed wykonaniem kolejnej porcji pracy w następnej iteracji. Działa wiecznie, ale robi regularne 2-sekundowe przerwy. Mój komentarz dotyczył jednak bardziej tego, czy można określić to przy LongRunningużyciu Task.Runskładni. Z dokumentacji wynika, że Task.Runskładnia jest bardziej przejrzysta, o ile jesteś zadowolony z domyślnych ustawień, których używa. Wygląda na to, że nie ma przeciążenia, które przyjmuje TaskCreationOptionsargument.
Jeff,
4

Oto co wymyśliłem:

  • Dziedzicz NeverEndingTaski Zastąp plikExecutionCore metody metodę pracą, którą chcesz wykonać.
  • Zmiana ExecutionLoopDelayMspozwala dostosować czas między pętlami, np. Jeśli chcesz użyć algorytmu wycofywania.
  • Start/Stop zapewniają synchroniczny interfejs do uruchamiania / zatrzymywania zadania.
  • LongRunningoznacza, że ​​otrzymasz jeden dedykowany wątek na plik NeverEndingTask.
  • Ta klasa nie przydziela pamięci w pętli w przeciwieństwie do ActionBlockrozwiązania bazowego powyżej.
  • Poniższy kod to szkic, niekoniecznie kod produkcyjny :)

:

public abstract class NeverEndingTask
{
    // Using a CTS allows NeverEndingTask to "cancel itself"
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    protected NeverEndingTask()
    {
         TheNeverEndingTask = new Task(
            () =>
            {
                // Wait to see if we get cancelled...
                while (!_cts.Token.WaitHandle.WaitOne(ExecutionLoopDelayMs))
                {
                    // Otherwise execute our code...
                    ExecutionCore(_cts.Token);
                }
                // If we were cancelled, use the idiomatic way to terminate task
                _cts.Token.ThrowIfCancellationRequested();
            },
            _cts.Token,
            TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning);

        // Do not forget to observe faulted tasks - for NeverEndingTask faults are probably never desirable
        TheNeverEndingTask.ContinueWith(x =>
        {
            Trace.TraceError(x.Exception.InnerException.Message);
            // Log/Fire Events etc.
        }, TaskContinuationOptions.OnlyOnFaulted);

    }

    protected readonly int ExecutionLoopDelayMs = 0;
    protected Task TheNeverEndingTask;

    public void Start()
    {
       // Should throw if you try to start twice...
       TheNeverEndingTask.Start();
    }

    protected abstract void ExecutionCore(CancellationToken cancellationToken);

    public void Stop()
    {
        // This code should be reentrant...
        _cts.Cancel();
        TheNeverEndingTask.Wait();
    }
}
Jack Ukleja
źródło