PyPost/webserver.py
rattatwinko 5c170a195e FaF - Ws
Fast as Fuck WebServer. Now we compress and cache like hell, so it's fast as fuck. One new requirement, which isn't too bad: Pillow, for images!

Clear cache and restart the server!
2025-10-13 18:20:27 +02:00


import os
import sys
import threading
import subprocess
from http.server import BaseHTTPRequestHandler, HTTPServer
import mimetypes
import json
from jsmin import jsmin
from pathlib import Path
import requests
from functools import lru_cache
import hashlib
from typing import Optional, Tuple, Dict
import gzip
import time
from PIL import Image
from io import BytesIO
from log.Logger import *
from lua import plugin_manager
from PyPost import extract_summary
logger = Logger()
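# The Lua plugin manager is created and loads every plugin once at import time;
# do_GET/do_POST later ask it for matching routes via plugin_manager.handle_request().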
plugin_manager = plugin_manager.PluginManager()
plugin_manager.load_all()
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
HTML_DIR = os.path.join(PROJECT_ROOT, "html")
MARKDOWN_DIR = os.path.join(PROJECT_ROOT, "markdown")
BASE_FILE = os.path.join(HTML_DIR, "base", "index.html")
LUA_DIR = Path(PROJECT_ROOT) / "lua" / "plugins"
CACHE_DIR = os.path.join(PROJECT_ROOT, "cache")
CDN_CACHE_DIR = os.path.join(CACHE_DIR, "cdn")
IMAGE_CACHE_DIR = os.path.join(CACHE_DIR, "images")
# Image optimization settings
IMAGE_EXTENSIONS = {'.webp', '.jpg', '.jpeg', '.png', '.gif', '.ico', '.svg'}
ICON_MAX_SIZE = (128, 128) # Max dimensions for icons
STANDARD_IMAGE_MAX_SIZE = (1920, 1080) # Max dimensions for regular images
WEBP_QUALITY = 65 # Quality for WebP conversion
ICON_QUALITY = 90 # Higher quality for icons to preserve detail
# CDN Resources to fetch and cache
CDN_RESOURCES = {
"/package/css/prism.min.css": "https://cdn.jsdelivr.net/npm/prismjs/themes/prism.min.css",
"/package/js/prism.min.js": "https://cdn.jsdelivr.net/npm/prismjs/prism.min.js",
"/package/js/prism-python.min.js": "https://cdn.jsdelivr.net/npm/prismjs/components/prism-python.min.js",
"/package/js/prism-javascript.min.js": "https://cdn.jsdelivr.net/npm/prismjs/components/prism-javascript.min.js",
"/package/js/mathjax.js": "https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js",
}
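# Base URLs for resources that are fetched on demand rather than at startup (currently MathJax component files)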
CDN_BASES = {
"mathjax": "https://cdn.jsdelivr.net/npm/mathjax@3/es5"
}
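# Static asset extensions that are served through the LRU file cache (see load_file_cached)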
CACHEABLE_EXTENSIONS = {'.css', '.js', '.webp', '.jpg', '.jpeg', '.png', '.gif', '.svg', '.woff', '.woff2', '.ttf', '.eot', '.ico'}
# Compression settings
COMPRESS_MIME_TYPES = {'text/html', 'text/css', 'application/javascript', 'application/json', 'text/markdown', 'text/x-lua'}
MIN_COMPRESS_SIZE = 1024 # Only compress files larger than 1KB
# Session for connection pooling
session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(
pool_connections=10,
pool_maxsize=20,
max_retries=3
))
def ensure_cache_dirs():
"""Ensure cache directories exist"""
os.makedirs(CDN_CACHE_DIR, exist_ok=True)
os.makedirs(IMAGE_CACHE_DIR, exist_ok=True)
logger.log_info(f"Cache directories ready: {CDN_CACHE_DIR}, {IMAGE_CACHE_DIR}")
def fetch_cdn_resources():
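    """Download every entry in CDN_RESOURCES in parallel threads into the CDN cache, skipping files that are already cached."""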
ensure_cache_dirs()
logger.log_info("Fetching CDN resources...")
def fetch_single_resource(local_path, cdn_url):
try:
url_hash = hashlib.md5(local_path.encode()).hexdigest()
cache_file = os.path.join(CDN_CACHE_DIR, url_hash)
if os.path.exists(cache_file):
logger.log_debug(f"CDN resource already cached: {local_path}")
return True
logger.log_info(f"Fetching {cdn_url}...")
response = session.get(cdn_url, timeout=30)
response.raise_for_status()
with open(cache_file, 'wb') as f:
f.write(response.content)
logger.log_info(f"Cached CDN resource: {local_path} ({len(response.content)} bytes)")
return True
except Exception as e:
logger.log_error(f"Failed to fetch CDN resource {cdn_url}: {e}")
return False
# Parallel fetch with threads
threads = []
for local_path, cdn_url in CDN_RESOURCES.items():
t = threading.Thread(target=fetch_single_resource, args=(local_path, cdn_url))
t.start()
threads.append(t)
for t in threads:
t.join()
def fetch_cdn_resource_on_demand(local_path: str) -> Optional[bytes]:
""" On demand fetching of a CDN """
if local_path.startswith("/package/js/"):
relative_path = local_path[12:]
if any(x in relative_path for x in ["a11y/", "input/", "output/", "ui/", "sre"]):
cdn_url = f"{CDN_BASES['mathjax']}/{relative_path}"
else:
return None
try:
url_hash = hashlib.md5(local_path.encode()).hexdigest()
cache_file = os.path.join(CDN_CACHE_DIR, url_hash)
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
return f.read()
logger.log_info(f"Fetching on-demand: {cdn_url}")
response = session.get(cdn_url, timeout=10)
response.raise_for_status()
with open(cache_file, 'wb') as f:
f.write(response.content)
logger.log_info(f"Cached on-demand: {local_path}")
return response.content
except Exception as e:
logger.log_error(f"Failed to fetch on-demand resource {cdn_url}: {e}")
return None
return None
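# Static file loader with an LRU cache. Entries are keyed on the arguments only, so a file
# edited on disk keeps serving stale content until the process restarts or the cache is cleared.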
@lru_cache(maxsize=2048)
def load_file_cached(file_path: str, is_js: bool = False, optimize_img: bool = False) -> Tuple[bytes, str]:
# Handle image optimization
if optimize_img and should_optimize_image(file_path):
return optimize_image(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = "application/octet-stream"
with open(file_path, "rb") as f:
content = f.read()
# Minify JS files
if is_js or mime_type == "application/javascript" or file_path.endswith(".js"):
try:
content = jsmin(content.decode("utf-8")).encode("utf-8")
except Exception as err:
logger.log_error(f"Error minifying JS file {file_path}: {err}")
return content, mime_type
@lru_cache(maxsize=1024)
def compress_content(content: bytes) -> bytes:
"""LRU cached gzip compression"""
return gzip.compress(content, compresslevel=6)
def is_icon(file_path: str) -> bool:
"""Determine if file is an icon based on path or name"""
lower_path = file_path.lower()
return (
'icon' in lower_path or
'favicon' in lower_path or
file_path.endswith('.ico') or
'/icons/' in lower_path
)
def get_image_cache_path(file_path: str) -> str:
"""Generate cache path for optimized image"""
file_hash = hashlib.md5(file_path.encode()).hexdigest()
file_stat = os.stat(file_path)
# Include mtime in hash to invalidate cache when file changes
cache_key = f"{file_hash}_{int(file_stat.st_mtime)}"
return os.path.join(IMAGE_CACHE_DIR, cache_key + ".webp")
def optimize_image(file_path: str) -> Tuple[bytes, str]:
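    """Convert an image to WebP, resizing icons and oversized images, and cache the result on disk keyed by path and mtime."""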
try:
# Check cache first
cache_path = get_image_cache_path(file_path)
if os.path.exists(cache_path):
with open(cache_path, 'rb') as f:
return f.read(), "image/webp"
# Open and process image
with Image.open(file_path) as img:
# Preserve transparency by converting to RGBA if needed
if img.mode == 'P':
# Palette mode - convert to RGBA to preserve transparency
img = img.convert('RGBA')
elif img.mode == 'LA':
# Grayscale with alpha - convert to RGBA
img = img.convert('RGBA')
elif img.mode not in ('RGBA', 'RGB', 'L'):
# Other modes - try to preserve alpha if present
if 'transparency' in img.info:
img = img.convert('RGBA')
else:
img = img.convert('RGB')
# If already RGBA or RGB, keep as is
# Determine if it's an icon and resize accordingly
if is_icon(file_path):
max_size = ICON_MAX_SIZE
quality = ICON_QUALITY
else:
max_size = STANDARD_IMAGE_MAX_SIZE
quality = WEBP_QUALITY
# Resize if image is larger than max size
if img.size[0] > max_size[0] or img.size[1] > max_size[1]:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
logger.log_debug(f"Resized image {file_path} to {img.size}")
            # Encode as WebP; lossy WebP still carries the alpha channel, so transparency survives
            output = BytesIO()
            if img.mode == 'RGBA':
                # Explicit lossless=False keeps files small while preserving transparency
                img.save(output, format='WEBP', quality=quality, method=6, lossless=False)
            else:
                img.save(output, format='WEBP', quality=quality, method=6)
optimized_content = output.getvalue()
# Cache the optimized image
with open(cache_path, 'wb') as f:
f.write(optimized_content)
original_size = os.path.getsize(file_path)
optimized_size = len(optimized_content)
savings = ((original_size - optimized_size) / original_size) * 100
logger.log_info(f"Optimized {file_path}: {original_size} to {optimized_size} bytes ({savings:.1f}% reduction)")
return optimized_content, "image/webp"
except Exception as e:
logger.log_error(f"Error compressing image {file_path}: {e}")
# Fall back to original file
with open(file_path, 'rb') as f:
content = f.read()
mime_type, _ = mimetypes.guess_type(file_path)
return content, mime_type or "application/octet-stream"
def prewarm_image_cache():
    """Walk the project tree once and pre-generate the optimized WebP copies."""
    for root, _, files in os.walk(PROJECT_ROOT):
        # Skip the cache directory so already-optimized output is not processed again
        if root.startswith(CACHE_DIR):
            continue
        for f in files:
            if should_optimize_image(f):
                optimize_image(os.path.join(root, f))
def should_optimize_image(file_path: str) -> bool:
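    """Return True for raster image extensions that should be converted to WebP."""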
ext = os.path.splitext(file_path)[1].lower()
    # SVG is vector and already compact, so it is served unmodified
return ext in IMAGE_EXTENSIONS and ext != '.svg'
def should_cache_file(file_path: str) -> bool:
ext = os.path.splitext(file_path)[1].lower()
return ext in CACHEABLE_EXTENSIONS
def get_html_files(directory=HTML_DIR):
html_files = []
for entry in os.listdir(directory):
full_path = os.path.join(directory, entry)
if os.path.isfile(full_path) and entry.endswith(".html"):
html_files.append(entry)
return html_files
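# In-memory cache for the rendered index page, rebuilt at most once per INDEX_CACHE_TTL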
_index_cache = {"content": None, "timestamp": 0}
INDEX_CACHE_TTL = 300  # seconds (5 minutes)
def build_index_page(force_refresh: bool = False) -> str:
"""Build index page with caching"""
global _index_cache
current_time = time.time()
if not force_refresh and _index_cache["content"] and (current_time - _index_cache["timestamp"]) < INDEX_CACHE_TTL:
return _index_cache["content"]
with open(BASE_FILE, "r", encoding="utf-8") as f:
base_html = f.read()
articles = []
for md_path in Path(MARKDOWN_DIR).rglob("*.md"):
try:
summary_data = extract_summary(md_path)
if summary_data:
html_name, summary = summary_data
else:
html_name = md_path.stem + ".html"
summary = "No Summary for this Article!"
text = md_path.read_text(encoding="utf-8")
title = md_path.stem
for line in text.splitlines():
if line.startswith("# "):
title = line[2:].strip()
break
article_html = f"""
<article>
<h3><a href="/html/{html_name}">{title}</a></h3>
<p>{summary}</p>
</article>
"""
articles.append(article_html)
except Exception as e:
logger.log_warning(f"Exception with summary: {e} at {md_path}")
continue
full_content = "\n".join(articles) + "</main>" + index_footer()
content = base_html.replace("<!-- CONTENT -->", full_content)
# Update cache
_index_cache["content"] = content
_index_cache["timestamp"] = current_time
return content
import base64
import random
from hashes.hashes import hash_list
# Generate hashes only once at server start
H1 = random.choice(hash_list)
H2_CANDIDATES = [h for h in hash_list if h != H1]
H2 = random.choice(H2_CANDIDATES) if H2_CANDIDATES else H1
def index_footer() -> str:
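    """Render the footer HTML: server time, the two base64-encoded hashes, and the repository/Tor links."""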
tor_link = "http://7uhuxits7qfmiagkmpazxvh3rtk6aijs6pbawge3fl77y4xqjixlhkqd.onion/"
return f"""
<div class="footer">
<footer>
<p>
<!-- Server Time -->
<img src="../css/icons/date.webp" width="16" height="16" alt="date" loading="lazy" style="vertical-align: middle;" />
Server-Time (CEST; GMT+2): <i>{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}</i><br />
<!-- Hashes -->
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
Hash 1 (<b>UTF-8</b>)<i>:{base64.b64encode(H1.encode("utf-8")).decode("utf-8")}</i><br />
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
Hash 2 (<b>Windows-1252</b>)<i>:{base64.b64encode(H2.encode("windows-1252")).decode("windows-1252")}</i><br />
<!-- Git Repository Link -->
<img src="../css/icons/written.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="https://rattatwinko.servecounterstrike.com/gitea/rattatwinko/PyPost">View Git-Repository</a><br />
<img src="../css/icons/script.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="{tor_link}">View Tor Site</a>
</p>
</footer>
</div>
"""
class WebServerHTTPRequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
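        """Suppress BaseHTTPRequestHandler's default per-request logging; the custom Logger is used instead."""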
pass
def _supports_gzip(self) -> bool:
accept_encoding = self.headers.get('Accept-Encoding', '')
return 'gzip' in accept_encoding.lower()
def _send_compressed_response(self, content: bytes, mime_type: str, cache_control: str = None):
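        """Send a 200 response, gzip-compressing the body when the client accepts gzip, the MIME type is compressible and the payload is at least MIN_COMPRESS_SIZE."""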
should_compress = (
self._supports_gzip() and
mime_type in COMPRESS_MIME_TYPES and
len(content) >= MIN_COMPRESS_SIZE
)
if should_compress:
# Use cached compression
compressed = compress_content(content)
self.send_response(200)
self.send_header("Content-type", mime_type)
self.send_header("Content-Encoding", "gzip")
self.send_header("Content-Length", len(compressed))
if cache_control:
self.send_header("Cache-Control", cache_control)
self.end_headers()
self.wfile.write(compressed)
else:
self.send_response(200)
self.send_header("Content-type", mime_type)
self.send_header("Content-Length", len(content))
if cache_control:
self.send_header("Cache-Control", cache_control)
self.end_headers()
self.wfile.write(content)
def _parse_post_data(self):
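        """Parse the POST body according to Content-Type (JSON or form-encoded); anything else is returned as {"raw": <bytes>}."""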
content_length = int(self.headers.get('Content-Length', 0))
if content_length == 0:
return {}
post_data = self.rfile.read(content_length)
content_type = self.headers.get('Content-Type', '')
try:
if 'application/json' in content_type:
return json.loads(post_data.decode('utf-8'))
elif 'application/x-www-form-urlencoded' in content_type:
from urllib.parse import parse_qs
parsed = parse_qs(post_data.decode('utf-8'))
return {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}
else:
return {"raw": post_data}
except Exception as e:
logger.log_error(f"Error parsing POST data: {e}")
return {"raw": post_data}
def do_POST(self):
"""Handle POST for Plugins"""
req_path = self.path.lstrip("/")
post_data = self._parse_post_data()
request_data = {
"path": self.path,
"headers": dict(self.headers),
"data": post_data,
"method": "POST"
}
plugin_result = plugin_manager.handle_request("/" + req_path, request_data, method="POST")
if plugin_result is not None:
status, headers, body = plugin_result
self.send_response(status)
for key, value in headers.items():
self.send_header(key, value)
self.end_headers()
if isinstance(body, str):
self.wfile.write(body.encode("utf-8"))
elif isinstance(body, bytes):
self.wfile.write(body)
else:
self.wfile.write(str(body).encode("utf-8"))
return
self.send_response(404)
self.send_header("Content-type", "application/json")
self.end_headers()
error_response = json.dumps({"error": "Route not found"})
self.wfile.write(error_response.encode("utf-8"))
def do_GET(self):
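        """Serve GET requests: index page, cached CDN packages, plugin routes, markdown and Lua downloads, then other static files."""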
req_path = self.path.lstrip("/")
# Handle root/index with caching
if req_path == "" or req_path == "index.html":
content = build_index_page()
self._send_compressed_response(
content.encode("utf-8"),
"text/html",
"public, max-age=300" # Cache for 5 minutes
)
return
# Handle CDN package requests
if req_path.startswith("package/"):
cdn_path = "/" + req_path
if cdn_path in CDN_RESOURCES:
url_hash = hashlib.md5(cdn_path.encode()).hexdigest()
cache_file = os.path.join(CDN_CACHE_DIR, url_hash)
if os.path.exists(cache_file):
with open(cache_file, 'rb') as f:
cached_content = f.read()
else:
cached_content = None
else:
cached_content = fetch_cdn_resource_on_demand(cdn_path)
if cached_content:
if cdn_path.endswith('.css'):
mime_type = "text/css"
elif cdn_path.endswith('.js'):
mime_type = "application/javascript"
elif cdn_path.endswith('.wasm'):
mime_type = "application/wasm"
elif cdn_path.endswith('.json'):
mime_type = "application/json"
else:
mime_type = "application/octet-stream"
self.send_response(200)
self.send_header("Content-type", mime_type)
self.send_header("Cache-Control", "public, max-age=86400")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(cached_content)
return
else:
logger.log_warning(f"CDN resource not found: {cdn_path}")
self.send_response(404)
self.end_headers()
self.wfile.write(b"404 - CDN resource not available")
return
# check the plugins routes
plugin_result = plugin_manager.handle_request("/" + req_path, {"path": self.path})
if plugin_result is not None:
status, headers, body = plugin_result
self.send_response(status)
for key, value in headers.items():
self.send_header(key, value)
self.end_headers()
self.wfile.write(body.encode("utf-8") if isinstance(body, str) else body)
return
# Handle markdown file downloads
if req_path.startswith("markdown/"):
markdown_filename = req_path[9:]
if not markdown_filename.endswith(".md") or ".." in markdown_filename or "/" in markdown_filename:
self.send_response(403)
self.end_headers()
self.wfile.write(b"403 - Forbidden: Only .md files allowed")
return
markdown_file_path = os.path.join(MARKDOWN_DIR, markdown_filename)
if not os.path.exists(markdown_file_path) or not os.path.isfile(markdown_file_path):
self.send_response(404)
self.end_headers()
self.wfile.write(b"404 - Markdown file not found")
return
resolved_path = os.path.realpath(markdown_file_path)
resolved_markdown_dir = os.path.realpath(MARKDOWN_DIR)
            if not resolved_path.startswith(resolved_markdown_dir + os.sep):
self.send_response(403)
self.end_headers()
self.wfile.write(b"403 - Forbidden")
return
try:
with open(markdown_file_path, "rb") as f:
content = f.read()
self._send_compressed_response(content, "text/markdown")
logger.log_info(f"Served markdown file: {markdown_filename}")
return
except Exception as err:
logger.log_error(f"Error serving markdown file {markdown_filename}: {err}")
self.send_response(500)
self.end_headers()
self.wfile.write(b"500 - Internal Server Error")
return
# Handle Lua file downloads
if req_path.startswith("lua/"):
lua_filename = req_path[4:]
if not lua_filename.endswith(".lua") or ".." in lua_filename or "/" in lua_filename:
self.send_response(403)
self.end_headers()
self.wfile.write(b"403 - Forbidden: Only .lua files allowed")
return
lua_file_path = os.path.join(LUA_DIR, lua_filename)
if not os.path.exists(lua_file_path) or not os.path.isfile(lua_file_path):
self.send_response(404)
self.end_headers()
self.wfile.write(b"404 - Lua file not found")
return
resolved_path = os.path.realpath(lua_file_path)
resolved_lua_dir = os.path.realpath(LUA_DIR)
            if not resolved_path.startswith(resolved_lua_dir + os.sep):
self.send_response(403)
self.end_headers()
self.wfile.write(b"403 - Forbidden")
return
try:
with open(lua_file_path, "rb") as f:
content = f.read()
self._send_compressed_response(content, "text/x-lua")
logger.log_info(f"Served Lua file: {lua_filename}")
return
except Exception as err:
logger.log_error(f"Error serving Lua file {lua_filename}: {err}")
self.send_response(500)
self.end_headers()
self.wfile.write(b"500 - Internal Server Error")
return
# Handle other files with LRU caching for static assets
file_path = os.path.normpath(os.path.join(PROJECT_ROOT, req_path))
        if not file_path.startswith(PROJECT_ROOT + os.sep):
self.send_response(403)
self.end_headers()
self.wfile.write(b"403 - Forbidden")
return
if os.path.isfile(file_path):
try:
if should_cache_file(file_path):
is_js = file_path.endswith('.js')
optimize_img = should_optimize_image(file_path)
content, mime_type = load_file_cached(file_path, is_js, optimize_img)
self._send_compressed_response(
content,
mime_type,
"public, max-age=3600"
)
else:
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = "application/octet-stream"
with open(file_path, "rb") as f:
content = f.read()
if mime_type == "application/javascript" or file_path.endswith(".js"):
try:
content = jsmin(content.decode("utf-8")).encode("utf-8")
except Exception as err:
logger.log_error(f"Error minifying JS file {file_path}: {err}")
self._send_compressed_response(content, mime_type)
return
except Exception as err:
logger.log_error(f"Error serving file {file_path}: {err}")
self.send_response(500)
self.end_headers()
self.wfile.write(b"500 - Internal Server Error")
return
self.send_response(404)
self.end_headers()
self.wfile.write(b"404 - Not Found")
def run_pypost():
"""Run PyPost.py in a separate process."""
script = os.path.join(PROJECT_ROOT, "PyPost.py")
subprocess.run([sys.executable, script])
if __name__ == "__main__":
try:
logger.log_info("Initializing cache directories")
ensure_cache_dirs()
logger.log_info("Initializing CDN resource cache")
fetch_cdn_resources()
logger.log_info("CDN resources ready!")
threading.Thread(target=run_pypost, daemon=True).start()
logger.log_debug("Started PyPost.py in background watcher thread.")
server_address = ("localhost", 8000)
httpd: HTTPServer = HTTPServer(server_address, WebServerHTTPRequestHandler)
logger.log_info(f"Serving on http://{server_address[0]}:{server_address[1]}")
logger.log_info(f"Icon max size: {ICON_MAX_SIZE} \n Image max size: {STANDARD_IMAGE_MAX_SIZE}")
        # Pre-warm the image cache in the background so the first image requests hit optimized copies
        threading.Thread(target=prewarm_image_cache, daemon=True).start()
        httpd.serve_forever()
    except KeyboardInterrupt:
        logger.log_info("Shutting down server.\n Reason: KeyboardInterrupt")
        httpd.server_close()
    except Exception as e:
        logger.log_error(f"Shutting down server.\n Reason: {e}")
        httpd.server_close()