Ograniczanie wywołań metody do żądań M w ciągu N sekund

137

Potrzebuję komponentu / klasy, który ogranicza wykonanie jakiejś metody do maksymalnej liczby wywołań M w ciągu N sekund (lub ms lub nanos, nie ma znaczenia).

Innymi słowy, muszę się upewnić, że moja metoda jest wykonywana nie więcej niż M razy w przesuwanym oknie trwającym N sekund.

Jeśli nie znasz istniejącej klasy, możesz zamieścić swoje rozwiązania / pomysły, jak byś to wdrożył.

vtrubnikov
źródło
3
Istnieje kilka świetnych odpowiedzi na ten problem na stackoverflow.com/questions/667508/ ...
skaffman
> Muszę się upewnić, że moja metoda jest> wykonywana nie więcej niż M razy w> przesuwanym oknie trwającym N sekund. Niedawno napisałem post na blogu o tym, jak to zrobić w .NET. Możesz stworzyć coś podobnego w Javie. Lepsze ograniczanie stawek w .NET
Jack Leitch
Oryginalne pytanie bardzo przypomina problem rozwiązany w tym poście na blogu: [Wielokanałowy Asynchroniczny Throttler Java] ( cordinc.com/blog/2010/04/java-multichannel-asynchronous.html ). W przypadku liczby wywołań M w N sekund ogranicznik omówiony na tym blogu gwarantuje, że każdy interwał o długości N na osi czasu nie będzie zawierał więcej niż M. wywołań.
Hbf

Odpowiedzi:

81

Użyłbym bufora pierścieniowego ze znacznikami czasu o stałym rozmiarze M. Za każdym razem, gdy wywoływana jest metoda, sprawdzasz najstarszy wpis, a jeśli jest mniej niż N sekund w przeszłości, wykonujesz i dodajesz kolejny wpis, w przeciwnym razie śpisz za różnicę czasu.

Michael Borgwardt
źródło
4
Śliczny. Właśnie to czego potrzebuje. Szybkie próby pokazują ~ 10 linii, aby zaimplementować to i minimalne zużycie pamięci. Wystarczy pomyśleć o bezpieczeństwie wątków i kolejkowaniu przychodzących żądań.
vtrubnikov
5
Dlatego używasz DelayQueue z java.util.concurrent. Zapobiega problemowi wielu wątków działających na tym samym wpisie.
erickson,
5
Myślę, że w przypadku przypadku wielowątkowego podejście zasobnika tokenów może być lepszym wyborem.
Michael Borgwardt,
1
Czy wiesz, jak nazywa się ten algorytm, jeśli ma w ogóle jakąś nazwę?
Vlado Pandžić
80

To, co dla mnie zadziałało, to Google Guava RateLimiter .

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}
schnatterer
źródło
19
Nie polecałbym tego rozwiązania, ponieważ Guava RateLimiter zablokuje wątek, a to z łatwością wyczerpie pulę wątków.
kaviddiss
18
@kaviddiss, jeśli nie chcesz blokować, użyjtryAquire()
slf
7
Problem z obecną implementacją RateLimiter (przynajmniej dla mnie) polega na tym, że nie pozwala na okresy dłuższe niż 1 sekunda, a zatem stawki na przykład 1 na minutę.
John B
4
@John B O ile rozumiem, możesz osiągnąć 1 żądanie na minutę z RateLimiter, używając RateLimiter.create (60.0) + rateLimiter.acquire (60)
divideByZero
2
@radiantRazor Ratelimiter.create (1,0 / 60) i pozyskiwanie () osiąga 1 połączenie na minutę.
bizentass
30

Mówiąc konkretnie, powinieneś być w stanie to zaimplementować za pomocą DelayQueue. Zainicjuj kolejkę M Delayedinstancjami z opóźnieniem początkowo ustawionym na zero. Gdy przychodzą żądania do metody, taketoken, który powoduje blokowanie metody do momentu spełnienia wymagania ograniczania przepustowości. Kiedy token zostanie zabrany, addnowy żeton do kolejki z opóźnieniem N.

