"""Python FastAPI Auth0 integration example.

Defines the todo, print and Google Calendar endpoints of the thermal-printer
todo application and mounts its static front end.
"""
|
|
|
|
import errno
import os
from datetime import datetime
from typing import Optional

from fastapi import FastAPI, Security
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

from .utils import VerifyToken
from .database import TodoDatabase
from .thermal_print import ThermalPrinter
from .sync_calendar import SyncData
|
|
# from wsgiref import simple_server
|
|
|
|
# Module-level singletons shared by every route below.
# Construction order is kept as-is: `printer` is built from `data`.

# FastAPI application that the route decorators attach to.
app = FastAPI()

# Token verifier; `auth.verify` is used as the Security dependency on
# the protected routes.
auth = VerifyToken()

# Todo storage used by the read/write/print routes.
data = TodoDatabase()

# Calendar helper used by the Google update route.
calendar = SyncData()

# Printer front end; receives the todo storage instance.
printer = ThermalPrinter(data)
|
|
# printer.print_default() #temp debug
|
|
# printer.print_custom("Configuration Site: \n\n")
|
|
# printer.print_qrcode()
|
|
# printer.print_custom("https://thermaltodos.imsam.ca\n\n")
|
|
# printer.finished_printing()
|
|
|
|
|
|
|
|
|
|
class Todo(BaseModel):
    """One todo item, as carried inside a ``TodoList``."""

    # Free-form time label for the task (no format is enforced here).
    time: str
    # Text of the task.
    task: str
    # Recurring flag supplied with the item.
    recurring: bool
|
|
|
|
class TodoList(BaseModel):
    """Request body for the write route: a date plus the todos for it."""

    # Always the selected date. (Old meaning: either the current date in
    # %Y-%m-%d format or "recurring" for a recurring task.)
    date: str
    # The todo items submitted for that date.
    todos: list[Todo]
|
|
|
|
|
|
class TodoDate(BaseModel):
    """Model carrying a single required ``date`` string."""

    # Required field; an earlier default of today's date (%Y-%m-%d) is
    # disabled.
    date: str
|
|
|
|
|
|
class PrintAction(BaseModel):
    """Request body for the print route: which job to run and its inputs."""

    # Date whose todos should be used, in %Y-%m-%d format; defaults to today.
    # A default factory is used so "today" is computed each time a model is
    # created. A plain default expression would be evaluated only once, when
    # the class is defined, and would go stale on a long-running server.
    date: str = Field(
        default_factory=lambda: datetime.today().strftime('%Y-%m-%d')
    )

    # Which print job to run. Options:
    #   all        (pulls current todos from date, needs date)
    #   sudoku     (prints a random sudoku)
    #   todos      (prints only dates todos)
    #   wordsearch (prints a random wordsearch)
    #   quote      (prints a random quote)
    #   greeting   (prints a greeting)
    #   sentence   (prints a custom message)
    action: str

    # Custom message text; used by the "sentence" action.
    sentence: str = ""
|
|
|
|
class GoogleUpdate(BaseModel):
    """Request body for the Google update route."""

    # Code string that the route hands to ``calendar.get_new_creds``.
    code: str
|
|
|
|
# @app.get("/api/public")
|
|
# def public():
|
|
# """No access token required to access this route"""
|
|
|
|
# result = {
|
|
# "status": "success",
|
|
# "msg": ("Hello from a public endpoint! You don't need to be "
|
|
# "authenticated to see this.")
|
|
# }
|
|
# return result
|
|
|
|
# import google.oauth2.credentials
|
|
# import google_auth_oauthlib.flow
|
|
# import googleapiclient.discovery
|
|
|
|
# from google.auth.transport.requests import Request
|
|
# from google.oauth2.credentials import Credentials
|
|
# import os
|
|
# import json
|
|
|
|
# from google_auth_oauthlib.flow import InstalledAppFlow
|
|
# from googleapiclient.discovery import build
|
|
# from googleapiclient.errors import HttpError
|
|
|
|
# scopes =
|
|
# SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
|
|
|
|
@app.post("/api/google/update")
def google_update(googleCode: GoogleUpdate, auth_result: str = Security(auth.verify)):
    """Handle a submitted Google code; requires a valid access token.

    The code is passed to ``calendar.get_new_creds``. If that call returns a
    falsy value the route answers ``{"status": "error"}``. Otherwise the
    calendar events are fetched, the ``.firsttimeruncheck`` marker file is
    overwritten with ``"true"`` and the route answers
    ``{"status": "success"}``.
    """
    # Guard clause: nothing else happens unless credentials were obtained.
    if not calendar.get_new_creds(googleCode.code):
        return {"status": "error"}

    calendar.get_calendar_events()

    # Record that setup has completed; read back by the first-run route.
    with open(".firsttimeruncheck", 'w') as marker:
        marker.write("true")

    return {"status": "success"}
