Jak czekać, aż wszystkie gorutiny skończą się, nie tracąc czasu.

111

Ten kod wybiera wszystkie pliki xml w tym samym folderze, ponieważ wywołany plik wykonywalny i asynchronicznie stosuje przetwarzanie do każdego wyniku w metodzie wywołania zwrotnego (w poniższym przykładzie drukowana jest tylko nazwa pliku).

Jak uniknąć używania metody uśpienia, aby główna metoda nie zakończyła się? Mam problemy z owinięciem głowy wokół kanałów (zakładam, że właśnie tego potrzeba, aby zsynchronizować wyniki), więc każda pomoc jest mile widziana!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Dante
źródło

Odpowiedzi:

175

Możesz użyć sync.WaitGroup . Cytując połączony przykład:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
zzzz
źródło
11
Czy jest jakiś powód, dla którego musisz zrobić wg.Add (1) poza rutynową procedurą go? Czy możemy to zrobić w środku tuż przed odroczeniem wg.Done ()?
sobota
19
sat, tak, jest powód, jest to opisane w sync.WaitGroup.Dodaj dokumenty: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
wobmene
18
Dostosowanie tego kodu spowodowało długą sesję debugowania, ponieważ mój goroutine był funkcją o nazwie, a przekazanie WaitGroup jako wartości spowoduje jej skopiowanie i sprawi, że wg.Done () nie będzie działać. Chociaż można to naprawić, przekazując wskaźnik & wg, lepszym sposobem uniknięcia takich błędów jest zadeklarowanie zmiennej WaitGroup jako wskaźnika w pierwszej kolejności: wg := new(sync.WaitGroup)zamiast var wg sync.WaitGroup.
Robert Jack Will
Wydaje mi się, że warto pisać wg.Add(len(urls))tuż nad linią for _, url := range urls, uważam, że lepiej jest użyć opcji Dodaj tylko raz.
Victor
@RobertJackWill: Dobra uwaga! Przy okazji, jest to omówione w dokumentacji : „Grupa WaitGroup nie może być kopiowana po pierwszym użyciu. Too bad Go nie ma sposobu na wymuszenie tego . W rzeczywistości jednak go vetwykrywa ten przypadek i ostrzega za pomocą„ func przechodzi blokada według wartości : sync.WaitGroup zawiera sync.noCopy ".
Brent Bradburn
57

WaitGroups to zdecydowanie kanoniczny sposób na zrobienie tego. Jednak tylko dla kompletności, oto rozwiązanie, które było powszechnie używane przed wprowadzeniem WaitGroups. Podstawowym pomysłem jest użycie kanału do powiedzenia „Skończyłem” i spowodowanie, aby główny rutynowy czekał, aż każda z nich zgłosi zakończenie.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
joshlf
źródło
9
Miło widzieć rozwiązanie z prostymi kanałami. Dodatkowy bonus: jeśli doSomething()zwróci jakiś wynik, to możesz umieścić go na kanale, a możesz zebrać i przetworzyć wyniki w drugiej pętli for (jak tylko będą gotowe)
andras
5
Działa tylko wtedy, gdy znasz już ilość gorutyny, którą chcesz rozpocząć. A co, jeśli piszesz jakiegoś robota html i uruchamiasz rekurencyjnie wszystkie linki na stronie?
shinydev
Będziesz musiał jakoś to śledzić, niezależnie od tego. Z WaitGroups jest to trochę łatwiejsze, ponieważ za każdym razem, gdy spawnujesz nowy goroutine, możesz to zrobić najpierw, wg.Add(1)a zatem będzie je śledzić. W przypadku kanałów byłoby to nieco trudniejsze.
joshlf
c zablokuje ponieważ wszystkie procedury GO próbować uzyskać do niego dostęp, a to niebuforowana
Edwin Ikechukwu Okonkwo
Jeśli przez „blok” masz na myśli, że program się zablokuje, to nieprawda. Możesz spróbować uruchomić to samodzielnie. Powodem jest to, że jedyne gorutyny, do których piszą, cróżnią się od głównej gorutyny, z której czytamy c. W ten sposób główna gorutyna jest zawsze dostępna, aby odczytać wartość z kanału, co nastąpi, gdy jeden z gorutyn będzie dostępny do zapisania wartości w kanale. Masz rację, że gdyby ten kod nie spowodował pojawienia się gorutyn, ale zamiast tego uruchomił wszystko w jednym, to by się zablokował.
joshlf
8

sync.WaitGroup może Ci tutaj pomóc.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
dimmg
źródło
1

Chociaż sync.waitGroup(wg) jest kanoniczną drogą do przodu, wymaga wykonania przynajmniej niektórych wg.Addpołączeń przed wg.Waitwykonaniem wszystkich. Może to być niewykonalne w przypadku prostych rzeczy, takich jak robot sieciowy, w przypadku którego nie znasz wcześniej liczby wywołań rekurencyjnych, a pobranie danych, które je napędzają, zajmuje trochę czasu wg.Add. W końcu musisz załadować i przeanalizować pierwszą stronę, zanim poznasz rozmiar pierwszej partii stron podrzędnych.

Napisałem rozwiązanie przy użyciu kanałów, unikając waitGroupw swoim rozwiązaniu ćwiczenia Tour of Go - web crawler . Za każdym razem, gdy uruchamiana jest jedna lub więcej procedur go, wysyłasz numer do childrenkanału. Za każdym razem, gdy procedura GO ma się zakończyć, wysyłasz 1do donekanału. Kiedy suma dzieci równa się sumie wykonanych, skończymy.

Moim jedynym problemem jest zakodowany rozmiar resultskanału, ale jest to (obecne) ograniczenie Go.


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Pełny kod źródłowy rozwiązania

dirkjot
źródło
1

Oto rozwiązanie wykorzystujące WaitGroup.

Najpierw zdefiniuj 2 metody narzędziowe:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Następnie zamień wywołanie callback:

go callback(fileName)

W przypadku wywołania funkcji narzędzia:

util.GoNode(func() { callback(fileName) })

Ostatni krok, dodaj tę linię na końcu swojego main, zamiast twojego sleep. Dzięki temu główny wątek będzie czekał na zakończenie wszystkich procedur, zanim program będzie mógł się zatrzymać.

func main() {
  // ...
  util.WaitForAllNodes()
}
gamliela
źródło