# tools.py - Markdown-with-math rendering, YAML loading and script helpers.
# python standard library
import asyncio
import logging
from os import path
import re
import subprocess
from typing import Any, List
# third party libraries
import mistune
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter
import yaml
# Module-level logger, named after this module through __name__; the helper
# functions below report their errors through it.
logger = logging.getLogger(__name__)
# -------------------------------------------------------------------------
# Markdown to HTML renderer with support for LaTeX equations
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
# Block math:
# $$x$$ or \begin{equation}x\end{equation}
# -------------------------------------------------------------------------
class MathBlockGrammar(mistune.BlockGrammar):
    '''Block grammar extended with two block-level LaTeX patterns.'''

    # Display math delimited by double dollars: $$ ... $$
    # (DOTALL lets the body span several lines).
    block_math = re.compile(r'^\$\$(.*?)\$\$', flags=re.DOTALL)

    # LaTeX environment: \begin{name} ... \end{name}. The \1 backreference
    # requires the closing name to equal the opening one; group 1 is the
    # environment name and group 2 its body.
    latex_environment = re.compile(
        r'^\\begin\{([a-z]*\*?)\}(.*?)\\end\{\1\}',
        flags=re.DOTALL,
    )
class MathBlockLexer(mistune.BlockLexer):
    '''Block lexer that adds math-block and LaTeX-environment tokens.'''

    # The two math rules are placed ahead of the inherited rule names.
    default_rules = [
        'block_math',
        'latex_environment',
        *mistune.BlockLexer.default_rules,
    ]

    def __init__(self, rules=None, **kwargs):
        '''Build the lexer; without `rules` a MathBlockGrammar is used.'''
        grammar = MathBlockGrammar() if rules is None else rules
        super().__init__(grammar, **kwargs)

    def parse_block_math(self, m):
        '''Append a 'block_math' token for a $$math$$ match.'''
        token = {'type': 'block_math', 'text': m.group(1)}
        self.tokens.append(token)

    def parse_latex_environment(self, m):
        r'''Append a token for a \begin{name}text\end{name} match.'''
        env_name, env_body = m.group(1), m.group(2)
        token = {
            'type': 'latex_environment',
            'name': env_name,
            'text': env_body,
        }
        self.tokens.append(token)
# -------------------------------------------------------------------------
# Inline math: $x$
# -------------------------------------------------------------------------
class MathInlineGrammar(mistune.InlineGrammar):
    '''Inline grammar extended with patterns for LaTeX math.'''

    # Display math found in inline context: $$ ... $$
    block_math = re.compile(r'^\$\$(.+?)\$\$', flags=re.DOTALL)

    # Inline math: $ ... $
    math = re.compile(r'^\$(.+?)\$', flags=re.DOTALL)

    # Plain-text run. The lookahead character class contains `$`, so a text
    # run stops right before a dollar sign.
    text = re.compile(r'^[\s\S]+?(?=[\\<!\[_*`~$]|https?://| {2,}\n|$)')
class MathInlineLexer(mistune.InlineLexer):
    '''Inline lexer that hands math matches to the renderer.'''

    # 'block_math' precedes 'math', and both precede the inherited rule names.
    default_rules = ['block_math', 'math', *mistune.InlineLexer.default_rules]

    def __init__(self, renderer, rules=None, **kwargs):
        '''Build the lexer; without `rules` a MathInlineGrammar is used.'''
        grammar = MathInlineGrammar() if rules is None else rules
        super().__init__(renderer, grammar, **kwargs)

    def output_math(self, m):
        '''Render the body of a $...$ match through renderer.inline_math.'''
        latex = m.group(1)
        return self.renderer.inline_math(latex)

    def output_block_math(self, m):
        '''Render the body of a $$...$$ match through renderer.block_math.'''
        latex = m.group(1)
        return self.renderer.block_math(latex)