|
|
|
|
|
|
# print(googleCode.code)
|
|
|
|
# creds = None
|
|
|
|
# if os.path.exists('token.json'):
|
|
# creds = Credentials.from_authorized_user_file('token.json', SCOPES)
|
|
# # else:
|
|
# # flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
|
# # 'client_secret.json',
|
|
# # scopes=['https://www.googleapis.com/auth/calendar.readonly'],
|
|
# # redirect_uri="postmessage"
|
|
# # # state=state
|
|
# # )
|
|
|
|
# # creds = flow.fetch_token(code=googleCode.code)
|
|
# # print(creds)
|
|
# # with open("token.json", 'w') as token:
|
|
# # token.write(creds.to_json())
|
|
|
|
# if not creds or not creds.valid:
|
|
# if creds and creds.expired and creds.refresh_token:
|
|
# creds.refresh(Request())
|
|
# else:
|
|
# # flow = InstalledAppFlow.from_client_secrets_file(
|
|
# # 'credentials.json', SCOPES)
|
|
# # creds = flow.run_local_server(port=0)
|
|
|
|
# flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
|
# 'client_secret.json',
|
|
# scopes=SCOPES,
|
|
# redirect_uri="postmessage"
|
|
# # state=state
|
|
# )
|
|
# flow.fetch_token(code=googleCode.code)
|
|
# creds = flow.credentials
|
|
# # print(creds)
|
|
|
|
# # Save the credentials for the next run
|
|
# with open('token.json', 'w') as token:
|
|
# token.write(creds.to_json())
|
|
# # json.dump(token, creds)
|
|
|
|
|
|
# # if not creds or not creds.valid:
|
|
# # return {"status": "google already autherized"}
|
|
# # creds = Credentials.from_authorized_user_file('token.json', SCOPES)
|
|
|
|
|
|
|
|
|
|
|
|
@app.get('/api/first_run')
def first_run():
    """Report whether the application still counts as being on its first run.

    The ``.firsttimeruncheck`` marker file decides the answer:

    * file contains exactly ``"true"`` -> ``{"status": "Not First Run"}``
    * file is missing -> it is created containing ``"false"`` and the
      answer is ``{"status": "First Run"}``
    * file has any other content -> ``{"status": "First Run"}``

    No access token is required for this route.
    """
    # EAFP: open the marker directly instead of checking for existence
    # first, so there is no window between the check and the open. The
    # local is not called `data`, to avoid shadowing the module-level
    # database object.
    try:
        with open('.firsttimeruncheck', 'r') as marker:
            setup_done = marker.read() == "true"
    except FileNotFoundError:
        # First contact ever: create the marker in its "not done" state.
        with open('.firsttimeruncheck', 'w') as marker:
            marker.write("false")
        setup_done = False

    if setup_done:
        return {"status": "Not First Run"}
    return {"status": "First Run"}
|
|
# try:
|
|
# with open('.firsttimeruncheck', 'x') as f:
|
|
# f.write("")
|
|
# return {"status": "First Run"}
|
|
# except OSError as e:
|
|
# print(e)
|
|
# if e.errno == errno.EEXIST:
|
|
# return {"status": "Not First Run"}
|
|
|
|
|
|
|
|
@app.get("/api/todos/get")
def get_todos(date: Optional[str] = None, auth_result: str = Security(auth.verify)):
    """A valid access token is required to access this route.

    Returns ``{"todos": ...}`` with whatever ``data.read_todos`` yields for
    the requested date.

    Args:
        date: Date to read, in %Y-%m-%d format. When omitted, today's date
            is used.
        auth_result: Result of the token verification dependency.
    """
    # Resolve "today" per request. A default of
    # datetime.today().strftime(...) in the signature would be evaluated
    # once, when the function is defined, and would keep returning the
    # server's start-up date.
    if date is None:
        date = datetime.today().strftime('%Y-%m-%d')

    return {
        "todos": data.read_todos(date)
    }
