initdb.py 6.45 KB
#!/usr/bin/env python3

'''
Commandline utilizty to initialize and update student database
'''

# base
import csv
import argparse
import re
from string import capwords
from concurrent.futures import ThreadPoolExecutor

# installed packages
import bcrypt
import sqlalchemy as sa

# this project
from perguntations.models import Base, Student


# ===========================================================================
def parse_commandline_arguments():
    '''Parse command line options'''
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='Insert new users into a database. Users can be imported '
                    'from CSV files in the SIIUE format or defined in the '
                    'command line. If the database does not exist, a new one '
                    'is created.')

    parser.add_argument('csvfile',
                        nargs='*',
                        type=str,
                        default='',
                        help='CSV file to import (SIIUE)')

    parser.add_argument('--db',
                        default='students.db',
                        type=str,
                        help='database file')

    parser.add_argument('-A', '--admin',
                        action='store_true',
                        help='insert the admin user')

    parser.add_argument('-a', '--add',
                        nargs=2,
                        action='append',
                        metavar=('uid', 'name'),
                        help='add new user')

    parser.add_argument('-u', '--update',
                        nargs='+',
                        metavar='uid',
                        default=[],
                        help='users to update')

    parser.add_argument('--pw',
                        default=None,
                        type=str,
                        help='set password for new and updated users')

    parser.add_argument('-V', '--verbose',
                        action='store_true',
                        help='show all students in database')

    return parser.parse_args()


# ===========================================================================
def get_students_from_csv(filename):
    '''
    SIIUE names have alien strings like "(TE)" and are sometimes capitalized
    We remove them so that students dont keep asking what it means
    '''
    csv_settings = {
        'delimiter': ';',
        'quotechar': '"',
        'skipinitialspace': True,
        }

    try:
        with open(filename, encoding='iso-8859-1') as file:
            csvreader = csv.DictReader(file, **csv_settings)
            students = [{
                'uid': s['N.º'],
                'name': capwords(re.sub(r'\(.*\)', '', s['Nome']).strip())
                } for s in csvreader]
    except OSError:
        print(f'!!! Error reading file "{filename}" !!!')
        students = []
    except csv.Error:
        print(f'!!! Error parsing CSV from "{filename}" !!!')
        students = []

    return students


# ===========================================================================
def hashpw(student, password=None):
    '''replace password by hash for a single student'''
    print('.', end='', flush=True)
    if password is None:
        student['pw'] = ''
    else:
        student['pw'] = bcrypt.hashpw(password.encode('utf-8'),
                                      bcrypt.gensalt())


# ===========================================================================
def insert_students_into_db(session, students):
    '''insert list of students into the database'''
    try:
        session.add_all([Student(id=s['uid'], name=s['name'], password=s['pw'])
                         for s in students])
        session.commit()

    except sa.exc.IntegrityError:
        print('!!! Integrity error. Users already in database. Aborted !!!\n')
        session.rollback()


# ============================================================================
def show_students_in_database(session, verbose=False):
    '''get students from database'''
    users = session.query(Student).all()

    total_users = len(users)
    print('Registered users:')
    if total_users == 0:
        print('  -- none --')
    else:
        users.sort(key=lambda u: f'{u.id:>12}')  # sort by number
        if verbose:
            for user in users:
                print(f'{user.id:>12}   {user.name}')
        else:
            print(f'{users[0].id:>12}   {users[0].name}')
            if total_users > 1:
                print(f'{users[1].id:>12}   {users[1].name}')
            if total_users > 3:
                print('           |   |')
            if total_users > 2:
                print(f'{users[-1].id:>12}   {users[-1].name}')
    print(f'Total: {total_users}.')


# ============================================================================
def main():
    '''insert, update, show students from database'''

    args = parse_commandline_arguments()

    # --- make list of students to insert/update
    students = []

    for csvfile in args.csvfile:
        print('Adding users from:', csvfile)
        students.extend(get_students_from_csv(csvfile))

    if args.admin:
        print('Adding user: 0, Admin.')
        students.append({'uid': '0', 'name': 'Admin'})

    if args.add:
        for uid, name in args.add:
            print(f'Adding user: {uid}, {name}.')
            students.append({'uid': uid, 'name': name})

    # --- password hashing
    if students:
        print('Generating password hashes', end='')
        with ThreadPoolExecutor() as executor:  # hashing in parallel
            executor.map(lambda s: hashpw(s, args.pw), students)
        print()

    # --- database stuff
    print(f'Using database: {args.db}')
    engine = sa.create_engine(f'sqlite:///{args.db}', echo=False)
    Base.metadata.create_all(engine)  # Criates schema if needed
    SessionMaker = sa.orm.sessionmaker(bind=engine)
    session = SessionMaker()

    if students:
        print(f'Inserting {len(students)}')
        insert_students_into_db(session, students)

    for student_id in args.update:
        print(f'Updating password of: {student_id}')
        student = session.query(Student).get(student_id)
        password = (args.pw or student_id).encode('utf-8')
        student.password = bcrypt.hashpw(password, bcrypt.gensalt())
        session.commit()

    show_students_in_database(session, args.verbose)

    session.close()


# ============================================================================
if __name__ == '__main__':
    main()