Alarm inital copy from Timer, Inital Phone skill, readme updates

This commit is contained in:
samerbam 2023-09-06 20:25:18 -04:00
parent df6f072158
commit 5329e0af72
5 changed files with 158 additions and 18 deletions

View File

@ -6,6 +6,110 @@ Maybe we just call timers.py from here?
"""
class Alarms:
import requests
if __name__ == "__main__": # Handle running this script directly vs as a project
from config import ntfy_url
from utility import parsetime2
from skill import Skill
else:
from skills.config import ntfy_url
from skills.utility import parsetime2
from skills.skill import Skill
import threading
import schedule
import time
def run_continuously(schedule, interval=1):
# Borrowed from schedule documentation, why reinvent the wheel when its been created.
"""Continuously run, while executing pending jobs at each
elapsed time interval.
@return cease_continuous_run: threading. Event which can
be set to cease continuous run. Please note that it is
*intended behavior that run_continuously() does not run
missed jobs*. For example, if you've registered a job that
should run every minute and you set a continuous run
interval of one hour then your job won't be run 60 times
at each interval but only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
class Alarms(Skill):
def __init__(self):
self.trigger_phrase = "alarms"
self.trigger_phrase = "alarm"
self.alarms = {} #This is a good canidate for dashboard data
self.schedule = schedule.Scheduler()
def _notify(self, device_id, alarm_name):
r = requests.post(f"https://ntfy.sh/{device_id}",
data=f"{alarm_name}",
headers={
"Title": "Your alarm is going off!",
"Priority": "default",
"Tags": "bell"
})
return r
def _add_alarm(self, duration, name):
if len(self.alarms) == 0:
self.stop_run_continuously = run_continuously(self.schedule)
self.alarms[name] = time.mktime(duration.timetuple())
self.schedule.every().day.at(time.strftime("%H:%M:%S", duration.timetuple())).do(self._trigger_alarm, name).tag(name)
# use https://schedule.readthedocs.io/en/stable/examples.html#run-a-job-once to trigger self._trigger_alarm()
def _remove_alarm(self, name):
del self.alarms[name]
if len(self.alarms) == 0:
self.stop_run_continuously.set()
def _trigger_alarm(self, name):
if name in self.alarms:
res = self._notify(ntfy_url, name).text
print(res)
self._remove_alarm(name)
# TODO: play alarm done sound, send response on api saying to listen to ntfy.sh for signal to trigger sound.
# Better option for two way communication: WEBSOCKETS!
return schedule.CancelJob
def get_remaining_time(self, name=""): #TODO: test this function
"""Returns time remaining for alarm as seconds remaining"""
return self.alarms[name]-time.mktime(datetime.now().timetuple())
# if name == "":
def run(self, query="", duration_string="", name=""):
if "add" in query:
# duration = time.mktime(parsetime2(duration_string).timetuple())
duration = parsetime2(duration_string)
self._add_alarm(duration, name)
return True # Return true to indicate success
if "remove" in query:
self._remove_alarm(name)
return True
return False # Return false to indicate failure
def _disable_alarm_check_thread(self):
self.stop_run_continuously.set()
if __name__ == "__main__":
dur = Alarms()
dur.run("add", "15 seconds", "test alarm")
# dur._add_alarm(123, "123")
# dur._trigger_alarm("123")

View File

@ -7,9 +7,8 @@ https://git.imsam.ca/sam/ThermalTodos/src/branch/main/application/sync_calendar.
Choice:
Google Calendar or iCloud or both
Both would be more work, but would allow for more flexibility
* Google Calendar or iCloud or both
* Both would be more work, but would allow for more flexibility
"""
from skills.skill import Skill

39
backend/skills/phone.py Normal file
View File

@ -0,0 +1,39 @@
import requests
if __name__ == '__main__':
from skill import Skill
from config import ntfy_url
else:
from skills.skill import Skill
from skills.config import ntfy_url
class Phone(Skill):
def __init__(self):
self.trigger_phrase = "phone"
def _send_to_phone(self, name, number):
r = requests.post(f"https://ntfy.sh/{ntfy_url}",
data=f" ",
headers={
"Title": f"Phone {name}",
"Priority": "default",
"Tags": "iphone",
"Click": f"tel://{number}"
})
return r
def parse_phone_string(self, text):
return ["name", "111-222-3333"]
#Todo: parse out phone number from icloud contacts (maybe google contacts? depending on where its stored)
def phone_call(self, text):
parsed_output = self.parse_phone_string(text)
self._send_to_phone(parsed_output[0], parsed_output[1])
if __name__ == '__main__':
p = Phone()
p._send_to_phone("Trav", "4169074987")

View File

@ -9,16 +9,10 @@ else:
from skills.utility import parsetime2
from skills.skill import Skill
import threading
import schedule
import time
# def job_that_executes_once():
# Do some work that only needs to happen once...
# return schedule.CancelJob
def run_continuously(schedule, interval=1):
# Borrowed from schedule documentation, why reinvent the wheel when its been created.
"""Continuously run, while executing pending jobs at each
@ -45,11 +39,6 @@ def run_continuously(schedule, interval=1):
return cease_continuous_run
# while True:
# schedule.run_pending()
# time.sleep(1)
class Timers(Skill):
def __init__(self):
self.trigger_phrase = "timer"

View File

@ -11,6 +11,7 @@ def parsetime(phrase):
ts = datetime.now()
p = ctparse(phrase, ts=ts)
# print(p)
if p is not None:
return p.resolution.dt
return p
@ -27,12 +28,16 @@ def parsetime2(phrase):
"""
time_struct, parse_status = parsedatetime.Calendar().parse(phrase)
print(time_struct, parse_status)
return datetime(*time_struct[:6])
if __name__ == "__main__":
t = parsetime('May 5th 2:30 in the afternoon')
t = parsetime('May 5th in the afternoon')
print(t)
# t5 = parsetime('in 5 minutes 30 seconds')
# print(t5)
# t2 = parsetime('15 seconds')
# # print(t2)
# # print(t2)
@ -46,6 +51,10 @@ if __name__ == "__main__":
t3 = parsetime2('in 5 minutes 30 seconds')
print(time.mktime(t3.timetuple()))
print(time.strftime("%H:%M:%S", t3.timetuple()))
t4 = parsetime2('4 in the afternoon')
print(time.mktime(t4.timetuple()))
print(t4)
# print(time.strftime("%H:%M:%S", t3.timetuple()))
# for x in t:
# print(x)