app.py 11.3 KB


from os import path
import logging
import bcrypt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models import Base, Student, Test, Question
from contextlib import contextmanager  # to use `with` statement for db sessions

import test
import threading

# Module-level logger, named after this module; used by every App method.
logger = logging.getLogger(__name__)


# ============================================================================
#   Application
# ============================================================================
class App(object):
    """In-memory state of a running test session plus its database access.

    Attributes:
        lock: ``threading.Lock`` serialising test generation.
        online: ``{uid: {'student': {...}, 'test': ...}}`` for every logged-in
            student; the ``'test'`` key only exists after ``generate_test``.
        allowed: set of uids currently allowed to log in.
        testfactory: ``test.TestFactory`` built from ``filename``/``conf``.
        Session: scoped SQLAlchemy session factory bound to the sqlite
            database named by ``testfactory['database']``.
    """

    def __init__(self, filename, conf):
        """Build the test factory, open the database and count students.

        Raises:
            Exception: whatever the database layer raised if the student
                table cannot be queried (logged as critical, then re-raised).
        """
        # online = {
        #   uid1: {
        #     'student': {'number': 123, 'name': john, ...},
        #     'test': {...}
        #   }
        #   uid2: {...}
        # }
        logger.info('============= Running perguntations =============')
        self.lock = threading.Lock()
        self.online = {}       # {uid: {'student':{}}}
        self.allowed = set()   # '0' is hardcoded to allowed elsewhere

        self.testfactory = test.TestFactory(filename, conf=conf)

        # database
        database = self.testfactory['database']
        engine = create_engine('sqlite:///{}'.format(database), echo=False)
        self.Session = scoped_session(sessionmaker(bind=engine))

        try:
            with self.db_session() as s:
                n = s.query(Student).filter(Student.id != '0').count()
        except Exception:
            logger.critical('Database not usable %s.', database)
            raise  # bare raise keeps the original traceback intact
        else:
            logger.info('Database has %s students registered.', n)

    # -----------------------------------------------------------------------
    # helper to manage db sessions using the `with` statement, for example
    #   with self.db_session() as s:  ...
    @contextmanager
    def db_session(self):
        """Yield a session from ``self.Session`` and always remove it after."""
        try:
            yield self.Session()
        finally:
            self.Session.remove()

    # -----------------------------------------------------------------------
    def exit(self):
        """Log that the server terminated."""
        # FIXME what if there are online students?
        logger.critical('----------- !!! Server terminated !!! -----------')

    # -----------------------------------------------------------------------
    def login(self, uid, try_pw):
        """Try to log student ``uid`` in with password ``try_pw``.

        A student whose stored password is the empty string gets ``try_pw``
        hashed and stored as the new password (first login).  On success the
        uid is removed from ``allowed`` and registered in ``online``.

        Returns:
            True on success (also when already logged in), False when the
            uid is not allowed, not in the database, or the password is wrong.
        """
        # uid '0' bypasses the allowed list
        if uid not in self.allowed and uid != '0':
            logger.warning('Student %s:  not allowed to login.', uid)
            return False

        with self.db_session() as s:
            student = s.query(Student).filter(Student.id == uid).one_or_none()

            if student is None:
                logger.warning('Student %s:  not found in database.', uid)
                return False

            password = try_pw.encode('utf-8')
            if student.password == '':
                # update password on first login
                student.password = bcrypt.hashpw(password, bcrypt.gensalt())
                s.commit()
                logger.warning('Student %s:  first login, password updated.', uid)

            elif bcrypt.hashpw(password, student.password) != student.password:
                # re-hashing with the stored hash as salt must reproduce it
                logger.info('Student %s:  wrong password.', uid)
                return False

            # success
            self.allowed.discard(uid)
            if uid in self.online:
                logger.warning('Student %s:  already logged in.', uid)
            else:
                self.online[uid] = {'student': {'name': student.name, 'number': uid}}
                logger.info('Student %s:  logged in.', uid)

        return True

    # -----------------------------------------------------------------------
    def logout(self, uid):
        """Remove ``uid`` from ``online``; return False if it was not there."""
        if uid not in self.online:
            # this should never happen
            logger.error('Student %s:  tried to logout, but is not logged in.', uid)
            return False

        logger.info('Student %s:  logged out.', uid)
        del self.online[uid]  # FIXME the test is not being saved as a give-up...
        return True

    # -----------------------------------------------------------------------
    def generate_test(self, uid):
        """Generate a new test for an online student and remember it.

        Returns:
            The generated test, or None when ``uid`` is not online.
        """
        if uid not in self.online:
            logger.error("Student %s:  offline, can't generate test", uid)
            return None

        logger.info('Student %s:  generating new test.', uid)
        student = self.online[uid]['student']
        # `with` guarantees the lock is released even if generation fails
        with self.lock:  # FIXME is it needed?
            self.online[uid]['test'] = self.testfactory.generate(student)
        return self.online[uid]['test']

    # -----------------------------------------------------------------------
    def _save_test_json(self, t):
        """Save test ``t`` as JSON in its answers dir; return the file path."""
        fname = ' -- '.join((t['student']['number'], t['ref'], str(t['finish_time']))) + '.json'
        fpath = path.abspath(path.join(t['answers_dir'], fname))
        t.save_json(fpath)
        return fpath

    @staticmethod
    def _test_row(t, fpath):
        """Build the ``Test`` database row describing test ``t``."""
        return Test(
            ref=t['ref'],
            title=t['title'],
            grade=t['grade'],
            starttime=str(t['start_time']),
            finishtime=str(t['finish_time']),
            filename=fpath,
            student_id=t['student']['number'],
            state=t['state'],
            comment='')

    # -----------------------------------------------------------------------
    def correct_test(self, uid, ans):
        """Store answers ``ans``, correct the test and persist the result.

        The test is saved as a JSON file and inserted into the database
        together with one row per question that has a ``'grade'``.

        Returns:
            The grade returned by the test's ``correct()``.

        Raises:
            KeyError: if ``uid`` is not online or has no test.
        """
        t = self.online[uid]['test']
        t.update_answers(ans)
        grade = t.correct()

        # save test in JSON format
        fpath = self._save_test_json(t)

        # insert test and questions into database
        with self.db_session() as s:
            s.add(self._test_row(t, fpath))
            # question rows carry the start/finish times of the whole test
            s.add_all([Question(
                ref=q['ref'],
                grade=q['grade'],
                starttime=str(t['start_time']),
                finishtime=str(t['finish_time']),
                student_id=t['student']['number'],
                test_id=t['ref']) for q in t['questions'] if 'grade' in q])
            s.commit()

        logger.info('Student %s:  finished test.', uid)
        return grade

    # -----------------------------------------------------------------------
    def giveup_test(self, uid):
        """Mark the student's test as given up and persist it.

        The test is saved as a JSON file and inserted into the database
        (no question rows are written).

        Returns:
            The test object itself.

        Raises:
            KeyError: if ``uid`` is not online or has no test.
        """
        t = self.online[uid]['test']
        t.giveup()

        # save JSON with the test
        fpath = self._save_test_json(t)

        # insert test into database
        with self.db_session() as s:
            s.add(self._test_row(t, fpath))
            s.commit()

        logger.info('Student %s:  gave up.', uid)
        return t

    # -----------------------------------------------------------------------

    # --- helpers (getters)
    def get_student_name(self, uid):
        """Return the name of online student ``uid``."""
        return self.online[uid]['student']['name']

    def get_test(self, uid, default=None):
        """Return the test of online student ``uid`` or ``default``."""
        return self.online[uid].get('test', default)

    def get_questions_path(self):
        """Return ``testfactory['questions_dir']``."""
        return self.testfactory['questions_dir']

    def get_test_qtypes(self, uid):
        """Return ``{question ref: question type}`` for the student's test."""
        return {q['ref']: q['type'] for q in self.online[uid]['test']['questions']}

    def get_student_grades_from_all_tests(self, uid):
        """Return ``[(title, grade, finishtime)]`` ordered by finish time."""
        with self.db_session() as s:
            tests = s.query(Test).filter_by(student_id=uid).order_by(Test.finishtime)
            # read the rows while the session is still open
            return [(t.title, t.grade, t.finishtime) for t in tests.all()]

    def get_json_filename_of_test(self, test_id):
        """Return the JSON filename stored for ``test_id``, or None if absent."""
        with self.db_session() as s:
            row = s.query(Test.filename).filter_by(id=test_id).one_or_none()
            return None if row is None else row[0]

    def get_online_students(self):
        """Return ``[(uid, name, start_time)]`` for online students except '0'.

        ``start_time`` is ``'---'`` when the student has no test yet.
        """
        # [('uid', 'name', 'starttime')]
        return [
            (uid, entry['student']['name'],
             str(entry.get('test', {}).get('start_time', '---')))
            for uid, entry in self.online.items()
            if uid != '0'
        ]

    def get_offline_students(self):
        """Return ``(uid, name)`` of registered students not online, by uid."""
        # list of ('uid', 'name') sorted by uid
        return [u[:2] for u in self.get_all_students() if u[0] not in self.online]

    def get_all_students(self):
        """Return all ``(uid, name, password)`` except uid '0', sorted by uid."""
        # list of all ('uid', 'name', 'password') sorted by uid
        with self.db_session() as s:
            students = [(u.id, u.name, u.password)
                        for u in s.query(Student).all() if u.id != '0']
        return sorted(students, key=lambda k: k[0])

    def get_student_grades_from_test(self, uid, testid):
        """Return ``[(grade, finishtime, id)]`` of ``uid`` for test ``testid``."""
        with self.db_session() as s:
            rows = s.query(Test).filter_by(student_id=uid).filter_by(ref=testid).all()
            return [(r.grade, r.finishtime, r.id) for r in rows]

    def get_students_state(self):
        """Return one status dict per registered student (uid '0' excluded)."""
        # [{
        #    'uid'       : '12345'
        #    'name'      : 'John Smith',
        #    'start_time': '',
        #    'grades'    : [10.2, 13.1],
        #    ...
        # }]
        ref = self.testfactory['ref']
        state = []
        for uid, name, pw in self.get_all_students():
            entry = self.online.get(uid, {})
            student = entry.get('student', {})
            state.append({
                'uid': uid,
                'name': name,
                'allowed': uid in self.allowed,
                'online': uid in self.online,
                'start_time': entry.get('test', {}).get('start_time', ''),
                'password_defined': pw != '',
                'grades': self.get_student_grades_from_test(uid, ref),
                'ip_address': student.get('ip_address', ''),
                'user_agent': student.get('user_agent', ''),
                'focus': student.get('focus', True),
                })
        return state

    def get_allowed_students(self):
        """Return the (live) set of uids allowed to log in."""
        # set of 'uid' allowed to login
        return self.allowed

    def get_file(self, uid, ref, key):
        """Return the absolute path of file ``key`` of question ``ref``.

        Returns None when the student has no test yet or when no question
        with that ``ref`` declares ``key`` in its ``'files'``.
        """
        # return filename corresponding to (uid, ref, name) if declared in the question
        t = self.get_test(uid)
        if t is None:
            return None
        for q in t['questions']:
            if q['ref'] == ref and key in q['files']:
                return path.abspath(path.join(q['path'], q['files'][key]))
        return None

    # --- helpers (change state)
    def allow_student(self, uid):
        """Add ``uid`` to the set of students allowed to log in."""
        self.allowed.add(uid)
        logger.info('Student %s:  allowed to login.', uid)

    def deny_student(self, uid):
        """Remove ``uid`` from the set of students allowed to log in."""
        self.allowed.discard(uid)
        logger.info('Student %s:  denied to login', uid)

    def reset_password(self, uid):
        """Set the stored password of ``uid`` to the empty string."""
        with self.db_session() as s:
            s.query(Student).filter(Student.id == uid).update({'password': ''})
            s.commit()
        logger.info('Student %s:  password reset to ""', uid)

    def set_user_agent(self, uid, user_agent=''):
        """Record the user agent of online student ``uid``."""
        self.online[uid]['student']['user_agent'] = user_agent

    def set_user_ip(self, uid, ipaddress=''):
        """Record the IP address of online student ``uid``."""
        self.online[uid]['student']['ip_address'] = ipaddress

    def insert_new_student(self, uid, name):
        """Insert a student with an empty password; log instead of raising."""
        try:
            with self.db_session() as s:
                s.add(Student(id=uid, name=name, password=''))
                s.commit()
        except Exception:
            # NOTE(review): any failure is reported as a duplicate uid
            logger.error('Insert failed: student %s already exists.', uid)
        else:
            logger.info('New student inserted into database: %s, %s', uid, name)

    def set_student_focus(self, uid, value):
        """Record the focus state of online student ``uid``."""
        self.online[uid]['student']['focus'] = value