Użyj ponownie tego samego komunikatu, jeśli przetwarzanie komunikatu nie powiedzie się

10

Używam klienta Confluent.Kafka .NET w wersji 1.3.0. Postępuję zgodnie z dokumentami :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

Problemem jest to, że nawet jeśli I ustosunkowania się linii consumer.StoreOffset(consumerResult);, ciśgle następnej wiadomości niewykorzystanymi następnym razem spożywać , czyli offset stale się powiększa, które nie wydają się być co twierdzi dokumentacja to robi, to znaczy co najmniej jedną dostawę .

Nawet jeśli zestaw I EnableAutoCommit = falsei usunąć „EnableAutoOffsetStore = false” z config i wymienić consumer.StoreOffset(consumerResult)z consumer.Commit(), wciąż widzę ten sam problem, czyli nawet jeśli I ustosunkowania się Commit, nadal trzymać się kolejne niewykorzystanymi wiadomości.

Czuję, że brakuje mi czegoś fundamentalnego, ale nie mogę zrozumieć, co. Każda pomoc jest mile widziana!

havij
źródło
Wiadomości zostały już zwrócone do aplikacji z punktu widzenia Kafki, więc po zatwierdzeniu są one zapisywane jako ostatnie zatwierdzone przesunięcia, ale konsumpcja będzie nadal zwracać kolejne wiadomości, niezależnie od tego, czy zostały zużyte, czy nie. Jakie są twoje oczekiwania tutaj? Czy mógłbyś wyjaśnić, czego się spodziewasz przed / po zatwierdzeniu i konsumpcji?
Sagar Veeram
Wiadomości nie są ponownie pobierane, dopóki nie użyjesz polecenia „szukaj”, aby je skompensować. Spowoduje to zużycie i wiadomości zostaną zwrócone z wyszukiwania offset.
Sagar Veeram
@ user2683814 W moim poście wspomniałem o dwóch scenariuszach w zależności od tego, co EnableAutoCommitjest ustawione. Powiedzmy, że mamy EnableAutoCommit = false, a kiedy ja Consumeodzyskuję wiadomość z przesunięciem 11. Spodziewałem się, że będę otrzymywał tę samą wiadomość z przesunięciem 11 w kółko, jeśli przetworzenie komunikatu będzie kontynuowane, a zatem nie Commitnastąpi wywołanie .
havij
Nie, tak nie jest. Nie możesz kontrolować, co chcesz odpytać ( Consume), używając już Commitpo tym, jak już Subscribetemat. Kafka (jak w kliencie lib) za sceną zachowuje wszystkie przesunięcia, które wysłał do aplikacji w Consumei wyśle ​​je liniowo. Aby więc ponownie przetworzyć wiadomość, tak jak w scenariuszu awarii, musisz ją wyśledzić w swoim kodzie i spróbować skompensować i rozpocząć przetwarzanie wiadomości, a także powinieneś wiedzieć, co pominąć, jeśli zostało już przetworzone we wcześniejszych żądaniach. Nie znam biblioteki .net, ale nie powinno to mieć większego znaczenia, ponieważ jest to projekt kafka.
Sagar Veeram
Myślę, że musisz użyć kombinacji subskrypcji i przypisania i może potrzebować różnych klientów do obsługi twojego przypadku użycia. W przypadku awarii skorzystaj z funkcji przypisania / wyszukiwania dla partycji tematycznych z jednym konsumentem w celu ponownego przetworzenia wiadomości, a do normalnego przetwarzania użyj innego konsumenta z przepływem subskrypcji / konsumpcji / zatwierdzania.
Sagar Veeram

Odpowiedzi:

0

Niestety nie mogę jeszcze dodać komentarza. Konsument Kafka zużywa wiadomości w partiach, więc być może nadal wykonujesz iterację partii wstępnie pobranej przez wątek w tle .

Możesz sprawdzić, czy Twój klient naprawdę zgodził się na przesunięcie, czy nie, używając Kafka kafka-consumer-groups.sh

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe
Tuyen Luong
źródło
0

Możesz chcieć mieć logikę ponownej próby przetwarzania każdej wiadomości przez określoną liczbę razy, np. Powiedzmy 5. Jeśli to się nie powiedzie podczas tych 5 ponownych prób, możesz dodać tę wiadomość do innego tematu dotyczącego obsługi wszystkich nieudane wiadomości, które mają pierwszeństwo przed rzeczywistym tematem. Lub możesz dodać wiadomość, która się nie powiodła, do tego samego tematu, aby mogła zostać odebrana później, gdy wszystkie pozostałe wiadomości zostaną zużyte.

Jeśli przetwarzanie któregokolwiek komunikatu zakończy się powodzeniem w ciągu tych 5 ponownych prób, możesz przejść do następnej wiadomości w kolejce.

Raju Dasupally
źródło