rxjava: Czy mogę używać retry (), ale z opóźnieniem?

94

Używam rxjava w mojej aplikacji na Androida do asynchronicznej obsługi żądań sieciowych. Teraz chciałbym ponowić nieudane żądanie sieciowe dopiero po upływie określonego czasu.

Czy istnieje sposób użycia retry () na Observable, ale ponowna próba tylko po pewnym opóźnieniu?

Czy istnieje sposób, aby powiadomić Observable, że jest obecnie ponawiany (w przeciwieństwie do próby za pierwszym razem)?

Przyjrzałem się debounce () / throttleWithTimeout (), ale wydaje się, że robią coś innego.

Edytować:

Myślę, że znalazłem jeden sposób, aby to zrobić, ale byłbym zainteresowany albo potwierdzeniem, że jest to właściwy sposób, albo innymi, lepszymi sposobami.

Oto co robię: w metodzie call () mojego Observable.OnSubscribe, zanim wywołam metodę Subscribers onError (), po prostu pozwalam wątkowi spać przez żądany czas. Aby ponowić próbę co 1000 milisekund, robię coś takiego:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Ponieważ ta metoda jest uruchomiona w wątku IO i tak, nie blokuje interfejsu użytkownika. Jedynym problemem, jaki widzę, jest to, że nawet pierwszy błąd jest zgłaszany z opóźnieniem, więc opóźnienie występuje, nawet jeśli nie ma ponownej próby (). Wolałbym, żeby opóźnienie nie zostało zastosowane po błędzie, ale przed ponowną próbą (ale oczywiście nie przed pierwszą próbą).

david.mihola
źródło

Odpowiedzi:

173

Możesz użyć retryWhen()operatora, aby dodać logikę ponawiania do dowolnego Observable.

Następująca klasa zawiera logikę ponawiania:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Stosowanie:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
kjones
źródło
2
Error:(73, 20) error: incompatible types: RetryWithDelay cannot be converted to Func1<? super Observable<? extends Throwable>,? extends Observable<?>>
Nima G
3
@nima Miałem ten sam problem, zmień RetryWithDelayna to: pastebin.com/6SiZeKnC
user1480019
2
Wygląda na to, że operator RxJava retryWhen zmienił się od czasu, gdy to napisałem. Otrzymam zaktualizowaną odpowiedź.
kjones
3
Powinieneś zaktualizować tę odpowiedź, aby była zgodna z RxJava 2
Vishnu M.,
1
jak wyglądałaby wersja rxjava 2 dla kotlin?
Gabriel Sanmartin
20

Zainspirowany odpowiedzią Paula i jeśli nie przejmujesz się retryWhenproblemami przedstawionymi przez Abhijita Sarkara , najprostszym sposobem na opóźnienie ponownej subskrypcji bezwarunkowej rxJava2 jest:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

Możesz chcieć zobaczyć więcej przykładów i wyjaśnień na temat retryWhen i repeatWhen .

McX
źródło
15

Ten przykład działa z jxjava 2.2.2:

Spróbuj bezzwłocznie:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

Spróbuj ponownie z opóźnieniem:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

Nasz singiel źródłowy zawiedzie, jeśli zawiedzie someConnection.send (). W takim przypadku obserwowalne niepowodzenia w programie retryWhen emituje błąd. Opóźniamy tę emisję o 300 ms i wysyłamy ją z powrotem, aby zasygnalizować ponowną próbę. take (5) gwarantuje, że nasza obserwowalna sygnalizacja zakończy się po otrzymaniu pięciu błędów. retryWhen widzi zakończenie i nie ponawia próby po piątym niepowodzeniu.

Erunafailaro
źródło
9

Jest to rozwiązanie oparte na fragmentach Ben Christensen ujrzałem RetryWhen przykładu i RetryWhenTestsConditional (musiałem zmienić n.getThrowable()się nza to do pracy). Użyłem evant / gradle-retrolambda, aby notacja lambda działała na Androidzie, ale nie musisz używać lambd (chociaż jest to wysoce zalecane). W przypadku opóźnienia zaimplementowałem wykładnicze wycofywanie, ale możesz podłączyć dowolną logikę wycofywania, jaką chcesz. Dla kompletności dodałem operatory subscribeOni observeOn. Używam ReactiveX / RxAndroid dla AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
david-hoze
źródło
2
wygląda to elegancko, ale nie używam funkcji lamba, jak mogę pisać bez lambda? @ amitai-hoze
ericn
jak też napisać to tak, aby móc ponownie użyć tej funkcji ponawiania dla innych Observableobiektów?
ericn
nieważne, użyłem kjonesrozwiązania i działa idealnie dla mnie, dzięki
ericn
8

