tools.py 8.49 KB

# python standard library
import asyncio
import logging
from os import path
import re
import subprocess
from typing import Any, List

# third party libraries
import mistune
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter
import yaml


# setup logger for this module
logger = logging.getLogger(__name__)


# -------------------------------------------------------------------------
# Markdown to HTML renderer with support for LaTeX equations
# -------------------------------------------------------------------------


# -------------------------------------------------------------------------
# Block math:
#   $$x$$ or \begin{equation}x\end{equation}
# -------------------------------------------------------------------------
class MathBlockGrammar(mistune.BlockGrammar):
    '''Block grammar extended with patterns for display math.'''

    # $$ ... $$ anchored at the start of the text; '.' also matches newlines
    block_math = re.compile(r'^\$\$(.*?)\$\$', flags=re.S)

    # \begin{name} ... \end{name}: group 1 is the environment name (lowercase
    # letters with an optional trailing '*') and the closing tag must repeat
    # it; group 2 is the environment body
    latex_environment = re.compile(
        r'^\\begin\{([a-z]*\*?)\}(.*?)\\end\{\1\}',
        flags=re.S,
    )


class MathBlockLexer(mistune.BlockLexer):
    '''Block lexer producing tokens for display math and environments.'''

    # the two math rules are placed ahead of mistune's stock block rules
    default_rules = [
        'block_math',
        'latex_environment',
        *mistune.BlockLexer.default_rules,
    ]

    def __init__(self, rules=None, **kwargs):
        # use the math-aware grammar unless the caller supplies one
        grammar = MathBlockGrammar() if rules is None else rules
        super().__init__(grammar, **kwargs)

    def parse_block_math(self, m):
        '''Queue a token for a $$math$$ block.'''
        token = {'type': 'block_math', 'text': m.group(1)}
        self.tokens.append(token)

    def parse_latex_environment(self, m):
        r'''Queue a token for a \begin{name}text\end{name} environment.'''
        token = {
            'type': 'latex_environment',
            'name': m.group(1),
            'text': m.group(2),
        }
        self.tokens.append(token)


# -------------------------------------------------------------------------
# Inline math: $x$
# -------------------------------------------------------------------------
class MathInlineGrammar(mistune.InlineGrammar):
    '''Inline grammar extended with patterns for $x$ and $$x$$ math.'''

    # both math patterns require at least one character between delimiters
    math = re.compile(r'^\$(.+?)\$', flags=re.S)
    block_math = re.compile(r'^\$\$(.+?)\$\$', flags=re.S)

    # plain-text pattern; '$' is part of the lookahead character class, so a
    # run of text ends right before a dollar sign
    text = re.compile(
        r'^[\s\S]+?(?=[\\<!\[_*`~$]|https?://| {2,}\n|$)'
    )


class MathInlineLexer(mistune.InlineLexer):
    '''Inline lexer that forwards math spans to the renderer.'''

    # 'block_math' is listed before 'math', both ahead of the stock rules
    default_rules = [
        'block_math',
        'math',
        *mistune.InlineLexer.default_rules,
    ]

    def __init__(self, renderer, rules=None, **kwargs):
        # use the math-aware grammar unless the caller supplies one
        grammar = MathInlineGrammar() if rules is None else rules
        super().__init__(renderer, grammar, **kwargs)

    def output_math(self, m):
        '''Render the content of a $x$ match as inline math.'''
        content = m.group(1)
        return self.renderer.inline_math(content)

    def output_block_math(self, m):
        '''Render the content of a $$x$$ match as block math.'''
        content = m.group(1)
        return self.renderer.block_math(content)


class MarkdownWithMath(mistune.Markdown):
    '''Markdown parser that uses the math-aware lexers by default.'''

    def __init__(self, renderer, **kwargs):
        # 'inline' / 'block' given by the caller take precedence
        kwargs.setdefault('inline', MathInlineLexer)
        kwargs.setdefault('block', MathBlockLexer)
        super().__init__(renderer, **kwargs)

    def output_block_math(self):
        '''Render the current 'block_math' token.'''
        text = self.token['text']
        return self.renderer.block_math(text)

    def output_latex_environment(self):
        '''Render the current 'latex_environment' token.'''
        token = self.token
        return self.renderer.latex_environment(token['name'], token['text'])


