Używam apache kafka do przesyłania wiadomości. Wdrożyłem producenta i konsumenta w Javie. Jak możemy uzyskać liczbę wiadomości w temacie?
źródło
Używam apache kafka do przesyłania wiadomości. Wdrożyłem producenta i konsumenta w Javie. Jak możemy uzyskać liczbę wiadomości w temacie?
Jedynym sposobem, który przychodzi na myśl z punktu widzenia konsumenta, jest faktyczne skonsumowanie wiadomości i ich policzenie.
Broker Kafka ujawnia liczniki JMX dla liczby wiadomości odebranych od momentu uruchomienia, ale nie można wiedzieć, ile z nich zostało już wyczyszczonych.
W większości scenariuszy wiadomości w Kafce są najlepiej postrzegane jako nieskończony strumień, a uzyskanie dyskretnej wartości tego, ile jest obecnie przechowywanych na dysku, nie ma znaczenia. Co więcej, sytuacja staje się bardziej skomplikowana, gdy mamy do czynienia z grupą brokerów, z których każdy ma podzbiór wiadomości w temacie.
To nie jest Java, ale może być przydatne
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
źródło
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
A potem różnica zwraca faktyczne wiadomości oczekujące w temacie? Mam rację?Właściwie używam tego do testowania mojego POC. Element, którego chcesz użyć ConsumerOffsetChecker. Możesz go uruchomić za pomocą skryptu bash, jak poniżej.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
A poniżej wynik: Jak widać na czerwonym polu, 999 to numer wiadomości aktualnie w temacie.
Aktualizacja: ConsumerOffsetChecker jest przestarzały od 0.10.0, możesz zacząć używać ConsumerGroupCommand.
źródło
Czasami interesuje nas znajomość liczby komunikatów w każdej partycji, na przykład podczas testowania niestandardowego partycji. Kolejne kroki zostały przetestowane pod kątem współpracy z Kafką 0.10.2.1-2 z Confluent 3.2. Biorąc pod uwagę temat Kafka
kt
i następujący wiersz poleceń:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
To drukuje przykładowe dane wyjściowe pokazujące liczbę wiadomości w trzech partycjach:
kt:2:6138 kt:1:6123 kt:0:6137
Liczba wierszy może być mniej więcej w zależności od liczby partycji w temacie.
źródło
Ponieważ
ConsumerOffsetChecker
nie jest już obsługiwane, możesz użyć tego polecenia, aby sprawdzić wszystkie wiadomości w temacie:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
Gdzie
LAG
jest liczba wiadomości w partycji tematu:Możesz także spróbować użyć kafkacat . Jest to projekt open source, który może pomóc w czytaniu wiadomości z tematu i partycji oraz drukowaniu ich na standardowe wyjście. Oto przykład, który odczytuje 10 ostatnich wiadomości z
sample-kafka-topic
tematu, a następnie kończy:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
źródło
Użyj https://prestodb.io/docs/current/connector/kafka-tutorial.html
Super silnik SQL dostarczony przez Facebooka, który łączy się z kilkoma źródłami danych (Cassandra, Kafka, JMX, Redis ...).
PrestoDB działa jako serwer z opcjonalnymi pracownikami (istnieje tryb autonomiczny bez dodatkowych pracowników), wtedy do tworzenia zapytań używasz małego wykonywalnego pliku JAR (zwanego presto CLI).
Po prawidłowym skonfigurowaniu serwera Presto możesz używać tradycyjnego SQL:
SELECT count(*) FROM TOPIC_NAME;
źródło
Polecenie Apache Kafka, aby uzyskać nieobsłużone komunikaty na wszystkich partycjach tematu:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Wydruki:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
Kolumna 6 zawiera nieobsłużone wiadomości. Dodaj je w ten sposób:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk czyta wiersze, pomija wiersz nagłówka i dodaje szóstą kolumnę, a na końcu wypisuje sumę.
Wydruki
5
źródło
Aby uzyskać wszystkie wiadomości zapisane dla tematu, możesz wyszukać konsumenta na początku i na końcu strumienia dla każdej partycji i zsumować wyniki
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
źródło
Uruchom następujące (zakładając, że
kafka-console-consumer.sh
jest na ścieżce):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
źródło
--new-consumer
tę opcję, ponieważ ta opcja nie jest już dostępna (lub najwyraźniej konieczna)Korzystając z klienta Java Kafki 2.11-1.0.0, możesz wykonać następujące czynności:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
Wynik wygląda mniej więcej tak:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
źródło
seekToEnd(..)
iseekToBeginning(..)
metody, które zmieniają stanconsumer
.W najnowszych wersjach programu Kafka Manager znajduje się kolumna zatytułowana Sumowane ostatnie przesunięcia .
źródło
Miałem to samo pytanie i tak to robię od Konsumenta Kafki w Kotlinie:
Bardzo zgrubny kod, ponieważ właśnie to uruchomiłem, ale zasadniczo chcesz odjąć przesunięcie początku tematu od przesunięcia końcowego i będzie to bieżąca liczba wiadomości w temacie.
Nie możesz polegać tylko na przesunięciu końcowym ze względu na inne konfiguracje (zasady czyszczenia, retention-ms, itp.), Które mogą spowodować usunięcie starych wiadomości z tematu. Przesunięcia „przesuwają się” tylko do przodu, więc jest to początkowe przesunięcie, które przesunie się do przodu bliżej końcowego przesunięcia (lub ostatecznie do tej samej wartości, jeśli temat nie zawiera w tej chwili wiadomości).
Zasadniczo przesunięcie końcowe reprezentuje całkowitą liczbę wiadomości, które przeszły przez ten temat, a różnica między nimi reprezentuje liczbę wiadomości, które temat zawiera obecnie.
źródło
Fragmenty dokumentów Kafki
Wycofania w wersji 0.9.0.0
Element kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) został wycofany. Idąc dalej, użyj tej funkcji kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand).
Używam brokera Kafka z włączonym SSL zarówno dla serwera, jak i klienta. Poniżej polecenie, którego używam
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
gdzie / tmp / ssl_config jest jak poniżej
źródło
Jeśli masz dostęp do interfejsu JMX serwera, przesunięcia początku i końca są obecne w:
(musisz wymienić
TOPICNAME
&PARTITIONNUMBER
). Pamiętaj, że musisz sprawdzić dla każdej z replik danej partycji, lub musisz dowiedzieć się, który z brokerów jest liderem dla danej partycji (i może się to zmieniać w czasie).Alternatywnie możesz użyć metod Kafka Consumer
beginningOffsets
iendOffsets
.źródło
Nie próbowałem ten sam, ale wydaje się mieć sens.
Możesz także użyć
kafka.tools.ConsumerOffsetChecker
( źródło ).źródło
Najprostszym sposobem, jaki znalazłem, jest użycie Kafdrop REST API
/topic/topicName
i określenie klucza:"Accept"
/ value:"application/json"
nagłówek w celu uzyskania odpowiedzi JSON.Jest to udokumentowane tutaj .
źródło
Możesz użyć kafkatool . Sprawdź ten link -> http://www.kafkatool.com/download.html
źródło