# testfactory.py
'''
TestFactory - generates tests for students
'''

# python standard library
import logging
import os
from os import path
import random

# other libraries
import schema

# this project
from perguntations.questions import QFactory, QuestionException, QDict
from perguntations.test import Test
from perguntations.tools import load_yaml

# Logger configuration
# Module-level logger named after this module, so the application can
# configure/filter this module's output independently.
logger = logging.getLogger(__name__)

# --- test validation --------------------------------------------------------
def check_answers_directory(ans: str) -> bool:
    '''Checks if answers_dir exists and is writable.

    The check is empirical: a small probe file is written into the
    directory (this catches missing directories and permission problems
    in one go) and then removed again so the check leaves no litter.

    Returns True when the directory is writable, False otherwise.
    '''
    testfile = path.join(path.expanduser(ans), 'REMOVE-ME')
    try:
        with open(testfile, 'w', encoding='utf-8') as file:
            file.write('You can safely remove this file.')
    except OSError:
        return False
    try:
        # best-effort cleanup; if removal fails the file explains itself
        os.remove(testfile)
    except OSError:
        pass
    return True

def check_import_files(files: list) -> bool:
    '''Checks that `files` is a non-empty list of existing question files.'''
    return bool(files) and all(path.isfile(file) for file in files)

def normalize_question_list(questions: list) -> None:
    '''convert question ref from string to list of string (in place)'''
    for entry in questions:
        ref = entry['ref']
        if isinstance(ref, str):
            entry['ref'] = [ref]

# Validation schema for the configuration dict given to TestFactory().
# Unknown keys are tolerated (ignore_extra_keys=True) so callers may carry
# extra application options alongside the validated ones.
test_schema = schema.Schema({
    'ref': schema.Regex('^[a-zA-Z0-9_-]+$'),  # test identifier, safe chars only
    'database': schema.And(str, path.isfile),  # must be an existing file
    'answers_dir': schema.And(str, check_answers_directory),  # must be writable
    'title': str,
    schema.Optional('duration'): int,  # 0 means unlimited (the default)
    schema.Optional('autosubmit'): bool,
    schema.Optional('autocorrect'): bool,
    schema.Optional('show_points'): bool,
    # grade scale as a [min, max] pair of numbers coercible to float
    schema.Optional('scale'): schema.And([schema.Use(float)],
                                         lambda s: len(s) == 2),
    'files': schema.And([str], check_import_files),  # question files to import
    # each test question: one ref (or list of variant refs) + optional points
    'questions': [{
        'ref': schema.Or(str, [str]),
        schema.Optional('points'): float
        }]
    }, ignore_extra_keys=True)

# ============================================================================
class TestFactoryException(Exception):
    '''Raised for configuration or question-loading errors in this module.'''


