"""Timer skill: schedules named timers and announces them through ntfy.sh."""

import threading
import time

import requests
import schedule

if __name__ == "__main__":
    # Handle running this script directly vs as a project
    from config import ntfy_url
    from utility import parsetime2
else:
    from skills.config import ntfy_url
    from skills.utility import parsetime2

# Upper bound (seconds) on the notification request so that a stalled
# connection cannot block the scheduler thread forever.
_NOTIFY_TIMEOUT_SECONDS = 10

# def job_that_executes_once():
#     Do some work that only needs to happen once...
#     return schedule.CancelJob


# Borrowed from schedule documentation, why reinvent the wheel when its been created.
def run_continuously(schedule, interval=1):
    """Continuously run, while executing pending jobs at each
    elapsed time interval.

    @param schedule: object exposing ``run_pending()`` (a scheduler instance;
        note that the parameter name shadows the ``schedule`` module here).
    @param interval: seconds to wait between two ``run_pending()`` calls.
    @return cease_continuous_run: threading.Event which can
    be set to cease continuous run. Please note that it is
    *intended behavior that run_continuously() does not run
    missed jobs*. For example, if you've registered a job that
    should run every minute and you set a continuous run
    interval of one hour then your job won't be run 60 times
    at each interval but only once.
    """
    cease_continuous_run = threading.Event()

    def _poll():
        while not cease_continuous_run.is_set():
            schedule.run_pending()
            # Wait on the event rather than sleeping so that a stop request
            # ends the thread immediately instead of after a full interval.
            cease_continuous_run.wait(interval)

    # Deliberately a non-daemon thread (the default): it keeps the process
    # alive until the stop event is set, which the __main__ demo relies on.
    threading.Thread(target=_poll).start()
    return cease_continuous_run


# while True:
#     schedule.run_pending()
#     time.sleep(1)


class Timers:
    """Named one-shot timers that send an ntfy.sh notification when they fire.

    Timers are kept in ``self.timers`` (name -> due time as epoch seconds) and
    executed by a private ``schedule.Scheduler`` that is polled from a
    background thread while at least one timer exists.
    """

    def __init__(self):
        self.trigger_phrase = "timer"
        self.timers = {}
        self.schedule = schedule.Scheduler()
        # Stop event of the polling thread; None until the first timer is
        # added, so the stop/disable paths can be called safely before that.
        self.stop_run_continuously = None

    def _notify(self, device_id, timer_name):
        """POST a "timer is going off" notification and return the response.

        ``device_id`` is appended to ``https://ntfy.sh/`` and ``timer_name``
        is sent as the message body.

        Raises ``requests.RequestException`` (including timeouts) on network
        failure.
        """
        # NOTE(review): the only caller passes ``ntfy_url`` as ``device_id``;
        # presumably that setting holds a topic name rather than a full URL --
        # confirm against the config module.
        r = requests.post(
            f"https://ntfy.sh/{device_id}",
            data=f"{timer_name}",
            headers={
                "Title": "Your timer is going off!",
                "Priority": "default",
                "Tags": "bell",
            },
            timeout=_NOTIFY_TIMEOUT_SECONDS,
        )
        return r

    def _cancel_jobs(self, name):
        """Remove every scheduled job that carries the tag ``name``."""
        # Filter by hand instead of ``Scheduler.clear(name)``: clear(None)
        # would wipe *all* jobs if a timer were ever registered without a name.
        for job in [j for j in self.schedule.jobs if name in j.tags]:
            self.schedule.cancel_job(job)

    def _add_timer(self, duration, name):
        """Register timer ``name`` to fire at the moment given by ``duration``.

        ``duration`` must expose ``timetuple()`` (e.g. a ``datetime``); it is
        the absolute due time, not a length. Adding a timer under an existing
        name replaces the earlier one.
        """
        if not self.timers:
            self.stop_run_continuously = run_continuously(self.schedule)
        # Drop any job left over for this name so the timer fires only once,
        # at the new time.
        self._cancel_jobs(name)
        due = duration.timetuple()
        self.timers[name] = time.mktime(due)
        # use https://schedule.readthedocs.io/en/stable/examples.html#run-a-job-once
        # to trigger self._trigger_timer()
        # NOTE(review): only the time of day is scheduled, so a timer of 24
        # hours or more would fire at the first matching wall-clock time.
        self.schedule.every().day.at(time.strftime("%H:%M:%S", due)).do(
            self._trigger_timer, name
        ).tag(name)

    def _remove_timer(self, name):
        """Forget timer ``name`` and cancel its scheduled job.

        Stops the polling thread once no timers remain.
        Raises ``KeyError`` if no such timer exists.
        """
        del self.timers[name]
        # Without this the job would stay scheduled and could fire a later
        # timer that happens to reuse the same name.
        self._cancel_jobs(name)
        if not self.timers and self.stop_run_continuously is not None:
            self.stop_run_continuously.set()

    def _trigger_timer(self, name):
        """Job callback: notify that ``name`` elapsed, then discard the timer.

        Always returns ``schedule.CancelJob`` so the job runs at most once.
        """
        if name in self.timers:
            try:
                res = self._notify(ntfy_url, name).text
                print(res)
            except requests.RequestException as err:
                # An exception escaping a job would terminate the polling
                # thread and silently disable every other pending timer.
                print(f"Failed to send notification for timer {name!r}: {err}")
            self._remove_timer(name)
        # TODO: play timer done sound, send response on api saying to listen
        # to ntfy.sh for signal to trigger sound.
        # Better option for two way communication: WEBSOCKETS!
        return schedule.CancelJob

    def get_remaining_time(self, name=""):
        # TODO: test this function
        """Returns time remaining for timer as seconds remaining

        The value is negative once the due time has passed.
        Raises ``KeyError`` if no timer called ``name`` exists.
        """
        # time.time() replaces the former datetime.now() call, which raised
        # NameError because ``datetime`` was never imported.
        return self.timers[name] - time.time()
        # if name == "":

    def run(self, query="", duration_string="", name=""):
        """Handle an "add" or "remove" request for the timer called ``name``.

        ``duration_string`` is parsed with ``parsetime2`` when adding.
        Returns True on success and False when the query is not recognised or
        the timer to remove does not exist.
        """
        if "add" in query:
            # duration = time.mktime(parsetime2(duration_string).timetuple())
            duration = parsetime2(duration_string)
            self._add_timer(duration, name)
            return True  # Return true to indicate success
        if "remove" in query:
            if name not in self.timers:
                return False  # Nothing to remove
            self._remove_timer(name)
            return True
        return False  # Return false to indicate failure

    def _disable_timer_check_thread(self):
        """Ask the background polling thread to stop, if it was ever started."""
        if self.stop_run_continuously is not None:
            self.stop_run_continuously.set()


if __name__ == "__main__":
    dur = Timers()
    dur.run("add", "15 seconds", "test timer")
    # dur._add_timer(123, "123")
    # dur._trigger_timer("123")