"""Lua plugin manager.

Loads ``*.lua`` files from the ``plugins`` directory into a single shared
``LuaRuntime``, exposes a set of helper functions to Lua (route/hook
registration, logging, file/HTML/Markdown helpers), and hot-reloads plugins
when files in the plugins directory change.
"""

import json
import os
import re
import time
from pathlib import Path

from lupa import LuaRuntime, LuaError
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from log.Logger import Logger

PLUGINS_DIR = Path(__file__).parent / "plugins"
HTML_DIR = Path(__file__).parent / "../html"
MARKDOWN_DIR = Path(__file__).parent / ".." / "markdown"

# Lua helper library executed once in the shared runtime at start-up.
# NOTE: this is a regular (non-raw) Python string on purpose: "\\\\" below
# must reach Lua as "\\" (a single escaped backslash).
_GUARDRAILS_LUA = """
-- Guardrails and safe patterns for plugin development

-- Safe string operations
function safe_concat(...)
    local result = {}
    for i, v in ipairs({...}) do
        if v ~= nil then
            table.insert(result, tostring(v))
        end
    end
    return table.concat(result)
end

-- Safe table operations
function table_contains(tbl, value)
    for _, v in ipairs(tbl) do
        if v == value then
            return true
        end
    end
    return false
end

function table_keys(tbl)
    local keys = {}
    for k, _ in pairs(tbl) do
        table.insert(keys, k)
    end
    return keys
end

function table_values(tbl)
    local values = {}
    for _, v in pairs(tbl) do
        table.insert(values, v)
    end
    return values
end

-- Safe string escaping
function escape_html(str)
    if str == nil then
        return ""
    end
    local s = tostring(str)
    -- "&" must be escaped first so later entities are not double-escaped
    s = string.gsub(s, "&", "&amp;")
    s = string.gsub(s, "<", "&lt;")
    s = string.gsub(s, ">", "&gt;")
    s = string.gsub(s, '"', "&quot;")
    s = string.gsub(s, "'", "&#39;")
    return s
end

-- Pattern validation
function is_valid_filename(name)
    if name == nil or name == "" then
        return false
    end
    -- Block directory traversal
    if string.match(name, "%.%.") then
        return false
    end
    if string.match(name, "/") or string.match(name, "\\\\") then
        return false
    end
    return true
end

-- Safe error handling wrapper
function try_catch(fn, catch_fn)
    local status, err = pcall(fn)
    if not status and catch_fn then
        catch_fn(err)
    end
    return status
end

-- Request validation
function validate_request(req, required_fields)
    if type(req) ~= "table" then
        return false, "Request must be a table"
    end
    for _, field in ipairs(required_fields) do
        if req[field] == nil then
            return false, "Missing required field: " .. field
        end
    end
    return true, nil
end

-- Rate limiting helper (simple in-memory)
_rate_limits = _rate_limits or {}

function check_rate_limit(key, max_calls, window_seconds)
    local now = os.time()
    if _rate_limits[key] == nil then
        _rate_limits[key] = {count = 1, window_start = now}
        return true
    end
    local rl = _rate_limits[key]
    if now - rl.window_start > window_seconds then
        -- Reset window
        rl.count = 1
        rl.window_start = now
        return true
    end
    if rl.count >= max_calls then
        return false
    end
    rl.count = rl.count + 1
    return true
end

log("Lua guardrails initialized")
"""


class PluginFSHandler(FileSystemEventHandler):
    """Watchdog handler that forwards ``.lua`` file events to a manager.

    ``manager`` must provide ``load_plugin(Path)``, ``reload_plugin(Path)``
    and ``unload_plugin(str)``.
    """

    def __init__(self, manager):
        super().__init__()
        self.manager = manager

    @staticmethod
    def _lua_source(event):
        """Return the event's source path as ``str`` if it is a ``.lua`` file.

        Returns ``None`` for directory events and for non-Lua files.
        """
        if event.is_directory:
            return None
        # src_path may be str or bytes depending on the watchdog version.
        src = os.fsdecode(event.src_path)
        return src if src.endswith(".lua") else None

    def on_modified(self, event):
        """Reload a plugin whose file was modified."""
        src = self._lua_source(event)
        if src is None:
            return
        Logger.log_info(f"Plugin changed: {src}, reloading")
        self.manager.reload_plugin(Path(src))

    def on_created(self, event):
        """Load a newly created plugin file."""
        src = self._lua_source(event)
        if src is None:
            return
        Logger.log_info(f"New plugin: {src}, loading")
        self.manager.load_plugin(Path(src))

    def on_deleted(self, event):
        """Unload the plugin whose file was removed."""
        src = self._lua_source(event)
        if src is None:
            return
        name = Path(src).name
        Logger.log_info(f"Plugin removed: {name}")
        self.manager.unload_plugin(name)


