Jak mogę wysyłać duże wiadomości za pomocą platformy Kafka (ponad 15 MB)?

120

Wysyłam wiadomości tekstowe do Kafki V. 0.8 za pomocą Java Producer API. Jeśli rozmiar wiadomości to około 15 MB, otrzymam plik MessageSizeTooLargeException. Próbowałem ustawić message.max.bytesna 40 MB, ale nadal otrzymuję wyjątek. Małe wiadomości działały bez problemów.

(Wyjątek występuje u producenta, nie mam konsumenta w tej aplikacji.)

Co mogę zrobić, aby pozbyć się tego wyjątku?

Moja przykładowa konfiguracja producenta

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Dziennik błędów:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Sonson123
źródło
5
Moim pierwszym odruchem byłoby poproszenie cię o podzielenie tej ogromnej wiadomości na kilka mniejszych: - / Domyślam się, że nie jest to możliwe z jakiegoś powodu, ale mimo to możesz to przemyśleć: Ogromne wiadomości zwykle oznaczają, że występuje wada projektowa gdzieś, co powinno być naprawione.
Aaron Digulla,
1
Dzięki, ale to znacznie skomplikowałoby moją logikę. Dlaczego używanie Kafki do wiadomości o rozmiarze około 15 MB jest złym pomysłem? Czy 1 MB to maksymalny rozmiar wiadomości, którego można użyć? Niewiele znalazłem w dokumentacji Kafki o limicie rozmiaru wiadomości.
Sonson123,
2
Jest to całkowicie niezwiązane z Kafką ani żadnym innym systemem przetwarzania wiadomości. Moje rozumowanie: jeśli coś pójdzie nie tak z plikiem o wielkości 15 MB, późniejsze posprzątanie bałaganu jest bardzo kosztowne. Dlatego zwykle dzielę duże pliki na wiele mniejszych zadań (które można następnie wykonać równolegle).
Aaron Digulla,
czy użyłeś jakiejś kompresji? czy mógłbyś
podać

Odpowiedzi:

182

Musisz dostosować trzy (lub cztery) właściwości:

  • Strona konsumenta: fetch.message.max.bytes- to określi największy rozmiar wiadomości, jaką może pobrać konsument.
  • Strona brokera: replica.fetch.max.bytes- pozwoli to replikom w brokerach na wysyłanie komunikatów w klastrze i zapewnianie poprawnej replikacji komunikatów. Jeśli jest zbyt mała, wiadomość nigdy nie zostanie zreplikowana, a zatem konsument nigdy nie zobaczy wiadomości, ponieważ wiadomość nigdy nie zostanie zatwierdzona (w pełni zreplikowana).
  • Strona brokera: message.max.bytes- jest to największy rozmiar wiadomości, jaką broker może otrzymać od producenta.
  • Strona brokera (według tematu): max.message.bytes- jest to największy rozmiar wiadomości, jaką broker pozwoli na dołączenie do tematu. Ten rozmiar jest sprawdzany przed kompresją. (Domyślnie broker message.max.bytes.)

Dowiedziałem się na własnej skórze o numerze 2 - nie otrzymujesz ŻADNYCH wyjątków, wiadomości ani ostrzeżeń od Kafki, więc pamiętaj o tym, gdy wysyłasz duże wiadomości.

laughing_man
źródło
3
Ok, ty i użytkownik2720864 mieliście rację. Ustawiłem tylko message.max.bytesw kodzie źródłowym. Ale muszę ustawić te wartości w konfiguracji serwera Kafka config/server.properties. Teraz działają też większe wiadomości :).
Sonson123
3
Czy są jakieś znane wady ustawienia zbyt wysokich wartości?
Ivan Balashov
7
Tak. Po stronie klienta przydzielasz fetch.message.max.bytespamięć dla KAŻDEJ partycji. Oznacza to, że jeśli użyjesz dużej liczby w fetch.message.max.bytespołączeniu z dużą liczbą partycji, zajmie to dużo pamięci. W rzeczywistości, ponieważ proces replikacji między brokerami jest również wyspecjalizowanym konsumentem, będzie to również zużywać pamięć brokerów.
laughing_man
3
Zauważ, że istnieje również max.message.byteskonfiguracja dla tematu, która może być niższa niż u brokera message.max.bytes.
Peter Davis
1
Według oficjalnego dokumentu parametry po stronie konsumenta i te dotyczące replikacji między brokerami /.*fetch.*bytes/nie wydają się być sztywnymi limitami: „To nie jest absolutne maksimum, jeśli [...] jest większe niż ta wartość, rekordowa partia będzie nadal będą zwracane, aby zapewnić postęp ”.
Bluu
56

