# base packages
from os import path
import logging
from contextlib import contextmanager  # `with` statement in db sessions

# installed packages
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# this project
from models import Student, Test, Question
import test
from tools import load_yaml

logger = logging.getLogger(__name__)


# ============================================================================
class AppException(Exception):
    """Raised by `App` when the test factory cannot be created."""


# ============================================================================
#   Application
# ============================================================================
class App(object):
    """Application state: logged-in students, login permissions and tests.

    Attributes:
        online: dict keyed by student uid, e.g.
            {
                uid1: {
                    'student': {'number': 123, 'name': john, ...},
                    'test': {...}
                },
                uid2: {...}
            }
        allowed: set of uids currently allowed to log in
            ('0' is hardcoded to allowed elsewhere).
        testfactory: `test.TestFactory` built from the configuration.
        Session: scoped SQLAlchemy session factory bound to the database.
    """

    def __init__(self, conf=None):
        """Build the test factory, open the database and apply options.

        Args:
            conf: configuration dictionary. Recognised keys read here are
                'filename' (YAML test configuration to load) and
                'allow_all' (allow every registered student to log in).
                Entries in `conf` override those loaded from the file.

        Raises:
            AppException: if the test factory cannot be created.
            Exception: whatever the database layer raises when the
                student table cannot be queried.
        """
        # a `None` default avoids sharing one mutable dict between calls
        conf = {} if conf is None else conf

        logger.info('Starting application')
        self.online = dict()    # {uid: {'student':{}}}
        self.allowed = set()    # '0' is hardcoded to allowed elsewhere

        # build test configuration dictionary
        testconf = {}
        filename = conf.get('filename')
        if filename:
            logger.info(f'Loading test configuration "{filename}".')
            testconf.update(load_yaml(filename))
        testconf.update(conf)  # configuration overrides

        # start test factory
        try:
            self.testfactory = test.TestFactory(testconf)
        except test.TestFactoryException as err:
            logger.critical('Can\'t create test factory.')
            raise AppException('Can\'t create test factory.') from err

        # connect to database and check registered students
        dbfile = path.expanduser(self.testfactory['database'])
        engine = create_engine(f'sqlite:///{dbfile}', echo=False)
        self.Session = scoped_session(sessionmaker(bind=engine))

        try:
            with self.db_session() as s:
                n = s.query(Student).filter(Student.id != '0').count()
        except Exception:
            logger.critical(f'Database not usable {self.testfactory["database"]}.')
            raise  # bare raise keeps the original traceback
        else:
            logger.info(f'Database {self.testfactory["database"]} has {n} students.')

        # command line option --allow-all
        if conf.get('allow_all'):
            logger.info('Allowing all students')
            for student in self.get_all_students():
                self.allow_student(student[0])

    # -----------------------------------------------------------------------
    def exit(self):
        """Log the students that are still online and the server shutdown."""
        # uid '0' is excluded, as in get_online_students(); counting entries
        # would miss a lone student whenever '0' itself is not logged in.
        still_online = [uid for uid in self.online if uid != '0']
        if still_online:
            online_students = ', '.join(still_online)
            logger.warning(f'Students still online: {online_students}')
        logger.critical('----------- !!! Server terminated !!! -----------')

    # -----------------------------------------------------------------------
    # helper to manage db sessions using the `with` statement, for example
    #   with self.db_session() as s:  s.query(...)
    @contextmanager
    def db_session(self):
        """Yield a database session and always remove it afterwards."""
        try:
            yield self.Session()
        finally:
            self.Session.remove()

    # -----------------------------------------------------------------------
    def login(self, uid, try_pw):
        """Authenticate a student and register it as online.

        An empty stored password means "first login": the password given
        is hashed and stored. Otherwise the password is verified against
        the stored bcrypt hash.

        Args:
            uid: student identifier.
            try_pw: password typed by the student (str).

        Returns:
            True if the login succeeded, False if the student is not
            allowed, is not in the database or gave a wrong password.
        """
        if uid not in self.allowed and uid != '0':
            # not allowed
            logger.warning(f'Student {uid}: not allowed to login.')
            return False

        with self.db_session() as s:
            student = s.query(Student).filter(Student.id == uid).one_or_none()

            if student is None:
                # not found
                logger.warning(f'Student {uid}: not found in database.')
                return False

            password = try_pw.encode('utf-8')
            if student.password == '':
                # update password on first login
                student.password = bcrypt.hashpw(password, bcrypt.gensalt())
                s.commit()
                logger.info(f'Student {uid}: first login, password updated.')
            elif not bcrypt.checkpw(password, student.password):
                # wrong password (checkpw compares in constant time)
                logger.info(f'Student {uid}: wrong password.')
                return False

            # success: the permission is consumed by the login
            self.allowed.discard(uid)
            if uid in self.online:
                logger.warning(f'Student {uid}: already logged in.')
            else:
                # read student.name while the session is still open
                self.online[uid] = {'student': {'name': student.name, 'number': uid}}
                logger.info(f'Student {uid}: logged in.')
            return True

    # -----------------------------------------------------------------------
    def logout(self, uid):
        """Remove the student from the online dictionary, if present."""
        self.online.pop(uid, None)  # remove from dict if exists
        logger.info(f'Student {uid}: logged out.')

    # -----------------------------------------------------------------------
    def generate_test(self, uid):
        """Generate a new test for an online student and store it.

        Returns:
            The generated test, or None if the student is not online.
        """
        if uid not in self.online:
            # this implies an error in the code. should never be here!
            logger.critical(f'Student {uid}: offline, can\'t generate test')
            return None

        logger.info(f'Student {uid}: generating new test.')
        student = self.online[uid]['student']  # dict with name and number
        self.online[uid]['test'] = self.testfactory.generate(student)
        return self.online[uid]['test']

    # -----------------------------------------------------------------------
    @staticmethod
    def _save_test_json(t):
        """Save test `t` as JSON in its answers directory; return the path."""
        fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json'
        fpath = path.join(t['answers_dir'], fname)
        t.save_json(fpath)
        return fpath

    # -----------------------------------------------------------------------
    # ans is a dictionary {question_index: answer, ...}
    # for example: {0:'hello', 1:[1,2]}
    def correct_test(self, uid, ans):
        """Correct the student's test and persist the results.

        The answers are stored in the test, the test is corrected, saved
        as a JSON file and inserted in the database together with every
        question that has a 'grade'.

        Args:
            uid: student identifier (must be online and have a test).
            ans: dictionary {question_index: answer, ...}.

        Returns:
            The value returned by the test's `correct()` method.
        """
        t = self.online[uid]['test']
        t.update_answers(ans)
        grade = t.correct()

        # save test in JSON format
        fpath = self._save_test_json(t)

        # insert test and questions into database
        with self.db_session() as s:
            s.add(Test(
                ref=t['ref'],
                title=t['title'],
                grade=t['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                filename=fpath,
                student_id=t['student']['number'],
                state=t['state'],
                comment=''))
            s.add_all([Question(
                ref=q['ref'],
                grade=q['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                student_id=t['student']['number'],
                test_id=t['ref']) for q in t['questions'] if 'grade' in q])
            s.commit()

        logger.info(f'Student {uid}: finished test.')
        return grade

    # -----------------------------------------------------------------------
    def giveup_test(self, uid):
        """Mark the student's test as given up and persist it.

        The test is saved as a JSON file and inserted in the database
        (questions are not inserted).

        Returns:
            The test object.
        """
        t = self.online[uid]['test']
        t.giveup()

        # save JSON with the test
        fpath = self._save_test_json(t)

        # insert test into database
        with self.db_session() as s:
            s.add(Test(
                ref=t['ref'],
                title=t['title'],
                grade=t['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                filename=fpath,
                student_id=t['student']['number'],
                state=t['state'],
                comment=''))
            s.commit()

        logger.info(f'Student {uid}: gave up.')
        return t

    # -----------------------------------------------------------------------
    # --- helpers (getters)
    def get_student_name(self, uid):
        """Return the name of an online student."""
        return self.online[uid]['student']['name']

    def get_test(self, uid, default=None):
        """Return the test of an online student, or `default` if none."""
        return self.online[uid].get('test', default)

    def get_questions_path(self):
        """Return the 'questions_dir' entry of the test factory."""
        return self.testfactory['questions_dir']

    def get_student_grades_from_all_tests(self, uid):
        """Return [(title, grade, finishtime)] ordered by finish time."""
        with self.db_session() as s:
            r = s.query(Test).filter_by(student_id=uid).order_by(Test.finishtime).all()
            return [(t.title, t.grade, t.finishtime) for t in r]

    def get_json_filename_of_test(self, test_id):
        """Return the JSON filename stored for `test_id`, or None if unknown."""
        with self.db_session() as s:
            row = s.query(Test.filename).filter_by(id=test_id).one_or_none()
            # one_or_none() yields None when no test has this id
            return None if row is None else row[0]

    def get_online_students(self):
        """Return [('uid', 'name', 'starttime')] of online students.

        uid '0' is excluded; the start time is '---' when the student has
        no test yet.
        """
        return [(k, v['student']['name'], str(v.get('test', {}).get('start_time', '---')))
                for k, v in self.online.items() if k != '0']

    def get_offline_students(self):
        """Return the list of ('uid', 'name') not online, sorted by uid."""
        return [u[:2] for u in self.get_all_students() if u[0] not in self.online]

    def get_all_students(self):
        """Return all ('uid', 'name', 'password') sorted by uid, except '0'."""
        with self.db_session() as s:
            r = s.query(Student).all()
            return sorted(((u.id, u.name, u.password) for u in r if u.id != '0'),
                          key=lambda k: k[0])

    def get_student_grades_from_test(self, uid, testid):
        """Return [(grade, finishtime, id)] of tests `testid` by student `uid`."""
        with self.db_session() as s:
            r = s.query(Test).filter_by(student_id=uid).filter_by(ref=testid).all()
            return [(u.grade, u.finishtime, u.id) for u in r]

    def get_students_state(self):
        """Return one state dictionary per registered student.

        [{
            'uid'      : '12345'
            'name'     : 'John Smith',
            'start_time': '',
            'grades'   : [10.2, 13.1],
            ...
        }]
        """
        test_ref = self.testfactory['ref']  # same for every student
        state = []
        for uid, name, pw in self.get_all_students():
            online_entry = self.online.get(uid, {})
            state.append({
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': online_entry.get('test', {}).get('start_time', ''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(uid, test_ref),
                'focus': online_entry.get('student', {}).get('focus', True),  # FIXME
            })
        return state

    def get_allowed_students(self):
        """Return the set of 'uid' allowed to login."""
        return self.allowed

    def get_file(self, uid, ref, key):
        """Return the absolute path of a file declared in a question.

        Looks in the student's test for the question `ref` and returns the
        file registered under `key`. Returns None if the student has no
        test or no question declares that file.
        """
        t = self.get_test(uid)
        if t is None:
            return None
        for q in t['questions']:
            if q['ref'] == ref and key in q['files']:
                return path.abspath(path.join(q['path'], q['files'][key]))
        return None

    # --- helpers (change state)
    def allow_student(self, uid):
        """Allow the student to log in."""
        self.allowed.add(uid)
        logger.info(f'Student {uid}: allowed to login.')

    def deny_student(self, uid):
        """Revoke the student's permission to log in."""
        self.allowed.discard(uid)
        logger.info(f'Student {uid}: denied to login')

    def reset_password(self, uid):
        """Clear the stored password so the next login defines a new one."""
        with self.db_session() as s:
            s.query(Student).filter(Student.id == uid).update({'password': ''})
            s.commit()
        logger.info(f'Student {uid}: password reset')

    def insert_new_student(self, uid, name):
        """Insert a student with an empty password; failures are only logged."""
        try:
            with self.db_session() as s:
                s.add(Student(id=uid, name=name, password=''))
                s.commit()
        except Exception:
            # NOTE(review): any failure is reported as a duplicate student;
            # presumably a primary-key conflict is the expected cause.
            logger.error(f'Insert failed: student {uid} already exists.')
        else:
            logger.info(f'New student inserted: {uid}, {name}')

    def set_student_focus(self, uid, value):
        """Record the 'focus' flag of an online student."""
        self.online[uid]['student']['focus'] = value