author     Qrius <[email protected]>    2025-04-11 17:14:32 +0200
committer  Qrius <[email protected]>    2025-04-11 17:14:35 +0200
commit     15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6 (patch)
tree       88ae7f0ba9a1ef3491f44324c0b24036c1d5afff
parent     9a3ce865e64d496cb835ece3e5a84a80361480ab (diff)
download   skaldpress-15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6.tar.gz
           skaldpress-15b9c3af6b0d58f8d6bb0729a217dc6d9f4666e6.zip
Begin rewrite of internals to be more malleable
-rw-r--r--  src/skaldpress/filelist.py         |  81
-rw-r--r--  src/skaldpress/main.py             | 338
-rw-r--r--  src/skaldpress/metadata_parser.py  |  78
-rw-r--r--  src/smp/__init__.py                |   8
-rw-r--r--  src/smp/builtins.py                | 105
-rw-r--r--  src/smp/macro_processor.py         | 145

6 files changed, 383 insertions, 372 deletions
diff --git a/src/skaldpress/filelist.py b/src/skaldpress/filelist.py
index 285efb0..92180fa 100644
--- a/src/skaldpress/filelist.py
+++ b/src/skaldpress/filelist.py
@@ -5,7 +5,7 @@ from datetime import datetime
 from pathlib import Path
 from enum import Enum
 from typing import Any, Generator
-from skaldpress.metadata_parser import get_all_meta
+from skaldpress.metadata_parser import get_all_meta_iter
 
 
 class FileListFileTargetAction(Enum):
@@ -99,28 +99,31 @@ def walk_filetree(
             elif entry.is_dir():
                 yield from walk_filetree(entry)
     except OSError as e:
-        raise Exception(f"DirectoryReadError: {e}")
+        raise Exception(f"DirectoryReadError: {directory} {e}")
 
 
-def enrich_with_metadata(entries, template_dir, additional_metadata, read_metadata):
-    for entry in entries:
-        if not read_metadata:
-            yield entry[0], entry[1], {}, datetime.fromtimestamp(
-                entry[0].stat().st_mtime
-            )
-            continue
+def enrich_with_metadata(
+    file, path_relative, template_dir, additional_metadata, read_metadata
+):
+    if not read_metadata:
+        return path_relative, {}, datetime.fromtimestamp(file.stat().st_mtime)
 
-        meta, extension, oldest_modified = get_all_meta(
-            entry[0], template_dir, additional_metadata
-        )
-        if extension[0] != ".":
-            extension = "." + extension
-        rel_path = os.path.splitext(entry[1])[0] + extension
+    meta, extension, oldest_modified = get_all_meta_iter(
+        file, template_dir, additional_metadata
+    )
+
+    if "target_filename" in meta:
+        path_relative = meta["target_filename"]
 
-        yield entry[0], rel_path, meta, oldest_modified
+    if len(extension) > 0 and extension[0] != ".":
+        extension = "." + extension
+    path_relative = os.path.splitext(path_relative)[0] + extension
 
 
-def make_filelist(
+    return path_relative, meta, oldest_modified
+
+
+def make_filelist_iter(
     directory,  # : DirEntry
     base_dir: Path,
     include: list[str],
@@ -129,26 +132,28 @@
     read_metadata: bool,
     template_dir: str,
    additional_metadata: dict[str, Any],
-) -> Generator[FileListFile, None, None]:
-    filelist_gen1 = (
-        (x, x.path.replace(base_dir, "", 1)) for x in walk_filetree(directory)
-    )
-    filelist_gen2 = (
-        x for x in filelist_gen1 if not file_filtered(x[1], include, exclude)
-    )
-    filelist_gen3 = enrich_with_metadata(
-        filelist_gen2, template_dir, additional_metadata, read_metadata
-    )
-    filelist_gen4 = (
-        FileListFile(
-            file_path=x[0],
-            file_rel=x[1],
-            change_time=x[3],
-            dependencies=[],
-            target_action=target_action,
-            metadata=x[2],
+) -> list[FileListFile]:
+
+    filelist = []
+    for file in walk_filetree(directory):
+        path_relative = file.path.replace(base_dir, "", 1)
+
+        if file_filtered(path_relative, include, exclude):
+            continue
+
+        path_relative, meta, oldest_modified = enrich_with_metadata(
+            file, path_relative, template_dir, additional_metadata, read_metadata
+        )
+
+        filelist.append(
+            FileListFile(
+                file_path=file,
+                file_rel=path_relative,
+                change_time=oldest_modified,
+                dependencies=[],
+                target_action=target_action,
+                metadata=meta,
+            )
         )
-        for x in filelist_gen3
-    )
-    return filelist_gen4
+    return filelist
diff --git a/src/skaldpress/main.py b/src/skaldpress/main.py
index 57da9e5..b58f4ad 100644
--- a/src/skaldpress/main.py
+++ b/src/skaldpress/main.py
@@ -1,75 +1,60 @@
 import os
 import shutil
+import sys
+import traceback
 from argparse import ArgumentParser, ArgumentTypeError
-from dataclasses import dataclass
 from functools import partial
 from itertools import chain
 from collections import deque
 import smp.macro_processor
 from copy import deepcopy
-from skaldpress.metadata_parser import extract_parse_yaml_metadata
 from skaldpress.filelist import (
-    make_filelist,
+    make_filelist_iter,
     FileList,
     FileListFileTargetAction,
     file_filtered,
 )
+from smp.builtins import (
+    smp_builtin_read,
+    smp_builtin_add_metadata,
+)
 from time import perf_counter
 
 
-@dataclass
-class CompiledFile:
-    content: str
-    metadata: dict
-    extension: str
-    stored_smp_state: dict
-    source_path: str
-    needs_recompilation: bool
-
-
-COMPILED_FILES: list[CompiledFile] = list()
-COMPILED_FILES_BY_TAG: dict[str, list[int]] = dict()
-
-
-def sp_template(macro_processor, template, content):
-    with open(template, "r") as f:
-        file_content = f.read()
-    macro_processor.macros["CONTENT"] = content
-    return macro_processor.process_input(file_content)
-
-
-# SMP Macro for getting all files with specific tag, this is only _really_ effective the second run
-#
-# Usage in files:
-#   all_tagged_by(<tag name>, <template> [, <field to sort by>] [, reversed])
 def sp_all_tagged_by(
     macro_processor, tag: str, template: str, field=None, reversed=""
 ) -> str:
-    global COMPILED_FILES, COMPILED_FILES_BY_TAG
-
-    if tag not in COMPILED_FILES_BY_TAG:
-        print(f"  \u001b[35mNo tags for {tag}\u001b[0m")
+    """
+    SMP Macro for getting all files with specific tag, this is only _really_ effective the second run
+
+    Usage in files:
+      all_tagged_by(<tag name>, <template> [, <field to sort by>] [, reversed])
+    """
+    tagged_files = [
+        k
+        for k, v in macro_processor.py_global_env["macro_processor_state"].items()
+        if "METADATA_tags" in v["stored_data"]
+    ]
+
+    if len(tagged_files) == 0:
+        macro_processor.log_warning(f"No tags for {tag}\u001b[0m")
         return ""
 
-    tagged_files = deepcopy(COMPILED_FILES_BY_TAG[tag])
-
-    out = ""
-    # if field is not None:
-    #     tagged_files = sorted(tagged_files, lambda x: x[field])
-    # if args.len() > 3 && args[3] == "reversed" {
-    #     tagged_files.sort_by(|a, b| order_index_by_cached_data(macro_processor, &args[2], b, a));
-    # } else {
-    #     tagged_files.sort_by(|a, b| order_index_by_cached_data(macro_processor, &args[2], b, a));
-    # }
+    # FRAGILE
+    if field is not None:
+        tagged_files.sort(
+            key=lambda fname: macro_processor.py_global_env["macro_processor_state"][
+                fname
+            ]["stored_data"][f"METADATA_{field}"],
+            reverse=(reversed != ""),
+        )
 
-    for doc_i in tagged_files:
-        file = COMPILED_FILES[doc_i]
+    out = ""
+    for filename in tagged_files:
+        file = macro_processor.py_global_env["macro_processor_state"][filename]
         smp_local = deepcopy(macro_processor)
-
-        macro_processor_initialize(file.metadata, smp_local, None)
-
-        out += sp_template(smp_local, template, file.content)
-
+        smp_local.macros.update(file["stored_data"])
+        out += smp_builtin_read(smp_local, template, template_content=file["content"])
         print_warnings(smp_local)
     return out
@@ -81,212 +66,59 @@ class SkaldpressError(Exception):
         self.path = path
 
 
-def cached_file_id_by_path(source_path: str) -> int | None:
-    global COMPILED_FILES
-    for i in range(len(COMPILED_FILES)):
-        if COMPILED_FILES[i].source_path == source_path:
-            return i
-    return None
-
-
 def print_warnings(macro_processor):
     for warning in macro_processor.warnings:
-        # print(f"  \u001b[33m{warning.description}\u001b[0m")
         print(f"  \u001b[33m{warning}\u001b[0m")
 
 
 def macro_processor_initialize(metadata, old_macro_processor, additional_state=None):
     macro_processor = old_macro_processor
     macro_processor.define_macro("all_tagged_by", sp_all_tagged_by)
-    macro_processor.define_macro("template", sp_template)
-
-    for key, value in metadata.items():
-        macro_name = f"METADATA_{key}"
-        if macro_name not in macro_processor.macros:
-            if isinstance(value, list):
-                macro_value = [str(el) for el in value]
-            else:
-                macro_value = str(value)
-            macro_processor.define_macro(macro_name, macro_value)
-
+    smp_builtin_add_metadata(macro_processor, metadata, overwrite=False)
     if additional_state:
         for key, value in additional_state.items():
             macro_processor.define_macro(key, value)
 
-    global COMPILED_FILES, COMPILED_FILES_BY_TAG
-    macro_processor.py_global_env["compiled_files"] = COMPILED_FILES
-    macro_processor.py_global_env["compiled_files_by_tag"] = COMPILED_FILES_BY_TAG
-
-
-def extract_requested_macro_processor_state(macro_processor):
-    requested_keys = macro_processor.macros.get("METADATA_keep_states")
-    if requested_keys:
-        if isinstance(requested_keys, list):
-            requested_keys = [str(el) for el in requested_keys]
-        elif isinstance(requested_keys, str):
-            requested_keys = [str(requested_keys)]
-        else:
-            macro_processor.warnings.append(
-                "keep_states specification must be list or scalar"
-            )
-            return {}
-
-        res = {}
-        for stored_key in requested_keys:
-            stored_value = macro_processor.macros.get(stored_key)
-            if stored_value:
-                res[stored_key] = stored_value
-        return res
-    return {}
-
-
-def needs_recompilation(macro_processor):
-    if "METADATA_keep_states" in macro_processor.macros:
-        return True
-    for macro_name, args in macro_processor.macro_invocations:
-        if macro_name == "all_tagged_by":
-            return True
-    return False
-
-
-def wrap_template(macro_processor, template_file, file_content, opts):
-    try:
-        with open(template_file, "r") as f:
-            template = f.read()
-    except OSError as e:
-        raise SkaldpressError(1, e, template_file)
-
-    template_extension = os.path.splitext(template_file)[1][1:] or ""
-
-    template_metadata, template_content = extract_parse_yaml_metadata(template) or (
-        {},
-        template,
-    )
-
-    macro_processor_initialize(template_metadata, macro_processor, None)
-    macro_processor.define_macro_string("CONTENT", file_content)
-    try:
-        content = macro_processor.process_input(template_content)
-    except Exception as e:
-        raise SkaldpressError(2, e)
-
-    template_parent = template_metadata.get("template")
-    if not template_parent:
-        return content, template_extension
-
-    template_parent = str(template_parent)
-    print(f"  Wrapping in template {template_parent}")
-    return wrap_template(
-        macro_processor, f"{opts.template_dir}{template_parent}", content, opts
-    )
-
-
-def compile_file(file_path, opts):
-    global COMPILED_FILES
+def compile_file(smps: smp.macro_processor.MacroProcessorState, file_path, opts):
     extension = os.path.splitext(file_path)[1][1:] or ""
     if not extension:
         raise SkaldpressError(3, None)
 
-    try:
-        with open(file_path, "r") as f:
-            file_content = f.read()
-    except OSError as e:
-        raise SkaldpressError(1, e, file_path)
-
-    map, file_content = extract_parse_yaml_metadata(file_content) or ({}, file_content)
-    map.update(opts.metadata)
-    filename = os.path.relpath(file_path, opts.content_dir)
-    map["filename"] = os.path.splitext(filename)[0]
-
-    skip_smp = map.get("skip_smp", "").lower() == "true"
-    if opts.compilefilter and not file_filtered(file_path, opts.compilefilter, []):
-        skip_smp = True
-
-    if skip_smp:
-        return CompiledFile(
-            content=file_content,
-            metadata=map,
-            extension=extension,
-            source_path=file_path,
-            needs_recompilation=False,
-            stored_smp_state={},
-        )
-
     stored_smp_state = None
-    cfile_i = cached_file_id_by_path(file_path)
-    if cfile_i is not None:
-        stored_smp_state = COMPILED_FILES[cfile_i].stored_smp_state
-
-    macro_processor = smp.macro_processor.MacroProcessor()
-    macro_processor_initialize(map, macro_processor, stored_smp_state)
-
-    if extension == "md":
-        file_content = f'html_from_markdown(%"{file_content}"%)'
-
-    if "template" not in map:
-        file_content = macro_processor.process_input(file_content)
-        print_warnings(macro_processor)
-        return CompiledFile(
-            content=file_content,
-            stored_smp_state=extract_requested_macro_processor_state(macro_processor),
-            metadata=map,
-            extension=extension,
-            source_path=file_path,
-            needs_recompilation=needs_recompilation(macro_processor),
-        )
+    if file_path in smps.global_state:
+        stored_smp_state = smps.global_state[file_path]["stored_data"]
 
-    template_file = f"{opts.template_dir}{map['template']}"
-    content, template_extension = wrap_template(
-        macro_processor, template_file, file_content, opts
+    macro_processor = smps.macro_processor()
+    macro_processor.define_macro_string(
+        "METADATA_filename", os.path.splitext(os.path.relpath(file_path, opts.content_dir))[0]
     )
+    macro_processor.source_file_path = file_path
+    macro_processor_initialize(opts.metadata, macro_processor, stored_smp_state)
+
+    content = smp_builtin_read(macro_processor, file_path)
+
     print_warnings(macro_processor)
-    return CompiledFile(
-        content=content,
-        stored_smp_state=extract_requested_macro_processor_state(macro_processor),
-        metadata=map,
-        extension=template_extension,
-        source_path=file_path,
-        needs_recompilation=needs_recompilation(macro_processor),
-    )
+    return macro_processor.store(content=content)
 
 
-def compile_file_and_write(source_file_path, opts):
-    global COMPILED_FILES, COMPILED_FILES_BY_TAG
-    compiled_file = compile_file(source_file_path, opts)
-
-    if opts.first_run:
-        COMPILED_FILES.append(compiled_file)
-        cfile_i = len(COMPILED_FILES) - 1
-        cfile = COMPILED_FILES[cfile_i]
-
-        tags = cfile.metadata.get("tags")
-        if tags and isinstance(tags, list):
-            for tag in tags:
-                if tag not in COMPILED_FILES_BY_TAG:
-                    COMPILED_FILES_BY_TAG[tag] = []
-                COMPILED_FILES_BY_TAG[tag].append(cfile_i)
-    else:
-        cfile_i = cached_file_id_by_path(compiled_file.source_path)
-        if cfile_i is None:
-            return
-        COMPILED_FILES[cfile_i], compiled_file = compiled_file, COMPILED_FILES[cfile_i]
-        cfile = COMPILED_FILES[cfile_i]
-
-    skip_build = cfile.metadata.get("skip_build")
-    if skip_build and skip_build.lower() == "true":
-        return
+def compile_file_and_write(
+    smps: smp.macro_processor.MacroProcessorState, source_file_path, opts
+):
+    compiled_file = compile_file(smps, source_file_path, opts)
 
     dest_file_path = os.path.join(
         opts.build_dir, os.path.relpath(source_file_path, opts.content_dir)
     )
-    dest_file_path = os.path.splitext(dest_file_path)[0] + "." + cfile.extension
+    dest_file_path = (
+        os.path.splitext(dest_file_path)[0] + "." + compiled_file["extension"]
+    )
 
-    target_filename = cfile.metadata.get("target_filename")
-    if target_filename and isinstance(target_filename, str):
+    if compiled_file["target_filename"] is not None:
         dest_file_path = os.path.join(
-            os.path.dirname(dest_file_path), target_filename + "." + cfile.extension
+            os.path.dirname(dest_file_path),
+            compiled_file["target_filename"] + "." + compiled_file["extension"],
         )
 
     dest_dir = os.path.dirname(dest_file_path)
@@ -294,12 +126,12 @@ def compile_file_and_write(source_file_path, opts):
     print(f"> Writing {source_file_path} to {dest_file_path}")
 
     with open(dest_file_path, "w") as f:
-        f.write(cfile.content)
+        f.write(compiled_file["content"])
 
 
-def compile_files_in_directory(directory, opts):
-    global COMPILED_FILES
-
+def compile_files_in_directory(
+    smps: smp.macro_processor.MacroProcessorState, directory: str, opts
+):
     try:
         entries = os.listdir(directory)
     except OSError as e:
@@ -308,27 +140,30 @@
     for entry in entries:
         path = os.path.join(directory, entry)
 
-        needs_recompilation = False
-        cfile_i = cached_file_id_by_path(path)
-        if cfile_i is not None:
-            needs_recompilation = COMPILED_FILES[cfile_i].needs_recompilation
+        if path in smps.global_state:
+            needs_compilation = smps.global_state[path]["needs_recompilation"]
+        else:
+            needs_compilation = True
 
-        should_compile = (opts.first_run or needs_recompilation) and not file_filtered(
+        should_compile = (needs_compilation) and not file_filtered(
             path, opts.filter, opts.exclude
         )
+
         if os.path.isfile(path) and should_compile:
             print(f"< Compiling {path}")
             try:
-                compile_file_and_write(path, opts)
+                compile_file_and_write(smps, path, opts)
             except Exception as e:
                 print(f"\033[31mError compiling {path}: {e}\033[0m")
+                print(f"{traceback.format_exc()}", file=sys.stderr)
         elif os.path.isdir(path):
             try:
-                compile_files_in_directory(path, opts)
+                compile_files_in_directory(smps, path, opts)
             except SkaldpressError as e:
                 print(f"\033[31mError processing directory {path}: {e}\033[0m")
             except Exception as e:
                 print(f"\033[31mError compiling {path}: {e}\033[0m")
+                print(f"{traceback.format_exc()}", file=sys.stderr)
 
 
 def check_trailing_slash(arg):
@@ -391,8 +226,6 @@
     )
     args = parser.parse_args()
 
-    args.first_run = True
-
     metadata = {}
     for val in args.metadata:
         if "=" not in val:
@@ -405,7 +238,7 @@
     args.metadata = metadata
 
     now = perf_counter()
-    filelist_dest = make_filelist(
+    filelist_dest = make_filelist_iter(
         args.build_dir,
         args.build_dir,
         [],
@@ -416,7 +249,7 @@
         args.metadata,
     )
    filelist_src = chain(
-        make_filelist(
+        make_filelist_iter(
            args.static_dir,
            args.static_dir,
            [],
@@ -426,7 +259,7 @@
            args.template_dir,
            args.metadata,
        ),
-        make_filelist(
+        make_filelist_iter(
            args.content_dir,
            args.content_dir,
            args.filter,
@@ -437,13 +270,14 @@
            args.metadata,
        ),
    )
+
    filelist_dest = FileList.new(filelist_dest)
    filelist_src = FileList.new(filelist_src)
-    elapsed = perf_counter() - now
    print(
-        f"Generated filelist in {elapsed} seconds, {len(filelist_dest)} in destination, {len(filelist_src)} in source"
+        f"Generated filelist in {perf_counter() - now:.6f} seconds, {len(filelist_dest)} in destination, {len(filelist_src)} in source"
    )
+
    now = perf_counter()
 
    work_queue = deque()
@@ -459,17 +293,13 @@
    for file in filelist_src.changed_from(filelist_dest):
        work_queue.append((file.file_rel, file.target_action))
 
-    dependants = []
-    for filename, file in filelist_src.files.items():
-        if "dependencies" not in file.metadata:
-            continue
-        dependants.append((file.metadata["dependencies"], filename))
-
-    elapsed = perf_counter() - elapsed
    print(
-        f"Generated work_queue in {elapsed} seconds, {len(work_queue)} actions to process"
+        f"Generated work_queue in {perf_counter() - now:.2f} seconds, {len(work_queue)} actions to process"
    )
 
+    dest_dir = os.path.dirname(args.build_dir)
+    os.makedirs(dest_dir, exist_ok=True)
+
    for filename, action in work_queue:
        if action != FileListFileTargetAction.COMPILE:
            print(f"> {action} {filename}")
@@ -495,7 +325,7 @@
        except Exception as e:
            print(f"  \u001b[31mCould not copy file ({e})\u001b[0m")
 
-    compile_files_in_directory(args.content_dir, args)
+    macro_processor_state = smp.macro_processor.MacroProcessorState()
+    compile_files_in_directory(macro_processor_state, args.content_dir, args)
 
    print("\n=======================\n")
-    args.first_run = False
-    compile_files_in_directory(args.content_dir, args)
+    compile_files_in_directory(macro_processor_state, args.content_dir, args)
diff --git a/src/skaldpress/metadata_parser.py b/src/skaldpress/metadata_parser.py
index 85e6a65..18ae56b 100644
--- a/src/skaldpress/metadata_parser.py
+++ b/src/skaldpress/metadata_parser.py
@@ -5,44 +5,48 @@ from copy import deepcopy
 from datetime import datetime
 
 
-def get_all_meta(
-    file_path: Path, template_dir: str, meta: dict[str, Any]
+def dict_soft_extend(a: dict[Any, Any], b: dict[Any, Any]):
+    """
+    Extend b onto a in-place.
+    This will add new keys, and extend sub-lists/dicts, but not overwrite.
+    """
+    for k, v in b.items():
+        if k not in a:
+            a[k] = v
+
+
+def get_all_meta_iter(
+    file: Path, template_dir: str, meta: dict[str, Any]
 ) -> tuple[dict[str, Any], str, datetime]:
-    extension = os.path.splitext(file_path)[1]
-
-    fs_metadata = file_path.stat()
-    fs_modified = datetime.fromtimestamp(fs_metadata.st_mtime)
-
-    try:
-        with open(file_path, "r") as f:
-            file_content = f.read()
-    except Exception:
-        file_content = ""
-
-    map_with_meta = extract_parse_yaml_metadata(file_content)[0]
-
-    map_base = deepcopy(meta)
-    map_base.update(map_with_meta)
-
-    template = map_base.get("template")
-    if not template:
-        return map_base, extension, fs_modified
-
-    template_file = Path(f"{template_dir}{str(template)}")
-
-    try:
-        map_templated, extension, template_fs_modified = get_all_meta(
-            template_file, template_dir, {}
-        )
-    except Exception:
-        # raise Exception(f"MetadataError: {e}")
-        return map_base, extension, fs_modified
-
-    map_templated.update(map_base)
-    # Shuld really add a cutsom extend function to the hashmap,
-    # so lists can be merged and such
-
-    return map_templated, extension, max(fs_modified, template_fs_modified)
+    """
+    Will extract all metadata,
+    weird things will happen if the first file doesn't exist
+    """
+
+    meta = deepcopy(meta)
+    fs_modified = datetime.min
+
+    while file is not None:
+        try:
+            extension = os.path.splitext(file)[1]
+            fs_metadata = file.stat()
+            fs_modified = max(datetime.fromtimestamp(fs_metadata.st_mtime), fs_modified)
+
+            with open(file, "r") as f:
+                file_content = f.read()
+        except Exception:
+            file_content = ""
+
+        map_with_meta = extract_parse_yaml_metadata(file_content)[0]
+        dict_soft_extend(meta, map_with_meta)
+
+        template = map_with_meta.get("template")
+        if template is not None:
+            file = Path(f"{template_dir}{str(template)}")
+        else:
+            file = None
+
+    return meta, extension, fs_modified
 
 
 def str_to_yaml_value(in_str: str) -> Any:
diff --git a/src/smp/__init__.py b/src/smp/__init__.py
index 22085ae..d6e5d52 100644
--- a/src/smp/__init__.py
+++ b/src/smp/__init__.py
@@ -20,7 +20,8 @@ def read_stdin():
     import sys
 
     data = sys.stdin.read()
-    macro_processor = smp.macro_processor.MacroProcessor()
+    macro_processor_state = smp.macro_processor.MacroProcessorState()
+    macro_processor = macro_processor_state.macro_processor()
     res = macro_processor.process_input(data)
     print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
     print(res)
@@ -44,7 +45,10 @@ def main():
     with open(sys.argv[1], "r") as f:
         file_content = f.read()
 
-    macro_processor = smp.macro_processor.MacroProcessor()
+    macro_processor_state = smp.macro_processor.MacroProcessorState()
+    macro_processor = macro_processor_state.macro_processor()
     res = macro_processor.process_input(file_content)
+    macro_processor.store("", "", "")
+    breakpoint()
     print("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━", file=sys.stderr)
     print(res)
diff --git a/src/smp/builtins.py b/src/smp/builtins.py
index 3ff15c6..0997165 100644
--- a/src/smp/builtins.py
+++ b/src/smp/builtins.py
@@ -1,11 +1,14 @@
 # import smp.exceptions
+import os
 import subprocess
 import urllib.request
 import urllib.error
 import urllib.parse
 import datetime
 import markdown
+from skaldpress.metadata_parser import extract_parse_yaml_metadata
 from gfm import AutolinkExtension, TaskListExtension  # type: ignore
+from typing import Any
 
 
 def smp_builtin_define(macro_processor, macro_name, macro_value=None):
@@ -66,11 +69,38 @@ def smp_builtin_ifneq(macro_processor, a, b, iftrue, iffalse=None):
         return ""
 
 
+def smp_builtin_add_metadata(macro_processor, metadata: dict[str, Any], overwrite=True):
+    """
+    Not added to macro_processor as macro
+    """
+    for macro_name, value in metadata.items():
+        if not macro_name.startswith(
+            macro_processor._get_macro_with_prefix("metadata_prefix")
+        ):
+            macro_name = f"{macro_processor._get_macro_with_prefix('metadata_prefix')}{macro_name}"
+
+        macro_value = str(value)
+        if isinstance(value, list):
+            macro_value = [str(el) for el in value]
+
+            if macro_name in macro_processor.macros:
+                macro_value.extend(macro_processor.macros[macro_name])
+
+        if overwrite or macro_name not in macro_processor.macros:
+            macro_processor.define_macro(macro_name, macro_value)
+
+
 def smp_builtin_include(macro_processor, filename):
-    filename = macro_processor.process_input(filename)
-    with open(filename, "r") as f:
-        file_content = f.read()
-    return macro_processor.process_input(file_content)
+    return smp_builtin_read(macro_processor, filename, template_content=None)
+
+
+def smp_builtin_parse_leading_yaml(macro_processor, content):
+    """
+    Not added to macro_processor as macro
+    """
+    metadata, content = extract_parse_yaml_metadata(content)
+    smp_builtin_add_metadata(macro_processor, metadata, overwrite=True)
+    return content
 
 
 def smp_builtin_include_verbatim(macro_processor, filename):
@@ -147,6 +177,51 @@ def smp_builtin_html_from_markdown(macro_processor, text, extensions=list()):
     return markdown.markdown(text, extensions=extensions)
 
 
+def _smp_builtin_template_content(content):
+    def inner(macro_processor):
+        """
+        This should do some kind of stack thing, so we can track which file we are processing.
+        entering the CONTENT is fine, the question is how to handle exiting it.
+
+        could have a "once" macro or something, that is added to the end of the content.
+        """
+        return content
+
+    return inner
+
+
+def smp_builtin_template(macro_processor, template, content):
+    return smp_builtin_read(macro_processor, template, template_content=content)
+
+
+def smp_builtin_read(macro_processor, filename, template_content=None):
+    with open(filename, "r") as f:
+        file_content = f.read()
+
+    metadata = {}
+    if macro_processor._get_macro_with_prefix("parse_file_yaml"):
+        metadata, file_content = extract_parse_yaml_metadata(file_content)
+        smp_builtin_add_metadata(macro_processor, metadata, overwrite=False)
+
+    extension = os.path.splitext(filename)[1][1:] or ""
+    macro_processor._define_macro_with_prefix("target_file_extension", extension)
+
+    if template_content is not None:
+        macro_processor._get_macro_with_prefix("template_stack").append(filename)
+        macro_processor.macros["CONTENT"] = template_content
+
+    content = macro_processor.process_input(file_content)
+
+    if extension == "md":
+        content = smp_builtin_html_from_markdown(macro_processor, content)
+
+    if (template := macro_processor.macros.get("METADATA_template")) is not None:
+        if template not in macro_processor._get_macro_with_prefix("template_stack"):
+            return smp_builtin_read(macro_processor, template, content)
+
+    return content
+
+
 global LINK_CACHE
 LINK_CACHE: dict[str, tuple[bool, int, str]] = dict()
@@ -168,14 +243,24 @@ def smp_builtin_wodl(macro_processor, link, timeout_seconds=5):
             working_link = (r.status == 200) and (r.reason == "OK")
             LINK_CACHE[link] = (working_link, r.status, r.reason)
             if not working_link:
-                macro_processor.warnings.append(
-                    f"Dead link {link} ({r.status} {r.reason})!"
-                )
+                macro_processor.log_warning(f"Dead link {link} ({r.status} {r.reason})!")
     except urllib.error.URLError as e:
-        macro_processor.warnings.append(f"Dead link {link} ({e})!")
+        macro_processor.log_warning(f"Dead link {link} ({e})!")
     return ""
 
 
+def smp_builtin_once(macro_processor, content):
+    if (cache := macro_processor._get_macro_with_prefix("once_cache")) is not None:
+        if (exp := cache.get(content)) is not None:
+            return exp
+    else:
+        macro_processor._define_macro_with_prefix("once_cache", {})
+
+    expanded_content = macro_processor.process_input(content)
+    macro_processor._get_macro_with_prefix("once_cache", expanded_content)
+    return expanded_content
+
+
 def smp_builtin_dumpenv(macro_processor):
     out = ""
     out += "━ Macros ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
@@ -188,7 +273,3 @@
         out += f"{repr(key)}: {repr(val)}\n"
     out += "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
     return out
-
-
-# TODO Add macro that spawns a interactive shell with the python env. would allow interactive debugging :)
-#      needs to have a continue function or something (probably on C-d
diff --git a/src/smp/macro_processor.py b/src/smp/macro_processor.py
index 68fd726..2af26dd 100644
--- a/src/smp/macro_processor.py
+++ b/src/smp/macro_processor.py
@@ -56,15 +56,18 @@ def seek(input: str, start: int, target: str) -> int | None:
 
 
 class MacroProcessor:
-    """All currently defined macros in this MacroProcessor"""
+    source_file_path: str
+    """All currently defined macros in this MacroProcessor"""
     macros: dict[str, Any]
     """ All macro invocations that has happened """
     macro_invocations: list[tuple[str, list[str]]]
-    """ Emitted warnings """
     warnings: list[Any]
+    """ Global environment for python execution """
+    py_global_env: dict
+    py_local_env_alt: dict
     py_local_env_current: dict
 
@@ -72,6 +75,7 @@ class MacroProcessor:
 
     start_quote: str = '%"'
     end_quote: str = '"%'
+    prefix: str = ""
 
     def __init__(self, prefix=""):
         self.macros = dict()
@@ -81,31 +85,46 @@ class MacroProcessor:
         self.py_local_env_alt = dict()
         self.py_local_env_current = self.macros
         self.indent_level = ""
-
-        self._define_builtins(self.macros, prefix=prefix)
-        self._define_builtins(self.py_local_env_alt, prefix=prefix)
-
-    def _define_builtins(self, env, prefix=""):
-        env[f"{prefix}macro_processor"] = self
-        env[f"{prefix}define"] = smp.builtins.smp_builtin_define
-        env[f"{prefix}undefine"] = smp.builtins.smp_builtin_undefine
-        env[f"{prefix}define_array"] = smp.builtins.smp_builtin_define_array
-        env[f"{prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
-        env[f"{prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
-        env[f"{prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
-        env[f"{prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
-        env[f"{prefix}include"] = smp.builtins.smp_builtin_include
-        env[f"{prefix}include_verbatim"] = smp.builtins.smp_builtin_include_verbatim
-        env[f"{prefix}shell"] = smp.builtins.smp_builtin_shell
-        env[f"{prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
-        env[f"{prefix}eval"] = smp.builtins.smp_builtin_eval
-        env[f"{prefix}array_push"] = smp.builtins.smp_builtin_array_push
-        env[f"{prefix}array_each"] = smp.builtins.smp_builtin_array_each
-        env[f"{prefix}array_size"] = smp.builtins.smp_builtin_array_size
-        env[f"{prefix}explode"] = smp.builtins.smp_builtin_explode
-        env[f"{prefix}format_time"] = smp.builtins.smp_builtin_format_time
-        env[f"{prefix}html_from_markdown"] = smp.builtins.smp_builtin_html_from_markdown
-        env[f"{prefix}wodl"] = smp.builtins.smp_builtin_wodl
+        self.prefix = prefix
+
+        self._define_builtins(self.macros)
+        self._define_builtins(self.py_local_env_alt)
+
+    def _define_builtins(self, env):
+        env[f"{self.prefix}macro_processor"] = self
+        env[f"{self.prefix}define"] = smp.builtins.smp_builtin_define
+        env[f"{self.prefix}undefine"] = smp.builtins.smp_builtin_undefine
+        env[f"{self.prefix}define_array"] = smp.builtins.smp_builtin_define_array
+        env[f"{self.prefix}ifdef"] = smp.builtins.smp_builtin_ifdef
+        env[f"{self.prefix}ifndef"] = smp.builtins.smp_builtin_ifndef
+        env[f"{self.prefix}ifeq"] = smp.builtins.smp_builtin_ifeq
+        env[f"{self.prefix}ifneq"] = smp.builtins.smp_builtin_ifneq
+        env[f"{self.prefix}once"] = smp.builtins.smp_builtin_once
+        env[f"{self.prefix}include"] = smp.builtins.smp_builtin_include
+        env[f"{self.prefix}include_verbatim"] = (
+            smp.builtins.smp_builtin_include_verbatim
+        )
+        env[f"{self.prefix}shell"] = smp.builtins.smp_builtin_shell
+        env[f"{self.prefix}dumpenv"] = smp.builtins.smp_builtin_dumpenv
+        env[f"{self.prefix}eval"] = smp.builtins.smp_builtin_eval
+        env[f"{self.prefix}array_push"] = smp.builtins.smp_builtin_array_push
+        env[f"{self.prefix}array_each"] = smp.builtins.smp_builtin_array_each
+        env[f"{self.prefix}array_size"] = smp.builtins.smp_builtin_array_size
+        env[f"{self.prefix}explode"] = smp.builtins.smp_builtin_explode
+        env[f"{self.prefix}format_time"] = smp.builtins.smp_builtin_format_time
+        env[f"{self.prefix}html_from_markdown"] = (
+            smp.builtins.smp_builtin_html_from_markdown
+        )
+        env[f"{self.prefix}wodl"] = smp.builtins.smp_builtin_wodl
+        env[f"{self.prefix}template"] = smp.builtins.smp_builtin_template
+        env[f"{self.prefix}template_stack"] = []
+
+        # If true, include-macros will parse yaml in beginning of content
+        env[f"{self.prefix}parse_file_yaml"] = True
+        # If true, some macros will run in a draft-mode,
+        # meaning they will skip steps that are slow.
+        env[f"{self.prefix}draft"] = False
+        env[f"{self.prefix}metadata_prefix"] = "METADATA_"
 
     def define_macro_string(self, macro_name, macro_value):
         self.define_macro(macro_name, str(macro_value))
@@ -113,6 +132,18 @@ class MacroProcessor:
     def define_macro(self, macro_name, macro_value):
         self.macros[macro_name] = macro_value
 
+    def _define_macro_with_prefix(self, macro_name, macro_value, sub_prefix: str = ""):
+        self.macros[f"{self.prefix}{sub_prefix}{macro_name}"] = macro_value
+
+    def _get_macro_with_prefix(self, macro_name, sub_prefix: str = "", default=None):
+        return self.macros.get(f"{self.prefix}{sub_prefix}{macro_name}", default)
+
+    def log_warning(self, message):
+        """
+        Here we should add some more information, line number, file etc, when that is available
+        """
+        self.warnings.append(message)
+
     def expand_macro(self, macro_name: str, args: list[str] = list()) -> str:
         # Ignore trailing underscore in macro name, the parser will pop a space in front if
         # present, but we should ignore it for finding the macro.
@@ -152,7 +183,9 @@ class MacroProcessor:
                 return str(macro(*macro_args))
             except Exception as e:
                 s = f"{macro_name}({','.join([repr(x) for x in args])})"
-                self.warnings.append(f"Error expanding macro {s} ({e})")
+                self.log_warning(
+                    f"Error expanding macro {s} ({e})\n{traceback.format_exc()}"
+                )
                 return s
         if isinstance(macro, str):
             expanded = macro
@@ -175,6 +208,10 @@ class MacroProcessor:
 
         @for <python-expression>
         @endfor
+
+        Note: Consider writing a new implementation that does it the same way M4 does,
+        by pushing the expanded macros back to the input string, this may be more confusing,
+        but may also be faster (stream or mutable string)
         """
         output = ""
         state = ParserState.NORMAL
@@ -322,8 +359,58 @@ class MacroProcessor:
         # Handle cases where the text ends with a macro without arguments
         if macro_name != "":
             if macro_is_whitespace_deleting(macro_name):
-                if output[-1] == " ":
+                if len(output) > 0 and output[-1] == " ":
                     output = output[:-1]
             macro_name = macro_name_clean(macro_name)
             output += self.expand_macro(macro_name)
         return output
+
+    def store(self, **xargs):
+        requested_keys = self.macros.get("METADATA_keep_states", self.macros.keys())
+        for key in self.macros.keys():
+            if key.startswith("METADATA_") and key not in requested_keys:
+                requested_keys.append(key)
+
+        if isinstance(requested_keys, str):
+            requested_keys = [str(requested_keys)]
+
+        needs_recompilation = ("METADATA_keep_states" in self.macros) or (
+            "all_tagged_by" in [x[0] for x in self.macro_invocations]
+        )
+
+        target_filename = self._get_macro_with_prefix(
+            "target_filename", sub_prefix="METADATA_"
+        )
+
+        self.py_global_env["macro_processor_state"][self.source_file_path] = dict(
+            {
+                # "content": "",
+                "stored_data": {
+                    k: v for k, v in self.macros.items() if k in requested_keys
+                },
+                "extension": self._get_macro_with_prefix("target_file_extension"),
+                "source_path": self.source_file_path,
+                "needs_recompilation": needs_recompilation,
+                "target_filename": target_filename,
+                **xargs,
+            }
+        )
+        return self.py_global_env["macro_processor_state"][self.source_file_path]
+
+
+class MacroProcessorState:
+    global_state: dict
+
+    def __init__(self):
+        self.global_state = dict()
+
+    def macro_processor(self, macro_processor=None):
+        if macro_processor is None:
+            macro_processor = MacroProcessor()
+
+        macro_processor.py_global_env["macro_processor_state"] = self.global_state
+        return macro_processor
+
+    def print_state(self):
+        for key, val in self.global_state.items():
+            print(f"{key[-20:]:20} {val}")
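
For orientation, a minimal sketch of the new state-threading flow this commit introduces, mirroring the updated src/smp/__init__.py entry point. The input path is a hypothetical example, not part of the commit:

    import smp.macro_processor

    # One MacroProcessorState is shared across the whole build; each file gets
    # its own MacroProcessor wired to the shared global_state dict.
    state = smp.macro_processor.MacroProcessorState()

    macro_processor = state.macro_processor()
    macro_processor.source_file_path = "content/index.md"  # hypothetical input file

    with open(macro_processor.source_file_path, "r") as f:
        result = macro_processor.process_input(f.read())

    # store() persists this file's macros/metadata into the shared state, so a
    # second pass (e.g. all_tagged_by) can read every file's stored_data.
    macro_processor.store(content=result)
    state.print_state()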