erickson
źródło
1
Tak, to wystarczy. Ale nie podoba mi się szczególnie DelayQueue, ponieważ używa (przez PriortyQueue) zbalansowanego binarnego skrótu (co oznacza wiele porównań offeri możliwy wzrost tablicy) i jest dla mnie trochę ciężki. Myślę, że dla innych może to być całkowicie w porządku.
vtrubnikov
5
W rzeczywistości w tej aplikacji, ponieważ nowy element dodany do sterty prawie zawsze będzie maksymalnym elementem sterty (tj. Ma największe opóźnienie), zwykle wymagane jest jedno porównanie na dodatek. Ponadto tablica nigdy się nie powiększy, jeśli algorytm zostanie zaimplementowany poprawnie, ponieważ jeden element jest dodawany dopiero po pobraniu jednego elementu.
erickson,
3
Zauważyłem, że jest to pomocne również w przypadkach, gdy nie chcesz, aby żądania występowały w dużych seriach, utrzymując rozmiar M i opóźnienie N stosunkowo małe, rzędu kilku milisekund. na przykład. M = 5, N = 20 ms zapewniłoby przepustowość 250 / sek. W rozmiarze 5.
FUD
Czy to skaluje się dla miliona obrotów na minutę i kiedy dozwolone są jednoczesne żądania? Musiałbym dodać milion opóźnionych elementów. Również przypadki narożne będą miały wysokie opóźnienie - przypadek, w którym wiele wątków wywołuje funkcję poll () i za każdym razem blokuje się.
Aditya Joshee
@AdityaJoshee Nie testowałem tego, ale jeśli będę miał trochę czasu, spróbuję zrozumieć koszty ogólne. Należy jednak pamiętać, że nie potrzebujesz 1 miliona tokenów, które wygasają w ciągu 1 sekundy. Możesz mieć 100 tokenów, które wygasają po 10 milisekundach, 10 tokenów, które tracą ważność po milisekundach, itd. To faktycznie zmusza chwilową stawkę do zbliżenia się do średniej, wygładzając skoki, które mogą powodować tworzenie kopii zapasowych u klienta, ale jest to naturalna konsekwencja ograniczenia prędkości. Jednak 1 milion obrotów na minutę nie brzmi jak dławienie. Jeśli możesz wyjaśnić swój przypadek użycia, mogę mieć lepsze pomysły.
erickson
21

Przeczytaj informacje o zasobniku tokenów algorytmie tokenów. Zasadniczo masz wiadro z tokenami. Za każdym razem, gdy wykonujesz metodę, bierzesz token. Jeśli nie ma więcej tokenów, blokujesz, dopóki go nie zdobędziesz. W międzyczasie istnieje zewnętrzny aktor, który uzupełnia tokeny w ustalonych odstępach czasu.

Nie znam biblioteki, która mogłaby to zrobić (lub coś podobnego). Możesz zapisać tę logikę w swoim kodzie lub użyć AspectJ, aby dodać zachowanie.

Kevin
źródło
3
Dzięki za sugestię, ciekawy algo. Ale to nie jest dokładnie to, czego potrzebuję. Na przykład muszę ograniczyć wykonywanie do 5 wywołań na sekundę. Jeśli użyję zasobnika tokenów i 10 żądań przychodzących w tym samym czasie, pierwsze 5 wywołań zajmie wszystkie dostępne tokeny i zostanie wykonane chwilowo, podczas gdy pozostałe 5 wywołań zostanie zrealizowanych w ustalonych odstępach 1/5 s. W takiej sytuacji potrzebuję pozostałych 5 wywołań do wykonania w jednej serii dopiero po upływie 1 sekundy.
vtrubnikov
5
Co by było, gdybyś co sekundę dodawał 5 żetonów do wiadra (lub 5 - (pozostało 5) zamiast 1 co 1/5 sekundy?
Kevin
@Kevin no, to nadal nie dałoby mi efektu
``
2
@valery tak, to by było. (Pamiętaj jednak, aby zamknąć żetony na M)
nr
nie potrzeba „aktora zewnętrznego”. Wszystko można zrobić w jednym wątku, jeśli przechowujesz metadane dotyczące czasów żądań.
Marsellus Wallace
8

Jeśli potrzebujesz ogranicznika szybkości przesuwanego okna opartego na Javie, który będzie działał w systemie rozproszonym, możesz rzucić okiem na projekt https://github.com/mokies/ratelimitj .

Konfiguracja wspierana przez Redis w celu ograniczenia żądań przez IP do 50 na minutę wyglądałaby następująco:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

Więcej informacji na temat konfiguracji Redis można znaleźć pod adresem https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis .

user2326162
źródło
5

To zależy od aplikacji.

Wyobraźmy sobie sytuację, w której wiele wątków chcesz token zrobić jakąś akcję szybkość ograniczona globalnie z żadnym wybuchu dozwolonym (tj chcesz ograniczyć 10 działań na 10 sekund, ale nie chcesz 10 Działania nastąpić w pierwszej sekundy, a następnie pozostanie 9 sekund zatrzymane).

Kolejka DelayedQueue ma wadę: kolejność, w której wątki żądają tokenów, może nie być kolejnością, w której ich żądanie zostało spełnione. Jeśli wiele wątków jest zablokowanych w oczekiwaniu na token, nie jest jasne, który z nich weźmie następny dostępny token. Moim zdaniem możesz nawet mieć wątki czekające wiecznie.

Jednym z rozwiązań jest zachowanie minimalnego odstępu czasu między dwoma kolejnymi akcjami i wykonywanie działań w tej samej kolejności, w jakiej zostały one zażądane.

Oto realizacja:

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}
Duarte Meneses
źródło
co to minTimeznaczy? Co to robi? czy możesz to wyjaśnić?
flash
minTimeto minimalny czas, jaki musi upłynąć po zużyciu tokenu, zanim będzie można wykorzystać następny token.
Duarte Meneses
3

