#!/usr/bin/env python3

# base
from os import path
import sys
import argparse
import logging.config
import json
import base64
import uuid
import mimetypes
import signal

# packages
import tornado.ioloop
import tornado.web
import tornado.httpserver
from tornado import template, gen

# project
from tools import load_yaml, md_to_html
from app import App, AppException


class WebApplication(tornado.web.Application):
    """Tornado application holding the URL routing table and the settings.

    The `testapp` object given to the constructor is stored on the
    application instance as `self.testapp`.
    """

    def __init__(self, testapp, debug=False):
        """Build the routing table and settings, then keep `testapp`.

        testapp: application object stored as `self.testapp`.
        debug:   forwarded to tornado as its `debug` setting.
        """
        # templates/ and static/ live next to this source file
        base_dir = path.dirname(__file__)

        routes = [
            (r'/login', LoginHandler),
            (r'/logout', LogoutHandler),
            (r'/test', TestHandler),
            (r'/review', ReviewHandler),
            (r'/admin', AdminHandler),
            (r'/file/(.+)', FileHandler),  # FIXME
            (r'/', RootHandler),  # TODO multiple tests
        ]

        options = dict(
            template_path=path.join(base_dir, 'templates'),
            static_path=path.join(base_dir, 'static'),
            static_url_prefix='/static/',
            xsrf_cookies=False,  # FIXME not needed on private network
            # a fresh secret is generated every time the application is built
            cookie_secret=base64.b64encode(uuid.uuid4().bytes),
            login_url='/login',
            debug=debug,
        )

        super().__init__(routes, **options)
        self.testapp = testapp


# -------------------------------------------------------------------------
# Base handler common to all handlers.
# -------------------------------------------------------------------------
class BaseHandler(tornado.web.RequestHandler):
    """Common base class of all request handlers."""

    @property
    def testapp(self):
        """The `testapp` object stored on the tornado application."""
        return self.application.testapp

    def get_current_user(self):
        """Return the user id kept in the secure 'user' cookie.

        Returns None when the cookie is absent or fails validation.
        """
        cookie = self.get_secure_cookie('user')
        if cookie:
            return cookie.decode('utf-8')
        return None


# -------------------------------------------------------------------------
# /login and /logout
# -------------------------------------------------------------------------
class LoginHandler(BaseHandler):
    """Shows the login form and checks the submitted credentials."""

    @staticmethod
    def _local_target(url):
        """Return `url` when it is a path on this site, otherwise '/'.

        The `next` argument comes from the client. Following it blindly
        would let a crafted login link send the user to a foreign site
        after a successful login. Targets starting with '//' or '/\\' are
        rejected as well, because browsers read them as another host.
        """
        if url.startswith('/') and not url.startswith(('//', '/\\')):
            return url
        return '/'

    def get(self):
        """Render the empty login form."""
        self.render('login.html', error='')

    # @gen.coroutine
    def post(self):
        """Validate the `uid`/`pw` form fields.

        On success the secure 'user' cookie is set and the browser is
        redirected to the local `next` target (default '/'). On failure
        the login form is rendered again with an error message.
        """
        uid = self.get_body_argument('uid')
        if uid.startswith('l'):  # remove prefix 'l'
            uid = uid[1:]

        pw = self.get_body_argument('pw')

        if self.testapp.login(uid, pw):
            self.set_secure_cookie("user", str(uid), expires_days=30)
            self.redirect(self._local_target(self.get_argument("next", "/")))
        else:
            self.render("login.html",
                        error='Não autorizado ou número/senha inválido')


# -------------------------------------------------------------------------
class LogoutHandler(BaseHandler):
    """Clears the session cookie and logs the user out of the application."""

    @tornado.web.authenticated
    def get(self):
        """Drop the 'user' cookie and go back to the root page."""
        self.clear_cookie('user')
        self.redirect('/')

    def on_finish(self):
        """Tell the application that the user logged out.

        on_finish() is invoked at the end of every request, including the
        ones that `authenticated` redirected to the login page because
        nobody was logged in. In that case there is no user to log out.
        """
        uid = self.current_user
        if uid:
            self.testapp.logout(uid)