class HighlightRenderer(mistune.Renderer):
    '''HTML renderer with highlighted code, styled tables, images served
    from /file/ and math passed through for rendering in the browser.'''

    def block_code(self, code, lang='text'):
        '''Return `code` as HTML highlighted by pygments.

        When pygments cannot resolve `lang` to a lexer, the plain 'text'
        lexer is used instead.
        '''
        try:
            lexer = get_lexer_by_name(lang, stripall=False)
        except Exception:
            lexer = get_lexer_by_name('text', stripall=False)

        formatter = HtmlFormatter()
        return highlight(code, lexer, formatter)

    def table(self, header, body):
        '''Wrap already rendered `header` and `body` rows in a <table>
        carrying the CSS classes used by the page stylesheet.'''
        return '<table class="table table-sm"><thead class="thead-light">' \
               + header + '</thead><tbody>' + body + '</tbody></table>'

    def image(self, src, title, alt):
        '''Return an <img> tag whose source is `src` prefixed by /file/.

        All three values end up inside double-quoted attributes, so each of
        them is escaped (quotes included). `title` may be None.
        '''
        # src was previously inserted verbatim, which let a '"' in the url
        # terminate the attribute and inject arbitrary markup
        src = mistune.escape(src, quote=True)
        alt = mistune.escape(alt, quote=True)
        title = mistune.escape(title or '', quote=True)
        # NOTE: class="img-fluid mx-auto d-block" is currently not emitted
        return f'<img src="/file/{src}" ' \
               f'alt="{alt}" title="{title}">'

    # Pass math through unaltered - mathjax does the rendering in the browser
    def block_math(self, text):
        r'''Wrap `text` in \[ ... \] display-math delimiters.'''
        return fr'\[ {text} \]'

    def latex_environment(self, name, text):
        r'''Rebuild the \begin{name} ... \end{name} environment around `text`.'''
        return fr'\begin{{{name}}} {text} \end{{{name}}}'

    def inline_math(self, text):
        r'''Wrap `text` in \( ... \) inline-math delimiters.'''
        return fr'\( {text} \)'


# Module-level converter called by md_to_html(); the renderer is created with
# escape=True.
# NOTE: hard_wrap=True is not set here; it is the option to use to insert <br>
# on newline.
markdown = MarkdownWithMath(HighlightRenderer(escape=True))


def md_to_html(text: str, strip_p_tag: bool = False) -> str:
    '''Convert markdown `text` to HTML.

    With `strip_p_tag` set, output that consists of exactly one paragraph is
    returned without its enclosing <p>...</p> (and without the trailing
    newline). Any other output is returned unchanged.
    '''
    html: str = markdown(text)
    if not strip_p_tag:
        return html

    # mistune ends a rendered paragraph with '</p>\n', so trailing whitespace
    # has to be ignored when looking for the closing tag
    body = html.rstrip()
    if body.startswith('<p>') and body.endswith('</p>'):
        inner = body[3:-4]
        # unwrap a lone paragraph only: removing the outer tags of several
        # paragraphs would leave an unbalanced '</p>...<p>' in the middle
        if '</p>' not in inner:
            return inner
    return html


# ---------------------------------------------------------------------------
# load data from yaml file
# ---------------------------------------------------------------------------
def load_yaml(filename: str, default: Any = None) -> Any:
    '''Load a yaml file and return its parsed contents.

    A leading '~' in `filename` is expanded. When the file cannot be opened,
    read, decoded as utf-8 or parsed as yaml, the problem is logged and
    `default` is returned. On success the value produced by yaml.safe_load
    is returned as is. Exceptions unrelated to reading the file (for example
    KeyboardInterrupt) are no longer suppressed.
    '''
    filename = path.expanduser(filename)
    try:
        f = open(filename, 'r', encoding='utf-8')
    except FileNotFoundError:
        logger.error(f'Cannot open "{filename}": not found')
        return default
    except PermissionError:
        logger.error(f'Cannot open "{filename}": no permission')
        return default
    except OSError:
        logger.error(f'Cannot open file "{filename}"')
        return default

    with f:
        try:
            return yaml.safe_load(f)
        except yaml.YAMLError as e:
            # the attribute may be missing or None depending on the error
            mark = getattr(e, 'problem_mark', None)
            if mark is not None:
                logger.error(f'File "{filename}" near line {mark.line+1}, '
                             f'column {mark.column+1}')
            else:
                logger.error(f'File "{filename}"')
        except UnicodeDecodeError:
            logger.error(f'File "{filename}" is not valid utf-8')
        except OSError:
            logger.error(f'Cannot read file "{filename}"')
    return default


