Jaki jest dobry algorytm ograniczania szybkości?

155

Mógłbym użyć jakiegoś pseudokodu lub lepiej, Pythona. Próbuję zaimplementować kolejkę ograniczającą prędkość dla bota IRC Pythona i to częściowo działa, ale jeśli ktoś wyzwala mniej wiadomości niż limit (np. Limit szybkości to 5 wiadomości na 8 sekund, a osoba wyzwala tylko 4), a następny wyzwalacz trwa ponad 8 sekund (np. 16 sekund później), bot wysyła wiadomość, ale kolejka się zapełnia i bot czeka 8 sekund, nawet jeśli nie jest to potrzebne, ponieważ upłynął okres 8 sekund.

miniman
źródło

Odpowiedzi:

231

Oto najprostszy algorytm , jeśli chcesz po prostu upuścić wiadomości, gdy nadejdą zbyt szybko (zamiast ustawiać je w kolejce, co ma sens, ponieważ kolejka może stać się dowolnie duża):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

W tym rozwiązaniu nie ma datastruktur, timerów itp. I działa ono bezproblemowo :) Aby to zobaczyć, „przydział” rośnie z prędkością maksymalnie 5/8 jednostek na sekundę, czyli maksymalnie pięć jednostek na osiem sekund. Każda przekazana wiadomość odejmuje jedną jednostkę, więc nie możesz wysłać więcej niż pięciu wiadomości w ciągu ośmiu sekund.

Zauważ, że ratepowinna to być liczba całkowita, tj. Bez niezerowej części dziesiętnej, w przeciwnym razie algorytm nie będzie działał poprawnie (rzeczywista stawka nie będzie rate/per). Np. rate=0.5; per=1.0;Nie działa, ponieważ allowancenigdy nie wzrośnie do 1,0. Ale rate=1.0; per=2.0;działa dobrze.

Antti Huima
źródło
4
Warto również zwrócić uwagę, że wymiar i skala „time_passed” muszą być takie same jak „per”, np. Sekundy.
skaffman
2
Cześć skaffman, dziękuję za komplementy --- wyrzuciłem to z rękawa ale z 99,9% prawdopodobieństwem ktoś wcześniej wymyślił podobne rozwiązanie :)
Antti Huima
52
To jest standardowy algorytm - to pojemnik na token, bez kolejki. Wiadro jest allowance. Rozmiar łyżki to rate. allowance += …Linia jest optymalizacja dodając token każdej stopy ÷ jednej sekundy.
derobert
5
@zwirbeltier To, co piszesz powyżej, nie jest prawdą. 'Allowance' jest zawsze ograniczone przez 'rate' (spójrz na linię "// throttle"), więc zezwoli tylko na serię wiadomości o dokładnie 'rate' w dowolnym określonym czasie, tj. 5.
Antti Huima
7
To jest dobre, ale może przekroczyć stawkę. Powiedzmy, że w czasie 0 przekierować 5 wiadomości, a następnie w czasie N * (8/5) dla n = 1, 2, ... można wysłać kolejną wiadomość, powodując więcej niż 5 wiadomości w 8 sekund
mindvirus
48

Użyj tego dekoratora @RateLimited (ratepersec) przed funkcją, która kolejkuje.

Zasadniczo sprawdza, czy minęła 1 / rate secs od ostatniego razu, a jeśli nie, czeka resztę czasu, w przeciwnym razie nie czeka. To skutecznie ogranicza cię do stawki / sek. Dekorator można zastosować do dowolnej funkcji, która ma ograniczoną cenę.