class MarkdownWithMath(mistune.Markdown):
    '''Markdown parser wired to the math-aware inline and block lexers.'''

    def __init__(self, renderer, **kwargs):
        '''Create the parser.

        The 'inline' and 'block' keyword arguments default to
        MathInlineLexer and MathBlockLexer; values supplied by the caller
        are left untouched.
        '''
        kwargs.setdefault('inline', MathInlineLexer)
        kwargs.setdefault('block', MathBlockLexer)
        super().__init__(renderer, **kwargs)

    def output_block_math(self):
        '''Render the current 'block_math' token.'''
        body = self.token['text']
        return self.renderer.block_math(body)

    def output_latex_environment(self):
        '''Render the current 'latex_environment' token.'''
        token = self.token
        return self.renderer.latex_environment(token['name'], token['text'])
class HighlightRenderer(mistune.Renderer):
    '''HTML renderer with highlighted code, styled tables/images and math.'''

    def block_code(self, code, lang='text'):
        '''Return `code` rendered as pygments-highlighted HTML.

        The plain 'text' lexer is used when `lang` is empty/None or when
        pygments cannot provide a lexer for it.
        '''
        try:
            lexer = get_lexer_by_name(lang or 'text', stripall=False)
        except Exception:
            # Unknown language name: fall back to plain text instead of
            # failing the whole document.
            lexer = get_lexer_by_name('text', stripall=False)

        formatter = HtmlFormatter()
        return highlight(code, lexer, formatter)

    def table(self, header, body):
        '''Return a <table> wrapping the already rendered header and body.

        The CSS classes look like Bootstrap table classes (assumption).
        '''
        return ('<table class="table table-sm"><thead class="thead-light">'
                f'{header}</thead><tbody>{body}</tbody></table>')

    def image(self, src, title, alt):
        '''Return an <img> tag whose source is looked up under /file/.

        All three values are HTML-escaped (quotes included) because each one
        is interpolated into a double-quoted attribute; `title` may be None.
        '''
        src = mistune.escape(src, quote=True)
        alt = mistune.escape(alt, quote=True)
        title = mistune.escape(title or '', quote=True)
        # The trailing space after the title attribute is required: without
        # it the output read title="..."class="img-fluid".
        return (f'<img src="/file/{src}" alt="{alt}" title="{title}" '
                f'class="img-fluid">')  # class="img-fluid mx-auto d-block"

    # Pass math through unaltered - mathjax does the rendering in the browser
    def block_math(self, text):
        r'''Wrap display math in \[ ... \] delimiters.'''
        return fr'\[ {text} \]'

    def latex_environment(self, name, text):
        r'''Re-emit the environment as \begin{name} text \end{name}.'''
        return fr'\begin{{{name}}} {text} \end{{{name}}}'

    def inline_math(self, text):
        r'''Wrap inline math in \( ... \) delimiters.'''
        return fr'\( {text} \)'
# Module-level converter instance, called by md_to_html(). The renderer is
# created with escape=True.
# hard_wrap=True to insert <br> on newline
markdown = MarkdownWithMath(HighlightRenderer(escape=True))
def md_to_html(text: str, strip_p_tag: bool = False) -> str:
    '''Convert markdown `text` to HTML.

    With `strip_p_tag` set, a result that consists of exactly one paragraph
    is returned without its enclosing <p>...</p> pair (and without the
    newline that follows it), so it can be embedded inline. In every other
    case the rendered HTML is returned unchanged.
    '''
    html: str = markdown(text)
    if not strip_p_tag:
        return html

    # NOTE(review): the previous guard required the HTML to end exactly with
    # '</p>' but then sliced off five characters. Paragraphs are presumably
    # rendered as '<p>...</p>\n' (mistune 0.8), so the guard never matched.
    # Trailing newlines are dropped before testing for the closing tag.
    trimmed = html.rstrip('\n')
    if trimmed.startswith('<p>') and trimmed.endswith('</p>'):
        inner = trimmed[len('<p>'):-len('</p>')]
        # Several paragraphs: removing only the outermost tags would leave
        # unbalanced markup, so keep the HTML as it is.
        if '<p>' not in inner:
            return inner
    return html
