Wysyłam wiadomości tekstowe do Kafki V. 0.8 za pomocą Java Producer API. Jeśli rozmiar wiadomości to około 15 MB, otrzymam plik MessageSizeTooLargeException
. Próbowałem ustawić message.max.bytes
na 40 MB, ale nadal otrzymuję wyjątek. Małe wiadomości działały bez problemów.
(Wyjątek występuje u producenta, nie mam konsumenta w tej aplikacji.)
Co mogę zrobić, aby pozbyć się tego wyjątku?
Moja przykładowa konfiguracja producenta
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Dziennik błędów:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
źródło
źródło
Odpowiedzi:
Musisz dostosować trzy (lub cztery) właściwości:
fetch.message.max.bytes
- to określi największy rozmiar wiadomości, jaką może pobrać konsument.replica.fetch.max.bytes
- pozwoli to replikom w brokerach na wysyłanie komunikatów w klastrze i zapewnianie poprawnej replikacji komunikatów. Jeśli jest zbyt mała, wiadomość nigdy nie zostanie zreplikowana, a zatem konsument nigdy nie zobaczy wiadomości, ponieważ wiadomość nigdy nie zostanie zatwierdzona (w pełni zreplikowana).message.max.bytes
- jest to największy rozmiar wiadomości, jaką broker może otrzymać od producenta.max.message.bytes
- jest to największy rozmiar wiadomości, jaką broker pozwoli na dołączenie do tematu. Ten rozmiar jest sprawdzany przed kompresją. (Domyślnie brokermessage.max.bytes
.)Dowiedziałem się na własnej skórze o numerze 2 - nie otrzymujesz ŻADNYCH wyjątków, wiadomości ani ostrzeżeń od Kafki, więc pamiętaj o tym, gdy wysyłasz duże wiadomości.
źródło
message.max.bytes
w kodzie źródłowym. Ale muszę ustawić te wartości w konfiguracji serwera Kafkaconfig/server.properties
. Teraz działają też większe wiadomości :).fetch.message.max.bytes
pamięć dla KAŻDEJ partycji. Oznacza to, że jeśli użyjesz dużej liczby wfetch.message.max.bytes
połączeniu z dużą liczbą partycji, zajmie to dużo pamięci. W rzeczywistości, ponieważ proces replikacji między brokerami jest również wyspecjalizowanym konsumentem, będzie to również zużywać pamięć brokerów.max.message.bytes
konfiguracja dla tematu, która może być niższa niż u brokeramessage.max.bytes
./.*fetch.*bytes/
nie wydają się być sztywnymi limitami: „To nie jest absolutne maksimum, jeśli [...] jest większe niż ta wartość, rekordowa partia będzie nadal będą zwracane, aby zapewnić postęp ”.Niewielkie zmiany wymagane dla Kafki 0.10 i nowego konsumenta w porównaniu do odpowiedzi laughing_mana :
message.max.bytes
ireplica.fetch.max.bytes
.message.max.bytes
musi być równe lub mniejsze (*) niżreplica.fetch.max.bytes
.max.request.size
aby wysłać większą wiadomość.max.partition.fetch.bytes
aby otrzymywać większe wiadomości.(*) Przeczytaj komentarze, aby dowiedzieć się więcej o
message.max.bytes
<=replica.fetch.max.bytes
źródło
message.max.bytes
musi być mniejszy niżreplica.fetch.max.bytes
?replica.fetch.max.bytes
powinien być większymessage.max.bytes
. Pracownik Confluent potwierdził dzisiaj to, co podejrzewałem: że te dwie wielkości mogą być w rzeczywistości równe.message.max.bytes<replica.fetch.max.bytes
lubmessage.max.bytes=replica.fetch.max.bytes
@Kostas?Musisz zastąpić następujące właściwości:
Konfiguracje brokera ($ KAFKA_HOME / config / server.properties)
Konfiguracje konsumentów ($ KAFKA_HOME / config / consumer.properties)
Ten krok nie zadziałał. Dodałem go do aplikacji konsumenckiej i działało dobrze
Zrestartuj serwer.
zajrzyj do tej dokumentacji, aby uzyskać więcej informacji: http://kafka.apache.org/08/configuration.html
źródło
Chodzi o to, aby wiadomość o równym rozmiarze była wysyłana od Kafka Producer do Kafka Broker, a następnie odbierana przez Kafka Consumer, tj.
Producent Kafki -> Broker Kafki -> Konsument Kafki
Załóżmy, że jeśli wymagane jest przesłanie 15 MB wiadomości, wówczas producent , pośrednik i konsument , wszyscy trzej, muszą być zsynchronizowani.
Producent platformy Kafka wysyła 15 MB -> Broker Kafka zezwala / przechowuje 15 MB -> Konsument Kafki otrzymuje 15 MB
Dlatego ustawienie powinno być:
a) u Brokera:
b) na Konsumenta:
źródło
Należy pamiętać, że
message.max.bytes
atrybut musi być zsynchronizowany zfetch.message.max.bytes
własnością konsumenta . rozmiar pobierania musi być co najmniej tak duży, jak maksymalny rozmiar wiadomości, w przeciwnym razie mogłaby zaistnieć sytuacja, w której producenci mogą wysyłać wiadomości większe niż konsument może odebrać / pobrać. Warto się temu przyjrzeć.Której wersji Kafki używasz? Podaj również więcej szczegółów śledzenia, które otrzymujesz. czy jest coś takiego jak ... pojawia
payload size of xxxx larger than 1000000
się w dzienniku?źródło
Odpowiedź od @laughing_man jest dość dokładna. Mimo wszystko chciałem wydać rekomendację, której nauczyłem się od eksperta Kafki Stephane'a Maarka z Quory .
Kafka nie jest przeznaczona do obsługi dużych wiadomości.
Twoje API powinno korzystać z magazynu w chmurze (Ex AWS S3) i po prostu wypchnąć do Kafki lub dowolnego brokera wiadomości referencję S3. Musisz znaleźć miejsce do utrwalenia danych, może to dysk sieciowy, może to cokolwiek, ale nie powinien to być broker wiadomości.
Teraz, jeśli nie chcesz korzystać z powyższego rozwiązania
Maksymalny rozmiar wiadomości to 1 MB (ustawienie w Twoim brokerze nazywa się
message.max.bytes
) Apache Kafka . Jeśli naprawdę bardzo tego potrzebujesz, możesz zwiększyć ten rozmiar i upewnić się, że zwiększysz bufory sieciowe dla swoich producentów i konsumentów.A jeśli naprawdę zależy ci na podzieleniu wiadomości, upewnij się, że każdy podział wiadomości ma dokładnie ten sam klucz, aby został wypchnięty na tę samą partycję, a treść wiadomości powinna zgłaszać „identyfikator części”, aby klient mógł w pełni zrekonstruować wiadomość .
Możesz także zbadać kompresję, jeśli Twoja wiadomość jest oparta na tekście (kompresja gzip, snappy, lz4), co może zmniejszyć rozmiar danych, ale nie magicznie.
Ponownie, musisz użyć zewnętrznego systemu do przechowywania tych danych i po prostu wypchnąć odwołanie zewnętrzne do Kafki. To bardzo powszechna architektura, którą należy stosować i która jest powszechnie akceptowana.
Pamiętaj, że Kafka działa najlepiej tylko wtedy, gdy wiadomości są ogromne, ale nie mają rozmiaru.
Źródło: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
źródło
Dla osób używających landoop kafka: Możesz przekazać wartości konfiguracyjne w zmiennych środowiskowych, takich jak:
A jeśli korzystasz z rdkafka, przekaż message.max.bytes w konfiguracji producenta na przykład:
Podobnie dla konsumenta
źródło