# Source: PyPost/webserver.py (repository file view: 339 lines, 14 KiB, Python)

import os
import sys
import threading
import subprocess
from http.server import BaseHTTPRequestHandler, HTTPServer
import mimetypes
import json
from jsmin import jsmin # pip install jsmin
from pathlib import Path
from log.Logger import *
from lua import plugin_manager
from PyPost import extract_summary
# Module-wide logger instance used by the page builder and request handler below.
logger = Logger()
# NOTE: this rebinds the name `plugin_manager` from the imported module to a
# PluginManager instance; after this line the module is no longer reachable
# under that name. Plugins are loaded at import time, before the server starts.
plugin_manager = plugin_manager.PluginManager()
plugin_manager.load_all() # load all plugins
# Filesystem layout, derived from the directory that contains this file.
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
HTML_DIR = os.path.join(PROJECT_ROOT, "html")
MARKDOWN_DIR = os.path.join(PROJECT_ROOT, "markdown")
# Template for the index page; it must contain the "<!-- CONTENT -->" marker
# that build_index_page() replaces.
BASE_FILE = os.path.join(HTML_DIR, "base", "index.html")
# Unlike the str constants above, this one is a pathlib.Path.
LUA_DIR = Path(PROJECT_ROOT) / "lua" / "plugins"
def get_html_files(directory=HTML_DIR):
    """Return the names of the ``.html`` files located directly in *directory*.

    Only regular files are reported (sub-directories are not descended into),
    and the names are returned in the order ``os.listdir`` yields them.
    """
    return [
        name
        for name in os.listdir(directory)
        if name.endswith(".html") and os.path.isfile(os.path.join(directory, name))
    ]
def build_index_page():
    """Build the HTML of the index page.

    Every ``*.md`` file below ``MARKDOWN_DIR`` contributes one ``<article>``
    teaser consisting of a link, a title and a summary. The teasers, a closing
    ``</main>`` tag and the footer replace the ``<!-- CONTENT -->`` marker of
    the base template.

    Returns:
        The complete index page as a string.
    """
    base_html = Path(BASE_FILE).read_text(encoding="utf-8")

    article_blocks = []
    for md_path in Path(MARKDOWN_DIR).rglob("*.md"):
        try:
            summary_data = extract_summary(md_path)
            if not summary_data:
                # No summary available: fall back to the file stem.
                html_name = md_path.stem + ".html"
                summary = "No Summary for this Article!"
            else:
                html_name, summary = summary_data

            # The first "# " heading is the title; otherwise use the file stem.
            heading_lines = (
                line
                for line in md_path.read_text(encoding="utf-8").splitlines()
                if line.startswith("# ")
            )
            first_heading = next(heading_lines, None)
            title = md_path.stem if first_heading is None else first_heading[2:].strip()

            # NOTE(review): title and summary are inserted without HTML
            # escaping - confirm that markdown sources are always trusted.
            article_blocks.append(f"""
<article>
<h3><a href="/html/{html_name}">{title}</a></h3>
<p>{summary}</p>
</article>
""")
        except Exception as e:
            # Best effort: one broken article must not take down the index.
            logger.log_warning(f"Exception with summary: {e} at {md_path}")

    page_body = "\n".join(article_blocks) + "</main>" + index_footer()
    return base_html.replace("<!-- CONTENT -->", page_body)
