Jaki jest najlepszy sposób wielokrotnego wykonywania funkcji co x sekund?

283

Chcę wielokrotnie wykonywać funkcję w Pythonie co 60 sekund na zawsze (tak jak NSTimer w Celu C). Ten kod będzie działał jako demon i faktycznie przypomina wywoływanie skryptu python co minutę przy użyciu crona, ale bez konieczności konfigurowania go przez użytkownika.

W tym pytaniu dotyczącym crona zaimplementowanego w Pythonie rozwiązanie wydaje się efektywnie po prostu sleep () przez x sekund. Nie potrzebuję tak zaawansowanych funkcji, więc być może coś takiego zadziałałoby

while True:
    # Code executed here
    time.sleep(60)

Czy są jakieś przewidywalne problemy z tym kodem?

DavidM
źródło
83
Punkt pedantyczny, ale może być krytyczny, kod powyżej kodu nie jest wykonywany co 60 sekund, co powoduje 60-sekundową przerwę między wykonaniami. Zdarza się to tylko co 60 sekund, jeśli wykonany kod nie zajmuje w ogóle czasu.
Simon
4
time.sleep(60)mogą również powrócić zarówno wcześniej, jak i później
jfs
5
Nadal zastanawiam się: czy są jakieś przewidywalne problemy z tym kodem?
Banan
1
„Przewidywalny problem” polega na tym, że nie można spodziewać się 60 iteracji na godzinę za pomocą funkcji time.sleep (60). Więc jeśli dodajesz jeden element na iterację i prowadzisz listę ustawionej długości ... średnia z tej listy nie będzie reprezentować spójnego „okresu” czasu; więc funkcje takie jak „średnia ruchoma” mogą odnosić się do punktów danych, które są zbyt stare, co zniekształca wskazanie.
litepresence
2
@Banana Tak, możesz spodziewać się problemów, ponieważ skrypt nie jest WYKONYWANY DOKŁADNIE co 60 sekund. Na przykład. Zacząłem robić coś takiego, aby podzielić strumienie wideo i przesyłać je, i ostatecznie otrzymałem strumienie 5-10 ~ sekund dłużej, ponieważ kolejka multimediów buforuje się podczas przetwarzania danych w pętli. To zależy od twoich danych. Jeśli ta funkcja jest jakimś prostym organem nadzorczym, który ostrzega, na przykład, gdy twój dysk jest pełny, nie powinieneś mieć z tym żadnych problemów. Jeśli sprawdzasz ostrzeżenia elektrowni jądrowej, możesz skończyć z miastem całkowicie wysadzony x
DGoiko

Odpowiedzi:

229

Jeśli twój program nie ma jeszcze pętli zdarzeń, użyj modułu harmonogramu , który implementuje harmonogram zdarzeń ogólnego przeznaczenia.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Jeśli już przy użyciu biblioteki pętli wydarzenie jak asyncio, trio, tkinter, PyQt5, gobject, kivy, i wiele innych - po prostu zaplanować zadanie przy użyciu metod istniejącą bibliotekę pętli zdarzeń, w zamian.

nosklo
źródło
16
Moduł harmonogramu służy do uruchamiania funkcji planowania po pewnym czasie. Jak używać go do powtarzania wywołania funkcji co x sekund bez użycia time.sleep ()?
Baishampayan Ghose
2
@Baishampayan: Wystarczy zaplanować nowy bieg.
nosklo
3
Następnie apscheduler na paczkach.python.org/APScheduler również powinien otrzymać wzmiankę.
Daniel F
6
Uwaga: ta wersja może dryfować. Możesz użyć, enterabs()aby tego uniknąć. Oto wersja bez driftu dla porównania .
jfs
8
@JavaSa: ponieważ „rób swoje rzeczy” nie jest natychmiastowe, a błędy time.sleepmogą się tutaj gromadzić. „wykonaj co X sekund” i „wykonaj z opóźnieniem ~ X sekund wielokrotnie” nie są takie same. Zobacz także ten komentarz
jfs
180

