jak słuchać kanałów N? (instrukcja dynamicznego wyboru)

116

aby rozpocząć nieskończoną pętlę wykonywania dwóch goroutines, mogę użyć poniższego kodu:

po otrzymaniu wiadomości uruchomi nowy goroutine i będzie trwał wiecznie.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Chciałbym teraz mieć takie samo zachowanie dla N gorutyn, ale jak będzie wyglądać instrukcja select w tym przypadku?

To jest kod, od którego zacząłem, ale nie mam pojęcia, jak zakodować instrukcję select

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
John Smith
źródło
4
Myślę, że chcesz multipleksowania kanałów. golang.org/doc/effective_go.html#chan_of_chan Zasadniczo masz jeden kanał, którego słuchasz, a następnie wiele kanałów podrzędnych, które prowadzą do kanału głównego. Powiązane pytanie SO: stackoverflow.com/questions/10979608/…
Brenden

Odpowiedzi:

152

Możesz to zrobić za pomocą Selectfunkcji z pakietu Reflect :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select wykonuje operację wyboru opisaną na liście przypadków. Podobnie jak instrukcja Go select, blokuje się, dopóki co najmniej jeden z przypadków nie będzie mógł być kontynuowany, dokona jednolitego wyboru pseudolosowego, a następnie wykona ten przypadek. Zwraca indeks wybranej sprawy i, jeśli ten przypadek był operacją odbioru, otrzymaną wartość i wartość logiczną wskazującą, czy wartość odpowiada wysyłce na kanale (w przeciwieństwie do wartości zerowej otrzymanej z powodu zamknięcia kanału).

Przekazujesz tablicę SelectCasestruktur, które identyfikują kanał do wybrania, kierunek operacji i wartość do wysłania w przypadku operacji wysyłania.

Możesz więc zrobić coś takiego:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Możesz eksperymentować z bardziej rozbudowanym przykładem tutaj: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
źródło
4
Czy istnieje praktyczne ograniczenie liczby przypadków w takiej selekcji? Taką, że jeśli wyjdziesz poza to, poważnie wpłynie to na wydajność?
Maxim Vladimirsky
4
Może to moja niekompetencja, ale stwierdziłem, że ten wzorzec jest naprawdę trudny do wykorzystania, gdy wysyłasz i odbierasz złożone struktury przez kanał. Przekazanie wspólnego kanału „zbiorczego”, jak powiedział Tim Allclair, było w moim przypadku znacznie łatwiejsze.
Bora M. Alper,
90

Możesz to osiągnąć, opakowując każdy kanał w gorutynę, która „przekazuje” wiadomości do wspólnego kanału „zbiorczego”. Na przykład:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Jeśli chcesz wiedzieć, z którego kanału pochodzi wiadomość, możesz owinąć ją w strukturę z dodatkowymi informacjami przed przekazaniem jej do kanału zbiorczego.

W moich (ograniczonych) testach ta metoda znacznie przewyższa wydajność przy użyciu pakietu Reflect:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Kod testu porównawczego tutaj

