Oblicz średnią prędkość dróg [zamknięte]

20

Poszedłem na rozmowę o pracę inżyniera danych. Ankieter zadał mi pytanie. Dał mi pewną sytuację i poprosił mnie o zaprojektowanie przepływu danych dla tego systemu. Rozwiązałem to, ale nie podobało mi się moje rozwiązanie i nie udało mi się. Chciałbym wiedzieć, czy masz lepsze pomysły na rozwiązanie tego wyzwania.

Pytanie brzmiało:

Nasz system odbiera cztery strumienie danych. Dane zawierają identyfikator pojazdu, prędkość i koordynacje geolokalizacyjne. Każdy pojazd wysyła swoje dane raz na minutę. Nie ma połączenia między określonym strumieniem a określoną drogą, pojazdem lub czymkolwiek innym. Istnieje funkcja, która akceptuje koordynacje i zwraca nazwę odcinka drogi. Musimy znać średnią prędkość na odcinku drogi na 5 minut. Wreszcie chcemy zapisać wyniki do Kafki.

wprowadź opis zdjęcia tutaj

Więc moim rozwiązaniem było:

Najpierw zapisujemy wszystkie dane w klastrze Kafka, w jednym temacie, podzielonym przez 5-6 pierwszych cyfr szerokości geograficznej połączonych z 5-6 pierwszymi cyframi długości geograficznej. Następnie odczytaj dane według Structured Streaming, dodając dla każdego wiersza nazwę odcinka drogi przez koordynacje (jest do tego predefiniowany udf), a następnie dzieląc dane według nazwy odcinka drogi.

Ponieważ dzielę dane w Kafce według 5-6 pierwszych cyfr koordynacji, po przetłumaczeniu koordynacji na nazwę sekcji nie ma potrzeby przesyłania dużej ilości danych do poprawnej partycji i dlatego mogę skorzystać z operacji colesce () nie powoduje to pełnego losowania.

Następnie obliczamy średnią prędkość na executora.

Cały proces będzie się odbywał co 5 minut, a my zapisamy dane w trybie Append do ostatecznego zlewu Kafka.

wprowadź opis zdjęcia tutaj

Więc znowu, ankieterowi nie spodobało się moje rozwiązanie. Czy ktoś mógłby zasugerować, jak to ulepszyć lub zupełnie inny i lepszy pomysł?

Alon
źródło
Czy nie lepiej byłoby zapytać osobę, której dokładnie nie lubił?
Gino Pane
Myślę, że kiepskim pomysłem jest dzielenie według konkatenowanej długości. Czy punkt danych nie będzie zgłaszany dla każdej linii jako nieco inna współrzędna?
webber
@ webber dlatego biorę tylko kilka cyfr, więc pozycja nie będzie unikalna, ale względnie wielkości odcinka drogi.
Alon

Odpowiedzi:

6

Uznałem to pytanie za bardzo interesujące i pomyślałem o podjęciu próby.

Jak oceniłem dalej, twoja próba jest dobra, z wyjątkiem następujących:

podzielony przez 5-6 pierwszych cyfr szerokości geograficznej połączonych z 5-6 pierwszymi cyframi długości geograficznej

Jeśli masz już metodę uzyskania identyfikatora / nazwy odcinka drogi na podstawie szerokości i długości geograficznej, dlaczego nie wywołać tej metody w pierwszej kolejności i użyć identyfikatora / nazwy odcinka drogi do podziału danych na pierwszym miejscu?

A potem wszystko jest dość łatwe, więc topologia będzie

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Bardziej szczegółowe wyjaśnienia można znaleźć w komentarzach w kodzie poniżej. Zapytaj, czy coś jest niejasne)

Dodałem kod na końcu tej odpowiedzi, pamiętaj, że zamiast średniej użyłem sumy, ponieważ łatwiej jest to zademonstrować. Można zrobić średnią, przechowując dodatkowe dane.

