Używam klienta Confluent.Kafka .NET w wersji 1.3.0. Postępuję zgodnie z dokumentami :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
Problemem jest to, że nawet jeśli I ustosunkowania się linii consumer.StoreOffset(consumerResult);
, ciśgle następnej wiadomości niewykorzystanymi następnym razem spożywać , czyli offset stale się powiększa, które nie wydają się być co twierdzi dokumentacja to robi, to znaczy co najmniej jedną dostawę .
Nawet jeśli zestaw I EnableAutoCommit = false
i usunąć „EnableAutoOffsetStore = false” z config i wymienić consumer.StoreOffset(consumerResult)
z consumer.Commit()
, wciąż widzę ten sam problem, czyli nawet jeśli I ustosunkowania się Commit
, nadal trzymać się kolejne niewykorzystanymi wiadomości.
Czuję, że brakuje mi czegoś fundamentalnego, ale nie mogę zrozumieć, co. Każda pomoc jest mile widziana!
EnableAutoCommit
jest ustawione. Powiedzmy, że mamyEnableAutoCommit = false
, a kiedy jaConsume
odzyskuję wiadomość z przesunięciem 11. Spodziewałem się, że będę otrzymywał tę samą wiadomość z przesunięciem 11 w kółko, jeśli przetworzenie komunikatu będzie kontynuowane, a zatem nieCommit
nastąpi wywołanie .Consume
), używając jużCommit
po tym, jak jużSubscribe
temat. Kafka (jak w kliencie lib) za sceną zachowuje wszystkie przesunięcia, które wysłał do aplikacji wConsume
i wyśle je liniowo. Aby więc ponownie przetworzyć wiadomość, tak jak w scenariuszu awarii, musisz ją wyśledzić w swoim kodzie i spróbować skompensować i rozpocząć przetwarzanie wiadomości, a także powinieneś wiedzieć, co pominąć, jeśli zostało już przetworzone we wcześniejszych żądaniach. Nie znam biblioteki .net, ale nie powinno to mieć większego znaczenia, ponieważ jest to projekt kafka.Odpowiedzi:
Niestety nie mogę jeszcze dodać komentarza. Konsument Kafka zużywa wiadomości w partiach, więc być może nadal wykonujesz iterację partii wstępnie pobranej przez wątek w tle .
Możesz sprawdzić, czy Twój klient naprawdę zgodził się na przesunięcie, czy nie, używając Kafka
kafka-consumer-groups.sh
źródło
Możesz chcieć mieć logikę ponownej próby przetwarzania każdej wiadomości przez określoną liczbę razy, np. Powiedzmy 5. Jeśli to się nie powiedzie podczas tych 5 ponownych prób, możesz dodać tę wiadomość do innego tematu dotyczącego obsługi wszystkich nieudane wiadomości, które mają pierwszeństwo przed rzeczywistym tematem. Lub możesz dodać wiadomość, która się nie powiodła, do tego samego tematu, aby mogła zostać odebrana później, gdy wszystkie pozostałe wiadomości zostaną zużyte.
Jeśli przetwarzanie któregokolwiek komunikatu zakończy się powodzeniem w ciągu tych 5 ponownych prób, możesz przejść do następnej wiadomości w kolejce.
źródło