Wiele gorutyn słuchających na jednym kanale

84

Mam wiele goroutines próbujących jednocześnie odbierać na tym samym kanale. Wygląda na to, że ostatnia gorutyna, która zaczyna odbierać na kanale, otrzymuje wartość. Czy jest to gdzieś w specyfikacji języka, czy jest to niezdefiniowane zachowanie?

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        <-c
        c <- fmt.Sprintf("goroutine %d", i)
    }(i)
}
c <- "hi"
fmt.Println(<-c)

Wynik:

goroutine 4

Przykład na placu zabaw

EDYTOWAĆ:

Właśnie zdałem sobie sprawę, że to bardziej skomplikowane niż myślałem. Wiadomość rozchodzi się po wszystkich gorutynach.

c := make(chan string)
for i := 0; i < 5; i++ {
    go func(i int) {
        msg := <-c
        c <- fmt.Sprintf("%s, hi from %d", msg, i)
    }(i)
}
c <- "original"
fmt.Println(<-c)

Wynik:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

Przykład na placu zabaw

Ilia Choly
źródło
6
Wypróbowałem twój ostatni fragment i (ku mojej ogromnej uldze) wyszedł tylko original, hi from 4...
Chang Qian,
1
@ChangQian dodanie time.Sleep(time.Millisecond)między kanałem wysyłania i odbierania przywraca stare zachowanie.
Ilia Choly

Odpowiedzi:

78

Tak, to skomplikowane, ale istnieje kilka praktycznych zasad, które powinny sprawić, że wszystko będzie o wiele prostsze.

  • wolą używać formalnych argumentów dla kanałów które przekazujesz do procedur go, zamiast uzyskiwać dostęp do kanałów o zasięgu globalnym. W ten sposób możesz uzyskać więcej sprawdzania kompilatora, a także lepszą modułowość.
  • unikaj czytania i pisania na tym samym kanale w ramach określonej procedury go (w tym „głównej”). W przeciwnym razie impas jest znacznie większym ryzykiem.

Oto alternatywna wersja twojego programu, stosując te dwie wskazówki. Ten przypadek pokazuje wielu pisarzy i jednego czytelnika na kanale:

c := make(chan string)

for i := 1; i <= 5; i++ {
    go func(i int, co chan<- string) {
        for j := 1; j <= 5; j++ {
            co <- fmt.Sprintf("hi from %d.%d", i, j)
        }
    }(i, c)
}

for i := 1; i <= 25; i++ {
    fmt.Println(<-c)
}

http://play.golang.org/p/quQn7xePLw

Tworzy pięć podprogramów go zapisujących na jednym kanale, z których każda zapisuje pięć razy. Główna procedura go odczytuje wszystkie dwadzieścia pięć komunikatów - możesz zauważyć, że kolejność, w jakiej się pojawiają, często nie jest sekwencyjna (tj. Współbieżność jest oczywista).

Ten przykład demonstruje funkcję kanałów Go: możliwe jest współdzielenie jednego kanału przez wielu autorów; Go automatycznie przepleci wiadomości.

To samo dotyczy jednego pisarza i wielu czytelników na jednym kanale, jak widać w drugim przykładzie tutaj:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
    go func(i int, ci <-chan int) {
        j := 1
        for v := range ci {
            time.Sleep(time.Millisecond)
            fmt.Printf("%d.%d got %d\n", i, j, v)
            j += 1
        }
        w.Done()
    }(i, c)
}

for i := 1; i <= 25; i++ {
    c <- i
}
close(c)
w.Wait()

Ten drugi przykład obejmuje oczekiwanie nałożone na główną gorutynę, która w przeciwnym razie natychmiast zakończyłaby pracę i spowodowała wcześniejsze zakończenie pozostałych pięciu gorutyn (dzięki olovowi za tę korektę) .

W obu przykładach buforowanie nie było potrzebne. Ogólnie rzecz biorąc, dobrą zasadą jest postrzeganie buforowania tylko jako środka zwiększającego wydajność. Jeśli twój program nie blokuje się bez buforów, nie blokuje się również z buforami (ale odwrotność nie zawsze jest prawdą). Tak więc, zgodnie z kolejną praktyczną zasadą, zacznij bez buforowania, a następnie dodaj je później, w razie potrzeby .

