"""Timer skill: keeps named timers and announces them through ntfy."""

import threading
import time  # noqa: F401  (no longer used below; import kept so nothing is removed)

import requests
import schedule

if __name__ == "__main__":
    # Executed directly as a script: config is imported as a top-level module.
    from config import ntfy_url
else:
    from skills.config import ntfy_url


def run_continuously(schedule, interval=1):
    """Run pending jobs of *schedule* on a background thread.

    Borrowed from the ``schedule`` documentation.

    Please note that it is *intended behavior that run_continuously() does
    not run missed jobs*. For example, if you've registered a job that should
    run every minute and you set a continuous run interval of one hour then
    your job won't be run 60 times at each interval but only once.

    Args:
        schedule: Object exposing ``run_pending()`` (the ``schedule`` module
            itself or a ``schedule.Scheduler`` instance). The parameter
            shadows the imported module inside this function.
        interval: Seconds to wait between two ``run_pending()`` calls.

    Returns:
        A ``threading.Event``; set it to make the background thread stop.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                # Event.wait() instead of time.sleep(): the loop wakes up as
                # soon as a stop is requested, and it does not depend on the
                # module-level name ``time``.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run


class Timers:
    """Named one-shot timers that send an ntfy notification when they expire."""

    def __init__(self):
        self.trigger_phrase = "timer"
        # name -> delay value that was passed to _add_timer()
        self.timers = {}
        # Private scheduler so these jobs do not mix with the global one.
        self.schedule = schedule.Scheduler()
        # name -> schedule.Job, so a single timer can be cancelled precisely.
        self._jobs = {}
        # Stop event of the background thread; None until a timer is added.
        self.stop_run_continuously = None

    def _notify(self, device_id, timer_name):
        """POST a "timer is going off" message to ntfy.sh.

        Args:
            device_id: Path component appended to ``https://ntfy.sh/``.
            timer_name: Used as the message body.

        Returns:
            The ``requests.Response`` of the POST.

        Raises:
            requests.RequestException: On connection problems or timeout.
        """
        # NOTE(review): callers pass ``ntfy_url`` as device_id; if that config
        # value is a full URL rather than a topic name, the URL built here is
        # wrong -- confirm against config.
        return requests.post(
            f"https://ntfy.sh/{device_id}",
            # Send bytes: a str body fails for names outside Latin-1.
            data=f"{timer_name}".encode("utf-8"),
            headers={
                "Title": "Your timer is going off!",
                "Priority": "default",
                "Tags": "bell",
            },
            # Without a timeout an unreachable server blocks the caller forever.
            timeout=10,
        )

    def _cancel_job(self, name):
        """Cancel the scheduled job for *name*, if one is registered."""
        job = self._jobs.pop(name, None)
        if job is not None:
            self.schedule.cancel_job(job)

    def _add_timer(self, time, name):
        """Register timer *name* to go off after *time* seconds.

        Adding a name that already exists replaces the earlier timer.

        Args:
            time: Delay in seconds (shadows the ``time`` module in here).
            name: Timer name; must be hashable.
        """
        self._cancel_job(name)
        # The job returns schedule.CancelJob, so it runs exactly once.
        job = self.schedule.every(time).seconds.do(self._trigger_timer, name).tag(name)
        self._jobs[name] = job
        self.timers[name] = time

        # Start the polling thread only when none is currently running.
        if self.stop_run_continuously is None or self.stop_run_continuously.is_set():
            self.stop_run_continuously = run_continuously(self.schedule)

    def _remove_timer(self, name):
        """Remove timer *name* and stop the polling thread once none remain.

        Returns:
            True if the timer existed, False otherwise.
        """
        existed = name in self.timers
        self.timers.pop(name, None)
        self._cancel_job(name)
        if not self.timers:
            self._disable_timer_check_thread()
        return existed

    def _trigger_timer(self, name):
        """Send the notification for *name* and drop the timer.

        Returns:
            ``schedule.CancelJob`` so the scheduler never repeats the job.
        """
        if name in self.timers:
            try:
                response = self._notify(ntfy_url, name)
            except requests.RequestException as exc:
                # A failed notification must not kill the scheduler thread
                # (and with it every other pending timer).
                print(f"Could not send notification for timer {name!r}: {exc}")
            else:
                print(response.text)
            self._remove_timer(name)
            # TODO: play timer done sound
        return schedule.CancelJob

    def get_remaining_time(self, name=""):
        """Not implemented yet; always returns None."""
        return None

    def run(self, query="", time=0, name=""):
        """Dispatch a textual command.

        Args:
            query: Command text; handled when it contains "add" or "remove"
                ("add" is checked first).
            time: Delay in seconds for an added timer.
            name: Name of the timer to add or remove.

        Returns:
            True on success, False when the query is not understood or the
            timer to remove does not exist.
        """
        if "add" in query:
            self._add_timer(time, name)
            return True
        if "remove" in query:
            return self._remove_timer(name)
        return False

    def _disable_timer_check_thread(self):
        """Ask the background polling thread to stop, if one was started."""
        if self.stop_run_continuously is not None:
            self.stop_run_continuously.set()


if __name__ == "__main__":
    # Do not call this variable ``time``: rebinding that global would shadow
    # the ``time`` module for the whole script.
    timers = Timers()
    timers._add_timer(123, "123")
    timers._trigger_timer("123")