# app.py 14.7 KB
'''
Main application module
'''


# python standard libraries
import asyncio
from contextlib import contextmanager  # `with` statement in db sessions
import csv
import io
import json
import logging
from os import path

# installed packages
import bcrypt
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import sessionmaker

# this project
from perguntations.models import Student, Test, Question
from perguntations.tools import load_yaml
from perguntations.test import TestFactory, TestFactoryException

logger = logging.getLogger(__name__)


# ============================================================================
class AppException(Exception):
    '''
    Error raised by the application module.

    In this file it reports failures while loading the test configuration,
    creating the test factory or opening the database.
    '''


# ============================================================================
# helper functions
# ============================================================================
async def check_password(try_pw, password):
    '''
    Check a plain text password attempt against a stored bcrypt hash.

    The bcrypt computation runs in the default executor so that it does not
    block the event loop.

    try_pw:   password attempt (str); it is encoded as UTF-8 before checking
    password: stored bcrypt hash (presumably bytes, as produced by
              hash_password - confirm against the Student model)
    Returns True if the attempt matches the hash, False otherwise.
    '''
    loop = asyncio.get_running_loop()
    # bcrypt.checkpw re-hashes the attempt with the salt embedded in
    # `password` and compares the digests in constant time, which a plain
    # `==` on the result of hashpw does not guarantee.
    return await loop.run_in_executor(None, bcrypt.checkpw,
                                      try_pw.encode('utf-8'), password)


async def hash_password(password):
    '''
    Hash a plain text password with bcrypt using a newly generated salt.

    The hashing itself runs in the default executor; the salt is generated
    before the work is handed over.

    password: plain text password (str), encoded as UTF-8 before hashing
    Returns the value produced by bcrypt.hashpw.
    '''
    loop = asyncio.get_running_loop()
    encoded = password.encode('utf-8')
    salt = bcrypt.gensalt()
    hashed = await loop.run_in_executor(None, bcrypt.hashpw, encoded, salt)
    return hashed


