Czy ktoś może wyjaśnić w prosty sposób, jaki jest wzór zakłócający?

Odpowiedzi:

33

Fowler Artykuł dostawcy dobry grunt, a to wyjaśnienie:

Na prymitywnym poziomie można uznać Disruptor za wykres multiemisji kolejek, w którym producenci umieszczają na nim obiekty, które są wysyłane do wszystkich konsumentów w celu równoległego zużycia poprzez osobne kolejki. Gdy zajrzysz do środka, zobaczysz, że ta sieć kolejek to tak naprawdę pojedyncza struktura danych - bufor pierścieniowy.

Każdy producent i konsument ma licznik sekwencji wskazujący, na którym slocie w buforze aktualnie pracuje. Każdy producent / konsument pisze własny licznik sekwencji, ale może odczytać liczniki sekwencji innych. W ten sposób producent może odczytać liczniki konsumentów, aby upewnić się, że gniazdo, w którym chce pisać, jest dostępne bez żadnych blokad liczników. Podobnie konsument może upewnić się, że przetwarza wiadomości tylko wtedy, gdy zrobi to inny konsument, obserwując liczniki.

wprowadź opis zdjęcia tutaj

Bardziej konwencjonalne podejście może wykorzystywać Kolejkę producenta i Kolejkę klienta, z których każda używa blokad jako mechanizmów współbieżności. W praktyce to, co dzieje się z kolejkami producentów i konsumentów, polega na tym, że przez większość czasu kolejki są całkowicie puste lub całkowicie zapełnione, co powoduje rywalizację o blokadę i marnowanie cykli zegara. Dezintegrator łagodzi to częściowo, ponieważ wszyscy producenci i konsumenci korzystają z tego samego mechanizmu kolejki, koordynując ze sobą, obserwując liczniki sekwencji, a nie stosując mechanizmy blokujące.

Robert Harvey
źródło
9

Z tego artykułu o CoralQueue :

Wzorzec zakłócający jest kolejką wsadową wspieraną przez macierz kołową (tj. Bufor pierścieniowy) wypełnioną wstępnie przydzielonymi obiektami przesyłania, które wykorzystują bariery pamięci do synchronizacji producentów i konsumentów poprzez sekwencje.

Tak więc producenci i konsumenci nie nadepną na siebie wewnątrz okrągłego układu , sprawdzając odpowiadające im sekwencje . Aby przekazywać sobie nawzajem swoje sekwencje, używają barier pamięci zamiast zamków. To najszybszy sposób, w jaki mogą się komunikować bez blokady.

Na szczęście nie trzeba zagłębiać się w wewnętrzne szczegóły wzoru zakłócającego, aby go użyć. Oprócz implementacji LMAX istnieje CoralQueue opracowany przez Coral Blocks, z którym jestem związany. Niektórym łatwiej jest zrozumieć pojęcie, czytając kod, dlatego poniżej znajduje się prosty przykład pojedynczego producenta wysyłającego wiadomości do jednego konsumenta. Możesz również sprawdzić to pytanie na przykładzie demultipleksera (od jednego producenta do wielu konsumentów).

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.Builder;

public class Basics {

    public static void main(String[] args) {

        final Queue<StringBuilder> queue = new AtomicQueue<StringBuilder>(1024, new Builder<StringBuilder>() {
            @Override
            public StringBuilder newInstance() {
                return new StringBuilder(1024);
            }
        });

        Thread producer = new Thread(new Runnable() {

            private final StringBuilder getStringBuilder() {
                StringBuilder sb;
                while((sb = queue.nextToDispatch()) == null) {
                    // queue can be full if the size of the queue
                    // is small and/or the consumer is too slow

                    // busy spin (you can also use a wait strategy instead)
                }
                return sb;
            }

            @Override
            public void run() {

                StringBuilder sb;

                while(true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to send a message to
                    // the other thread you can just do:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hello!");
                    queue.flush();

                    // you can also send in batches to increase throughput:
                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi!");

                    sb = getStringBuilder();
                    sb.setLength(0);
                    sb.append("Hi again!");

                    queue.flush(); // dispatch the two messages above...
                }
            }
        }, "Producer");

        Thread consumer = new Thread(new Runnable() {

            @Override
            public void run() {

                while (true) { // the main loop of the thread

                    // (...) do whatever you have to do here...

                    // and whenever you want to check if the producer
                    // has sent a message you just do:

                    long avail;
                    while((avail = queue.availableToPoll()) == 0) {
                        // queue can be empty!
                        // busy spin (you can also use a wait strategy instead)
                    }

                    for(int i = 0; i < avail; i++) {
                        StringBuilder sb = queue.poll();
                        // (...) do whatever you want to do with the data
                        // just don't call toString() to create garbage...
                        // copy byte-by-byte instead...
                    }
                    queue.donePolling();
                }
            }
        }, "Consumer");

        consumer.start();
        producer.start();
    }
}

Oświadczenie: Jestem jednym z twórców CoralQueue.

rdalmeida
źródło
1
Byłoby miło podać swoją przynależność do opisywanego oprogramowania.
Deer Hunter