You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

110 lines
2.9 KiB
Python

import requests
if __name__ == "__main__": # Handle
from config import ntfy_url
else:
from skills.config import ntfy_url
import threading
import schedule
import time
# def job_that_executes_once():
# Do some work that only needs to happen once...
# return schedule.CancelJob
def run_continuously(schedule, interval=1):
    """Run the pending jobs of ``schedule`` on a background thread.

    Adapted from the "run in the background" recipe in the ``schedule``
    library documentation. The worker thread calls
    ``schedule.run_pending()``, waits ``interval`` seconds, and repeats
    until the returned event is set.

    Missed jobs are intentionally *not* replayed: if a job is registered
    to run every minute and ``interval`` is one hour, the job runs once
    per interval, not 60 times.

    Args:
        schedule: Object exposing ``run_pending()`` (for example a
            ``schedule.Scheduler`` instance). Note that this parameter
            shadows the ``schedule`` module inside this function.
        interval: Seconds to wait between two polls of the scheduler.

    Returns:
        threading.Event: set it to make the background thread stop.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        def run(self):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                # Wait on the stop event instead of sleeping: a stop
                # request then wakes the thread immediately rather than
                # after the remainder of ``interval``.
                cease_continuous_run.wait(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run
# while True:
# schedule.run_pending()
# time.sleep(1)
class Timers:
    """Named countdown timers that send an ntfy notification on expiry.

    Each timer is registered as a one-shot job on a private
    ``schedule.Scheduler``. A background thread started through
    ``run_continuously`` polls that scheduler while timers exist.
    """

    def __init__(self):
        self.trigger_phrase = "timer"
        # Maps timer name -> duration in seconds as given to _add_timer.
        self.timers = {}
        self.schedule = schedule.Scheduler()
        # Stop event of the polling thread; None until a thread is started.
        self.stop_run_continuously = None

    @staticmethod
    def _ntfy_endpoint(device_id):
        """Return the URL to publish to for ``device_id``.

        A value that already is an http(s) URL is used unchanged; anything
        else is treated as a topic name on the public ``ntfy.sh`` server.
        """
        target = str(device_id)
        if target.startswith(("http://", "https://")):
            return target
        return f"https://ntfy.sh/{target}"

    def _notify(self, device_id, timer_name):
        """POST the "timer is going off" notification and return the response.

        Args:
            device_id: ntfy topic name, or a full ntfy topic URL.
            timer_name: Text used as the notification body.

        Raises:
            requests.RequestException: if the request fails or times out.
        """
        r = requests.post(
            self._ntfy_endpoint(device_id),
            # Send bytes: a ``str`` body cannot carry characters outside
            # Latin-1.
            data=f"{timer_name}".encode("utf-8"),
            headers={
                "Title": "Your timer is going off!",
                "Priority": "default",
                "Tags": "bell",
            },
            # Never let a stalled connection block the scheduler thread.
            timeout=10,
        )
        return r

    def _add_timer(self, time, name):
        """Start a timer called ``name`` that fires once after ``time`` seconds.

        Adding a name that already exists replaces the earlier timer.

        Args:
            time: Duration in seconds (shadows the ``time`` module here).
            name: Timer name; also used as the scheduler tag of its job.

        Raises:
            ValueError: if ``time`` is negative.
        """
        if time < 0:
            raise ValueError("timer duration must not be negative")
        if name in self.timers:
            # Drop the job of the timer being replaced.
            self.schedule.clear(name)
        self.timers[name] = time
        # Pass the callable and its argument separately so the job runs when
        # due, not right now; _trigger_timer returns CancelJob, which makes
        # the job one-shot.
        self.schedule.every(time).seconds.do(self._trigger_timer, name).tag(name)
        runner = self.stop_run_continuously
        if runner is None or runner.is_set():
            self.stop_run_continuously = run_continuously(self.schedule)

    def _remove_timer(self, name):
        """Cancel the timer ``name``.

        Stops the polling thread when no timers remain.

        Returns:
            bool: True if the timer existed and was removed, else False.
        """
        if name not in self.timers:
            return False
        del self.timers[name]
        self.schedule.clear(name)
        if not self.timers and self.stop_run_continuously is not None:
            self.stop_run_continuously.set()
        return True

    def _trigger_timer(self, name):
        """Job callback: notify that ``name`` expired, then forget the timer.

        Returns:
            ``schedule.CancelJob`` so the scheduler never repeats the job.
        """
        if name in self.timers:
            try:
                res = self._notify(ntfy_url, name).text
                print(res)
            except requests.RequestException as err:
                # A failed notification must not kill the scheduler thread.
                print(f"Timer notification failed: {err}")
            finally:
                self._remove_timer(name)
            # TODO: play timer done sound
        return schedule.CancelJob

    def get_remaining_time(self, name=""):
        """Placeholder, not implemented yet; currently returns None."""
        pass

    def run(self, query="", time=0, name=""):
        """Dispatch a query containing "add" or "remove" to the timer actions.

        Args:
            query: Request text; "add" is checked before "remove".
            time: Duration in seconds for an "add" request.
            name: Name of the timer to add or remove.

        Returns:
            bool: True on success; False if the query matched no action or
            the timer to remove does not exist.
        """
        if "add" in query:
            self._add_timer(time, name)
            return True
        if "remove" in query:
            return self._remove_timer(name)
        return False

    def _disable_timer_check_thread(self):
        """Ask the polling thread to stop, if one was ever started."""
        if self.stop_run_continuously is not None:
            self.stop_run_continuously.set()
if __name__ == "__main__":
    # Manual smoke test: register a 123-second timer named "123", then fire
    # it by hand instead of waiting for it to expire.
    # The instance is deliberately not called ``time``: that would rebind the
    # module-level ``time`` module imported at the top of this file.
    timers = Timers()
    timers._add_timer(123, "123")
    timers._trigger_timer("123")