# ----------------------------------------------------------------------------
# Serves files from the /public subdir of the topics.
# Based on https://bhch.github.io/posts/2017/12/serving-large-files-with-tornado-safely-without-blocking/
# ----------------------------------------------------------------------------
class FileHandler(BaseHandler):
    """Placeholder for serving question files.

    NOTE: not functional. Every request is logged as an error and answered
    with the literal text 'image'. The commented-out drafts below are kept
    as a reference for the intended implementation.
    """

    chunk_size = 4 * 1024 * 1024  # serve up to 4 MiB multiple times

    @tornado.web.authenticated
    def get(self, filename):
        """Log the request and answer with a dummy body (see class note)."""
        uid = self.current_user
        # ref = self.get_query_argument('ref')
        # print(ref)
        # questions_path = self.testapp.get_questions_path()
        # p = path.join(questions_path, ref, 'public', filename)
        # print(p)

        logging.error(f'{uid} requested file but FileHandler is not working!!!')
        self.write('image')

        #
        # public_dir = self.learn.get_current_public_dir(uid)
        # filepath = path.expanduser(path.join(public_dir, filename))
        # try:
        #     f = open(filepath, 'rb')
        # except FileNotFoundError:
        #     logging.error(f'File not found: {filepath}')
        # except PermissionError:
        #     logging.error(f'No permission: {filepath}')
        # else:
        #     content_type = mimetypes.guess_type(filename)
        #     self.set_header("Content-Type", content_type[0])
        #
        #     # divide the file into chunks and write one chunk at a time, so
        #     # that the write does not block the ioloop for very long.
        #     with f:
        #         chunk = f.read(self.chunk_size)
        #         while chunk:
        #             try:
        #                 self.write(chunk)  # write the chunk to response
        #                 await self.flush()  # flush the current chunk to socket
        #             except iostream.StreamClosedError:
        #                 break  # client closed the connection
        #             finally:
        #                 del chunk
        #                 await gen.sleep(0.000000001)  # 1 nanosecond (hack)
        #                 # FIXME in the upcoming tornado 5.0 use `await asyncio.sleep(0)` instead
        #                 chunk = f.read(self.chunk_size)


# -------------------------------------------------------------------------
# FIXME checkit
# class FileHandler(BaseHandler):
#     @tornado.web.authenticated
#     def get(self):
#         uid = self.current_user
#         qref = self.get_query_argument('ref')
#         qfile = self.get_query_argument('file')
#         print(f'FileHandler: ref={ref}, file={file}')

#         self.write(self.testapp.get_file(ref, filename))

#         if not os.path.isfile(file_location):
#             raise tornado.web.HTTPError(status_code=404)

#         content_type, _ = guess_type(file_location)
#         self.add_header('Content-Type', content_type)
#         with open(file_location) as source_file:
#             self.write(source_file.read())

#         public_dir = self.learn.get_current_public_dir(uid)  # FIXME!!!
#         filepath = path.expanduser(path.join(public_dir, filename))
#         try:
#             f = open(filepath, 'rb')
#         except FileNotFoundError:
#             raise tornado.web.HTTPError(404)
#         else:
#             self.write(f.read())
#             f.close()


# -------------------------------------------------------------------------
# FIXME images missing, needs testing
# -------------------------------------------------------------------------
class TestHandler(BaseHandler):
    """Shows the test to the student (GET) and receives the answers (POST)."""

    # question type -> template used to render the question
    templates = {
        'radio': 'question-radio.html',
        'checkbox': 'question-checkbox.html',
        'text': 'question-text.html',
        'text-regex': 'question-text.html',
        'numeric-interval': 'question-text.html',
        'textarea': 'question-textarea.html',
        # -- information panels --
        'information': 'question-information.html',
        'info': 'question-information.html',
        'warning': 'question-warning.html',
        'warn': 'question-warning.html',
        'alert': 'question-alert.html',
        'success': 'question-success.html',
    }

    # question types whose answer is one value instead of a list of values
    _single_valued = ('text', 'text-regex', 'textarea', 'numeric-interval')

    # GET
    @tornado.web.authenticated
    def get(self):
        """Render the test of the current user, generating it if needed."""
        uid = self.current_user
        t = self.testapp.get_test(uid) or self.testapp.generate_test(uid)
        self.render('test.html', t=t, md=md_to_html, templ=self.templates)

    # POST
    @tornado.web.authenticated
    def post(self):
        """Collect the submitted answers, correct the test and show the grade.

        The form arrives as e.g.
            self.request.arguments = {'answered-0': [b'on'], '0': [b'13.45']}
        and is turned into a dictionary ans={0: 'answer0', 1: 'answer1', ...}
        where unanswered questions are not included.

        Raises HTTPError(403) when the user has no test to submit; without
        this check the handler failed with a TypeError (error 500).
        """
        uid = self.current_user
        t = self.testapp.get_test(uid)
        if not t:
            raise tornado.web.HTTPError(
                403, '%s submitted answers but has no running test', uid)

        ans = {}
        for i, q in enumerate(t['questions']):
            qid = str(i)  # question id
            if 'answered-' + qid not in self.request.arguments:
                continue

            values = self.get_body_arguments(qid)
            qtype = q['type']

            # remove list when it does not make sense...
            if qtype == 'radio':
                ans[i] = values[0] if values else None
            elif qtype in self._single_valued:
                # A question flagged as answered but submitted without any
                # value is treated as unanswered instead of failing with an
                # IndexError.
                if values:
                    ans[i] = values[0]
            else:
                ans[i] = values

        self.testapp.correct_test(uid, ans)
        self.testapp.logout(uid)
        self.clear_cookie('user')
        self.render('grade.html', t=t,
                    allgrades=self.testapp.get_student_grades_from_all_tests(uid))