class PluginManager:
    """Owns the Lua runtime, the loaded plugins and their routes/hooks.

    Constructing a manager creates the plugin/HTML/Markdown directories if
    missing, installs the Lua globals and starts a filesystem watcher on the
    plugins directory. Call :meth:`stop` to shut the watcher down.
    """

    def __init__(self):
        self.lua = LuaRuntime(unpack_returned_tuples=True)
        self.plugins = {}  # name -> dict{path, module}
        self.routes = {}  # path -> (plugin_name, lua_fn)
        self.hooks = {}  # hook_name -> list of (plugin_name, lua_fn)
        # Name of the plugin currently being executed by load_plugin();
        # read by add_route/register_hook to attribute registrations.
        self._current_plugin = None

        # Create directories
        PLUGINS_DIR.mkdir(exist_ok=True)
        HTML_DIR.mkdir(exist_ok=True)
        MARKDOWN_DIR.mkdir(exist_ok=True)

        # Expose helpers to Lua
        self._setup_lua_globals()
        self._start_watcher()

    def _setup_lua_globals(self):
        """Set up all Lua global functions and guardrails."""
        g = self.lua.globals()

        # Route and hook registration
        g.add_route = self._expose_add_route
        g.register_hook = self._expose_register_hook

        # Logging - using custom Logger
        g.log = lambda msg: Logger.log_info(f"[lua] {msg}")
        g.log_warn = lambda msg: Logger.log_warning(f"[lua] {msg}")
        g.log_error = lambda msg: Logger.log_error(f"[lua] {msg}")

        # File operations with a ".." component check
        g.read_file = self._safe_read_file
        g.write_file = self._safe_write_file

        # HTML manipulation
        g.read_html = lambda filename: self._read_content(HTML_DIR, filename)
        g.write_html = lambda filename, content: self._write_content(
            HTML_DIR, filename, content
        )
        g.list_html_files = lambda: self._list_files(HTML_DIR, ".html")

        # Markdown manipulation
        g.read_markdown = lambda filename: self._read_content(
            MARKDOWN_DIR, filename
        )
        g.write_markdown = lambda filename, content: self._write_content(
            MARKDOWN_DIR, filename, content
        )
        g.list_markdown_files = lambda: self._list_files(MARKDOWN_DIR, ".md")

        # HTML modifier helpers
        g.html_find_tag = self._html_find_tag
        g.html_replace_tag = self._html_replace_tag
        g.html_insert_before = self._html_insert_before
        g.html_insert_after = self._html_insert_after
        g.html_wrap_content = self._html_wrap_content

        # Markdown modifier helpers
        g.md_add_header = self._md_add_header
        g.md_replace_section = self._md_replace_section
        g.md_append_content = self._md_append_content

        # Utility functions
        g.table_to_json = self._table_to_json
        g.json_to_table = self._json_to_table

        # Guardrails - predefined safe patterns
        self._setup_lua_guardrails()

    def _setup_lua_guardrails(self):
        """Run the Lua helper library in the shared runtime.

        A Lua error is logged and swallowed so that a broken helper library
        does not prevent the manager from starting.
        """
        try:
            self.lua.execute(_GUARDRAILS_LUA)
        except LuaError as e:
            Logger.log_error(f"Failed to initialize Lua guardrails: {e}")

    # Safe file operations

    @staticmethod
    def _reject_traversal(path):
        """Return ``Path(path)``; raise ``ValueError`` on a ``..`` component."""
        p = Path(path)
        # Compare whole components: a file called "a..b" is legitimate.
        if ".." in p.parts:
            raise ValueError("Path traversal not allowed")
        return p

    def _safe_read_file(self, path):
        """Read a UTF-8 text file; return its content or ``None`` on error.

        Paths containing a ``..`` component are rejected.
        NOTE(review): absolute paths are still accepted, so this is not a
        sandbox -- confirm whether plugins should be confined to a directory.
        """
        try:
            return self._reject_traversal(path).read_text(encoding="utf-8")
        except Exception as e:
            Logger.log_error(f"Error reading file {path}: {e}")
            return None

    def _safe_write_file(self, path, content):
        """Write UTF-8 text to ``path``; return ``True`` on success.

        Paths containing a ``..`` component are rejected. Errors are logged
        and reported as ``False``.
        NOTE(review): absolute paths are still accepted -- see
        ``_safe_read_file``.
        """
        try:
            self._reject_traversal(path).write_text(content, encoding="utf-8")
            return True
        except Exception as e:
            Logger.log_error(f"Error writing file {path}: {e}")
            return False

    # HTML/Markdown content operations

    @staticmethod
    def _resolve_inside(base_dir, filename):
        """Resolve ``filename`` under ``base_dir``; raise if it escapes.

        Both sides are resolved first because ``is_relative_to`` is purely
        lexical: without resolving, ``base / "../x"`` would pass the check.
        """
        base = base_dir.resolve()
        candidate = (base / str(filename)).resolve()
        if not candidate.is_relative_to(base):
            raise ValueError("Invalid path")
        return candidate

    def _read_content(self, base_dir, filename):
        """Read ``filename`` from ``base_dir``.

        Returns the file's text, or ``None`` when the file does not exist,
        the path escapes ``base_dir``, or reading fails (errors are logged).
        """
        try:
            path = self._resolve_inside(base_dir, filename)
            if path.exists():
                return path.read_text(encoding="utf-8")
            return None
        except Exception as e:
            Logger.log_error(f"Error reading {filename}: {e}")
            return None

    def _write_content(self, base_dir, filename, content):
        """Write ``content`` to ``filename`` inside ``base_dir``.

        Returns ``True`` on success and ``False`` when the path escapes
        ``base_dir`` or writing fails (errors are logged).
        """
        try:
            path = self._resolve_inside(base_dir, filename)
            path.write_text(content, encoding="utf-8")
            return True
        except Exception as e:
            Logger.log_error(f"Error writing {filename}: {e}")
            return False

    def _list_files(self, base_dir, extension):
        """Return the names of files in ``base_dir`` ending in ``extension``.

        Returns an empty list if listing fails (the error is logged).
        """
        try:
            return [f.name for f in base_dir.glob(f"*{extension}")]
        except Exception as e:
            Logger.log_error(f"Error listing files: {e}")
            return []

    # HTML manipulation helpers
    #
    # These are regex-based and do not handle nested tags of the same name.
    # ``tag`` is interpolated into the pattern unescaped.

    _HTML_FLAGS = re.DOTALL | re.IGNORECASE

    @staticmethod
    def _tag_pattern(tag):
        """Regex with groups (open tag, inner content, close tag) for ``tag``.

        ``\\b`` after the tag name keeps ``p`` from matching ``<pre>``.
        """
        return rf"(<{tag}\b[^>]*>)(.*?)(</{tag}>)"

    def _html_find_tag(self, html, tag):
        """Return the first ``<tag ...>...</tag>`` element, or ``None``."""
        match = re.search(self._tag_pattern(tag), html, self._HTML_FLAGS)
        return match.group(0) if match else None

    def _html_replace_tag(self, html, tag, new_content):
        """Replace the inner content of every ``<tag>`` with ``new_content``.

        The opening and closing tags are preserved.
        """
        # A function replacement inserts new_content literally; a template
        # like "\\1" + new_content would misparse content starting with a
        # digit (e.g. "\\12024") or containing backslashes.
        def replacer(match):
            return f"{match.group(1)}{new_content}{match.group(3)}"

        return re.sub(
            self._tag_pattern(tag), replacer, html, flags=self._HTML_FLAGS
        )

    def _html_insert_before(self, html, marker, content):
        """Insert ``content`` before every occurrence of ``marker``."""
        return html.replace(marker, content + marker)

    def _html_insert_after(self, html, marker, content):
        """Insert ``content`` after every occurrence of ``marker``."""
        return html.replace(marker, marker + content)

    def _html_wrap_content(self, html, tag, wrapper_tag, attrs=""):
        """Wrap the inner content of every ``<tag>`` in ``<wrapper_tag>``.

        ``attrs`` is inserted verbatim into the wrapper's opening tag.
        """
        opening = f"<{wrapper_tag} {attrs}>" if attrs else f"<{wrapper_tag}>"
        closing = f"</{wrapper_tag}>"

        def replacer(match):
            open_tag, content, close_tag = match.groups()
            return f"{open_tag}{opening}{content}{closing}{close_tag}"

        return re.sub(
            self._tag_pattern(tag), replacer, html, flags=self._HTML_FLAGS
        )

    # Markdown manipulation helpers

    def _md_add_header(self, markdown, level, text):
        """Prepend a header of the given level (clamped to 1-6)."""
        # Lua numbers may arrive as floats; "#" * 2.0 would raise TypeError.
        depth = max(1, min(6, int(level)))
        prefix = "#" * depth
        return f"{prefix} {text}\n\n{markdown}"

    def _md_replace_section(self, markdown, header, new_content):
        """Replace each section whose header starts with ``header``.

        A section runs from its header line up to the next header line or
        the end of the document. The replacement is always emitted as a
        level-2 (``##``) header followed by ``new_content``.
        """
        # Braces are doubled so "{1,6}" survives f-string formatting as a
        # regex quantifier instead of being evaluated as the tuple (1, 6).
        pattern = re.compile(
            rf"^#{{1,6}}\s+{re.escape(header)}.*?(?=^#{{1,6}}\s+|\Z)",
            re.DOTALL | re.MULTILINE,
        )
        replacement = f"## {header}\n\n{new_content}\n\n"
        # Function replacement: insert the text literally (no "\\1" parsing).
        return pattern.sub(lambda _match: replacement, markdown)

    def _md_append_content(self, markdown, content):
        """Append ``content`` after a blank line, trimming trailing space."""
        return markdown.rstrip() + "\n\n" + content

    # JSON conversion helpers

    @classmethod
    def _to_plain(cls, value):
        """Recursively convert mapping-like values (Lua tables) to dicts."""
        items = getattr(value, "items", None)
        if callable(items):
            return {k: cls._to_plain(v) for k, v in items()}
        return value

    def _table_to_json(self, lua_table):
        """Convert a Lua table to a JSON object string.

        Nested tables are converted recursively. Returns ``"{}"`` if the
        conversion fails (the error is logged).
        """
        try:
            return json.dumps(self._to_plain(lua_table))
        except Exception as e:
            Logger.log_error(f"Error converting table to JSON: {e}")
            return "{}"

    def _json_to_table(self, json_str):
        """Parse a JSON string and return the resulting Python object.

        Returns ``{}`` if parsing fails (the error is logged).
        NOTE(review): the result is a plain Python object, not a Lua table;
        confirm whether plugins expect ``self.lua.table_from(...)`` here.
        """
        try:
            return json.loads(json_str)
        except Exception as e:
            Logger.log_error(f"Error parsing JSON: {e}")
            return {}

    # Lifecycle of Plugin

    def load_all(self):
        """Load every ``*.lua`` file found in the plugins directory."""
        for p in PLUGINS_DIR.glob("*.lua"):
            self.load_plugin(p)

    def load_plugin(self, path: Path):
        """Execute the plugin at ``path`` and record it under its file name.

        While the plugin code runs, ``_current_plugin`` holds its name so
        that routes/hooks it registers are attributed to it. Errors are
        logged, not raised.
        """
        name = path.name
        try:
            code = path.read_text(encoding="utf-8")
            self._current_plugin = name
            lua_module = self.lua.execute(code)
            if lua_module is None:
                lua_module = {}
            self.plugins[name] = {"path": path, "module": lua_module}
            Logger.log_info(f"Loaded plugin: {name}")
        except LuaError as e:
            Logger.log_error(f"Lua error while loading {name}: {e}")
        except Exception as e:
            Logger.log_error(f"Error loading plugin {name}: {e}")
        finally:
            self._current_plugin = None

    def reload_plugin(self, path: Path):
        """Unload the plugin at ``path`` (if loaded) and load it again."""
        name = path.name
        # remove previous hooks/routes
        self.unload_plugin(name)
        # presumably gives the writer time to finish flushing the file
        time.sleep(0.05)
        self.load_plugin(path)

    def unload_plugin(self, name: str):
        """Drop all routes and hooks registered by ``name`` and forget it."""
        # Remove routes/hook registrations from this plugin
        stale_routes = [r for r, v in self.routes.items() if v[0] == name]
        for r in stale_routes:
            del self.routes[r]
        for hook, lst in list(self.hooks.items()):
            remaining = [x for x in lst if x[0] != name]
            if remaining:
                self.hooks[hook] = remaining
            else:
                del self.hooks[hook]
        if name in self.plugins:
            del self.plugins[name]
        Logger.log_info(f"Unloaded plugin {name}")

    # Expose a new API route

    def _expose_add_route(self, path, lua_fn):
        """Called from Lua as add_route(path, function(req) ... end)."""
        p = str(path)
        plugin_name = self._current_loading_plugin_name() or ""
        self.routes[p] = (plugin_name, lua_fn)
        Logger.log_info(f"Plugin {plugin_name} registered route {p}")

    def _expose_register_hook(self, hook_name, lua_fn):
        """Called from Lua as register_hook(name, function(...) ... end)."""
        hook = str(hook_name)
        plugin_name = self._current_loading_plugin_name() or ""
        self.hooks.setdefault(hook, []).append((plugin_name, lua_fn))
        Logger.log_info(f"Plugin {plugin_name} registered hook {hook}")

    def _current_loading_plugin_name(self):
        """Return the name of the plugin being loaded, or ``None``."""
        return getattr(self, "_current_plugin", None)

    # Running hooks & handling routes

    def handle_request(self, path, request_info=None):
        """Dispatch ``path`` to the plugin route registered for it.

        Returns ``None`` when no route is registered. Otherwise returns a
        ``(status, headers, body)`` tuple: the handler's own 3-tuple if it
        returned one, a 200 ``text/html`` response wrapping ``str(result)``
        otherwise, or a 500 ``text/plain`` response on a Lua error.
        """
        if path not in self.routes:
            return None
        _plugin_name, lua_fn = self.routes[path]
        try:
            # NOTE(review): this passes a Python dict, while the Lua helper
            # validate_request() expects type(req) == "table" -- confirm
            # whether the request should be converted before the call.
            lua_req = request_info or {}
            res = lua_fn(lua_req)
            if isinstance(res, tuple) and len(res) == 3:
                return res
            return (200, {"Content-Type": "text/html"}, str(res))
        except LuaError as e:
            Logger.log_error(f"Lua error in route {path}: {e}")
            return (500, {"Content-Type": "text/plain"}, f"Plugin error: {e}")

    def run_hook(self, hook_name: str, *args):
        """Run all registered hook functions for hook_name.

        Returns the last non-None return value, or ``None`` if there is no
        such hook or no handler returned a value. Lua errors in a handler
        are logged and do not stop the remaining handlers.
        """
        if hook_name not in self.hooks:
            return None
        last = None
        # Iterate over a copy: a handler may register/unregister hooks.
        for plugin_name, fn in list(self.hooks[hook_name]):
            try:
                out = fn(*args)
                if out is not None:
                    last = out
            except LuaError as e:
                Logger.log_error(
                    f"Lua error in hook {hook_name} from {plugin_name}: {e}"
                )
        return last

    # File watcher

    def _start_watcher(self):
        """Start a non-recursive watchdog observer on the plugins directory."""
        self.observer = Observer()
        self.fs_handler = PluginFSHandler(self)
        self.observer.schedule(
            self.fs_handler, str(PLUGINS_DIR), recursive=False
        )
        self.observer.start()
        Logger.log_info("Started plugin folder watcher")

    def stop(self):
        """Stop the filesystem watcher and wait for its thread to exit."""
        self.observer.stop()
        self.observer.join()