|
|
|
|
# result = {
|
|
# "todos": [
|
|
# {"time": "1:00pm to 2:00pm", "text": "Read a book"},
|
|
# {"time": "2:00pm to 3:00pm", "text": "Go for a walk"}
|
|
# ]
|
|
# } #TODO: replace with database access
|
|
|
|
# return result
|
|
|
|
|
|
# return auth_result
|
|
|
|
|
|
@app.post("/api/todos/write")
def write_todos(todos: TodoList, auth_result: str = Security(auth.verify)):
    """A valid access token is required to access this route.

    Prints the received payload, then calls ``data.remove_todos`` followed by
    ``data.save_todos`` for the submitted date, and answers with
    ``{"result": <date>}``.
    """
    print(todos)

    target_date = todos.date

    # Remove first, then save: this is done for every date value, including
    # the former special "recurring" value.
    data.remove_todos(target_date)
    data.save_todos(target_date, todos.todos)

    return {"result": target_date}
|
|
|
|
# return todos
|
|
|
|
|
|
# @app.post("/api/todos/write_recurring")
|
|
# def write_todos_recurring(todos: TodoList, auth_result: str = Security(auth.verify)):
|
|
|
|
# data.save_todos("recurring", todos.todos)
|
|
|
|
# return {"result": "success"}
|
|
|
|
|
|
@app.post("/api/todos/print")
def print_todos(action: PrintAction, auth_result: str = Security(auth.verify)):
    """A valid access token is required to access this route.

    Runs the print job named by ``action.action``, then always calls
    ``printer.finished_printing()`` and answers ``{"result": "success"}``.
    An unrecognised action name runs no job but still gets that same ending.

    Options:
        all        (pulls current todos from date, needs date)
        sudoku     (prints a random sudoku)
        todos      (prints only dates todos)
        wordsearch (prints a random wordsearch)
        quote      (prints a random quote)
        greeting   (prints a greeting)
        sentence   (prints a custom message)
    """
    # Dispatch table: action name -> zero-argument job. Lambdas keep every
    # printer/database lookup lazy, so only the selected job touches them.
    jobs = {
        "all": lambda: printer.print_default(),
        "sudoku": lambda: printer.print_sudoku(),
        "todos": lambda: printer.print_todos(data.read_todos(action.date)),
        "wordsearch": lambda: printer.print_wordsearch(),
        "quote": lambda: printer.print_random_quote(),
        "greeting": lambda: printer.print_greeting(),
        "sentence": lambda: printer.print_custom(action.sentence),
    }

    selected_job = jobs.get(action.action)
    if selected_job is not None:
        selected_job()

    printer.finished_printing()

    return {"result": "success"}
|
|
|
|
# class SaveTodos:
|
|
# def on_post(self, req, resp):
|
|
# pass #Handle web interface sending updated todo list
|
|
|
|
# class PrintTodos:
|
|
# def on_post(self, req, resp):
|
|
# pass #Handle web interface requesting a print action
|
|
|
|
|
|
# @app.get("/api/get_todos-scoped")
|
|
# def private_scoped(auth_result: str = Security(auth.verify, scopes=['todos:all'])):
|
|
# """A valid access token and an appropriate scope are required to access
|
|
# this route
|
|
# """
|
|
|
|
# return auth_result
|
|
|
|
|
|
# Serve the static front end from the site root (html=True enables HTML
# mode on StaticFiles). This mount is registered after the API routes
# declared above.
static_front_end = StaticFiles(directory="application/static", html=True)
app.mount("/", static_front_end, name="static")


# When the module is executed directly, run the printer's default print job.
if __name__ == '__main__':
    printer.print_default()
|
|
|
|
# if __name__ == '__main__':
|
|
# print('ay')
|
|
# httpd = simple_server.make_server('127.0.0.1', 8000, app)
|
|
# httpd.serve_forever() |