Zablokuj pętlę czasu na zegarze systemowym w następujący sposób:

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Dave Rove
źródło
22
+1. Twoja i twistedodpowiedź to jedyne odpowiedzi, które uruchamiają funkcję co xsekundę. Reszta wykonuje funkcję z opóźnieniem xkilku sekund po każdym wywołaniu.
jfs
13
Jeśli dodasz do tego jakiś kod, który trwał dłużej niż jedną sekundę ... Wyrzuciłoby to limit czasu i zacząłby się opóźniać. Przyjęta odpowiedź w tym przypadku jest poprawna ... Każdy może zapętlić proste polecenie drukowania i niech działa co sekundę bez zwłoki ...
Angry 84
5
Wolę from time import time, sleepze względu na implikacje egzystencjalne;)
Czy
14
Działa fantastycznie. Nie musisz odejmować swojego, starttimejeśli zaczniesz od zsynchronizowania go z określonym czasem: time.sleep(60 - time.time() % 60)działało dobrze dla mnie. Użyłem go jako time.sleep(1200 - time.time() % 1200)i daje mi logi :00 :20 :40, dokładnie tak, jak chciałem.
TemporalWolf
2
@AntonSchigur, aby uniknąć znoszenia po wielu iteracjach. Indywidualny iteracja może nieco prędzej czy później zaczynają zależności sleep(), timer()precyzja i jak długo to trwa do wykonania ciała pętli, ale średnio iteracji zawsze występują na granicach przedziału (nawet jeśli niektóre z nich są pomijane) while keep_doing_it(): sleep(interval - timer() % interval). Porównaj to z tym, while keep_doing_it(): sleep(interval)gdzie błędy mogą się kumulować po kilku iteracjach.
jfs
71

Warto rozważyć Twisted, która jest biblioteką sieciową Pythona, która implementuje wzorzec Reactor .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Podczas gdy „while True: sleep (60)” prawdopodobnie będzie działać Twisted prawdopodobnie już implementuje wiele funkcji, które w końcu będą potrzebne (demonizacja, rejestrowanie lub obsługa wyjątków, jak wskazał bobince) i prawdopodobnie będzie to bardziej niezawodne rozwiązanie

Aaron Maenpaa
źródło
Świetna odpowiedź, bardzo dokładna bez dryfu. Zastanawiam się, czy to również powoduje uśpienie procesora podczas oczekiwania na wykonanie zadania (inaczej nie czekanie na zajęcie)?
smoothware
1
dryfuje na poziomie milisekund
Derek Eden
Co znaczy „dryfuje na poziomie milisekund”?
Jean-Paul Calderone
67

Jeśli chcesz mieć nieblokujący sposób okresowego wykonywania swojej funkcji, zamiast blokującej nieskończonej pętli użyłbym wątkowego timera. W ten sposób Twój kod może działać i wykonywać inne zadania, a twoja funkcja może być wywoływana co n sekund. Często używam tej techniki do drukowania informacji o postępach w przypadku długich zadań intensywnie wykorzystujących procesor / dysk / sieć.

Oto kod, który opublikowałem w podobnym pytaniu, z kontrolą start () i stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Stosowanie:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Cechy:

  • Tylko biblioteka standardowa, brak zewnętrznych zależności
  • start()i stop()można bezpiecznie dzwonić wiele razy, nawet jeśli stoper już się uruchomił / zatrzymał
  • wywoływana funkcja może mieć argumenty pozycyjne i nazwane
  • Możesz zmienić w intervaldowolnym momencie, zacznie obowiązywać po następnym uruchomieniu. To samo dotyczy args, kwargsa nawet function!
MestreLion
źródło
To rozwiązanie wydaje się dryfować z czasem; Potrzebowałem wersji, która ma wywoływać funkcję co n sekund bez dryfu. Opublikuję aktualizację w osobnym pytaniu.
eraoul,
W def _run(self)Próbuję owinąć głowę wokół dlaczego nazywasz self.start()wcześniej self.function(). Czy możesz rozwinąć? Wydaje mi się, że dzwonienie jako start()pierwsze self.is_runningzawsze Falsetak będzie, a potem zawsze będziemy rozwijać nowy wątek.
Rich Episcopo
1
Myślę, że doszedłem do sedna. @ Rozwiązanie MestreLion uruchamia funkcję co xsekundę (tj. T = 0, t = 1x, t = 2x, t = 3x, ...), gdzie na oryginalnych plakatach przykładowy kod uruchamia funkcję z przerwą x sekundową pomiędzy nimi. Ponadto, jak sądzę, w tym rozwiązaniu występuje błąd, który intervaljest krótszy niż czas potrzebny functiondo wykonania. W takim przypadku self._timerzostanie nadpisany w startfunkcji.
Rich Episcopo
Tak, @RichieEpiscopo, wezwanie do .function()after .start()ma uruchomić funkcję przy t = 0. I nie sądzę, że będzie to problem, jeśli functionzajmie to więcej czasu interval, ale tak, mogą być pewne warunki wyścigowe w kodzie.
MestreLion
To jedyny nieblokujący sposób, jaki mogłem uzyskać. Dzięki.
ukośnik odwrotny
35

