diff --git a/.gitignore b/.gitignore
index 58fb6d2..4a9c790 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# ignore virtual environment
+.venv
+
__pycache__/
.DS_Store
demo/ans
diff --git a/BUGS.md b/BUGS.md
index 9121d45..2297613 100644
--- a/BUGS.md
+++ b/BUGS.md
@@ -5,11 +5,14 @@
- No python3.12 aparentemente nao se pode instalar com `pip --user`
- em caso de timeout na submissão (e.g. JOBE ou script nao responde) a correcção
não termina e o teste não é guardado.
+- scripts de correccao devem dar resultado em JSON e nao YAML (JSON para
+ computer generated, YAML para human generated). Isto evita que o output
+ gerado se possa confundir com a syntax yaml. Usar modulo JSON.
- modo --review nao implementado em testfactory.py
- talvez a base de dados devesse ter como chave do teste um id que fosse único
desse teste particular (não um auto counter, nem ref do teste)
-- em admin, quando scale_max não é 20, as cores das barras continuam a reflectir
- a escala 0,20. a tabela teste na DB não tem a escala desse teste.
+- em admin, quando scale_max não é 20, as cores das barras continuam a
+ reflectir a escala 0,20. a tabela teste na DB não tem a escala desse teste.
- a revisao do teste não mostra as imagens que nao estejam ja em cache.
- reload do teste recomeça a contagem no inicio do tempo.
- mensagems de erro do assembler aparecem na mesma linha na correcao e nao
@@ -18,9 +21,8 @@
## TODO
-- update datatables para 1.11
+- update datatables para 2
- update codemirror para 6.0
-- update mathjax para 3.2
- assinalar a vermelho os alunos que excederam o tempo.
- pagina de login semelhante ao aprendizations
- QuestionTextArea falta reportar nos comments os vários erros que podem ocorrer
diff --git a/README.md b/README.md
index aa9d51f..5c61899 100644
--- a/README.md
+++ b/README.md
@@ -11,14 +11,14 @@
## Requirements
-The webserver is a python application that requires `>=python3.8` and the
+The web server is a python application that requires `>=python3.11` and the
package installer for python `pip`. The node package management `npm` is also
-necessary in order to install the javascript libraries.
+necessary in order to install javascript libraries.
```sh
sudo apt install python3 python3-pip npm # Ubuntu
-sudo pkg install python38 py38-sqlite3 py38-pip npm # FreeBSD
-sudo port install python38 py38-pip py38-setuptools npm6 # MacOS (macports)
+sudo pkg install python py311-sqlite3 py311-pip npm # FreeBSD
+sudo port install python312 py312-pip py312-setuptools npm10 # MacOS (macports)
```
To make the `pip` install packages to a local directory, the file `pip.conf`
diff --git a/perguntations/TODO b/perguntations/TODO
new file mode 100644
index 0000000..f100c9c
--- /dev/null
+++ b/perguntations/TODO
@@ -0,0 +1,17 @@
+# TODO
+
+- opcao correct de testes que nao foram corrigidos automaticamente.
+- tests should store identification of the perguntations version.
+- detailed CSV devia ter as refs das perguntas e so preencher as que o aluno respondeu.
+- long pooling admin
+- student sends updated answers
+- questions with parts
+
+
+---- Estados -----
+
+inicialização (sincrona)
+ test factory
+ database setup: define lista de estudantes com teste=None
+ login admin:
+ gera todos os testes para os alunos na lista
diff --git a/perguntations/__init__.py b/perguntations/__init__.py
index da43e6f..d07b284 100644
--- a/perguntations/__init__.py
+++ b/perguntations/__init__.py
@@ -32,10 +32,10 @@ proof of submission and for review.
'''
APP_NAME = 'perguntations'
-APP_VERSION = '2022.04.dev1'
+APP_VERSION = '2024.07.dev1'
APP_DESCRIPTION = str(__doc__)
__author__ = 'Miguel Barão'
-__copyright__ = 'Copyright 2022, Miguel Barão'
+__copyright__ = 'Copyright 2024, Miguel Barão'
__license__ = 'MIT license'
__version__ = APP_VERSION
diff --git a/perguntations/app.py b/perguntations/app.py
index aba76b8..9263caf 100644
--- a/perguntations/app.py
+++ b/perguntations/app.py
@@ -1,7 +1,7 @@
-'''
+"""
File: perguntations/app.py
Description: Main application logic.
-'''
+"""
# python standard libraries
@@ -14,7 +14,7 @@ import os
from typing import Optional
# installed packages
-import bcrypt
+import argon2
from sqlalchemy import create_engine, select
from sqlalchemy.exc import OperationalError, NoResultFound, IntegrityError
from sqlalchemy.orm import Session
@@ -30,68 +30,75 @@ from .questions import question_from
# setup logger for this module
logger = logging.getLogger(__name__)
-async def check_password(password: str, hashed: bytes) -> bool:
- '''check password in executor'''
- loop = asyncio.get_running_loop()
- return await loop.run_in_executor(None, bcrypt.checkpw,
- password.encode('utf-8'), hashed)
+# ============================================================================
+ph = argon2.PasswordHasher()
-async def hash_password(password: str) -> bytes:
- '''get hash for password'''
+async def check_password(password: str, hashed: str) -> bool:
+ """check password in executor"""
+ loop = asyncio.get_running_loop()
+ try:
+ return await loop.run_in_executor(None, ph.verify, hashed, password)
+ except argon2.exceptions.VerifyMismatchError:
+ return False
+ except Exception:
+ logger.error('Unkown error while verifying password')
+ return False
+
+async def hash_password(password: str) -> str:
+ """get hash for password"""
loop = asyncio.get_running_loop()
- return await loop.run_in_executor(None, bcrypt.hashpw,
- password.encode('utf-8'), bcrypt.gensalt())
+ return await loop.run_in_executor(None, ph.hash, password)
+
# ============================================================================
class AppException(Exception):
- '''Exception raised in this module'''
+ """Exception raised in this module"""
+
# ============================================================================
# main application
# ============================================================================
-class App():
- '''
+class App:
+ """
Main application
- '''
+ """
# ------------------------------------------------------------------------
def __init__(self, config):
- self.debug = config['debug']
- self._make_test_factory(config['testfile'])
+ self.debug = config["debug"]
+ self._make_test_factory(config["testfile"])
self._db_setup() # setup engine and load all students
- # FIXME: get_event_loop will be deprecated in python3.10
- asyncio.get_event_loop().run_until_complete(self._assign_tests())
+ asyncio.run(self._assign_tests())
# command line options: --allow-all, --allow-list filename
- if config['allow_all']:
+ if config["allow_all"]:
self.allow_all_students()
- elif config['allow_list'] is not None:
- self.allow_from_list(config['allow_list'])
+ elif config["allow_list"] is not None:
+ self.allow_from_list(config["allow_list"])
else:
- logger.info('Students not allowed to login')
+ logger.info("Students not allowed to login")
- if config['correct']:
+ if config["correct"]:
self._correct_tests()
# ------------------------------------------------------------------------
def _db_setup(self) -> None:
- '''
+ """
Create database engine and checks for admin and students
- '''
- dbfile = os.path.expanduser(self._testfactory['database'])
+ """
+ dbfile = os.path.expanduser(self._testfactory["database"])
logger.debug('Checking database "%s"...', dbfile)
if not os.path.exists(dbfile):
raise AppException('No database, use "initdb" to create')
# connect to database and check for admin & registered students
- self._engine = create_engine(f'sqlite:///{dbfile}', future=True)
+ self._engine = create_engine(f"sqlite:///{dbfile}")
try:
- with Session(self._engine, future=True) as session:
- query = select(Student.id, Student.name)\
- .where(Student.id != '0')
+ with Session(self._engine) as session:
+ query = select(Student.id, Student.name).where(Student.id != "0")
dbstudents = session.execute(query).all()
- session.execute(select(Student).where(Student.id == '0')).one()
+ session.execute(select(Student).where(Student.id == "0")).one()
except NoResultFound:
msg = 'Database has no administrator (user "0")'
logger.error(msg)
@@ -100,183 +107,189 @@ class App():
msg = f'Database "{dbfile}" unusable'
logger.error(msg)
raise AppException(msg) from None
- logger.info('Database has %d students', len(dbstudents))
+ logger.info("Database has %d students", len(dbstudents))
- self._students = {uid: {
- 'name': name,
- 'state': 'offline',
- 'test': None,
- } for uid, name in dbstudents}
+ self._students = {
+ uid: {
+ "name": name,
+ "state": "offline",
+ "test": None,
+ }
+ for uid, name in dbstudents
+ }
# ------------------------------------------------------------------------
async def _assign_tests(self) -> None:
- '''Generate tests for all students that don't yet have a test'''
- logger.info('Generating tests...')
+ """Generate tests for all students that don't yet have a test"""
+ logger.info("Generating tests...")
for student in self._students.values():
- if student.get('test', None) is None:
- student['test'] = await self._testfactory.generate()
- logger.info('Tests assigned to all students')
+ if student.get("test", None) is None:
+ student["test"] = await self._testfactory.generate()
+ logger.info("Tests assigned to all students")
# ------------------------------------------------------------------------
async def login(self, uid: str, password: str, headers: dict) -> Optional[str]:
- '''
+ """
Login authentication
If successful returns None, else returns an error message
- '''
+ """
try:
- with Session(self._engine, future=True) as session:
+ with Session(self._engine) as session:
query = select(Student.password).where(Student.id == uid)
hashed = session.execute(query).scalar_one()
except NoResultFound:
logger.warning('"%s" does not exist', uid)
- return 'nonexistent'
+ return "nonexistent"
- if uid != '0' and self._students[uid]['state'] != 'allowed':
+ if uid != "0" and self._students[uid]["state"] != "allowed":
logger.warning('"%s" login not allowed', uid)
- return 'not_allowed'
+ return "not_allowed"
- if hashed == '': # set password on first login
+ if hashed == "": # set password on first login
await self.set_password(uid, password)
elif not await check_password(password, hashed):
logger.info('"%s" wrong password', uid)
- return 'wrong_password'
+ return "wrong_password"
# success
- if uid == '0':
- logger.info('Admin login from %s', headers['remote_ip'])
+ if uid == "0":
+ logger.info("Admin login from %s", headers["remote_ip"])
else:
student = self._students[uid]
- student['test'].start(uid)
- student['state'] = 'online'
- student['headers'] = headers
- student['unfocus'] = False
- student['area'] = 0.0
- logger.info('"%s" login from %s', uid, headers['remote_ip'])
+ student["test"].start(uid)
+ student["state"] = "online"
+ student["headers"] = headers
+ student["unfocus"] = False
+ student["area"] = 0.0
+ logger.info('"%s" login from %s', uid, headers["remote_ip"])
return None
# ------------------------------------------------------------------------
async def set_password(self, uid: str, password: str) -> None:
- '''change password in the database'''
- with Session(self._engine, future=True) as session:
+ """change password in the database"""
+ with Session(self._engine) as session:
query = select(Student).where(Student.id == uid)
student = session.execute(query).scalar_one()
- student.password = await hash_password(password) if password else ''
+ student.password = await hash_password(password) if password else ""
session.commit()
logger.info('"%s" password updated', uid)
# ------------------------------------------------------------------------
def logout(self, uid: str) -> None:
- '''student logout'''
+ """student logout"""
student = self._students.get(uid, None)
if student is not None:
# student['test'] = None
- student['state'] = 'offline'
- student.pop('headers', None)
- student.pop('unfocus', None)
- student.pop('area', None)
+ student["state"] = "offline"
+ student.pop("headers", None)
+ student.pop("unfocus", None)
+ student.pop("area", None)
logger.info('"%s" logged out', uid)
# ------------------------------------------------------------------------
def _make_test_factory(self, filename: str) -> None:
- '''
+ """
Setup a factory for the test
- '''
+ """
# load configuration from yaml file
try:
testconf = load_yaml(filename)
- testconf['testfile'] = filename
+ testconf["testfile"] = filename
except (OSError, yaml.YAMLError) as exc:
msg = f'Cannot read test configuration "{filename}"'
logger.error(msg)
raise AppException(msg) from exc
# make test factory
- logger.info('Running test factory...')
+ logger.info("Running test factory...")
try:
self._testfactory = TestFactory(testconf)
except TestFactoryException as exc:
logger.error(exc)
- raise AppException('Failed to create test factory!') from exc
+ raise AppException("Failed to create test factory!") from exc
# ------------------------------------------------------------------------
async def submit_test(self, uid, ans) -> None:
- '''
+ """
Handles test submission and correction.
ans is a dictionary {question_index: answer, ...} with the answers for
the complete test. For example: {0:'hello', 1:[1,2]}
- '''
- if self._students[uid]['state'] != 'online':
+ """
+ if self._students[uid]["state"] != "online":
logger.warning('"%s" INVALID SUBMISSION! STUDENT NOT ONLINE', uid)
return
# --- submit answers and correct test
logger.info('"%s" submitted %d answers', uid, len(ans))
- test = self._students[uid]['test']
+ test = self._students[uid]["test"]
test.submit(ans)
- if test['autocorrect']:
+ if test["autocorrect"]:
await test.correct_async()
- logger.info('"%s" grade = %g points', uid, test['grade'])
+ logger.info('"%s" grade = %g points', uid, test["grade"])
# --- save test in JSON format
fname = f'{uid}--{test["ref"]}--{test["finish_time"]}.json'
- fpath = os.path.join(test['answers_dir'], fname)
+ fpath = os.path.join(test["answers_dir"], fname)
test.save_json(fpath)
logger.info('"%s" saved JSON', uid)
# --- insert test and questions into the database
# only corrected questions are added
test_row = Test(
- ref=test['ref'],
- title=test['title'],
- grade=test['grade'],
- state=test['state'],
- comment=test['comment'],
- starttime=str(test['start_time']),
- finishtime=str(test['finish_time']),
+ ref=test["ref"],
+ title=test["title"],
+ grade=test["grade"],
+ state=test["state"],
+ comment=test["comment"],
+ starttime=str(test["start_time"]),
+ finishtime=str(test["finish_time"]),
filename=fpath,
- student_id=uid)
+ student_id=uid,
+ )
- if test['state'] == 'CORRECTED':
+ if test["state"] == "CORRECTED":
test_row.questions = [
Question(
number=n,
- ref=q['ref'],
- grade=q['grade'],
- comment=q.get('comment', ''),
- starttime=str(test['start_time']),
- finishtime=str(test['finish_time']),
- test_id=test['ref']
- )
- for n, q in enumerate(test['questions'])
- ]
-
- with Session(self._engine, future=True) as session:
+ ref=q["ref"],
+ grade=q["grade"],
+ comment=q.get("comment", ""),
+ starttime=str(test["start_time"]),
+ finishtime=str(test["finish_time"]),
+ test_id=test["ref"],
+ )
+ for n, q in enumerate(test["questions"])
+ ]
+
+ with Session(self._engine) as session:
session.add(test_row)
session.commit()
logger.info('"%s" database updated', uid)
# ------------------------------------------------------------------------
def _correct_tests(self) -> None:
- with Session(self._engine, future=True) as session:
+ with Session(self._engine) as session:
# Find which tests have to be corrected
- query = select(Test) \
- .where(Test.ref == self._testfactory['ref']) \
- .where(Test.state == "SUBMITTED")
+ query = (
+ select(Test)
+ .where(Test.ref == self._testfactory["ref"])
+ .where(Test.state == "SUBMITTED")
+ )
dbtests = session.execute(query).scalars().all()
if not dbtests:
- logger.info('No tests to correct')
+ logger.info("No tests to correct")
return
- logger.info('Correcting %d tests...', len(dbtests))
+ logger.info("Correcting %d tests...", len(dbtests))
for dbtest in dbtests:
try:
with open(dbtest.filename) as file:
testdict = json.load(file)
except OSError:
- logger.error('Failed: %s', dbtest.filename)
+ logger.error("Failed: %s", dbtest.filename)
continue
# creates a class Test with the methods to correct it
@@ -284,31 +297,32 @@ class App():
# question_from() to produce Question() instances that can be
# corrected. Finally the test can be corrected.
test = TestInstance(testdict)
- test['questions'] = [question_from(q) for q in test['questions']]
+ test["questions"] = [question_from(q) for q in test["questions"]]
test.correct()
- logger.info(' %s: %f', test['student'], test['grade'])
+ logger.info(" %s: %f", test["student"], test["grade"])
# save JSON file (overwriting the old one)
- uid = test['student']
+ uid = test["student"]
test.save_json(dbtest.filename)
- logger.debug('%s saved JSON file', uid)
+ logger.debug("%s saved JSON file", uid)
# update database
- dbtest.grade = test['grade']
- dbtest.state = test['state']
+ dbtest.grade = test["grade"]
+ dbtest.state = test["state"]
dbtest.questions = [
Question(
number=n,
- ref=q['ref'],
- grade=q['grade'],
- comment=q.get('comment', ''),
- starttime=str(test['start_time']),
- finishtime=str(test['finish_time']),
- test_id=test['ref']
- ) for n, q in enumerate(test['questions'])
- ]
+ ref=q["ref"],
+ grade=q["grade"],
+ comment=q.get("comment", ""),
+ starttime=str(test["start_time"]),
+ finishtime=str(test["finish_time"]),
+ test_id=test["ref"],
+ )
+ for n, q in enumerate(test["questions"])
+ ]
session.commit()
- logger.info('Database updated')
+ logger.info("Database updated")
# ------------------------------------------------------------------------
# def giveup_test(self, uid):
@@ -340,176 +354,196 @@ class App():
# ------------------------------------------------------------------------
def register_event(self, uid, cmd, value):
- '''handles browser events the occur during the test'''
- if cmd == 'focus':
+ """handles browser events the occur during the test"""
+ if cmd == "focus":
if value:
self._focus_student(uid)
else:
self._unfocus_student(uid)
- elif cmd == 'size':
+ elif cmd == "size":
self._set_screen_area(uid, value)
+ elif cmd == "update_answer":
+ print(cmd, value) # FIXME:
+ elif cmd == "lint":
+ print(cmd, value) # FIXME:
# ========================================================================
# GETTERS
# ========================================================================
def get_test(self, uid: str) -> Optional[dict]:
- '''return student test'''
- return self._students[uid]['test']
+ """return student test"""
+ return self._students[uid]["test"]
# ------------------------------------------------------------------------
def get_name(self, uid: str) -> str:
- '''return name of student'''
- return self._students[uid]['name']
+ """return name of student"""
+ return self._students[uid]["name"]
# ------------------------------------------------------------------------
def get_test_config(self) -> dict:
- '''return brief test configuration to use as header in /admin'''
- return {'title': self._testfactory['title'],
- 'ref': self._testfactory['ref'],
- 'filename': self._testfactory['testfile'],
- 'database': self._testfactory['database'],
- 'answers_dir': self._testfactory['answers_dir']
- }
+ """return brief test configuration to use as header in /admin"""
+ return {
+ "title": self._testfactory["title"],
+ "ref": self._testfactory["ref"],
+ "filename": self._testfactory["testfile"],
+ "database": self._testfactory["database"],
+ "answers_dir": self._testfactory["answers_dir"],
+ }
# ------------------------------------------------------------------------
def get_grades_csv(self):
- '''generates a CSV with the grades of the test currently running'''
- test_ref = self._testfactory['ref']
- with Session(self._engine, future=True) as session:
- query = select(Test.student_id, Test.grade,
- Test.starttime, Test.finishtime)\
- .where(Test.ref == test_ref)\
- .order_by(Test.student_id)
+ """generates a CSV with the grades of the test currently running"""
+ test_ref = self._testfactory["ref"]
+ with Session(self._engine) as session:
+ query = (
+ select(Test.student_id, Test.grade, Test.starttime, Test.finishtime)
+ .where(Test.ref == test_ref)
+ .order_by(Test.student_id)
+ )
tests = session.execute(query).all()
if not tests:
- logger.warning('Empty CSV: there are no tests!')
- return test_ref, ''
+ logger.warning("Empty CSV: there are no tests!")
+ return test_ref, ""
csvstr = io.StringIO()
- writer = csv.writer(csvstr, delimiter=';', quoting=csv.QUOTE_ALL)
- writer.writerow(('Aluno', 'Nota', 'Início', 'Fim'))
+ writer = csv.writer(csvstr, delimiter=";", quoting=csv.QUOTE_ALL)
+ writer.writerow(("Aluno", "Nota", "Início", "Fim"))
writer.writerows(tests)
return test_ref, csvstr.getvalue()
# ------------------------------------------------------------------------
def get_detailed_grades_csv(self):
- '''generates a CSV with the grades of the test'''
- test_ref = self._testfactory['ref']
- with Session(self._engine, future=True) as session:
- query = select(Test.id, Test.student_id, Test.starttime,
- Question.number, Question.grade)\
- .join(Question)\
- .where(Test.ref == test_ref)
+ """generates a CSV with the grades of the test"""
+ test_ref = self._testfactory["ref"]
+ with Session(self._engine) as session:
+ query = (
+ select(
+ Test.id,
+ Test.student_id,
+ Test.starttime,
+ Question.number,
+ Question.grade,
+ )
+ .join(Question)
+ .where(Test.ref == test_ref)
+ )
questions = session.execute(query).all()
- cols = ['Aluno', 'Início']
- tests = {} # {test_id: {student_id, starttime, 0: grade, ...}}
+ cols = ["Aluno", "Início"]
+ tests = {} # {test_id: {student_id, starttime, 0: grade, ...}}
for test_id, student_id, starttime, num, grade in questions:
- default_test_id = {'Aluno': student_id, 'Início': starttime}
+ default_test_id = {"Aluno": student_id, "Início": starttime}
tests.setdefault(test_id, default_test_id)[num] = grade
if num not in cols:
cols.append(num)
if not tests:
- logger.warning('Empty CSV: there are no tests!')
- return test_ref, ''
+ logger.warning("Empty CSV: there are no tests!")
+ return test_ref, ""
csvstr = io.StringIO()
- writer = csv.DictWriter(csvstr, fieldnames=cols, restval=None,
- delimiter=';', quoting=csv.QUOTE_ALL)
+ writer = csv.DictWriter(
+ csvstr, fieldnames=cols, restval=None, delimiter=";", quoting=csv.QUOTE_ALL
+ )
writer.writeheader()
writer.writerows(tests.values())
return test_ref, csvstr.getvalue()
# ------------------------------------------------------------------------
def get_json_filename_of_test(self, test_id):
- '''get JSON filename from database given the test_id'''
- with Session(self._engine, future=True) as session:
+ """get JSON filename from database given the test_id"""
+ with Session(self._engine) as session:
query = select(Test.filename).where(Test.id == test_id)
return session.execute(query).scalar()
# ------------------------------------------------------------------------
def get_grades(self, uid, ref):
- '''get grades of student for a given testid'''
- with Session(self._engine, future=True) as session:
- query = select(Test.grade, Test.finishtime, Test.id)\
- .where(Test.student_id == uid)\
- .where(Test.ref == ref)
+ """get grades of student for a given testid"""
+ with Session(self._engine) as session:
+ query = (
+ select(Test.grade, Test.finishtime, Test.id)
+ .where(Test.student_id == uid)
+ .where(Test.ref == ref)
+ )
grades = session.execute(query).all()
return [tuple(grade) for grade in grades]
# ------------------------------------------------------------------------
def get_students_state(self) -> list:
- '''get list of states of every student to show in /admin page'''
- return [{ 'uid': uid,
- 'name': student['name'],
- 'allowed': student['state'] == 'allowed',
- 'online': student['state'] == 'online',
- 'start_time': student.get('test', {}).get('start_time', ''),
- 'unfocus': student.get('unfocus', False),
- 'area': student.get('area', 1.0),
- 'grades': self.get_grades(uid, self._testfactory['ref']) }
- for uid, student in self._students.items()]
+ """get list of states of every student to show in /admin page"""
+ return [
+ {
+ "uid": uid,
+ "name": student["name"],
+ "allowed": student["state"] == "allowed",
+ "online": student["state"] == "online",
+ "start_time": student.get("test", {}).get("start_time", ""),
+ "unfocus": student.get("unfocus", False),
+ "area": student.get("area", 1.0),
+ "grades": self.get_grades(uid, self._testfactory["ref"]),
+ }
+ for uid, student in self._students.items()
+ ]
# ========================================================================
# SETTERS
# ========================================================================
def allow_student(self, uid: str) -> None:
- '''allow a single student to login'''
- self._students[uid]['state'] = 'allowed'
+ """allow a single student to login"""
+ self._students[uid]["state"] = "allowed"
logger.info('"%s" allowed to login', uid)
# ------------------------------------------------------------------------
def deny_student(self, uid: str) -> None:
- '''deny a single student to login'''
+ """deny a single student to login"""
student = self._students[uid]
- if student['state'] == 'allowed':
- student['state'] = 'offline'
+ if student["state"] == "allowed":
+ student["state"] = "offline"
logger.info('"%s" denied to login', uid)
# ------------------------------------------------------------------------
def allow_all_students(self) -> None:
- '''allow all students to login'''
+ """allow all students to login"""
for student in self._students.values():
- student['state'] = 'allowed'
- logger.info('Allowed %d students', len(self._students))
+ student["state"] = "allowed"
+ logger.info("Allowed %d students", len(self._students))
# ------------------------------------------------------------------------
def deny_all_students(self) -> None:
- '''deny all students to login'''
- logger.info('Denying all students...')
+ """deny all students to login"""
+ logger.info("Denying all students...")
for student in self._students.values():
- if student['state'] == 'allowed':
- student['state'] = 'offline'
+ if student["state"] == "allowed":
+ student["state"] = "offline"
# ------------------------------------------------------------------------
async def insert_new_student(self, uid: str, name: str) -> None:
- '''insert new student into the database'''
- with Session(self._engine, future=True) as session:
+ """insert new student into the database"""
+ with Session(self._engine) as session:
try:
- session.add(Student(id=uid, name=name, password=''))
+ session.add(Student(id=uid, name=name, password=""))
session.commit()
except IntegrityError:
logger.warning('"%s" already exists!', uid)
session.rollback()
return
- logger.info('New student added: %s %s', uid, name)
+ logger.info("New student added: %s %s", uid, name)
self._students[uid] = {
- 'name': name,
- 'state': 'offline',
- 'test': await self._testfactory.generate(),
- }
+ "name": name,
+ "state": "offline",
+ "test": await self._testfactory.generate(),
+ }
# ------------------------------------------------------------------------
def allow_from_list(self, filename: str) -> None:
- '''allow students listed in text file (one number per line)'''
+ """allow students listed in text file (one number per line)"""
# parse list of students to allow (one number per line)
try:
- with open(filename, 'r', encoding='utf-8') as file:
+ with open(filename, "r", encoding="utf-8") as file:
allowed = {line.strip() for line in file}
- allowed.discard('')
+ allowed.discard("")
except OSError as exc:
- error_msg = f'Cannot read file {filename}'
+ error_msg = f"Cannot read file {filename}"
logger.critical(error_msg)
raise AppException(error_msg) from exc
@@ -522,27 +556,34 @@ class App():
logger.warning('Allowed student "%s" does not exist!', uid)
missing += 1
- logger.info('Allowed %d students', len(allowed)-missing)
+ logger.info("Allowed %d students", len(allowed) - missing)
if missing:
- logger.warning(' %d missing!', missing)
+ logger.warning(" %d missing!", missing)
# ------------------------------------------------------------------------
def _focus_student(self, uid):
- '''set student in focus state'''
- self._students[uid]['unfocus'] = False
+ """set student in focus state"""
+ self._students[uid]["unfocus"] = False
logger.info('"%s" focus', uid)
# ------------------------------------------------------------------------
def _unfocus_student(self, uid):
- '''set student in unfocus state'''
- self._students[uid]['unfocus'] = True
+ """set student in unfocus state"""
+ self._students[uid]["unfocus"] = True
logger.info('"%s" unfocus', uid)
# ------------------------------------------------------------------------
def _set_screen_area(self, uid, sizes):
- '''set current browser area as detected in resize event'''
+ """set current browser area as detected in resize event"""
scr_y, scr_x, win_y, win_x = sizes
area = win_x * win_y / (scr_x * scr_y) * 100
- self._students[uid]['area'] = area
- logger.info('"%s" area=%g%%, window=%dx%d, screen=%dx%d',
- uid, area, win_x, win_y, scr_x, scr_y)
+ self._students[uid]["area"] = area
+ logger.info(
+ '"%s" area=%g%%, window=%dx%d, screen=%dx%d',
+ uid,
+ area,
+ win_x,
+ win_y,
+ scr_x,
+ scr_y,
+ )
diff --git a/perguntations/initdb.py b/perguntations/initdb.py
index b355076..19d40ec 100644
--- a/perguntations/initdb.py
+++ b/perguntations/initdb.py
@@ -1,18 +1,17 @@
#!/usr/bin/env python3
-'''
+"""
Commandline utility to initialize and update student database
-'''
+"""
# base
import csv
import argparse
import re
from string import capwords
-from concurrent.futures import ThreadPoolExecutor
# installed packages
-import bcrypt
+from argon2 import PasswordHasher
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
@@ -23,77 +22,84 @@ from .models import Base, Student
# ============================================================================
def parse_commandline_arguments():
- '''Parse command line options'''
+ """Parse command line options"""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- description='Insert new users into a database. Users can be imported '
- 'from CSV files in the SIIUE format or defined in the '
- 'command line. If the database does not exist, a new one '
- 'is created.')
-
- parser.add_argument('csvfile',
- nargs='*',
- type=str,
- default='',
- help='CSV file to import (SIIUE)')
-
- parser.add_argument('--db',
- default='students.db',
- type=str,
- help='database file')
-
- parser.add_argument('-A', '--admin',
- action='store_true',
- help='insert admin user 0 "Admin"')
-
- parser.add_argument('-a', '--add',
- nargs=2,
- action='append',
- metavar=('uid', 'name'),
- help='add new user id and name')
-
- parser.add_argument('-u', '--update',
- nargs='+',
- metavar='uid',
- default=[],
- help='list of users whose password is to be updated')
-
- parser.add_argument('-U', '--update-all',
- action='store_true',
- help='all except admin will have the password updated')
-
- parser.add_argument('--pw',
- default=None,
- type=str,
- help='password for new or updated users')
-
- parser.add_argument('-V', '--verbose',
- action='store_true',
- help='show all students in database')
+ description="Insert new users into a database. Users can be imported "
+ "from CSV files in the SIIUE format or defined in the "
+ "command line. If the database does not exist, a new one "
+ "is created.",
+ )
+
+ parser.add_argument(
+ "csvfile", nargs="*", type=str, default="", help="CSV file to import (SIIUE)"
+ )
+
+ parser.add_argument("--db", default="students.db", type=str, help="database file")
+
+ parser.add_argument(
+ "-A", "--admin", action="store_true", help='insert admin user 0 "Admin"'
+ )
+
+ parser.add_argument(
+ "-a",
+ "--add",
+ nargs=2,
+ action="append",
+ metavar=("uid", "name"),
+ help="add new user id and name",
+ )
+
+ parser.add_argument(
+ "-u",
+ "--update",
+ nargs="+",
+ metavar="uid",
+ default=[],
+ help="list of users whose password is to be updated",
+ )
+
+ parser.add_argument(
+ "-U",
+ "--update-all",
+ action="store_true",
+ help="all except admin will have the password updated",
+ )
+
+ parser.add_argument(
+ "--pw", default=None, type=str, help="password for new or updated users"
+ )
+
+ parser.add_argument(
+ "-V", "--verbose", action="store_true", help="show all students in database"
+ )
return parser.parse_args()
# ============================================================================
-def get_students_from_csv(filename):
- '''
+def get_students_from_csv(filename: str):
+ """
SIIUE names have alien strings like "(TE)" and are sometimes capitalized
We remove them so that students dont keep asking what it means
- '''
+ """
csv_settings = {
- 'delimiter': ';',
- 'quotechar': '"',
- 'skipinitialspace': True,
- }
+ "delimiter": ";",
+ "quotechar": '"',
+ "skipinitialspace": True,
+ }
try:
- with open(filename, encoding='iso-8859-1') as file:
+ with open(filename, encoding="iso-8859-1") as file:
csvreader = csv.DictReader(file, **csv_settings)
- students = [{
- 'uid': s['N.º'],
- 'name': capwords(re.sub(r'\(.*\)', '', s['Nome']).strip())
- } for s in csvreader]
+ students = [
+ {
+ "uid": s["N.º"],
+ "name": capwords(re.sub(r"\(.*\)", "", s["Nome"]).strip()),
+ }
+ for s in csvreader
+ ]
except OSError:
print(f'!!! Error reading file "{filename}" !!!')
students = []
@@ -104,123 +110,123 @@ def get_students_from_csv(filename):
return students
-# ============================================================================
-def hashpw(student, password=None) -> None:
- '''replace password by hash for a single student'''
- print('.', end='', flush=True)
- if password is None:
- student['pw'] = ''
- else:
- student['pw'] = bcrypt.hashpw(password.encode('utf-8'),
- bcrypt.gensalt())
-
-
-# ============================================================================
def insert_students_into_db(session, students) -> None:
- '''insert list of students into the database'''
+ """insert list of students into the database"""
try:
- session.add_all([Student(id=s['uid'], name=s['name'], password=s['pw'])
- for s in students])
+ session.add_all(
+ [Student(id=s["uid"], name=s["name"], password=s["pw"]) for s in students]
+ )
session.commit()
-
except IntegrityError:
- print('!!! Integrity error. Users already in database. Aborted !!!\n')
+ print("!!! Integrity error. Users already in database. Aborted !!!\n")
session.rollback()
# ============================================================================
def show_students_in_database(session, verbose=False):
- '''get students from database'''
+ """get students from database"""
users = session.execute(select(Student)).scalars().all()
- # users = session.query(Student).all()
total = len(users)
- print('Registered users:')
+ print("Registered users:")
if total == 0:
- print(' -- none --')
+ print(" -- none --")
else:
- users.sort(key=lambda u: f'{u.id:>12}') # sort by number
+ users.sort(key=lambda u: f"{u.id:>12}") # sort by number
if verbose:
for user in users:
- print(f'{user.id:>12} {user.name}')
+ print(f"{user.id:>12} {user.name}")
else:
- print(f'{users[0].id:>12} {users[0].name}')
+ print(f"{users[0].id:>12} {users[0].name}")
if total > 1:
- print(f'{users[1].id:>12} {users[1].name}')
+ print(f"{users[1].id:>12} {users[1].name}")
if total > 3:
- print(' | |')
+ print(" | |")
if total > 2:
- print(f'{users[-1].id:>12} {users[-1].name}')
- print(f'Total: {total}.')
+ print(f"{users[-1].id:>12} {users[-1].name}")
+ print(f"Total: {total}.")
# ============================================================================
def main():
- '''insert, update, show students from database'''
+ """insert, update, show students from database"""
+ ph = PasswordHasher()
args = parse_commandline_arguments()
# --- database
- print(f'Database: {args.db}')
- engine = create_engine(f'sqlite:///{args.db}', echo=False, future=True)
+ print(f"Database '{args.db}'")
+ engine = create_engine(f"sqlite:///{args.db}", echo=False) # no logging
Base.metadata.create_all(engine) # Criates schema if needed
- session = Session(engine, future=True)
+ session = Session(engine)
- # --- make list of students to insert
+ # --- build list of new students to insert
students = []
if args.admin:
- print('Adding user: 0, Admin.')
- students.append({'uid': '0', 'name': 'Admin'})
+ print("Adding user (0, Admin)")
+ students.append({"uid": "0", "name": "Admin"})
for csvfile in args.csvfile:
- print('Adding users from:', csvfile)
+ print(f"Adding users from '{csvfile}'")
students.extend(get_students_from_csv(csvfile))
if args.add:
for uid, name in args.add:
- print(f'Adding user: {uid}, {name}.')
- students.append({'uid': uid, 'name': name})
+ print(f"Adding user ({uid}, {name})")
+ students.append({"uid": uid, "name": name})
# --- insert new students
if students:
- print('Generating password hashes', end='')
- with ThreadPoolExecutor() as executor: # hashing
- executor.map(lambda s: hashpw(s, args.pw), students)
- print(f'\nInserting {len(students)}')
+ if args.pw is None:
+ print("Passwords set to empty")
+ for s in students:
+ s["pw"] = ""
+ else:
+ print("Generating password hashes")
+ for s in students:
+ s["pw"] = ph.hash(args.pw)
+ print(".", end="", flush=True)
+ print()
+ print(f"Inserting {len(students)}")
insert_students_into_db(session, students)
# --- update all students
if args.update_all:
- all_students = session.execute(
- select(Student).where(Student.id != '0')
- ).scalars().all()
-
- print(f'Updating password of {len(all_students)} users', end='')
- for student in all_students:
- password = (args.pw or student.id).encode('utf-8')
- student.password = bcrypt.hashpw(password, bcrypt.gensalt())
- print('.', end='', flush=True)
- print()
+ query = select(Student).where(Student.id != "0")
+ all_students = session.execute(query).scalars().all()
+ if args.pw is None:
+ print(f"Resetting password of {len(all_students)} users")
+ for student in all_students:
+ student.password = ''
+ else:
+ print(f"Updating password of {len(all_students)} users")
+ for student in all_students:
+ student.password = ph.hash(args.pw)
+ print(".", end="", flush=True)
+ print()
session.commit()
- # --- update some students
+ # --- update only specified students
else:
for student_id in args.update:
- print(f'Updating password of {student_id}')
- student = session.execute(
- select(Student).
- where(Student.id == student_id)
- ).scalar_one()
- new_password = (args.pw or student_id).encode('utf-8')
- student.password = bcrypt.hashpw(new_password, bcrypt.gensalt())
+ query = select(Student).where(Student.id == student_id)
+ student = session.execute(query).scalar_one()
+ if args.pw is None:
+ print(f"Resetting password of user {student_id}")
+ student.password = ""
+ else:
+ print(f"Updating password of user {student_id}")
+ student.password = ph.hash(args.pw)
session.commit()
+ print("Done!\n")
+
show_students_in_database(session, args.verbose)
session.close()
# ============================================================================
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/perguntations/main.py b/perguntations/main.py
index c488545..33f4aff 100644
--- a/perguntations/main.py
+++ b/perguntations/main.py
@@ -1,8 +1,8 @@
#!/usr/bin/env python3
-'''
+"""
Start application and web server
-'''
+"""
# python standard library
@@ -21,126 +21,157 @@ from . import APP_NAME, APP_VERSION
# ----------------------------------------------------------------------------
def parse_cmdline_arguments() -> argparse.Namespace:
- '''
+ """
Get command line arguments
- '''
+ """
parser = argparse.ArgumentParser(
- description='Server for online tests. Enrolled students and tests '
- 'have to be previously configured. Please read the documentation '
- 'included with this software before running the server.')
- parser.add_argument('testfile',
- type=str,
- help='tests in YAML format')
- parser.add_argument('--allow-all',
- action='store_true',
- help='Allow all students to login immediately')
- parser.add_argument('--allow-list',
- type=str,
- help='File with list of students to allow immediately')
- parser.add_argument('--debug',
- action='store_true',
- help='Enable debug messages')
- parser.add_argument('--review',
- action='store_true',
- help='Review mode: doesn\'t generate test')
- parser.add_argument('--correct',
- action='store_true',
- help='Correct test and update JSON files and database')
- parser.add_argument('--port',
- type=int,
- default=8443,
- help='port for the HTTPS server (default: 8443)')
- parser.add_argument('--version',
- action='version',
- version=f'{APP_VERSION} - python {sys.version}',
- help='Show version information and exit')
+ description="Server for online tests. Enrolled students and tests "
+ "have to be previously configured. Please read the documentation "
+ "included with this software before running the server."
+ )
+ parser.add_argument(
+ "testfile",
+ type=str,
+ help="test configuration in YAML format",
+ )
+ parser.add_argument(
+ "--allow-all",
+ action="store_true",
+ help="Allow login for all students",
+ )
+ parser.add_argument(
+ "--allow-list",
+ type=str,
+ help="File with list of students to allow immediately",
+ )
+ parser.add_argument(
+ "--debug",
+ action="store_true",
+ help="Enable debug mode in the logger and webserver",
+ )
+ parser.add_argument(
+ "--review",
+ action="store_true",
+ help="Fast start by not generating tests",
+ )
+ parser.add_argument(
+ "--correct",
+ action="store_true",
+ help="Correct test and update JSON files and database",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8443,
+ help="port for the HTTPS server (default: 8443)",
+ )
+ parser.add_argument(
+ "--version",
+ action="version",
+ version=f"{APP_VERSION} - python {sys.version}",
+ help="Show version information and exit",
+ )
return parser.parse_args()
+
# ----------------------------------------------------------------------------
def get_logger_config(debug=False) -> dict:
- '''
+ """
Load logger configuration from ~/.config directory if exists,
otherwise set default paramenters.
- '''
+ """
- file = 'logger-debug.yaml' if debug else 'logger.yaml'
- path = os.path.expanduser(os.environ.get('XDG_CONFIG_HOME', '~/.config/'))
+ file = "logger-debug.yaml" if debug else "logger.yaml"
+ path = os.path.expanduser(os.environ.get("XDG_CONFIG_HOME", "~/.config/"))
try:
return load_yaml(os.path.join(path, APP_NAME, file))
except OSError:
- print('Using default logger configuration...')
+ print("Using default logger configuration...")
if debug:
- level = 'DEBUG'
- fmt = '%(asctime)s %(levelname)-8s %(module)-12s%(lineno)4d| %(message)s'
- dateformat = ''
+ level = "DEBUG"
+ fmt = (
+ "%(asctime)s %(levelname)-8s %(module)-12s%(lineno)4d| %(message)s"
+ )
+ dateformat = ""
else:
- level = 'INFO'
- fmt = '%(asctime)s| %(levelname)-8s| %(message)s'
- dateformat = '%Y-%m-%d %H:%M:%S'
- modules = ['main', 'serve', 'app', 'models', 'questions', 'test',
- 'testfactory', 'tools']
- logger = {'handlers': ['default'], 'level': level, 'propagate': False}
+ level = "INFO"
+ fmt = "%(asctime)s| %(levelname)-8s| %(message)s"
+ dateformat = "%Y-%m-%d %H:%M:%S"
+ modules = [
+ "main",
+ "serve",
+ "app",
+ "models",
+ "questions",
+ "test",
+ "testfactory",
+ "tools",
+ ]
+ logger = {"handlers": ["default"], "level": level, "propagate": False}
return {
- 'version': 1,
- 'formatters': {
- 'standard': {
- 'format': fmt,
- 'datefmt': dateformat,
- },
- },
- 'handlers': {
- 'default': {
- 'level': level,
- 'class': 'logging.StreamHandler',
- 'formatter': 'standard',
- 'stream': 'ext://sys.stdout',
- },
- },
- 'loggers': {f'{APP_NAME}.{module}': logger for module in modules}
- }
+ "version": 1,
+ "formatters": {
+ "standard": {
+ "format": fmt,
+ "datefmt": dateformat,
+ },
+ },
+ "handlers": {
+ "default": {
+ "level": level,
+ "class": "logging.StreamHandler",
+ "formatter": "standard",
+ "stream": "ext://sys.stdout",
+ },
+ },
+ "loggers": {f"{APP_NAME}.{module}": logger for module in modules},
+ }
+
# ----------------------------------------------------------------------------
def main() -> None:
- '''
+ """
Tornado web server
- '''
+ """
args = parse_cmdline_arguments()
# --- Setup logging ------------------------------------------------------
logging.config.dictConfig(get_logger_config(args.debug))
logger = logging.getLogger(__name__)
- logger.info('================== Start Logging ==================')
+ logger.info("================== Start Logging ==================")
# --- start application --------------------------------------------------
config = {
- 'testfile': args.testfile,
- 'allow_all': args.allow_all,
- 'allow_list': args.allow_list,
- 'debug': args.debug,
- 'review': args.review,
- 'correct': args.correct,
- }
+ "testfile": args.testfile,
+ "allow_all": args.allow_all,
+ "allow_list": args.allow_list,
+ "debug": args.debug,
+ "review": args.review,
+ "correct": args.correct,
+ }
try:
app = App(config)
except AppException:
- logger.critical('Failed to start application!')
+ logger.critical("Failed to start application!")
sys.exit(1)
# --- get SSL certificates -----------------------------------------------
- if 'XDG_DATA_HOME' in os.environ:
- certs_dir = os.path.join(os.environ['XDG_DATA_HOME'], 'certs')
+ if "XDG_DATA_HOME" in os.environ:
+ certs_dir = os.path.join(os.environ["XDG_DATA_HOME"], "certs")
else:
- certs_dir = os.path.expanduser('~/.local/share/certs')
+ certs_dir = os.path.expanduser("~/.local/share/certs")
ssl_opt = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
try:
- ssl_opt.load_cert_chain(os.path.join(certs_dir, 'cert.pem'),
- os.path.join(certs_dir, 'privkey.pem'))
+ ssl_opt.load_cert_chain(
+ os.path.join(certs_dir, "cert.pem"),
+ os.path.join(certs_dir, "privkey.pem"),
+ )
except FileNotFoundError:
- logger.critical('SSL certificates missing in %s', certs_dir)
+ logger.critical("SSL certificates missing in %s", certs_dir)
sys.exit(1)
# --- run webserver ------------------------------------------------------
diff --git a/perguntations/models.py b/perguntations/models.py
index b93e358..13d77fd 100644
--- a/perguntations/models.py
+++ b/perguntations/models.py
@@ -1,93 +1,61 @@
-'''
+"""
perguntations/models.py
SQLAlchemy ORM
-'''
+"""
-from typing import Any
+from typing import List
-from sqlalchemy import Column, ForeignKey, Integer, Float, String
-from sqlalchemy.orm import declarative_base, relationship
+from sqlalchemy import ForeignKey, Integer, String
+from sqlalchemy.orm import DeclarativeBase, Mapped, relationship, mapped_column
-# FIXME: Any is a workaround for static type checking
-# (https://github.com/python/mypy/issues/6372)
-Base: Any = declarative_base()
+class Base(DeclarativeBase):
+ pass
+# [Student] ---1:N--- [Test] ---1:N--- [Question]
+
# ----------------------------------------------------------------------------
class Student(Base):
- '''Student table'''
__tablename__ = 'students'
- id = Column(String, primary_key=True)
- name = Column(String)
- password = Column(String)
+ id = mapped_column(String, primary_key=True)
+ name: Mapped[str]
+ password: Mapped[str]
# ---
- tests = relationship('Test', back_populates='student')
-
- def __repr__(self):
- return (f'Student('
- f'id={self.id!r}, '
- f'name={self.name!r}, '
- f'password={self.password!r})')
-
+ tests: Mapped[List['Test']] = relationship(back_populates='student')
# ----------------------------------------------------------------------------
class Test(Base):
- '''Test table'''
__tablename__ = 'tests'
- id = Column(Integer, primary_key=True) # auto_increment
- ref = Column(String)
- title = Column(String)
- grade = Column(Float)
- state = Column(String) # ACTIVE, SUBMITTED, CORRECTED, QUIT, NULL
- comment = Column(String)
- starttime = Column(String)
- finishtime = Column(String)
- filename = Column(String)
- student_id = Column(String, ForeignKey('students.id'))
+ id = mapped_column(Integer, primary_key=True) # auto_increment
+ ref: Mapped[str]
+ title: Mapped[str]
+ grade: Mapped[float]
+ state: Mapped[str] # ACTIVE, SUBMITTED, CORRECTED, QUIT, NULL
+ comment: Mapped[str]
+ starttime: Mapped[str]
+ finishtime: Mapped[str]
+ filename: Mapped[str]
+ student_id = mapped_column(String, ForeignKey('students.id'))
# ---
- student = relationship('Student', back_populates='tests')
- questions = relationship('Question', back_populates='test')
-
- def __repr__(self):
- return (f'Test('
- f'id={self.id!r}, '
- f'ref={self.ref!r}, '
- f'title={self.title!r}, '
- f'grade={self.grade!r}, '
- f'state={self.state!r}, '
- f'comment={self.comment!r}, '
- f'starttime={self.starttime!r}, '
- f'finishtime={self.finishtime!r}, '
- f'filename={self.filename!r}, '
- f'student_id={self.student_id!r})')
+ student: Mapped['Student'] = relationship(back_populates='tests')
+ questions: Mapped[List['Question']] = relationship(back_populates='test')
# ---------------------------------------------------------------------------
class Question(Base):
- '''Question table'''
__tablename__ = 'questions'
- id = Column(Integer, primary_key=True) # auto_increment
- number = Column(Integer) # question number (ref may be not be unique)
- ref = Column(String)
- grade = Column(Float)
- comment = Column(String)
- starttime = Column(String)
- finishtime = Column(String)
- test_id = Column(String, ForeignKey('tests.id'))
+ id = mapped_column(Integer, primary_key=True) # auto_increment
+ number: Mapped[int] # question number (ref may be repeated in the same test)
+ ref: Mapped[str]
+ grade: Mapped[float]
+ comment: Mapped[str]
+ starttime: Mapped[str]
+ finishtime: Mapped[str]
+ test_id = mapped_column(String, ForeignKey('tests.id'))
# ---
- test = relationship('Test', back_populates='questions')
- def __repr__(self):
- return (f'Question('
- f'id={self.id!r}, '
- f'number={self.number!r}, '
- f'ref={self.ref!r}, '
- f'grade={self.grade!r}, '
- f'comment={self.comment!r}, '
- f'starttime={self.starttime!r}, '
- f'finishtime={self.finishtime!r}, '
- f'test_id={self.test_id!r})')
+ test: Mapped['Test'] = relationship(back_populates='questions')
diff --git a/perguntations/parser_markdown.py b/perguntations/parser_markdown.py
index f2cd3c8..29dd3e2 100644
--- a/perguntations/parser_markdown.py
+++ b/perguntations/parser_markdown.py
@@ -1,8 +1,7 @@
-
-'''
+"""
Parse markdown and generate HTML
Includes support for LaTeX formulas
-'''
+"""
# python standard library
@@ -26,21 +25,26 @@ logger = logging.getLogger(__name__)
# Block math: $$x$$ or \begin{equation}x\end{equation}
# -------------------------------------------------------------------------
class MathBlockGrammar(mistune.BlockGrammar):
- '''
+ """
match block math $$x$$ and math environments begin{} end{}
- '''
+ """
+
# pylint: disable=too-few-public-methods
block_math = re.compile(r"^\$\$(.*?)\$\$", re.DOTALL)
- latex_environment = re.compile(r"^\\begin\{([a-z]*\*?)\}(.*?)\\end\{\1\}",
- re.DOTALL)
+ latex_environment = re.compile(
+ r"^\\begin\{([a-z]*\*?)\}(.*?)\\end\{\1\}", re.DOTALL
+ )
class MathBlockLexer(mistune.BlockLexer):
- '''
+ """
parser for block math and latex environment
- '''
- default_rules = ['block_math', 'latex_environment'] \
- + mistune.BlockLexer.default_rules
+ """
+
+ default_rules = [
+ "block_math",
+ "latex_environment",
+ ] + mistune.BlockLexer.default_rules
def __init__(self, rules=None, **kwargs):
if rules is None:
@@ -48,36 +52,37 @@ class MathBlockLexer(mistune.BlockLexer):
super().__init__(rules, **kwargs)
def parse_block_math(self, math):
- '''Parse a $$math$$ block'''
- self.tokens.append({
- 'type': 'block_math',
- 'text': math.group(1)
- })
+ """Parse a $$math$$ block"""
+ self.tokens.append({"type": "block_math", "text": math.group(1)})
def parse_latex_environment(self, math):
- '''Parse latex environment in formula'''
- self.tokens.append({
- 'type': 'latex_environment',
- 'name': math.group(1),
- 'text': math.group(2)
- })
+ """Parse latex environment in formula"""
+ self.tokens.append(
+ {
+ "type": "latex_environment",
+ "name": math.group(1),
+ "text": math.group(2),
+ }
+ )
class MathInlineGrammar(mistune.InlineGrammar):
- '''
+ """
match inline math $x$, block math $$x$$ and text
- '''
+ """
+
# pylint: disable=too-few-public-methods
math = re.compile(r"^\$(.+?)\$", re.DOTALL)
block_math = re.compile(r"^\$\$(.+?)\$\$", re.DOTALL)
- text = re.compile(r'^[\s\S]+?(?=[\\' \
- + header + '
' + body + ''
+ """render table"""
+ return (
+ '
'
+ + header
+ + ""
+ + body
+ + "
"
+ )
def image(self, src, title, text):
- '''render image'''
+ """render image"""
alt = mistune.escape(text, quote=True)
if title is not None:
if title: # not empty string, show as caption
title = mistune.escape(title, quote=True)
- caption = f'{title}' \
- ''
+ caption = f'{title}' ""
else: # title is an empty string, show as centered figure
- caption = ''
+ caption = ""
- return f'''
+ return f"""
- '''
+ """
# title indefined, show as inline image
- return f'''
+ return f"""
- '''
+ """
# Pass math through unaltered - mathjax does the rendering in the browser
def block_math(self, text):
- '''bypass block math'''
+ """bypass block math"""
# pylint: disable=no-self-use
- return fr'$$ {text} $$'
+ return rf"$$ {text} $$"
def latex_environment(self, name, text):
- '''bypass latex environment'''
+ """bypass latex environment"""
# pylint: disable=no-self-use
- return fr'\begin{{{name}}} {text} \end{{{name}}}'
+ return rf"\begin{{{name}}} {text} \end{{{name}}}"
def inline_math(self, text):
- '''bypass inline math'''
+ """bypass inline math"""
# pylint: disable=no-self-use
- return fr'$$$ {text} $$$'
+ return rf"$$$ {text} $$$"
-def md_to_html(qref='.'):
- '''markdown to html interface'''
+def md_to_html(qref="."):
+ """markdown to html interface"""
return MarkdownWithMath(HighlightRenderer(qref=qref))
diff --git a/perguntations/questions.py b/perguntations/questions.py
index 0b9a636..4053b3e 100644
--- a/perguntations/questions.py
+++ b/perguntations/questions.py
@@ -1,7 +1,7 @@
-'''
+"""
File: perguntations/questions.py
Description: Classes the implement several types of questions.
-'''
+"""
# python standard library
@@ -19,11 +19,11 @@ from .tools import run_script, run_script_async
# setup logger for this module
logger = logging.getLogger(__name__)
-QDict = NewType('QDict', Dict[str, Any])
+QDict = NewType("QDict", Dict[str, Any])
class QuestionException(Exception):
- '''Exceptions raised in this module'''
+ """Exceptions raised in this module"""
# ============================================================================
@@ -31,68 +31,72 @@ class QuestionException(Exception):
# presented to students.
# ============================================================================
class Question(dict):
- '''
+ """
Classes derived from this base class are meant to instantiate questions
for each student.
Instances can shuffle options or automatically generate questions.
- '''
+ """
def gen(self) -> None:
- '''
+ """
Sets defaults that are valid for any question type
- '''
+ """
# add required keys if missing
- self.set_defaults(QDict({
- 'title': '',
- 'answer': None,
- 'comments': '',
- 'solution': '',
- 'files': {},
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "title": "",
+ "answer": None,
+ "comments": "",
+ "solution": "",
+ "files": {},
+ }
+ )
+ )
def set_answer(self, ans) -> None:
- '''set answer field and register time'''
- self['answer'] = ans
- self['finish_time'] = datetime.now()
+ """set answer field and register time"""
+ self["answer"] = ans
+ self["finish_time"] = datetime.now()
def correct(self) -> None:
- '''default correction (synchronous version)'''
- self['comments'] = ''
- self['grade'] = 0.0
+ """default correction (synchronous version)"""
+ self["comments"] = ""
+ self["grade"] = 0.0
async def correct_async(self) -> None:
- '''default correction (async version)'''
+ """default correction (async version)"""
self.correct()
def set_defaults(self, qdict: QDict) -> None:
- '''Add k:v pairs from default dict d for nonexistent keys'''
+ """Add k:v pairs from default dict d for nonexistent keys"""
for k, val in qdict.items():
self.setdefault(k, val)
# ============================================================================
class QuestionRadio(Question):
- '''An instance of QuestionRadio will always have the keys:
- type (str)
- text (str)
- options (list of strings)
- correct (list of floats)
- discount (bool, default=True)
- answer (None or an actual answer)
- shuffle (bool, default=True)
- choose (int) # only used if shuffle=True
- '''
+ """An instance of QuestionRadio will always have the keys:
+ type (str)
+ text (str)
+ options (list of strings)
+ correct (list of floats)
+ discount (bool, default=True)
+ answer (None or an actual answer)
+ shuffle (bool, default=True)
+ choose (int) # only used if shuffle=True
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
- '''
+ """
Sets defaults, performs checks and generates the actual question
by modifying the options and correct values
- '''
+ """
super().gen()
try:
- nopts = len(self['options'])
+ nopts = len(self["options"])
except KeyError as exc:
msg = f'Missing `options`. In question "{self["ref"]}"'
logger.error(msg)
@@ -102,121 +106,125 @@ class QuestionRadio(Question):
logger.error(msg)
raise QuestionException(msg) from exc
- self.set_defaults(QDict({
- 'text': '',
- 'correct': 0,
- 'shuffle': True,
- 'discount': True,
- 'max_tries': (nopts + 3) // 4 # 1 try for each 4 options
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "correct": 0,
+ "shuffle": True,
+ "discount": True,
+ "max_tries": (nopts + 3) // 4, # 1 try for each 4 options
+ }
+ )
+ )
# check correct bounds and convert int to list,
# e.g. correct: 2 --> correct: [0,0,1,0,0]
- if isinstance(self['correct'], int):
- if not 0 <= self['correct'] < nopts:
+ if isinstance(self["correct"], int):
+ if not 0 <= self["correct"] < nopts:
msg = f'"{self["ref"]}": correct out of range 0..{nopts-1}'
logger.error(msg)
raise QuestionException(msg)
- self['correct'] = [1.0 if x == self['correct'] else 0.0
- for x in range(nopts)]
+ self["correct"] = [
+ 1.0 if x == self["correct"] else 0.0 for x in range(nopts)
+ ]
- elif isinstance(self['correct'], list):
+ elif isinstance(self["correct"], list):
# must match number of options
- if len(self['correct']) != nopts:
+ if len(self["correct"]) != nopts:
msg = f'"{self["ref"]}": number of options/correct mismatch'
logger.error(msg)
raise QuestionException(msg)
# make sure is a list of floats
try:
- self['correct'] = [float(x) for x in self['correct']]
+ self["correct"] = [float(x) for x in self["correct"]]
except (ValueError, TypeError) as exc:
msg = f'"{self["ref"]}": correct must contain floats or bools'
logger.error(msg)
raise QuestionException(msg) from exc
# check grade boundaries
- if self['discount'] and not all(0.0 <= x <= 1.0
- for x in self['correct']):
+ if self["discount"] and not all(0.0 <= x <= 1.0 for x in self["correct"]):
msg = f'"{self["ref"]}": correct must be in [0.0, 1.0]'
logger.error(msg)
raise QuestionException(msg)
# at least one correct option
- if all(x < 1.0 for x in self['correct']):
+ if all(x < 1.0 for x in self["correct"]):
msg = f'"{self["ref"]}": has no correct options'
logger.error(msg)
raise QuestionException(msg)
# If shuffle==false, all options are shown as defined
# otherwise, select 1 correct and choose a few wrong ones
- if self['shuffle']:
+ if self["shuffle"]:
# lists with indices of right and wrong options
- right = [i for i in range(nopts) if self['correct'][i] >= 1]
- wrong = [i for i in range(nopts) if self['correct'][i] < 1]
+ right = [i for i in range(nopts) if self["correct"][i] >= 1]
+ wrong = [i for i in range(nopts) if self["correct"][i] < 1]
- self.set_defaults(QDict({'choose': 1+len(wrong)}))
+ self.set_defaults(QDict({"choose": 1 + len(wrong)}))
# try to choose 1 correct option
if right:
sel = random.choice(right)
- options = [self['options'][sel]]
- correct = [self['correct'][sel]]
+ options = [self["options"][sel]]
+ correct = [self["correct"][sel]]
else:
options = []
correct = []
# choose remaining wrong options
- nwrong = self['choose'] - len(correct)
+ nwrong = self["choose"] - len(correct)
wrongsample = random.sample(wrong, k=nwrong)
- options += [self['options'][i] for i in wrongsample]
- correct += [self['correct'][i] for i in wrongsample]
+ options += [self["options"][i] for i in wrongsample]
+ correct += [self["correct"][i] for i in wrongsample]
# final shuffle of the options
- perm = random.sample(range(self['choose']), k=self['choose'])
- self['options'] = [str(options[i]) for i in perm]
- self['correct'] = [correct[i] for i in perm]
+ perm = random.sample(range(self["choose"]), k=self["choose"])
+ self["options"] = [str(options[i]) for i in perm]
+ self["correct"] = [correct[i] for i in perm]
# ------------------------------------------------------------------------
def correct(self) -> None:
- '''
+ """
Correct `answer` and set `grade`.
Can assign negative grades for wrong answers
- '''
+ """
super().correct()
- if self['answer'] is not None:
- grade = self['correct'][int(self['answer'])] # grade of the answer
- nopts = len(self['options'])
- grade_aver = sum(self['correct']) / nopts # expected value
+ if self["answer"] is not None:
+ grade = self["correct"][int(self["answer"])] # grade of the answer
+ nopts = len(self["options"])
+ grade_aver = sum(self["correct"]) / nopts # expected value
# note: there are no numerical errors when summing 1.0s so the
# x_aver can be exactly 1.0 if all options are right
- if self['discount'] and grade_aver != 1.0:
+ if self["discount"] and grade_aver != 1.0:
grade = (grade - grade_aver) / (1.0 - grade_aver)
- self['grade'] = grade
+ self["grade"] = grade
# ============================================================================
class QuestionCheckbox(Question):
- '''An instance of QuestionCheckbox will always have the keys:
- type (str)
- text (str)
- options (list of strings)
- shuffle (bool, default True)
- correct (list of floats)
- discount (bool, default True)
- choose (int)
- answer (None or an actual answer)
- '''
+ """An instance of QuestionCheckbox will always have the keys:
+ type (str)
+ text (str)
+ options (list of strings)
+ shuffle (bool, default True)
+ correct (list of floats)
+ discount (bool, default True)
+ choose (int)
+ answer (None or an actual answer)
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
try:
- nopts = len(self['options'])
+ nopts = len(self["options"])
except KeyError as exc:
msg = f'Missing `options`. In question "{self["ref"]}"'
logger.error(msg)
@@ -227,50 +235,56 @@ class QuestionCheckbox(Question):
raise QuestionException(msg) from exc
# set defaults if missing
- self.set_defaults(QDict({
- 'text': '',
- 'correct': [1.0] * nopts, # Using 0.0 breaks (right, wrong)
- 'shuffle': True,
- 'discount': True,
- 'choose': nopts, # number of options
- 'max_tries': max(1, min(nopts - 1, 3))
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "correct": [1.0] * nopts, # Using 0.0 breaks (right, wrong)
+ "shuffle": True,
+ "discount": True,
+ "choose": nopts, # number of options
+ "max_tries": max(1, min(nopts - 1, 3)),
+ }
+ )
+ )
# must be a list of numbers
- if not isinstance(self['correct'], list):
- msg = 'Correct must be a list of numbers or booleans'
+ if not isinstance(self["correct"], list):
+ msg = "Correct must be a list of numbers or booleans"
logger.error(msg)
raise QuestionException(msg)
# must match number of options
- if len(self['correct']) != nopts:
- msg = (f'{nopts} options vs {len(self["correct"])} correct. '
- f'In question "{self["ref"]}"')
+ if len(self["correct"]) != nopts:
+ msg = (
+ f'{nopts} options vs {len(self["correct"])} correct. '
+ f'In question "{self["ref"]}"'
+ )
logger.error(msg)
raise QuestionException(msg)
# make sure is a list of floats
try:
- self['correct'] = [float(x) for x in self['correct']]
+ self["correct"] = [float(x) for x in self["correct"]]
except (ValueError, TypeError) as exc:
- msg = ('`correct` must be list of numbers or booleans.'
- f'In "{self["ref"]}"')
+ msg = "`correct` must be list of numbers or booleans." f'In "{self["ref"]}"'
logger.error(msg)
raise QuestionException(msg) from exc
# check grade boundaries
- if self['discount'] and not all(0.0 <= x <= 1.0
- for x in self['correct']):
- msg = ('values in the `correct` field of checkboxes must be in '
- 'the [0.0, 1.0] interval. '
- f'Please fix "{self["ref"]}" in "{self["path"]}"')
+ if self["discount"] and not all(0.0 <= x <= 1.0 for x in self["correct"]):
+ msg = (
+ "values in the `correct` field of checkboxes must be in "
+ "the [0.0, 1.0] interval. "
+ f'Please fix "{self["ref"]}" in "{self["path"]}"'
+ )
logger.error(msg)
raise QuestionException(msg)
# if an option is a list of (right, wrong), pick one
options = []
correct = []
- for option, corr in zip(self['options'], self['correct']):
+ for option, corr in zip(self["options"], self["correct"]):
if isinstance(option, list):
sel = random.randint(0, 1)
option = option[sel]
@@ -281,252 +295,288 @@ class QuestionCheckbox(Question):
# generate random permutation, e.g. [2,1,4,0,3]
# and apply to `options` and `correct`
- if self['shuffle']:
- perm = random.sample(range(nopts), k=self['choose'])
- self['options'] = [options[i] for i in perm]
- self['correct'] = [correct[i] for i in perm]
+ if self["shuffle"]:
+ perm = random.sample(range(nopts), k=self["choose"])
+ self["options"] = [options[i] for i in perm]
+ self["correct"] = [correct[i] for i in perm]
else:
- self['options'] = options[:self['choose']]
- self['correct'] = correct[:self['choose']]
+ self["options"] = options[: self["choose"]]
+ self["correct"] = correct[: self["choose"]]
# ------------------------------------------------------------------------
# can return negative values for wrong answers
def correct(self) -> None:
super().correct()
- if self['answer'] is not None:
+ if self["answer"] is not None:
grade = 0.0
- if self['discount']:
- sum_abs = sum(abs(2*p-1) for p in self['correct'])
- for i, pts in enumerate(self['correct']):
- grade += 2*pts-1 if str(i) in self['answer'] else 1-2*pts
+ if self["discount"]:
+ sum_abs = sum(abs(2 * p - 1) for p in self["correct"])
+ for i, pts in enumerate(self["correct"]):
+ grade += 2 * pts - 1 if str(i) in self["answer"] else 1 - 2 * pts
else:
- sum_abs = sum(abs(p) for p in self['correct'])
- for i, pts in enumerate(self['correct']):
- grade += pts if str(i) in self['answer'] else 0.0
+ sum_abs = sum(abs(p) for p in self["correct"])
+ for i, pts in enumerate(self["correct"]):
+ grade += pts if str(i) in self["answer"] else 0.0
try:
- self['grade'] = grade / sum_abs
+ self["grade"] = grade / sum_abs
except ZeroDivisionError:
- self['grade'] = 1.0 # limit p->0
+ self["grade"] = 1.0 # limit p->0
# ============================================================================
class QuestionText(Question):
- '''An instance of QuestionText will always have the keys:
- type (str)
- text (str)
- correct (list of str)
- answer (None or an actual answer)
- '''
+ """An instance of QuestionText will always have the keys:
+ type (str)
+ text (str)
+ correct (list of str)
+ answer (None or an actual answer)
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
- self.set_defaults(QDict({
- 'text': '',
- 'correct': [], # no correct answers, always wrong
- 'transform': [], # transformations applied to the answer, in order
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "correct": [], # no correct answers, always wrong
+ "transform": [], # transformations applied to the answer, in order
+ }
+ )
+ )
# make sure its always a list of possible correct answers
- if not isinstance(self['correct'], list):
- self['correct'] = [str(self['correct'])]
+ if not isinstance(self["correct"], list):
+ self["correct"] = [str(self["correct"])]
else:
# make sure all elements of the list are strings
- self['correct'] = [str(a) for a in self['correct']]
-
- for transform in self['transform']:
- if transform not in ('remove_space', 'trim', 'normalize_space',
- 'lower', 'upper'):
- msg = (f'Unknown transform "{transform}" in "{self["ref"]}"')
+ self["correct"] = [str(a) for a in self["correct"]]
+
+ for transform in self["transform"]:
+ if transform not in (
+ "remove_space",
+ "trim",
+ "normalize_space",
+ "lower",
+ "upper",
+ ):
+ msg = f'Unknown transform "{transform}" in "{self["ref"]}"'
raise QuestionException(msg)
# check if answers are invariant with respect to the transforms
- if any(c != self.transform(c) for c in self['correct']):
- logger.warning('in "%s", correct answers are not invariant wrt '
- 'transformations => never correct', self["ref"])
+ if any(c != self.transform(c) for c in self["correct"]):
+ logger.warning(
+ 'in "%s", correct answers are not invariant wrt '
+ "transformations => never correct",
+ self["ref"],
+ )
# ------------------------------------------------------------------------
def transform(self, ans):
- '''apply optional filters to the answer'''
+ """apply optional filters to the answer"""
# apply transformations in sequence
- for transform in self['transform']:
- if transform == 'remove_space': # removes all spaces
- ans = ans.replace(' ', '')
- elif transform == 'trim': # removes spaces around
+ for transform in self["transform"]:
+ if transform == "remove_space": # removes all spaces
+ ans = ans.replace(" ", "")
+ elif transform == "trim": # removes spaces around
ans = ans.strip()
- elif transform == 'normalize_space': # replaces many spaces by one
- ans = re.sub(r'\s+', ' ', ans.strip())
- elif transform == 'lower': # convert to lowercase
+ elif transform == "normalize_space": # replaces many spaces by one
+ ans = re.sub(r"\s+", " ", ans.strip())
+ elif transform == "lower": # convert to lowercase
ans = ans.lower()
- elif transform == 'upper': # convert to uppercase
+ elif transform == "upper": # convert to uppercase
ans = ans.upper()
else:
- logger.warning('in "%s", unknown transform "%s"',
- self["ref"], transform)
+ logger.warning(
+ 'in "%s", unknown transform "%s"', self["ref"], transform
+ )
return ans
# ------------------------------------------------------------------------
def correct(self) -> None:
super().correct()
- if self['answer'] is not None:
- answer = self.transform(self['answer'])
- self['grade'] = 1.0 if answer in self['correct'] else 0.0
+ if self["answer"] is not None:
+ answer = self.transform(self["answer"])
+ self["grade"] = 1.0 if answer in self["correct"] else 0.0
# ============================================================================
class QuestionTextRegex(Question):
- '''An instance of QuestionTextRegex will always have the keys:
- type (str)
- text (str)
- correct (str or list[str])
- answer (None or an actual answer)
+ """An instance of QuestionTextRegex will always have the keys:
+ type (str)
+ text (str)
+ correct (str or list[str])
+ answer (None or an actual answer)
- The correct strings are python standard regular expressions.
- Grade is 1.0 when the answer matches any of the regex in the list.
- '''
+ The correct strings are python standard regular expressions.
+ Grade is 1.0 when the answer matches any of the regex in the list.
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
- self.set_defaults(QDict({
- 'text': '',
- 'correct': ['$.^'], # will always return false
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "correct": ["$.^"], # will always return false
+ }
+ )
+ )
# make sure its always a list of regular expressions
- if not isinstance(self['correct'], list):
- self['correct'] = [self['correct']]
+ if not isinstance(self["correct"], list):
+ self["correct"] = [self["correct"]]
# ------------------------------------------------------------------------
def correct(self) -> None:
super().correct()
- if self['answer'] is not None:
- for regex in self['correct']:
+ if self["answer"] is not None:
+ for regex in self["correct"]:
try:
- if re.fullmatch(regex, self['answer']):
- self['grade'] = 1.0
+ if re.fullmatch(regex, self["answer"]):
+ self["grade"] = 1.0
return
except TypeError:
- logger.error('While matching regex "%s" with answer "%s".',
- regex, self['answer'])
- self['grade'] = 0.0
+ logger.error(
+ 'While matching regex "%s" with answer "%s".',
+ regex,
+ self["answer"],
+ )
+ self["grade"] = 0.0
+
# ============================================================================
class QuestionNumericInterval(Question):
- '''An instance of QuestionTextNumeric will always have the keys:
+ """An instance of QuestionTextNumeric will always have the keys:
type (str)
text (str)
correct (list [lower bound, upper bound])
answer (None or an actual answer)
An answer is correct if it's in the closed interval.
- '''
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
- self.set_defaults(QDict({
- 'text': '',
- 'correct': [1.0, -1.0], # will always return false
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "correct": [1.0, -1.0], # will always return false
+ }
+ )
+ )
# if only one number n is given, make an interval [n,n]
- if isinstance(self['correct'], (int, float)):
- self['correct'] = [float(self['correct']), float(self['correct'])]
+ if isinstance(self["correct"], (int, float)):
+ self["correct"] = [float(self["correct"]), float(self["correct"])]
# make sure its a list of two numbers
- elif isinstance(self['correct'], list):
- if len(self['correct']) != 2:
- msg = (f'Numeric interval must be a list with two numbers, in '
- f'{self["ref"]}')
+ elif isinstance(self["correct"], list):
+ if len(self["correct"]) != 2:
+ msg = (
+ f"Numeric interval must be a list with two numbers, in "
+ f'{self["ref"]}'
+ )
logger.error(msg)
raise QuestionException(msg)
try:
- self['correct'] = [float(n) for n in self['correct']]
+ self["correct"] = [float(n) for n in self["correct"]]
except Exception as exc:
- msg = (f'Numeric interval must be a list with two numbers, in '
- f'{self["ref"]}')
+ msg = (
+ f"Numeric interval must be a list with two numbers, in "
+ f'{self["ref"]}'
+ )
logger.error(msg)
raise QuestionException(msg) from exc
# invalid
else:
- msg = (f'Numeric interval must be a list with two numbers, in '
- f'{self["ref"]}')
+ msg = (
+ f"Numeric interval must be a list with two numbers, in "
+ f'{self["ref"]}'
+ )
logger.error(msg)
raise QuestionException(msg)
# ------------------------------------------------------------------------
def correct(self) -> None:
super().correct()
- if self['answer'] is not None:
- lower, upper = self['correct']
+ if self["answer"] is not None:
+ lower, upper = self["correct"]
- try: # replace , by . and convert to float
- answer = float(self['answer'].replace(',', '.', 1))
+ try: # replace , by . and convert to float
+ answer = float(self["answer"].replace(",", ".", 1))
except ValueError:
- self['comments'] = ('A resposta tem de ser numérica, '
- 'por exemplo `12.345`.')
- self['grade'] = 0.0
+ self["comments"] = (
+ "A resposta tem de ser numérica, " "por exemplo `12.345`."
+ )
+ self["grade"] = 0.0
else:
- self['grade'] = 1.0 if lower <= answer <= upper else 0.0
+ self["grade"] = 1.0 if lower <= answer <= upper else 0.0
# ============================================================================
class QuestionTextArea(Question):
- '''An instance of QuestionTextArea will always have the keys:
- type (str)
- text (str)
- correct (str with script to run)
- answer (None or an actual answer)
- '''
+ """An instance of QuestionTextArea will always have the keys:
+ type (str)
+ text (str)
+ correct (str with script to run)
+ answer (None or an actual answer)
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
- self.set_defaults(QDict({
- 'text': '',
- 'timeout': 5, # seconds
- 'correct': '', # trying to execute this will fail => grade 0.0
- 'args': []
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ "timeout": 5, # seconds
+ "correct": "", # trying to execute this will fail => grade 0.0
+ "args": [],
+ }
+ )
+ )
- self['correct'] = os.path.join(self['path'], self['correct'])
+ self["correct"] = os.path.join(self["path"], self["correct"])
# ------------------------------------------------------------------------
def correct(self) -> None:
super().correct()
- if self['answer'] is not None: # correct answer and parse yaml ouput
+ if self["answer"] is not None: # correct answer and parse yaml ouput
out = run_script(
- script=self['correct'],
- args=self['args'],
- stdin=self['answer'],
- timeout=self['timeout']
- )
+ script=self["correct"],
+ args=self["args"],
+ stdin=self["answer"],
+ timeout=self["timeout"],
+ )
if out is None:
logger.warning('No grade after running "%s".', self["correct"])
- self['comments'] = 'O programa de correcção abortou...'
- self['grade'] = 0.0
+ self["comments"] = "O programa de correcção abortou..."
+ self["grade"] = 0.0
elif isinstance(out, dict):
- self['comments'] = out.get('comments', '')
+ self["comments"] = out.get("comments", "")
try:
- self['grade'] = float(out['grade'])
+ self["grade"] = float(out["grade"])
except ValueError:
logger.error('Output error in "%s".', self["correct"])
except KeyError:
logger.error('No grade in "%s".', self["correct"])
else:
try:
- self['grade'] = float(out)
+ self["grade"] = float(out)
except (TypeError, ValueError):
logger.error('Invalid grade in "%s".', self["correct"])
@@ -534,92 +584,96 @@ class QuestionTextArea(Question):
async def correct_async(self) -> None:
super().correct()
- if self['answer'] is not None: # correct answer and parse yaml ouput
+ if self["answer"] is not None: # correct answer and parse yaml ouput
out = await run_script_async(
- script=self['correct'],
- args=self['args'],
- stdin=self['answer'],
- timeout=self['timeout']
- )
+ script=self["correct"],
+ args=self["args"],
+ stdin=self["answer"],
+ timeout=self["timeout"],
+ )
if out is None:
logger.warning('No grade after running "%s".', self["correct"])
- self['comments'] = 'O programa de correcção abortou...'
- self['grade'] = 0.0
+ self["comments"] = "O programa de correcção abortou..."
+ self["grade"] = 0.0
elif isinstance(out, dict):
- self['comments'] = out.get('comments', '')
+ self["comments"] = out.get("comments", "")
try:
- self['grade'] = float(out['grade'])
+ self["grade"] = float(out["grade"])
except ValueError:
logger.error('Output error in "%s".', self["correct"])
except KeyError:
logger.error('No grade in "%s".', self["correct"])
else:
try:
- self['grade'] = float(out)
+ self["grade"] = float(out)
except (TypeError, ValueError):
logger.error('Invalid grade in "%s".', self["correct"])
# ============================================================================
class QuestionInformation(Question):
- '''
+ """
Not really a question, just an information panel.
The correction is always right.
- '''
+ """
# ------------------------------------------------------------------------
def gen(self) -> None:
super().gen()
- self.set_defaults(QDict({
- 'text': '',
- }))
+ self.set_defaults(
+ QDict(
+ {
+ "text": "",
+ }
+ )
+ )
# ------------------------------------------------------------------------
def correct(self) -> None:
super().correct()
- self['grade'] = 1.0 # always "correct" but points should be zero!
+ self["grade"] = 1.0 # always "correct" but points should be zero!
# ============================================================================
def question_from(qdict: QDict) -> Question:
- '''
+ """
Converts a question specified in a dict into an instance of Question()
- '''
+ """
types = {
- 'radio': QuestionRadio,
- 'checkbox': QuestionCheckbox,
- 'text': QuestionText,
- 'text-regex': QuestionTextRegex,
- 'numeric-interval': QuestionNumericInterval,
- 'textarea': QuestionTextArea,
+ "radio": QuestionRadio,
+ "checkbox": QuestionCheckbox,
+ "text": QuestionText,
+ "text-regex": QuestionTextRegex,
+ "numeric-interval": QuestionNumericInterval,
+ "textarea": QuestionTextArea,
# -- informative panels --
- 'information': QuestionInformation,
- 'success': QuestionInformation,
- 'warning': QuestionInformation,
- 'alert': QuestionInformation,
- }
+ "information": QuestionInformation,
+ "success": QuestionInformation,
+ "warning": QuestionInformation,
+ "alert": QuestionInformation,
+ }
# Get class for this question type
try:
- qclass = types[qdict['type']]
+ qclass = types[qdict["type"]]
except KeyError:
- logger.error('Invalid type "%s" in "%s"', qdict['type'], qdict['ref'])
+ logger.error('Invalid type "%s" in "%s"', qdict["type"], qdict["ref"])
raise
# Create an instance of Question() of appropriate type
try:
qinstance = qclass(qdict.copy())
except QuestionException:
- logger.error('Generating "%s" in %s', qdict['ref'], qdict['filename'])
+ logger.error('Generating "%s" in %s', qdict["ref"], qdict["filename"])
raise
return qinstance
# ============================================================================
-class QFactory():
- '''
+class QFactory:
+ """
QFactory is a class that can generate question instances, e.g. by shuffling
options, running a script to generate the question, etc.
@@ -644,35 +698,35 @@ class QFactory():
question.set_answer(42) # set answer
question.correct() # correct answer
grade = question['grade'] # get grade
- '''
+ """
def __init__(self, qdict: QDict = QDict({})) -> None:
self.qdict = qdict
# ------------------------------------------------------------------------
async def gen_async(self) -> Question:
- '''
+ """
generates a question instance of QuestionRadio, QuestionCheckbox, ...,
which is a descendent of base class Question.
- '''
+ """
- logger.debug('generating %s...', self.qdict["ref"])
+ logger.debug("generating %s...", self.qdict["ref"])
# Shallow copy so that script generated questions will not replace
# the original generators
qdict = QDict(self.qdict.copy())
- qdict['qid'] = str(uuid.uuid4()) # unique for each question
+ qdict["qid"] = str(uuid.uuid4()) # unique for each question
# If question is of generator type, an external program will be run
# which will print a valid question in yaml format to stdout. This
# output is then yaml parsed into a dictionary `q`.
- if qdict['type'] == 'generator':
- logger.debug(' \\_ Running "%s"', qdict['script'])
- qdict.setdefault('args', [])
- qdict.setdefault('stdin', '')
- script = os.path.join(qdict['path'], qdict['script'])
- out = await run_script_async(script=script,
- args=qdict['args'],
- stdin=qdict['stdin'])
+ if qdict["type"] == "generator":
+ logger.debug(' \\_ Running "%s"', qdict["script"])
+ qdict.setdefault("args", [])
+ qdict.setdefault("stdin", "")
+ script = os.path.join(qdict["path"], qdict["script"])
+ out = await run_script_async(
+ script=script, args=qdict["args"], stdin=qdict["stdin"]
+ )
qdict.update(out)
question = question_from(qdict) # returns a Question instance
diff --git a/perguntations/serve.py b/perguntations/serve.py
index 2d30d2c..757f808 100644
--- a/perguntations/serve.py
+++ b/perguntations/serve.py
@@ -1,9 +1,8 @@
-#!/usr/bin/env python3
-'''
+"""
Handles the web, http & html part of the application interface.
Uses the tornadoweb framework.
-'''
+"""
# python standard library
import asyncio
@@ -21,9 +20,7 @@ from typing import Dict, Tuple
import uuid
# user installed libraries
-import tornado.ioloop
-import tornado.web
-import tornado.httpserver
+import tornado
# this project
from .parser_markdown import md_to_html
@@ -35,29 +32,30 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
class WebApplication(tornado.web.Application):
- '''
+ """
Web Application. Routes to handler classes.
- '''
+ """
+
def __init__(self, testapp, debug=False):
handlers = [
- (r'/login', LoginHandler),
- (r'/logout', LogoutHandler),
- (r'/review', ReviewHandler),
- (r'/admin', AdminHandler),
- (r'/file', FileHandler),
- (r'/adminwebservice', AdminWebservice),
- (r'/studentwebservice', StudentWebservice),
- (r'/', RootHandler),
+ (r"/login", LoginHandler),
+ (r"/logout", LogoutHandler),
+ (r"/review", ReviewHandler),
+ (r"/admin", AdminHandler),
+ (r"/file", FileHandler),
+ (r"/adminwebservice", AdminWebservice),
+ (r"/studentwebservice", StudentWebservice),
+ (r"/", RootHandler),
]
settings = {
- 'template_path': path.join(path.dirname(__file__), 'templates'),
- 'static_path': path.join(path.dirname(__file__), 'static'),
- 'static_url_prefix': '/static/',
- 'xsrf_cookies': True,
- 'cookie_secret': base64.b64encode(uuid.uuid4().bytes),
- 'login_url': '/login',
- 'debug': debug,
+ "template_path": path.join(path.dirname(__file__), "templates"),
+ "static_path": path.join(path.dirname(__file__), "static"),
+ "static_url_prefix": "/static/",
+ "xsrf_cookies": True,
+ "cookie_secret": base64.b64encode(uuid.uuid4().bytes),
+ "login_url": "/login",
+ "debug": debug,
}
super().__init__(handlers, **settings)
self.testapp = testapp
@@ -65,32 +63,34 @@ class WebApplication(tornado.web.Application):
# ----------------------------------------------------------------------------
def admin_only(func):
- '''
+ """
Decorator to restrict access to the administrator:
@admin_only
def get(self):
- '''
+ """
+
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
- if self.current_user != '0':
+ if self.current_user != "0":
raise tornado.web.HTTPError(403) # forbidden
await func(self, *args, **kwargs)
+
return wrapper
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class BaseHandler(tornado.web.RequestHandler):
- '''
+ """
Handlers should inherit this one instead of tornado.web.RequestHandler.
It automatically gets the user cookie, which is required to identify the
user in most handlers.
- '''
+ """
@property
def testapp(self):
- '''simplifies access to the application a little bit'''
+ """simplifies access to the application a little bit"""
return self.application.testapp
# @property
@@ -99,62 +99,62 @@ class BaseHandler(tornado.web.RequestHandler):
# return self.application.testapp.debug
def get_current_user(self):
- '''
+ """
Since HTTP is stateless, a cookie is used to identify the user.
This function returns the cookie for the current user.
- '''
- cookie = self.get_secure_cookie('perguntations_user')
+ """
+ cookie = self.get_secure_cookie("perguntations_user")
if cookie:
- return cookie.decode('utf-8')
+ return cookie.decode("utf-8")
return None
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class LoginHandler(BaseHandler):
- '''Handles /login'''
+ """Handles /login"""
- _prefix = re.compile(r'[a-z]')
+ _prefix = re.compile(r"[a-z]")
_error_msg = {
- 'wrong_password': 'Senha errada',
- 'not_allowed': 'Não está autorizado a fazer o teste',
- 'nonexistent': 'Número de aluno inválido'
+ "wrong_password": "Senha errada",
+ "not_allowed": "Não está autorizado a fazer o teste",
+ "nonexistent": "Número de aluno inválido",
}
def get(self):
- '''Render login page.'''
- self.render('login.html', error='')
+ """Render login page."""
+ self.render("login.html", error="")
async def post(self):
- '''Authenticates student and login.'''
- uid = self.get_body_argument('uid')
- password = self.get_body_argument('pw')
+ """Authenticates student and login."""
+ uid = self.get_body_argument("uid")
+ password = self.get_body_argument("pw")
headers = {
- 'remote_ip': self.request.remote_ip,
- 'user_agent': self.request.headers.get('User-Agent')
+ "remote_ip": self.request.remote_ip,
+ "user_agent": self.request.headers.get("User-Agent"),
}
error = await self.testapp.login(uid, password, headers)
if error is not None:
await asyncio.sleep(3) # delay to avoid spamming the server...
- self.render('login.html', error=self._error_msg[error])
+ self.render("login.html", error=self._error_msg[error])
else:
- self.set_secure_cookie('perguntations_user', str(uid))
- self.redirect('/')
+ self.set_secure_cookie("perguntations_user", str(uid))
+ self.redirect("/")
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class LogoutHandler(BaseHandler):
- '''Handle /logout'''
+ """Handle /logout"""
@tornado.web.authenticated
def get(self):
- '''Logs out a user.'''
+ """Logs out a user."""
self.testapp.logout(self.current_user)
- self.clear_cookie('perguntations_user')
- self.render('login.html', error='')
+ self.clear_cookie("perguntations_user")
+ self.render("login.html", error="")
# ----------------------------------------------------------------------------
@@ -162,57 +162,64 @@ class LogoutHandler(BaseHandler):
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class RootHandler(BaseHandler):
- '''
+ """
Presents test to student.
Receives answers, corrects the test and sends back the grade.
Redirects user 0 to /admin.
- '''
+ """
_templates = {
# -- question templates --
- 'radio': 'question-radio.html',
- 'checkbox': 'question-checkbox.html',
- 'text': 'question-text.html',
- 'text-regex': 'question-text.html',
- 'numeric-interval': 'question-text.html',
- 'textarea': 'question-textarea.html',
+ "radio": "question-radio.html",
+ "checkbox": "question-checkbox.html",
+ "text": "question-text.html",
+ "text-regex": "question-text.html",
+ "numeric-interval": "question-text.html",
+ "textarea": "question-textarea.html",
# -- information panels --
- 'information': 'question-information.html',
- 'success': 'question-information.html',
- 'warning': 'question-information.html',
- 'alert': 'question-information.html',
+ "information": "question-information.html",
+ "success": "question-information.html",
+ "warning": "question-information.html",
+ "alert": "question-information.html",
}
# --- GET
@tornado.web.authenticated
async def get(self):
- '''
+ """
Handles GET /
Sends test to student or redirects 0 to admin page.
Multiple calls to this function will return the same test.
- '''
+ """
uid = self.current_user
logger.debug('"%s" GET /', uid)
- if uid == '0':
- self.redirect('/admin')
+ if uid == "0":
+ self.redirect("/admin")
else:
test = self.testapp.get_test(uid)
name = self.testapp.get_name(uid)
- self.render('test.html', t=test, uid=uid, name=name, md=md_to_html,
- templ=self._templates, debug=self.testapp.debug)
+ self.render(
+ "test.html",
+ t=test,
+ uid=uid,
+ name=name,
+ md=md_to_html,
+ templ=self._templates,
+ debug=self.testapp.debug,
+ )
# --- POST
@tornado.web.authenticated
async def post(self):
- '''
+ """
Receives answers, fixes some html weirdness, corrects test and
renders the grade.
self.request.arguments = {'answered-0': [b'on'], '0': [b'13.45']}
builds dictionary ans = {0: 'answer0', 1:, 'answer1', ...}
unanswered questions are not included.
- '''
+ """
starttime = timer() # performance timer
uid = self.current_user
@@ -224,42 +231,47 @@ class RootHandler(BaseHandler):
raise tornado.web.HTTPError(403) # Forbidden
ans = {}
- for i, question in enumerate(test['questions']):
+ for i, question in enumerate(test["questions"]):
qid = str(i)
- if f'answered-{qid}' in self.request.arguments:
+ if f"answered-{qid}" in self.request.arguments:
ans[i] = self.get_body_arguments(qid)
# remove enclosing list in some question types
- if question['type'] == 'radio':
+ if question["type"] == "radio":
ans[i] = ans[i][0] if ans[i] else None
- elif question['type'] in ('text', 'text-regex', 'textarea',
- 'numeric-interval'):
+ elif question["type"] in (
+ "text",
+ "text-regex",
+ "textarea",
+ "numeric-interval",
+ ):
ans[i] = ans[i][0]
# submit answered questions, correct
await self.testapp.submit_test(uid, ans)
name = self.testapp.get_name(uid)
- self.render('grade.html', t=test, uid=uid, name=name)
- self.clear_cookie('perguntations_user')
+ self.render("grade.html", t=test, uid=uid, name=name)
+ self.clear_cookie("perguntations_user")
self.testapp.logout(uid)
- logger.info(' elapsed time: %fs', timer() - starttime)
+ logger.info(" elapsed time: %fs", timer() - starttime)
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
+# FIXME: also to update answers
class StudentWebservice(BaseHandler):
- '''
+ """
Receive ajax from students during the test in response to the events
- focus, unfocus and resize.
- '''
+ focus, unfocus and resize, etc.
+ """
@tornado.web.authenticated
def post(self):
- '''handle ajax post'''
+ """handle ajax post"""
uid = self.current_user
- cmd = self.get_body_argument('cmd', None)
- value = self.get_body_argument('value', None)
+ cmd = self.get_body_argument("cmd", None)
+ value = self.get_body_argument("value", None)
if cmd is not None and value is not None:
self.testapp.register_event(uid, cmd, json.loads(value))
@@ -267,29 +279,31 @@ class StudentWebservice(BaseHandler):
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class AdminWebservice(BaseHandler):
- '''
+ """
Receive ajax requests from admin
- '''
+ """
@tornado.web.authenticated
@admin_only
async def get(self):
- '''admin webservices that do not change state'''
- cmd = self.get_query_argument('cmd')
- logger.debug('GET /adminwebservice %s', cmd)
+ """admin webservices that do not change state"""
+ cmd = self.get_query_argument("cmd")
+ logger.debug("GET /adminwebservice %s", cmd)
- if cmd == 'testcsv':
+ if cmd == "testcsv":
test_ref, data = self.testapp.get_grades_csv()
- self.set_header('Content-Type', 'text/csv')
- self.set_header('content-Disposition',
- f'attachment; filename={test_ref}.csv')
+ self.set_header("Content-Type", "text/csv")
+ self.set_header(
+ "content-Disposition", f"attachment; filename={test_ref}.csv"
+ )
self.write(data)
await self.flush()
- elif cmd == 'questionscsv':
+ elif cmd == "questionscsv":
test_ref, data = self.testapp.get_detailed_grades_csv()
- self.set_header('Content-Type', 'text/csv')
- self.set_header('content-Disposition',
- f'attachment; filename={test_ref}-detailed.csv')
+ self.set_header("Content-Type", "text/csv")
+ self.set_header(
+ "content-Disposition", f"attachment; filename={test_ref}-detailed.csv"
+ )
self.write(data)
await self.flush()
@@ -297,52 +311,53 @@ class AdminWebservice(BaseHandler):
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class AdminHandler(BaseHandler):
- '''Handle /admin'''
+ """Handle /admin"""
# --- GET
@tornado.web.authenticated
@admin_only
async def get(self):
- '''
+ """
Admin page.
- '''
- cmd = self.get_query_argument('cmd', default=None)
- logger.debug('GET /admin (cmd=%s)', cmd)
+ """
+ cmd = self.get_query_argument("cmd", default=None)
+ logger.debug("GET /admin (cmd=%s)", cmd)
if cmd is None:
- self.render('admin.html')
- elif cmd == 'test':
- data = { 'data': self.testapp.get_test_config() }
+ self.render("admin.html")
+ elif cmd == "test":
+ data = {"data": self.testapp.get_test_config()}
self.write(json.dumps(data, default=str))
- elif cmd == 'students_table':
- data = {'data': self.testapp.get_students_state()}
+ elif cmd == "students_table":
+ data = {"data": self.testapp.get_students_state()}
self.write(json.dumps(data, default=str))
# --- POST
@tornado.web.authenticated
@admin_only
async def post(self):
- '''
+ """
Executes commands from the admin page.
- '''
- cmd = self.get_body_argument('cmd', None)
- value = self.get_body_argument('value', None)
- logger.debug('POST /admin (cmd=%s, value=%s)', cmd, value)
+ """
+ cmd = self.get_body_argument("cmd", None)
+ value = self.get_body_argument("value", None)
+ logger.debug("POST /admin (cmd=%s, value=%s)", cmd, value)
- if cmd == 'allow':
+ if cmd == "allow":
self.testapp.allow_student(value)
- elif cmd == 'deny':
+ elif cmd == "deny":
self.testapp.deny_student(value)
- elif cmd == 'allow_all':
+ elif cmd == "allow_all":
self.testapp.allow_all_students()
- elif cmd == 'deny_all':
+ elif cmd == "deny_all":
self.testapp.deny_all_students()
- elif cmd == 'reset_password':
- await self.testapp.set_password(uid=value, password='')
- elif cmd == 'insert_student' and value is not None:
+ elif cmd == "reset_password":
+ await self.testapp.set_password(uid=value, password="")
+ elif cmd == "insert_student" and value is not None:
student = json.loads(value)
- await self.testapp.insert_new_student(uid=student['number'],
- name=student['name'])
+ await self.testapp.insert_new_student(
+ uid=student["number"], name=student["name"]
+ )
# ----------------------------------------------------------------------------
@@ -350,22 +365,22 @@ class AdminHandler(BaseHandler):
# ----------------------------------------------------------------------------
# pylint: disable=abstract-method
class FileHandler(BaseHandler):
- '''
+ """
Handles static files from questions like images, etc.
- '''
+ """
_filecache: Dict[Tuple[str, str], bytes] = {}
@tornado.web.authenticated
async def get(self):
- '''
+ """
Returns requested file. Files are obtained from the 'public' directory
of each question.
- '''
+ """
uid = self.current_user
- ref = self.get_query_argument('ref', None)
- image = self.get_query_argument('image', None)
- logger.debug('GET /file (ref=%s, image=%s)', ref, image)
+ ref = self.get_query_argument("ref", None)
+ image = self.get_query_argument("image", None)
+ logger.debug("GET /file (ref=%s, image=%s)", ref, image)
if ref is None or image is None:
return
@@ -373,7 +388,7 @@ class FileHandler(BaseHandler):
content_type = mimetypes.guess_type(image)[0]
if (ref, image) in self._filecache:
- logger.debug('using cached file')
+ logger.debug("using cached file")
self.write(self._filecache[(ref, image)])
if content_type is not None:
self.set_header("Content-Type", content_type)
@@ -383,16 +398,16 @@ class FileHandler(BaseHandler):
try:
test = self.testapp.get_test(uid)
except KeyError:
- logger.warning('Could not get test to serve image file')
+ logger.warning("Could not get test to serve image file")
raise tornado.web.HTTPError(404) from None # Not Found
# search for the question that contains the image
- for question in test['questions']:
- if question['ref'] == ref:
- filepath = path.join(question['path'], 'public', image)
+ for question in test["questions"]:
+ if question["ref"] == ref:
+ filepath = path.join(question["path"], "public", image)
try:
- with open(filepath, 'rb') as file:
+ with open(filepath, "rb") as file:
data = file.read()
except OSError:
logger.error('Error reading file "%s"', filepath)
@@ -408,39 +423,39 @@ class FileHandler(BaseHandler):
# --- REVIEW -----------------------------------------------------------------
# pylint: disable=abstract-method
class ReviewHandler(BaseHandler):
- '''
+ """
Show test for review
- '''
+ """
_templates = {
- 'radio': 'review-question-radio.html',
- 'checkbox': 'review-question-checkbox.html',
- 'text': 'review-question-text.html',
- 'text-regex': 'review-question-text.html',
- 'numeric-interval': 'review-question-text.html',
- 'textarea': 'review-question-text.html',
+ "radio": "review-question-radio.html",
+ "checkbox": "review-question-checkbox.html",
+ "text": "review-question-text.html",
+ "text-regex": "review-question-text.html",
+ "numeric-interval": "review-question-text.html",
+ "textarea": "review-question-text.html",
# -- information panels --
- 'information': 'review-question-information.html',
- 'success': 'review-question-information.html',
- 'warning': 'review-question-information.html',
- 'alert': 'review-question-information.html',
+ "information": "review-question-information.html",
+ "success": "review-question-information.html",
+ "warning": "review-question-information.html",
+ "alert": "review-question-information.html",
}
@tornado.web.authenticated
@admin_only
async def get(self):
- '''
+ """
Opens JSON file with a given corrected test and renders it
- '''
- test_id = self.get_query_argument('test_id', None)
- logger.info('Review test %s.', test_id)
+ """
+ test_id = self.get_query_argument("test_id", None)
+ logger.info("Review test %s.", test_id)
fname = self.testapp.get_json_filename_of_test(test_id)
if fname is None:
raise tornado.web.HTTPError(404) # Not Found
try:
- with open(path.expanduser(fname), encoding='utf-8') as jsonfile:
+ with open(path.expanduser(fname), encoding="utf-8") as jsonfile:
test = json.load(jsonfile)
except OSError:
msg = f'Cannot open "{fname}" for review.'
@@ -451,57 +466,65 @@ class ReviewHandler(BaseHandler):
logger.error(msg)
raise tornado.web.HTTPError(status_code=404, reason=msg)
- uid = test['student']
+ uid = test["student"]
name = self.testapp.get_name(uid)
- self.render('review.html', t=test, uid=uid, name=name, md=md_to_html,
- templ=self._templates, debug=self.testapp.debug)
+ self.render(
+ "review.html",
+ t=test,
+ uid=uid,
+ name=name,
+ md=md_to_html,
+ templ=self._templates,
+ debug=self.testapp.debug,
+ )
# ----------------------------------------------------------------------------
def signal_handler(*_):
- '''
+ """
Catches Ctrl-C and stops webserver
- '''
- reply = input(' --> Stop webserver? (yes/no) ')
- if reply.lower() == 'yes':
+ """
+ reply = input(" --> Stop webserver? (yes/no) ")
+ if reply.lower() == "yes":
tornado.ioloop.IOLoop.current().stop()
- logger.critical('Webserver stopped.')
+ logger.critical("Webserver stopped.")
sys.exit(0)
+
# ----------------------------------------------------------------------------
def run_webserver(app, ssl_opt, port, debug):
- '''
+ """
Starts and runs webserver until a SIGINT signal (Ctrl-C) is received.
- '''
+ """
# --- create web application
- logger.info('-------- Starting WebApplication (tornado) --------')
+ logger.info("-------- Starting WebApplication (tornado) --------")
try:
webapp = WebApplication(app, debug=debug)
except Exception:
- logger.critical('Failed to start web application.')
+ logger.critical("Failed to start web application.")
raise
# --- create httpserver
try:
httpserver = tornado.httpserver.HTTPServer(webapp, ssl_options=ssl_opt)
except ValueError:
- logger.critical('Certificates cert.pem, privkey.pem not found')
+ logger.critical("Certificates cert.pem, privkey.pem not found")
sys.exit(1)
try:
httpserver.listen(port)
except OSError:
- logger.critical('Cannot bind port %d. Already in use?', port)
+ logger.critical("Cannot bind port %d. Already in use?", port)
sys.exit(1)
- logger.info('Listening on port %d... (Ctrl-C to stop)', port)
+ logger.info("Listening on port %d... (Ctrl-C to stop)", port)
signal.signal(signal.SIGINT, signal_handler)
# --- run webserver
try:
tornado.ioloop.IOLoop.current().start() # running...
except Exception:
- logger.critical('Webserver stopped!')
+ logger.critical("Webserver stopped!")
tornado.ioloop.IOLoop.current().stop()
raise
diff --git a/perguntations/templates/question-textarea.html b/perguntations/templates/question-textarea.html
index d6afa28..288c6d9 100644
--- a/perguntations/templates/question-textarea.html
+++ b/perguntations/templates/question-textarea.html
@@ -4,4 +4,6 @@
+
+
{% end %}
diff --git a/perguntations/test.py b/perguntations/test.py
index 742c7a3..f559780 100644
--- a/perguntations/test.py
+++ b/perguntations/test.py
@@ -1,6 +1,6 @@
-'''
+"""
Test - instances of this class are individual tests
-'''
+"""
# python standard library
from datetime import datetime
@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
# ============================================================================
class Test(dict):
- '''
+ """
Each instance Test() is a concrete test of a single student.
A test can be in one of the states: ACTIVE, SUBMITTED, CORRECTED, QUIT
Methods:
@@ -26,88 +26,90 @@ class Test(dict):
t.correct() - corrects questions and compute grade, register state
t.giveup() - register the test as given up, answers are not corrected
t.save_json(filename) - save the current test to file in JSON format
- '''
+ """
# ------------------------------------------------------------------------
def __init__(self, d: dict):
super().__init__(d)
- self['grade'] = nan
- self['comment'] = ''
+ self["grade"] = nan
+ self["comment"] = ""
# ------------------------------------------------------------------------
def start(self, uid: str) -> None:
- '''
+ """
Register student id and start time in the test
- '''
- self['student'] = uid
- self['start_time'] = datetime.now()
- self['finish_time'] = None
- self['state'] = 'ACTIVE'
+ """
+ self["student"] = uid
+ self["start_time"] = datetime.now()
+ self["finish_time"] = None
+ self["state"] = "ACTIVE"
# ------------------------------------------------------------------------
def reset_answers(self) -> None:
- '''Removes all answers from the test (clean)'''
- for question in self['questions']:
- question['answer'] = None
+ """Removes all answers from the test (clean)"""
+ for question in self["questions"]:
+ question["answer"] = None
# ------------------------------------------------------------------------
def update_answer(self, ref: str, ans) -> None:
- '''updates one answer in the test'''
- self['questions'][ref].set_answer(ans)
+ """updates one answer in the test"""
+ self["questions"][ref].set_answer(ans)
# ------------------------------------------------------------------------
def submit(self, answers: dict) -> None:
- '''
+ """
Given a dictionary ans={'ref': 'some answer'} updates the answers of
multiple questions in the test.
Only affects the questions referred in the dictionary.
- '''
- self['finish_time'] = datetime.now()
+ """
+ self["finish_time"] = datetime.now()
for ref, ans in answers.items():
- self['questions'][ref].set_answer(ans)
- self['state'] = 'SUBMITTED'
+ self["questions"][ref].set_answer(ans)
+ self["state"] = "SUBMITTED"
# ------------------------------------------------------------------------
async def correct_async(self) -> None:
- '''Corrects all the answers of the test and computes the final grade'''
+ """Corrects all the answers of the test and computes the final grade"""
grade = 0.0
- for question in self['questions']:
+ for question in self["questions"]:
await question.correct_async()
- grade += question['grade'] * question['points']
- logger.debug('Correcting %30s: %3g%%',
- question['ref'], question['grade']*100)
+ grade += question["grade"] * question["points"]
+ logger.debug(
+ "Correcting %30s: %3g%%", question["ref"], question["grade"] * 100
+ )
# truncate to avoid negative final grade and adjust scale
- self['grade'] = max(0.0, grade) + self['scale'][0]
- self['state'] = 'CORRECTED'
+ self["grade"] = max(0.0, grade) + self["scale"][0]
+ self["state"] = "CORRECTED"
# ------------------------------------------------------------------------
def correct(self) -> None:
- '''Corrects all the answers of the test and computes the final grade'''
+ """Corrects all the answers of the test and computes the final grade"""
grade = 0.0
- for question in self['questions']:
+ for question in self["questions"]:
question.correct()
- grade += question['grade'] * question['points']
- logger.debug('Correcting %30s: %3g%%',
- question['ref'], question['grade']*100)
+ grade += question["grade"] * question["points"]
+ logger.debug(
+ "Correcting %30s: %3g%%", question["ref"], question["grade"] * 100
+ )
# truncate to avoid negative final grade and adjust scale
- self['grade'] = max(0.0, grade) + self['scale'][0]
- self['state'] = 'CORRECTED'
+ self["grade"] = max(0.0, grade) + self["scale"][0]
+ self["state"] = "CORRECTED"
# ------------------------------------------------------------------------
def giveup(self) -> None:
- '''Test is marqued as QUIT and is not corrected'''
- self['finish_time'] = datetime.now()
- self['state'] = 'QUIT'
- self['grade'] = 0.0
+ """Test is marqued as QUIT and is not corrected"""
+ self["finish_time"] = datetime.now()
+ self["state"] = "QUIT"
+ self["grade"] = 0.0
# ------------------------------------------------------------------------
def save_json(self, filename: str) -> None:
- '''save test in JSON format'''
- with open(filename, 'w', encoding='utf-8') as file:
+ """save test in JSON format"""
+ with open(filename, "w", encoding="utf-8") as file:
json.dump(self, file, indent=2, default=str) # str for datetime
# ------------------------------------------------------------------------
def __str__(self) -> str:
- return '\n'.join([f'{k}: {v}' for k,v in self.items()])
+ return "\n".join([f"{k}: {v}" for k, v in self.items()])
diff --git a/perguntations/testfactory.py b/perguntations/testfactory.py
index f91f959..654127c 100644
--- a/perguntations/testfactory.py
+++ b/perguntations/testfactory.py
@@ -1,6 +1,6 @@
-'''
+"""
TestFactory - generates tests for students
-'''
+"""
# python standard library
import asyncio
@@ -9,7 +9,8 @@ import random
import logging
# other libraries
-import schema
+# import schema
+from schema import And, Or, Optional, Regex, Schema, Use
# this project
from .questions import QFactory, QuestionException, QDict
@@ -21,17 +22,18 @@ logger = logging.getLogger(__name__)
# --- test validation --------------------------------------------------------
def check_answers_directory(ans: str) -> bool:
- '''Checks is answers_dir exists and is writable'''
- testfile = path.join(path.expanduser(ans), 'REMOVE-ME')
+ """Checks is answers_dir exists and is writable"""
+ testfile = path.join(path.expanduser(ans), "REMOVE-ME")
try:
- with open(testfile, 'w', encoding='utf-8') as file:
- file.write('You can safely remove this file.')
+ with open(testfile, "w", encoding="utf-8") as file:
+ file.write("You can safely remove this file.")
except OSError:
return False
return True
+
def check_import_files(files: list) -> bool:
- '''Checks if the question files exist'''
+ """Checks if the question files exist"""
if not files:
return False
for file in files:
@@ -39,117 +41,132 @@ def check_import_files(files: list) -> bool:
return False
return True
-def normalize_question_list(questions: list) -> None:
- '''convert question ref from string to list of string'''
- for question in questions:
- if isinstance(question['ref'], str):
- question['ref'] = [question['ref']]
-
-test_schema = schema.Schema({
- 'ref': schema.Regex('^[a-zA-Z0-9_-]+$'),
- 'database': schema.And(str, path.isfile),
- 'answers_dir': schema.And(str, check_answers_directory),
- 'title': str,
- schema.Optional('duration'): int,
- schema.Optional('autosubmit'): bool,
- schema.Optional('autocorrect'): bool,
- schema.Optional('show_points'): bool,
- schema.Optional('scale'): schema.And([schema.Use(float)],
- lambda s: len(s) == 2),
- 'files': schema.And([str], check_import_files),
- 'questions': [{
- 'ref': schema.Or(str, [str]),
- schema.Optional('points'): float
- }]
- }, ignore_extra_keys=True)
+
+test_schema = Schema(
+ {
+ "ref": Regex("^[a-zA-Z0-9_-]+$"),
+ "database": And(str, path.isfile),
+ "answers_dir": Use(check_answers_directory),
+ "title": str,
+ Optional("duration"): int,
+ Optional("autosubmit"): bool,
+ Optional("autocorrect"): bool,
+ Optional("show_points"): bool,
+ Optional("scale"): And([Use(float)], lambda s: len(s) == 2),
+ "files": And([str], check_import_files),
+ "questions": [{
+ "ref": Or(str, [str]),
+ Optional("points"): float
+ }],
+ },
+ ignore_extra_keys=True,
+)
+# FIXME: schema error with 'testfile' which is added in the code
# ============================================================================
+def normalize_question_list(questions: list) -> None: # FIXME: move inside the class?
+ """convert question ref from string to list of string"""
+ for question in questions:
+ if isinstance(question["ref"], str):
+ question["ref"] = [question["ref"]]
+
+
class TestFactoryException(Exception):
- '''exception raised in this module'''
+ """exception raised in this module"""
# ============================================================================
class TestFactory(dict):
- '''
+ """
Each instance of TestFactory() is a test generator.
For example, if we want to serve two different tests, then we need two
instances of TestFactory(), one for each test.
- '''
+ """
# ------------------------------------------------------------------------
def __init__(self, conf) -> None:
- '''
+ """
Loads configuration from yaml file, then overrides some configurations
using the conf argument.
Base questions are added to a pool of questions factories.
- '''
+ """
test_schema.validate(conf)
# --- set test defaults and then use given configuration
- super().__init__({ # defaults
- 'show_points': True,
- 'scale': None,
- 'duration': 0, # 0=infinite
- 'autosubmit': False,
- 'autocorrect': True,
- })
+ super().__init__(
+ { # defaults
+ "show_points": True,
+ "scale": None,
+ "duration": 0, # 0=infinite
+ "autosubmit": False,
+ "autocorrect": True,
+ }
+ )
self.update(conf)
- normalize_question_list(self['questions'])
+ normalize_question_list(self["questions"])
# --- for review, we are done. no factories needed
+<<<<<<< HEAD
# if self['review']: FIXME: make it work!
+=======
+ # if self['review']: FIXME:
+>>>>>>> dev
# logger.info('Review mode. No questions loaded. No factories.')
# return
# --- find refs of all questions used in the test
- qrefs = {r for qq in self['questions'] for r in qq['ref']}
- logger.info('Declared %d questions (each test uses %d).',
- len(qrefs), len(self["questions"]))
+ qrefs = {r for qq in self["questions"] for r in qq["ref"]}
+ logger.info(
+ "Declared %d questions (each test uses %d).",
+ len(qrefs),
+ len(self["questions"]),
+ )
# --- load and build question factories
- self['question_factory'] = {}
+ self["question_factory"] = {}
for file in self["files"]:
fullpath = path.normpath(file)
logger.info('Loading "%s"...', fullpath)
- questions = load_yaml(fullpath) # , default=[])
+ questions = load_yaml(fullpath) # , default=[])
for i, question in enumerate(questions):
# make sure every question in the file is a dictionary
if not isinstance(question, dict):
- msg = f'Question {i} in {file} is not a dictionary'
+ msg = f"Question {i} in {file} is not a dictionary"
raise TestFactoryException(msg)
# check if ref is missing, then set to '//file.yaml:3'
- if 'ref' not in question:
- question['ref'] = f'{file}:{i:04}'
+ if "ref" not in question:
+ question["ref"] = f"{file}:{i:04}"
logger.warning('Missing ref set to "%s"', question["ref"])
# check for duplicate refs
- qref = question['ref']
- if qref in self['question_factory']:
- other = self['question_factory'][qref]
- otherfile = path.join(other.question['path'],
- other.question['filename'])
+ qref = question["ref"]
+ if qref in self["question_factory"]:
+ other = self["question_factory"][qref]
+ otherfile = path.join(
+ other.question["path"], other.question["filename"]
+ )
msg = f'Duplicate "{qref}" in {otherfile} and {fullpath}'
raise TestFactoryException(msg)
# make factory only for the questions used in the test
if qref in qrefs:
- question.update(zip(('path', 'filename', 'index'),
- path.split(fullpath) + (i,)))
- self['question_factory'][qref] = QFactory(QDict(question))
+ question.update(
+ zip(("path", "filename", "index"), path.split(fullpath) + (i,))
+ )
+ self["question_factory"][qref] = QFactory(QDict(question))
- qmissing = qrefs.difference(set(self['question_factory'].keys()))
+ qmissing = qrefs.difference(set(self["question_factory"].keys()))
if qmissing:
- raise TestFactoryException(f'Could not find questions {qmissing}.')
-
- self.check_questions()
+ raise TestFactoryException(f"Could not find questions {qmissing}.")
- logger.info('Test factory ready. No errors found.')
+ asyncio.run(self.check_questions())
+ logger.info("Test factory ready. No errors found.")
# ------------------------------------------------------------------------
# def check_test_ref(self) -> None:
@@ -201,8 +218,8 @@ class TestFactory(dict):
# 'question files to import!')
# raise TestFactoryException(msg)
- # if isinstance(self['files'], str):
- # self['files'] = [self['files']]
+ # if isinstance(self['files'], str):
+ # self['files'] = [self['files']]
# def check_question_list(self) -> None:
# '''normalize question list'''
@@ -234,124 +251,138 @@ class TestFactory(dict):
# logger.warning(msg)
# self['scale'] = [self['scale_min'], self['scale_max']]
-
# ------------------------------------------------------------------------
# def sanity_checks(self) -> None:
# '''
# Checks for valid keys and sets default values.
# Also checks if some files and directories exist
# '''
- # self.check_test_ref()
- # self.check_missing_database()
- # self.check_missing_answers_directory()
- # self.check_answers_directory_writable()
- # self.check_questions_directory()
- # self.check_import_files()
- # self.check_question_list()
- # self.check_missing_title()
- # self.check_grade_scaling()
+ # self.check_test_ref()
+ # self.check_missing_database()
+ # self.check_missing_answers_directory()
+ # self.check_answers_directory_writable()
+ # self.check_questions_directory()
+ # self.check_import_files()
+ # self.check_question_list()
+ # self.check_missing_title()
+ # self.check_grade_scaling()
# ------------------------------------------------------------------------
- def check_questions(self) -> None:
- '''
+ async def check_questions(self) -> None:
+ """
checks if questions can be correctly generated and corrected
- '''
- logger.info('Checking questions...')
- # FIXME: get_event_loop will be deprecated in python3.10
- loop = asyncio.get_event_loop()
- for i, (qref, qfact) in enumerate(self['question_factory'].items()):
+ """
+ logger.info("Checking questions...")
+ for i, (qref, qfact) in enumerate(self["question_factory"].items()):
try:
- question = loop.run_until_complete(qfact.gen_async())
+ question = await qfact.gen_async()
except Exception as exc:
msg = f'Failed to generate "{qref}"'
raise TestFactoryException(msg) from exc
else:
- logger.info('%4d. %s: Ok', i, qref)
+ logger.info("%4d. %s: Ok", i, qref)
- if question['type'] == 'textarea':
+ if question["type"] == "textarea":
_runtests_textarea(qref, question)
# ------------------------------------------------------------------------
async def generate(self):
- '''
+ """
Given a dictionary with a student dict {'name':'john', 'number': 123}
returns instance of Test() for that particular student
- '''
+ """
# make list of questions
questions = []
qnum = 1 # track question number
nerr = 0 # count errors during questions generation
- for qlist in self['questions']:
+ for qlist in self["questions"]:
# choose list of question variants
- choose = qlist.get('choose', 1)
- qrefs = random.sample(qlist['ref'], k=choose)
+ choose = qlist.get("choose", 1)
+ qrefs = random.sample(qlist["ref"], k=choose)
for qref in qrefs:
# generate instance of question
try:
- question = await self['question_factory'][qref].gen_async()
+ question = await self["question_factory"][qref].gen_async()
except QuestionException:
logger.error('Can\'t generate question "%s". Skipping.', qref)
nerr += 1
continue
# some defaults
- if question['type'] in ('information', 'success', 'warning',
- 'alert'):
- question['points'] = qlist.get('points', 0.0)
+ if question["type"] in ("information", "success", "warning", "alert"):
+ question["points"] = qlist.get("points", 0.0)
else:
- question['points'] = qlist.get('points', 1.0)
- question['number'] = qnum # counter for non informative panels
+ question["points"] = qlist.get("points", 1.0)
+ question["number"] = qnum # counter for non informative panels
qnum += 1
questions.append(question)
# setup scale
- total_points = sum(q['points'] for q in questions)
+ total_points = sum(q["points"] for q in questions)
if total_points > 0:
# normalize question points to scale
- if self['scale'] is not None:
- scale_min, scale_max = self['scale']
+ if self["scale"] is not None:
+ scale_min, scale_max = self["scale"]
+ factor = (scale_max - scale_min) / total_points
for question in questions:
- question['points'] *= (scale_max - scale_min) / total_points
+ question["points"] *= factor
+ logger.debug(
+ "Points normalized from %g to [%g, %g]",
+ total_points,
+ scale_min,
+ scale_max,
+ )
else:
- self['scale'] = [0, total_points]
+ self["scale"] = [0, total_points]
else:
- logger.warning('Total points is **ZERO**.')
- if self['scale'] is None:
- self['scale'] = [0, 20] # default
+ logger.warning("Total points is **ZERO**.")
+ if self["scale"] is None:
+ self["scale"] = [0, 20] # default
if nerr > 0:
- logger.error('%s errors found!', nerr)
+ logger.error("%s errors found!", nerr)
# copy these from the test configuratoin to each test instance
- inherit = ['ref', 'title', 'database', 'answers_dir', 'files', 'scale',
- 'duration', 'autosubmit', 'autocorrect', 'show_points']
-
- return Test({'questions': questions, **{k:self[k] for k in inherit}})
+ inherit = [
+ "ref",
+ "title",
+ "database",
+ "answers_dir",
+ "files",
+ "scale",
+ "duration",
+ "autosubmit",
+ "autocorrect",
+ "show_points",
+ ]
+
+ return Test({"questions": questions, **{k: self[k] for k in inherit}})
# ------------------------------------------------------------------------
def __repr__(self):
- testsettings = '\n'.join(f' {k:14s}: {v}' for k, v in self.items())
- return 'TestFactory({\n' + testsettings + '\n})'
+ testsettings = "\n".join(f" {k:14s}: {v}" for k, v in self.items())
+ return "TestFactory({\n" + testsettings + "\n})"
+
# ============================================================================
def _runtests_textarea(qref, question):
- '''
+ """
Checks if correction script works and runs tests if available
- '''
+ """
try:
- question.set_answer('')
+ question.set_answer("")
question.correct()
except Exception as exc:
msg = f'Failed to correct "{qref}"'
raise TestFactoryException(msg) from exc
- logger.info(' correction works')
+ logger.info(" correction works")
- for tnum, right_answer in enumerate(question.get('tests_right', {})):
+ for tnum, right_answer in enumerate(question.get("tests_right", {})):
try:
question.set_answer(right_answer)
question.correct()
@@ -359,12 +390,12 @@ def _runtests_textarea(qref, question):
msg = f'Failed to correct "{qref}"'
raise TestFactoryException(msg) from exc
- if question['grade'] == 1.0:
- logger.info(' tests_right[%i] Ok', tnum)
+ if question["grade"] == 1.0:
+ logger.info(" tests_right[%i] Ok", tnum)
else:
- logger.error(' tests_right[%i] FAILED!!!', tnum)
+ logger.error(" tests_right[%i] FAILED!!!", tnum)
- for tnum, wrong_answer in enumerate(question.get('tests_wrong', {})):
+ for tnum, wrong_answer in enumerate(question.get("tests_wrong", {})):
try:
question.set_answer(wrong_answer)
question.correct()
@@ -372,7 +403,7 @@ def _runtests_textarea(qref, question):
msg = f'Failed to correct "{qref}"'
raise TestFactoryException(msg) from exc
- if question['grade'] < 1.0:
- logger.info(' tests_wrong[%i] Ok', tnum)
+ if question["grade"] < 1.0:
+ logger.info(" tests_wrong[%i] Ok", tnum)
else:
- logger.error(' tests_wrong[%i] FAILED!!!', tnum)
+ logger.error(" tests_wrong[%i] FAILED!!!", tnum)
diff --git a/perguntations/tools.py b/perguntations/tools.py
index cda529d..ff427fe 100644
--- a/perguntations/tools.py
+++ b/perguntations/tools.py
@@ -1,7 +1,7 @@
-'''
+"""
File: perguntations/tools.py
Description: Helper functions to load yaml files and run external programs.
-'''
+"""
# python standard library
@@ -21,20 +21,18 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
def load_yaml(filename: str) -> Any:
- '''load yaml file or raise exception on error'''
- with open(path.expanduser(filename), 'r', encoding='utf-8') as file:
+ """load yaml file or raise exception on error"""
+ with open(path.expanduser(filename), "r", encoding="utf-8") as file:
return yaml.safe_load(file)
+
# ---------------------------------------------------------------------------
-def run_script(script: str,
- args: List[str],
- stdin: str = '',
- timeout: int = 3) -> Any:
- '''
+def run_script(script: str, args: List[str], stdin: str = "", timeout: int = 3) -> Any:
+ """
Runs a script and returns its stdout parsed as yaml, or None on error.
The script is run in another process but this function blocks waiting
for its termination.
- '''
+ """
logger.debug('run_script "%s"', script)
output = None
@@ -43,14 +41,15 @@ def run_script(script: str,
# --- run process
try:
- proc = subprocess.run(cmd,
- input=stdin,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- universal_newlines=True,
- timeout=timeout,
- check=True,
- )
+ proc = subprocess.run(
+ cmd,
+ input=stdin,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True,
+ timeout=timeout,
+ check=True,
+ )
except subprocess.TimeoutExpired:
logger.error('Timeout %ds exceeded running "%s".', timeout, script)
return output
@@ -71,11 +70,10 @@ def run_script(script: str,
# ----------------------------------------------------------------------------
-async def run_script_async(script: str,
- args: List[str],
- stdin: str = '',
- timeout: int = 3) -> Any:
- '''Same as above, but asynchronous'''
+async def run_script_async(
+ script: str, args: List[str], stdin: str = "", timeout: int = 3
+) -> Any:
+ """Same as above, but asynchronous"""
script = path.expanduser(script)
args = [str(a) for a in args]
@@ -84,11 +82,12 @@ async def run_script_async(script: str,
# --- start process
try:
proc = await asyncio.create_subprocess_exec(
- script, *args,
+ script,
+ *args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
- )
+ )
except OSError:
logger.error('Can not execute script "%s".', script)
return output
@@ -96,8 +95,8 @@ async def run_script_async(script: str,
# --- send input and wait for termination
try:
stdout, _ = await asyncio.wait_for(
- proc.communicate(input=stdin.encode('utf-8')),
- timeout=timeout)
+ proc.communicate(input=stdin.encode("utf-8")), timeout=timeout
+ )
except asyncio.TimeoutError:
logger.warning('Timeout %ds running script "%s".', timeout, script)
return output
@@ -109,7 +108,7 @@ async def run_script_async(script: str,
# --- parse yaml
try:
- output = yaml.safe_load(stdout.decode('utf-8', 'ignore'))
+ output = yaml.safe_load(stdout.decode("utf-8", "ignore"))
except yaml.YAMLError:
logger.error('Error parsing yaml output of "%s"', script)
diff --git a/setup.py b/setup.py
index 6dbf6c5..b6070e4 100644
--- a/setup.py
+++ b/setup.py
@@ -24,13 +24,13 @@ setup(
include_package_data=True, # install files from MANIFEST.in
python_requires='>=3.9',
install_requires=[
- 'bcrypt>=3.1',
+ 'argon2-cffi>=23.1',
'mistune<2.0',
'pyyaml>=5.1',
'pygments',
'schema>=0.7.5',
- 'sqlalchemy>=1.4',
- 'tornado>=6.1',
+ 'sqlalchemy>=2.0',
+ 'tornado>=6.4',
],
entry_points={
'console_scripts': [
--
libgit2 0.21.2