from os import path
import logging
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models import Base, Student, Test, Question
from contextlib import contextmanager  # to use `with` statement for db sessions

import test
import threading

logger = logging.getLogger(__name__)


# ============================================================================
#   Application
# ============================================================================
class App(object):
    """Application state: logged-in students, their tests and the database.

    Attributes set in ``__init__``:
        lock        -- threading.Lock held while a test is generated.
        online      -- dict mapping uid -> {'student': {...}, 'test': ...}.
        allowed     -- set of uids currently allowed to log in.
        testfactory -- test.TestFactory built from ``filename`` and ``conf``.
        Session     -- scoped_session factory bound to the sqlite engine.
    """

    def __init__(self, filename, conf):
        """Create the test factory, open the database and count students.

        Re-raises whatever exception the first database query raises, after
        logging a critical message.
        """
        # online = {
        #   uid1: {
        #       'student': {'number': 123, 'name': john, ...},
        #       'test': {...}
        #   }
        #   uid2: {...}
        # }
        logger.info('============= Running perguntations =============')
        self.lock = threading.Lock()
        self.online = {}       # {uid: {'student':{}}}
        self.allowed = set()   # '0' is hardcoded to allowed elsewhere
        self.testfactory = test.TestFactory(filename, conf=conf)

        # database
        engine = create_engine('sqlite:///{}'.format(self.testfactory['database']), echo=False)
        Base.metadata.create_all(engine)  # Create schema if needed FIXME no student '0'
        self.Session = scoped_session(sessionmaker(bind=engine))

        try:
            with self.db_session() as s:
                n = s.query(Student).filter(Student.id != '0').count()
        except Exception:
            logger.critical('Database not usable {}.'.format(self.testfactory['database']))
            raise  # bare raise keeps the original traceback intact
        else:
            logger.info('Database has {} students registered.'.format(n))

    # -----------------------------------------------------------------------
    # helper to manage db sessions using the `with` statement, for example
    #   with self.db_session() as s:  ...
    @contextmanager
    def db_session(self):
        """Yield a session from ``self.Session`` and always remove it after."""
        try:
            yield self.Session()
        finally:
            self.Session.remove()

    # -----------------------------------------------------------------------
    def exit(self):
        """Log that the server terminated."""
        # FIXME what if there are online students?
        logger.critical('----------- !!! Server terminated !!! -----------')

    # -----------------------------------------------------------------------
    def login(self, uid, try_pw):
        """Try to log in student ``uid`` with password ``try_pw``.

        Returns False when the uid is not allowed (and is not '0'), is not in
        the database, or the password does not match.  On a first login (empty
        stored password) the given password is hashed and stored.  On success
        the uid is removed from the allowed set, added to ``self.online`` if
        not already there, and True is returned.
        """
        if uid not in self.allowed and uid != '0':
            # not allowed
            logger.warning('Student {}: not allowed to login.'.format(uid))
            return False

        with self.db_session() as s:
            student = s.query(Student).filter(Student.id == uid).one_or_none()

            if student is None:
                # not found
                logger.warning('Student {}: not found in database.'.format(uid))
                return False

            if student.password == '':
                # update password on first login
                hashed_pw = bcrypt.hashpw(try_pw.encode('utf-8'), bcrypt.gensalt())
                student.password = hashed_pw
                s.commit()
                logger.warning('Student {}: first login, password updated.'.format(uid))

            elif bcrypt.hashpw(try_pw.encode('utf-8'), student.password) != student.password:
                # wrong password
                logger.info('Student {}: wrong password.'.format(uid))
                return False

            # success
            # (kept inside the session block so `student.name` is read while
            # the instance is still attached to the session)
            self.allowed.discard(uid)
            if uid in self.online:
                logger.warning('Student {}: already logged in.'.format(uid))
            else:
                self.online[uid] = {'student': {'name': student.name, 'number': uid}}
                logger.info('Student {}: logged in.'.format(uid))
            return True

    # -----------------------------------------------------------------------
    def logout(self, uid):
        """Remove ``uid`` from the online students.

        Returns True if the student was online, False otherwise.
        """
        if uid not in self.online:
            # this should never happen
            logger.error('Student {}: tried to logout, but is not logged in.'.format(uid))
            return False

        logger.info('Student {}: logged out.'.format(uid))
        del self.online[uid]  # FIXME the test is not being saved as a give-up...
        return True

    # -----------------------------------------------------------------------
    def generate_test(self, uid):
        """Generate a new test for online student ``uid`` and store it.

        Returns the generated test, or None if the student is not online.
        """
        if uid not in self.online:
            logger.error("Student {}: offline, can't generate test".format(uid))
            return None

        logger.info('Student {}: generating new test.'.format(uid))
        student = self.online[uid]['student']
        # FIXME is the lock needed?
        # `with` guarantees the lock is released even if generation raises.
        with self.lock:
            new_test = self.testfactory.generate(student)
            self.online[uid]['test'] = new_test
        return new_test

    # -----------------------------------------------------------------------
    @staticmethod
    def _answers_filepath(t):
        """Return the absolute path of the JSON file where test ``t`` is saved."""
        fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json'
        return path.abspath(path.join(t['answers_dir'], fname))

    # -----------------------------------------------------------------------
    def correct_test(self, uid, ans):
        """Update the test of ``uid`` with answers ``ans``, correct and save it.

        The test is saved as JSON and a Test row plus one Question row per
        question that has a 'grade' key are inserted into the database.
        Returns the grade given by ``t.correct()``.
        Raises KeyError if ``uid`` is not online or has no test.
        """
        t = self.online[uid]['test']
        t.update_answers(ans)
        grade = t.correct()
        logger.info('Student {0}: finished with {1} points.'.format(uid, grade))

        # save JSON with the test
        fpath = self._answers_filepath(t)
        t.save_json(fpath)

        # insert test and questions into database
        with self.db_session() as s:
            s.add(Test(
                ref=t['ref'],
                title=t['title'],
                grade=t['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                filename=fpath,
                student_id=t['student']['number']))
            s.add_all([Question(
                ref=q['ref'],
                grade=q['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                student_id=t['student']['number'],
                test_id=t['ref']) for q in t['questions'] if 'grade' in q])
            s.commit()

        return grade

    # -----------------------------------------------------------------------
    def giveup_test(self, uid):
        """Mark the test of ``uid`` as given up; save JSON if configured.

        Returns the test object.
        Raises KeyError if ``uid`` is not online or has no test.
        """
        logger.info('Student {0}: gave up.'.format(uid))
        t = self.online[uid]['test']
        t.giveup()  # return value is not used here

        if t['save_answers']:
            t.save_json(self._answers_filepath(t))

        return t

    # -----------------------------------------------------------------------

    # --- helpers (getters)
    def get_student_name(self, uid):
        """Return the name of online student ``uid``."""
        return self.online[uid]['student']['name']

    def get_test(self, uid, default=None):
        """Return the test of online student ``uid``, or ``default`` if none."""
        return self.online[uid].get('test', default)

    def get_questions_path(self):
        """Return the 'questions_dir' entry of the test factory."""
        return self.testfactory['questions_dir']

    def get_test_qtypes(self, uid):
        """Return {question ref: question type} for the test of ``uid``."""
        return {q['ref']: q['type'] for q in self.online[uid]['test']['questions']}

    def get_student_grades_from_all_tests(self, uid):
        """Return [(title, grade, finishtime)] of all tests of ``uid``,
        ordered by finish time."""
        with self.db_session() as s:
            rows = s.query(Test).filter_by(student_id=uid).order_by(Test.finishtime).all()
            return [(t.title, t.grade, t.finishtime) for t in rows]

    def get_json_filename_of_test(self, test_id):
        """Return the stored filename of test ``test_id``, or None if there is
        no such test in the database."""
        with self.db_session() as s:
            row = s.query(Test.filename).filter_by(id=test_id).one_or_none()
        # one_or_none() yields None when nothing matches; don't index into it
        return None if row is None else row[0]

    def get_online_students(self):
        """Return [(uid, name, starttime)] of online students, excluding '0'.

        starttime is '---' when the student has no test yet.
        """
        # [('uid', 'name', 'starttime')]
        return [(k, v['student']['name'], str(v.get('test', {}).get('start_time', '---')))
                for k, v in self.online.items() if k != '0']

    def get_offline_students(self):
        """Return [(uid, name)] of students not online, sorted by uid."""
        # list of ('uid', 'name') sorted by uid
        return [u[:2] for u in self.get_all_students() if u[0] not in self.online]

    def get_all_students(self):
        """Return [(uid, name, password)] of all students except '0',
        sorted by uid."""
        # list of all ('uid', 'name', 'password') sorted by uid
        with self.db_session() as s:
            rows = s.query(Student).all()
            return sorted(((u.id, u.name, u.password) for u in rows if u.id != '0'),
                          key=lambda k: k[0])

    def get_student_grades_from_test(self, uid, testid):
        """Return [(grade, finishtime, id)] of the tests with ref ``testid``
        taken by student ``uid``."""
        with self.db_session() as s:
            rows = s.query(Test).filter_by(student_id=uid).filter_by(ref=testid).all()
            return [(u.grade, u.finishtime, u.id) for u in rows]

    def get_students_state(self):
        """Return one state dict per student (all students except '0')."""
        # [{
        #   'uid'       : '12345'
        #   'name'      : 'John Smith',
        #   'start_time': '',
        #   'grades'    : [10.2, 13.1],
        #   ...
        # }]
        states = []
        for uid, name, pw in self.get_all_students():
            # empty dicts stand in for students that are offline / have no test
            session = self.online.get(uid, {})
            student = session.get('student', {})
            states.append({
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': session.get('test', {}).get('start_time', ''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(uid, self.testfactory['ref']),
                'ip_address': student.get('ip_address', ''),
                'user_agent': student.get('user_agent', ''),
                'focus': student.get('focus', True),
                })
        return states

    def get_allowed_students(self):
        """Return the set of uids allowed to login."""
        # set of 'uid' allowed to login
        return self.allowed

    def get_file(self, uid, ref, key):
        """Return the absolute filename for (uid, ref, key) if the question
        with reference ``ref`` declares file ``key``; otherwise None.

        Also returns None when the student has no test.
        """
        t = self.get_test(uid)
        if t is None:
            return None
        for q in t['questions']:
            if q['ref'] == ref and key in q['files']:
                return path.abspath(path.join(q['path'], q['files'][key]))
        return None

    # --- helpers (change state)
    def allow_student(self, uid):
        """Add ``uid`` to the set of students allowed to login."""
        self.allowed.add(uid)
        logger.info('Student {}: allowed to login.'.format(uid))

    def deny_student(self, uid):
        """Remove ``uid`` from the set of students allowed to login."""
        self.allowed.discard(uid)
        logger.info('Student {}: denied to login'.format(uid))

    def reset_password(self, uid):
        """Set the stored password of ``uid`` to the empty string."""
        with self.db_session() as s:
            s.query(Student).filter(Student.id == uid).update({'password': ''})
            s.commit()
        logger.info('Student {}: password reset to ""'.format(uid))

    def set_user_agent(self, uid, user_agent=''):
        """Record the user agent of online student ``uid``."""
        self.online[uid]['student']['user_agent'] = user_agent

    def set_user_ip(self, uid, ipaddress=''):
        """Record the IP address of online student ``uid``."""
        self.online[uid]['student']['ip_address'] = ipaddress

    def insert_new_student(self, uid, name):
        """Insert a student with empty password; log an error on failure."""
        try:
            with self.db_session() as s:
                s.add(Student(id=uid, name=name, password=''))
                s.commit()
        except Exception:
            # NOTE(review): any failure is reported as "already exists"
            logger.error('Insert failed: student {} already exists.'.format(uid))
        else:
            logger.info('New student inserted into database: {}, {}'.format(uid, name))

    def set_student_focus(self, uid, value):
        """Record the focus state of online student ``uid``."""
        self.online[uid]['student']['focus'] = value