Czy istnieje sposób na usunięcie wszystkich danych z tematu lub usunięcie tematu przed każdym uruchomieniem?
Czy mogę zmodyfikować plik KafkaConfig.scala, aby zmienić logRetentionHours
właściwość? Czy istnieje sposób, w jaki wiadomości są usuwane, gdy tylko konsument je przeczyta?
Używam producentów do pobierania danych skądś i wysyłania danych do określonego tematu, w którym konsument konsumuje, czy mogę usunąć wszystkie dane z tego tematu przy każdym uruchomieniu? Chcę tylko nowych danych za każdym razem w temacie. Czy jest sposób na ponowne zainicjowanie tematu?
apache-kafka
apache-zookeeper
TommyT
źródło
źródło
Odpowiedzi:
Nie myśl, że jest jeszcze obsługiwany.Spójrz na to wydanie JIRA „Dodaj obsługę usuwania tematów”.Aby usunąć ręcznie:
log.dir
atrybut w pliku konfiguracyjnym kafka ) oraz dane zookeeperaW przypadku każdego tematu możesz zrobić
/tmp/kafka-logs/MyTopic-0
miejscu/tmp/kafka-logs
określonym przezlog.dir
atrybutTo jest
NOT
dobre i zalecane podejście, ale powinno działać. W pliku konfiguracyjnym brokera Kafkalog.retention.hours.per.topic
atrybut jest używany do definiowaniaThe number of hours to keep a log file before deleting it for some specific topic
Z dokumentacji Kafki :
Mówią, że za znalezienie przesunięcia początkowego do przeczytania w przykładzie Prostego konsumenta Kafki 0.8
Możesz tam również znaleźć przykładowy kod do zarządzania przesunięciem po stronie klienta.
źródło
brokers/topics/<topic_to_delete>
a także dzienniki, aby się go pozbyć.kafka-run-class.sh kafka.admin.DeleteTopicCommand
.kafka-run-class.sh kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181
Jak wspomniałem tutaj Oczyść kolejkę Kafki :
Testowane w Kafka 0.8.2, dla przykładu szybkiego startu: Najpierw dodaj jedną linię do pliku server.properties w folderze konfiguracyjnym:
następnie możesz uruchomić to polecenie:
źródło
Testowany z kafką 0.10
Uwaga: jeśli usuwasz folder / y tematów wewnątrz dzienników kafka, ale nie z folderu zookeeper-data, zobaczysz, że tematy nadal tam są.
źródło
Jako brudne obejście można dostosować ustawienia przechowywania w czasie wykonywania dla poszczególnych tematów, np.
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1
( Może również działać retention.bytes = 0 )Po chwili kafka powinna zwolnić miejsce. Nie jestem pewien, czy ma to jakieś konsekwencje w porównaniu z ponownym utworzeniem tematu.
ps. Lepiej przywróć ustawienia retencji, gdy kafka zakończy czyszczenie.
Możesz również użyć
retention.ms
do utrwalenia danych historycznychźródło
Poniżej znajdują się skrypty do opróżniania i usuwania tematu Kafki, przyjmując localhost jako serwer zookeeper, a Kafka_Home jest ustawiony na katalog instalacyjny:
Poniższy skrypt opróżni temat, ustawiając jego czas przechowywania na 1 sekundę, a następnie usuwając konfigurację:
Aby całkowicie usunąć tematy, należy zatrzymać wszystkie odpowiednie brokery kafka i usunąć jego katalog (y) z katalogu dziennika kafka (domyślnie: / tmp / kafka-logs), a następnie uruchomić ten skrypt, aby usunąć temat z zookeepera. Aby sprawdzić, czy został usunięty z zookeepera, dane wyjściowe ls / brokers / topics nie powinny już zawierać tematu:
źródło
grep "log.retention.check.interval" $Kafka_Home/config/server.properties
--add config
raczej nie jest--add-config
Wypróbowaliśmy mniej więcej to, co opisują inne odpowiedzi, z umiarkowanym poziomem sukcesu. To, co naprawdę zadziałało dla nas (Apache Kafka 0.8.1), to polecenie klasy
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181
źródło
Error: Could not find or load main class kafka.admin.DeleteTopicCommand
Dla użytkowników piwa
Jeśli używasz czegoś
brew
takiego jak ja i tracisz dużo czasu na szukanie niesławnegokafka-logs
folderu, nie bój się więcej. (i daj mi znać, jeśli to działa dla Ciebie i wielu różnych wersji Homebrew, Kafka itp. :))Prawdopodobnie znajdziesz to pod:
Lokalizacja:
/usr/local/var/lib/kafka-logs
Jak właściwie znaleźć tę ścieżkę
(jest to również przydatne w zasadzie dla każdej aplikacji instalowanej przez brew)
1)
brew services list
2) Otwórz i przeczytaj,
plist
że znalazłeś powyżej3) Znajdź linię określającą
server.properties
lokalizację otwórz ją, w moim przypadku:/usr/local/etc/kafka/server.properties
4) Poszukaj
log.dirs
linii:5) Przejdź do tej lokalizacji i usuń dzienniki dotyczące żądanych tematów
6) Zrestartuj Kafkę za pomocą
brew services restart kafka
źródło
Wszystkie dane dotyczące tematów i ich partycji są przechowywane w plikach
tmp/kafka-logs/
. Ponadto są przechowywane w formacietopic-partionNumber
, więc jeśli chcesz usunąć tematnewTopic
, możesz:rm -rf /tmp/kafka-logs/newTopic-*
źródło
log.retention.hours
i dodawaćlog.retention.ms=1000
. To zachowałoby rekord Kafki Topic tylko przez jedną sekundę.log.retention.hours
na żądaną figurę.źródło
Od wersji kafka 2.3.0 istnieje alternatywny sposób miękkiego usuwania Kafki (stare podejście jest przestarzałe).
Zaktualizuj retention.ms do 1 sekundy (1000 ms), a następnie ustaw ponownie po minucie, do ustawienia domyślnego, tj. 7 dni (168 godzin, 604 800 000 w ms)
Miękkie usuwanie: - (rentention.ms = 1000) (przy użyciu kafka-configs.sh)
Ustawienie domyślne: - 7 dni (168 godzin, retention.ms = 604800000)
źródło
Podczas ręcznego usuwania tematu z klastra kafka, możesz po prostu sprawdzić to https://github.com/darrenfu/bigdata/issues/6 Istotnym krokiem, którego wiele brakuje w większości rozwiązań, jest usunięcie
/config/topics/<topic_name>
w ZK.źródło
Używam tego skryptu:
źródło
Używam poniższego narzędzia do czyszczenia po uruchomieniu testu integracji.
Korzysta z najnowszego
AdminZkClient
interfejsu API. Starszy interfejs API został wycofany.Istnieje opcja usunięcia tematu. Ale oznacza temat do usunięcia. Zookeeper później usuwa temat. Ponieważ może to być nieprzewidywalnie długie, wolę podejście retention.ms
źródło