Czy istnieje sposób na oczyszczenie tematu w kafce?
Przesunąłem wiadomość, która była zbyt duża, do tematu wiadomości Kafka na mojej lokalnej maszynie, teraz pojawia się błąd:
kafka.common.InvalidMessageSizeException: invalid message size
Zwiększanie liczby fetch.size
nie jest tutaj idealne, ponieważ tak naprawdę nie chcę akceptować tak dużych wiadomości.
apache-kafka
purge
Peter Klipfel
źródło
źródło
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Pozwala to również sprawdzić bieżący okres przechowywania, np. Kafka-configs --zookeeper <zkhost>: 2181 --describe - tematy typu -entity - nazwa-name <nazwa tematu>Aby wyczyścić kolejkę, możesz usunąć temat:
następnie utwórz go ponownie:
źródło
delete.topic.enable=true
w plikuconfig/server.properties
, jak mówi ostrzeżenie wydrukowane przez wspomniane polecenieNote: This will have no impact if delete.topic.enable is not set to true.
Oto kroki, które wykonuję, aby usunąć temat o nazwie
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Powtórz dla innych partycji i wszystkich replikzkCli.sh
następniermr /brokers/MyTopic
Jeśli przegapisz krok 3, to Apache Kafka będzie nadal zgłaszać ten temat jako obecny (na przykład, jeśli uruchomisz
kafka-list-topic.sh
).Testowane z Apache Kafka 0.8.0.
źródło
./zookeeper-shell.sh localhost:2181
./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
zamiastzkCli.sh
(wypróbowany na Cloudera CDH5)Chociaż przyjęta odpowiedź jest poprawna, metoda ta jest przestarzała. Konfiguracja tematu powinna być teraz wykonana za pośrednictwem
kafka-configs
.Konfiguracje ustawione za pomocą tej metody można wyświetlić za pomocą polecenia
źródło
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Testowany w Kafka 0.8.2, na przykład szybkiego uruchamiania: Najpierw dodaj jedną linię do pliku server.properties w folderze config:
następnie możesz uruchomić to polecenie:
źródło
Od kafka 1.1
Wyczyść temat
poczekaj 1 minutę, aby być pewnym, że kafka wyczyści temat, usuń konfigurację, a następnie przejdź do wartości domyślnej
źródło
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
kafka nie ma bezpośredniej metody czyszczenia / czyszczenia tematu (kolejek), ale może to zrobić poprzez usunięcie tego tematu i odtworzenie go.
najpierw upewnij się, że plik sever.properties ma, a jeśli nie, dodaj
delete.topic.enable=true
następnie Usuń temat
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
następnie utwórz go ponownie.
źródło
Czasami, jeśli masz nasycony klaster (zbyt wiele partycji, używasz zaszyfrowanych danych tematów lub używasz protokołu SSL lub kontroler znajduje się w złym węźle lub połączenie jest niestabilne, wyczyszczenie tego tematu zajmie dużo czasu .
Wykonuję te kroki, szczególnie jeśli używasz Avro.
1: Uruchom z narzędziami Kafka:
2: Uruchom w węźle rejestru Schemat:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Przywróć zachowanie tematu do pierwotnego ustawienia, gdy temat będzie pusty.
Mam nadzieję, że to komuś pomoże, ponieważ nie jest łatwo reklamowane.
źródło
kafka-avro-console-consumer
nie jest konieczneAKTUALIZACJA: Ta odpowiedź dotyczy Kafka 0.6. Dla Kafki 0.8 i późniejszych patrz odpowiedź @Patrick.
Tak, zatrzymaj Kafka i ręcznie usuń wszystkie pliki z odpowiedniego podkatalogu (łatwo je znaleźć w katalogu danych Kafka). Po ponownym uruchomieniu Kafka temat będzie pusty.
źródło
Najprostszym podejściem jest ustawienie daty poszczególnych plików dziennika na starsze niż okres przechowywania. Następnie broker powinien je wyczyścić i usunąć dla Ciebie w ciągu kilku sekund. Ma to kilka zalet:
Z mojego doświadczenia z Kafka 0.7.x usunięcie plików dziennika i ponowne uruchomienie brokera może prowadzić do nieprawidłowych wyjątków przesunięcia dla niektórych konsumentów. Stałoby się tak, ponieważ broker ponownie uruchamia przesunięcia na zero (przy braku istniejących plików dziennika), a konsument, który wcześniej konsumował z tematu, połączyłby się ponownie, aby zażądać określonego [raz poprawnego] przesunięcia. Jeśli to przesunięcie nie mieści się w granicach nowych dzienników tematów, nie ma żadnych szkód, a konsument wznawia na początku lub na końcu. Ale jeśli przesunięcie mieści się w granicach nowych dzienników tematów, broker próbuje pobrać zestaw komunikatów, ale kończy się niepowodzeniem, ponieważ przesunięcie nie jest wyrównane z rzeczywistym komunikatem.
Można to złagodzić, usuwając również przesunięcia konsumentów w zookeeper dla tego tematu. Ale jeśli nie potrzebujesz dziewiczego tematu i chcesz po prostu usunąć istniejącą zawartość, po prostu „dotknięcie” kilku dzienników tematów jest o wiele łatwiejsze i bardziej niezawodne niż zatrzymywanie brokerów, usuwanie dzienników tematów i czyszczenie niektórych węzłów dozorcy .
źródło
Rada Thomasa jest świetna, ale niestety
zkCli
w starych wersjach Zookeepera (na przykład 3.3.6) nie wydaje się wspieraćrmr
. Na przykład porównaj implementację wiersza poleceń we współczesnym Zookeeperze z wersją 3.3 .Jeśli masz do czynienia ze starą wersją Zookeepera, jednym z rozwiązań jest użycie biblioteki klienta takiej jak zc.zk dla Pythona. Dla osób, które nie znają Pythona, musisz zainstalować go za pomocą pip lub easy_install . Następnie uruchom powłokę Pythona (
python
) i możesz:lub nawet
jeśli chcesz usunąć wszystkie tematy z Kafki.
źródło
Aby wyczyścić wszystkie wiadomości z określonego tematu przy użyciu grupy aplikacji (nazwa_grupy powinna być taka sama jak nazwa grupy aplikacji kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
źródło
Po odpowiedzi na @steven appleyard wykonałem następujące polecenia na Kafce 2.2.0 i zadziałały dla mnie.
źródło
Wiele świetnych odpowiedzi tutaj, ale wśród nich nie znalazłem żadnej o dokerze. Spędziłem trochę czasu, aby dowiedzieć się, że użycie kontenera brokera jest niewłaściwe w tym przypadku (oczywiście !!!)
i powinienem był użyć
zookeeper:2181
zamiast--zookeeper localhost:2181
jak na mój plik tworzeniapoprawne byłoby polecenie
Mam nadzieję, że pozwoli to komuś zaoszczędzić czas.
Pamiętaj również, że wiadomości nie zostaną natychmiast usunięte i zdarzy się to, gdy segment dziennika zostanie zamknięty.
źródło
localhost:2181
... Np. Nie rozumiesz funkcji sieciowych Dockera. Ponadto nie wszystkie pojemniki Zookeeper mająkafka-topics
, więc najlepiej nie używać go w ten sposób. Najnowsze instalacje Kafka pozwalają na--bootstrap-servers
zmianę tematu zamiast--zookeeper
you can use
--zookeeper zookeeper: 2181` z kontenera Kafka jest moim celem. Lub nawet wyodrębnij wiersz Zookeeper z pliku server.propertiesNie można dodać jako komentarza ze względu na rozmiar: Nie jestem pewien, czy to prawda, oprócz aktualizacji retention.ms i retention.bytes, ale zauważyłem, że zasadą czyszczenia tematów powinno być „usuwanie” (domyślnie), jeśli „kompaktowe”, to będzie trzymaj wiadomości dłużej, tzn. jeśli są „zwarte”, musisz również określić delete.retention.ms .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Musiałem również monitorować najwcześniejsze / najnowsze przesunięcia powinny być takie same, aby potwierdzić, że to się pomyślnie wydarzyło, można również sprawdzić du -h / tmp / kafka-logs / test-topic-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
Innym problemem jest to, trzeba uzyskać bieżący config pierwszy więc pamiętać, aby powrócić po usunięciu powiedzie:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
źródło
Innym, raczej ręcznym podejściem do usuwania tematu jest:
u brokerów:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
w dozorcy:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
ponownie u brokerów:
sudo service kafka start
źródło
To powinno dać
retention.ms
skonfigurowane. Następnie możesz użyć powyższej komendy alter, aby zmienić na 1 sekundę (a później wrócić do ustawień domyślnych).źródło
Z Java, używając nowego
AdminZkClient
zamiast przestarzałegoAdminUtils
:źródło
AdminClient
lubKafkaAdminClient