# ============================================================================
# ============================================================================
class App():
    '''
    This is the main application.
    state:
      self.Session - session factory bound to the database engine
      self.online - {uid:
                         {'student':{...}, 'test': {...}},
                          ...
                     }
      self.allowed - {'123', '124', ...}
      self.testfactory - TestFactory
    '''

    # ------------------------------------------------------------------------
    @contextmanager
    def db_session(self):
        '''
        helper to manage db sessions using the `with` statement, for example:
        with self.db_session() as s:  s.query(...)
        '''
        session = self.Session()
        try:
            yield session
            session.commit()
        except exc.SQLAlchemyError:
            logger.error('DB rollback!!!')
            session.rollback()
            raise
        finally:
            session.close()

    # ------------------------------------------------------------------------
    def __init__(self, conf):
        '''
        Set up the application from the command line configuration.

        Loads the test configuration YAML named by conf['testfile'], creates
        the test factory, opens the sqlite database named by the factory's
        'database' entry and, if conf['allow_all'] is true, allows every
        registered student to login.

        conf: mapping with the command line options; its entries override
              the ones read from the YAML file.
        Raises AppException if the configuration cannot be loaded, the test
        factory cannot be created or the database cannot be queried. The
        original error is chained as the cause.
        '''
        self.online = {}      # {uid: {'student':{...}, 'test': {...}}, ...}
        self.allowed = set()  # '0' is hardcoded to allowed elsewhere

        logger.info('Loading test configuration "%s".', conf["testfile"])
        try:
            testconf = load_yaml(conf['testfile'])
        except Exception as err:  # named `err` to avoid shadowing sqlalchemy `exc`
            logger.critical('Error loading test configuration YAML.')
            raise AppException(err) from err

        testconf.update(conf)  # command line options override configuration

        # start test factory
        try:
            self.testfactory = TestFactory(testconf)
        except TestFactoryException as err:
            logger.critical(err)
            raise AppException('Failed to create test factory!') from err
        logger.info('No errors found. Test factory ready.')

        # connect to database and check registered students
        dbfile = self.testfactory['database']
        database = f'sqlite:///{path.expanduser(dbfile)}'
        engine = create_engine(database, echo=False)
        self.Session = sessionmaker(bind=engine)
        try:
            with self.db_session() as sess:
                # student '0' is excluded from the count
                num = sess.query(Student).filter(Student.id != '0').count()
        except Exception as err:
            raise AppException(f'Database unusable {dbfile}.') from err
        logger.info('Database "%s" has %s students.', dbfile, num)

        # command line option --allow-all
        if conf['allow_all']:
            logger.info('Allowing all students:')
            for uid, _name, _password in self.get_all_students():
                self.allow_student(uid)
        else:
            logger.info('Students not yet allowed to login.')

    # ------------------------------------------------------------------------
    # FIXME unused???
    # def exit(self):
    #     if len(self.online) > 1:
    #         online_students = ', '.join(self.online)
    #         logger.warning(f'Students still online: {online_students}')
    #     logger.critical('----------- !!! Server terminated !!! -----------')

    # ------------------------------------------------------------------------
    async def login(self, uid, try_pw):
        '''
        Authenticate a student and mark it as online.

        Only students in self.allowed (or the special uid '0') may login.
        A student whose stored password is empty gets `try_pw` saved as the
        new password and is accepted. A successful login removes the uid
        from self.allowed.

        uid:    student identifier
        try_pw: password attempt in plain text
        Returns True on success; False if the student is not allowed, is
        not in the database or the password is wrong.
        '''
        if uid not in self.allowed and uid != '0':      # not allowed
            logger.warning('Student %s: not allowed to login.', uid)
            return False

        # get name+password from db; one_or_none() yields None instead of
        # raising when the uid is not registered
        with self.db_session() as sess:
            row = sess.query(Student.name, Student.password)\
                      .filter_by(id=uid)\
                      .one_or_none()

        if row is None:
            logger.warning('Student %s: not found in database.', uid)
            return False
        name, password = row

        if password == '':              # update password on first login
            await self.update_student_password(uid, try_pw)
            pw_ok = True
        else:                           # check password
            pw_ok = await check_password(try_pw, password)  # async bcrypt

        if not pw_ok:                   # wrong password
            logger.info('Student %s: wrong password.', uid)
            return False

        self.allowed.discard(uid)  # remove from set of allowed students
        if uid in self.online:
            logger.warning('Student %s: already logged in.', uid)
        else:                      # make student online
            self.online[uid] = {'student': {'name': name, 'number': uid}}
            logger.info('Student %s: logged in.', uid)
        return True

    # ------------------------------------------------------------------------
    def logout(self, uid):
        '''
        Remove a student from the online students.

        Nothing is removed if the student is not online; the logout is
        logged in either case.
        '''
        if uid in self.online:
            del self.online[uid]
        logger.info('Student %s: logged out.', uid)

    # ------------------------------------------------------------------------
    async def generate_test(self, uid):
        '''
        Generate a new test for an online student.

        The test is created by the test factory from the student data
        ({number, name}) and stored in self.online[uid]['test'].

        Returns the generated test, or None if the student is not online.
        '''
        if uid not in self.online:
            # this implies an error in the code. should never be here!
            logger.critical('Student %s: offline, can\'t generate test', uid)
            return None

        logger.info('Student %s: generating new test.', uid)
        student = self.online[uid]['student']  # {number, name}
        new_test = await self.testfactory.generate(student)
        # look the student up again: the entry is not cached across the await
        self.online[uid]['test'] = new_test
        logger.info('Student %s: test is ready.', uid)
        return new_test

    # ------------------------------------------------------------------------
    async def correct_test(self, uid, ans):
        '''
        Submit the answers of a student, correct the test and store it.

        The corrected test is written to a JSON file inside the test's
        'answers_dir' and inserted into the database, together with one
        row for each question that has a 'grade' key.

        uid: identifier of an online student that has a test
        ans: dictionary {question_index: answer, ...},
             for example:  {0:'hello', 1:[1,2]}
        Returns the grade given by test.correct().
        '''
        test = self.online[uid]['test']

        # --- submit answers and correct test
        test.update_answers(ans)
        logger.info('Student %s: %d answers submitted.', uid, len(ans))

        grade = await test.correct()
        logger.info('Student %s: grade = %g points.', uid, grade)

        # --- save test in JSON format
        ref = test['ref']
        finished = str(test['finish_time'])
        fname = '--'.join((uid, ref, finished)) + '.json'
        fpath = path.join(test['answers_dir'], fname)
        with open(path.expanduser(fpath), 'w') as jsonfile:
            # default=str required for datetime objects
            json.dump(test, jsonfile, indent=2, default=str)
        logger.info('Student %s: saved JSON.', uid)

        # --- insert test and questions into database
        with self.db_session() as sess:
            started = str(test['start_time'])
            test_row = Test(ref=ref,
                            title=test['title'],
                            grade=test['grade'],
                            starttime=started,
                            finishtime=finished,
                            filename=fpath,  # path as built, not expanded
                            student_id=uid,
                            state=test['state'],
                            comment='')
            sess.add(test_row)

            question_rows = []
            for question in test['questions']:
                if 'grade' not in question:
                    continue  # only questions with a grade are stored
                question_rows.append(Question(ref=question['ref'],
                                              grade=question['grade'],
                                              starttime=started,
                                              finishtime=finished,
                                              student_id=uid,
                                              test_id=ref))
            sess.add_all(question_rows)

        logger.info('Student %s: database updated.', uid)
        return grade

    # ------------------------------------------------------------------------
    def giveup_test(self, uid):
        '''
        Mark the test of a student as given up and store it (not used??).

        The test is saved as JSON inside its 'answers_dir' and inserted
        into the database. No question rows are inserted here.

        Returns the test.
        '''
        test = self.online[uid]['test']
        test.giveup()

        # save JSON with the test
        number = test['student']['number']
        ref = test['ref']
        finished = str(test['finish_time'])
        fname = '--'.join((number, ref, finished)) + '.json'
        fpath = path.join(test['answers_dir'], fname)
        test.save_json(fpath)

        # insert test into database
        with self.db_session() as sess:
            test_row = Test(ref=ref,
                            title=test['title'],
                            grade=test['grade'],
                            starttime=str(test['start_time']),
                            finishtime=finished,
                            filename=fpath,
                            student_id=number,
                            state=test['state'],
                            comment='')
            sess.add(test_row)

        logger.info('Student %s: gave up.', uid)
        return test

    # ------------------------------------------------------------------------
    def event_test(self, uid, cmd, value):
        '''
        Log browser events that occur during the test.

        uid:   student identifier
        cmd:   event name; 'focus' and 'size' are handled, any other
               command is ignored
        value: for 'focus' it is logged as is; for 'size' it is unpacked
               as (scr_y, scr_x, win_y, win_x)
        '''
        if cmd == 'focus':
            logger.info('Student %s: focus %s', uid, value)
        elif cmd == 'size':
            scr_y, scr_x, win_y, win_x = value
            screen_area = scr_x * scr_y
            # the values come from the browser: a zero sized screen must
            # not raise ZeroDivisionError, report 0% instead
            if screen_area:
                area = win_x * win_y / screen_area * 100
            else:
                area = 0.0
            logger.info('Student %s: area=%g%%, window=%dx%d, screen=%dx%d',
                        uid, area, win_x, win_y, scr_x, scr_y)

    # ------------------------------------------------------------------------

    # --- helpers (getters)
    # def get_student_name(self, uid):
    #     return self.online[uid]['student']['name']

    def get_test_csv(self):
        '''
        Build a CSV string with the grades of the current test.

        Rows are (student id, grade, start time, finish time) of the tests
        whose ref equals the test factory 'ref', ordered by student id.
        Fields are separated by ';' and all of them are quoted.
        '''
        with self.db_session() as sess:
            query = sess.query(Test.student_id, Test.grade,
                               Test.starttime, Test.finishtime)
            query = query.filter(Test.ref == self.testfactory['ref'])
            rows = query.order_by(Test.student_id).all()

        header = ('Número', 'Nota', 'Início', 'Fim')
        buffer = io.StringIO()
        writer = csv.writer(buffer, delimiter=';', quoting=csv.QUOTE_ALL)
        writer.writerows([header, *rows])
        return buffer.getvalue()

    def get_student_test(self, uid, default=None):
        '''get test from online student'''
        return self.online[uid].get('test', default)

    # def get_questions_dir(self):
    #     return self.testfactory['questions_dir']

    def get_student_grades_from_all_tests(self, uid):
        '''
        Get the grades of a student from all tests.

        Returns a list of (title, grade, finishtime) rows ordered by
        finish time.
        '''
        with self.db_session() as sess:
            # .all() runs the query while the session is still open;
            # returning the bare query would defer execution until after
            # the session has been committed and closed
            return sess.query(Test.title, Test.grade, Test.finishtime)\
                       .filter_by(student_id=uid)\
                       .order_by(Test.finishtime)\
                       .all()

    def get_json_filename_of_test(self, test_id):
        '''
        Get from the database the JSON filename of the test with the given
        test_id.

        Returns the result of the scalar query on Test.filename.
        '''
        with self.db_session() as sess:
            query = sess.query(Test.filename).filter_by(id=test_id)
            filename = query.scalar()
        return filename

    def get_all_students(self):
        '''
        Get all students from the database, except the student '0'.

        Returns a list of (id, name, password) rows ordered by id.
        '''
        with self.db_session() as sess:
            # .all() runs the query while the session is still open;
            # returning the bare query would defer execution until after
            # the session has been committed and closed
            return sess.query(Student.id, Student.name, Student.password)\
                       .filter(Student.id != '0')\
                       .order_by(Student.id)\
                       .all()

    def get_student_grades_from_test(self, uid, testid):
        '''
        Get the grades of a student in the tests with reference `testid`.

        Returns a list of (grade, finishtime, id) rows.
        '''
        with self.db_session() as sess:
            query = sess.query(Test.grade, Test.finishtime, Test.id)
            rows = query.filter_by(student_id=uid, ref=testid).all()
        return rows

    def get_students_state(self):
        '''
        Get the state of every student in the database.

        Returns a list with one dictionary per student, with the keys
        'uid', 'name', 'allowed', 'online', 'start_time' ('' if the student
        has no test), 'password_defined' and 'grades' (grades of the
        student in the current test).
        '''
        states = []
        for uid, name, pw in self.get_all_students():
            # offline students and students without a test give {}
            test = self.online.get(uid, {}).get('test', {})
            states.append({
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': test.get('start_time', ''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(
                    uid, self.testfactory['ref']),
            })
        return states

    # def get_allowed_students(self):
    #     # set of 'uid' allowed to login
    #     return self.allowed

    # def get_file(self, uid, ref, key):
    #     # get filename of (uid, ref, name) if declared in the question
    #     t = self.get_student_test(uid)
    #     for q in t['questions']:
    #         if q['ref'] == ref and key in q['files']:
    #             return path.abspath(path.join(q['path'], q['files'][key]))

    # --- helpers (change state)
    def allow_student(self, uid):
        '''
        Add a single student to the set of students allowed to login.
        '''
        self.allowed |= {uid}  # in-place union, no effect if already there
        logger.info('Student %s: allowed to login.', uid)

    def deny_student(self, uid):
        '''
        Remove a single student from the set of students allowed to login.
        '''
        self.allowed -= {uid}  # in-place difference, no error if absent
        logger.info('Student %s: denied to login', uid)

    async def update_student_password(self, uid, password=''):
        '''
        Change the password of a student in the database.

        uid:      student identifier; it must exist in the database
        password: new password in plain text. A non-empty password is
                  hashed before being stored; an empty one is stored as is.
        '''
        stored = (await hash_password(password)) if password else password
        with self.db_session() as sess:
            record = sess.query(Student).filter_by(id=uid).one()
            record.password = stored
        logger.info('Student %s: password updated.', uid)

    def insert_new_student(self, uid, name):
        '''
        Insert a new student, with an empty password, into the database.

        A database error is logged and not propagated.
        '''
        try:
            with self.db_session() as sess:
                new_student = Student(id=uid, name=name, password='')
                sess.add(new_student)
        except exc.SQLAlchemyError:
            logger.error('Insert failed: student %s already exists?', uid)
            return
        logger.info('New student inserted: %s, %s', uid, name)