Szczegółowo opisałem odpowiedź w komentarzach. Poniżej znajduje się schemat topologii wygenerowany z kodu (dzięki https://zz85.github.io/kafka-streams-viz/ )

Topologia:

Schemat topologii

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
źródło
Czy łączenie wszystkich strumieni nie jest złym pomysłem? Może to stać się wąskim gardłem w przepływie danych. Co się stanie, gdy zaczniesz otrzymywać coraz więcej strumieni wejściowych w miarę wzrostu systemu? Czy będzie to skalowalne?
wypul
@wypul> czy łączenie wszystkich strumieni nie jest złym pomysłem? -> Myślę, że nie. Równoległości w Kafce nie osiąga się za pomocą strumieni, ale poprzez partycje (i zadania), wątki itp. Strumienie są sposobem na grupowanie danych. > Czy będzie to skalowalne? -> tak. Ponieważ kluczujemy według odcinków dróg i zakładamy, że odcinki dróg są sprawiedliwie rozmieszczone, możemy zwiększyć liczbę partycji dla tych tematów, aby równolegle przetwarzać strumień w różnych kontenerach. Możemy użyć dobrego algorytmu partycjonowania opartego na odcinku drogi, aby rozłożyć obciążenie na repliki.
Irshad PI
1

Problem jako taki wydaje się prosty, a oferowane rozwiązania mają już wiele sensu. Zastanawiam się, czy ankieter był zaniepokojony projektem i wydajnością rozwiązania, na którym się skupiłeś, czy też dokładnością wyniku. Ponieważ inni skupili się na kodzie, projekcie i wydajności, będę się liczył na dokładność.

Rozwiązanie do przesyłania strumieniowego

Gdy napływają dane, możemy z grubsza oszacować średnią prędkość drogi. Oszacowanie to będzie pomocne w wykrywaniu zatorów, ale będzie wyłączone przy określaniu ograniczenia prędkości.

  1. Połącz wszystkie 4 strumienie danych razem.
  2. Utwórz okno o długości 5 minut, aby przechwycić dane ze wszystkich 4 strumieni w 5 minut.
  3. Zastosuj UDF do współrzędnych, aby uzyskać nazwę ulicy i nazwę miasta. Nazwy ulic są często powielane w różnych miastach, więc użyjemy nazwy miasta + ulicy jako klucza.
  4. Oblicz średnią prędkość za pomocą składni takiej jak -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Rozwiązanie wsadowe

To oszacowanie będzie wyłączone, ponieważ wielkość próby jest niewielka. Będziemy potrzebować przetwarzania wsadowego dla danych z całego miesiąca / kwartału / roku, aby dokładniej określić ograniczenie prędkości.

  1. Czytaj dane z lata z jeziora danych (lub tematu Kafki)

  2. Zastosuj UDF do współrzędnych, aby uzyskać nazwę ulicy i nazwę miasta.

  3. Oblicz średnią prędkość za pomocą składni takiej jak -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. zapisz wynik do jeziora danych.

Na podstawie tego dokładniejszego ograniczenia prędkości możemy przewidzieć powolny ruch w aplikacji do przesyłania strumieniowego.

Salim
źródło
1

Widzę kilka problemów z twoją strategią partycjonowania:

  • Kiedy mówisz, że zamierzasz podzielić dane na podstawie pierwszych 5-6 cyfr długości, nie będziesz w stanie określić liczby partycji kafka wcześniej. Będziesz miał wypaczone dane, ponieważ na niektórych odcinkach dróg zaobserwujesz dużą głośność niż na innych.

  • Twoja kombinacja klawiszy i tak nie gwarantuje tych samych danych odcinków drogi w tej samej partycji, dlatego nie możesz być pewien, że nie będzie tasowania.

Podane przez IMO informacje nie są wystarczające do zaprojektowania całego potoku danych. Ponieważ podczas projektowania potoku ważną rolę odgrywa sposób podziału danych na partycje. Powinieneś dowiedzieć się więcej o otrzymywanych danych, takich jak liczba pojazdów, wielkość strumieni danych wejściowych, czy liczba strumieni jest stała, czy może ona wzrosnąć w przyszłości? Czy otrzymywane strumienie danych wejściowych są strumieniami kafka? Ile danych otrzymujesz w 5 minut?

  • Załóżmy teraz, że masz 4 strumienie zapisane na 4 tematy w kafce lub 4 partycjach i nie masz żadnego konkretnego klucza, ale twoje dane są podzielone na partycje na podstawie jakiegoś klucza centrum danych lub jest podzielone na hash. Jeśli nie, należy to zrobić po stronie danych, zamiast kopiować dane w innym strumieniu kafka i partycjonować.
  • Jeśli otrzymujesz dane z różnych centrów danych, musisz przenieść je do jednego klastra i w tym celu możesz użyć narzędzia do tworzenia kopii lustrzanych Kafka lub czegoś podobnego.
  • Po zgromadzeniu wszystkich danych w jednym klastrze możesz uruchomić ustrukturyzowane zadanie przesyłania strumieniowego z 5-minutowym interwałem wyzwalania i znakiem wodnym w zależności od wymagań.
  • Aby obliczyć średnią i uniknąć częstego tasowania, możesz użyć kombinacji mapValuesi reduceByKeyzamiast groupBy. Zobacz to .
  • Możesz zapisać dane do zlewu Kafka po przetworzeniu.
wypul
źródło
mapValues ​​i redukcjaByKey należą do niskiego poziomu RDD. Czy Catalyst nie jest wystarczająco inteligentny, aby wygenerować najbardziej wydajny RDD, kiedy grupuję i obliczam średnią?
Alon
@Alon Catalyst z pewnością będzie w stanie znaleźć najlepszy plan uruchomienia zapytania, ale jeśli użyjesz groupBy, dane z tym samym kluczem zostaną najpierw przetasowane na tej samej partycji, a następnie zastosują na tym operację agregacji. mapValuesi reduceByrzeczywiście należy do RDD niskiego poziomu, ale nadal będzie działał lepiej w tej sytuacji, ponieważ najpierw oblicza agregację na partycję, a następnie tasuje.
wypul
0

Główne problemy, które widzę przy tym rozwiązaniu, to:

  • Odcinki dróg, które znajdują się na krawędzi 6-cyfrowego kwadratu mapy, będą zawierały dane w wielu partycjach tematycznych i będą miały wiele średnich prędkości.
  • Rozmiar danych spożycia dla twoich partycji Kafka może być niezrównoważony (miasto vs pustynia). Partycjonowanie według pierwszych cyfr samochodu może być dobrym pomysłem IMO.
  • Nie jestem pewien, czy podążyłem za częścią koalescencyjną, ale brzmi to problematycznie.

Powiedziałbym, że rozwiązanie musi zrobić: odczytać ze strumienia Kafka -> UDF -> zgrupować odcinek drogi -> średnio -> zapisać do strumienia Kafka.

David Taub
źródło
0

Mój projekt będzie zależał od

  1. Liczba dróg
  2. Liczba pojazdów
  3. Koszt obliczeniowy drogi ze współrzędnych

Jeśli chcę skalować dla dowolnej liczby obliczeń, projekt wyglądałby tak wprowadź opis zdjęcia tutaj

Obawy dotyczące tego projektu -

  1. Utrzymaj trwały stan strumieni wejściowych (jeśli wejście to kafka, możemy przechowywać przesunięcia za pomocą Kafki lub zewnętrznie)
  2. Okresowo stany punktów kontrolnych do systemu zewnętrznego (wolę używać asynchronicznych barier punktów kontrolnych we Flink )

Możliwe praktyczne ulepszenia tego projektu -

  1. Buforowanie funkcji mapowania odcinków dróg, jeśli to możliwe, na podstawie dróg
  2. Obsługa pominiętych pingów (w praktyce nie każdy ping jest dostępny)
  3. Biorąc pod uwagę krzywiznę drogi (biorąc pod uwagę namiar i wysokość)
yugandhar
źródło