# ---------------------------------------------------------------------------
# Runs a script and returns its stdout parsed as yaml, or None on error.
# The script is run in another process but this function blocks waiting
# for its termination.
# ---------------------------------------------------------------------------
def run_script(script: str,
               args: List[str] = [],
               stdin: str = '',
               timeout: int = 2) -> Any:
    '''Run `script` and return its output parsed as yaml, or None on error.

    `script` may start with '~'; every item of `args` is converted to str.
    `stdin` is sent to the process and stderr is merged into stdout before
    parsing. The call blocks until the process ends or `timeout` seconds
    have passed. Every failure (launch error, timeout, non-zero return code,
    invalid yaml) is logged and results in None.
    '''
    script = path.expanduser(script)
    try:
        # `args` is only read, so the shared default list is never modified
        cmd = [script] + [str(a) for a in args]
        p = subprocess.run(cmd,
                           input=stdin,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           universal_newlines=True,
                           timeout=timeout,
                           )
    except FileNotFoundError:
        logger.error(f'Can not execute script "{script}": not found.')
    except PermissionError:
        logger.error(f'Can not execute script "{script}": wrong permissions.')
    except OSError:
        logger.error(f'Can not execute script "{script}": unknown reason.')
    except subprocess.TimeoutExpired:
        logger.error(f'Timeout {timeout}s exceeded while running "{script}".')
    except Exception:
        # unexpected failure: keep the traceback so it can be diagnosed
        logger.exception(f'An Exception occurred running {script}.')
    else:
        if p.returncode != 0:
            logger.error(f'Return code {p.returncode} running "{script}".')
        else:
            try:
                output = yaml.safe_load(p.stdout)
            except Exception:
                logger.error(f'Error parsing yaml output of "{script}"')
            else:
                return output
    return None


# ----------------------------------------------------------------------------
# Same as above, but asynchronous
# ----------------------------------------------------------------------------
async def run_script_async(script: str,
                           args: List[str] = [],
                           stdin: str = '',
                           timeout: int = 2) -> Any:
    '''Run `script` asynchronously and return its stdout parsed as yaml.

    `script` may start with '~'; every item of `args` is converted to str.
    `stdin` is utf-8 encoded and sent to the process; stderr is discarded.
    Returns None, after logging the reason, when the script cannot be
    started, does not finish within `timeout` seconds, exits with a non-zero
    return code or prints invalid yaml.
    '''
    script = path.expanduser(script)
    args = [str(a) for a in args]

    # launch errors are reported like in the blocking run_script()
    try:
        p = await asyncio.create_subprocess_exec(
            script, *args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
            )
    except FileNotFoundError:
        logger.error(f'Can not execute script "{script}": not found.')
        return None
    except PermissionError:
        logger.error(f'Can not execute script "{script}": wrong permissions.')
        return None
    except OSError:
        logger.error(f'Can not execute script "{script}": unknown reason.')
        return None

    try:
        stdout, _ = await asyncio.wait_for(
            p.communicate(input=stdin.encode('utf-8')),
            timeout=timeout
            )
    except asyncio.TimeoutError:
        logger.warning(f'Timeout {timeout}s running script "{script}".')
        # wait_for() only cancels communicate(); the child keeps running
        # unless it is killed, and must be waited for to be reaped
        try:
            p.kill()
        except ProcessLookupError:
            pass  # the process ended in the meantime
        await p.wait()
        return None

    if p.returncode != 0:
        logger.error(f'Return code {p.returncode} running "{script}".')
        return None

    try:
        return yaml.safe_load(stdout.decode('utf-8', 'ignore'))
    except Exception:
        logger.error(f'Error parsing yaml output of "{script}"')
        return None