Diffstat (limited to 'src')
-rw-r--r--  src/skaldpress/__init__.py                 6
-rw-r--r--  src/skaldpress/file_metadata_extract.rs    0
-rw-r--r--  src/skaldpress/main.py                   326
-rw-r--r--  src/skaldpress/metadata_parser.py         74
-rw-r--r--  src/skaldpress/smp_macros.py               1
-rw-r--r--  src/smp/__init__.py                       13
-rw-r--r--  src/smp/builtins.py                        6
-rw-r--r--  src/smp/macro_processor.py                76
8 files changed, 465 insertions, 37 deletions
diff --git a/src/skaldpress/__init__.py b/src/skaldpress/__init__.py
new file mode 100644
index 0000000..f00b84f
--- /dev/null
+++ b/src/skaldpress/__init__.py
@@ -0,0 +1,6 @@
+__version__ = "0.0.1"
+# import skaldpress.smp_macros
+#
+# __all__ = [
+# "skaldpress.smp_macros",
+# ]
diff --git a/src/skaldpress/file_metadata_extract.rs b/src/skaldpress/file_metadata_extract.rs
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/skaldpress/file_metadata_extract.rs
diff --git a/src/skaldpress/main.py b/src/skaldpress/main.py
new file mode 100644
index 0000000..66fd0b1
--- /dev/null
+++ b/src/skaldpress/main.py
@@ -0,0 +1,326 @@
+import os
+from argparse import ArgumentParser
+from dataclasses import dataclass
+import smp.macro_processor
+from skaldpress.metadata_parser import extract_parse_yaml_metadata
+
+
+@dataclass
+class CompiledFile:
+    content: str
+    metadata: dict
+    extension: str
+    stored_smp_state: dict
+    source_path: str
+    needs_recompilation: bool
+
+
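+# Compiled files are kept in these module-level caches so that files which
+# depend on other files (e.g. an "all_tagged_by" style macro) can be
+# recompiled in a later pass without re-reading everything from disk.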
+COMPILED_FILES: list[CompiledFile] = list()
+COMPILED_FILES_BY_TAG: dict[str, list[int]] = dict()
+
+
+class SkaldpressError(Exception):
+    def __init__(self, code, error, path=None):
+        self.code = code
+        self.error = error
+        self.path = path
+
+
+def sp_template(macro_processor, template, content):
+    with open(template, "r") as f:
+        file_content = f.read()
+    macro_processor.macros["CONTENT"] = content
+    return macro_processor.process_input(file_content)
+
+
+def get_template_path(template: str, opts):
+    return f"{opts.template_dir}{template}"
+
+
+def cached_file_id_by_path(source_path: str) -> int | None:
+    for i in range(len(COMPILED_FILES)):
+        # Compare against the cached entry's source_path (the entry itself is a CompiledFile)
+        if COMPILED_FILES[i].source_path == source_path:
+            return i
+    return None
+
+
+def print_warnings(macro_processor):
+    for warning in macro_processor.warnings:
+        print(f" \u001b[33m{warning.description}\u001b[0m")
+
+
+def file_pat_match(file: str, pat: str) -> bool:
+    if file == pat:
+        return True
+    if pat.startswith("*") and file.endswith(pat.removeprefix("*")):
+        return True
+    # Trailing-wildcard patterns ("prefix*") match by prefix
+    if pat.endswith("*") and file.startswith(pat.removesuffix("*")):
+        return True
+    return False
+
+
+def file_filtered(file: str, filters: list[str], exclude: list[str]) -> bool:
+    for filter in exclude:
+        if file_pat_match(file, filter):
+            return True
+    if len(filters) == 0:
+        return False
+    for filter in filters:
+        if file_pat_match(file, filter):
+            return False
+    return True
+
+
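+# Each metadata key from the front matter is exposed to the macro processor
+# as a METADATA_<key> macro, unless a macro with that name already exists.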
+def macro_processor_initialize(metadata, old_macro_processor, additional_state=None):
+    macro_processor = old_macro_processor
+    # macro_processor.define_macro("all_tagged_by", sp_all_tagged_by)
+    macro_processor.define_macro("template", sp_template)
+
+    for key, value in metadata.items():
+        macro_name = f"METADATA_{key}"
+        if macro_name not in macro_processor.macros:
+            if isinstance(value, list):
+                out = [str(el) for el in value]
+                macro_value = out
+            else:
+                macro_value = str(value)
+            macro_processor.define_macro(macro_name, macro_value)
+
+    if additional_state:
+        for key, value in additional_state.items():
+            macro_processor.define_macro(key, value)
+
+
+def extract_requested_macro_processor_state(macro_processor):
+    requested_keys = macro_processor.macros.get("METADATA_keep_states")
+    if requested_keys:
+        if isinstance(requested_keys, list):
+            requested_keys = [str(el) for el in requested_keys]
+        elif isinstance(requested_keys, str):
+            requested_keys = [str(requested_keys)]
+        else:
+            macro_processor.warnings.append(
+                "keep_states specification must be list or scalar"
+            )
+            return {}
+
+        res = {}
+        for stored_key in requested_keys:
+            stored_value = macro_processor.macros.get(stored_key)
+            if stored_value:
+                res[stored_key] = stored_value
+        return res
+    return {}
+
+
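+# A file needs a second compilation pass if it keeps macro processor state
+# between passes, or if it invoked a macro (all_tagged_by) whose result
+# depends on the other files that have been compiled.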
+def needs_recompilation(macro_processor):
+ if "METADATA_keep_states" in macro_processor.macros:
+ return True
+ for macro_name in macro_processor.macro_invocations:
+ if macro_name == "all_tagged_by":
+ return True
+ return False
+
+
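+# Renders file_content into template_file (exposed to the template as CONTENT),
+# and recurses if the template's own front matter names a parent template.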
+def wrap_template(macro_processor, template_file, file_content, opts):
+    try:
+        with open(template_file, "r") as f:
+            template = f.read()
+    except OSError as e:
+        raise SkaldpressError(1, e, template_file)
+
+    template_extension = os.path.splitext(template_file)[1][1:] or ""
+
+    template_metadata, template_content = extract_parse_yaml_metadata(template) or (
+        {},
+        template,
+    )
+
+    macro_processor_initialize(template_metadata, macro_processor, None)
+    macro_processor.define_macro_string("CONTENT", file_content)
+    try:
+        content = macro_processor.process_input(template_content)
+    except Exception as e:
+        raise SkaldpressError(2, e)
+
+    template_parent = template_metadata.get("template")
+    if not template_parent:
+        return content, template_extension
+
+    template_parent = str(template_parent)
+    print(f" Wrapping in template {template_parent}")
+    return wrap_template(
+        macro_processor, get_template_path(template_parent, opts), content, opts
+    )
+
+
+def compile_file(file_path, opts):
+    extension = os.path.splitext(file_path)[1][1:] or ""
+    if not extension:
+        raise SkaldpressError(3, None)
+
+    try:
+        with open(file_path, "r") as f:
+            file_content = f.read()
+    except OSError as e:
+        raise SkaldpressError(1, e, file_path)
+
+    map, file_content = extract_parse_yaml_metadata(file_content) or ({}, file_content)
+    map.update(opts.metadata)
+    filename = os.path.relpath(file_path, opts.content_dir)
+    map["filename"] = os.path.splitext(filename)[0]
+
+    skip_smp = map.get("skip_smp", "").lower() == "true"
+    if opts.compilefilter and not file_filtered(file_path, opts.compilefilter, []):
+        skip_smp = True
+
+    if skip_smp:
+        return CompiledFile(
+            content=file_content,
+            metadata=map,
+            extension=extension,
+            source_path=file_path,
+            needs_recompilation=False,
+            stored_smp_state={},
+        )
+
+    stored_smp_state = None
+    cfile_i = cached_file_id_by_path(file_path)
+    if cfile_i is not None:
+        stored_smp_state = COMPILED_FILES[cfile_i].stored_smp_state
+
+    macro_processor = smp.macro_processor.MacroProcessor()
+    macro_processor_initialize(map, macro_processor, stored_smp_state)
+
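+    # Markdown sources are wrapped in the html_from_markdown builtin, so the
+    # macro processor converts them to HTML while expanding macros.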
+    if extension == "md":
+        file_content = f'html_from_markdown(%"{file_content}"%)'
+
+    if "template" not in map:
+        file_content = macro_processor.process_input(file_content)
+        print_warnings(macro_processor)
+        return CompiledFile(
+            content=file_content,
+            stored_smp_state=extract_requested_macro_processor_state(macro_processor),
+            metadata=map,
+            extension=extension,
+            source_path=file_path,
+            needs_recompilation=needs_recompilation(macro_processor),
+        )
+
+    template_file = get_template_path(map["template"], opts)
+    content, template_extension = wrap_template(
+        macro_processor, template_file, file_content, opts
+    )
+
+    print_warnings(macro_processor)
+    return CompiledFile(
+        content=content,
+        stored_smp_state=extract_requested_macro_processor_state(macro_processor),
+        metadata=map,
+        extension=template_extension,
+        source_path=file_path,
+        needs_recompilation=needs_recompilation(macro_processor),
+    )
+
+
+def compile_file_and_write(source_file_path, opts):
+    compiled_file = compile_file(source_file_path, opts)
+
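+    # On the first pass the result is appended to the cache and indexed by tag;
+    # on later passes the cached entry for the same source path is replaced.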
+    if opts.first_run:
+        COMPILED_FILES.append(compiled_file)
+        cfile_i = len(COMPILED_FILES) - 1
+        cfile = COMPILED_FILES[cfile_i]
+
+        tags = cfile.metadata.get("tags")
+        if tags and isinstance(tags, list):
+            compiled_files_by_tag = COMPILED_FILES_BY_TAG
+            for tag in tags:
+                if tag not in compiled_files_by_tag:
+                    compiled_files_by_tag[tag] = []
+                compiled_files_by_tag[tag].append(cfile_i)
+    else:
+        cfile_i = cached_file_id_by_path(compiled_file.source_path)
+        COMPILED_FILES[cfile_i], compiled_file = compiled_file, COMPILED_FILES[cfile_i]
+        cfile = COMPILED_FILES[cfile_i]
+
+    skip_build = cfile.metadata.get("skip_build")
+    if skip_build and skip_build.lower() == "true":
+        return
+
+    dest_file_path = os.path.join(
+        opts.build_dir, os.path.relpath(source_file_path, opts.content_dir)
+    )
+    dest_file_path = os.path.splitext(dest_file_path)[0] + "." + cfile.extension
+
+    target_filename = cfile.metadata.get("target_filename")
+    if target_filename and isinstance(target_filename, str):
+        dest_file_path = os.path.join(
+            os.path.dirname(dest_file_path), target_filename + "." + cfile.extension
+        )
+
+    dest_dir = os.path.dirname(dest_file_path)
+    os.makedirs(dest_dir, exist_ok=True)
+
+    print(f"> Writing {source_file_path} to {dest_file_path}")
+    with open(dest_file_path, "w") as f:
+        f.write(cfile.content)
+
+
+def compile_files_in_directory(directory, opts):
+    try:
+        entries = os.listdir(directory)
+    except OSError as e:
+        raise SkaldpressError(8, e, directory)
+
+    for entry in entries:
+        path = os.path.join(directory, entry)
+        # try:
+        #     metadata = os.stat(path)
+        # except OSError as e:
+        #     print(f"\033[31mError getting file metadata {e}\033[0m")
+        #     continue
+
+        needs_recompilation = False
+        cfile_i = cached_file_id_by_path(path)
+        if cfile_i is not None:
+            needs_recompilation = COMPILED_FILES[cfile_i].needs_recompilation
+
+        should_compile = (opts.first_run or needs_recompilation) and not file_filtered(
+            path, opts.filter, opts.exclude
+        )
+        if os.path.isfile(path) and should_compile:
+            print(f"< Compiling {path}")
+            try:
+                compile_file_and_write(path, opts)
+            except Exception as e:
+                print(f"\033[31mError compiling {path}: {e}\033[0m")
+                raise e
+        elif os.path.isdir(path):
+            try:
+                compile_files_in_directory(path, opts)
+            except SkaldpressError as e:
+                print(f"\033[31mError processing directory {path}: {e}\033[0m")
+
+
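+# Command-line entry point, e.g. (assuming it is wired up as a console script
+# or run via python -m): skaldpress -i content/ -o build/ -t templates/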
+def main():
+    parser = ArgumentParser()
+    parser.add_argument(
+        "-o", "--out", "--output", metavar="path", default="build/", dest="build_dir"
+    )
+    parser.add_argument(
+        "-i", "--input", metavar="path", default="content/", dest="content_dir"
+    )
+    parser.add_argument("-s", "--static", metavar="path", default="static/")
+    parser.add_argument(
+        "-t", "--templates", metavar="path", default="templates/", dest="template_dir"
+    )
+    parser.add_argument("-f", "--filter", metavar="filter", default=[])
+    parser.add_argument("-e", "--exclude", metavar="filter", default=[])
+    parser.add_argument("-m", "--metadata", nargs="+", metavar="key=value", default=[])
+    parser.add_argument("-c", "--compilefilter", metavar="filter", default=[])
+    parser.add_argument("-x", "--xclude", metavar="filter", default=[])
+    args = parser.parse_args()
+
+    # compile_file() merges this into each file's metadata, so turn the
+    # key=value pairs into a dict before handing args around.
+    args.metadata = dict(kv.split("=", 1) for kv in args.metadata)
+    args.first_run = True
+
+    compile_files_in_directory(args.content_dir, args)
diff --git a/src/skaldpress/metadata_parser.py b/src/skaldpress/metadata_parser.py
new file mode 100644
index 0000000..28cab31
--- /dev/null
+++ b/src/skaldpress/metadata_parser.py
@@ -0,0 +1,74 @@
+import datetime
+from typing import Any
+
+
+def str_to_yaml_value(in_str: str) -> Any:
+    in_str = in_str.strip()
+
+    try:
+        return int(in_str)
+    except ValueError:
+        pass
+
+    try:
+        return datetime.datetime.strptime(in_str, "%Y-%m-%dT%H:%M:%S%z")
+    except ValueError:
+        pass
+
+    return str(in_str)
+
+
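+# Minimal front matter parser: reads a leading block delimited by "---" lines
+# and returns (metadata, remaining content). Scalar values become int,
+# datetime, or str, and "- item" lines under a bare "key:" become a list, e.g.:
+#
+#   ---
+#   title: My post
+#   publish_date: 2024-01-01T12:00:00+0000
+#   tags:
+#     - skaldpress
+#     - python
+#   ---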
+def extract_parse_yaml_metadata(file_content, newline="\n") -> tuple[dict, str]:
+    file_lines = file_content.split(newline)
+    if len(file_lines) < 1:
+        return {}, file_content
+
+    if next(iter(file_lines)).strip() != "---":
+        return {}, file_content
+
+    yaml_map: dict[str, Any] = {}
+    yaml_started = yaml_ended = False
+    end_index = 0
+    current_key = None
+    current_list = list()
+
+    for i, line in enumerate(file_lines):
+        if line.strip() == "---":
+            if yaml_started:
+                yaml_ended = True
+                end_index = sum(
+                    map(
+                        lambda x: len(x) + len(newline),
+                        file_content.split(newline)[: i + 1],
+                    )
+                )
+                break
+            else:
+                yaml_started = True
+        elif yaml_started and not yaml_ended:
+            if line.strip().startswith("-") and current_key is not None:
+                current_list.append(line.strip().lstrip("-").strip())
+            elif ":" in line:
+                key, value = line.split(":", 1)
+                if current_key is not None:
+                    if len(current_list) > 0:
+                        # Flush the list collected so far under the previous key
+                        yaml_map[current_key] = current_list
+                        current_list = list()
+
+                current_key = key.strip()
+                if value.strip() != "":
+                    yaml_map[current_key] = str_to_yaml_value(value.strip())
+                    current_key = None
+
+    if current_key is not None:
+        if len(current_list) > 0:
+            yaml_map[current_key] = current_list
+
+    if not yaml_ended:
+        end_index = len(file_content)
+
+    if "publish_date" in yaml_map:
+        if "change_date" not in yaml_map:
+            yaml_map["change_date"] = yaml_map["publish_date"]
+
+    return yaml_map, file_content[end_index:]
diff --git a/src/skaldpress/smp_macros.py b/src/skaldpress/smp_macros.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/src/skaldpress/smp_macros.py
@@ -0,0 +1 @@
+
diff --git a/src/smp/__init__.py b/src/smp/__init__.py
index 63cecaf..22085ae 100644
--- a/src/smp/__init__.py
+++ b/src/smp/__init__.py
@@ -2,6 +2,11 @@ __version__ = "0.0.1"
import smp.macro_processor
import smp.builtins
+__all__ = [
+    "smp.macro_processor",
+    "smp.builtins",
+]
+
def repl():
print("=Skaldpress Macro Processor (REPL)")
@@ -15,8 +20,8 @@ def read_stdin():
    import sys
    data = sys.stdin.read()
-    smp = macro_processor.MacroProcessor()
-    res = smp.process_input(data)
+    macro_processor = smp.macro_processor.MacroProcessor()
+    res = macro_processor.process_input(data)
    print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
    print(res)
@@ -39,7 +44,7 @@ def main():
    with open(sys.argv[1], "r") as f:
        file_content = f.read()
-    smp = macro_processor.MacroProcessor()
-    res = smp.process_input(file_content)
+    macro_processor = smp.macro_processor.MacroProcessor()
+    res = macro_processor.process_input(file_content)
    print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
    print(res)
diff --git a/src/smp/builtins.py b/src/smp/builtins.py
index f463a24..9a27864 100644
--- a/src/smp/builtins.py
+++ b/src/smp/builtins.py
@@ -1,10 +1,10 @@
-import smp.exceptions
+# import smp.exceptions
import subprocess
import urllib.request
import urllib.error
import datetime
import markdown
-from gfm import AutolinkExtension, TaskListExtension
+from gfm import AutolinkExtension, TaskListExtension # type: ignore
def smp_builtin_define(macro_processor, macro_name, macro_value=None):
@@ -147,7 +147,7 @@ def smp_builtin_html_from_markdown(macro_processor, text, extensions=list()):
global LINK_CACHE
-LINK_CACHE = dict()
+LINK_CACHE: dict[str, tuple[bool, int, str]] = dict()
def smp_builtin_wodl(macro_processor, link, timeout_seconds=5):
diff --git a/src/smp/macro_processor.py b/src/smp/macro_processor.py
index e85fbe9..8fa9d91 100644
--- a/src/smp/macro_processor.py
+++ b/src/smp/macro_processor.py
@@ -41,6 +41,8 @@ class MacroProcessor:
    warnings: list[Any]
    """ Global environment for python execution """
    py_global_env: dict
+    py_local_env_alt: dict
+    py_local_env_current: dict
    special_macros: dict[str, tuple[Any, Any]]
@@ -49,32 +51,40 @@ class MacroProcessor:
        self.macro_invocations = list()
        self.warnings = list()
        self.py_global_env = dict()
-        self._define_builtins(prefix=prefix)
-
-    def _define_builtins(self, prefix=""):
-        self.macros[f"{prefix}define"] = smp.builtins.smp_builtin_define
-        self.macros[f"{prefix}undefine"] = smp.builtins.smp_builtin_undefine
-        self.macros[f"{prefix}define_array"] = smp.builtins.smp_builtin_define_array
-        self.macros[f"{prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
-        self.macros[f"{prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
-        self.macros[f"{prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
-        self.macros[f"{prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
-        self.macros[f"{prefix}include"] = smp.builtins.smp_builtin_include
-        self.macros[f"{prefix}include_verbatim"] = (
-            smp.builtins.smp_builtin_include_verbatim
-        )
-        self.macros[f"{prefix}shell"] = smp.builtins.smp_builtin_shell
-        self.macros[f"{prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
-        self.macros[f"{prefix}eval"] = smp.builtins.smp_builtin_eval
-        self.macros[f"{prefix}array_push"] = smp.builtins.smp_builtin_array_push
-        self.macros[f"{prefix}array_each"] = smp.builtins.smp_builtin_array_each
-        self.macros[f"{prefix}array_size"] = smp.builtins.smp_builtin_array_size
-        self.macros[f"{prefix}explode"] = smp.builtins.smp_builtin_explode
-        self.macros[f"{prefix}format_time"] = smp.builtins.smp_builtin_format_time
-        self.macros[f"{prefix}html_from_markdown"] = (
-            smp.builtins.smp_builtin_html_from_markdown
-        )
-        self.macros[f"{prefix}wodl"] = smp.builtins.smp_builtin_wodl
+        self.py_local_env_alt = dict()
+        self.py_local_env_current = self.macros
+        self.indent_level = ""
+
+        self._define_builtins(self.macros, prefix=prefix)
+        self._define_builtins(self.py_local_env_alt, prefix=prefix)
+
+    def _define_builtins(self, env, prefix=""):
+        env[f"{prefix}macro_processor"] = self
+        env[f"{prefix}define"] = smp.builtins.smp_builtin_define
+        env[f"{prefix}undefine"] = smp.builtins.smp_builtin_undefine
+        env[f"{prefix}define_array"] = smp.builtins.smp_builtin_define_array
+        env[f"{prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
+        env[f"{prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
+        env[f"{prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
+        env[f"{prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
+        env[f"{prefix}include"] = smp.builtins.smp_builtin_include
+        env[f"{prefix}include_verbatim"] = smp.builtins.smp_builtin_include_verbatim
+        env[f"{prefix}shell"] = smp.builtins.smp_builtin_shell
+        env[f"{prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
+        env[f"{prefix}eval"] = smp.builtins.smp_builtin_eval
+        env[f"{prefix}array_push"] = smp.builtins.smp_builtin_array_push
+        env[f"{prefix}array_each"] = smp.builtins.smp_builtin_array_each
+        env[f"{prefix}array_size"] = smp.builtins.smp_builtin_array_size
+        env[f"{prefix}explode"] = smp.builtins.smp_builtin_explode
+        env[f"{prefix}format_time"] = smp.builtins.smp_builtin_format_time
+        env[f"{prefix}html_from_markdown"] = smp.builtins.smp_builtin_html_from_markdown
+        env[f"{prefix}wodl"] = smp.builtins.smp_builtin_wodl
+
+    def define_macro_string(self, macro_name, macro_value):
+        self.define_macro(macro_name, str(macro_value))
+
+    def define_macro(self, macro_name, macro_value):
+        self.macros[macro_name] = macro_value
    def expand_macro(self, macro_name: str, args: list[str] = list()) -> str:
        # Ignore trailing underscore in macro name, the parser will pop a space in front if
@@ -104,14 +114,18 @@ class MacroProcessor:
        if callable(macro):
            signature = inspect.signature(macro)
-            macro_args = []
+            macro_args: list[Any] = []
            if (
                "macro_processor" in signature.parameters
                or "smp" in signature.parameters
            ):
                macro_args.append(self)
            macro_args.extend(args)
-            return str(macro(*macro_args))
+            try:
+                return str(macro(*macro_args))
+            except Exception as e:
+                s = f"{macro_name}({','.join([repr(x) for x in macro_args])})"
+                raise Exception(s) from e
        if isinstance(macro, str):
            expanded = macro
            for i, arg in enumerate(args):
@@ -143,8 +157,11 @@ class MacroProcessor:
        skip_next_line_ending = False
+        line_begin = True
+
        # We should keep track of filename, linenumber, and character number on line here
        # So we can give sensible error messages
+        # Probably add to python stack trace?
        quote_level = 0
        parens_level = 0
@@ -153,7 +170,6 @@ class MacroProcessor:
        while i < len(input):
            c = input[i]
            peek = None if i + 1 >= len(input) else input[i + 1]
-
            # import sys
            # print(f"[{i:4}] {repr(c):4} -> {repr(peek):4} [{state}] = {repr(output)}", file=sys.stderr)
@@ -264,7 +280,7 @@ class MacroProcessor:
            try:
                f = StringIO()
                with redirect_stdout(f):
-                    exec(py_expr, self.py_global_env, self.macros)
+                    exec(py_expr, self.py_global_env, self.py_local_env_current)
                s = f.getvalue()
                if s != "":
                    output += s