Python threading.timer - funkcja powtarzania co „n” sekund

96

Chcę uruchamiać funkcję co 0,5 sekundy i mieć możliwość uruchamiania i zatrzymywania oraz resetowania licznika czasu. Nie mam zbyt dużej wiedzy na temat działania wątków Pythona i mam problemy z zegarem Pythona.

Jednak wciąż otrzymuję, RuntimeError: threads can only be started oncegdy wykonuję threading.timer.start()dwa razy. Czy jest na to jakiś sposób? Próbowałem aplikować threading.timer.cancel()przed każdym startem.

Pseudo kod:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()
user1431282
źródło

Odpowiedzi:

114

Najlepszym sposobem jest jednorazowe uruchomienie wątku czasowego. W wątku timera zakodowałbyś następujący kod

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

W kodzie, który uruchomił licznik, możesz następnie setzatrzymać zdarzenie, aby zatrzymać licznik.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
Hans Wtedy
źródło
4
Wtedy zakończy swój sen i potem przestanie. Nie ma sposobu, aby na siłę zawiesić wątek w Pythonie. To jest decyzja projektowa podjęta przez programistów Pythona. Jednak wynik netto będzie taki sam. Twój wątek będzie nadal działał (uśpiony) przez krótką chwilę, ale nie będzie wykonywał Twojej funkcji.
Hans Potem
13
Cóż, właściwie, jeśli chcesz mieć możliwość natychmiastowego zatrzymania wątku timera, po prostu użyj a threading.Eventi waitzamiast sleep. Następnie, aby go obudzić, wystarczy ustawić wydarzenie. Nie potrzebujesz nawet self.stoppedwtedy, ponieważ po prostu sprawdzasz flagę wydarzenia.
nneonneo
3
Zdarzenie byłoby używane wyłącznie do przerwania wątku czasowego. Zwykle po event.waitprostu wygaśnie i zachowa się jak sen, ale jeśli chcesz zatrzymać (lub w inny sposób przerwać wątek), ustawisz zdarzenie wątku i natychmiast się obudzisz.
nneonneo
2
Zaktualizowałem moją odpowiedź, aby używała event.wait (). Dzięki za sugestie.
Hans Potem
1
tylko pytanie, jak mogę po tym ponownie uruchomić wątek? dzwonienie thread.start()daje mithreads can only be started once
Motassem MK
33

Z odpowiednika setInterval w Pythonie :

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

Stosowanie:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

Lub tutaj jest ta sama funkcjonalność, ale jako samodzielna funkcja zamiast dekoratora :

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

Oto jak to zrobić bez używania wątków .

jfs
źródło
jak zmieniłbyś interwał, używając dekoratora? powiedzieć, że chcę zmienić .5s w czasie wykonywania na 1 sekundę lub cokolwiek?
lightxx
@lightxx: po prostu użyj @setInterval(1).
jfs
hm. więc albo jestem trochę powolny, albo mnie źle zrozumiałeś. Miałem na myśli w czasie wykonywania. Wiem, że w każdej chwili mogę zmienić dekorator w kodzie źródłowym. co, na przykład, miałem trzy funkcje, z których każda była ozdobiona @setInterval (n). teraz w czasie wykonywania chcę zmienić interwał funkcji 2, ale zostawić funkcje 1 i 3 w spokoju.
lightxx
@lightxx: możesz użyć innego interfejsu, np stop = repeat(every=second, call=your_function); ...; stop().
jfs
31

Korzystanie z wątków czasowych

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

można to zatrzymać t.cancel()

swapnil jariwala
źródło
3
Uważam, że ten kod ma błąd w cancelmetodzie. Po wywołaniu wątek 1) nie działa lub 2) działa. W 1) czekamy na uruchomienie funkcji, więc anulowanie będzie działać poprawnie. w 2), w którym aktualnie pracujemy, więc anulowanie nie będzie miało wpływu na bieżące wykonanie. co więcej, bieżące wykonanie zmienia harmonogram, więc nie będzie miało wpływu na przyszłe etihery.
Rich Episcopo
1
Ten kod tworzy nowy wątek za każdym razem, gdy czasomierz się kończy. Jest to kolosalne marnotrawstwo w porównaniu z przyjętą odpowiedzią.
Adrian W
Tego rozwiązania należy unikać z wyżej wymienionego powodu: za każdym razem tworzy nowy wątek
Pynchia
20

Trochę poprawiając odpowiedź Hansa Then , możemy po prostu podklasować funkcję Timer. Poniższy kod staje się naszym całym kodem „licznika powtórzeń” i może być używany jako zamiennik typu drop-in dla wątków.Timer z tymi samymi argumentami:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

Przykład użycia:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

generuje następujący wynik:

foo
foo
foo
foo

i

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

produkuje

bar
bar
bar
bar
right2clicky
źródło
Czy to podejście pozwoli mi rozpocząć / anulować / rozpocząć / anulować wątek licznika czasu?
Paul Knopf,
1
Nie. Chociaż takie podejście pozwala na zrobienie wszystkiego, co zrobiłbyś ze zwykłym Timerem, nie możesz tego zrobić zwykłym Timerem. Od początku / anulowanie jest związany z podstawową wątku, jeśli spróbujesz .start () wątku, który wcześniej .cancel () 'ed potem dostaniesz wyjątek RuntimeError: threads can only be started once.
right2clicky
1
Naprawdę eleganckie rozwiązanie! Dziwne, że nie uwzględnili tylko klasy, która to robi.
Roger Dahl
to rozwiązanie jest bardzo imponujące, ale trudno mi było zrozumieć, jak zostało zaprojektowane, po prostu czytając dokumentację interfejsu timera wątków Python3 . Wydaje się, że odpowiedź opiera się na znajomości implementacji, przechodząc do samego threading.pymodułu.
Adam. Przy Epsilonie
15

