From 8dd5fcbeb7ae1d2bfaa3884552239cb83b104c5f Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Sun, 17 Aug 2025 12:19:18 +0200 Subject: [PATCH] md for database as it was fucked up in windows script --- requirements.txt | 12 +- run.bash | 42 ++- run.bat | 4 +- src/app.py | 272 +++++++-------- src/problem_scanner.py | 769 +++++++++++++++++++++-------------------- 5 files changed, 556 insertions(+), 543 deletions(-) diff --git a/requirements.txt b/requirements.txt index 4d9b37d..26a9044 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ -Flask>=3.0 -Flask-SQLAlchemy>=3.1 -Markdown>=3.6 -MarkupSafe>=2.1 -watchdog>=4.0 \ No newline at end of file +Flask>=3.0 +Flask-SQLAlchemy>=3.1 +Markdown>=3.6 +MarkupSafe>=2.1 +watchdog>=4.0 +gunicorn>=23.0.0 +waitress>=3.0.2 \ No newline at end of file diff --git a/run.bash b/run.bash index 80f5336..d4ded69 100644 --- a/run.bash +++ b/run.bash @@ -1,17 +1,25 @@ -u!/bin/bash - -set -e # exit if any command fails - -# Ensure QPP/database directory exists -mkdir -p src/database - -python -m venv venv -source venv/bin/activate - -pip install --upgrade pip -pip install -r requirements.txt - -export FLASK_APP=src.app -export FLASK_ENV=production - -flask run --host=0.0.0.0 --port=5000 +#!/bin/bash + +set -e # exit if any command fails + +# Ensure src/database directory exists +mkdir -p src/database + +# Create virtual environment if it doesn't exist +if [ ! -d "venv" ]; then + python -m venv venv +fi +source venv/bin/activate + +# Upgrade pip and install dependencies +pip install --upgrade pip +pip install -r requirements.txt + +# Export environment variables +export FLASK_APP=src.app +export FLASK_ENV=production + +# Run with Gunicorn +echo "Starting Flask app with Gunicorn..." +exec gunicorn -w 4 -b 0.0.0.0:5000 src.app:app + diff --git a/run.bat b/run.bat index ba180fe..29376ac 100644 --- a/run.bat +++ b/run.bat @@ -1 +1,3 @@ -python -m flask --app .\src\app.py run --host=0.0.0.0 --port=5000 \ No newline at end of file +:: make db directory and then launch the server +md .\src\database +python -m waitress --listen=0.0.0.0:8000 src.app:app diff --git a/src/app.py b/src/app.py index 317f07e..02d9801 100644 --- a/src/app.py +++ b/src/app.py @@ -1,136 +1,136 @@ -from markupsafe import Markup -from flask import Flask, render_template, request, redirect, url_for, send_from_directory -import markdown as md -import ast -from src.models import db, Problem, Solution -from src.utils import run_code_against_tests -from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard - - -import os -## from problem_loader import load_problems_from_json, schedule_problem_reload -from src.problem_scanner import start_problem_scanner -import sqlite3 -from pathlib import Path - -app = Flask(__name__) - -BASE_DIR = Path(__file__).parent -app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}" - -print(f">>>>>>>>>>>>>>>>>>>>< Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}") - -db.init_app(app) - -@app.before_request -def setup(): - db.create_all() - create_leaderboard_table() # Ensure leaderboard table exists - # Problems are now loaded from manifests by the background scanner. No need to load problems.json. - -# Start the background thread to scan problems -start_problem_scanner() - -@app.route("/script.js") -def script(): - return send_from_directory("templates", "script.js") - -@app.route('/favicon.ico') -def favicon(): - return send_from_directory("templates", "favicon.ico") - -@app.route('/') -def index(): - db_path = Path(__file__).parent / 'database/problems.sqlite3' - conn = sqlite3.connect(db_path) - c = conn.cursor() - # - c.execute('SELECT folder, description, test_code, difficulty FROM problems') - problems = c.fetchall() - conn.close() - # Get leaderboard entries - leaderboard = get_leaderboard() - # Map folder to title for display - problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems} - return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles) - -@app.route('/problem/new', methods=['GET', 'POST']) -def new_problem(): - if request.method == 'POST': - title = request.form['title'] - description = request.form['description'] - test_code = request.form['test_code'] - problem = Problem(title=title, description=description, test_code=test_code) - db.session.add(problem) - db.session.commit() - return redirect(url_for('index')) - return render_template('new_problem.html') - -@app.route('/problem/', methods=['GET', 'POST']) -def view_problem(folder): - db_path = Path(__file__).parent / 'database/problems.sqlite3' - conn = sqlite3.connect(db_path) - c = conn.cursor() - c.execute('SELECT folder, description,test_code , difficulty FROM problems WHERE folder = ?', (folder,)) - row = c.fetchone() - conn.close() - - if not row: - return 'Problem not found', 404 - - problem = { - 'folder': row[0], - 'description': row[1], - 'difficulty': row[3], # now correct - 'test_code': row[2], # now correct - } - - result = None - if request.method == 'POST': - user_code = request.form['user_code'] - username = request.form.get('username', '').strip() or 'Anonymous' - import tracemalloc - tracemalloc.start() - run_result = run_code_against_tests(user_code, problem['test_code']) - current, peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - memory_used = peak // 1024 # in KB - - # Try to get the last line number executed (even for successful runs) - line_number = None - try: - tree = ast.parse(user_code) - # Find the highest line number in the AST (for multi-function/user code) - def get_max_lineno(node): - max_lineno = getattr(node, 'lineno', 0) - for child in ast.iter_child_nodes(node): - max_lineno = max(max_lineno, get_max_lineno(child)) - return max_lineno - line_number = get_max_lineno(tree) - except Exception: - pass - - # If there was an error, try to get the error line number from the traceback - if run_result['error']: - tb = run_result['error'] - import traceback - try: - tb_lines = traceback.extract_tb(traceback.TracebackException.from_string(tb).stack) - if tb_lines: - line_number = tb_lines[-1].lineno - except Exception: - pass - - # ONLY log to leaderboard if the solution passed all tests - if run_result['passed']: - log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number) - - result = run_result - return render_template('problem.html', problem=problem, result=result) - -@app.template_filter('markdown') -def markdown_filter(text): - return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists'])) - -if __name__ == '__main__': - app.run(debug=True) +from markupsafe import Markup +from flask import Flask, render_template, request, redirect, url_for, send_from_directory +import markdown as md +import ast +from src.models import db, Problem, Solution +from src.utils import run_code_against_tests +from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard + + +import os +## from problem_loader import load_problems_from_json, schedule_problem_reload +from src.problem_scanner import start_problem_scanner +import sqlite3 +from pathlib import Path + +app = Flask(__name__) + +BASE_DIR = Path(__file__).parent +app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}" + +print(f"[ INFO ] : Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}") + +db.init_app(app) + +@app.before_request +def setup(): + db.create_all() + create_leaderboard_table() # Ensure leaderboard table exists + # Problems are now loaded from manifests by the background scanner. No need to load problems.json. + +# Start the background thread to scan problems +start_problem_scanner() + +@app.route("/script.js") +def script(): + return send_from_directory("templates", "script.js") + +@app.route('/favicon.ico') +def favicon(): + return send_from_directory("templates", "favicon.ico") + +@app.route('/') +def index(): + db_path = Path(__file__).parent / 'database/problems.sqlite3' + conn = sqlite3.connect(db_path) + c = conn.cursor() + # + c.execute('SELECT folder, description, test_code, difficulty FROM problems') + problems = c.fetchall() + conn.close() + # Get leaderboard entries + leaderboard = get_leaderboard() + # Map folder to title for display + problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems} + return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles) + +@app.route('/problem/new', methods=['GET', 'POST']) +def new_problem(): + if request.method == 'POST': + title = request.form['title'] + description = request.form['description'] + test_code = request.form['test_code'] + problem = Problem(title=title, description=description, test_code=test_code) + db.session.add(problem) + db.session.commit() + return redirect(url_for('index')) + return render_template('new_problem.html') + +@app.route('/problem/', methods=['GET', 'POST']) +def view_problem(folder): + db_path = Path(__file__).parent / 'database/problems.sqlite3' + conn = sqlite3.connect(db_path) + c = conn.cursor() + c.execute('SELECT folder, description,test_code , difficulty FROM problems WHERE folder = ?', (folder,)) + row = c.fetchone() + conn.close() + + if not row: + return 'Problem not found', 404 + + problem = { + 'folder': row[0], + 'description': row[1], + 'difficulty': row[3], # now correct + 'test_code': row[2], # now correct + } + + result = None + if request.method == 'POST': + user_code = request.form['user_code'] + username = request.form.get('username', '').strip() or 'Anonymous' + import tracemalloc + tracemalloc.start() + run_result = run_code_against_tests(user_code, problem['test_code']) + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + memory_used = peak // 1024 # in KB + + # Try to get the last line number executed (even for successful runs) + line_number = None + try: + tree = ast.parse(user_code) + # Find the highest line number in the AST (for multi-function/user code) + def get_max_lineno(node): + max_lineno = getattr(node, 'lineno', 0) + for child in ast.iter_child_nodes(node): + max_lineno = max(max_lineno, get_max_lineno(child)) + return max_lineno + line_number = get_max_lineno(tree) + except Exception: + pass + + # If there was an error, try to get the error line number from the traceback + if run_result['error']: + tb = run_result['error'] + import traceback + try: + tb_lines = traceback.extract_tb(traceback.TracebackException.from_string(tb).stack) + if tb_lines: + line_number = tb_lines[-1].lineno + except Exception: + pass + + # ONLY log to leaderboard if the solution passed all tests + if run_result['passed']: + log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number) + + result = run_result + return render_template('problem.html', problem=problem, result=result) + +@app.template_filter('markdown') +def markdown_filter(text): + return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists'])) + +if __name__ == '__main__': + app.run(debug=True) diff --git a/src/problem_scanner.py b/src/problem_scanner.py index 47af8e0..6fb7b66 100644 --- a/src/problem_scanner.py +++ b/src/problem_scanner.py @@ -1,384 +1,385 @@ -import os -import time -import json -import sqlite3 -import threading -import random -import tempfile -import subprocess -import sys -import traceback -import io -from pathlib import Path - -try: - from watchdog.observers import Observer - from watchdog.events import FileSystemEventHandler - WATCHDOG_AVAILABLE = True -except ImportError: - WATCHDOG_AVAILABLE = False - -PROBLEMS_DIR = Path(__file__).parent / 'problems' -DB_PATH = Path(__file__).parent / 'database/problems.sqlite3' - -class ProblemScannerThread(threading.Thread): - def __init__(self, scan_interval=2): - super().__init__(daemon=True) - self.scan_interval = scan_interval - self.last_state = {} - self.observer = None - - def create_table(self, conn): - c = conn.cursor() - c.execute('PRAGMA journal_mode=WAL;') - c.execute('''CREATE TABLE IF NOT EXISTS problems ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - folder TEXT, - description TEXT, - difficulty TEXT, - test_code TEXT - )''') - conn.commit() - - def scan(self): - problems = [] - if not PROBLEMS_DIR.exists(): - print(f"Problems directory does not exist: {PROBLEMS_DIR}") - return problems - - for folder in PROBLEMS_DIR.iterdir(): - if folder.is_dir(): - # Dynamically find manifest file (manifest.json or manifets.json) - manifest_path = None - for candidate in ["manifest.json", "manifets.json"]: - candidate_path = folder / candidate - if candidate_path.exists(): - manifest_path = candidate_path - break - - desc_path = folder / 'description.md' - test_path = folder / 'test.py' - - # Check if required files exist - if manifest_path and desc_path.exists() and test_path.exists(): - try: - with open(desc_path, 'r', encoding='utf-8') as f: - description = f.read() - with open(test_path, 'r', encoding='utf-8') as f: - test_code = f.read() - with open(manifest_path, 'r', encoding='utf-8') as f: - manifest = json.load(f) - - difficulty = manifest.get('difficulty', 'unknown') - - problems.append({ - 'folder': folder.name, - 'description': description, - 'test_code': test_code, - 'difficulty': difficulty - }) - print(f"Found problem: {folder.name} ; Difficulty: {difficulty}") - except Exception as e: - print(f"Error reading problem files for {folder.name}: {e}") - else: - missing_files = [] - if not manifest_path: - missing_files.append("manifest.json/manifets.json") - if not desc_path.exists(): - missing_files.append("description.md") - if not test_path.exists(): - missing_files.append("test.py") - print(f"Skipping {folder.name}: missing {', '.join(missing_files)}") - - print(f"Total problems found: {len(problems)}") - return problems - - def update_db(self, problems, retries=5): - for attempt in range(retries): - try: - conn = sqlite3.connect(DB_PATH, timeout=5) - c = conn.cursor() - c.execute('PRAGMA journal_mode=WAL;') - - # Clear existing problems - c.execute('DELETE FROM problems') - - # Insert new problems - for p in problems: - c.execute('''INSERT INTO problems - (folder, description, difficulty, test_code) - VALUES (?, ?, ?, ?)''', - (p['folder'], p['description'], p['difficulty'], p['test_code'])) - - conn.commit() - print(f"Updated database with {len(problems)} problems") - conn.close() - return - - except sqlite3.OperationalError as e: - if 'locked' in str(e).lower(): - wait_time = 0.2 + random.random() * 0.3 - print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})") - time.sleep(wait_time) - else: - print(f"Database error: {e}") - raise - except Exception as e: - print(f"Unexpected error updating database: {e}") - raise - - print('Failed to update problems DB after several retries due to lock.') - - def rescan_and_update(self): - print("Scanning for problems...") - problems = self.scan() - self.update_db(problems) - - def run(self): - print("Starting problem scanner...") - - # Initial scan and table creation - try: - conn = sqlite3.connect(DB_PATH) - self.create_table(conn) - conn.close() - print("Database initialized") - except Exception as e: - print(f"Failed to initialize database: {e}") - return - - # Initial scan - self.rescan_and_update() - - if WATCHDOG_AVAILABLE: - print("Using watchdog for file monitoring") - - class Handler(FileSystemEventHandler): - def __init__(self, scanner): - self.scanner = scanner - self.last_event_time = 0 - - def on_any_event(self, event): - # Debounce events to avoid too many rescans - now = time.time() - if now - self.last_event_time > 1: # Wait at least 1 second between rescans - self.last_event_time = now - print(f"File system event: {event.event_type} - {event.src_path}") - self.scanner.rescan_and_update() - - event_handler = Handler(self) - self.observer = Observer() - self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True) - self.observer.start() - - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - print("Stopping problem scanner...") - finally: - self.observer.stop() - self.observer.join() - else: - print(f"Watchdog not available, using polling every {self.scan_interval}s") - # Fallback: poll every scan_interval seconds - try: - while True: - time.sleep(self.scan_interval) - self.rescan_and_update() - except KeyboardInterrupt: - print("Stopping problem scanner...") - -def start_problem_scanner(): - scanner = ProblemScannerThread() - scanner.start() - return scanner - -# Flask model loading functions -def load_problems_from_json(json_path): - """Load problems from JSON file into Flask database""" - if not os.path.exists(json_path): - print(f"Problem JSON file not found: {json_path}") - return - - try: - with open(json_path, 'r', encoding='utf-8') as f: - problems = json.load(f) - except Exception as e: - print(f"Error reading JSON file: {e}") - return - - # This assumes you have imported the necessary Flask/SQLAlchemy components - try: - from models import db, Problem - - for p in problems: - # Check if problem already exists by title - existing = Problem.query.filter_by(title=p['title']).first() - - # Load test code from solution file if provided - test_code = '' - if 'solution' in p and os.path.exists(p['solution']): - try: - with open(p['solution'], 'r', encoding='utf-8') as sf: - test_code = sf.read() - except Exception as e: - print(f"Error reading solution file for {p['title']}: {e}") - - if existing: - existing.description = p['description'] - existing.test_code = test_code - print(f"Updated problem: {p['title']}") - else: - new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code) - db.session.add(new_problem) - print(f"Added new problem: {p['title']}") - - db.session.commit() - print("Successfully updated problems from JSON") - - except ImportError: - print("Flask models not available - skipping JSON load") - except Exception as e: - print(f"Error loading problems from JSON: {e}") - -def schedule_problem_reload(app, json_path, interval_hours=10): - """Schedule periodic reloading of problems from JSON""" - def reload_loop(): - while True: - try: - with app.app_context(): - load_problems_from_json(json_path) - time.sleep(interval_hours * 3600) - except Exception as e: - print(f"Error in problem reload loop: {e}") - time.sleep(60) # Wait 1 minute before retrying - - t = threading.Thread(target=reload_loop, daemon=True) - t.start() - -def run_code_against_tests(user_code, test_code, timeout=10): - """ - Execute user code against test code with proper error handling. - - Args: - user_code: The user's solution code - test_code: The test code to validate the solution - timeout: Maximum execution time in seconds - - Returns: - dict: Result with passed, output, runtime, and error fields - """ - if not user_code or not user_code.strip(): - return { - 'passed': False, - 'output': '', - 'runtime': 0, - 'error': 'No code provided' - } - - if not test_code or not test_code.strip(): - return { - 'passed': False, - 'output': '', - 'runtime': 0, - 'error': 'No test code available' - } - - start_time = time.perf_counter() - output = '' - error = None - passed = False - temp_file = None - - try: - # Check if unittest is used in test_code - if 'unittest' in test_code: - # Create temporary file with user code + test code - with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f: - # Combine user code and test code - combined_code = f"{user_code}\n\n{test_code}" - f.write(combined_code) - f.flush() - temp_file = f.name - - try: - # Run the file as a subprocess with timeout - proc = subprocess.run( - [sys.executable, temp_file], - capture_output=True, - text=True, - timeout=timeout, - encoding='utf-8' - ) - - output = proc.stdout - if proc.stderr: - output += f"\nSTDERR:\n{proc.stderr}" - - passed = proc.returncode == 0 - if not passed: - error = f"Tests failed. Return code: {proc.returncode}\n{output}" - - except subprocess.TimeoutExpired: - passed = False - error = f"Code execution timed out after {timeout} seconds" - output = "Execution timed out" - - else: - # Direct execution approach for simple assert-based tests - local_ns = {} - - # Capture stdout - old_stdout = sys.stdout - captured_output = io.StringIO() - sys.stdout = captured_output - - try: - # Execute user code first - exec(user_code, {}, local_ns) - - # Execute test code in the same namespace - exec(test_code, local_ns, local_ns) - - # If we get here without exceptions, tests passed - passed = True - - except AssertionError as e: - passed = False - error = f"Assertion failed: {str(e)}" - - except Exception as e: - passed = False - error = f"Runtime error: {traceback.format_exc()}" - - finally: - output = captured_output.getvalue() - sys.stdout = old_stdout - - except Exception as e: - passed = False - error = f"Execution error: {traceback.format_exc()}" - - finally: - # Clean up temporary file - if temp_file and os.path.exists(temp_file): - try: - os.unlink(temp_file) - except Exception as e: - print(f"Warning: Could not delete temp file {temp_file}: {e}") - - runtime = time.perf_counter() - start_time - - result = { - 'passed': passed, - 'output': output.strip() if output else '', - 'runtime': runtime, - 'error': error if not passed else None - } - - print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s") - if error: - print(f"Error: {error}") - - return result +import os +import time +import json +import sqlite3 +import threading +import random +import tempfile +import subprocess +import sys +import traceback +import io +from pathlib import Path + +try: + from watchdog.observers import Observer + from watchdog.events import FileSystemEventHandler + WATCHDOG_AVAILABLE = True +except ImportError: + WATCHDOG_AVAILABLE = False + +PROBLEMS_DIR = Path(__file__).parent / 'problems' +DB_PATH = Path(__file__).parent / 'database/problems.sqlite3' + +class ProblemScannerThread(threading.Thread): + def __init__(self, scan_interval=2): + super().__init__(daemon=True) + self.scan_interval = scan_interval + self.last_state = {} + self.observer = None + + def create_table(self, conn): + c = conn.cursor() + c.execute('PRAGMA journal_mode=WAL;') + c.execute('''CREATE TABLE IF NOT EXISTS problems ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + folder TEXT, + description TEXT, + difficulty TEXT, + test_code TEXT + )''') + conn.commit() + + def scan(self): + problems = [] + if not PROBLEMS_DIR.exists(): + print(f"Problems directory does not exist: {PROBLEMS_DIR}") + return problems + + for folder in PROBLEMS_DIR.iterdir(): + if folder.is_dir(): + # Dynamically find manifest file (manifest.json or manifets.json) + manifest_path = None + for candidate in ["manifest.json", "manifets.json"]: + candidate_path = folder / candidate + if candidate_path.exists(): + manifest_path = candidate_path + break + + desc_path = folder / 'description.md' + test_path = folder / 'test.py' + + # Check if required files exist + if manifest_path and desc_path.exists() and test_path.exists(): + try: + with open(desc_path, 'r', encoding='utf-8') as f: + description = f.read() + with open(test_path, 'r', encoding='utf-8') as f: + test_code = f.read() + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest = json.load(f) + + difficulty = manifest.get('difficulty', 'unknown') + + problems.append({ + 'folder': folder.name, + 'description': description, + 'test_code': test_code, + 'difficulty': difficulty + }) + print(f"[ INFO ]: Found problem: {folder.name} ; Difficulty: {difficulty}") + except Exception as e: + print(f"[ ERROR ]: Error reading problem files for {folder.name}: {e}") + else: + missing_files = [] + if not manifest_path: + missing_files.append("manifest.json/manifets.json") + if not desc_path.exists(): + missing_files.append("description.md") + if not test_path.exists(): + missing_files.append("test.py") + print(f"[ SKIP ]: Skipping {folder.name}: missing {', '.join(missing_files)}") + + print(f"[ INFO ]: Total problems found: {len(problems)}") + return problems + + def update_db(self, problems, retries=5): + for attempt in range(retries): + try: + conn = sqlite3.connect(DB_PATH, timeout=5) + c = conn.cursor() + c.execute('PRAGMA journal_mode=WAL;') + + # Clear existing problems + c.execute('DELETE FROM problems') + + # Insert new problems + for p in problems: + c.execute('''INSERT INTO problems + (folder, description, difficulty, test_code) + VALUES (?, ?, ?, ?)''', + (p['folder'], p['description'], p['difficulty'], p['test_code'])) + + conn.commit() + print(f"[ INFO ]: Updated database with {len(problems)} problems") + conn.close() + return + + except sqlite3.OperationalError as e: + if 'locked' in str(e).lower(): + wait_time = 0.2 + random.random() * 0.3 + print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})") + time.sleep(wait_time) + else: + print(f"[ ERROR ]: Database error: {e}") + raise + except Exception as e: + print(f"[ ERROR ]: Unexpected error updating database: {e}") + raise + + print('[ FATAL ERROR ]: Failed to update problems DB after several retries due to lock.') + + def rescan_and_update(self): + print("[ INFO ]: Scanning for problems...") + problems = self.scan() + self.update_db(problems) + + def run(self): + print("[ INFO ]: Starting problem scanner...") + + # Initial scan and table creation + try: + conn = sqlite3.connect(DB_PATH) + self.create_table(conn) + conn.close() + print("[ INFO ]: Database initialized") + except Exception as e: + print(f"[ FATAL ERROR ]: Failed to initialize database: {e}") + return + + # Initial scan + self.rescan_and_update() + + if WATCHDOG_AVAILABLE: + print("[ INFO ]: Using watchdog for file monitoring") + + class Handler(FileSystemEventHandler): + def __init__(self, scanner): + self.scanner = scanner + self.last_event_time = 0 + + def on_any_event(self, event): + # Debounce events to avoid too many rescans + now = time.time() + if now - self.last_event_time > 1: # Wait at least 1 second between rescans + self.last_event_time = now + print(f"[ FSINFO ]: File system event: {event.event_type} - {event.src_path}") + self.scanner.rescan_and_update() + + event_handler = Handler(self) + self.observer = Observer() + self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True) + self.observer.start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("[ KBINT_INFO ]: Stopping problem scanner...") + finally: + self.observer.stop() + self.observer.join() + else: + print(f"[ WARNING ]: Watchdog not available, using polling every {self.scan_interval}s") + # Fallback: poll every scan_interval seconds + try: + while True: + time.sleep(self.scan_interval) + self.rescan_and_update() + except KeyboardInterrupt: + print("[ KBINT_INFO ]: Stopping problem scanner...") + +def start_problem_scanner(): + scanner = ProblemScannerThread() + scanner.start() + return scanner + +# Flask model loading functions +def load_problems_from_json(json_path): + """Load problems from JSON file into Flask database""" + if not os.path.exists(json_path): + print(f"[ DEPRECATED_INFO ]: Problem JSON file not found: {json_path}") + print("[ SUGGESTION ]: If you dont have this do not worry. Use mainfest.json!") + return + + try: + with open(json_path, 'r', encoding='utf-8') as f: + problems = json.load(f) + except Exception as e: + print(f"[ ERROR ]: Error reading JSON file: {e}") + return + + # This assumes you have imported the necessary Flask/SQLAlchemy components + try: + from models import db, Problem + + for p in problems: + # Check if problem already exists by title + existing = Problem.query.filter_by(title=p['title']).first() + + # Load test code from solution file if provided + test_code = '' + if 'solution' in p and os.path.exists(p['solution']): + try: + with open(p['solution'], 'r', encoding='utf-8') as sf: + test_code = sf.read() + except Exception as e: + print(f"[ FATAL ERROR ]: Error reading solution file for {p['title']}: {e}") + + if existing: + existing.description = p['description'] + existing.test_code = test_code + print(f"[ INFO ]: Updated problem: {p['title']}") + else: + new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code) + db.session.add(new_problem) + print(f"[ SUCCESS ]: Added new problem: {p['title']}") + + db.session.commit() + print("[ SUCCESS ]: Successfully updated problems from JSON") + + except ImportError: + print("[ FATAL IMPORT ERROR ]: Flask models not available - skipping JSON load @execptImportError") + except Exception as e: + print(f"[ ERROR ]: Error loading problems from JSON: {e}") + +def schedule_problem_reload(app, json_path, interval_hours=10): + """Schedule periodic reloading of problems from JSON""" + def reload_loop(): + while True: + try: + with app.app_context(): + load_problems_from_json(json_path) + time.sleep(interval_hours * 3600) + except Exception as e: + print(f"[ FATAL ERROR ]: Error in problem reload loop: {e}") + time.sleep(60) # Wait 1 minute before retrying + + t = threading.Thread(target=reload_loop, daemon=True) + t.start() + +def run_code_against_tests(user_code, test_code, timeout=10): + """ + Execute user code against test code with proper error handling. + + Args: + user_code: The user's solution code + test_code: The test code to validate the solution + timeout: Maximum execution time in seconds + + Returns: + dict: Result with passed, output, runtime, and error fields + """ + if not user_code or not user_code.strip(): + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': 'No code provided' + } + + if not test_code or not test_code.strip(): + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': 'No test code available' + } + + start_time = time.perf_counter() + output = '' + error = None + passed = False + temp_file = None + + try: + # Check if unittest is used in test_code + if 'unittest' in test_code: + # Create temporary file with user code + test code + with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f: + # Combine user code and test code + combined_code = f"{user_code}\n\n{test_code}" + f.write(combined_code) + f.flush() + temp_file = f.name + + try: + # Run the file as a subprocess with timeout + proc = subprocess.run( + [sys.executable, temp_file], + capture_output=True, + text=True, + timeout=timeout, + encoding='utf-8' + ) + + output = proc.stdout + if proc.stderr: + output += f"\nSTDERR:\n{proc.stderr}" + + passed = proc.returncode == 0 + if not passed: + error = f"Tests failed. Return code: {proc.returncode}\n{output}" + + except subprocess.TimeoutExpired: + passed = False + error = f"Code execution timed out after {timeout} seconds" + output = "Execution timed out" + + else: + # Direct execution approach for simple assert-based tests + local_ns = {} + + # Capture stdout + old_stdout = sys.stdout + captured_output = io.StringIO() + sys.stdout = captured_output + + try: + # Execute user code first + exec(user_code, {}, local_ns) + + # Execute test code in the same namespace + exec(test_code, local_ns, local_ns) + + # If we get here without exceptions, tests passed + passed = True + + except AssertionError as e: + passed = False + error = f"Assertion failed: {str(e)}" + + except Exception as e: + passed = False + error = f"Runtime error: {traceback.format_exc()}" + + finally: + output = captured_output.getvalue() + sys.stdout = old_stdout + + except Exception as e: + passed = False + error = f"Execution error: {traceback.format_exc()}" + + finally: + # Clean up temporary file + if temp_file and os.path.exists(temp_file): + try: + os.unlink(temp_file) + except Exception as e: + print(f"[ FATAL WARNING ]: Could not delete temp file {temp_file}: {e}") + + runtime = time.perf_counter() - start_time + + result = { + 'passed': passed, + 'output': output.strip() if output else '', + 'runtime': runtime, + 'error': error if not passed else None + } + + print(f"[ TEST RESULT ]: passed={passed}, runtime={runtime:.3f}s") + if error: + print(f"Error: {error}") + + return result