# --- REVIEW -------------------------------------------------------------
class ReviewHandler(BaseHandler):
    """Lets the administrator (user '0') review a previously saved test."""

    # question type -> template used to render the question in review mode
    _templates = {
        'radio': 'review-question-radio.html',
        'checkbox': 'review-question-checkbox.html',
        'text': 'review-question-text.html',
        'text-regex': 'review-question-text.html',
        'numeric-interval': 'review-question-text.html',
        'textarea': 'review-question-text.html',
        # -- information panels --
        'information': 'question-information.html',
        'info': 'question-information.html',
        'warning': 'question-warning.html',
        'warn': 'question-warning.html',
        'alert': 'question-alert.html',
        'success': 'question-success.html',
    }

    @tornado.web.authenticated
    def get(self):
        """Load the JSON file of test `test_id` and render it for review.

        Raises HTTPError(404) when the user is not the administrator, the
        test id is unknown, or the file cannot be opened.
        """
        uid = self.current_user
        if uid != '0':
            raise tornado.web.HTTPError(404)

        test_id = self.get_query_argument('test_id', None)
        try:
            fname = self.testapp.get_json_filename_of_test(test_id)
        except Exception as err:
            raise tornado.web.HTTPError(404, 'Test ID not found.') from err
        if not fname:
            raise tornado.web.HTTPError(404, 'Test ID not found.')

        try:
            f = open(path.expanduser(fname))
        except OSError as err:
            logging.error(f'Cannot open "{fname}" for review.')
            # Without an explicit error the client would receive an empty
            # "200 OK" page.
            raise tornado.web.HTTPError(404) from err

        with f:
            t = json.load(f)
        self.render('review.html', t=t, md=md_to_html, templ=self._templates)


# -------------------------------------------------------------------------
# FIXME this should be a post in the test with command giveup instead of correct...
class GiveupHandler(BaseHandler):
    """Ends the test of the current user without correcting answers."""

    @tornado.web.authenticated
    def get(self):
        """Give up the test, log the user out and show the grades."""
        uid = self.current_user
        t = self.testapp.giveup_test(uid)
        self.testapp.logout(uid)

        # --- Show result to student
        self.render('grade.html', t=t,
                    allgrades=self.testapp.get_student_grades_from_all_tests(uid))


# -------------------------------------------------------------------------
# FIXME list available tests
class RootHandler(BaseHandler):
    """Entry page: sends the administrator to /admin and students to /test."""

    @tornado.web.authenticated
    def get(self):
        """Redirect according to the kind of user."""
        if self.current_user == '0':
            self.redirect('/admin')
        else:
            self.redirect('/test')


# -------------------------------------------------------------------------
class AdminHandler(BaseHandler):
    """Administration page and its commands, restricted to user '0'."""

    # fields of the test factory reported by the 'test' command
    _test_fields = ('title', 'ref', 'filename', 'database', 'answers_dir')

    @tornado.web.authenticated
    def get(self):
        """Render the admin page, or answer a `cmd` query with JSON.

        cmd=students_table: state of the students.
        cmd=test:           selected fields of the test factory.
        anything else:      the admin.html page.
        """
        if self.current_user != '0':
            raise tornado.web.HTTPError(404)  # FIXME denied or not found??

        cmd = self.get_query_argument('cmd', default=None)

        if cmd == 'students_table':
            data = {'data': self.testapp.get_students_state()}
            self.write(json.dumps(data, default=str))

        elif cmd == 'test':  # FIXME which test?
            factory = self.testapp.testfactory
            data = {'data': {key: factory[key] for key in self._test_fields}}
            self.write(json.dumps(data, default=str))

        else:
            self.render('admin.html')

    @tornado.web.authenticated
    def post(self):
        """Run the admin command given by the `cmd` and `value` form fields.

        Users other than the administrator are redirected to '/' and
        nothing is executed. Raises HTTPError(400) when the payload of
        'insert_student' is not a JSON object with 'number' and 'name'.
        """
        if self.current_user != '0':
            self.redirect('/')
            # redirect() does not stop the handler: without this return the
            # command below was executed for any logged-in user.
            return

        cmd = self.get_body_argument('cmd', None)
        value = self.get_body_argument('value', None)

        if cmd == 'allow':
            self.testapp.allow_student(value)

        elif cmd == 'deny':
            self.testapp.deny_student(value)

        elif cmd == 'reset_password':
            self.testapp.reset_password(value)

        elif cmd == 'insert_student':
            try:
                s = json.loads(value)
                number, name = s['number'], s['name']
            except (TypeError, ValueError, KeyError) as err:
                logging.error(f'Invalid student data in post: "{value}"')
                raise tornado.web.HTTPError(400) from err
            self.testapp.insert_new_student(uid=number, name=name)

        else:
            logging.error(f'Unknown command in post: "{cmd}"')


