Zdezorientowany, gdy metoda boost :: asio :: io_service run blokuje / odblokowuje

88

Będąc całkowitym początkującym w Boost.Asio, jestem zdezorientowany io_service::run(). Byłbym wdzięczny, gdyby ktoś mógł mi wytłumaczyć, kiedy ta metoda blokuje / odblokowuje. Dokumentacja stwierdza:

Te run()bloki funkcyjne, aż cała praca zakończyła i nie ma więcej koparki być wysłane lub dopóki io_servicezostał zatrzymany.

Wiele wątków może wywołać run()funkcję w celu skonfigurowania puli wątków, z których io_servicemogą wykonywać programy obsługi. Wszystkie wątki oczekujące w puli są równoważne i io_servicemoże wybrać dowolny z nich do wywołania procedury obsługi.

Normalne wyjście z run()funkcji oznacza, że io_serviceobiekt jest zatrzymany ( stopped()funkcja zwraca wartość true). Kolejne wywołania run(), run_one(), poll()lub poll_one()wróci natychmiast, chyba że istnieje wcześniejsze wywołanie reset().

Co oznacza poniższe stwierdzenie?

[...] nie ma więcej handlowców do wysłania [...]


Próbując zrozumieć zachowanie io_service::run(), natknąłem się na ten przykład (przykład 3a). W jej obrębie obserwuję, że io_service->run()blokuje się i czeka na zlecenia pracy.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

Jednak w poniższym kodzie, nad którym pracowałem, klient łączy się za pomocą protokołu TCP / IP, a metoda run metoda blokuje, aż dane zostaną odebrane asynchronicznie.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Jakiekolwiek wyjaśnienie run()tego opisujące jego zachowanie w dwóch poniższych przykładach byłoby mile widziane.

MistyD
źródło

Odpowiedzi:

234

Fundacja

Zacznijmy od uproszczonego przykładu i przeanalizujmy odpowiednie elementy Boost.Asio:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

Co to jest Handler ?

Program obsługi to nic innego jak wywołanie zwrotne. W przykładowym kodzie są 3 programy obsługi:

  • Przewodnik print(1).
  • Przewodnik handle_async_receive(3).
  • Przewodnik print(4).

Mimo że ta sama print()funkcja jest używana dwukrotnie, każde użycie jest uważane za utworzenie własnej, unikatowej procedury obsługi. Programy obsługi mogą mieć wiele kształtów i rozmiarów, od podstawowych funkcji, takich jak te powyżej, do bardziej złożonych konstrukcji, takich jak funktory generowane z boost::bind()i lambdy. Niezależnie od złożoności, program obsługi nadal pozostaje niczym więcej niż wywołaniem zwrotnym.

Co to jest praca ?

Praca to przetwarzanie, o wykonanie którego Boost.Asio został poproszony w imieniu kodu aplikacji. Czasami Boost.Asio może rozpocząć część pracy, gdy tylko zostanie o tym poinformowana, a innym razem może zaczekać na wykonanie pracy w późniejszym czasie. Po zakończeniu pracy Boost.Asio poinformuje aplikację, wywołując dostarczoną procedurę obsługi .

Boost.Asio gwarantuje, że koparki będzie działać tylko w wątku, który jest obecnie wywołującego run(), run_one(), poll(), lub poll_one(). To są wątki, które będą działać i programy obsługi połączeń . Dlatego w powyższym przykładzie print()nie jest wywoływana, gdy jest wysyłana do io_service(1). Zamiast tego jest dodawany do io_servicei zostanie wywołany w późniejszym czasie. W tym przypadku w ciągu io_service.run()(5).

Co to są operacje asynchroniczne?

Operacja asynchroniczna tworzy pracę, a Boost.Asio wywoła procedurę obsługi, aby poinformować aplikację o zakończeniu pracy. Operacje asynchroniczne są tworzone przez wywołanie funkcji, która ma nazwę z prefiksem async_. Funkcje te są również znane jako funkcje inicjujące .

