author    Qrius <[email protected]>  2025-04-11 17:14:32 +0200
committer Qrius <[email protected]>  2025-04-11 17:14:35 +0200
commit    15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6 (patch)
tree      88ae7f0ba9a1ef3491f44324c0b24036c1d5afff
parent    9a3ce865e64d496cb835ece3e5a84a80361480ab (diff)
download  skaldpress-15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6.tar.gz
          skaldpress-15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6.zip
Begin rewrite of internals to be more malleable
-rw-r--r--  src/skaldpress/filelist.py          81
-rw-r--r--  src/skaldpress/main.py             338
-rw-r--r--  src/skaldpress/metadata_parser.py   78
-rw-r--r--  src/smp/__init__.py                  8
-rw-r--r--  src/smp/builtins.py                105
-rw-r--r--  src/smp/macro_processor.py         145
6 files changed, 383 insertions, 372 deletions
diff --git a/src/skaldpress/filelist.py b/src/skaldpress/filelist.py
index 285efb0..92180fa 100644
--- a/src/skaldpress/filelist.py
+++ b/src/skaldpress/filelist.py
@@ -5,7 +5,7 @@ from datetime import datetime
from pathlib import Path
from enum import Enum
from typing import Any, Generator
-from skaldpress.metadata_parser import get_all_meta
+from skaldpress.metadata_parser import get_all_meta_iter
class FileListFileTargetAction(Enum):
@@ -99,28 +99,31 @@ def walk_filetree(
elif entry.is_dir():
yield from walk_filetree(entry)
except OSError as e:
- raise Exception(f"DirectoryReadError: {e}")
+ raise Exception(f"DirectoryReadError: {directory} {e}")
-def enrich_with_metadata(entries, template_dir, additional_metadata, read_metadata):
- for entry in entries:
- if not read_metadata:
- yield entry[0], entry[1], {}, datetime.fromtimestamp(
- entry[0].stat().st_mtime
- )
- continue
+def enrich_with_metadata(
+ file, path_relative, template_dir, additional_metadata, read_metadata
+):
+ if not read_metadata:
+ return path_relative, {}, datetime.fromtimestamp(file.stat().st_mtime)
- meta, extension, oldest_modified = get_all_meta(
- entry[0], template_dir, additional_metadata
- )
- if extension[0] != ".":
- extension = "." + extension
- rel_path = os.path.splitext(entry[1])[0] + extension
+ meta, extension, oldest_modified = get_all_meta_iter(
+ file, template_dir, additional_metadata
+ )
+
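+    # Front matter may override the destination path via target_filename;
+    # the extension then comes from the resolved template chain.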
+ if "target_filename" in meta:
+ path_relative = meta["target_filename"]
- yield entry[0], rel_path, meta, oldest_modified
+ if len(extension) > 0 and extension[0] != ".":
+ extension = "." + extension
+ path_relative = os.path.splitext(path_relative)[0] + extension
-def make_filelist(
+ return path_relative, meta, oldest_modified
+
+
+def make_filelist_iter(
directory, # : DirEntry
base_dir: Path,
include: list[str],
@@ -129,26 +132,28 @@ def make_filelist(
read_metadata: bool,
template_dir: str,
additional_metadata: dict[str, Any],
-) -> Generator[FileListFile, None, None]:
- filelist_gen1 = (
- (x, x.path.replace(base_dir, "", 1)) for x in walk_filetree(directory)
- )
- filelist_gen2 = (
- x for x in filelist_gen1 if not file_filtered(x[1], include, exclude)
- )
- filelist_gen3 = enrich_with_metadata(
- filelist_gen2, template_dir, additional_metadata, read_metadata
- )
- filelist_gen4 = (
- FileListFile(
- file_path=x[0],
- file_rel=x[1],
- change_time=x[3],
- dependencies=[],
- target_action=target_action,
- metadata=x[2],
+) -> list[FileListFile]:
+
+ filelist = []
+ for file in walk_filetree(directory):
+ path_relative = file.path.replace(base_dir, "", 1)
+
+ if file_filtered(path_relative, include, exclude):
+ continue
+
+ path_relative, meta, oldest_modified = enrich_with_metadata(
+ file, path_relative, template_dir, additional_metadata, read_metadata
+ )
+
+ filelist.append(
+ FileListFile(
+ file_path=file,
+ file_rel=path_relative,
+ change_time=oldest_modified,
+ dependencies=[],
+ target_action=target_action,
+ metadata=meta,
+ )
)
- for x in filelist_gen3
- )
- return filelist_gen4
+ return filelist
diff --git a/src/skaldpress/main.py b/src/skaldpress/main.py
index 57da9e5..b58f4ad 100644
--- a/src/skaldpress/main.py
+++ b/src/skaldpress/main.py
@@ -1,75 +1,60 @@
import os
import shutil
+import sys
+import traceback
from argparse import ArgumentParser, ArgumentTypeError
-from dataclasses import dataclass
from functools import partial
from itertools import chain
from collections import deque
import smp.macro_processor
from copy import deepcopy
-from skaldpress.metadata_parser import extract_parse_yaml_metadata
from skaldpress.filelist import (
- make_filelist,
+ make_filelist_iter,
FileList,
FileListFileTargetAction,
file_filtered,
)
+from smp.builtins import (
+ smp_builtin_read,
+ smp_builtin_add_metadata,
+)
from time import perf_counter
-@dataclass
-class CompiledFile:
- content: str
- metadata: dict
- extension: str
- stored_smp_state: dict
- source_path: str
- needs_recompilation: bool
-
-
-COMPILED_FILES: list[CompiledFile] = list()
-COMPILED_FILES_BY_TAG: dict[str, list[int]] = dict()
-
-
-def sp_template(macro_processor, template, content):
- with open(template, "r") as f:
- file_content = f.read()
- macro_processor.macros["CONTENT"] = content
- return macro_processor.process_input(file_content)
-
-
-# SMP Macro for getting all files with specific tag, this is only _really_ effective the second run
-#
-# Usage in files:
-# all_tagged_by(<tag name>, <template> [, <field to sort by>] [, reversed])
def sp_all_tagged_by(
macro_processor, tag: str, template: str, field=None, reversed=""
) -> str:
- global COMPILED_FILES, COMPILED_FILES_BY_TAG
-
- if tag not in COMPILED_FILES_BY_TAG:
- print(f" \u001b[35mNo tags for {tag}\u001b[0m")
+ """
+    SMP Macro for getting all files with a specific tag; this is only _really_ effective on the second run
+
+ Usage in files:
+ all_tagged_by(<tag name>, <template> [, <field to sort by>] [, reversed])
+ """
+    tagged_files = [
+        k
+        for k, v in macro_processor.py_global_env["macro_processor_state"].items()
+        if tag in v["stored_data"].get("METADATA_tags", [])
+    ]
+
+ if len(tagged_files) == 0:
+ macro_processor.log_warning(f"No tags for {tag}\u001b[0m")
return ""
- tagged_files = deepcopy(COMPILED_FILES_BY_TAG[tag])
-
- out = ""
- # if field is not None:
- # tagged_files = sorted(tagged_files, lambda x: x[field])
- # if args.len() > 3 && args[3] == "reversed" {
- # tagged_files.sort_by(|a, b| order_index_by_cached_data(macro_processor, &args[2], b, a));
- # } else {
- # tagged_files.sort_by(|a, b| order_index_by_cached_data(macro_processor, &args[2], b, a));
- # }
+    # FRAGILE: assumes every tagged file defines METADATA_<field>; a missing key raises KeyError
+ if field is not None:
+ tagged_files.sort(
+ key=lambda fname: macro_processor.py_global_env["macro_processor_state"][
+ fname
+ ]["stored_data"][f"METADATA_{field}"],
+ reverse=(reversed != ""),
+ )
- for doc_i in tagged_files:
- file = COMPILED_FILES[doc_i]
+ out = ""
+ for filename in tagged_files:
+ file = macro_processor.py_global_env["macro_processor_state"][filename]
smp_local = deepcopy(macro_processor)
-
- macro_processor_initialize(file.metadata, smp_local, None)
-
- out += sp_template(smp_local, template, file.content)
-
+ smp_local.macros.update(file["stored_data"])
+ out += smp_builtin_read(smp_local, template, template_content=file["content"])
print_warnings(smp_local)
return out
@@ -81,212 +66,59 @@ class SkaldpressError(Exception):
self.path = path
-def cached_file_id_by_path(source_path: str) -> int | None:
- global COMPILED_FILES
- for i in range(len(COMPILED_FILES)):
- if COMPILED_FILES[i].source_path == source_path:
- return i
- return None
-
-
def print_warnings(macro_processor):
for warning in macro_processor.warnings:
- # print(f" \u001b[33m{warning.description}\u001b[0m")
print(f" \u001b[33m{warning}\u001b[0m")
def macro_processor_initialize(metadata, old_macro_processor, additional_state=None):
macro_processor = old_macro_processor
macro_processor.define_macro("all_tagged_by", sp_all_tagged_by)
- macro_processor.define_macro("template", sp_template)
-
- for key, value in metadata.items():
- macro_name = f"METADATA_{key}"
- if macro_name not in macro_processor.macros:
- if isinstance(value, list):
- macro_value = [str(el) for el in value]
- else:
- macro_value = str(value)
- macro_processor.define_macro(macro_name, macro_value)
-
+ smp_builtin_add_metadata(macro_processor, metadata, overwrite=False)
if additional_state:
for key, value in additional_state.items():
macro_processor.define_macro(key, value)
- global COMPILED_FILES, COMPILED_FILES_BY_TAG
- macro_processor.py_global_env["compiled_files"] = COMPILED_FILES
- macro_processor.py_global_env["compiled_files_by_tag"] = COMPILED_FILES_BY_TAG
-
-
-def extract_requested_macro_processor_state(macro_processor):
- requested_keys = macro_processor.macros.get("METADATA_keep_states")
- if requested_keys:
- if isinstance(requested_keys, list):
- requested_keys = [str(el) for el in requested_keys]
- elif isinstance(requested_keys, str):
- requested_keys = [str(requested_keys)]
- else:
- macro_processor.warnings.append(
- "keep_states specification must be list or scalar"
- )
- return {}
-
- res = {}
- for stored_key in requested_keys:
- stored_value = macro_processor.macros.get(stored_key)
- if stored_value:
- res[stored_key] = stored_value
- return res
- return {}
-
-
-def needs_recompilation(macro_processor):
- if "METADATA_keep_states" in macro_processor.macros:
- return True
- for macro_name, args in macro_processor.macro_invocations:
- if macro_name == "all_tagged_by":
- return True
- return False
-
-
-def wrap_template(macro_processor, template_file, file_content, opts):
- try:
- with open(template_file, "r") as f:
- template = f.read()
- except OSError as e:
- raise SkaldpressError(1, e, template_file)
-
- template_extension = os.path.splitext(template_file)[1][1:] or ""
-
- template_metadata, template_content = extract_parse_yaml_metadata(template) or (
- {},
- template,
- )
-
- macro_processor_initialize(template_metadata, macro_processor, None)
- macro_processor.define_macro_string("CONTENT", file_content)
- try:
- content = macro_processor.process_input(template_content)
- except Exception as e:
- raise SkaldpressError(2, e)
-
- template_parent = template_metadata.get("template")
- if not template_parent:
- return content, template_extension
-
- template_parent = str(template_parent)
- print(f" Wrapping in template {template_parent}")
- return wrap_template(
- macro_processor, f"{opts.template_dir}{template_parent}", content, opts
- )
-
-
-def compile_file(file_path, opts):
- global COMPILED_FILES
+def compile_file(smps: smp.macro_processor.MacroProcessorState, file_path, opts):
extension = os.path.splitext(file_path)[1][1:] or ""
if not extension:
raise SkaldpressError(3, None)
- try:
- with open(file_path, "r") as f:
- file_content = f.read()
- except OSError as e:
- raise SkaldpressError(1, e, file_path)
-
- map, file_content = extract_parse_yaml_metadata(file_content) or ({}, file_content)
- map.update(opts.metadata)
- filename = os.path.relpath(file_path, opts.content_dir)
- map["filename"] = os.path.splitext(filename)[0]
-
- skip_smp = map.get("skip_smp", "").lower() == "true"
- if opts.compilefilter and not file_filtered(file_path, opts.compilefilter, []):
- skip_smp = True
-
- if skip_smp:
- return CompiledFile(
- content=file_content,
- metadata=map,
- extension=extension,
- source_path=file_path,
- needs_recompilation=False,
- stored_smp_state={},
- )
-
stored_smp_state = None
- cfile_i = cached_file_id_by_path(file_path)
- if cfile_i is not None:
- stored_smp_state = COMPILED_FILES[cfile_i].stored_smp_state
-
- macro_processor = smp.macro_processor.MacroProcessor()
- macro_processor_initialize(map, macro_processor, stored_smp_state)
-
- if extension == "md":
- file_content = f'html_from_markdown(%"{file_content}"%)'
-
- if "template" not in map:
- file_content = macro_processor.process_input(file_content)
- print_warnings(macro_processor)
- return CompiledFile(
- content=file_content,
- stored_smp_state=extract_requested_macro_processor_state(macro_processor),
- metadata=map,
- extension=extension,
- source_path=file_path,
- needs_recompilation=needs_recompilation(macro_processor),
- )
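+    # Reuse any state a previous pass stored for this file (keep_states values,
+    # METADATA_* entries) so macros like all_tagged_by see earlier results.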
+ if file_path in smps.global_state:
+ stored_smp_state = smps.global_state[file_path]["stored_data"]
- template_file = f"{opts.template_dir}{map['template']}"
- content, template_extension = wrap_template(
- macro_processor, template_file, file_content, opts
+ macro_processor = smps.macro_processor()
+ macro_processor.define_macro_string(
+ "METADATA_filename", os.path.splitext(os.path.relpath(file_path, opts.content_dir))[0]
)
+ macro_processor.source_file_path = file_path
+ macro_processor_initialize(opts.metadata, macro_processor, stored_smp_state)
+
+ content = smp_builtin_read(macro_processor, file_path)
+
print_warnings(macro_processor)
- return CompiledFile(
- content=content,
- stored_smp_state=extract_requested_macro_processor_state(macro_processor),
- metadata=map,
- extension=template_extension,
- source_path=file_path,
- needs_recompilation=needs_recompilation(macro_processor),
- )
+ return macro_processor.store(content=content)
-def compile_file_and_write(source_file_path, opts):
- global COMPILED_FILES, COMPILED_FILES_BY_TAG
- compiled_file = compile_file(source_file_path, opts)
-
- if opts.first_run:
- COMPILED_FILES.append(compiled_file)
- cfile_i = len(COMPILED_FILES) - 1
- cfile = COMPILED_FILES[cfile_i]
-
- tags = cfile.metadata.get("tags")
- if tags and isinstance(tags, list):
- for tag in tags:
- if tag not in COMPILED_FILES_BY_TAG:
- COMPILED_FILES_BY_TAG[tag] = []
- COMPILED_FILES_BY_TAG[tag].append(cfile_i)
- else:
- cfile_i = cached_file_id_by_path(compiled_file.source_path)
- if cfile_i is None:
- return
- COMPILED_FILES[cfile_i], compiled_file = compiled_file, COMPILED_FILES[cfile_i]
- cfile = COMPILED_FILES[cfile_i]
-
- skip_build = cfile.metadata.get("skip_build")
- if skip_build and skip_build.lower() == "true":
- return
+def compile_file_and_write(
+ smps: smp.macro_processor.MacroProcessorState, source_file_path, opts
+):
+ compiled_file = compile_file(smps, source_file_path, opts)
dest_file_path = os.path.join(
opts.build_dir, os.path.relpath(source_file_path, opts.content_dir)
)
- dest_file_path = os.path.splitext(dest_file_path)[0] + "." + cfile.extension
+ dest_file_path = (
+ os.path.splitext(dest_file_path)[0] + "." + compiled_file["extension"]
+ )
- target_filename = cfile.metadata.get("target_filename")
- if target_filename and isinstance(target_filename, str):
+ if compiled_file["target_filename"] is not None:
dest_file_path = os.path.join(
- os.path.dirname(dest_file_path), target_filename + "." + cfile.extension
+ os.path.dirname(dest_file_path),
+ compiled_file["target_filename"] + "." + compiled_file["extension"],
)
dest_dir = os.path.dirname(dest_file_path)
@@ -294,12 +126,12 @@ def compile_file_and_write(source_file_path, opts):
print(f"> Writing {source_file_path} to {dest_file_path}")
with open(dest_file_path, "w") as f:
- f.write(cfile.content)
+ f.write(compiled_file["content"])
-def compile_files_in_directory(directory, opts):
- global COMPILED_FILES
-
+def compile_files_in_directory(
+ smps: smp.macro_processor.MacroProcessorState, directory: str, opts
+):
try:
entries = os.listdir(directory)
except OSError as e:
@@ -308,27 +140,30 @@ def compile_files_in_directory(directory, opts):
for entry in entries:
path = os.path.join(directory, entry)
- needs_recompilation = False
- cfile_i = cached_file_id_by_path(path)
- if cfile_i is not None:
- needs_recompilation = COMPILED_FILES[cfile_i].needs_recompilation
+ if path in smps.global_state:
+ needs_compilation = smps.global_state[path]["needs_recompilation"]
+ else:
+ needs_compilation = True
- should_compile = (opts.first_run or needs_recompilation) and not file_filtered(
+ should_compile = (needs_compilation) and not file_filtered(
path, opts.filter, opts.exclude
)
+
if os.path.isfile(path) and should_compile:
print(f"< Compiling {path}")
try:
- compile_file_and_write(path, opts)
+ compile_file_and_write(smps, path, opts)
except Exception as e:
print(f"\033[31mError compiling {path}: {e}\033[0m")
+ print(f"{traceback.format_exc()}", file=sys.stderr)
elif os.path.isdir(path):
try:
- compile_files_in_directory(path, opts)
+ compile_files_in_directory(smps, path, opts)
except SkaldpressError as e:
print(f"\033[31mError processing directory {path}: {e}\033[0m")
except Exception as e:
print(f"\033[31mError compiling {path}: {e}\033[0m")
+ print(f"{traceback.format_exc()}", file=sys.stderr)
def check_trailing_slash(arg):
@@ -391,8 +226,6 @@ def main():
)
args = parser.parse_args()
- args.first_run = True
-
metadata = {}
for val in args.metadata:
if "=" not in val:
@@ -405,7 +238,7 @@ def main():
args.metadata = metadata
now = perf_counter()
- filelist_dest = make_filelist(
+ filelist_dest = make_filelist_iter(
args.build_dir,
args.build_dir,
[],
@@ -416,7 +249,7 @@ def main():
args.metadata,
)
filelist_src = chain(
- make_filelist(
+ make_filelist_iter(
args.static_dir,
args.static_dir,
[],
@@ -426,7 +259,7 @@ def main():
args.template_dir,
args.metadata,
),
- make_filelist(
+ make_filelist_iter(
args.content_dir,
args.content_dir,
args.filter,
@@ -437,13 +270,14 @@ def main():
args.metadata,
),
)
+
filelist_dest = FileList.new(filelist_dest)
filelist_src = FileList.new(filelist_src)
- elapsed = perf_counter() - now
print(
- f"Generated filelist in {elapsed} seconds, {len(filelist_dest)} in destination, {len(filelist_src)} in source"
+ f"Generated filelist in {perf_counter() - now:.6f} seconds, {len(filelist_dest)} in destination, {len(filelist_src)} in source"
)
+ now = perf_counter()
work_queue = deque()
@@ -459,17 +293,13 @@ def main():
for file in filelist_src.changed_from(filelist_dest):
work_queue.append((file.file_rel, file.target_action))
- dependants = []
- for filename, file in filelist_src.files.items():
- if "dependencies" not in file.metadata:
- continue
- dependants.append((file.metadata["dependencies"], filename))
-
- elapsed = perf_counter() - elapsed
print(
- f"Generated work_queue in {elapsed} seconds, {len(work_queue)} actions to process"
+ f"Generated work_queue in {perf_counter() - now:.2f} seconds, {len(work_queue)} actions to process"
)
+ dest_dir = os.path.dirname(args.build_dir)
+ os.makedirs(dest_dir, exist_ok=True)
+
for filename, action in work_queue:
if action != FileListFileTargetAction.COMPILE:
print(f"> {action} {filename}")
@@ -495,7 +325,7 @@ def main():
except Exception as e:
print(f" \u001b[31mCould not copy file ({e})\u001b[0m")
- compile_files_in_directory(args.content_dir, args)
+ macro_processor_state = smp.macro_processor.MacroProcessorState()
+ compile_files_in_directory(macro_processor_state, args.content_dir, args)
print("\n=======================\n")
- args.first_run = False
- compile_files_in_directory(args.content_dir, args)
+ compile_files_in_directory(macro_processor_state, args.content_dir, args)
diff --git a/src/skaldpress/metadata_parser.py b/src/skaldpress/metadata_parser.py
index 85e6a65..18ae56b 100644
--- a/src/skaldpress/metadata_parser.py
+++ b/src/skaldpress/metadata_parser.py
@@ -5,44 +5,48 @@ from copy import deepcopy
from datetime import datetime
-def get_all_meta(
- file_path: Path, template_dir: str, meta: dict[str, Any]
+def dict_soft_extend(a: dict[Any, Any], b: dict[Any, Any]):
+ """
+ Extend b onto a in-place.
+    This adds keys from b that are missing in a; existing values are never overwritten.
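+
+    Example: with a = {"x": 1}, dict_soft_extend(a, {"x": 2, "y": 3})
+    leaves a == {"x": 1, "y": 3}.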
+ """
+ for k, v in b.items():
+ if k not in a:
+ a[k] = v
+
+
+def get_all_meta_iter(
+ file: Path, template_dir: str, meta: dict[str, Any]
) -> tuple[dict[str, Any], str, datetime]:
- extension = os.path.splitext(file_path)[1]
-
- fs_metadata = file_path.stat()
- fs_modified = datetime.fromtimestamp(fs_metadata.st_mtime)
-
- try:
- with open(file_path, "r") as f:
- file_content = f.read()
- except Exception:
- file_content = ""
-
- map_with_meta = extract_parse_yaml_metadata(file_content)[0]
-
- map_base = deepcopy(meta)
- map_base.update(map_with_meta)
-
- template = map_base.get("template")
- if not template:
- return map_base, extension, fs_modified
-
- template_file = Path(f"{template_dir}{str(template)}")
-
- try:
- map_templated, extension, template_fs_modified = get_all_meta(
- template_file, template_dir, {}
- )
- except Exception:
- # raise Exception(f"MetadataError: {e}")
- return map_base, extension, fs_modified
-
- map_templated.update(map_base)
- # Shuld really add a cutsom extend function to the hashmap,
- # so lists can be merged and such
-
- return map_templated, extension, max(fs_modified, template_fs_modified)
+ """
+    Extract all metadata for a file, following its template chain.
+    If a file can't be read it contributes no metadata, and the
+    modification time falls back to datetime.min.
+ """
+
+ meta = deepcopy(meta)
+ fs_modified = datetime.min
+
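+    # Follow the template chain: each file's front matter may name a parent
+    # template, whose metadata is soft-merged so the child's keys always win.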
+ while file is not None:
+ try:
+ extension = os.path.splitext(file)[1]
+ fs_metadata = file.stat()
+ fs_modified = max(datetime.fromtimestamp(fs_metadata.st_mtime), fs_modified)
+
+ with open(file, "r") as f:
+ file_content = f.read()
+ except Exception:
+ file_content = ""
+
+ map_with_meta = extract_parse_yaml_metadata(file_content)[0]
+ dict_soft_extend(meta, map_with_meta)
+
+ template = map_with_meta.get("template")
+ if template is not None:
+ file = Path(f"{template_dir}{str(template)}")
+ else:
+ file = None
+
+ return meta, extension, fs_modified
def str_to_yaml_value(in_str: str) -> Any:
diff --git a/src/smp/__init__.py b/src/smp/__init__.py
index 22085ae..d6e5d52 100644
--- a/src/smp/__init__.py
+++ b/src/smp/__init__.py
@@ -20,7 +20,8 @@ def read_stdin():
import sys
data = sys.stdin.read()
- macro_processor = smp.macro_processor.MacroProcessor()
+ macro_processor_state = smp.macro_processor.MacroProcessorState()
+ macro_processor = macro_processor_state.macro_processor()
res = macro_processor.process_input(data)
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
print(res)
@@ -44,7 +45,10 @@ def main():
with open(sys.argv[1], "r") as f:
file_content = f.read()
- macro_processor = smp.macro_processor.MacroProcessor()
+ macro_processor_state = smp.macro_processor.MacroProcessorState()
+ macro_processor = macro_processor_state.macro_processor()
res = macro_processor.process_input(file_content)
+ macro_processor.store("", "", "")
+ breakpoint()
print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
print(res)
diff --git a/src/smp/builtins.py b/src/smp/builtins.py
index 3ff15c6..0997165 100644
--- a/src/smp/builtins.py
+++ b/src/smp/builtins.py
@@ -1,11 +1,14 @@
# import smp.exceptions
+import os
import subprocess
import urllib.request
import urllib.error
import urllib.parse
import datetime
import markdown
+from skaldpress.metadata_parser import extract_parse_yaml_metadata
from gfm import AutolinkExtension, TaskListExtension # type: ignore
+from typing import Any
def smp_builtin_define(macro_processor, macro_name, macro_value=None):
@@ -66,11 +69,38 @@ def smp_builtin_ifneq(macro_processor, a, b, iftrue, iffalse=None):
return ""
+def smp_builtin_add_metadata(macro_processor, metadata: dict[str, Any], overwrite=True):
+ """
+ Not added to macro_processor as macro
+ """
+ for macro_name, value in metadata.items():
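+        # Keys are namespaced under the metadata prefix (default "METADATA_")
+        # so they cannot shadow ordinary macros.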
+ if not macro_name.startswith(
+ macro_processor._get_macro_with_prefix("metadata_prefix")
+ ):
+ macro_name = f"{macro_processor._get_macro_with_prefix('metadata_prefix')}{macro_name}"
+
+        macro_value = str(value)
+        if isinstance(value, list):
+            macro_value = [str(el) for el in value]
+
+            # Merging only makes sense for lists; extending a str would raise.
+            existing = macro_processor.macros.get(macro_name)
+            if isinstance(existing, list):
+                macro_value.extend(existing)
+
+ if overwrite or macro_name not in macro_processor.macros:
+ macro_processor.define_macro(macro_name, macro_value)
+
+
def smp_builtin_include(macro_processor, filename):
- filename = macro_processor.process_input(filename)
- with open(filename, "r") as f:
- file_content = f.read()
- return macro_processor.process_input(file_content)
+ return smp_builtin_read(macro_processor, filename, template_content=None)
+
+
+def smp_builtin_parse_leading_yaml(macro_processor, content):
+ """
+ Not added to macro_processor as macro
+ """
+ metadata, content = extract_parse_yaml_metadata(content)
+ smp_builtin_add_metadata(macro_processor, metadata, overwrite=True)
+ return content
def smp_builtin_include_verbatim(macro_processor, filename):
@@ -147,6 +177,51 @@ def smp_builtin_html_from_markdown(macro_processor, text, extensions=list()):
return markdown.markdown(text, extensions=extensions)
+def _smp_builtin_template_content(content):
+ def inner(macro_processor):
+ """
+ This should do some kind of stack thing, so we can track which file we are processing.
+        Entering the CONTENT is fine; the question is how to handle exiting it.
+
+        Could have a "once" macro or something that is added to the end of the content.
+ """
+ return content
+
+ return inner
+
+
+def smp_builtin_template(macro_processor, template, content):
+ return smp_builtin_read(macro_processor, template, template_content=content)
+
+
+def smp_builtin_read(macro_processor, filename, template_content=None):
+ with open(filename, "r") as f:
+ file_content = f.read()
+
+ metadata = {}
+ if macro_processor._get_macro_with_prefix("parse_file_yaml"):
+ metadata, file_content = extract_parse_yaml_metadata(file_content)
+ smp_builtin_add_metadata(macro_processor, metadata, overwrite=False)
+
+ extension = os.path.splitext(filename)[1][1:] or ""
+ macro_processor._define_macro_with_prefix("target_file_extension", extension)
+
+ if template_content is not None:
+ macro_processor._get_macro_with_prefix("template_stack").append(filename)
+ macro_processor.macros["CONTENT"] = template_content
+
+ content = macro_processor.process_input(file_content)
+
+ if extension == "md":
+ content = smp_builtin_html_from_markdown(macro_processor, content)
+
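+    # If front matter named a parent template, wrap the rendered content in it;
+    # the template_stack check keeps the same template from being applied twice.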
+ if (template := macro_processor.macros.get("METADATA_template")) is not None:
+ if template not in macro_processor._get_macro_with_prefix("template_stack"):
+ return smp_builtin_read(macro_processor, template, content)
+
+ return content
+
+
global LINK_CACHE
LINK_CACHE: dict[str, tuple[bool, int, str]] = dict()
@@ -168,14 +243,24 @@ def smp_builtin_wodl(macro_processor, link, timeout_seconds=5):
working_link = (r.status == 200) and (r.reason == "OK")
LINK_CACHE[link] = (working_link, r.status, r.reason)
if not working_link:
- macro_processor.warnings.append(
- f"Dead link {link} ({r.status} {r.reason})!"
- )
+ macro_processor.log_warning(f"Dead link {link} ({r.status} {r.reason})!")
except urllib.error.URLError as e:
- macro_processor.warnings.append(f"Dead link {link} ({e})!")
+ macro_processor.log_warning(f"Dead link {link} ({e})!")
return ""
+def smp_builtin_once(macro_processor, content):
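+    """
+    Expand content only once: the first invocation caches the expansion, and
+    subsequent invocations with the same content return the cached result.
+    """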
+ if (cache := macro_processor._get_macro_with_prefix("once_cache")) is not None:
+ if (exp := cache.get(content)) is not None:
+ return exp
+ else:
+ macro_processor._define_macro_with_prefix("once_cache", {})
+
+ expanded_content = macro_processor.process_input(content)
+ macro_processor._get_macro_with_prefix("once_cache", expanded_content)
+ return expanded_content
+
+
def smp_builtin_dumpenv(macro_processor):
out = ""
out += "━ Macros ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
@@ -188,7 +273,3 @@ def smp_builtin_dumpenv(macro_processor):
out += f"{repr(key)}: {repr(val)}\n"
out += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
return out
-
-
-# TODO Add macro that spawns a interactive shell with the python env. would allow interactive debugging :)
-# needs to have a continue function or something (probably on C-d
diff --git a/src/smp/macro_processor.py b/src/smp/macro_processor.py
index 68fd726..2af26dd 100644
--- a/src/smp/macro_processor.py
+++ b/src/smp/macro_processor.py
@@ -56,15 +56,18 @@ def seek(input: str, start: int, target: str) -> int | None:
class MacroProcessor:
- """All currently defined macros in this MacroProcessor"""
+ source_file_path: str
+ """All currently defined macros in this MacroProcessor"""
macros: dict[str, Any]
""" All macro invocations that has happened """
macro_invocations: list[tuple[str, list[str]]]
- """ Emitted warnings """
warnings: list[Any]
+
""" Global environment for python execution """
py_global_env: dict
+
py_local_env_alt: dict
py_local_env_current: dict
@@ -72,6 +75,7 @@ class MacroProcessor:
start_quote: str = '%"'
end_quote: str = '"%'
+ prefix: str = ""
def __init__(self, prefix=""):
self.macros = dict()
@@ -81,31 +85,46 @@ class MacroProcessor:
self.py_local_env_alt = dict()
self.py_local_env_current = self.macros
self.indent_level = ""
-
- self._define_builtins(self.macros, prefix=prefix)
- self._define_builtins(self.py_local_env_alt, prefix=prefix)
-
- def _define_builtins(self, env, prefix=""):
- env[f"{prefix}macro_processor"] = self
- env[f"{prefix}define"] = smp.builtins.smp_builtin_define
- env[f"{prefix}undefine"] = smp.builtins.smp_builtin_undefine
- env[f"{prefix}define_array"] = smp.builtins.smp_builtin_define_array
- env[f"{prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
- env[f"{prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
- env[f"{prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
- env[f"{prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
- env[f"{prefix}include"] = smp.builtins.smp_builtin_include
- env[f"{prefix}include_verbatim"] = smp.builtins.smp_builtin_include_verbatim
- env[f"{prefix}shell"] = smp.builtins.smp_builtin_shell
- env[f"{prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
- env[f"{prefix}eval"] = smp.builtins.smp_builtin_eval
- env[f"{prefix}array_push"] = smp.builtins.smp_builtin_array_push
- env[f"{prefix}array_each"] = smp.builtins.smp_builtin_array_each
- env[f"{prefix}array_size"] = smp.builtins.smp_builtin_array_size
- env[f"{prefix}explode"] = smp.builtins.smp_builtin_explode
- env[f"{prefix}format_time"] = smp.builtins.smp_builtin_format_time
- env[f"{prefix}html_from_markdown"] = smp.builtins.smp_builtin_html_from_markdown
- env[f"{prefix}wodl"] = smp.builtins.smp_builtin_wodl
+ self.prefix = prefix
+
+ self._define_builtins(self.macros)
+ self._define_builtins(self.py_local_env_alt)
+
+ def _define_builtins(self, env):
+ env[f"{self.prefix}macro_processor"] = self
+ env[f"{self.prefix}define"] = smp.builtins.smp_builtin_define
+ env[f"{self.prefix}undefine"] = smp.builtins.smp_builtin_undefine
+ env[f"{self.prefix}define_array"] = smp.builtins.smp_builtin_define_array
+ env[f"{self.prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
+ env[f"{self.prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
+ env[f"{self.prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
+ env[f"{self.prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
+ env[f"{self.prefix}once"] = smp.builtins.smp_builtin_once
+ env[f"{self.prefix}include"] = smp.builtins.smp_builtin_include
+ env[f"{self.prefix}include_verbatim"] = (
+ smp.builtins.smp_builtin_include_verbatim
+ )
+ env[f"{self.prefix}shell"] = smp.builtins.smp_builtin_shell
+ env[f"{self.prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
+ env[f"{self.prefix}eval"] = smp.builtins.smp_builtin_eval
+ env[f"{self.prefix}array_push"] = smp.builtins.smp_builtin_array_push
+ env[f"{self.prefix}array_each"] = smp.builtins.smp_builtin_array_each
+ env[f"{self.prefix}array_size"] = smp.builtins.smp_builtin_array_size
+ env[f"{self.prefix}explode"] = smp.builtins.smp_builtin_explode
+ env[f"{self.prefix}format_time"] = smp.builtins.smp_builtin_format_time
+ env[f"{self.prefix}html_from_markdown"] = (
+ smp.builtins.smp_builtin_html_from_markdown
+ )
+ env[f"{self.prefix}wodl"] = smp.builtins.smp_builtin_wodl
+ env[f"{self.prefix}template"] = smp.builtins.smp_builtin_template
+ env[f"{self.prefix}template_stack"] = []
+
+ # If true, include-macros will parse yaml in beginning of content
+ env[f"{self.prefix}parse_file_yaml"] = True
+ # If true, some macros will run in a draft-mode,
+ # meaning they will skip steps that are slow.
+ env[f"{self.prefix}draft"] = False
+ env[f"{self.prefix}metadata_prefix"] = "METADATA_"
def define_macro_string(self, macro_name, macro_value):
self.define_macro(macro_name, str(macro_value))
@@ -113,6 +132,18 @@ class MacroProcessor:
def define_macro(self, macro_name, macro_value):
self.macros[macro_name] = macro_value
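+    # Prefixed accessors namespace internal settings (template_stack,
+    # parse_file_yaml, once_cache, ...) under this processor's prefix.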
+ def _define_macro_with_prefix(self, macro_name, macro_value, sub_prefix: str = ""):
+ self.macros[f"{self.prefix}{sub_prefix}{macro_name}"] = macro_value
+
+ def _get_macro_with_prefix(self, macro_name, sub_prefix: str = "", default=None):
+ return self.macros.get(f"{self.prefix}{sub_prefix}{macro_name}", default)
+
+ def log_warning(self, message):
+ """
+        Here we should add more information (line number, file, etc.) when it is available.
+ """
+ self.warnings.append(message)
+
def expand_macro(self, macro_name: str, args: list[str] = list()) -> str:
# Ignore trailing underscore in macro name, the parser will pop a space in front if
# present, but we should ignore it for finding the macro.
@@ -152,7 +183,9 @@ class MacroProcessor:
return str(macro(*macro_args))
except Exception as e:
s = f"{macro_name}({','.join([repr(x) for x in args])})"
- self.warnings.append(f"Error expanding macro {s} ({e})")
+ self.log_warning(
+ f"Error expanding macro {s} ({e})\n{traceback.format_exc()}"
+ )
return s
if isinstance(macro, str):
expanded = macro
@@ -175,6 +208,10 @@ class MacroProcessor:
@for <python-expression>
@endfor
+
+ Note: Consider writing a new implementation that does it the same way M4 does,
+        by pushing the expanded macros back onto the input string; this may be more confusing,
+        but may also be faster (stream or mutable string).
"""
output = ""
state = ParserState.NORMAL
@@ -322,8 +359,58 @@ class MacroProcessor:
# Handle cases where the text ends with a macro without arguments
if macro_name != "":
if macro_is_whitespace_deleting(macro_name):
- if output[-1] == " ":
+ if len(output) > 0 and output[-1] == " ":
output = output[:-1]
macro_name = macro_name_clean(macro_name)
output += self.expand_macro(macro_name)
return output
+
+ def store(self, **xargs):
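+        """
+        Persist this processor's state into the shared macro_processor_state,
+        keyed by source file path, so a later pass can reuse it; extra fields
+        such as content= are merged in via **xargs.
+        """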
+        requested_keys = self.macros.get("METADATA_keep_states", self.macros.keys())
+        if isinstance(requested_keys, str):
+            requested_keys = [requested_keys]
+        requested_keys = list(requested_keys)
+
+        for key in self.macros.keys():
+            if key.startswith("METADATA_") and key not in requested_keys:
+                requested_keys.append(key)
+
+ needs_recompilation = ("METADATA_keep_states" in self.macros) or (
+ "all_tagged_by" in [x[0] for x in self.macro_invocations]
+ )
+
+ target_filename = self._get_macro_with_prefix(
+ "target_filename", sub_prefix="METADATA_"
+ )
+
+ self.py_global_env["macro_processor_state"][self.source_file_path] = dict(
+ {
+ # "content": "",
+ "stored_data": {
+ k: v for k, v in self.macros.items() if k in requested_keys
+ },
+ "extension": self._get_macro_with_prefix("target_file_extension"),
+ "source_path": self.source_file_path,
+ "needs_recompilation": needs_recompilation,
+ "target_filename": target_filename,
+ **xargs,
+ }
+ )
+ return self.py_global_env["macro_processor_state"][self.source_file_path]
+
+
+class MacroProcessorState:
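+    """
+    State shared between MacroProcessor instances across compile passes;
+    each processor gets a reference via py_global_env["macro_processor_state"].
+    """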
+ global_state: dict
+
+ def __init__(self):
+ self.global_state = dict()
+
+ def macro_processor(self, macro_processor=None):
+ if macro_processor is None:
+ macro_processor = MacroProcessor()
+
+ macro_processor.py_global_env["macro_processor_state"] = self.global_state
+ return macro_processor
+
+ def print_state(self):
+ for key, val in self.global_state.items():
+ print(f"{key[-20:]:20} {val}")