# -------------------------------------------------------------------------
def signal_handler(signal, frame):
    """SIGINT handler: ask for confirmation before stopping the server.

    The parameters are the ones passed to every signal handler and are not
    used. Only the answers 'yes' and 'YES' stop the ioloop and exit.
    """
    r = input(' --> Stop webserver? (yes/no) ')
    if r in ('yes', 'YES'):
        tornado.ioloop.IOLoop.current().stop()
        logging.critical('Webserver stopped.')
        sys.exit(0)


# -------------------------------------------------------------------------
# Tornado web server
# -------------------------------------------------------------------------
def main():
    """Parse the command line, set up logging and run the HTTPS server.

    Exits with status 1 when logging, the application or the certificates
    cannot be set up.
    """
    # --- Commandline argument parsing
    argparser = argparse.ArgumentParser(
        description='Server for online tests. Enrolled students and tests '
                    'have to be previously configured. Please read the '
                    'documentation included with this software before '
                    'running the server.')
    argparser.add_argument(
        'testfile', type=str, nargs='+',
        help='test/exam in YAML format.')  # FIXME only one exam supported at the moment
    argparser.add_argument(
        '--allow-all', action='store_true',
        help='Students are initially allowed to login (can be denied later)')
    argparser.add_argument(
        '--debug', action='store_true',
        help='Enable debug messages')
    argparser.add_argument(
        '--show-ref', action='store_true',
        help='Show question references')
    argparser.add_argument(
        '--review', action='store_true',
        help='Review mode: don\'t generate test')
    arg = argparser.parse_args()

    # --- Setup logging
    logger_file = 'logger-debug.yaml' if arg.debug else 'logger.yaml'
    server_path = path.dirname(path.realpath(__file__))
    logger_conf = path.join(server_path, f'config/{logger_file}')

    try:
        logging.config.dictConfig(load_yaml(logger_conf))
    except Exception:
        # `Exception` instead of a bare except, so that Ctrl-C and
        # sys.exit() are not swallowed here.
        print('An error ocurred while setting up the logging system.')
        sys.exit(1)

    logging.info('===============================================')

    # --- start application
    config = {
        'filename': arg.testfile[0] or '',
        'debug': arg.debug,
        'allow_all': arg.allow_all,
        'show_ref': arg.show_ref,
        'review': arg.review,
    }

    try:
        testapp = App(config)
    except AppException:
        logging.critical('Failed to start application.')
        sys.exit(1)

    # --- create web application
    logging.info('Starting Web App (tornado)')
    try:
        webapp = WebApplication(testapp, debug=arg.debug)
    except Exception:
        logging.critical('Failed to start web application.')
        raise  # bare raise keeps the original traceback

    # --- create webserver
    try:
        http_server = tornado.httpserver.HTTPServer(
            webapp,
            ssl_options={
                "certfile": "certs/cert.pem",
                "keyfile": "certs/privkey.pem",
            })
    except ValueError:
        logging.critical('Certificates cert.pem, privkey.pem not found')
        sys.exit(1)
    else:
        http_server.listen(8443)

    # --- run webserver
    logging.info('Webserver running... (Ctrl-C to stop)')
    signal.signal(signal.SIGINT, signal_handler)

    try:
        tornado.ioloop.IOLoop.current().start()  # running...
    except Exception:
        logging.critical('Webserver stopped.')
        tornado.ioloop.IOLoop.current().stop()
        raise


# -------------------------------------------------------------------------
if __name__ == "__main__":
    main()