Uważam, że łatwiejszym sposobem jest:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

W ten sposób Twój kod jest wykonywany, a następnie czeka 60 sekund, a następnie wykonuje ponownie, czeka, wykonuje itp. Nie musisz komplikować rzeczy: D

Itxaka
źródło
Słowo kluczowe True powinno być pisane wielkimi literami
Sean Cain
38
W rzeczywistości nie jest to odpowiedź: time sleep () może być używany tylko do oczekiwania X sekund po każdym wykonaniu. Na przykład, jeśli twoja funkcja wykonuje 0,5 sekundy i używasz time.sleep (1), oznacza to, że twoja funkcja wykonuje się co 1,5 sekundy, a nie 1. Powinieneś używać innych modułów i / lub wątków, aby upewnić się, że coś działa na Y razy co X sekund.
kommradHomer
1
@kommradHomer: odpowiedź Dave Rove za dowodzi, że można używać time.sleep()uruchomić coś co X sekund
JFS
2
Moim zdaniem kod powinien wywoływać się time.sleep()w while Truepętli, jak:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
Leonard Lepadatu,
22
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Jeśli chcesz to zrobić bez blokowania pozostałego kodu, możesz użyć tego, aby uruchomić go we własnym wątku:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

To rozwiązanie łączy w sobie kilka funkcji rzadko spotykanych w innych rozwiązaniach:

  • Obsługa wyjątków: O ile to możliwe na tym poziomie, wyjątki są obsługiwane odpowiednio, tj. Należy się zalogować w celu debugowania bez przerywania działania naszego programu.
  • Bez łączenia: wspólna implementacja łańcuchowa (służąca do planowania następnego zdarzenia), którą można znaleźć w wielu odpowiedziach, jest krucha pod tym względem, że jeśli coś pójdzie nie tak w mechanizmie szeregowania ( threading.Timerlub czymkolwiek innym ), spowoduje to przerwanie łańcucha. Wówczas nie będą wykonywane dalsze egzekucje, nawet jeśli przyczyna problemu została już naprawiona. Prosta pętla i czekanie z prostą sleep()jest znacznie bardziej niezawodne w porównaniu.
  • Bez dryfu: moje rozwiązanie dokładnie śledzi czasy, w których ma działać. Nie ma dryfu w zależności od czasu wykonania (jak w wielu innych rozwiązaniach).
  • Pomijanie: Moje rozwiązanie pominie zadania, jeśli jedno wykonanie zajmie zbyt dużo czasu (np. Wykonaj X co pięć sekund, ale X zajmie 6 sekund). Jest to standardowe zachowanie crona (i nie bez powodu). Wiele innych rozwiązań po prostu wykonuje to zadanie kilka razy z rzędu bez żadnych opóźnień. W większości przypadków (np. Zadania czyszczenia) nie jest to pożądane. Jeśli to jest szkoda, wystarczy użyć next_time += delayzamiast.
Alfe
źródło
2
najlepsza odpowiedź za brak dryfowania.
Sebastian Stark,
1
@PirateApp Zrobiłbym to w innym wątku. Możesz to zrobić w tym samym wątku, ale w końcu programujesz swój własny system planowania, który jest zbyt skomplikowany, aby komentować.
Alfe
1
W Pythonie, dzięki GIL, dostęp do zmiennych w dwóch wątkach jest całkowicie bezpieczny. A samo czytanie w dwóch wątkach nigdy nie powinno stanowić problemu (również w innych środowiskach wątkowych). Tylko pisanie z dwóch różnych wątków w systemie bez GIL (np. W Javie, C ++ itp.) Wymaga wyraźnej synchronizacji.
Alfe
1
@ user50473 Bez dalszych informacji najpierw podszedłbym do zadania od strony wątków. Jeden wątek od czasu do czasu odczytuje dane, a następnie śpi, aż nadszedł czas, aby to zrobić. Powyższe rozwiązanie można oczywiście oczywiście wykorzystać. Ale mogłem sobie wyobrazić wiele powodów, aby pójść inną drogą. Powodzenia :)
Alfe,
1
Tryb uśpienia można zastąpić wątkami. Poczekaj, aż upłynie limit czasu, aby lepiej reagować na zamknięcie aplikacji. stackoverflow.com/questions/29082268/…
themadmax
20