# ============================================================================
class TestFactory(dict):
    '''
    Each instance of TestFactory() is a test generator.
    For example, if we want to serve two different tests, then we need two
    instances of TestFactory(), one for each test.

    The factory itself behaves as a dict holding the (validated) test
    configuration plus a 'question_factory' mapping ref -> QFactory.
    '''

    # ------------------------------------------------------------------------
    def __init__(self, conf) -> None:
        '''
        Loads configuration from yaml file, then overrides some configurations
        using the conf argument.
        Base questions are added to a pool of questions factories.

        Raises schema.SchemaError if `conf` fails validation and
        TestFactoryException for malformed/duplicate/missing questions.
        '''

        test_schema.validate(conf)

        # --- set test defaults and then use given configuration
        super().__init__({  # defaults
            'show_points': True,
            'scale': None,
            'duration': 0,  # 0=infinite
            'autosubmit': False,
            'autocorrect': True,
            })
        self.update(conf)
        normalize_question_list(self['questions'])

        # --- for review, we are done. no factories needed
        # if self['review']:  FIXME
        #     logger.info('Review mode. No questions loaded. No factories.')
        #     return

        # --- find refs of all questions used in the test
        qrefs = {r for qq in self['questions'] for r in qq['ref']}
        logger.info('Declared %d questions (each test uses %d).',
                    len(qrefs), len(self["questions"]))

        # --- load and build question factories
        self['question_factory'] = {}

        for file in self["files"]:
            fullpath = path.normpath(file)

            logger.info('Loading "%s"...', fullpath)
            questions = load_yaml(fullpath)   # , default=[])

            for i, question in enumerate(questions):
                # make sure every question in the file is a dictionary
                if not isinstance(question, dict):
                    msg = f'Question {i} in {file} is not a dictionary'
                    raise TestFactoryException(msg)

                # check if ref is missing, then set to 'file.yaml:0003'
                if 'ref' not in question:
                    question['ref'] = f'{file}:{i:04}'
                    logger.warning('Missing ref set to "%s"', question["ref"])

                # check for duplicate refs (across all imported files)
                qref = question['ref']
                if qref in self['question_factory']:
                    other = self['question_factory'][qref]
                    otherfile = path.join(other.question['path'],
                                          other.question['filename'])
                    msg = f'Duplicate "{qref}" in {otherfile} and {fullpath}'
                    raise TestFactoryException(msg)

                # make factory only for the questions used in the test
                if qref in qrefs:
                    question.update(zip(('path', 'filename', 'index'),
                                        path.split(fullpath) + (i,)))
                    self['question_factory'][qref] = QFactory(QDict(question))

        qmissing = qrefs.difference(set(self['question_factory'].keys()))
        if qmissing:
            raise TestFactoryException(f'Could not find questions {qmissing}.')

        self.check_questions()

        logger.info('Test factory ready. No errors found.')

    # ------------------------------------------------------------------------
    # NOTE: the commented-out ad-hoc check_* methods that used to live here
    # were superseded by `test_schema` validation (which reuses the module
    # level helpers check_answers_directory/check_import_files) and have
    # been removed.

    # ------------------------------------------------------------------------
    def check_questions(self) -> None:
        '''
        checks if questions can be correctly generated and corrected

        Generates one instance of every question in the pool; 'textarea'
        questions additionally get their correction script exercised.
        Raises TestFactoryException on the first failing question.
        '''
        logger.info('Checking if questions can be generated and corrected...')
        for i, (qref, qfact) in enumerate(self['question_factory'].items()):
            try:
                question = qfact.generate()
            except Exception as exc:
                msg = f'Failed to generate "{qref}"'
                raise TestFactoryException(msg) from exc
            else:
                logger.info('%4d.  %s:  Ok', i, qref)

            if question['type'] == 'textarea':
                _runtests_textarea(qref, question)
    # ------------------------------------------------------------------------
    async def generate(self):
        '''
        Given a dictionary with a student dict {'name':'john', 'number': 123}
        returns instance of Test() for that particular student
        '''

        # make list of questions
        questions = []
        qnum = 1  # track question number
        nerr = 0  # count errors during questions generation

        for qlist in self['questions']:
            # choose list of question variants
            choose = qlist.get('choose', 1)
            qrefs = random.sample(qlist['ref'], k=choose)

            for qref in qrefs:
                # generate instance of question
                try:
                    question = await self['question_factory'][qref].gen_async()
                except QuestionException:
                    logger.error('Can\'t generate question "%s". Skipping.', qref)
                    nerr += 1
                    continue

                # some defaults: informative panels are worth 0 points and
                # do not get a question number
                if question['type'] in ('information', 'success', 'warning',
                                        'alert'):
                    question['points'] = qlist.get('points', 0.0)
                else:
                    question['points'] = qlist.get('points', 1.0)
                    question['number'] = qnum  # counter for non informative panels
                    qnum += 1

                questions.append(question)

        # setup scale
        total_points = sum(q['points'] for q in questions)

        if total_points > 0:
            # normalize question points to scale
            if self['scale'] is not None:
                scale_min, scale_max = self['scale']
                for question in questions:
                    question['points'] *= (scale_max - scale_min) / total_points
            else:
                self['scale'] = [0, total_points]
        else:
            logger.warning('Total points is **ZERO**.')
            if self['scale'] is None:
                self['scale'] = [0, 20]  # default

        if nerr > 0:
            logger.error('%s errors found!', nerr)

        # copy these from the test configuration to each test instance
        inherit = ['ref', 'title', 'database', 'answers_dir', 'files', 'scale',
                   'duration', 'autosubmit', 'autocorrect', 'show_points']

        return Test({'questions': questions, **{k:self[k] for k in inherit}})

    # ------------------------------------------------------------------------
    def __repr__(self):
        testsettings = '\n'.join(f'  {k:14s}: {v}' for k, v in self.items())
        return 'TestFactory({\n' + testsettings + '\n})'

# ============================================================================
def _runtests_textarea(qref, question):
    '''
    Checks if correction script works and runs tests if available

    First corrects an empty answer (correction must never crash), then runs
    the optional 'tests_right' / 'tests_wrong' answer lists, logging whether
    each one yielded the expected grade (1.0 for right, <1.0 for wrong).
    Raises TestFactoryException if any correction attempt itself fails.
    '''
    def _correct(answer):
        '''set `answer` on the question and correct it, wrapping failures'''
        try:
            question.set_answer(answer)
            question.correct()
        except Exception as exc:
            msg = f'Failed to correct "{qref}"'
            raise TestFactoryException(msg) from exc

    _correct('')
    logger.info('         correction works')

    for tnum, right_answer in enumerate(question.get('tests_right', [])):
        _correct(right_answer)
        if question['grade'] == 1.0:
            logger.info('         tests_right[%i] Ok', tnum)
        else:
            logger.error('         tests_right[%i] FAILED!!!', tnum)

    for tnum, wrong_answer in enumerate(question.get('tests_wrong', [])):
        _correct(wrong_answer)
        if question['grade'] < 1.0:
            logger.info('         tests_wrong[%i] Ok', tnum)
        else:
            logger.error('         tests_wrong[%i] FAILED!!!', tnum)