# python standard library
from os import path, listdir
import fnmatch
import random
from datetime import datetime
import json
import logging
import asyncio

# this project
import questionfactory as questions

# Logger configuration
logger = logging.getLogger(__name__)

# ===========================================================================
class TestFactoryException(Exception):
    pass

# ===========================================================================
# Each instance of TestFactory() is a test generator.
# For example, if we want to serve two different tests, then we need two
# instances of TestFactory(), one for each test.
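#
# A minimal usage sketch (the config keys mirror what sanity_checks()
# expects; all values below are illustrative only):
#
#   factory = TestFactory({
#       'filename': 'demo.yaml',
#       'ref': 'demo',
#       'database': '~/demo/students.db',
#       'answers_dir': '~/demo/answers',
#       'questions_dir': '~/demo/questions',
#       'questions': ['q1', {'ref': ['q2a', 'q2b'], 'points': 2}],
#       'review': False,
#   })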
# ===========================================================================
class TestFactory(dict):
    # -----------------------------------------------------------------------
    # Receives the test configuration (conf), applies defaults and sanity
    # checks, then loads the base questions from YAML files into a pool
    # and verifies that every referenced question can be generated.
    # -----------------------------------------------------------------------
    def __init__(self, conf):
        super().__init__(conf)

        # set defaults and sanity checks
        self.sanity_checks()

        if self.get('review'):
            logger.info('Review mode. No questions loaded.')
            return

        # load question YAML files into the question factory
        self.question_factory = questions.QuestionFactory()
        self.question_factory.load_files(files=self['files'], questions_dir=self['questions_dir'])

        # check that every referenced question can be generated (are 'ref' keys correct?)
        logger.info('Checking questions for errors:')
        errors_found = []
        i = 0
        for q in self['questions']:
            for r in q['ref']:
                i += 1
                try:
                    self.question_factory.generate(r)
                except Exception as e:
                    logger.error(f'Can\'t generate question "{r}": {e}')
                    errors_found.append(r)
                else:
                    logger.info(f'{i:4}.  "{r}" Ok.')


        if errors_found:
            logger.critical(f'Found {len(errors_found)} error(s) while generating questions.')
            raise TestFactoryException()
        else:
            logger.info(f'No errors found. Test factory ready for "{self["ref"]}".')


    # -----------------------------------------------------------------------
    # Checks for valid keys and sets default values.
    # Also checks that required files and directories exist.
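    # Required keys: database, answers_dir, questions (and filename, which is
    # used as a fallback for ref and in error messages).
    # Optional keys with fallbacks/defaults: ref, questions_dir, files, title,
    # show_points, scale_points, scale_max, duration, debug, show_ref.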
    # -----------------------------------------------------------------------
    def sanity_checks(self):

        # --- database
        if 'database' not in self:
            logger.critical('Missing "database" in configuration.')
            raise TestFactoryException()
        elif not path.isfile(path.expanduser(self['database'])):
            logger.critical(f'Can\'t find database {self["database"]}.')
            raise TestFactoryException()

        # --- answers_dir
        if 'answers_dir' not in self:
            logger.critical('Missing "answers_dir".')
            raise TestFactoryException()
        try:  # check if answers_dir is a writable directory
            f = open(path.join(path.expanduser(self['answers_dir']),'REMOVE-ME'), 'w')
        except OSError:
            logger.critical(f'Cannot write answers to "{self["answers_dir"]}".')
            raise TestFactoryException()
        else:
            with f:
                f.write('You can safely remove this file.')

        # --- ref
        if 'ref' not in self:
            logger.warning('Missing "ref". Will use filename.')
            self['ref'] = self['filename']

        # --- questions_dir
        if 'questions_dir' not in self:
            logger.warning(f'Missing "questions_dir". Using {path.abspath(path.curdir)}')
            self['questions_dir'] = path.curdir
        elif not path.isdir(path.expanduser(self['questions_dir'])):
            logger.critical(f'Can\'t find questions directory "{self["questions_dir"]}"')
            raise TestFactoryException()

        # --- files
        if 'files' not in self:
            logger.warning('Missing "files" key. Loading all YAML files from "questions_dir"... DANGEROUS!!!')
            try:
                self['files'] = fnmatch.filter(listdir(path.expanduser(self['questions_dir'])), '*.yaml')
            except OSError:
                logger.critical('Couldn\'t get list of YAML question files.')
                raise TestFactoryException()
        if isinstance(self['files'], str):
            self['files'] = [self['files']]

        # --- questions
        if 'questions' not in self:
            logger.critical(f'Missing "questions" in {self["filename"]}.')
            raise TestFactoryException()

        # normalize questions to a list of dictionaries
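        # e.g.  'q1'           ->  {'ref': ['q1']}
        #       {'ref': 'q1'}  ->  {'ref': ['q1']}
        #       {'ref': ['q1', 'q2'], 'points': 2}  is left unchanged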
        for i, q in enumerate(self['questions']):
            # normalize question to a dict and ref to a list of references
            if isinstance(q, str):
                q = {'ref': [q]}
            elif isinstance(q, dict) and isinstance(q['ref'], str):
                q['ref'] = [q['ref']]

            self['questions'][i] = q


        # --- defaults for optional keys
        self.setdefault('title', '')
        # self.setdefault('show_hints', False) # FIXME not implemented yet
        self.setdefault('show_points', False)
        self.setdefault('scale_points', True)
        self.setdefault('scale_max', 20.0)
        self.setdefault('duration', 0)  # FIXME unused

        self.setdefault('debug', False)
        self.setdefault('show_ref', False)


    # -----------------------------------------------------------------------
    # Given a dictionary identifying the student, e.g. {'name': 'john', 'number': 123},
    # returns an instance of Test() for that particular student.
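    # (call from a coroutine; the 'factory' name below is illustrative):
    #   test = await factory.generate({'name': 'john', 'number': 123})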
    # -----------------------------------------------------------------------
    async def generate(self, student):
        test = []
        total_points = 0.0

        n = 1
        loop = asyncio.get_running_loop()

        for qq in self['questions']:
            # generate Question() selected randomly from list of references
            qref = random.choice(qq['ref'])

            try:
                q = await loop.run_in_executor(None, self.question_factory.generate, qref)
            except Exception as e:
                logger.error(f'Can\'t generate question "{qref}": {e}. Skipping.')
                continue

            # some defaults
            if q['type'] in ('information', 'success', 'warning', 'alert'):
                q['points'] = qq.get('points', 0.0)
            else:
                q['points'] = qq.get('points', 1.0)
                q['number'] = n
                n += 1

            total_points += q['points']
            test.append(q)

        # normalize question points to scale
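        # e.g. with scale_max=20.0 and a raw total of 10.0 points, every
        # question's points are doubled so that the test totals 20.0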
        if self['scale_points'] and total_points > 0:
            for q in test:
                q['points'] *= self['scale_max'] / total_points

        return Test({
            'ref': self['ref'],
            'title': self['title'],         # title of the test
            'student': student,             # student id
            'questions': test,              # list of questions
            'answers_dir': self['answers_dir'],

            # FIXME which ones are required?
            # 'show_hints': self['show_hints'],
            'show_points': self['show_points'],
            'show_ref': self['show_ref'],
            'debug': self['debug'],     # required by template test.html
            'database': self['database'],
            'questions_dir': self['questions_dir'],
            # 'files': self['files'],
        })

    # -----------------------------------------------------------------------
    # def __repr__(self):
    #     return '{\n' + '\n'.join('  {0:14s}: {1}'.format(k, v) for k,v in self.items()) + '\n}'


# ===========================================================================
# Each instance of the Test() class is a concrete test to be answered by
# a single student. It will contain at least these keys:
#   start_time, finish_time, questions, grade [0,20]
# Note: save_json() requires some additional keys to be present
# Note: grades are rounded to 1 decimal place: 0.0 - 20.0
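#
# A rough lifecycle sketch (tests are normally created by
# TestFactory.generate(), not instantiated directly; the path below is
# illustrative):
#   test = await factory.generate(student)
#   test.update_answers({0: 'my answer'})
#   grade = await test.correct()          # or test.giveup()
#   test.save_json('~/answers/12345.json')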
# ===========================================================================
class Test(dict):
    # -----------------------------------------------------------------------
    def __init__(self, d):
        super().__init__(d)
        self['start_time'] = datetime.now()
        self['finish_time'] = None
        self['state'] = 'ONGOING'
        self['comment'] = ''
        logger.info(f'Student {self["student"]["number"]}:  new test.')

    # -----------------------------------------------------------------------
    # Removes all answers from the test (clean)
    # def reset_answers(self):
    #     for q in self['questions']:
    #         q['answer'] = None
    #     logger.info(f'Student {self["student"]["number"]}:  all answers cleared.')

    # -----------------------------------------------------------------------
    # Given a dictionary ans = {index: 'some answer'}, updates the
    # answers of the test. Only the questions referred to are affected.
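    # The keys of ans are integer indices into self['questions'].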
    def update_answers(self, ans):
        for i in ans:
            self['questions'][i]['answer'] = ans[i]
        logger.info(f'Student {self["student"]["number"]}:  {len(ans)} answers updated.')

    # -----------------------------------------------------------------------
    # Corrects all the answers and computes the final grade
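    # (assumes each question's correct_async() returns a correctness factor
    # that is multiplied by the question's points)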
    async def correct(self):
        self['finish_time'] = datetime.now()
        self['state'] = 'FINISHED'
        grade = 0.0
        for q in self['questions']:
            grade += await q.correct_async() * q['points']
        self['grade'] = max(0, round(grade, 1))  # avoid negative grade
        logger.info(f'Student {self["student"]["number"]}:  {self["grade"]} points.')
        return self['grade']

    # -----------------------------------------------------------------------
    def giveup(self):
        self['finish_time'] = datetime.now()
        self['state'] = 'QUIT'
        self['grade'] = 0.0
        logger.info(f'Student {self["student"]["number"]}:  gave up.')
        return self['grade']

    # -----------------------------------------------------------------------
    def save_json(self, filepath):
        with open(path.expanduser(filepath), 'w') as f:
            json.dump(self, f, indent=2, default=str) # default=str required for datetime objects
        logger.info(f'Student {self["student"]["number"]}:  saved JSON file.')