Operacje asynchroniczne można rozłożyć na trzy unikalne kroki:

  • Zainicjowanie lub poinformowanie powiązanego, io_serviceże działa, musi zostać wykonane. async_receiveOperacja (3) informuje io_service, że będzie trzeba asynchronicznie odczytu danych z gniazda, a następnie async_receivezwraca natychmiast.
  • Wykonuję właściwą pracę. W takim przypadku po socketotrzymaniu danych bajty zostaną odczytane i skopiowane do buffer. Rzeczywista praca zostanie wykonana w:
    • Funkcja inicjująca (3), jeśli Boost.Asio może określić, że nie będzie blokować.
    • Gdy aplikacja jawnie uruchamia io_service(5).
  • Wywołanie handle_async_receive ReadHandler . Po raz kolejny programy obsługi są wywoływane tylko w wątkach z uruchomionym io_service. Zatem niezależnie od tego, kiedy praca zostanie wykonana (3 lub 5), gwarantuje się, że handle_async_receive()zostanie ona wywołana tylko w ciągu io_service.run()(5).

Oddzielenie w czasie i przestrzeni między tymi trzema etapami jest znane jako inwersja przepływu sterowania. Jest to jedna ze złożoności, która utrudnia programowanie asynchroniczne. Istnieją jednak techniki, które mogą pomóc w złagodzeniu tego problemu , na przykład za pomocą programów .

Co robi io_service.run()?

Gdy wywoła wątek io_service.run(), praca i programy obsługi będą wywoływane z poziomu tego wątku. W powyższym przykładzie io_service.run()(5) będzie blokować do:

  • Wywołał i zwrócił z obu printprogramów obsługi, operacja odbierania zakończyła się sukcesem lub niepowodzeniem, a jej handle_async_receiveprogram obsługi został wywołany i zwrócony.
  • io_serviceWyraźnie zatrzymała io_service::stop().
  • Wyjątek jest zgłaszany z wnętrza programu obsługi.

Jeden potencjalny pseudo-przepływ można opisać następująco:

utwórz io_service
utwórz gniazdo
dodaj moduł obsługi drukowania do io_service (1)
poczekaj, aż gniazdo się połączy (2)
dodaj asynchroniczne żądanie odczytu pracy do io_service (3)
dodaj obsługę drukowania do io_service (4)
uruchom io_service (5)
  czy jest tam praca lub opiekunowie?
    tak, jest 1 praca i 2 opiekunów
      czy gniazdo ma dane? nie, nic nie rób
      uruchom program obsługi drukowania (1)
  czy jest tam praca lub opiekunowie?
    tak, jest 1 praca i 1 przewodnik
      czy gniazdo ma dane? nie, nic nie rób
      uruchom program obsługi drukowania (4)
  czy jest tam praca lub opiekunowie?
    tak, jest 1 praca
      czy gniazdo ma dane? nie, czekaj dalej
  - gniazdo odbiera dane -
      gniazdo zawiera dane, wczytaj je do bufora
      dodaj program obsługi handle_async_receive do io_service
  czy jest tam praca lub opiekunowie?
    tak, jest 1 przewodnik
      uruchom handle_async_receive handler (3)
  czy jest tam praca lub opiekunowie?
    nie, ustaw io_service jako zatrzymane i wróć

Zwróć uwagę, jak po zakończeniu odczytu dodano kolejną procedurę obsługi do io_service. Ten subtelny szczegół jest ważną cechą programowania asynchronicznego. Pozwala on na koparki być przykuty razem. Na przykład, jeśli handle_async_receivenie otrzyma wszystkich oczekiwanych danych, jego implementacja może wysłać kolejną asynchroniczną operację odczytu, co spowoduje io_servicewięcej pracy, a tym samym nie powróci z io_service.run().

Zwróć uwagę, że gdy io_serviceskończy się praca, aplikacja musi reset()io_serviceprzed ponownym uruchomieniem.


Przykładowe pytanie i przykładowy kod 3a

Teraz przyjrzyjmy się dwóm fragmentom kodu, do których odwołuje się pytanie.

Kod pytania

socket->async_receivedodaje pracę do io_service. W ten sposób io_service->run()będzie blokować, dopóki operacja odczytu nie zakończy się sukcesem lub błędem i ClientReceiveEventalbo zakończy działanie, albo zgłosi wyjątek.

Przykład 3a Kod

W nadziei na ułatwienie zrozumienia, oto mniejszy przykład 3a z adnotacjami:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

Na wysokim poziomie program utworzy 2 wątki, które będą przetwarzać io_servicepętlę zdarzeń (2). Daje to prostą pulę wątków, która obliczy liczby Fibonacciego (3).

