"""Python FastAPI Auth0 integration example """ from datetime import datetime from pydantic import BaseModel from fastapi import FastAPI, Security from fastapi.staticfiles import StaticFiles from .utils import VerifyToken from .database import TodoDatabase from .thermal_print import ThermalPrinter from .sync_calendar import SyncData from .tododatatypes import * from .scheduler_process import run_continuously import errno import os # from wsgiref import simple_server # Creates app instance app = FastAPI() auth = VerifyToken() data = TodoDatabase() calendar = SyncData() printer = ThermalPrinter(data) stop_run_continuously = run_continuously() # printer.print_default() #temp debug # printer.print_custom("Configuration Site: \n\n") # printer.print_qrcode() # printer.print_custom("https://thermaltodos.imsam.ca\n\n") # printer.finished_printing() # class Todo(BaseModel): # time: str # task: str # recurring: bool # class TodoList(BaseModel): # date: str #Always selected date OLD TEXT: #Either current date in %Y-%m-%d format or "recurring" for a recurring task # todos: list[Todo] # class TodoDate(BaseModel): # date: str # # date: str = datetime.today().strftime('%Y-%m-%d') # class PrintAction(BaseModel): # date: str = datetime.today().strftime('%Y-%m-%d') # action: str #Options: # # all (pulls current todos from date, needs date) # # sudoku (prints a random sudoku) # # todos (prints only dates todos) # # wordsearch (prints a random wordsearch) # # quote (prints a random quote) # # greeting (prints a greeting) # # sentence (prints a custom message) # sentence: str = "" # class GoogleUpdate(BaseModel): # code: str # @app.get("/api/public") # def public(): # """No access token required to access this route""" # result = { # "status": "success", # "msg": ("Hello from a public endpoint! You don't need to be " # "authenticated to see this.") # } # return result # import google.oauth2.credentials # import google_auth_oauthlib.flow # import googleapiclient.discovery # from google.auth.transport.requests import Request # from google.oauth2.credentials import Credentials # import os # import json # from google_auth_oauthlib.flow import InstalledAppFlow # from googleapiclient.discovery import build # from googleapiclient.errors import HttpError # scopes = # SCOPES = ['https://www.googleapis.com/auth/calendar.readonly'] @app.post("/api/google/update") def google_update(googleCode: GoogleUpdate, auth_result: str = Security(auth.verify)): if calendar.get_new_creds(googleCode.code): calendar.get_calendar_events() with open(".firsttimeruncheck", 'w') as f: f.write("true") return {"status": "success"} return {"status": "error"} # print(googleCode.code) # creds = None # if os.path.exists('token.json'): # creds = Credentials.from_authorized_user_file('token.json', SCOPES) # # else: # # flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( # # 'client_secret.json', # # scopes=['https://www.googleapis.com/auth/calendar.readonly'], # # redirect_uri="postmessage" # # # state=state # # ) # # creds = flow.fetch_token(code=googleCode.code) # # print(creds) # # with open("token.json", 'w') as token: # # token.write(creds.to_json()) # if not creds or not creds.valid: # if creds and creds.expired and creds.refresh_token: # creds.refresh(Request()) # else: # # flow = InstalledAppFlow.from_client_secrets_file( # # 'credentials.json', SCOPES) # # creds = flow.run_local_server(port=0) # flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( # 'client_secret.json', # scopes=SCOPES, # redirect_uri="postmessage" # # state=state # ) # flow.fetch_token(code=googleCode.code) # creds = flow.credentials # # print(creds) # # Save the credentials for the next run # with open('token.json', 'w') as token: # token.write(creds.to_json()) # # json.dump(token, creds) # # if not creds or not creds.valid: # # return {"status": "google already autherized"} # # creds = Credentials.from_authorized_user_file('token.json', SCOPES) @app.get('/api/first_run') def first_run(): if os.path.exists('.firsttimeruncheck'): with open('.firsttimeruncheck', 'r') as f: data = f.read() if data == "true": return {"status": "Not First Run"} else: with open('.firsttimeruncheck', 'w') as f: f.write("false") return {"status": "First Run"} # try: # with open('.firsttimeruncheck', 'x') as f: # f.write("") # return {"status": "First Run"} # except OSError as e: # print(e) # if e.errno == errno.EEXIST: # return {"status": "Not First Run"} @app.get("/api/todos/get") def get_todos(date: str = datetime.today().strftime('%Y-%m-%d'), auth_result: str = Security(auth.verify)): """A valid access token is required to access this route""" return { "todos": data.read_todos(date) } # result = { # "todos": [ # {"time": "1:00pm to 2:00pm", "text": "Read a book"}, # {"time": "2:00pm to 3:00pm", "text": "Go for a walk"} # ] # } #TODO: replace with database access # return result # return auth_result @app.post("/api/todos/write") def write_todos(todos: TodoList, auth_result: str = Security(auth.verify)): """A valid access token is required to access this route""" # result = { # "todos": [ # {"time": "1:00pm to 2:00pm", "text": "Read a book"}, # {"time": "2:00pm to 3:00pm", "text": "Go for a walk"} # ] # } #TODO: replace with database write # return result # data.save_todo() print(todos) # if todos.date != "recurring": # data.remove_todos(todos.date) data.remove_todos(todos.date) data.save_todos(todos.date, todos.todos) return {"result": todos.date} # return todos # @app.post("/api/todos/write_recurring") # def write_todos_recurring(todos: TodoList, auth_result: str = Security(auth.verify)): # data.save_todos("recurring", todos.todos) # return {"result": "success"} @app.post("/api/todos/remove_recurring") def remove_recurring_todo(todo: Todo, auth_result: str = Security(auth.verify)): data.remove_recurring(todo) return {"status": "success"} @app.post("/api/todos/print") def print_todos(action: PrintAction, auth_result: str = Security(auth.verify)): """A valid access token is required to access this route""" # result = { # "todos": [ # {"time": "1:00pm to 2:00pm", "text": "Read a book"}, # {"time": "2:00pm to 3:00pm", "text": "Go for a walk"} # ] # } #TODO: replace with access to database if no todos provided from request, otherwise print request data #Options: # all (pulls current todos from date, needs date) # sudoku (prints a random sudoku) # todos (prints only dates todos) # wordsearch (prints a random wordsearch) # quote (prints a random quote) # greeting (prints a greeting) # sentence (prints a custom message) if action.action == "all": printer.print_default() if action.action == "sudoku": printer.print_sudoku() if action.action == "todos": todos = data.read_todos(action.date) printer.print_todos(todos) if action.action == "wordsearch": printer.print_wordsearch() if action.action == "quote": printer.print_random_quote() if action.action == "greeting": printer.print_greeting() if action.action == "sentence": printer.print_custom(action.sentence) printer.finished_printing() # todos = data.get_todos(action.date) # printer.print_todos() return {"result": "success"} # class SaveTodos: # def on_post(self, req, resp): # pass #Handle web interface sending updated todo list # class PrintTodos: # def on_post(self, req, resp): # pass #Handle web interface requesting a print action # @app.get("/api/get_todos-scoped") # def private_scoped(auth_result: str = Security(auth.verify, scopes=['todos:all'])): # """A valid access token and an appropriate scope are required to access # this route # """ # return auth_result app.mount("/", StaticFiles(directory="application/static",html = True), name="static") if __name__ == '__main__': printer.print_default() # if __name__ == '__main__': # print('ay') # httpd = simple_server.make_server('127.0.0.1', 8000, app) # httpd.serve_forever()