# app.py 11.5 KB

# base packages
from os import path
import logging
from contextlib import contextmanager  # `with` statement in db sessions

# installed packages
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# this project
from models import Student, Test, Question
import test
from tools import load_yaml

logger = logging.getLogger(__name__)

# ============================================================================
class AppException(Exception):
    """Error raised by the application, e.g. when App fails to start."""

# ============================================================================
#   Application
# ============================================================================
class App(object):
    def __init__(self, conf=None):
        """Start the application: create the test factory and open the database.

        `conf` is a dictionary of options (None means no options). The keys
        read directly here are:
          'filename'  - optional YAML file with the test configuration;
                        the entries of `conf` override the ones in the file.
          'allow_all' - if true, all the students in the database are
                        allowed to login (command line option --allow-all).
        The merged configuration is handed over to test.TestFactory.

        Raises AppException if the test factory can't be created and
        re-raises whatever exception is found while checking the database.

        Instance state:
          online = {
            uid1: {
              'student': {'number': 123, 'name': john, ...},
              'test': {...}
            },
            uid2: {...}
          }
          allowed = set of uids allowed to login
        """
        # no mutable default argument; missing keys are treated as "not set"
        conf = {} if conf is None else conf

        logger.info('Starting application')
        self.online = dict()    # {uid: {'student':{}}}
        self.allowed = set([])  # '0' is hardcoded to allowed elsewhere

        # build test configuration dictionary
        testconf = {}
        filename = conf.get('filename')
        if filename:
            logger.info(f'Loading test configuration "{filename}".')
            testconf.update(load_yaml(filename))

        testconf.update(conf)  # configuration overrides

        # start test factory
        try:
            self.testfactory = test.TestFactory(testconf)
        except test.TestFactoryException as e:
            logger.critical('Can\'t create test factory.')
            # keep the original cause in the traceback
            raise AppException('Can\'t create test factory.') from e

        # connect to database and check registered students
        dbfile = path.expanduser(self.testfactory['database'])
        engine = create_engine(f'sqlite:///{dbfile}', echo=False)
        self.Session = scoped_session(sessionmaker(bind=engine))

        try:
            with self.db_session() as s:
                n = s.query(Student).filter(Student.id != '0').count()
        except Exception:
            logger.critical(f'Database not usable {self.testfactory["database"]}.')
            raise  # bare raise preserves the original traceback
        else:
            logger.info(f'Database has {n} students registered.')

        # command line option --allow-all
        if conf.get('allow_all'):
            logger.info('Allowing all students')
            for student in self.get_all_students():
                self.allow_student(student[0])

    # -----------------------------------------------------------------------
    def exit(self):
        if len(self.online) > 1:
            online_students = ', '.join(self.online)
            logger.warning(f'Students still online: {online_students}')
        logger.critical('----------- !!! Server terminated !!! -----------')

    # -----------------------------------------------------------------------
    # helper to manage db sessions using the `with` statement, for example
    #   with self.db_session() as s:  s.query(...)
    @contextmanager
    def db_session(self):
        try:
            yield self.Session()
        finally:
            self.Session.remove()

    # -----------------------------------------------------------------------
    def login(self, uid, try_pw):
        if uid not in self.allowed and uid != '0':
            # not allowed
            logger.warning(f'Student {uid}: not allowed to login.')
            return False

        with self.db_session() as s:
            student =  s.query(Student).filter(Student.id == uid).one_or_none()

            if student is None:
                # not found
                logger.warning(f'Student {uid}: not found in database.')
                return False

            if student.password == '':
                # update password on first login
                hashed_pw = bcrypt.hashpw(try_pw.encode('utf-8'), bcrypt.gensalt())
                student.password = hashed_pw
                s.commit()
                logger.info(f'Student {uid}: first login, password updated.')

            elif bcrypt.hashpw(try_pw.encode('utf-8'), student.password) != student.password:
                # wrong password
                logger.info(f'Student {uid}: wrong password.')
                return False

            # success
            self.allowed.discard(uid)
            if uid in self.online:
                logger.warning(f'Student {uid}: already logged in.')
            else:
                self.online[uid] = {'student': {'name': student.name, 'number': uid}}
                logger.info(f'Student {uid}: logged in.')

        return True

    # -----------------------------------------------------------------------
    def logout(self, uid):
        self.online.pop(uid, None) # remove from dict if exists
        logger.info(f'Student {uid}: logged out.')

    # -----------------------------------------------------------------------
    def generate_test(self, uid):
        if uid in self.online:
            logger.info(f'Student {uid}: generating new test.')
            student_id = self.online[uid]['student']
            self.online[uid]['test'] = self.testfactory.generate(student_id)
            return self.online[uid]['test']
        else:
            # this implies an error in the code. should never be here!
            logger.critical(f'Student {uid}: offline, can\'t generate test')
            return None

    # -----------------------------------------------------------------------
    # ans is a dictionary {question_index: answer, ...}
    # for example:  {0:'hello', 1:[1,2]}
    def correct_test(self, uid, ans):
        """Correct the test of student `uid` with the answers `ans`.

        `ans` is a dictionary {question_index: answer, ...}, for example
        {0:'hello', 1:[1,2]}. After correction the test is saved as a JSON
        file in its 'answers_dir' and inserted in the database together
        with the questions that have a grade. Returns the grade given by
        the test's correct() method.
        """
        student_test = self.online[uid]['test']
        student_test.update_answers(ans)
        grade = student_test.correct()

        # save test in JSON format: "<number> -- <ref> -- <finish_time>.json"
        name_parts = (
            student_test['student']['number'],
            student_test['ref'],
            str(student_test['finish_time']),
            )
        fname = ' -- '.join(name_parts) + '.json'
        fpath = path.join(student_test['answers_dir'], fname)
        student_test.save_json(fpath)

        # insert test and questions into database
        with self.db_session() as session:
            test_row = Test(
                ref=student_test['ref'],
                title=student_test['title'],
                grade=student_test['grade'],
                starttime=str(student_test['start_time']),
                finishtime=str(student_test['finish_time']),
                filename=fpath,
                student_id=student_test['student']['number'],
                state=student_test['state'],
                comment='')
            session.add(test_row)

            # only the questions that were graded are stored
            question_rows = [
                Question(
                    ref=question['ref'],
                    grade=question['grade'],
                    starttime=str(student_test['start_time']),
                    finishtime=str(student_test['finish_time']),
                    student_id=student_test['student']['number'],
                    test_id=student_test['ref'])
                for question in student_test['questions']
                if 'grade' in question
                ]
            session.add_all(question_rows)
            session.commit()

        logger.info(f'Student {uid}: finished test.')
        return grade

    # -----------------------------------------------------------------------
    def giveup_test(self, uid):
        """Register that student `uid` gave up the test and return the test.

        The test's giveup() method is called, the test is saved as a JSON
        file in its 'answers_dir' and inserted in the database (the
        questions are not).
        """
        student_test = self.online[uid]['test']
        student_test.giveup()

        # save JSON with the test: "<number> -- <ref> -- <finish_time>.json"
        name_parts = (
            student_test['student']['number'],
            student_test['ref'],
            str(student_test['finish_time']),
            )
        fname = ' -- '.join(name_parts) + '.json'
        fpath = path.join(student_test['answers_dir'], fname)
        student_test.save_json(fpath)

        # insert test into database
        with self.db_session() as session:
            test_row = Test(
                ref=student_test['ref'],
                title=student_test['title'],
                grade=student_test['grade'],
                starttime=str(student_test['start_time']),
                finishtime=str(student_test['finish_time']),
                filename=fpath,
                student_id=student_test['student']['number'],
                state=student_test['state'],
                comment='')
            session.add(test_row)
            session.commit()

        logger.info(f'Student {uid}: gave up.')
        return student_test

    # -----------------------------------------------------------------------

    # --- helpers (getters)
    def get_student_name(self, uid):
        return self.online[uid]['student']['name']
    def get_test(self, uid, default=None):
        return self.online[uid].get('test', default)
    def get_questions_path(self):
        return self.testfactory['questions_dir']
    def get_student_grades_from_all_tests(self, uid):
        """Return [(title, grade, finishtime), ...] of all the tests of student
        `uid` found in the database, ordered by finish time."""
        with self.db_session() as session:
            query = session.query(Test).filter_by(student_id=uid)
            tests = query.order_by(Test.finishtime).all()
        return [(row.title, row.grade, row.finishtime) for row in tests]
    def get_json_filename_of_test(self, test_id):
        """Return the JSON filename stored for the test with database id
        `test_id`, or None if there is no such test."""
        with self.db_session() as s:
            row = s.query(Test.filename).filter_by(id=test_id).one_or_none()
            # one_or_none() gives None when nothing matches: don't index it
            return None if row is None else row[0]
    def get_online_students(self):
        # [('uid', 'name', 'starttime')]
        return [(k, v['student']['name'], str(v.get('test', {}).get('start_time', '---'))) for k,v in self.online.items() if k != '0']

    def get_offline_students(self):
        # list of ('uid', 'name') sorted by uid
        return [u[:2] for u in self.get_all_students() if u[0] not in self.online]

    def get_all_students(self):
        """Return the list of all ('uid', 'name', 'password') in the database,
        sorted by uid. The uid '0' is not included."""
        with self.db_session() as session:
            rows = session.query(Student).all()
        students = [(row.id, row.name, row.password) for row in rows if row.id != '0']
        students.sort(key=lambda student: student[0])
        return students

    def get_student_grades_from_test(self, uid, testid):
        """Return [(grade, finishtime, id), ...] of the tests in the database
        that belong to student `uid` and have reference `testid`."""
        with self.db_session() as session:
            query = session.query(Test).filter_by(student_id=uid, ref=testid)
            tests = query.all()
        return [(row.grade, row.finishtime, row.id) for row in tests]

    def get_students_state(self):
        # [{
        #    'uid'       : '12345'
        #    'name'      : 'John Smith',
        #    'start_time': '',
        #    'grades'    : [10.2, 13.1],
        #    ...
        # }]
        l = []
        for u in self.get_all_students():
            uid, name, pw = u
            l.append({
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': self.online.get(uid, {}).get('test', {}).get('start_time',''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(uid, self.testfactory['ref']),
                'focus': self.online.get(uid, {}).get('student', {}).get('focus', True), # FIXME
                })
        return l

    def get_allowed_students(self):
        # set of 'uid' allowed to login
        return self.allowed

    def get_file(self, uid, ref, key):
        # return filename corresponding to (uid, ref, name) if declared in the question
        t = self.get_test(uid)
        for q in t['questions']:
            if q['ref'] == ref and key in q['files']:
                return path.abspath(path.join(q['path'], q['files'][key]))

    # --- helpers (change state)
    def allow_student(self, uid):
        self.allowed.add(uid)
        logger.info(f'Student {uid}: allowed to login.')

    def deny_student(self, uid):
        self.allowed.discard(uid)
        logger.info(f'Student {uid}: denied to login')

    def reset_password(self, uid):
        """Reset the password of student `uid` to the empty string.

        An empty password is the one that gets redefined on the student's
        next login. A warning is logged, instead of the usual message, if
        no student with that uid is found in the database.
        """
        with self.db_session() as s:
            # update() returns the number of rows matched
            matched = s.query(Student).filter(Student.id == uid).update({'password': ''})
            s.commit()

        if matched:
            logger.info(f'Student {uid}: password reset to ""')
        else:
            logger.warning(f'Student {uid}: not found in database, password not reset.')

    def insert_new_student(self, uid, name):
        """Insert a new student, with empty password, into the database.

        Failures are logged, never raised: an integrity error is reported as
        the student already existing, anything else is reported with the
        error itself so that it isn't mistaken for a duplicate.
        """
        from sqlalchemy.exc import IntegrityError  # only needed here

        try:
            with self.db_session() as s:
                s.add(Student(id=uid, name=name, password=''))
                s.commit()
        except IntegrityError:
            # presumably a duplicate primary key (uid)
            logger.error(f'Insert failed: student {uid} already exists.')
        except Exception as e:
            logger.error(f'Insert failed: student {uid}: {e}')
        else:
            logger.info(f'New student inserted: {uid}, {name}')

    def set_student_focus(self, uid, value):
        self.online[uid]['student']['focus'] = value