Jedną główną różnicą między kodem pytania a tym kodem jest to, że ten kod wywołuje io_service::run()(2) przed dodaniem rzeczywistej pracy i programów obsługi do io_service(3). Aby zapobiec io_service::run()natychmiastowemu powracaniu, io_service::worktworzony jest obiekt (1). Ten obiekt zapobiega io_servicebrakowi pracy; dlatego io_service::run()nie powróci w wyniku braku pracy.

Ogólny przepływ jest następujący:

  1. Utwórz i dodaj io_service::workobiekt dodany do io_service.
  2. Utworzono pulę wątków, która wywołuje io_service::run(). Te wątki robocze nie powrócą z io_servicepowodu io_service::workobiektu.
  3. Dodaj 3 procedury obsługi, które obliczają liczby Fibonacciego io_service, i natychmiast wróć. Wątki robocze, a nie wątek główny, mogą natychmiast rozpocząć uruchamianie tych programów obsługi.
  4. Usuń io_service::workobiekt.
  5. Poczekaj, aż wątki robocze zostaną zakończone. Nastąpi to dopiero, gdy wszystkie 3 procedury obsługi zakończą wykonywanie, ponieważ io_serviceżaden z nich nie ma obsługi ani nie działa.

Kod można napisać inaczej, w taki sam sposób jak kod oryginalny, w którym procedury obsługi są dodawane do elementu io_service, a następnie io_serviceprzetwarzana jest pętla zdarzeń. Eliminuje to potrzebę używania io_service::worki skutkuje następującym kodem:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Synchroniczne a asynchroniczne

Chociaż kod w pytaniu używa operacji asynchronicznej, skutecznie działa synchronicznie, ponieważ czeka na zakończenie operacji asynchronicznej:

socket.async_receive(buffer, handler)
io_service.run();

jest równa:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Zgodnie z ogólną zasadą należy unikać mieszania operacji synchronicznych i asynchronicznych. Często może zmienić złożony system w skomplikowany system. Ta odpowiedź podkreśla zalety programowania asynchronicznego, z których niektóre są również omówione w dokumentacji Boost.Asio .

Tanner Sansbury
źródło
13
Świetny post. Chciałbym dodać tylko jedną rzecz, ponieważ wydaje mi się, że nie zwraca na nią wystarczającej uwagi: po powrocie run () musisz wywołać reset () na swojej usłudze io_service, zanim będziesz mógł ją ponownie uruchomić (). W przeciwnym razie może powrócić natychmiast, niezależnie od tego, czy są oczekujące operacje async_, czy nie.
DeVadder
Skąd się bierze bufor? Co to jest?
ruipacheco
Nadal jestem zdezorientowany. Jeśli miksowanie jest synchronizowane, a asynchroniczne nie jest zalecane, to jaki jest tryb czystej asynchronii? czy możesz podać przykład pokazujący kod bez io_service.run () ;?
Splash
@Splash Można go użyć io_service.poll()do przetwarzania pętli zdarzeń bez blokowania zaległych operacji. Podstawowym zaleceniem unikania mieszania operacji synchronicznych i asynchronicznych jest unikanie dodawania niepotrzebnej złożoności i zapobieganie słabej reakcji, gdy procedury obsługi zajmują dużo czasu do ukończenia. Istnieją przypadki, w których jest to bezpieczne, na przykład gdy wiadomo, że praca synchroniczna nie zostanie zablokowana.
Tanner Sansbury
Co masz na myśli, mówiącrun() „aktualnie” w „Boost.Asio” gwarantuje, że programy obsługi będą działać tylko w wątku, który aktualnie wywołuje .... ” ? Jeśli istnieje N wątków (które wywołały run()), to który z nich jest „bieżącym” wątkiem? Może być ich wiele? Czy masz na myśli to, że wątek, który zakończył wykonywanie async_*()(powiedzmy async_read), ma również gwarancję wywołania swoich programów obsługi?
Nawaz
18

Aby uprościć, jak to rundziała, pomyśl o tym jak o pracowniku, który musi przetworzyć stos papieru; bierze jeden arkusz, robi to, co mówi arkusz, wyrzuca go i bierze następny; kiedy zabraknie mu pościeli, opuszcza biuro. Na każdym arkuszu może znajdować się dowolna instrukcja, nawet dodanie nowego arkusza do stosu. Wracając do asio: możesz oddać io_servicepracę na dwa sposoby, zasadniczo: używając postna niej, tak jak w próbce, którą łączysz, lub używając innych obiektów, które wewnętrznie odwołują postsię do metody io_service, takich jak the socketi jej async_*metody.

Loghorn
źródło