Tim Allclair
źródło
2
Twój kod testu porównawczego jest nieprawidłowy, musisz powtórzyćb.N test porównawczy. W przeciwnym razie wyniki (które są podzielone przez b.N1 i 2000000000 w wyniku) będą całkowicie bez znaczenia.
Dave C
2
@DaveC Dziękuję! Wniosek się nie zmienia, ale wyniki są znacznie bardziej rozsądne.
Tim Allclair
1
Rzeczywiście, szybko zhakowałem kod twojego testu porównawczego, aby uzyskać rzeczywiste liczby . Bardzo dobrze może być coś, czego wciąż brakuje / jest nie tak z tego testu porównawczego, ale jedyną rzeczą, którą ma do tego bardziej skomplikowany kod refleksu, jest to, że konfiguracja jest szybsza (z GOMAXPROCS = 1), ponieważ nie wymaga wielu goroutines. W każdym innym przypadku prosty kanał łączący gorutynę zdmuchuje roztwór odbicia (o ~ 2 rzędy wielkości).
Dave C
2
Jedną ważną wadą (w porównaniu z reflect.Selectpodejściem) jest to, że gorutynki wykonują bufor scalający przy co najmniej jednej wartości na każdym łączonym kanale. Zwykle nie stanowi to problemu, ale w niektórych konkretnych zastosowaniach może to być przełomem :(.
Dave C
1
buforowany kanał scalający pogarsza problem. Problem polega na tym, że tylko rozwiązanie odbicia może mieć całkowicie niebuforowaną semantykę. Poszedłem naprzód i opublikowałem kod testowy, z którym eksperymentowałem, jako oddzielną odpowiedź (miejmy nadzieję) na wyjaśnienie tego, co próbowałem powiedzieć.
Dave C
22

Aby rozwinąć niektóre komentarze do poprzednich odpowiedzi i zapewnić jaśniejsze porównanie, poniżej przedstawiono przykład obu podejść przedstawionych do tej pory, biorąc pod uwagę te same dane wejściowe, wycinek kanałów, z których można odczytać i funkcję do wywołania dla każdej wartości, która również musi wiedzieć, z której kanał, z którego pochodzi wartość.

Istnieją trzy główne różnice między podejściami:

  • Złożoność. Chociaż może to być częściowo preferencja czytelnika, uważam, że podejście kanałowe jest bardziej idiomatyczne, proste i czytelne.

  • Występ. W moim systemie Xeon amd64 goroutines + channel out wykonuje rozwiązanie odbicia o około dwa rzędy wielkości (ogólnie odbicie w Go jest często wolniejsze i powinno być używane tylko wtedy, gdy jest to absolutnie wymagane). Oczywiście, jeśli występuje jakiekolwiek znaczne opóźnienie w funkcji przetwarzającej wyniki lub w zapisywaniu wartości do kanałów wejściowych, ta różnica w wydajności może łatwo stać się nieistotna.

  • Semantyka blokowania / buforowania. Znaczenie tego zależy od przypadku użycia. Najczęściej albo nie ma to znaczenia, albo niewielkie dodatkowe buforowanie w rozwiązaniu scalającym goroutine może być pomocne dla przepustowości. Jeśli jednak pożądane jest, aby semantyka była odblokowana tylko dla jednego modułu zapisującego, a jego wartość jest w pełni obsługiwana przed odblokowaniem jakiegokolwiek innego modułu zapisującego, można to osiągnąć tylko za pomocą rozwiązania refleksyjnego.

Należy zauważyć, że oba podejścia można uprościć, jeśli „id” kanału wysyłającego nie jest wymagany lub jeśli kanały źródłowe nigdy nie zostaną zamknięte.

Kanał łączenia Goroutine:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Wybierz odbicie:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Pełny kod na placu zabaw Go .]

Dave C.
źródło
1
Warto również zauważyć, że rozwiązanie goroutines + kanały nie może zrobić wszystkiego selectlub reflect.Selectrobi. Gorutyny będą się obracać, dopóki nie pochłoną wszystkiego z kanałów, więc nie ma jasnego sposobu na Process1wcześniejsze wyjście. Istnieje również możliwość wystąpienia problemów, jeśli masz wielu czytelników, ponieważ gorutyny buforują jeden element z każdego z kanałów, co się nie stanie select.
James Henstridge
@JamesHenstridge, Twoja pierwsza uwaga na temat zatrzymania nie jest prawdą. Zaplanowałbyś zatrzymanie Procesu1 dokładnie w taki sam sposób, w jaki zorganizowałbyś zatrzymanie Procesu2; np. przez dodanie kanału „stop”, który jest zamykany, gdy goroutyny powinny się zatrzymać. Proces1 wymagałby dwóch przypadków selectw forpętli zamiast prostszej for rangepętli obecnie używanej. Process2 musiałby umieścić inny przypadek casesi specjalny sposób obsługiwać tę wartość i.
Dave C
To nadal nie rozwiązuje problemu, że odczytujesz wartości z kanałów, które nie będą używane we wczesnym przypadku zatrzymania.
James Henstridge,
0

Dlaczego to podejście nie działałoby przy założeniu, że ktoś wysyła zdarzenia?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
źródło
8
To jest pętla spinowa. Podczas oczekiwania na uzyskanie wartości przez kanał wejściowy, zużywa to cały dostępny procesor. Cały sens selectna wielu kanałach (bez defaultklauzuli) polega na tym, że efektywnie czeka, aż przynajmniej jeden będzie gotowy bez wirowania.
Dave C
0

Prawdopodobnie prostsza opcja:

Zamiast mieć tablicę kanałów, dlaczego nie przekazać tylko jednego kanału jako parametru do funkcji uruchamianych na oddzielnych gorutynach, a następnie słuchać kanału w gorutynach konsumenckich?

Dzięki temu możesz wybierać tylko na jednym kanale w swoim odbiorniku, ułatwiając wybór i unikając tworzenia nowych goroutines w celu agregowania wiadomości z wielu kanałów?

Fernando Sanchez
źródło