Niewielkie zmiany wymagane dla Kafki 0.10 i nowego konsumenta w porównaniu do odpowiedzi laughing_mana :

  • Broker: bez zmian, nadal musisz zwiększyć właściwości message.max.bytesi replica.fetch.max.bytes. message.max.bytesmusi być równe lub mniejsze (*) niż replica.fetch.max.bytes.
  • Producent: Zwiększ, max.request.sizeaby wysłać większą wiadomość.
  • Konsument: zwiększ, max.partition.fetch.bytesaby otrzymywać większe wiadomości.

(*) Przeczytaj komentarze, aby dowiedzieć się więcej o message.max.bytes<=replica.fetch.max.bytes

Sascha Vetter
źródło
2
Czy wiesz, dlaczego message.max.bytesmusi być mniejszy niż replica.fetch.max.bytes?
Kostas
2
replica.fetch.max.bytes (domyślnie: 1 MB) - Maksymalny rozmiar danych, które może replikować broker. Musi być większy niż message.max.bytes , w przeciwnym razie broker zaakceptuje wiadomości i nie powiedzie się ich replikacja. potencjalna utrata danych ”. Źródło: handling-large-messages-kafka
Sascha Vetter
2
Dziękuję za przesłanie linku. Wydaje się, że odzwierciedla to również sugestie przewodnika Cloudera . Oba są jednak błędne - zauważ, że nie podają żadnego technicznego powodu, dlaczego replica.fetch.max.bytes powinien być większy message.max.bytes. Pracownik Confluent potwierdził dzisiaj to, co podejrzewałem: że te dwie wielkości mogą być w rzeczywistości równe.
Kostas
2
Czy są jakieś aktualizacje dotyczące message.max.bytes<replica.fetch.max.byteslub message.max.bytes=replica.fetch.max.bytes@Kostas?
Sascha Vetter
2
Tak, mogą być równe: mail-archive.com/[email protected]/msg25494.html (Ismael pracuje dla Confluent)
Kostas
13

Musisz zastąpić następujące właściwości:

Konfiguracje brokera ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Konfiguracje konsumentów ($ KAFKA_HOME / config / consumer.properties)
Ten krok nie zadziałał. Dodałem go do aplikacji konsumenckiej i działało dobrze

  • fetch.message.max.bytes

Zrestartuj serwer.

zajrzyj do tej dokumentacji, aby uzyskać więcej informacji: http://kafka.apache.org/08/configuration.html

user2550587
źródło
1
dla konsumenta wiersza poleceń muszę użyć flagi --fetch-size = <bajes>. Wygląda na to, że nie czyta pliku consumer.properties (kafka 0.8.1). Poleciłbym również włączyć kompresję od strony producenta za pomocą opcji compress.codec.
Ziggy Eunicien
Komentarz Ziggy'ego zadziałał dla mnie kafka 0.8.1.1. Dziękuję Ci!
James
czy to możliwe, że fetch.message.max.bytes został zastąpiony przez max.partition.fetch.bytes w ConsumerConfig?
s_bei
12

Chodzi o to, aby wiadomość o równym rozmiarze była wysyłana od Kafka Producer do Kafka Broker, a następnie odbierana przez Kafka Consumer, tj.

Producent Kafki -> Broker Kafki -> Konsument Kafki

Załóżmy, że jeśli wymagane jest przesłanie 15 MB wiadomości, wówczas producent , pośrednik i konsument , wszyscy trzej, muszą być zsynchronizowani.