Aby udzielić poprawnej odpowiedzi przy użyciu Timera zgodnie z żądaniem OP, poprawię odpowiedź swapnil jariwala :

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
Bill Schumacher
źródło
3

Zmieniłem trochę kodu w kodzie swapnil-jariwala, aby stworzyć mały zegar konsoli.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

WYNIK

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

Zegar z graficznym interfejsem tkinter

Ten kod umieszcza licznik czasu w małym okienku z tkinter

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

Przykład gry w karty flash (w pewnym sensie)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

wynik:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...
Giovanni G. PY
źródło
jeśli hFunction blokuje się, czy nie spowodowałoby to opóźnienia w kolejnych czasach uruchamiania? Może mógłbyś zamienić linie tak, aby handle_function najpierw uruchamiał licznik czasu, a następnie wywoływał hFunction?
Wąsy
2

Musiałem to zrobić dla projektu. Skończyło się na tym, że uruchomiłem osobny wątek dla funkcji

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

**** bicie serca to moja funkcja, pracownik to jeden z moich argumentów ****

wewnątrz funkcji bicia serca:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

Więc kiedy uruchamiam wątek, funkcja będzie wielokrotnie czekać 5 sekund, uruchamiać cały mój kod i robić to w nieskończoność. Jeśli chcesz zabić proces, po prostu zabij wątek.

Andrew Wilkins
źródło
1

Zaimplementowałem klasę, która działa jako licznik czasu.

Zostawiam link tutaj na wypadek, gdyby ktoś go potrzebował: https://github.com/ivanhalencp/python/tree/master/xTimer

Ivan Coppes
źródło
3
Chociaż może to teoretycznie odpowiedzieć na pytanie, lepiej byłoby zawrzeć tutaj zasadnicze części odpowiedzi i podać link do odniesienia.
Karl Richter
1
from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

Oto mała próbka, która pomoże lepiej zrozumieć, jak to działa. function taskManager () na końcu tworzy opóźnione wywołanie funkcji do siebie.

Spróbuj zmienić zmienną „dalay”, a zobaczysz różnicę

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False
ch3ll0v3k
źródło
1
czy możesz też powiedzieć trochę więcej, dlaczego to działa?
minocha
Twój przykład był trochę zagmatwany, pierwszy blok kodu był wszystkim, co musiałeś powiedzieć.
Partack
1

Podoba mi się odpowiedź right2clicky, zwłaszcza, że ​​nie wymaga ona zrywania wątku i tworzenia nowego za każdym razem, gdy Timer tyka. Ponadto łatwo jest utworzyć klasę z wywołaniem zwrotnym timera, które jest wywoływane okresowo. To mój normalny przypadek użycia:

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
Frank P.
źródło
1

Jest to alternatywna implementacja wykorzystująca funkcję zamiast klasy. Zainspirowany przez @Andrew Wilkins powyżej.

Ponieważ czekanie jest dokładniejsze niż uśpienie (bierze pod uwagę czas działania funkcji):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()
Paul Kenjora
źródło
1

Wymyśliłem inne rozwiązanie z klasą SingleTon. Proszę, powiedz mi, czy doszło do wycieku pamięci.

import time,threading

class Singleton:
  __instance = None
  sleepTime = 1
  executeThread = False

  def __init__(self):
     if Singleton.__instance != None:
        raise Exception("This class is a singleton!")
     else:
        Singleton.__instance = self

  @staticmethod
  def getInstance():
     if Singleton.__instance == None:
        Singleton()
     return Singleton.__instance


  def startThread(self):
     self.executeThread = True
     self.threadNew = threading.Thread(target=self.foo_target)
     self.threadNew.start()
     print('doing other things...')


  def stopThread(self):
     print("Killing Thread ")
     self.executeThread = False
     self.threadNew.join()
     print(self.threadNew)


  def foo(self):
     print("Hello in " + str(self.sleepTime) + " seconds")


  def foo_target(self):
     while self.executeThread:
        self.foo()
        print(self.threadNew)
        time.sleep(self.sleepTime)

        if not self.executeThread:
           break


sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()

sClass.getInstance().sleepTime = 2
sClass.startThread()
Yunus
źródło
0

Oprócz powyższych świetnych odpowiedzi przy użyciu wątków, na wypadek, gdybyś musiał użyć swojego głównego wątku lub wolisz podejście asynchroniczne - owinąłem krótką klasę wokół klasy aio_timers Timer (aby umożliwić powtarzanie)

import asyncio
from aio_timers import Timer

class RepeatingAsyncTimer():
    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()
    
    def start_timer(self):
        self.aio_timer = Timer(delay=self.interval, 
                               callback=self.cb_wrapper, 
                               callback_args=self.args, 
                               callback_kwargs=self.kwargs
                              )
    
    def cb_wrapper(self, *args, **kwargs):
        self.cb(*args, **kwargs)
        self.start_timer()


from time import time
def cb(timer_name):
    print(timer_name, time())

print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

zegar zaczyna się od: 16024388 40 .9690785

timer_ 1 16024388 45 .980087

timer_ 2 16024388 50 .9806316

timer_ 1 16024388 50 .9808934

timer_ 1 16.024.388 55 +0,9863033

timer_ 2 16.024.388 60 0,9868324

timer_ 1 16.024.388 60 ,9876585

mork
źródło