# app.py 11.3 KB

# python standard libraries
from os import path
import logging
from contextlib import contextmanager  # `with` statement in db sessions
import asyncio

# user installed packages
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker #, scoped_session

# this project
from models import Student, Test, Question
import test
from tools import load_yaml

logger = logging.getLogger(__name__)

# ============================================================================
class AppException(Exception):
    """Error raised by `App` when the test factory cannot be created."""

# ============================================================================
# helper functions
# ============================================================================
async def check_password(try_pw, password):
    """Check a plain-text password attempt against a stored bcrypt hash.

    The bcrypt computation is run in the event loop's default executor so
    that it does not block the loop.

    Args:
        try_pw: password attempt as `str`; it is encoded as UTF-8.
        password: stored bcrypt hash, passed to bcrypt unchanged
            (bcrypt expects `bytes`).

    Returns:
        True if the attempt matches the stored hash, False otherwise.
    """
    try_pw = try_pw.encode('utf-8')
    loop = asyncio.get_running_loop()
    # checkpw re-hashes the attempt with the salt embedded in `password` and
    # compares the result in constant time, instead of a plain `==`.
    return await loop.run_in_executor(None, bcrypt.checkpw, try_pw, password)

# ============================================================================
#   Application
# ============================================================================
class App:
    """Application state and operations behind the test server.

    Attributes:
        online: dict `{uid: {'student': {...}, 'test': ...}}` of logged-in
            students; the 'test' key only exists after `generate_test`.
        allowed: set of uids currently allowed to log in (uid '0' is always
            accepted by `login`).
        testfactory: `test.TestFactory` built from the configuration.
        Session: SQLAlchemy session factory bound to the SQLite database.
    """

    # -----------------------------------------------------------------------
    # helper to manage db sessions using the `with` statement, for example
    #   with self.db_session() as s:  s.query(...)
    @contextmanager
    def db_session(self):
        """Yield a database session, committing when the block succeeds.

        If the block (or the commit) raises, the transaction is rolled back,
        the rollback is logged and the exception is re-raised so the caller
        can react to the failure. The session is always closed.
        """
        session = self.Session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            logger.error('DB rollback!!!')
            raise  # do not hide the failure from the caller
        finally:
            session.close()

    # ------------------------------------------------------------------------
    def __init__(self, conf=None):
        """Load the configuration, start the test factory and open the db.

        Args:
            conf: dict of configuration overrides. The keys read here are
                'filename' (YAML file with the test configuration, optional)
                and 'allow_all' (allow every student to log in, optional).
                All entries of `conf` override the ones loaded from the file.

        Raises:
            AppException: if the test factory cannot be created.
            Exception: whatever the database raised if it is not usable.
        """
        conf = {} if conf is None else conf  # avoid a shared mutable default

        logger.info('Starting application')
        self.online = {}       # {uid: {'student':{...}, 'test': {...}}, ...}
        self.allowed = set()   # '0' is hardcoded to allowed elsewhere

        # build test configuration dictionary
        testconf = {}
        filename = conf.get('filename')
        if filename:
            logger.info(f'Loading test configuration "{filename}".')
            testconf.update(load_yaml(filename))

        testconf.update(conf)  # configuration overrides

        # start test factory
        try:
            self.testfactory = test.TestFactory(testconf)
        except test.TestFactoryException as e:
            logger.critical('Can\'t create test factory.')
            raise AppException('Can\'t create test factory.') from e

        # connect to database and check registered students
        database = self.testfactory['database']
        dbfile = path.expanduser(database)
        engine = create_engine(f'sqlite:///{dbfile}', echo=False)
        self.Session = sessionmaker(bind=engine)

        try:
            with self.db_session() as s:
                n = s.query(Student).filter(Student.id != '0').count()
        except Exception:
            logger.critical(f'Database not usable {database}.')
            raise
        else:
            logger.info(f'Database {database} has {n} students.')

        # command line option --allow-all
        if conf.get('allow_all'):
            logger.info('Allowing all students')
            for student in self.get_all_students():
                self.allow_student(student[0])

    # -----------------------------------------------------------------------
    def exit(self):
        """Log the students that are still online and the server shutdown."""
        # uid '0' is left out, as in get_online_students()
        still_online = [uid for uid in self.online if uid != '0']
        if still_online:
            online_students = ', '.join(still_online)
            logger.warning(f'Students still online: {online_students}')
        logger.critical('----------- !!! Server terminated !!! -----------')

    # -----------------------------------------------------------------------
    async def login(self, uid, try_pw):
        """Try to log a student in.

        A leading 'l' in `uid` is removed. The student must be in the set of
        allowed students (uid '0' is always accepted). If the stored password
        is empty, `try_pw` becomes the student's password (first login);
        otherwise it is checked against the stored bcrypt hash.

        Returns:
            True on success (the student is then online and no longer in the
            allowed set), False if not allowed, unknown or wrong password.
        """
        if uid.startswith('l'):  # remove prefix 'l'
            uid = uid[1:]

        if uid not in self.allowed and uid != '0':      # not allowed
            logger.warning(f'Student {uid}: not allowed to login.')
            return False

        # get name+password from db
        with self.db_session() as s:
            row = s.query(Student.name, Student.password).filter_by(id=uid).first()

        if row is None:                 # uid is not in the database
            logger.warning(f'Student {uid}: not found in database.')
            return False
        name, password = row

        # first login updates the password
        if password == '':              # update password on first login
            self.update_student_password(uid, try_pw)
            pw_ok = True
        else:                           # check password
            pw_ok = await check_password(try_pw, password)  # async bcrypt

        if not pw_ok:                   # wrong password
            logger.info(f'Student {uid}: wrong password.')
            return False

        self.allowed.discard(uid)       # remove from set of allowed students
        if uid in self.online:
            logger.warning(f'Student {uid}: already logged in.')
        else:                           # make student online
            self.online[uid] = {'student': {'name': name, 'number': uid}}
            logger.info(f'Student {uid}: logged in.')
        return True

    # -----------------------------------------------------------------------
    def logout(self, uid):
        """Remove the student from the online dict (no error if absent)."""
        self.online.pop(uid, None)  # remove from dict if exists
        logger.info(f'Student {uid}: logged out.')

    # -----------------------------------------------------------------------
    def generate_test(self, uid):
        """Generate and store a new test for an online student.

        Returns:
            The generated test, or None if the student is not online.
        """
        if uid not in self.online:
            # this implies an error in the code. should never be here!
            logger.critical(f'Student {uid}: offline, can\'t generate test')
            return None

        logger.info(f'Student {uid}: generating new test.')
        student = self.online[uid]['student']  # {'name': ..., 'number': ...}
        self.online[uid]['test'] = self.testfactory.generate(student)
        return self.online[uid]['test']

    # -----------------------------------------------------------------------
    # private helpers shared by correct_test() and giveup_test()
    def _save_test_json(self, t):
        """Save test `t` as JSON in its answers directory; return the path."""
        fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json'
        fpath = path.join(t['answers_dir'], fname)
        t.save_json(fpath)
        return fpath

    @staticmethod
    def _test_row(t, fpath):
        """Build the database row (`Test`) describing test `t`."""
        return Test(
            ref=t['ref'],
            title=t['title'],
            grade=t['grade'],
            starttime=str(t['start_time']),
            finishtime=str(t['finish_time']),
            filename=fpath,
            student_id=t['student']['number'],
            state=t['state'],
            comment='')

    def _store_rows(self, uid, rows):
        """Insert `rows` in the database; a failure is logged, not raised.

        Kept best-effort on purpose: the JSON file of the test has already
        been written when this is called, and the student should still get
        the result even if the database insert fails.
        """
        try:
            with self.db_session() as s:
                s.add_all(rows)
        except Exception:
            logger.error(f'Student {uid}: test not saved in database.')

    # -----------------------------------------------------------------------
    # ans is a dictionary {question_index: answer, ...}
    # for example:  {0:'hello', 1:[1,2]}
    async def correct_test(self, uid, ans):
        """Update the answers of the student's test, correct it and save it.

        The test is saved as a JSON file, and the test plus every question
        that has a 'grade' are inserted into the database.

        Returns:
            The grade returned by the test's `correct()`.
        """
        t = self.online[uid]['test']
        t.update_answers(ans)
        grade = await t.correct()

        # save test in JSON format
        fpath = self._save_test_json(t)

        # insert test and questions into database
        rows = [self._test_row(t, fpath)]
        rows.extend(
            Question(
                ref=q['ref'],
                grade=q['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                student_id=t['student']['number'],
                test_id=t['ref'])
            for q in t['questions'] if 'grade' in q)
        self._store_rows(uid, rows)

        logger.info(f'Student {uid}: finished test.')
        return grade

    # -----------------------------------------------------------------------
    def giveup_test(self, uid):
        """Mark the student's test as given up and save it.

        The test is saved as a JSON file and inserted into the database
        (questions are not inserted).

        Returns:
            The test object.
        """
        t = self.online[uid]['test']
        t.giveup()

        # save JSON with the test
        fpath = self._save_test_json(t)

        # insert test into database
        self._store_rows(uid, [self._test_row(t, fpath)])

        logger.info(f'Student {uid}: gave up.')
        return t

    # -----------------------------------------------------------------------

    # --- helpers (getters)
    def get_student_name(self, uid):
        """Return the name of an online student."""
        return self.online[uid]['student']['name']

    def get_student_test(self, uid, default=None):
        """Return the test of an online student, or `default` if none."""
        return self.online[uid].get('test', default)

    def get_questions_path(self):
        """Return the 'questions_dir' entry of the test factory."""
        return self.testfactory['questions_dir']

    def get_student_grades_from_all_tests(self, uid):
        """Return a list of (title, grade, finishtime) of the student's
        tests, ordered by finish time."""
        with self.db_session() as s:
            # .all() runs the query while the session is still open
            return (s.query(Test.title, Test.grade, Test.finishtime)
                    .filter_by(student_id=uid)
                    .order_by(Test.finishtime)
                    .all())

    def get_json_filename_of_test(self, test_id):
        """Return the filename stored for the test with id `test_id`."""
        with self.db_session() as s:
            return s.query(Test.filename).filter_by(id=test_id).scalar()

    def get_online_students(self):            # [('uid', 'name', 'starttime')]
        """Return (uid, name, start time) of online students, except '0'.

        The start time is '---' for students without a test.
        """
        return [
            (uid, info['student']['name'],
             str(info.get('test', {}).get('start_time', '---')))
            for uid, info in self.online.items() if uid != '0'
        ]

    def get_all_students(self):
        """Return a list of (id, name, password) of all students in the
        database except '0', ordered by id."""
        with self.db_session() as s:
            # .all() runs the query while the session is still open
            return (s.query(Student.id, Student.name, Student.password)
                    .filter(Student.id != '0')
                    .order_by(Student.id)
                    .all())

    def get_student_grades_from_test(self, uid, testid):
        """Return a list of (grade, finishtime, id) of the student's tests
        whose ref is `testid`."""
        with self.db_session() as s:
            return (s.query(Test.grade, Test.finishtime, Test.id)
                    .filter_by(student_id=uid)
                    .filter_by(ref=testid)
                    .all())

    def get_students_state(self):
        """Return a list with one state dict per student in the database.

        Each dict has the keys 'uid', 'name', 'allowed', 'online',
        'start_time' ('' if no test), 'password_defined' and 'grades'
        (grades of the current test ref).
        """
        ref = self.testfactory['ref']
        return [{
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': self.online.get(uid, {}).get('test', {}).get('start_time', ''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(uid, ref),
                # 'focus': self.online.get(uid, {}).get('student', {}).get('focus', True), # FIXME
            } for uid, name, pw in self.get_all_students()]

    def get_allowed_students(self):
        """Return the set of uids allowed to login."""
        return self.allowed

    def get_file(self, uid, ref, key):
        """Return the absolute filename for (uid, ref, key).

        Looks in the student's test for a question with reference `ref`
        that declares `key` in its 'files'. Returns None if the student has
        no test or no question matches.
        """
        t = self.get_student_test(uid)
        if t is None:
            return None
        for q in t['questions']:
            if q['ref'] == ref and key in q['files']:
                return path.abspath(path.join(q['path'], q['files'][key]))
        return None

    # --- helpers (change state)
    def allow_student(self, uid):
        """Add `uid` to the set of students allowed to login."""
        self.allowed.add(uid)
        logger.info(f'Student {uid}: allowed to login.')

    def deny_student(self, uid):
        """Remove `uid` from the set of students allowed to login."""
        self.allowed.discard(uid)
        logger.info(f'Student {uid}: denied to login')

    def update_student_password(self, uid, pw=''):
        """Store a new password for the student.

        A non-empty `pw` is stored as a bcrypt hash; an empty `pw` stores
        the empty string, which `login` treats as "password not defined".
        """
        if pw:
            pw = bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt())
        with self.db_session() as s:
            student = s.query(Student).filter_by(id=uid).one()
            student.password = pw
        logger.info(f'Student {uid}: password updated.')

    def insert_new_student(self, uid, name):
        """Insert a student with an empty password; failures are logged."""
        try:
            with self.db_session() as s:
                s.add(Student(id=uid, name=name, password=''))
        except Exception:
            logger.error(f'Insert failed: student {uid} already exists.')
        else:
            logger.info(f'New student inserted: {uid}, {name}')