W twoim przypadku, jeśli chcesz maksymalnie 5 wiadomości na 8 sekund, użyj @RateLimited (0.625) przed funkcją sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
Carlos A. Ibarra
źródło
Podoba mi się pomysł wykorzystania do tego celu dekoratora. Dlaczego lastTimeCalled jest listą? Wątpię też, żeby to zadziałało, gdy wiele wątków wywołuje tę samą funkcję RateLimited ...
Stephan202
8
To lista, ponieważ proste typy, takie jak float, są stałe, gdy są przechwytywane przez zamknięcie. Tworząc ją jako listę, jest ona stała, ale jej zawartość nie. Tak, nie jest bezpieczny dla wątków, ale można go łatwo naprawić za pomocą zamków.
Carlos A. Ibarra
time.clock()nie ma wystarczającej rozdzielczości w moim systemie, więc dostosowałem kod i zmieniłem go do użyciatime.time()
mtrbean
3
Do ograniczania szybkości zdecydowanie nie chcesz używać time.clock(), który mierzy upływający czas procesora. Czas procesora może działać znacznie szybciej lub znacznie wolniej niż „rzeczywisty” czas. Zamiast tego chcesz użyć time.time(), który mierzy czas ściany („rzeczywisty” czas).
John Wiseman,
1
BTW dla rzeczywistych systemów produkcyjnych: implementacja ograniczenia szybkości za pomocą wywołania sleep () może nie być dobrym pomysłem, ponieważ zablokuje wątek, a tym samym uniemożliwi innemu klientowi korzystanie z niego.
Maresh
28

Token Bucket jest dość prosty do zaimplementowania.

Zacznij od wiadra z 5 tokenami.

Co 5/8 sekund: jeśli w wiadrze jest mniej niż 5 żetonów, dodaj jeden.

Za każdym razem, gdy chcesz wysłać wiadomość: Jeśli wiadro ma ≥1 token, wyjmij jeden token i wyślij wiadomość. W przeciwnym razie poczekaj / upuść wiadomość / cokolwiek.

(oczywiście w rzeczywistym kodzie używałbyś licznika liczb całkowitych zamiast prawdziwych tokenów i możesz zoptymalizować co 5 / 8s krok, przechowując znaczniki czasu)


Czytając ponownie pytanie, jeśli limit szybkości jest całkowicie resetowany co 8 sekund, oto modyfikacja:

Zacznij od sygnatury last_sendczasowej dawno temu (np. W epoce). Zacznij też od tego samego zasobnika składającego się z 5 tokenów.

Zastosuj regułę co 5/8 sekund.

Za każdym razem, gdy wysyłasz wiadomość: Najpierw sprawdź, czy last_send≥ 8 sekund temu. Jeśli tak, napełnij wiadro (ustaw na 5 żetonów). Po drugie, jeśli w wiadrze znajdują się tokeny, wyślij wiadomość (w przeciwnym razie upuść / czekaj / itp.). Po trzecie, ustawione last_sendna teraz.

To powinno działać w tym scenariuszu.


Właściwie napisałem bota IRC używając takiej strategii (pierwsze podejście). Jest w Perlu, nie w Pythonie, ale oto kod do zilustrowania:

Pierwsza część dotyczy dodawania tokenów do wiadra. Możesz zobaczyć optymalizację dodawania tokenów na podstawie czasu (od drugiej do ostatniej linii), a następnie ostatnia linia ogranicza zawartość wiadra do maksimum (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn to przekazywana struktura danych. Jest to metoda, która działa rutynowo (oblicza, kiedy następnym razem będzie mieć coś do zrobienia, i śpi tak długo lub do momentu, gdy uzyska ruch w sieci). Kolejna część metody zajmuje się wysyłaniem. Jest to dość skomplikowane, ponieważ wiadomości mają przypisane priorytety.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

To pierwsza kolejka, która jest uruchamiana bez względu na wszystko. Nawet jeśli spowoduje to utratę połączenia z powodu zalania. Używane do niezwykle ważnych rzeczy, takich jak odpowiadanie na PING serwera. Następnie pozostałe kolejki:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Wreszcie status zasobnika jest zapisywany z powrotem w strukturze danych $ conn (właściwie nieco później w metodzie; najpierw oblicza, jak szybko będzie miał więcej pracy)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Jak widać, rzeczywisty kod obsługi zasobnika jest bardzo mały - około czterech wierszy. Pozostała część kodu to priorytetowa obsługa kolejek. Bot ma kolejki priorytetowe, więc np. Ktoś z nim rozmawiający nie może przeszkodzić mu w wykonywaniu ważnych obowiązków związanych z wyrzucaniem / banowaniem.

derobert
źródło
Czy coś mi brakuje ... Wygląda na to, że ograniczyłoby to Cię do 1 wiadomości co 8 sekund po przejściu przez pierwsze 5
dreszcze42
@ chills42: Tak, źle przeczytałem pytanie ... zobacz drugą połowę odpowiedzi.
derobert
@chills: Jeśli last_send jest <8 sekund, nie dodajesz żadnych tokenów do wiadra. Jeśli Twój zasobnik zawiera tokeny, możesz wysłać wiadomość; inaczej nie możesz (wysłałeś już 5 wiadomości w ciągu ostatnich 8 sekund)
derobert
3
Byłbym wdzięczny, gdyby ludzie, którzy to odrzucili, wyjaśniliby, dlaczego ... Chciałbym naprawić wszelkie napotkane problemy, ale trudno to zrobić bez opinii!
derobert
10

aby zablokować przetwarzanie do momentu wysłania wiadomości, a tym samym kolejkowanie kolejnych wiadomości, piękne rozwiązanie antti można również zmodyfikować w następujący sposób:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

po prostu czeka, aż będzie wystarczająca ilość uprawnień do wysłania wiadomości. aby nie rozpoczynać się od dwukrotności stawki, zasiłek może być również zainicjowany wartością 0.

san
źródło
5
Kiedy śpisz (1-allowance) * (per/rate), musisz dodać tę samą kwotę last_check.
Alp
2

Zachowaj czas wysłania ostatnich pięciu wierszy. Zatrzymaj wiadomości w kolejce do czasu, gdy piąta najnowsza wiadomość (jeśli istnieje) znajdzie się co najmniej 8 sekund w przeszłości (z last_five jako tablicą czasów):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
pesto
źródło
Odkąd to poprawiłeś, nie jestem.
Pesto
Przechowujesz pięć znaczników czasu i wielokrotnie przesuwasz je w pamięci (lub wykonujesz połączone operacje na listach). Przechowuję jeden licznik całkowity i jeden znacznik czasu. I tylko arytmetyka i przypisywanie.
derobert
2
Tyle że moja będzie działać lepiej, jeśli spróbujesz wysłać 5 linii, ale tylko 3 więcej są dozwolone w tym okresie. Twój pozwoli wysłać pierwsze trzy i wymusi 8-sekundowe oczekiwanie przed wysłaniem 4 i 5. Mój pozwoli na wysłanie 4 i 5 8 sekund po czwartej i piątej ostatniej linii.
Pesto
1
Ale jeśli chodzi o ten temat, wydajność można poprawić, używając okrągłej listy połączonej o długości 5, wskazującej piątą ostatnią wysyłkę, nadpisując ją w nowej wysyłce i przesuwając wskaźnik do przodu.
Pesto
dla bota IRC z ogranicznikiem szybkości nie stanowi problemu. Wolę rozwiązanie listy, ponieważ jest bardziej czytelne. udzielona odpowiedź zbiorcza jest myląca z powodu rewizji, ale nie ma też w niej nic złego.
jheriko
2

Jednym z rozwiązań jest dołączenie znacznika czasu do każdego elementu w kolejce i odrzucenie elementu po upływie 8 sekund. Możesz wykonać to sprawdzenie za każdym razem, gdy kolejka jest dodawana do.

Działa to tylko wtedy, gdy ograniczysz rozmiar kolejki do 5 i odrzucisz wszelkie dodatki, gdy kolejka jest pełna.

jheriko
źródło
1

Jeśli ktoś nadal jest zainteresowany, używam tej prostej wywoływalnej klasy w połączeniu z czasowym przechowywaniem wartości klucza LRU, aby ograniczyć liczbę żądań na IP. Używa deque, ale może zostać przepisany, aby był używany z listą.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
sanyi
źródło
1

Po prostu pythonowa implementacja kodu z zaakceptowanej odpowiedzi.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
Hodza
źródło
Został on zasugerował mi , że proponuję dodać Przykład użycia kodu .
Luc,
0

Co powiesz na to:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

źródło
0

Potrzebowałem odmiany Scali. Oto ona:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Oto, jak można go użyć:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}
Landon Kuhn
źródło