Chociaż nie jest to to, o co prosiłeś, ThreadPoolExecutorco ma na celu ograniczenie do M jednoczesnych żądań zamiast M żądań w ciągu N sekund, może być również przydatne.

Eugene Yokota
źródło
2

Zaimplementowałem prosty algorytm dławienia. Wypróbuj ten link, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

Krótka informacja o algorytmie,

Ten algorytm wykorzystuje możliwości opóźnionej kolejki Java . Utwórz opóźniony obiekt z oczekiwanym opóźnieniem (tutaj 1000 / M dla milisekund TimeUnit ). Umieść ten sam obiekt w opóźnionej kolejce, która zapewni nam ruchome okno. Następnie przed każdym wywołaniem metody weź obiekt z kolejki, weź jest wywołanie blokujące, które powróci dopiero po określonym opóźnieniu, a po wywołaniu metody nie zapomnij umieścić obiektu w kolejce ze zaktualizowanym czasem (tutaj bieżące milisekundy) .

Tutaj możemy również mieć wiele opóźnionych obiektów z różnym opóźnieniem. Takie podejście zapewni również wysoką przepustowość.

Krishas
źródło
6
Powinieneś opublikować podsumowanie swojego algorytmu. Jeśli twój link zniknie, twoja odpowiedź stanie się bezużyteczna.
jwr
Dzięki, dodałem brief.
Krishas
1

Moja implementacja poniżej może obsłużyć dowolną precyzję czasu żądania, ma złożoność czasową O (1) dla każdego żądania, nie wymaga żadnego dodatkowego bufora, np. Złożoność przestrzeni O (1), a ponadto nie wymaga wątku w tle do zwolnienia tokenu, zamiast tego tokeny są zwalniane zgodnie z upływem czasu od ostatniego żądania.

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more token are released since last request
        available += (now-lastTimeStamp)*1.0/interval*limit; 
        if (available>limit)
            available = limit;

        if (available<1)
            return false;
        else {
            available--;
            lastTimeStamp = now;
            return true;
        }
    }
}
tonywl
źródło
0

Spróbuj zastosować to proste podejście:

public class SimpleThrottler {

private static final int T = 1; // min
private static final int N = 345;

private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;

public SimpleThrottler() {
    handleForGate();
}

/**
 * Payload
 */
private void job() {
    try {
        Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.print(" J. ");
}

public void doJob() throws InterruptedException {
    lock.lock();
    try {

        while (true) {

            int count = 0;

            while (count < N && currentFrame) {
                job();
                count++;
            }

            newFrame.await();
            currentFrame = true;
        }

    } finally {
        lock.unlock();
    }
}

public void handleForGate() {
    Thread handler = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(1 * 900);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                currentFrame = false;

                lock.lock();
                try {
                    newFrame.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    handler.start();
}

}

SergeZ
źródło
0

Apache Camel obsługuje również mechanizm Throttler w następujący sposób:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");
gtoniczny
źródło
0

To jest aktualizacja powyższego kodu LeakyBucket. Działa to dla ponad 1000 żądań na sekundę.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // sec / billion
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

a najbardziej nieprzejrzyste z powyższych:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}
peterreilly
źródło
co to minTimeNanoznaczy? możesz wytłumaczyć?
flash
0

Oto trochę zaawansowana wersja prostego ogranicznika prędkości

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCuRate(){
        return (float) ( 1000d / avgSpent);
    }
}

I testy jednostkowe

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void rcheckSingleThreadRate(){

        // 3 request per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0,2 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}
Leonid Astakhov
źródło
Kod jest dość prosty. Po prostu tworzysz limiter za pomocą maxRate lub okresów i stawki. A potem po prostu zadzwoń, konsumuj każdą prośbę. Jeśli szybkość nie zostanie przekroczona, ogranicznik natychmiast powraca lub czeka jakiś czas, zanim powróci do niższej bieżącej szybkości żądań. Posiada również metodę bieżącej stopy procentowej, która zwraca średnią ruchomą bieżącej stopy.
Leonid Astakhov
0

Moje rozwiązanie: Prosta metoda użycia, którą można zmodyfikować, aby utworzyć klasę opakowania.

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Weź z JAVA Thread Debounce and Throttle

benbai123
źródło