# ---------------------------------------------------------------------------
# load data from yaml file
# ---------------------------------------------------------------------------
def load_yaml(filename: str, default: Any = None) -> Any:
    '''Load and return the contents of a YAML file.

    `filename` may start with `~`; it is expanded with expanduser. If the
    file cannot be opened or parsed, the error is logged and `default` is
    returned when it is not None; otherwise the exception is re-raised.
    '''
    full_path = path.expanduser(filename)
    has_default = default is not None

    try:
        stream = open(full_path, 'r', encoding='utf-8')
    except Exception as err:
        logger.error(err)
        if has_default:
            return default
        raise

    with stream:
        try:
            data = yaml.safe_load(stream)
        except yaml.YAMLError as err:
            # Collapse the multi-line YAML error into a single log line.
            logger.error(str(err).replace('\n', ' '))
            if has_default:
                return default
            raise
    return data
# ---------------------------------------------------------------------------
# Runs a script and returns its stdout parsed as yaml, or None on error.
# The script is run in another process but this function blocks waiting
# for its termination.
# ---------------------------------------------------------------------------
def run_script(script: str,
               args: List[str] = [],
               stdin: str = '',
               timeout: int = 2) -> Any:
    '''Run `script` and return its output parsed as YAML, or None on error.

    The call blocks until the script ends or `timeout` seconds elapse.
    `args` are converted to strings and `stdin` is fed to the script's
    standard input. Every failure (script cannot be started, timeout,
    non-zero return code, unparsable output) is logged and yields None.
    The default `args` list is only read, never modified.
    '''
    script = path.expanduser(script)

    try:
        command = [script, *(str(arg) for arg in args)]
        # NOTE(review): stderr is redirected into stdout, so anything the
        # script writes to stderr becomes part of the text parsed as YAML.
        proc = subprocess.run(
            command,
            input=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            timeout=timeout,
        )
    except FileNotFoundError:
        logger.error(f'Can not execute script "{script}": not found.')
        return None
    except PermissionError:
        logger.error(f'Can not execute script "{script}": wrong permissions.')
        return None
    except OSError:
        logger.error(f'Can not execute script "{script}": unknown reason.')
        return None
    except subprocess.TimeoutExpired:
        logger.error(f'Timeout {timeout}s exceeded while running "{script}".')
        return None
    except Exception:
        logger.error(f'An Exception ocurred running {script}.')
        return None

    if proc.returncode != 0:
        logger.error(f'Return code {proc.returncode} running "{script}".')
        return None

    try:
        return yaml.safe_load(proc.stdout)
    except Exception:
        logger.error(f'Error parsing yaml output of "{script}"')
        return None
# ----------------------------------------------------------------------------
# Same as above, but asynchronous
# ----------------------------------------------------------------------------
async def run_script_async(script: str,
                           args: List[str] = [],
                           stdin: str = '',
                           timeout: int = 2) -> Any:
    '''Run `script` without blocking the event loop.

    Returns the script's stdout parsed as YAML, or None on error. `args` are
    converted to strings, `stdin` is sent UTF-8 encoded to the script's
    standard input, and the script's stderr is discarded. As in run_script,
    every failure (script cannot be started, timeout, non-zero return code,
    unparsable output) is logged and yields None. The default `args` list is
    only read, never modified.
    '''
    script = path.expanduser(script)
    str_args = [str(a) for a in args]

    # Spawn failures are reported the same way run_script reports them,
    # instead of propagating to the caller.
    try:
        p = await asyncio.create_subprocess_exec(
            script, *str_args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except FileNotFoundError:
        logger.error(f'Can not execute script "{script}": not found.')
        return None
    except PermissionError:
        logger.error(f'Can not execute script "{script}": wrong permissions.')
        return None
    except OSError:
        logger.error(f'Can not execute script "{script}": unknown reason.')
        return None

    try:
        # stderr goes to DEVNULL, so the second item of the pair is unused.
        stdout, _ = await asyncio.wait_for(
            p.communicate(input=stdin.encode('utf-8')),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.warning(f'Timeout {timeout}s running script "{script}".')
        # Cancelling communicate() does not stop the child: kill it and reap
        # it, otherwise it keeps running after this function has returned.
        try:
            p.kill()
        except ProcessLookupError:
            pass  # the process ended on its own in the meantime
        await p.wait()
        return None

    if p.returncode != 0:
        logger.error(f'Return code {p.returncode} running "{script}".')
        return None

    try:
        return yaml.safe_load(stdout.decode('utf-8', 'ignore'))
    except Exception:
        logger.error(f'Error parsing yaml output of "{script}"')
        return None