From 44fecdfe41596acba584cdc9a93b67e92a3f1f0d Mon Sep 17 00:00:00 2001
From: Qrius
Date: Mon, 10 Mar 2025 13:28:03 +0100
Subject: Finish feature parity

---
 src/skaldpress/file_metadata_extract.py |  50 +++++++++
 src/skaldpress/file_metadata_extract.rs |   0
 src/skaldpress/filelist.py              | 154 +++++++++++++++++++++++++++
 src/skaldpress/main.py                  | 180 +++++++++++++++++++++++++++++---
 src/skaldpress/smp_macros.py            |   1 -
 src/smp/builtins.py                     |   2 +-
 src/smp/macro_processor.py              |  27 +++++
 7 files changed, 396 insertions(+), 18 deletions(-)
 create mode 100644 src/skaldpress/file_metadata_extract.py
 delete mode 100644 src/skaldpress/file_metadata_extract.rs
 create mode 100644 src/skaldpress/filelist.py

(limited to 'src')

diff --git a/src/skaldpress/file_metadata_extract.py b/src/skaldpress/file_metadata_extract.py
new file mode 100644
index 0000000..b2da095
--- /dev/null
+++ b/src/skaldpress/file_metadata_extract.py
@@ -0,0 +1,50 @@
+import os.path
+from pathlib import Path
+from typing import Any
+from copy import deepcopy
+from datetime import datetime
+from skaldpress.metadata_parser import extract_parse_yaml_metadata
+
+
+def get_template_path(template: str, template_dir: str):
+    return Path(f"{template_dir}{template}")
+
+
+def get_all_meta(
+    file_path: Path, template_dir: str, meta: dict[str, Any]
+) -> tuple[dict[str, Any], str, datetime]:
+    filename, extension = os.path.splitext(file_path)
+
+    fs_metadata = file_path.stat()
+    fs_modified = datetime.fromtimestamp(fs_metadata.st_mtime)
+
+    try:
+        with open(file_path, "r") as f:
+            file_content = f.read()
+    except Exception:
+        file_content = ""
+
+    map_with_meta = extract_parse_yaml_metadata(file_content)[0]
+
+    map_base = deepcopy(meta)
+    map_base.update(map_with_meta)
+
+    template = map_base.get("template")
+    if not template:
+        return map_base, extension, fs_modified
+
+    template_file = get_template_path(str(template), template_dir)
+
+    try:
+        map_templated, extension, template_fs_modified = get_all_meta(
+            template_file, template_dir, {}
+        )
+    except Exception as e:
+        # raise Exception(f"MetadataError: {e}")
+        return map_base, extension, fs_modified
+
+    map_templated.update(map_base)
+    # Should really add a custom extend function to the hashmap,
+    # so lists can be merged and such
+
+    return map_templated, extension, max(fs_modified, template_fs_modified)
diff --git a/src/skaldpress/file_metadata_extract.rs b/src/skaldpress/file_metadata_extract.rs
deleted file mode 100644
index e69de29..0000000
diff --git a/src/skaldpress/filelist.py b/src/skaldpress/filelist.py
new file mode 100644
index 0000000..b4fc843
--- /dev/null
+++ b/src/skaldpress/filelist.py
@@ -0,0 +1,154 @@
+import os
+from os import DirEntry
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from enum import Enum
+from typing import Any, Generator, Iterable
+from skaldpress.file_metadata_extract import get_all_meta
+
+
+class FileListFileTargetAction(Enum):
+    COMPILE = 1
+    COPY = 2
+    NONE = 3
+    DELETE = 4
+
+
+@dataclass
+class FileListFile:
+    file_path: DirEntry
+    file_rel: str
+    change_time: datetime
+    dependencies: list[str]
+    target_action: FileListFileTargetAction
+    metadata: dict[str, Any]
+
+
+@dataclass
+class FileList:
+    files: dict[str, FileListFile]
+
+    def __init__(self):
+        self.files = dict()
+
+    @staticmethod
+    def new(files) -> "FileList":
+        filelist = FileList()
+        for file in files:
+            filelist.add(file)
+        return filelist
+
+    def add(self, file: FileListFile):
+        self.files[file.file_rel] = file
+
+    def __getitem__(self, filename) -> FileListFile:
+        return self.files[filename]
+
+    def __len__(self) -> int:
+        return len(self.files)
+
+    def missing_from(self, other: "FileList") -> Generator[FileListFile, None, None]:
+        for fname, file in self.files.items():
+            if fname not in other.files:
+                yield file
+
+    def changed_from(self, other: "FileList") -> Generator[FileListFile, None, None]:
+        for fname, file in self.files.items():
+            if (
+                fname in other.files
+                and other.files[fname].change_time < file.change_time
+            ):
+                yield file
+
+
+def file_pat_match(file: str, pat: str) -> bool:
+    assert len(pat) != 0
+    if pat == "*":
+        return True
+    if pat[0] == "*" and file[-len(pat[1:]) :] == pat[1:]:
+        return True
+    if pat[-1] == "*" and file[:-1] == pat[:-1]:
+        return True
+    return file == pat
+
+
+def file_filtered(file: str, filters: list[str], exclude: list[str]) -> bool:
+    # Now that we are in Python, this could be changed to regex. At the start of the program the filter can be transformed into a regex:
+    # r = "^("
+    # for i, filter in enumerate(filters):
+    #     if i > 0: r += "|"
+    #     r += filter.replace("(", "\(").replace(")", "\)")  # This should check whether the parenthesis is already escaped
+    # r += ")$"
+    # r = re.compile(r, re.IGNORECASE)
+    # This would probably be faster, as it runs in C
+    if any(file_pat_match(file, pat) for pat in exclude):
+        return True
+    if not filters:
+        return False
+    return not any(file_pat_match(file, pat) for pat in filters)
+
+
+def walk_filetree(
+    directory,  # : DirEntry
+) -> Generator:
+    try:
+        for entry in os.scandir(directory):
+            if entry.is_file():
+                yield entry
+            elif entry.is_dir():
+                yield from walk_filetree(entry)
+    except OSError as e:
+        raise Exception(f"DirectoryReadError: {e}")
+
+
+def enrich_with_metadata(entries, template_dir, additional_metadata, read_metadata):
+    for entry in entries:
+        if not read_metadata:
+            yield entry[0], entry[1], {}, datetime.fromtimestamp(
+                entry[0].stat().st_mtime
+            )
+            continue
+
+        meta, extension, oldest_modified = get_all_meta(
+            entry[0], template_dir, additional_metadata
+        )
+        if extension[0] != ".":
+            extension = "." + extension
+        rel_path = os.path.splitext(entry[1])[0] + extension
+
+        yield entry[0], rel_path, meta, oldest_modified
+
+
+def make_filelist(
+    directory,  # : DirEntry
+    base_dir: Path,
+    include: list[str],
+    exclude: list[str],
+    target_action: Any,
+    read_metadata: bool,
+    template_dir: str,
+    additional_metadata: dict[str, Any],
+) -> Generator[FileListFile, None, None]:
+    filelist_gen = (
+        (x, x.path.replace(base_dir, "", 1)) for x in walk_filetree(directory)
+    )
+    filelist_gen = (
+        x for x in filelist_gen if not file_filtered(x[1], include, exclude)
+    )
+    filelist_gen = enrich_with_metadata(
+        filelist_gen, template_dir, additional_metadata, read_metadata
+    )
+    filelist_gen = (
+        FileListFile(
+            file_path=x[0],
+            file_rel=x[1],
+            change_time=x[3],
+            dependencies=[],
+            target_action=target_action,
+            metadata=x[2],
+        )
+        for x in filelist_gen
+    )
+
+    return filelist_gen
diff --git a/src/skaldpress/main.py b/src/skaldpress/main.py
index cdc4464..5dbb196 100644
--- a/src/skaldpress/main.py
+++ b/src/skaldpress/main.py
@@ -1,9 +1,16 @@
 import os
-from argparse import ArgumentParser
+import shutil
+from pathlib import Path
+from argparse import ArgumentParser, ArgumentTypeError
 from dataclasses import dataclass
+from functools import partial
+from itertools import chain
+from collections import deque
 import smp.macro_processor
 from copy import deepcopy
 from skaldpress.metadata_parser import extract_parse_yaml_metadata
+from skaldpress.filelist import make_filelist, FileList, FileListFileTargetAction
+from time import perf_counter
 
 
 @dataclass
@@ -264,7 +271,7 @@ def compile_file(file_path, opts):
 
 
 def compile_file_and_write(source_file_path, opts):
-    global COMPILED_FILES
+    global COMPILED_FILES, COMPILED_FILES_BY_TAG
     compiled_file = compile_file(source_file_path, opts)
 
     if opts.first_run:
@@ -274,11 +281,10 @@
 
         tags = cfile.metadata.get("tags")
         if tags and isinstance(tags, list):
-            compiled_files_by_tag = COMPILED_FILES_BY_TAG
             for tag in tags:
-                if tag not in compiled_files_by_tag:
-                    compiled_files_by_tag[tag] = []
-                compiled_files_by_tag[tag].append(cfile_i)
+                if tag not in COMPILED_FILES_BY_TAG:
+                    COMPILED_FILES_BY_TAG[tag] = []
+                COMPILED_FILES_BY_TAG[tag].append(cfile_i)
     else:
         cfile_i = cached_file_id_by_path(compiled_file.source_path)
         if cfile_i is None:
@@ -334,35 +340,177 @@
                 compile_file_and_write(path, opts)
             except Exception as e:
                 print(f"\033[31mError compiling {path}: {e}\033[0m")
-                raise e
         elif os.path.isdir(path):
             try:
                 compile_files_in_directory(path, opts)
             except SkaldpressError as e:
                 print(f"\033[31mError processing directory {path}: {e}\033[0m")
+            except Exception as e:
+                print(f"\033[31mError compiling {path}: {e}\033[0m")
+
+
+def check_trailing_slash(arg):
+    if not arg.endswith("/"):
+        raise ArgumentTypeError("Argument must end with '/'")
+    return arg
 
 
 def main():
+    comma_arg = partial(str.split, sep=",")
     parser = ArgumentParser()
     parser.add_argument(
-        "-o", "--out", "--output", metavar="path", default="build/", dest="build_dir"
+        "-o",
+        "--out",
+        "--output",
+        metavar="path",
+        default="build/",
+        dest="build_dir",
+        type=check_trailing_slash,
+    )
+    parser.add_argument(
+        "-i",
+        "--input",
+        metavar="path",
+        default="content/",
+        dest="content_dir",
+        type=check_trailing_slash,
     )
     parser.add_argument(
-        "-i", "--input", metavar="path", default="content/", dest="content_dir"
+        "-s",
+        "--static",
+        metavar="path",
+        default="static/",
+        dest="static_dir",
+        type=check_trailing_slash,
     )
-    parser.add_argument("-s", "--static", metavar="path", default="static/")
     parser.add_argument(
-        "-t", "--templates", metavar="path", default="templates/", dest="template_dir"
+        "-t",
+        "--templates",
+        metavar="path",
+        default="templates/",
+        dest="template_dir",
+        type=check_trailing_slash,
+    )
+    parser.add_argument("-f", "--filter", metavar="filter", default=[], type=comma_arg)
+    parser.add_argument("-e", "--exclude", metavar="filter", default=[], type=comma_arg)
+    parser.add_argument("-m", "--metadata", nargs="+", metavar="key=value", default=[], action="extend")
+    parser.add_argument(
+        "-c", "--compilefilter", metavar="filter", default=[], type=comma_arg
     )
+    parser.add_argument(
+        "-x",
+        "--xclude",
+        metavar="filter",
+        default=[],
+        dest="static_exclude",
+        type=comma_arg,
+    )
-    parser.add_argument("-f", "--filter", metavar="filter", default=[])
-    parser.add_argument("-e", "--exclude", metavar="filter", default=[])
-    parser.add_argument("-m", "--metadata", nargs="+", metavar="key=value", default=[])
-    parser.add_argument("-c", "--compilefilter", metavar="filter", default=[])
-    parser.add_argument("-x", "--xclude", metavar="filter", default=[])
     args = parser.parse_args()
     args.first_run = True
+    metadata = {}
+    for val in args.metadata:
+        if "=" not in val:
+            raise ValueError(f"metadata must be KEY=VAL (got {repr(val)})")
+        key, val = map(str.strip, val.split("=", 1))
+        if "," in val:
+            metadata[key] = [x for x in val.split(",") if x != ""]
+        else:
+            metadata[key] = val
+    args.metadata = metadata
+
+    now = perf_counter()
+    filelist_dest = make_filelist(
+        args.build_dir,
+        args.build_dir,
+        [],
+        [],
+        FileListFileTargetAction.NONE,
+        False,
+        args.template_dir,
+        args.metadata,
+    )
+    filelist_src = chain(
+        make_filelist(
+            args.static_dir,
+            args.static_dir,
+            [],
+            args.static_exclude,
+            FileListFileTargetAction.COPY,
+            False,
+            args.template_dir,
+            args.metadata,
+        ),
+        make_filelist(
+            args.content_dir,
+            args.content_dir,
+            args.filter,
+            args.exclude,
+            FileListFileTargetAction.COMPILE,
+            True,
+            args.template_dir,
+            args.metadata,
+        ),
+    )
+    filelist_dest = FileList.new(filelist_dest)
+    filelist_src = FileList.new(filelist_src)
+
+    elapsed = perf_counter() - now
+    print(
+        f"Generated filelist in {elapsed} seconds, {len(filelist_dest)} in destination, {len(filelist_src)} in source"
+    )
+
+    work_queue = deque()
+
+    # We also will not currently discover empty directories from build target,
+    # we should attempt to do that.
+    for file in filelist_dest.missing_from(filelist_src):
+        filelist_dest[file.file_rel].target_action = FileListFileTargetAction.DELETE
+        work_queue.append((file.file_rel, FileListFileTargetAction.DELETE))
+
+    for file in filelist_src.missing_from(filelist_dest):
+        work_queue.append((file.file_rel, file.target_action))
+
+    for file in filelist_src.changed_from(filelist_dest):
+        work_queue.append((file.file_rel, file.target_action))
+
+    dependants = []
+    for filename, file in filelist_src.files.items():
+        if "dependencies" not in file.metadata:
+            continue
+        dependants.append((file.metadata["dependencies"], filename))
+
+    elapsed = perf_counter() - elapsed
+    print(
+        f"Generated work_queue in {elapsed} seconds, {len(work_queue)} actions to process"
+    )
+
+    for filename, action in work_queue:
+        if action != FileListFileTargetAction.COMPILE:
+            print(f"> {action} {filename}")
+
+        if action == FileListFileTargetAction.DELETE:
+            file = filelist_dest[filename]
+            print(f" Deleting {file.file_path.path}")
+            try:
+                os.remove(file.file_path)
+            except Exception as e:
+                print(f" \u001b[31mCould not delete file ({e})\u001b[0m")
+
+        elif action == FileListFileTargetAction.COPY:
+            file = filelist_src[filename]
+            dest_file_path = os.path.join(args.build_dir, file.file_rel)
+            print(f" Copying {file.file_path.path}")
+            print(f" {dest_file_path}")
+            if dest_file_path in filelist_dest.files:
+                print(f"Exists already {filelist_dest.files[dest_file_path]}")
+
+            try:
+                shutil.copy(file.file_path, dest_file_path)
+            except Exception as e:
+                print(f" \u001b[31mCould not copy file ({e})\u001b[0m")
+
 
     compile_files_in_directory(args.content_dir, args)
     print("\n=======================\n")
     args.first_run = False
 
diff --git a/src/skaldpress/smp_macros.py b/src/skaldpress/smp_macros.py
index 8b13789..e69de29 100644
--- a/src/skaldpress/smp_macros.py
+++ b/src/skaldpress/smp_macros.py
@@ -1 +0,0 @@
-
diff --git a/src/smp/builtins.py b/src/smp/builtins.py
index c1d67ce..3ff15c6 100644
--- a/src/smp/builtins.py
+++ b/src/smp/builtins.py
@@ -20,7 +20,7 @@ def smp_builtin_define(macro_processor, macro_name, macro_value=None):
 
 
 def smp_builtin_undefine(macro_processor, macro_name):
     if macro_name in macro_processor.macros:
-        del macro_processor[macro_name]
+        del macro_processor.macros[macro_name]
     return ""
 
diff --git a/src/smp/macro_processor.py b/src/smp/macro_processor.py
index bda6c6f..68fd726 100644
--- a/src/smp/macro_processor.py
+++ b/src/smp/macro_processor.py
@@ -31,6 +31,30 @@ def macro_name_clean(macro_name: str) -> str:
     return macro_name
 
 
+def seek(input: str, start: int, target: str) -> int | None:
+    """Seek for a value in a string, consider using startswith instead"""
+    from warnings import warn
+
+    warn(
+        "seek should be considered replaced with str.startswith",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    input_end = len(input)
+    target_end = len(target)
+
+    if input_end < start + target_end:
+        return None
+
+    i = 0
+    while i < len(target):
+        if input[start + i] != target[i]:
+            return None
+        i += 1
+
+    return start + target_end
+
+
 class MacroProcessor:
     """All currently defined macros in this MacroProcessor"""
 
@@ -46,6 +70,9 @@ class MacroProcessor:
 
     special_macros: dict[str, tuple[Any, Any]]
 
+    start_quote: str = '%"'
+    end_quote: str = '"%'
+
     def __init__(self, prefix=""):
        self.macros = dict()
        self.macro_invocations = list()
-- 
cgit v1.2.3
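
The sync step this patch adds to main() boils down to a three-way comparison between a source listing and a destination listing: entries present only in the destination are deleted, entries present only in the source are built or copied, and entries whose source is newer are rebuilt. A minimal stand-alone sketch of that idea, using plain dicts mapping relative paths to mtimes rather than the FileList/FileListFile types from the patch (build_actions and the "BUILD"/"DELETE" labels are invented for the illustration):

    from collections import deque
    from datetime import datetime


    def build_actions(src: dict[str, datetime], dest: dict[str, datetime]) -> deque:
        """Derive (path, action) pairs from source and destination listings."""
        queue: deque[tuple[str, str]] = deque()
        # In destination but no longer in source -> delete the stale output.
        for path in dest:
            if path not in src:
                queue.append((path, "DELETE"))
        # In source but not yet in destination -> build/copy it.
        for path in src:
            if path not in dest:
                queue.append((path, "BUILD"))
        # Present in both, but the source is newer -> rebuild it.
        for path, mtime in src.items():
            if path in dest and dest[path] < mtime:
                queue.append((path, "BUILD"))
        return queue


    if __name__ == "__main__":
        src = {"index.md": datetime(2025, 3, 10, 12), "about.md": datetime(2025, 3, 1)}
        dest = {"about.md": datetime(2025, 2, 1), "old.html": datetime(2024, 1, 1)}
        print(build_actions(src, dest))
        # deque([('old.html', 'DELETE'), ('index.md', 'BUILD'), ('about.md', 'BUILD')])

The missing_from and changed_from generators in filelist.py perform the same membership and change_time checks over FileList.files.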
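The reworked -m/--metadata option accepts repeated KEY=VALUE pairs, and comma-separated values are split into lists. A rough stand-alone sketch of that parsing, equivalent to what main() now does inline (parse_metadata is an invented name for the illustration):

    def parse_metadata(pairs: list[str]) -> dict[str, str | list[str]]:
        metadata: dict[str, str | list[str]] = {}
        for pair in pairs:
            if "=" not in pair:
                raise ValueError(f"metadata must be KEY=VAL (got {pair!r})")
            key, value = map(str.strip, pair.split("=", 1))
            if "," in value:
                # Comma-separated values become a list; empty items are dropped.
                metadata[key] = [x for x in value.split(",") if x != ""]
            else:
                metadata[key] = value
        return metadata


    if __name__ == "__main__":
        print(parse_metadata(["author=Qrius", "tags=blog,draft,"]))
        # {'author': 'Qrius', 'tags': ['blog', 'draft']}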
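The seek() helper added to macro_processor.py is already marked deprecated in favour of str.startswith; going only by its docstring and body, it should behave like this small equivalent (seek_equivalent is an invented name, shown purely for comparison):

    def seek_equivalent(text: str, start: int, target: str) -> int | None:
        """Return the index just past `target` if it occurs at `start`, else None."""
        if text.startswith(target, start):
            return start + len(target)
        return None


    assert seek_equivalent('%"quoted"%', 0, '%"') == 2
    assert seek_equivalent('%"quoted"%', 0, '"%') is None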