Producent platformy Kafka wysyła 15 MB -> Broker Kafka zezwala / przechowuje 15 MB -> Konsument Kafki otrzymuje 15 MB

Dlatego ustawienie powinno być:

a) u Brokera:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) na Konsumenta:

fetch.message.max.bytes=15728640
Ravi
źródło
2
czy to możliwe, że fetch.message.max.bytes został zastąpiony przez max.partition.fetch.bytes w ConsumerConfig?
s_bei
7

Należy pamiętać, że message.max.bytesatrybut musi być zsynchronizowany z fetch.message.max.byteswłasnością konsumenta . rozmiar pobierania musi być co najmniej tak duży, jak maksymalny rozmiar wiadomości, w przeciwnym razie mogłaby zaistnieć sytuacja, w której producenci mogą wysyłać wiadomości większe niż konsument może odebrać / pobrać. Warto się temu przyjrzeć.
Której wersji Kafki używasz? Podaj również więcej szczegółów śledzenia, które otrzymujesz. czy jest coś takiego jak ... pojawia payload size of xxxx larger than 1000000się w dzienniku?

user2720864
źródło
1
Zaktualizowałem moje pytanie o więcej informacji: Kafka Wersja 2.8.0-0.8.0; teraz potrzebuję tylko producenta.
Sonson123,
7

Odpowiedź od @laughing_man jest dość dokładna. Mimo wszystko chciałem wydać rekomendację, której nauczyłem się od eksperta Kafki Stephane'a Maarka z Quory .

Kafka nie jest przeznaczona do obsługi dużych wiadomości.

Twoje API powinno korzystać z magazynu w chmurze (Ex AWS S3) i po prostu wypchnąć do Kafki lub dowolnego brokera wiadomości referencję S3. Musisz znaleźć miejsce do utrwalenia danych, może to dysk sieciowy, może to cokolwiek, ale nie powinien to być broker wiadomości.

Teraz, jeśli nie chcesz korzystać z powyższego rozwiązania

Maksymalny rozmiar wiadomości to 1 MB (ustawienie w Twoim brokerze nazywa się message.max.bytes) Apache Kafka . Jeśli naprawdę bardzo tego potrzebujesz, możesz zwiększyć ten rozmiar i upewnić się, że zwiększysz bufory sieciowe dla swoich producentów i konsumentów.

A jeśli naprawdę zależy ci na podzieleniu wiadomości, upewnij się, że każdy podział wiadomości ma dokładnie ten sam klucz, aby został wypchnięty na tę samą partycję, a treść wiadomości powinna zgłaszać „identyfikator części”, aby klient mógł w pełni zrekonstruować wiadomość .

Możesz także zbadać kompresję, jeśli Twoja wiadomość jest oparta na tekście (kompresja gzip, snappy, lz4), co może zmniejszyć rozmiar danych, ale nie magicznie.

Ponownie, musisz użyć zewnętrznego systemu do przechowywania tych danych i po prostu wypchnąć odwołanie zewnętrzne do Kafki. To bardzo powszechna architektura, którą należy stosować i która jest powszechnie akceptowana.

Pamiętaj, że Kafka działa najlepiej tylko wtedy, gdy wiadomości są ogromne, ale nie mają rozmiaru.

Źródło: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Bhanu Hoysala
źródło
4
Możesz zauważyć, że „Twoja” rekomendacja jest niemal dosłownie kopią rekomendacji Stéphane'a Maarka Quora na quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka działa z dużymi wiadomościami, absolutnie nie ma problemu. Strona wprowadzająca na stronie głównej Kafki odnosi się nawet do niej jako do systemu przechowywania.
calloc_org
3

Dla osób używających landoop kafka: Możesz przekazać wartości konfiguracyjne w zmiennych środowiskowych, takich jak:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

A jeśli korzystasz z rdkafka, przekaż message.max.bytes w konfiguracji producenta na przykład:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Podobnie dla konsumenta

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
informator
źródło