import os import sys import threading import subprocess from http.server import BaseHTTPRequestHandler, HTTPServer import mimetypes import json from jsmin import jsmin from pathlib import Path import requests from functools import lru_cache import hashlib from typing import Optional, Tuple, Dict import gzip import time from PIL import Image from io import BytesIO from log.Logger import * from lua import plugin_manager from PyPost import extract_summary logger = Logger() plugin_manager = plugin_manager.PluginManager() plugin_manager.load_all() PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) HTML_DIR = os.path.join(PROJECT_ROOT, "html") MARKDOWN_DIR = os.path.join(PROJECT_ROOT, "markdown") BASE_FILE = os.path.join(HTML_DIR, "base", "index.html") LUA_DIR = Path(PROJECT_ROOT) / "lua" / "plugins" CACHE_DIR = os.path.join(PROJECT_ROOT, "cache") CDN_CACHE_DIR = os.path.join(CACHE_DIR, "cdn") IMAGE_CACHE_DIR = os.path.join(CACHE_DIR, "images") # Image optimization settings IMAGE_EXTENSIONS = {'.webp', '.jpg', '.jpeg', '.png', '.gif', '.ico', '.svg'} ICON_MAX_SIZE = (128, 128) # Max dimensions for icons STANDARD_IMAGE_MAX_SIZE = (1920, 1080) # Max dimensions for regular images WEBP_QUALITY = 65 # Quality for WebP conversion ICON_QUALITY = 90 # Higher quality for icons to preserve detail # CDN Resources to fetch and cache CDN_RESOURCES = { "/package/css/prism.min.css": "https://cdn.jsdelivr.net/npm/prismjs/themes/prism.min.css", "/package/js/prism.min.js": "https://cdn.jsdelivr.net/npm/prismjs/prism.min.js", "/package/js/prism-python.min.js": "https://cdn.jsdelivr.net/npm/prismjs/components/prism-python.min.js", "/package/js/prism-javascript.min.js": "https://cdn.jsdelivr.net/npm/prismjs/components/prism-javascript.min.js", "/package/js/mathjax.js": "https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js", } CDN_BASES = { "mathjax": "https://cdn.jsdelivr.net/npm/mathjax@3/es5" } CACHEABLE_EXTENSIONS = {'.css', '.js', '.webp', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.woff', '.woff2', '.ttf', '.eot', '.ico'} # Compression settings COMPRESS_MIME_TYPES = {'text/html', 'text/css', 'application/javascript', 'application/json', 'text/markdown', 'text/x-lua'} MIN_COMPRESS_SIZE = 1024 # Only compress files larger than 1KB # Session for connection pooling session = requests.Session() session.mount('https://', requests.adapters.HTTPAdapter( pool_connections=10, pool_maxsize=20, max_retries=3 )) def ensure_cache_dirs(): """Ensure cache directories exist""" os.makedirs(CDN_CACHE_DIR, exist_ok=True) os.makedirs(IMAGE_CACHE_DIR, exist_ok=True) logger.log_info(f"Cache directories ready: {CDN_CACHE_DIR}, {IMAGE_CACHE_DIR}") def fetch_cdn_resources(): ensure_cache_dirs() logger.log_info("Fetching CDN resources...") def fetch_single_resource(local_path, cdn_url): try: url_hash = hashlib.md5(local_path.encode()).hexdigest() cache_file = os.path.join(CDN_CACHE_DIR, url_hash) if os.path.exists(cache_file): logger.log_debug(f"CDN resource already cached: {local_path}") return True logger.log_info(f"Fetching {cdn_url}...") response = session.get(cdn_url, timeout=30) response.raise_for_status() with open(cache_file, 'wb') as f: f.write(response.content) logger.log_info(f"Cached CDN resource: {local_path} ({len(response.content)} bytes)") return True except Exception as e: logger.log_error(f"Failed to fetch CDN resource {cdn_url}: {e}") return False # Parallel fetch with threads threads = [] for local_path, cdn_url in CDN_RESOURCES.items(): t = threading.Thread(target=fetch_single_resource, args=(local_path, cdn_url)) t.start() threads.append(t) for t in threads: t.join() def fetch_cdn_resource_on_demand(local_path: str) -> Optional[bytes]: """ On demand fetching of a CDN """ if local_path.startswith("/package/js/"): relative_path = local_path[12:] if any(x in relative_path for x in ["a11y/", "input/", "output/", "ui/", "sre"]): cdn_url = f"{CDN_BASES['mathjax']}/{relative_path}" else: return None try: url_hash = hashlib.md5(local_path.encode()).hexdigest() cache_file = os.path.join(CDN_CACHE_DIR, url_hash) if os.path.exists(cache_file): with open(cache_file, 'rb') as f: return f.read() logger.log_info(f"Fetching on-demand: {cdn_url}") response = session.get(cdn_url, timeout=10) response.raise_for_status() with open(cache_file, 'wb') as f: f.write(response.content) logger.log_info(f"Cached on-demand: {local_path}") return response.content except Exception as e: logger.log_error(f"Failed to fetch on-demand resource {cdn_url}: {e}") return None return None @lru_cache(maxsize=2048) def load_file_cached(file_path: str, is_js: bool = False, optimize_img: bool = False) -> Tuple[bytes, str]: # Handle image optimization if optimize_img and should_optimize_image(file_path): return optimize_image(file_path) mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" with open(file_path, "rb") as f: content = f.read() # Minify JS files if is_js or mime_type == "application/javascript" or file_path.endswith(".js"): try: content = jsmin(content.decode("utf-8")).encode("utf-8") except Exception as err: logger.log_error(f"Error minifying JS file {file_path}: {err}") return content, mime_type @lru_cache(maxsize=1024) def compress_content(content: bytes) -> bytes: """LRU cached gzip compression""" return gzip.compress(content, compresslevel=6) def is_icon(file_path: str) -> bool: """Determine if file is an icon based on path or name""" lower_path = file_path.lower() return ( 'icon' in lower_path or 'favicon' in lower_path or file_path.endswith('.ico') or '/icons/' in lower_path ) def get_image_cache_path(file_path: str) -> str: """Generate cache path for optimized image""" file_hash = hashlib.md5(file_path.encode()).hexdigest() file_stat = os.stat(file_path) # Include mtime in hash to invalidate cache when file changes cache_key = f"{file_hash}_{int(file_stat.st_mtime)}" return os.path.join(IMAGE_CACHE_DIR, cache_key + ".webp") def optimize_image(file_path: str) -> Tuple[bytes, str]: try: # Check cache first cache_path = get_image_cache_path(file_path) if os.path.exists(cache_path): with open(cache_path, 'rb') as f: return f.read(), "image/webp" # Open and process image with Image.open(file_path) as img: # Preserve transparency by converting to RGBA if needed if img.mode == 'P': # Palette mode - convert to RGBA to preserve transparency img = img.convert('RGBA') elif img.mode == 'LA': # Grayscale with alpha - convert to RGBA img = img.convert('RGBA') elif img.mode not in ('RGBA', 'RGB', 'L'): # Other modes - try to preserve alpha if present if 'transparency' in img.info: img = img.convert('RGBA') else: img = img.convert('RGB') # If already RGBA or RGB, keep as is # Determine if it's an icon and resize accordingly if is_icon(file_path): max_size = ICON_MAX_SIZE quality = ICON_QUALITY else: max_size = STANDARD_IMAGE_MAX_SIZE quality = WEBP_QUALITY # Resize if image is larger than max size if img.size[0] > max_size[0] or img.size[1] > max_size[1]: img.thumbnail(max_size, Image.Resampling.LANCZOS) logger.log_debug(f"Resized image {file_path} to {img.size}") # Save as WebP to BytesIO with lossless for transparency output = BytesIO() # Use lossless=True for images with alpha channel to preserve transparency perfectly if img.mode == 'RGBA': img.save(output, format='WEBP', quality=quality, method=6, lossless=False) else: img.save(output, format='WEBP', quality=quality, method=6) optimized_content = output.getvalue() # Cache the optimized image with open(cache_path, 'wb') as f: f.write(optimized_content) original_size = os.path.getsize(file_path) optimized_size = len(optimized_content) savings = ((original_size - optimized_size) / original_size) * 100 logger.log_info(f"Optimized {file_path}: {original_size} to {optimized_size} bytes ({savings:.1f}% reduction)") return optimized_content, "image/webp" except Exception as e: logger.log_error(f"Error compressing image {file_path}: {e}") # Fall back to original file with open(file_path, 'rb') as f: content = f.read() mime_type, _ = mimetypes.guess_type(file_path) return content, mime_type or "application/octet-stream" def prewarm_image_cache(): for root, _, files in os.walk(PROJECT_ROOT): for f in files: if should_optimize_image(f): optimize_image(os.path.join(root, f)) def should_optimize_image(file_path: str) -> bool: ext = os.path.splitext(file_path)[1].lower() # If its a svg then just return. SVG is good return ext in IMAGE_EXTENSIONS and ext != '.svg' def should_cache_file(file_path: str) -> bool: ext = os.path.splitext(file_path)[1].lower() return ext in CACHEABLE_EXTENSIONS def get_html_files(directory=HTML_DIR): html_files = [] for entry in os.listdir(directory): full_path = os.path.join(directory, entry) if os.path.isfile(full_path) and entry.endswith(".html"): html_files.append(entry) return html_files _index_cache = {"content": None, "timestamp": 0} INDEX_CACHE_TTL = 300 # 300/60 = 5min def build_index_page(force_refresh: bool = False) -> str: """Build index page with caching""" global _index_cache current_time = time.time() if not force_refresh and _index_cache["content"] and (current_time - _index_cache["timestamp"]) < INDEX_CACHE_TTL: return _index_cache["content"] with open(BASE_FILE, "r", encoding="utf-8") as f: base_html = f.read() articles = [] for md_path in Path(MARKDOWN_DIR).rglob("*.md"): try: summary_data = extract_summary(md_path) if summary_data: html_name, summary = summary_data else: html_name = md_path.stem + ".html" summary = "No Summary for this Article!" text = md_path.read_text(encoding="utf-8") title = md_path.stem for line in text.splitlines(): if line.startswith("# "): title = line[2:].strip() break article_html = f"""

