You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
3.5 KiB
Python

"""
Alarm skill.

This is expected to end up essentially the same as timers.py.
Consider delegating to timers.py from here instead of duplicating it.
"""
import requests
# if __name__ == "__main__": # Handle running this script directly vs as a project
from config import ntfy_url
from utility import parsetime6
from skill import Skill
# else:
# from skills.config import ntfy_url
# from skills.utility import parsetime
# from skills.skill import Skill
import threading
import schedule
import time
def run_continuously(schedule, interval=1):
    """Poll ``schedule`` for pending jobs on a background thread.

    Adapted from the ``schedule`` library documentation rather than
    reinventing it.

    A thread is started immediately. It loops: check the stop event, call
    ``schedule.run_pending()``, then sleep ``interval`` seconds. Note that it
    is *intended behavior that missed jobs are not replayed*: a job registered
    to run every minute, polled with an interval of one hour, runs once per
    poll rather than 60 times.

    Args:
        schedule: Object exposing ``run_pending()``. The parameter name
            shadows the ``schedule`` module inside this function.
        interval: Seconds to sleep between polls.

    Returns:
        The ``threading.Event`` that, once set, makes the polling thread
        exit at its next loop check.
    """
    stop_requested = threading.Event()

    def _poll_until_stopped():
        # The stop event is only examined between polls, so shutdown can lag
        # by up to one `interval`.
        while True:
            if stop_requested.is_set():
                return
            schedule.run_pending()
            time.sleep(interval)

    poller = threading.Thread(target=_poll_until_stopped)
    poller.start()
    return stop_requested
class Alarms(Skill):
    """Skill that schedules named alarms and sends an ntfy push when one fires.

    Each alarm is registered on a private ``schedule.Scheduler`` as a daily
    job at the alarm's clock time (HH:MM:SS), tagged with the alarm name. The
    job cancels itself after firing, so it behaves as a one-shot. A background
    polling thread (see ``run_continuously``) is running only while at least
    one alarm exists.
    """

    # Seconds to wait for ntfy before giving up, so a stalled connection
    # cannot block the scheduler thread indefinitely.
    _NOTIFY_TIMEOUT = 10

    def __init__(self):
        self.trigger_phrase = "alarm"
        # Maps alarm name -> fire time as local epoch seconds (time.mktime).
        self.alarms = {}  # This is a good candidate for dashboard data
        self.schedule = schedule.Scheduler()
        # Stop event of the polling thread; None until the first alarm starts it.
        self.stop_run_continuously = None

    def _notify(self, device_id, alarm_name):
        """POST the alarm name to ``https://ntfy.sh/<device_id>``.

        Args:
            device_id: Path segment appended to ``https://ntfy.sh/``.
            alarm_name: Sent as the message body.

        Returns:
            The ``requests.Response`` from ntfy.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        return requests.post(
            f"https://ntfy.sh/{device_id}",
            # Encode explicitly so alarm names outside latin-1 are sent intact.
            data=f"{alarm_name}".encode("utf-8"),
            headers={
                "Title": "Your alarm is going off!",
                "Priority": "default",
                "Tags": "bell",
            },
            timeout=self._NOTIFY_TIMEOUT,
        )

    def _cancel_scheduled_job(self, name):
        """Drop any scheduler job tagged with ``name``."""
        # Scheduler.clear(None) would delete *every* job, so never forward None.
        if name is not None:
            self.schedule.clear(name)

    def _add_alarm(self, duration, name):
        """Register (or replace) the alarm ``name``.

        Only the time of day of ``duration`` is used for scheduling: the job
        runs at the next occurrence of that HH:MM:SS.

        Args:
            duration: Object exposing ``timetuple()`` (presumably the datetime
                returned by ``parsetime6`` -- confirm against that helper).
            name: Alarm name; also used as the scheduler tag.
        """
        # Compute everything that can fail before touching any state.
        fire_tuple = duration.timetuple()
        fire_epoch = time.mktime(fire_tuple)
        fire_clock = time.strftime("%H:%M:%S", fire_tuple)

        # Re-adding an existing name must not leave the old job behind.
        self._cancel_scheduled_job(name)
        self.schedule.every().day.at(fire_clock).do(self._trigger_alarm, name).tag(name)
        self.alarms[name] = fire_epoch

        # Start polling if it is not running yet, or was stopped earlier.
        stop_event = getattr(self, "stop_run_continuously", None)
        if stop_event is None or stop_event.is_set():
            self.stop_run_continuously = run_continuously(self.schedule)

    def _remove_alarm(self, name):
        """Forget the alarm ``name`` and cancel its scheduled job.

        Stops the polling thread once no alarms remain.

        Raises:
            KeyError: If no alarm with that name exists.
        """
        del self.alarms[name]
        self._cancel_scheduled_job(name)
        if not self.alarms:
            self._disable_alarm_check_thread()

    def _trigger_alarm(self, name):
        """Scheduler callback: notify, drop the alarm, and cancel the job.

        Returns:
            ``schedule.CancelJob`` in every case, so the daily job never
            recurs, even when the alarm was already removed.
        """
        if name in self.alarms:
            # NOTE(review): `ntfy_url` is interpolated as the path segment
            # after https://ntfy.sh/ -- confirm the config value is a topic
            # name and not a full URL.
            try:
                res = self._notify(ntfy_url, name).text
                print(res)
            except requests.RequestException as err:
                # An exception escaping a job would kill the polling thread
                # and silently disable every remaining alarm.
                print(f"Failed to send alarm notification for {name!r}: {err}")
            self._remove_alarm(name)
            # TODO: play alarm done sound, send response on api saying to listen to ntfy.sh for signal to trigger sound.
            # Better option for two way communication: WEBSOCKETS!
        return schedule.CancelJob

    def get_remaining_time(self, name=""):  # TODO: test this function
        """Return the seconds remaining until alarm ``name`` fires.

        The result is negative once the fire time has passed.

        Raises:
            KeyError: If no alarm with that name exists (including the
                default empty name).
        """
        return self.alarms[name] - time.time()

    def run(self, query="", duration_string="", name=""):
        """Handle an alarm command.

        Args:
            query: Command text; "add" is checked before "remove".
            duration_string: Time expression passed to ``parsetime6`` when adding.
            name: Alarm name to add or remove.

        Returns:
            True on success; False if the query matches neither command or
            the alarm to remove does not exist.
        """
        if "add" in query:
            duration = parsetime6(duration_string)[0]
            self._add_alarm(duration, name)
            return True  # Return true to indicate success
        if "remove" in query:
            if name not in self.alarms:
                return False
            self._remove_alarm(name)
            return True
        return False  # Return false to indicate failure

    def _disable_alarm_check_thread(self):
        """Signal the polling thread to stop; a no-op if it never started."""
        stop_event = getattr(self, "stop_run_continuously", None)
        if stop_event is not None:
            stop_event.set()
if __name__ == "__main__":
    # Manual smoke test: register one alarm from a spoken-style time string.
    alarm_skill = Alarms()
    alarm_skill.run(
        query="add",
        duration_string="818 in the afternoon",
        name="test alarm",
    )
    # alarm_skill._add_alarm(123, "123")
    # alarm_skill._trigger_alarm("123")