zamiast używać MyRequestObservable.retry używam funkcji opakowującej retryObservable (MyRequestObservable, retrycount, seconds), która zwraca nową Observable, która obsługuje pośrednie opóźnienie, więc mogę to zrobić

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
Alexis Contour
źródło
Strasznie mi przykro, że nie odpisałem wcześniej - jakoś przeoczyłem powiadomienie od SO, że była odpowiedź na moje pytanie ... Przegłosowałem Twoją odpowiedź, bo pomysł mi się podoba, ale nie jestem pewien czy - zgodnie z zasadami SO - Powinienem zaakceptować odpowiedź, ponieważ jest to raczej obejście niż bezpośrednia odpowiedź. Ale myślę, że skoro dajesz obejście, odpowiedź na moje początkowe pytanie brzmi „nie, nie możesz” ...
david.mihola
5

retryWhenjest skomplikowanym, być może nawet wadliwym operatorem. Oficjalny dokument i co najmniej jedna odpowiedź tutaj używają rangeoperatora, który nie powiedzie się, jeśli nie ma ponownych prób. Zobacz moją dyskusję z członkiem ReactiveX Davidem Karnok.

Poprawiłem odpowiedź Kjonesa, przechodząc flatMapdo concatMapi dodając RetryDelayStrategyklasę. flatMapnie zachowuje kolejności emisji concatMap, co jest ważne w przypadku opóźnień z wycofywaniem. Jak RetryDelayStrategynazwa wskazuje, dajmy użytkownikowi do wyboru różne tryby generowania opóźnień ponownych prób, w tym back-off. Kod jest dostępny na moim GitHub wraz z następującymi przypadkami testowymi:

  1. Sukces przy pierwszej próbie (bez ponownych prób)
  2. Niepowodzenie po 1 ponownej próbie
  3. Próbuje ponowić próbę 3 razy, ale kończy się sukcesem za drugim, dlatego nie próbuje ponownie za trzecim razem
  4. Sukces przy 3. ponownej próbie

Zobacz setRandomJokesmetodę.

Abhijit Sarkar
źródło
5

Na podstawie odpowiedzi kjones tutaj jest wersja Kotlin RxJava 2.x ponowna próba z opóźnieniem jako przedłużeniem. Zastąp, Observableaby utworzyć to samo rozszerzenie dla Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Następnie po prostu użyj go na obserwowalnym observable.retryWithDelay(3, 1000)

JuliusScript
źródło
Czy można to również zastąpić Single?
Papps
2
@Papps Tak, to powinno działać, po prostu zwróć uwagę, flatMapże będzie musiał używać Flowable.timeri Flowable.error nawet jeśli funkcja jest Single<T>.retryWithDelay.
JuliusScript
3

Teraz z RxJava w wersji 1.0+ możesz użyć zipWith, aby ponowić próbę z opóźnieniem.

Dodawanie modyfikacji do odpowiedzi kjones .

Zmodyfikowano

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}
Omkar
źródło
3

Ta sama odpowiedź jak w kjones, ale zaktualizowana do najnowszej wersji Dla wersji RxJava 2.x : ('io.reactivex.rxjava2: rxjava: 2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Stosowanie:

// Dodaj logikę ponawiania do istniejących obserwowalnych. // Spróbuj ponownie maksymalnie 3 razy z opóźnieniem 2 sekund.

observable
    .retryWhen(new RetryWithDelay(3, 2000));
Mihuilk
źródło
1

Możesz dodać opóźnienie w Observable zwróconym w operatorze retryWhen

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Możesz zobaczyć więcej przykładów tutaj. https://github.com/politrons/reactive

Paweł
źródło
0

Po prostu zrób to w ten sposób:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });
Allen Vork
źródło
0

Dla wersji Kotlin i RxJava1

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}
Cody
źródło
0

(Kotlin) Poprawiłem trochę kod z wykładniczym wycofywaniem i zastosowałem emisję obronną Observable.range ():

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}
ultraon
źródło
0

w przypadku, gdy chcesz wydrukować liczbę ponownych prób, możesz skorzystać z przykładu podanego na stronie wiki Rxjavy https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
Angel Koh
źródło