Oto aktualizacja kodu MestreLion, która pozwala uniknąć dryfowania w czasie.

Klasa RepeatedTimer wywołuje tutaj daną funkcję co „interwał” sekund, zgodnie z żądaniem OP; harmonogram nie zależy od czasu wykonywania funkcji. Podoba mi się to rozwiązanie, ponieważ nie ma zależności od biblioteki zewnętrznej; to jest po prostu czysty python.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Przykładowe użycie (skopiowane z odpowiedzi MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
eraoul
źródło
5

Jakiś czas temu napotkałem podobny problem. Może http://cronus.readthedocs.org może pomóc?

W wersji 0.2 działa następujący fragment kodu

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Anay
źródło
4

Główną różnicą między tym a cron jest to, że wyjątek na dobre zabije demona. Możesz owinąć się łapaczem wyjątków i rejestratorem.

Bobin
źródło
4

Jedna możliwa odpowiedź:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
sks
źródło
1
To jest zajęte czekaniem i dlatego bardzo źle.
Alfe
Dobre rozwiązanie dla kogoś, kto szuka nieblokującego timera.
Noel
3

Skończyło się na użyciu modułu harmonogramu . Interfejs API jest fajny.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Nauka statystyk przez przykład
źródło
Trudno mi w szczególności użyć tego modułu, muszę odblokować główny wątek, sprawdziłem FAQ na stronie dokumentacji harmonogramu, ale tak naprawdę nie zrozumiałem dostarczonego obejścia. Czy ktoś wie, gdzie mogę znaleźć działający przykład, który nie blokuje głównego wątku?
5Daydreams
1

Używam metody Tkinter after (), która nie „kradnie gry” (podobnie jak moduł harmonogramu , który został wcześniej zaprezentowany), tzn. Pozwala na równoległe działanie innych rzeczy:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()i do_something2()może działać równolegle i z dowolną prędkością interwału. Tutaj drugi zostanie wykonany dwa razy szybciej. Zauważ też, że użyłem prostego licznika jako warunku do zakończenia którejkolwiek z funkcji. Możesz użyć dowolnej innej lub dowolnej opcji, jeśli chcesz, jaką funkcję uruchomić, dopóki program się nie zakończy (np. Zegar).

Apostolos
źródło
Uważaj na swoje sformułowanie: afternie pozwala na równoległe działanie. Tkinter jest jednowątkowy i może robić tylko jedną rzecz naraz. Jeśli coś zaplanowane przez afterjest uruchomione, nie działa równolegle z resztą kodu. Jeśli oba do_something1i do_something2są zaplanowane w tym samym czasie, będą uruchamiane sekwencyjnie, a nie równolegle.
Bryan Oakley
@Apostolos wszystko, co robi twoje rozwiązanie, to używać tloter mainloop zamiast harmonogramu mainloop, więc działa dokładnie w ten sam sposób, ale pozwala interfejsom tkinter nadal odpowiadać. Jeśli nie używasz tkintera do innych rzeczy, nie zmienia to niczego w odniesieniu do rozwiązania harmonogramu. W rozwiązaniu możesz użyć dwóch lub więcej zaplanowanych funkcji z różnymi interwałami sched, a to będzie działać dokładnie tak samo jak twoje.
nosklo
Nie, to nie działa w ten sam sposób. Wyjaśniłem to. Ten „blokuje” program (tzn. Zatrzymuje przepływ, nie możesz zrobić nic innego - nawet nie zaczynasz kolejnej scecduled pracy, jak sugerujesz), dopóki się nie skończy, a drugi pozwoli uwolnić ręce / uwolnić (tzn. Możesz zrobić inne rzeczy po tym, jak się zaczął. Nie musisz czekać, aż się skończy. To ogromna różnica. Gdybyś wypróbował metodę, którą przedstawiłem, sam byś zobaczył. Próbowałem twojego. Dlaczego nie spróbuj też mój?
Apostolos
1

Oto dostosowana wersja do kodu z MestreLion. Oprócz oryginalnej funkcji ten kod:

1) dodaj pierwszy_interwał używany do uruchomienia timera w określonym czasie (osoba dzwoniąca musi obliczyć pierwszy_interwał i przekazać)

2) rozwiązać warunek wyścigu w oryginalnym kodzie. W oryginalnym kodzie, jeśli wątek kontrolny nie anulował uruchomionego timera („Zatrzymaj timer i anuluj wykonanie akcji timera. Działa to tylko wtedy, gdy timer jest nadal w fazie oczekiwania.” Cytowany z https: // docs.python.org/2/library/threading.html ), licznik czasu będzie działał bez końca.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
dproc
źródło
1

Wydaje się to o wiele prostsze niż przyjęte rozwiązanie - czy ma wady, których nie rozważam? Przybyłem tutaj, szukając makaronu, który byłby bardzo prosty i był rozczarowany.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Który wyprowadza asynchronicznie.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

Ma dryf w tym sensie, że jeśli uruchamiane zadanie zajmuje znaczną ilość czasu, wówczas interwał wynosi 2 sekundy + czas zadania, więc jeśli potrzebujesz dokładnego planowania, nie jest to dla ciebie.

Uwaga: daemon=Trueflaga oznacza, że ​​ten wątek nie zablokuje zamknięcia aplikacji. Na przykład wystąpił problem pytestpolegający na tym, że zawieszał się w nieskończoność po uruchomieniu testów, czekając na zakończenie thead.

Adam Hughes
źródło
Nie, drukuje tylko pierwszą datę i godzinę, a następnie zatrzymuje się ...
Alex Poca
Jesteś pewien - właśnie skopiowałem i wkleiłem w terminalu. Od razu wraca, ale dla mnie wydruk jest kontynuowany w tle.
Adam Hughes
Wygląda na to, że czegoś mi brakuje. I kopiowania / wklejania kodu w test.py i uruchomić z Pythona test.py . W Python2.7 muszę usunąć daemon = True, który nie został rozpoznany i przeczytałem wiele wydruków. W Python3.8 zatrzymuje się po pierwszym wydruku i po jego zakończeniu żaden proces nie jest aktywny. Usuwanie demona = Prawda Czytałem wiele odbitek ...
Alex Poca
hmm dziwne - korzystam z Pythona 3.6.10, ale nie wiem, dlaczego miałoby to mieć znaczenie
Adam Hughes
Ponownie: Python3.4.2 (Debian GNU / Linux 8 (jessie)) musiał usunąć daemon = True, aby mógł drukować wielokrotnie. Z demonem pojawia się błąd składniowy. Poprzednie testy z Python2.7 i 3.8 dotyczyły Ubuntu 19.10. Czy to możliwe, że demon jest traktowany inaczej w zależności od systemu operacyjnego?
Alex Poca,
0

Używam tego, aby powodować 60 zdarzeń na godzinę, przy czym większość zdarzeń występuje w tej samej liczbie sekund po całej minucie:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

W zależności od rzeczywistych warunków możesz dostać kleszcze długości:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

ale pod koniec 60 minut będziesz miał 60 tyknięć; i większość z nich pojawi się z prawidłowym przesunięciem w stosunku do minuty, którą preferujesz.

W moim systemie mam typowy dryf <1/20 sekundy, aż pojawi się potrzeba korekty.

Zaletą tej metody jest rozdzielczość dryftu zegara; co może powodować problemy, jeśli robisz takie rzeczy, jak dodawanie jednego elementu na tyknięcie i oczekujesz 60 elementów dodawanych na godzinę. Nieuwzględnienie dryftu może powodować wtórne wskazania, takie jak średnie ruchome, w celu uwzględnienia danych zbyt głęboko w przeszłości, co prowadzi do wadliwych wyników.

litepresence
źródło
0

np. Wyświetl aktualny czas lokalny

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
powstanie. rio
źródło
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
raviGupta
źródło
Na podstawie danych wprowadzonych przez użytkownika będzie iterować tę metodę w każdym przedziale czasu.
raviGupta
0

Oto inne rozwiązanie bez użycia dodatkowych bibliotek.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
Dat Nguyen
źródło