import base64
import random
from hashes.hashes import hash_list
# Generate hashes only once at server start
# H1 is a random entry of hash_list. H2 is drawn from the entries that differ
# from H1 and falls back to H1 itself when no other entry exists.
H1 = random.choice(hash_list)
H2_CANDIDATES = [h for h in hash_list if h != H1]
H2 = random.choice(H2_CANDIDATES) if H2_CANDIDATES else H1
# Caching the footer was a bad idea (the server time got stuck), so it is now generated on every call. ;)
def index_footer():
    """Return the HTML footer appended to the index page.

    The footer is rebuilt on every call so that the displayed server time is
    current. It shows the local server time, the two start-up hashes ``H1``
    and ``H2`` (base64-encoded from their UTF-8 and Windows-1252 byte forms
    respectively), and links to the Git repository and the Tor mirror.

    Returns:
        The footer markup as a string.
    """
    # Imported locally: the module never imports `time` explicitly, so the
    # name was only available if a star-import happened to leak it.
    import time

    tor_link = "http://7uhuxits7qfmiagkmpazxvh3rtk6aijs6pbawge3fl77y4xqjixlhkqd.onion/"
    # NOTE(review): the label below hard-codes "CET ; GMT+2" while the value
    # is whatever time.localtime() reports on the host - confirm the zone.
    server_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    hash_one = base64.b64encode(H1.encode("utf-8")).decode("utf-8")
    hash_two = base64.b64encode(H2.encode("windows-1252")).decode("windows-1252")
    # The alt texts of the icons now describe the entry they belong to;
    # previously every icon after the first was labelled "Hash2".
    return f"""
<!-- Footer styling doesnt need to work with
flex, or anything else, because pagnation.
-->
<div class="footer">
<footer>
<p>
<!-- Server Time -->
<img src="../css/icons/date.webp" width="16" height="16" alt="date" loading="lazy" style="vertical-align: middle;" />
Server-Time (CET ; GMT+2): <i>{server_time}</i><br />
<!-- Hashes -->
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash1" loading="lazy" style="display:inline; vertical-align:middle;" />
Hash 1 (<b>UTF-8</b>)<i>:{hash_one}</i><br />
<img src="../css/icons/magnifier.webp" width="16" height="16" alt="Hash2" loading="lazy" style="display:inline; vertical-align:middle;" />
Hash 2 (<b>Windows-1252</b>)<i>:{hash_two}</i><br />
<!-- Git Repository Link -->
<img src="../css/icons/written.webp" width="16" height="16" alt="Git repository" loading="lazy" style="display:inline; vertical-align:middle;" />
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="https://rattatwinko.servecounterstrike.com/gitea/rattatwinko/PyPost">View Git-Repository</a><br />
<img src="../css/icons/script.webp" width="16" height="16" alt="Tor site" loading="lazy" style="display:inline; vertical-align:middle;" />
<a style="text-decoration:none;color:#0066cc;font-style:italic;padding-top:5px;" href="{tor_link}">View Tor Site</a>
</p>
</footer>
</div>
"""
class MyHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the PyPost site.

    GET requests are resolved in this order: the generated index page,
    plugin routes, ``markdown/`` downloads, ``lua/`` downloads and finally
    static files below ``PROJECT_ROOT``. POST requests are only dispatched to
    plugin routes.
    """

    @staticmethod
    def _is_within(base_dir, candidate):
        """Return True when *candidate* is *base_dir* itself or lies below it.

        Both paths are normalised and compared on a path-component boundary.
        A plain ``startswith`` would also accept sibling paths such as
        ``/srv/app_evil`` for the base ``/srv/app``.
        """
        base = os.path.normpath(str(base_dir))
        target = os.path.normpath(str(candidate))
        return target == base or target.startswith(base.rstrip(os.sep) + os.sep)

    @staticmethod
    def _body_to_bytes(body):
        """Convert a plugin response body into bytes suitable for ``wfile``.

        ``str`` is encoded as UTF-8, bytes-like objects are passed through and
        anything else is stringified first.
        """
        if isinstance(body, str):
            return body.encode("utf-8")
        if isinstance(body, (bytes, bytearray, memoryview)):
            return body
        return str(body).encode("utf-8")

    def _send_plain(self, status, message):
        """Send a header-less response with *status* and the bytes *message*."""
        self.send_response(status)
        self.end_headers()
        self.wfile.write(message)

    def _send_plugin_response(self, plugin_result):
        """Write a ``(status, headers, body)`` triple returned by a plugin."""
        status, headers, body = plugin_result
        self.send_response(status)
        for key, value in headers.items():
            self.send_header(key, value)
        self.end_headers()
        self.wfile.write(self._body_to_bytes(body))

    # This is a Helper Function for the POST Endpoints
    def _parse_post_data(self):
        """Parse the POST request body.

        Returns:
            ``{}`` when there is no body or the ``Content-Length`` header is
            missing, malformed or not positive; the decoded object for JSON
            bodies; a dict for URL-encoded forms (single values unwrapped
            from their list); otherwise ``{"raw": <bytes>}``. A body that
            fails to decode is also returned as ``{"raw": <bytes>}``.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            content_length = 0
        # A negative length would make rfile.read() wait for EOF, i.e. block
        # until the client closes the connection.
        if content_length <= 0:
            return {}
        post_data = self.rfile.read(content_length)
        content_type = self.headers.get('Content-Type', '')
        try:
            if 'application/json' in content_type:
                return json.loads(post_data.decode('utf-8'))
            elif 'application/x-www-form-urlencoded' in content_type:
                from urllib.parse import parse_qs
                parsed = parse_qs(post_data.decode('utf-8'))
                return {k: v[0] if len(v) == 1 else v for k, v in parsed.items()}
            else:
                return {"raw": post_data}
        except Exception as e:
            # Deliberately broad: the body is untrusted and may fail in
            # several ways (bad UTF-8, invalid JSON, excessive nesting).
            logger.log_error(f"Error parsing POST data: {e}")
            return {"raw": post_data}

    def do_POST(self):
        """Handle POST requests - primarily for plugin routes"""
        req_path = self.path.lstrip("/")
        # Parse POST data
        post_data = self._parse_post_data()
        # Add additional request info
        request_data = {
            "path": self.path,
            "headers": dict(self.headers),
            "data": post_data,
            "method": "POST"
        }
        # Check plugin routes
        plugin_result = plugin_manager.handle_request("/" + req_path, request_data, method="POST")
        if plugin_result is not None:
            self._send_plugin_response(plugin_result)
            return
        # No plugin handled this POST request
        self.send_response(404)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        error_response = json.dumps({"error": "Route not found"})
        self.wfile.write(error_response.encode("utf-8"))

    def _serve_download(self, filename, base_dir, extension, content_type, label):
        """Serve *filename* from *base_dir* as an attachment.

        Args:
            filename: Requested name; must be a bare file name ending in
                *extension*, without ``..`` or ``/``.
            base_dir: Directory the file has to live in.
            extension: Required suffix, e.g. ``".md"``.
            content_type: Value of the ``Content-type`` response header.
            label: Human-readable kind used in log and error messages.
        """
        # Security check
        if not filename.endswith(extension) or ".." in filename or "/" in filename:
            self._send_plain(403, f"403 - Forbidden: Only {extension} files allowed".encode("utf-8"))
            return
        file_path = os.path.join(base_dir, filename)
        if not os.path.isfile(file_path):
            kind = label[:1].upper() + label[1:]
            self._send_plain(404, f"404 - {kind} file not found".encode("utf-8"))
            return
        # Resolve symlinks so a link inside base_dir cannot point outside it.
        if not self._is_within(os.path.realpath(base_dir), os.path.realpath(file_path)):
            self._send_plain(403, b"403 - Forbidden")
            return
        # Read before sending headers so a read failure yields a clean 500
        # instead of a second status line after a 200.
        try:
            with open(file_path, "rb") as f:
                content = f.read()
        except OSError as err:
            logger.log_error(f"Error serving {label} file {filename}: {err}")
            self._send_plain(500, b"500 - Internal Server Error")
            return
        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Disposition", f'attachment; filename="{filename}"')
        self.end_headers()
        self.wfile.write(content)
        logger.log_info(f"Served {label} file: {filename}")

    def do_GET(self):
        """Handle GET requests: index, plugin routes, downloads, static files."""
        req_path = self.path.lstrip("/")  # normalize leading /
        # Handle root/index
        if req_path == "" or req_path == "index.html":
            content = build_index_page()
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(content.encode("utf-8"))
            return
        # CHECK PLUGIN ROUTES FIRST
        plugin_result = plugin_manager.handle_request("/" + req_path, {"path": self.path})
        if plugin_result is not None:
            self._send_plugin_response(plugin_result)
            return
        # Handle markdown file downloads
        if req_path.startswith("markdown/"):
            self._serve_download(
                req_path[len("markdown/"):], MARKDOWN_DIR, ".md", "text/markdown", "markdown"
            )
            return
        # Handle Lua file downloads
        if req_path.startswith("lua/"):
            self._serve_download(
                req_path[len("lua/"):], LUA_DIR, ".lua", "text/x-lua", "Lua"
            )
            return
        # Handle other files (existing functionality)
        # NOTE(review): any regular file below PROJECT_ROOT is served here,
        # including source files - consider an allow-list of directories.
        file_path = os.path.normpath(os.path.join(PROJECT_ROOT, req_path))
        if not self._is_within(PROJECT_ROOT, file_path):
            self._send_plain(403, b"403 - Forbidden")
            return
        if os.path.isfile(file_path):
            mime_type, _ = mimetypes.guess_type(file_path)
            if mime_type is None:
                mime_type = "application/octet-stream"
            with open(file_path, "rb") as f:
                content = f.read()
            # Minify JS on the fly; on failure the file is served unmodified.
            if mime_type == "application/javascript" or file_path.endswith(".js"):
                try:
                    content = jsmin(content.decode("utf-8")).encode("utf-8")
                except Exception as err:
                    logger.log_error(f"Error minifying JS file {file_path}: {err}")
            self.send_response(200)
            self.send_header("Content-type", mime_type)
            self.end_headers()
            self.wfile.write(content)
            return
        self._send_plain(404, b"404 - Not Found")
def run_pypost():
    """Launch ``PyPost.py`` with the current interpreter and wait for it to exit."""
    pypost_script = os.path.join(PROJECT_ROOT, "PyPost.py")
    command = [sys.executable, pypost_script]
    subprocess.run(command)
if __name__ == "__main__":
    # Bound before the try block so the cleanup can tell whether the server
    # socket was ever created: HTTPServer() itself raises when, for example,
    # the port is already in use, and `httpd` would otherwise be undefined.
    httpd = None
    try:
        # PyPost.py runs in a daemon thread so it never blocks interpreter exit.
        threading.Thread(target=run_pypost, daemon=True).start()
        logger.log_debug("Started PyPost.py in background watcher thread.")
        server_address = ("localhost", 8000)
        httpd = HTTPServer(server_address, MyHandler)  # type: ignore[arg-type]
        logger.log_info(f"Serving on http://{server_address[0]}:{server_address[1]}")
        httpd.serve_forever()
    except (Exception, KeyboardInterrupt) as e:
        logger.log_info(f"Shutting down server.\n Reason: {e}")
    finally:
        # Release the listening socket only if it was actually opened.
        if httpd is not None:
            httpd.server_close()