Rick-777
źródło
nie musisz czekać na zakończenie wszystkich gorutyn?
mlbright
Zależy, co masz na myśli. Spójrz na przykłady play.golang.org; mają mainfunkcję, która kończy się po osiągnięciu końca, niezależnie od tego, co robią inne gorutyny. W pierwszym przykładzie powyżej mainjest to blokada z innymi gorutynami, więc nie ma problemu. Drugi przykład również działa bez problemu, ponieważ wszystkie wiadomości są wysyłane za pośrednictwem c przedclose funkcja jest wywoływana i to się dzieje , zanim z mainwygaśnięciem goroutine. (Możesz argumentować, że dzwonienie closejest zbyteczne w tym przypadku, ale to dobra praktyka.)
Rick-777
1
zakładając, że chcesz (deterministycznie) zobaczyć 15 wydruków w ostatnim przykładzie, musisz poczekać. Aby wykazać, że tutaj jest to samo, ale z przykładem time.Sleep tuż przed printf: play.golang.org/p/cEP-UBPLv6
Olov
A oto sam przykład z time.Sleep i ustalona z WaitGroup czekać na goroutines: play.golang.org/p/ESq9he_WzS
Olov
Nie sądzę, aby na początku pomijać buforowanie. Bez buforowania w rzeczywistości nie piszesz kodu współbieżnego, a to prowadzi nie tylko do tego, że nie możesz zakleszczać się, ale także do tego, że wynik obsługi z drugiej strony kanału jest już dostępny w następnej instrukcji po wysłaniu, i możesz nieumyślnie (lub zdarzenie celowo w przypadku nowicjusza) na tym polegać. A kiedy już polegasz na fakcie, że otrzymujesz wynik natychmiast, bez specjalnego czekania na niego, i dodasz bufor, masz stan wyścigu.
użytkownik
25

Spóźniona odpowiedź, ale mam nadzieję, że pomoże to innym w przyszłości, np. Długie ankiety, przycisk „Globalny”, Rozgłaszanie do wszystkich?

Effective Go wyjaśnia problem:

Odbiorcy zawsze blokują się, dopóki nie ma danych do odebrania.

Oznacza to, że nie możesz słuchać więcej niż 1 gorutyny na 1 kanale i oczekiwać, że WSZYSTKIE gorutyny otrzymają tę samą wartość.

Uruchom ten przykład kodu .

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
        for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }

    close(c)
}

Nie zobaczysz słowa „count 1” więcej niż raz, mimo że kanał słucha 5 goroutines. Dzieje się tak, ponieważ kiedy pierwsza gorutyna blokuje kanał, wszystkie inne gorutyny muszą czekać w kolejce. Kiedy kanał jest odblokowany, licznik został już odebrany i usunięty z kanału, więc następna goroutine w linii otrzymuje następną wartość licznika.

Brenden
źródło
1
Dzięki - teraz ten przykład ma sens github.com/goinaction/code/blob/master/chapter6/listing20/…
user31208
Ach, to było pomocne. Czy dobrą alternatywą byłoby utworzenie kanału dla każdej procedury Go wymagającej informacji, a następnie wysłanie wiadomości na wszystkich kanałach, gdy jest to konieczne? To jest opcja, którą mogę sobie wyobrazić.
ThePartyTurtle
9

To skomplikowane.

Zobacz też, co się stanie z GOMAXPROCS = NumCPU+1. Na przykład,

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU() + 1)
    fmt.Print(runtime.GOMAXPROCS(0))
    c := make(chan string)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- ", original"
    fmt.Println(<-c)
}

Wynik:

5, original, hi from 4

Zobacz, co się dzieje z kanałami buforowanymi. Na przykład,

package main

import "fmt"

func main() {
    c := make(chan string, 5+1)
    for i := 0; i < 5; i++ {
        go func(i int) {
            msg := <-c
            c <- fmt.Sprintf("%s, hi from %d", msg, i)
        }(i)
    }
    c <- "original"
    fmt.Println(<-c)
}

Wynik:

original

Powinieneś być w stanie wyjaśnić również te przypadki.

peterSO
źródło
7

Przestudiowałem istniejące rozwiązania i stworzyłem prostą bibliotekę transmisji https://github.com/grafov/bcast .

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send("test message") // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages
Alexander I.Grafov
źródło
2
Świetna libacja, którą tam masz! Znalazłem również github.com/asaskevich/EventBus
użytkownik
I nie jest to wielka sprawa, ale być może powinieneś wspomnieć o tym, jak się odłączyć w pliku readme.
użytkownik
Wyciek pamięci tam
jhvaras
:( Czy możesz wyjaśnić szczegóły @jhvaras?
Alexander I.Grafov
2

Aby słuchać wielu gorutynów na jednym kanale, tak, jest to możliwe. kluczową kwestią jest sama wiadomość, możesz zdefiniować taką wiadomość:

package main

import (
    "fmt"
    "sync"
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // both block or non-block are ok
    var wg sync.WaitGroup
    receiver := 25 // specify receiver count

    sender := func() {
        o := &obj {
            msg: "hello everyone!",
            receiver: receiver,
        }
        ch <- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := <-ch
        fmt.Printf("%d received at %d\n", idx, o.receiver)
        o.receiver--
        if o.receiver > 0 {
            ch <- o // forward to others
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go sender()
    for i:=0; i<reciever; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

Wynik jest losowy:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4
coanor
źródło