339 lines
14 KiB
Python
339 lines
14 KiB
Python
import os
|
|
import sys
|
|
import threading
|
|
import subprocess
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import mimetypes
|
|
import json
|
|
from jsmin import jsmin # pip install jsmin
|
|
from pathlib import Path
|
|
|
|
from log.Logger import *
|
|
from lua import plugin_manager
|
|
from PyPost import extract_summary
|
|
|
|
logger = Logger()
|
|
plugin_manager = plugin_manager.PluginManager()
|
|
plugin_manager.load_all() # load all plugins
|
|
|
|
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
|
|
HTML_DIR = os.path.join(PROJECT_ROOT, "html")
|
|
MARKDOWN_DIR = os.path.join(PROJECT_ROOT, "markdown")
|
|
BASE_FILE = os.path.join(HTML_DIR, "base", "index.html")
|
|
LUA_DIR = Path(PROJECT_ROOT) / "lua" / "plugins"
|
|
|
|
def get_html_files(directory=HTML_DIR):
|
|
html_files = []
|
|
for entry in os.listdir(directory):
|
|
full_path = os.path.join(directory, entry)
|
|
if os.path.isfile(full_path) and entry.endswith(".html"):
|
|
html_files.append(entry)
|
|
return html_files
|
|
|
|
|
|
def build_index_page() -> str:
|
|
with open(BASE_FILE, "r", encoding="utf-8") as f:
|
|
base_html = f.read()
|
|
|
|
articles = []
|
|
for md_path in Path(MARKDOWN_DIR).rglob("*.md"):
|
|
try:
|
|
summary_data = extract_summary(md_path)
|
|
if summary_data:
|
|
html_name, summary = summary_data
|
|
else:
|
|
html_name = md_path.stem + ".html"
|
|
summary = "No Summary for this Article!"
|
|
|
|
text = md_path.read_text(encoding="utf-8")
|
|
title = md_path.stem
|
|
for line in text.splitlines():
|
|
if line.startswith("# "):
|
|
title = line[2:].strip()
|
|
break
|
|
|
|
article_html = f"""
|
|
<article>
|
|
<h3><a href="/html/{html_name}">{title}</a></h3>
|
|
<p>{summary}</p>
|
|
</article>
|
|
"""
|
|
articles.append(article_html)
|
|
|
|
except Exception as e:
|
|
logger.log_warning(f"Exception with summary: {e} at {md_path}")
|
|
continue
|
|
|
|
full_content = "\n".join(articles) + "</main>" + index_footer()
|
|
return base_html.replace("<!-- CONTENT -->", full_content)
|
|
|
|
|
|
|
|
import base64
|
|
import random
|
|
|
|
from hashes.hashes import hash_list
|
|
|
|
# Generate hashes only once at server start
|
|
H1 = random.choice(hash_list)
|
|
H2_CANDIDATES = [h for h in hash_list if h != H1]
|
|
H2 = random.choice(H2_CANDIDATES) if H2_CANDIDATES else H1
|
|
|
|
# cahcing was a bad, idea, servertime got stuck. it is now a variable ;)
|
|
def index_footer() -> str:
|
|
tor_link = "http://7uhuxits7qfmiagkmpazxvh3rtk6aijs6pbawge3fl77y4xqjixlhkqd.onion/"
|
|
return f"""
|
|
<!-- Footer styling doesnt need to work with
|
|
flex, or anything else, because pagnation.
|
|
-->
|
|
<div class="footer">
|
|
<footer>
|
|
<p>
|
|
<!-- Server Time -->
|
|
<img src="../css/icons/date.webp" width="16" height="16" alt="date" loading="lazy" style="vertical-align: middle;" />
|
|
Server-Time (CET ; GMT+2): <i>{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}</i><br />
|
|
<!-- Hashes -->
|
|
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
|
|
Hash 1 (<b>UTF-8</b>)<i>:{base64.b64encode(H1.encode("utf-8")).decode("utf-8")}</i><br />
|
|
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
|
|
Hash 2 (<b>Windows-1252</b>)<i>:{base64.b64encode(H2.encode("windows-1252")).decode("windows-1252")}</i><br />
|
|
<!-- Git Repository Link -->
|
|
<img src="../css/icons/written.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
|
|
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="https://rattatwinko.servecounterstrike.com/gitea/rattatwinko/PyPost">View Git-Repository</a><br />
|
|
<img src="../css/icons/script.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
|
|
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="{tor_link}">View Tor Site</a>
|
|
</p>
|
|
</footer>
|
|
</div>
|
|
"""
|
|
|
|
class WebServerHTTPRequestHandler(BaseHTTPRequestHandler):
|
|
# This is a Helper Function for the POST Endpoints
|
|
def _parse_post_data(self):
|
|
"""Parse POST request body"""
|
|
import json
|
|
content_length = int(self.headers.get('Content-Length', 0))
|
|
if content_length == 0:
|
|
return {}
|
|
|
|
post_data = self.rfile.read(content_length)
|
|
content_type = self.headers.get('Content-Type', '')
|
|
|
|
try:
|
|
if 'application/json' in content_type:
|
|
return json.loads(post_data.decode('utf-8'))
|
|
elif 'application/x-www-form-urlencoded' in content_type:
|
|
from urllib.parse import parse_qs
|
|
parsed = parse_qs(post_data.decode('utf-8'))
|
|
return {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}
|
|
else:
|
|
return {"raw": post_data}
|
|
except Exception as e:
|
|
logger.log_error(f"Error parsing POST data: {e}")
|
|
return {"raw": post_data}
|
|
|
|
def do_POST(self):
|
|
"""Handle POST requests - primarily for plugin routes"""
|
|
req_path = self.path.lstrip("/")
|
|
|
|
# Parse POST data
|
|
post_data = self._parse_post_data()
|
|
|
|
# Add additional request info
|
|
request_data = {
|
|
"path": self.path,
|
|
"headers": dict(self.headers),
|
|
"data": post_data,
|
|
"method": "POST"
|
|
}
|
|
|
|
# Check plugin routes
|
|
plugin_result = plugin_manager.handle_request("/" + req_path, request_data, method="POST")
|
|
if plugin_result is not None:
|
|
status, headers, body = plugin_result
|
|
self.send_response(status)
|
|
for key, value in headers.items():
|
|
self.send_header(key, value)
|
|
self.end_headers()
|
|
|
|
if isinstance(body, str):
|
|
self.wfile.write(body.encode("utf-8"))
|
|
elif isinstance(body, bytes):
|
|
self.wfile.write(body)
|
|
else:
|
|
self.wfile.write(str(body).encode("utf-8"))
|
|
return
|
|
|
|
# No plugin handled this POST request
|
|
self.send_response(404)
|
|
self.send_header("Content-type", "application/json")
|
|
self.end_headers()
|
|
error_response = json.dumps({"error": "Route not found"})
|
|
self.wfile.write(error_response.encode("utf-8"))
|
|
|
|
def do_GET(self):
|
|
req_path = self.path.lstrip("/") # normalize leading /
|
|
|
|
# Handle root/index
|
|
if req_path == "" or req_path == "index.html":
|
|
content = build_index_page()
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "text/html")
|
|
self.end_headers()
|
|
self.wfile.write(content.encode("utf-8"))
|
|
return
|
|
|
|
# CHECK PLUGIN ROUTES FIRST
|
|
plugin_result = plugin_manager.handle_request("/" + req_path, {"path": self.path})
|
|
if plugin_result is not None:
|
|
status, headers, body = plugin_result
|
|
self.send_response(status)
|
|
for key, value in headers.items():
|
|
self.send_header(key, value)
|
|
self.end_headers()
|
|
self.wfile.write(body.encode("utf-8") if isinstance(body, str) else body)
|
|
return
|
|
|
|
# Handle markdown file downloads
|
|
if req_path.startswith("markdown/"):
|
|
markdown_filename = req_path[9:] # Remove "markdown/" prefix
|
|
|
|
# Security check
|
|
if not markdown_filename.endswith(".md") or ".." in markdown_filename or "/" in markdown_filename:
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"403 - Forbidden: Only .md files allowed")
|
|
return
|
|
|
|
markdown_file_path = os.path.join(MARKDOWN_DIR, markdown_filename)
|
|
|
|
if not os.path.exists(markdown_file_path) or not os.path.isfile(markdown_file_path):
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
self.wfile.write(b"404 - Markdown file not found")
|
|
return
|
|
|
|
resolved_path = os.path.realpath(markdown_file_path)
|
|
resolved_markdown_dir = os.path.realpath(MARKDOWN_DIR)
|
|
if not resolved_path.startswith(resolved_markdown_dir):
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"403 - Forbidden")
|
|
return
|
|
|
|
try:
|
|
with open(markdown_file_path, "rb") as f:
|
|
content = f.read()
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "text/markdown")
|
|
self.send_header("Content-Disposition", f'attachment; filename="{markdown_filename}"')
|
|
self.end_headers()
|
|
self.wfile.write(content)
|
|
logger.log_info(f"Served markdown file: {markdown_filename}")
|
|
return
|
|
|
|
except Exception as err:
|
|
logger.log_error(f"Error serving markdown file {markdown_filename}: {err}")
|
|
self.send_response(500)
|
|
self.end_headers()
|
|
self.wfile.write(b"500 - Internal Server Error")
|
|
return
|
|
|
|
# Handle Lua file downloads
|
|
if req_path.startswith("lua/"):
|
|
lua_filename = req_path[4:] # Remove "lua/" prefix
|
|
|
|
# Security check
|
|
if not lua_filename.endswith(".lua") or ".." in lua_filename or "/" in lua_filename:
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"403 - Forbidden: Only .lua files allowed")
|
|
return
|
|
|
|
lua_file_path = os.path.join(LUA_DIR, lua_filename)
|
|
|
|
if not os.path.exists(lua_file_path) or not os.path.isfile(lua_file_path):
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
self.wfile.write(b"404 - Lua file not found")
|
|
return
|
|
|
|
resolved_path = os.path.realpath(lua_file_path)
|
|
resolved_lua_dir = os.path.realpath(LUA_DIR)
|
|
if not resolved_path.startswith(resolved_lua_dir):
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"403 - Forbidden")
|
|
return
|
|
|
|
try:
|
|
with open(lua_file_path, "rb") as f:
|
|
content = f.read()
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "text/x-lua")
|
|
self.send_header("Content-Disposition", f'attachment; filename="{lua_filename}"')
|
|
self.end_headers()
|
|
self.wfile.write(content)
|
|
logger.log_info(f"Served Lua file: {lua_filename}")
|
|
return
|
|
|
|
except Exception as err:
|
|
logger.log_error(f"Error serving Lua file {lua_filename}: {err}")
|
|
self.send_response(500)
|
|
self.end_headers()
|
|
self.wfile.write(b"500 - Internal Server Error")
|
|
return
|
|
|
|
# Handle other files (existing functionality)
|
|
file_path = os.path.normpath(os.path.join(PROJECT_ROOT, req_path))
|
|
if not file_path.startswith(PROJECT_ROOT):
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"403 - Forbidden")
|
|
return
|
|
|
|
if os.path.isfile(file_path):
|
|
mime_type, _ = mimetypes.guess_type(file_path)
|
|
if mime_type is None:
|
|
mime_type = "application/octet-stream"
|
|
|
|
with open(file_path, "rb") as f:
|
|
content = f.read()
|
|
|
|
# Obfuscate JS on the fly
|
|
if mime_type == "application/javascript" or file_path.endswith(".js"):
|
|
try:
|
|
content = jsmin(content.decode("utf-8")).encode("utf-8")
|
|
except Exception as err:
|
|
logger.log_error(f"Error minifying JS file {file_path}: {err}")
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-type", mime_type)
|
|
self.end_headers()
|
|
self.wfile.write(content)
|
|
return
|
|
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
self.wfile.write(b"404 - Not Found")
|
|
|
|
|
|
def run_pypost():
|
|
"""Run PyPost.py in a separate process."""
|
|
script = os.path.join(PROJECT_ROOT, "PyPost.py")
|
|
subprocess.run([sys.executable, script])
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
threading.Thread(target=run_pypost, daemon=True).start()
|
|
logger.log_debug("Started PyPost.py in background watcher thread.")
|
|
|
|
server_address = ("localhost", 8000)
|
|
httpd: HTTPServer = HTTPServer(server_address, WebServerHTTPRequestHandler) # type: ignore[arg-type]
|
|
logger.log_info(f"Serving on http://{server_address[0]}:{server_address[1]}")
|
|
httpd.serve_forever()
|
|
except (Exception, KeyboardInterrupt) as e:
|
|
logger.log_info(f"Shutting down server.\n Reason: {e}")
|
|
httpd.server_close() |