{title}

{summary}

""" articles.append(article_html) except Exception as e: logger.log_warning(f"Exception with summary: {e} at {md_path}") continue full_content = "\n".join(articles) + "" + index_footer() content = base_html.replace("", full_content) # Update cache _index_cache["content"] = content _index_cache["timestamp"] = current_time return content import base64 import random from hashes.hashes import hash_list # Generate hashes only once at server start H1 = random.choice(hash_list) H2_CANDIDATES = [h for h in hash_list if h != H1] H2 = random.choice(H2_CANDIDATES) if H2_CANDIDATES else H1 def index_footer() -> str: tor_link = "http://7uhuxits7qfmiagkmpazxvh3rtk6aijs6pbawge3fl77y4xqjixlhkqd.onion/" return f""" """ class WebServerHTTPRequestHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass def _supports_gzip(self) -> bool: accept_encoding = self.headers.get('Accept-Encoding', '') return 'gzip' in accept_encoding.lower() def _send_compressed_response(self, content: bytes, mime_type: str, cache_control: str = None): should_compress = ( self._supports_gzip() and mime_type in COMPRESS_MIME_TYPES and len(content) >= MIN_COMPRESS_SIZE ) if should_compress: # Use cached compression compressed = compress_content(content) self.send_response(200) self.send_header("Content-type", mime_type) self.send_header("Content-Encoding", "gzip") self.send_header("Content-Length", len(compressed)) if cache_control: self.send_header("Cache-Control", cache_control) self.end_headers() self.wfile.write(compressed) else: self.send_response(200) self.send_header("Content-type", mime_type) self.send_header("Content-Length", len(content)) if cache_control: self.send_header("Cache-Control", cache_control) self.end_headers() self.wfile.write(content) def _parse_post_data(self): content_length = int(self.headers.get('Content-Length', 0)) if content_length == 0: return {} post_data = self.rfile.read(content_length) content_type = self.headers.get('Content-Type', '') try: if 'application/json' in content_type: return json.loads(post_data.decode('utf-8')) elif 'application/x-www-form-urlencoded' in content_type: from urllib.parse import parse_qs parsed = parse_qs(post_data.decode('utf-8')) return {k: v[0] if len(v) == 1 else v for k, v in parsed.items()} else: return {"raw": post_data} except Exception as e: logger.log_error(f"Error parsing POST data: {e}") return {"raw": post_data} def do_POST(self): """Handle POST for Plugins""" req_path = self.path.lstrip("/") post_data = self._parse_post_data() request_data = { "path": self.path, "headers": dict(self.headers), "data": post_data, "method": "POST" } plugin_result = plugin_manager.handle_request("/" + req_path, request_data, method="POST") if plugin_result is not None: status, headers, body = plugin_result self.send_response(status) for key, value in headers.items(): self.send_header(key, value) self.end_headers() if isinstance(body, str): self.wfile.write(body.encode("utf-8")) elif isinstance(body, bytes): self.wfile.write(body) else: self.wfile.write(str(body).encode("utf-8")) return self.send_response(404) self.send_header("Content-type", "application/json") self.end_headers() error_response = json.dumps({"error": "Route not found"}) self.wfile.write(error_response.encode("utf-8")) def do_GET(self): req_path = self.path.lstrip("/") # Handle root/index with caching if req_path == "" or req_path == "index.html": content = build_index_page() self._send_compressed_response( content.encode("utf-8"), "text/html", "public, max-age=300" # Cache for 5 minutes ) return # Handle CDN package requests if req_path.startswith("package/"): cdn_path = "/" + req_path if cdn_path in CDN_RESOURCES: url_hash = hashlib.md5(cdn_path.encode()).hexdigest() cache_file = os.path.join(CDN_CACHE_DIR, url_hash) if os.path.exists(cache_file): with open(cache_file, 'rb') as f: cached_content = f.read() else: cached_content = None else: cached_content = fetch_cdn_resource_on_demand(cdn_path) if cached_content: if cdn_path.endswith('.css'): mime_type = "text/css" elif cdn_path.endswith('.js'): mime_type = "application/javascript" elif cdn_path.endswith('.wasm'): mime_type = "application/wasm" elif cdn_path.endswith('.json'): mime_type = "application/json" else: mime_type = "application/octet-stream" self.send_response(200) self.send_header("Content-type", mime_type) self.send_header("Cache-Control", "public, max-age=86400") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.wfile.write(cached_content) return else: logger.log_warning(f"CDN resource not found: {cdn_path}") self.send_response(404) self.end_headers() self.wfile.write(b"404 - CDN resource not available") return # check the plugins routes plugin_result = plugin_manager.handle_request("/" + req_path, {"path": self.path}) if plugin_result is not None: status, headers, body = plugin_result self.send_response(status) for key, value in headers.items(): self.send_header(key, value) self.end_headers() self.wfile.write(body.encode("utf-8") if isinstance(body, str) else body) return # Handle markdown file downloads if req_path.startswith("markdown/"): markdown_filename = req_path[9:] if not markdown_filename.endswith(".md") or ".." in markdown_filename or "/" in markdown_filename: self.send_response(403) self.end_headers() self.wfile.write(b"403 - Forbidden: Only .md files allowed") return markdown_file_path = os.path.join(MARKDOWN_DIR, markdown_filename) if not os.path.exists(markdown_file_path) or not os.path.isfile(markdown_file_path): self.send_response(404) self.end_headers() self.wfile.write(b"404 - Markdown file not found") return resolved_path = os.path.realpath(markdown_file_path) resolved_markdown_dir = os.path.realpath(MARKDOWN_DIR) if not resolved_path.startswith(resolved_markdown_dir): self.send_response(403) self.end_headers() self.wfile.write(b"403 - Forbidden") return try: with open(markdown_file_path, "rb") as f: content = f.read() self._send_compressed_response(content, "text/markdown") logger.log_info(f"Served markdown file: {markdown_filename}") return except Exception as err: logger.log_error(f"Error serving markdown file {markdown_filename}: {err}") self.send_response(500) self.end_headers() self.wfile.write(b"500 - Internal Server Error") return # Handle Lua file downloads if req_path.startswith("lua/"): lua_filename = req_path[4:] if not lua_filename.endswith(".lua") or ".." in lua_filename or "/" in lua_filename: self.send_response(403) self.end_headers() self.wfile.write(b"403 - Forbidden: Only .lua files allowed") return lua_file_path = os.path.join(LUA_DIR, lua_filename) if not os.path.exists(lua_file_path) or not os.path.isfile(lua_file_path): self.send_response(404) self.end_headers() self.wfile.write(b"404 - Lua file not found") return resolved_path = os.path.realpath(lua_file_path) resolved_lua_dir = os.path.realpath(LUA_DIR) if not resolved_path.startswith(resolved_lua_dir): self.send_response(403) self.end_headers() self.wfile.write(b"403 - Forbidden") return try: with open(lua_file_path, "rb") as f: content = f.read() self._send_compressed_response(content, "text/x-lua") logger.log_info(f"Served Lua file: {lua_filename}") return except Exception as err: logger.log_error(f"Error serving Lua file {lua_filename}: {err}") self.send_response(500) self.end_headers() self.wfile.write(b"500 - Internal Server Error") return # Handle other files with LRU caching for static assets file_path = os.path.normpath(os.path.join(PROJECT_ROOT, req_path)) if not file_path.startswith(PROJECT_ROOT): self.send_response(403) self.end_headers() self.wfile.write(b"403 - Forbidden") return if os.path.isfile(file_path): try: if should_cache_file(file_path): is_js = file_path.endswith('.js') optimize_img = should_optimize_image(file_path) content, mime_type = load_file_cached(file_path, is_js, optimize_img) self._send_compressed_response( content, mime_type, "public, max-age=3600" ) else: mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" with open(file_path, "rb") as f: content = f.read() if mime_type == "application/javascript" or file_path.endswith(".js"): try: content = jsmin(content.decode("utf-8")).encode("utf-8") except Exception as err: logger.log_error(f"Error minifying JS file {file_path}: {err}") self._send_compressed_response(content, mime_type) return except Exception as err: logger.log_error(f"Error serving file {file_path}: {err}") self.send_response(500) self.end_headers() self.wfile.write(b"500 - Internal Server Error") return self.send_response(404) self.end_headers() self.wfile.write(b"404 - Not Found") def run_pypost(): """Run PyPost.py in a separate process.""" script = os.path.join(PROJECT_ROOT, "PyPost.py") subprocess.run([sys.executable, script]) if __name__ == "__main__": try: logger.log_info("Initializing cache directories") ensure_cache_dirs() logger.log_info("Initializing CDN resource cache") fetch_cdn_resources() logger.log_info("CDN resources ready!") threading.Thread(target=run_pypost, daemon=True).start() logger.log_debug("Started PyPost.py in background watcher thread.") server_address = ("localhost", 8000) httpd: HTTPServer = HTTPServer(server_address, WebServerHTTPRequestHandler) logger.log_info(f"Serving on http://{server_address[0]}:{server_address[1]}") logger.log_info(f"Icon max size: {ICON_MAX_SIZE} \n Image max size: {STANDARD_IMAGE_MAX_SIZE}") httpd.serve_forever() prewarm_image_cache() except (Exception, KeyboardInterrupt) as e: if KeyboardInterrupt: logger.log_info(f"Shutting down server.\n Reason: KeyboardInterrupt") else: logger.log_info(f"Shutting down server.\n Reason: {e}") httpd.server_close()