# python standard libraries from os import path import logging from contextlib import contextmanager # `with` statement in db sessions import asyncio # user installed packages import bcrypt from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker #, scoped_session # this project from models import Student, Test, Question import test from tools import load_yaml logger = logging.getLogger(__name__) # ============================================================================ class AppException(Exception): pass # ============================================================================ # helper functions # ============================================================================ async def check_password(try_pw, password): try_pw = try_pw.encode('utf-8') loop = asyncio.get_running_loop() hashed_pw = await loop.run_in_executor(None, bcrypt.hashpw, try_pw, password) return password == hashed_pw # ============================================================================ # Application # ============================================================================ class App(object): # ----------------------------------------------------------------------- # helper to manage db sessions using the `with` statement, for example # with self.db_session() as s: s.query(...) @contextmanager def db_session(self): session = self.Session() try: yield session session.commit() except: session.rollback() logger.error('DB rollback!!!') finally: session.close() # ------------------------------------------------------------------------ def __init__(self, conf={}): logger.info('Starting application') self.online = dict() # {uid: {'student':{...}, 'test': {...}}, ...} self.allowed = set([]) # '0' is hardcoded to allowed elsewhere # build test configuration dictionary testconf = {} if conf['filename']: logger.info(f'Loading test configuration "{conf["filename"]}".') testconf.update(load_yaml(conf['filename'])) testconf.update(conf) # configuration overrides # start test factory try: self.testfactory = test.TestFactory(testconf) except test.TestFactoryException: logger.critical('Can\'t create test factory.') raise AppException() # connect to database and check registered students dbfile = path.expanduser(self.testfactory['database']) engine = create_engine(f'sqlite:///{dbfile}', echo=False) self.Session = sessionmaker(bind=engine) try: with self.db_session() as s: n = s.query(Student).filter(Student.id != '0').count() except Exception as e: logger.critical(f'Database not usable {self.testfactory["database"]}.') raise e else: logger.info(f'Database {self.testfactory["database"]} has {n} students.') # command line option --allow-all if conf['allow_all']: logger.info('Allowing all students') for student in self.get_all_students(): self.allow_student(student[0]) # ----------------------------------------------------------------------- def exit(self): if len(self.online) > 1: online_students = ', '.join(self.online) logger.warning(f'Students still online: {online_students}') logger.critical('----------- !!! Server terminated !!! -----------') # ----------------------------------------------------------------------- async def login(self, uid, try_pw): if uid.startswith('l'): # remove prefix 'l' uid = uid[1:] if uid not in self.allowed and uid != '0': # not allowed logger.warning(f'Student {uid}: not allowed to login.') return False # get name+password from db with self.db_session() as s: name, password = s.query(Student.name, Student.password).filter_by(id=uid).one() # first login updates the password if password == '': # update password on first login self.update_student_password(uid, try_pw) pw_ok = True else: # check password pw_ok = await check_password(try_pw, password) # async bcrypt if pw_ok: # success self.allowed.discard(uid) # remove from set of allowed students if uid in self.online: logger.warning(f'Student {uid}: already logged in.') else: # make student online self.online[uid] = {'student': {'name': name, 'number': uid}} logger.info(f'Student {uid}: logged in.') return True else: # wrong password logger.info(f'Student {uid}: wrong password.') return False # ----------------------------------------------------------------------- def logout(self, uid): self.online.pop(uid, None) # remove from dict if exists logger.info(f'Student {uid}: logged out.') # ----------------------------------------------------------------------- async def generate_test(self, uid): if uid in self.online: logger.info(f'Student {uid}: started generating new test.') student_id = self.online[uid]['student'] # {number, name} self.online[uid]['test'] = await self.testfactory.generate(student_id) logger.debug(f'Student {uid}: finished generating test.') return self.online[uid]['test'] else: # this implies an error in the code. should never be here! logger.critical(f'Student {uid}: offline, can\'t generate test') # ----------------------------------------------------------------------- # ans is a dictionary {question_index: answer, ...} # for example: {0:'hello', 1:[1,2]} async def correct_test(self, uid, ans): t = self.online[uid]['test'] t.update_answers(ans) grade = await t.correct() # save test in JSON format fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json' fpath = path.join(t['answers_dir'], fname) t.save_json(fpath) # insert test and questions into database with self.db_session() as s: s.add(Test( ref=t['ref'], title=t['title'], grade=t['grade'], starttime=str(t['start_time']), finishtime=str(t['finish_time']), filename=fpath, student_id=t['student']['number'], state=t['state'], comment='')) s.add_all([Question( ref=q['ref'], grade=q['grade'], starttime=str(t['start_time']), finishtime=str(t['finish_time']), student_id=t['student']['number'], test_id=t['ref']) for q in t['questions'] if 'grade' in q]) logger.info(f'Student {uid}: finished test.') return grade # ----------------------------------------------------------------------- def giveup_test(self, uid): t = self.online[uid]['test'] t.giveup() # save JSON with the test # fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json' # fpath = path.abspath(path.join(t['answers_dir'], fname)) fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json' fpath = path.join(t['answers_dir'], fname) t.save_json(fpath) # insert test into database with self.db_session() as s: s.add(Test( ref=t['ref'], title=t['title'], grade=t['grade'], starttime=str(t['start_time']), finishtime=str(t['finish_time']), filename=fpath, student_id=t['student']['number'], state=t['state'], comment='')) logger.info(f'Student {uid}: gave up.') return t # ----------------------------------------------------------------------- # --- helpers (getters) def get_student_name(self, uid): return self.online[uid]['student']['name'] def get_student_test(self, uid, default=None): return self.online[uid].get('test', default) def get_questions_path(self): return self.testfactory['questions_dir'] def get_student_grades_from_all_tests(self, uid): with self.db_session() as s: return s.query(Test.title, Test.grade, Test.finishtime).filter_by(student_id=uid).order_by(Test.finishtime) def get_json_filename_of_test(self, test_id): with self.db_session() as s: return s.query(Test.filename).filter_by(id=test_id).scalar() def get_online_students(self): # [('uid', 'name', 'starttime')] return [(k, v['student']['name'], str(v.get('test', {}).get('start_time', '---'))) for k,v in self.online.items() if k != '0'] def get_all_students(self): with self.db_session() as s: return s.query(Student.id, Student.name, Student.password).filter(Student.id!='0').order_by(Student.id) def get_student_grades_from_test(self, uid, testid): with self.db_session() as s: return s.query(Test.grade, Test.finishtime, Test.id).filter_by(student_id=uid).filter_by(ref=testid).all() def get_students_state(self): return [{ 'uid': uid, 'name': name, 'allowed': uid in self.allowed, 'online': uid in self.online, 'start_time': self.online.get(uid, {}).get('test', {}).get('start_time',''), 'password_defined': pw != '', 'grades': self.get_student_grades_from_test(uid, self.testfactory['ref']), # 'focus': self.online.get(uid, {}).get('student', {}).get('focus', True), # FIXME } for uid, name, pw in self.get_all_students()] def get_allowed_students(self): # set of 'uid' allowed to login return self.allowed def get_file(self, uid, ref, key): # return filename corresponding to (uid, ref, name) if declared in the question t = self.get_test(uid) for q in t['questions']: if q['ref'] == ref and key in q['files']: return path.abspath(path.join(q['path'], q['files'][key])) # --- helpers (change state) def allow_student(self, uid): self.allowed.add(uid) logger.info(f'Student {uid}: allowed to login.') def deny_student(self, uid): self.allowed.discard(uid) logger.info(f'Student {uid}: denied to login') def update_student_password(self, uid, pw=''): if pw: pw = bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt()) with self.db_session() as s: student = s.query(Student).filter_by(id=uid).one() student.password = pw logger.info(f'Student {uid}: password updated.') def insert_new_student(self, uid, name): try: with self.db_session() as s: s.add(Student(id=uid, name=name, password='')) except Exception: logger.error(f'Insert failed: student {uid} already exists.') else: logger.info(f'New student inserted: {uid}, {name}')