md for database as it was fucked up in windows script

This commit is contained in:
2025-08-17 12:19:18 +02:00
parent b6ab591054
commit 8dd5fcbeb7
5 changed files with 556 additions and 543 deletions

View File

@@ -1,5 +1,7 @@
Flask>=3.0 Flask>=3.0
Flask-SQLAlchemy>=3.1 Flask-SQLAlchemy>=3.1
Markdown>=3.6 Markdown>=3.6
MarkupSafe>=2.1 MarkupSafe>=2.1
watchdog>=4.0 watchdog>=4.0
gunicorn>=23.0.0
waitress>=3.0.2

View File

@@ -1,17 +1,25 @@
u!/bin/bash #!/bin/bash
set -e # exit if any command fails set -e # exit if any command fails
# Ensure QPP/database directory exists # Ensure src/database directory exists
mkdir -p src/database mkdir -p src/database
python -m venv venv # Create virtual environment if it doesn't exist
source venv/bin/activate if [ ! -d "venv" ]; then
python -m venv venv
pip install --upgrade pip fi
pip install -r requirements.txt source venv/bin/activate
export FLASK_APP=src.app # Upgrade pip and install dependencies
export FLASK_ENV=production pip install --upgrade pip
pip install -r requirements.txt
flask run --host=0.0.0.0 --port=5000
# Export environment variables
export FLASK_APP=src.app
export FLASK_ENV=production
# Run with Gunicorn
echo "Starting Flask app with Gunicorn..."
exec gunicorn -w 4 -b 0.0.0.0:5000 src.app:app

View File

@@ -1 +1,3 @@
python -m flask --app .\src\app.py run --host=0.0.0.0 --port=5000 :: make db directory and then launch the server
md .\src\database
python -m waitress --listen=0.0.0.0:8000 src.app:app

View File

@@ -1,136 +1,136 @@
from markupsafe import Markup from markupsafe import Markup
from flask import Flask, render_template, request, redirect, url_for, send_from_directory from flask import Flask, render_template, request, redirect, url_for, send_from_directory
import markdown as md import markdown as md
import ast import ast
from src.models import db, Problem, Solution from src.models import db, Problem, Solution
from src.utils import run_code_against_tests from src.utils import run_code_against_tests
from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard
import os import os
## from problem_loader import load_problems_from_json, schedule_problem_reload ## from problem_loader import load_problems_from_json, schedule_problem_reload
from src.problem_scanner import start_problem_scanner from src.problem_scanner import start_problem_scanner
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
app = Flask(__name__) app = Flask(__name__)
BASE_DIR = Path(__file__).parent BASE_DIR = Path(__file__).parent
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}" app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}"
print(f">>>>>>>>>>>>>>>>>>>>< Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}") print(f"[ INFO ] : Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
db.init_app(app) db.init_app(app)
@app.before_request @app.before_request
def setup(): def setup():
db.create_all() db.create_all()
create_leaderboard_table() # Ensure leaderboard table exists create_leaderboard_table() # Ensure leaderboard table exists
# Problems are now loaded from manifests by the background scanner. No need to load problems.json. # Problems are now loaded from manifests by the background scanner. No need to load problems.json.
# Start the background thread to scan problems # Start the background thread to scan problems
start_problem_scanner() start_problem_scanner()
@app.route("/script.js") @app.route("/script.js")
def script(): def script():
return send_from_directory("templates", "script.js") return send_from_directory("templates", "script.js")
@app.route('/favicon.ico') @app.route('/favicon.ico')
def favicon(): def favicon():
return send_from_directory("templates", "favicon.ico") return send_from_directory("templates", "favicon.ico")
@app.route('/') @app.route('/')
def index(): def index():
db_path = Path(__file__).parent / 'database/problems.sqlite3' db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
c = conn.cursor() c = conn.cursor()
#<!-- The query was fucked up so it fetched the fucking testcode --> #<!-- The query was fucked up so it fetched the fucking testcode -->
c.execute('SELECT folder, description, test_code, difficulty FROM problems') c.execute('SELECT folder, description, test_code, difficulty FROM problems')
problems = c.fetchall() problems = c.fetchall()
conn.close() conn.close()
# Get leaderboard entries # Get leaderboard entries
leaderboard = get_leaderboard() leaderboard = get_leaderboard()
# Map folder to title for display # Map folder to title for display
problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems} problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems}
return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles) return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles)
@app.route('/problem/new', methods=['GET', 'POST']) @app.route('/problem/new', methods=['GET', 'POST'])
def new_problem(): def new_problem():
if request.method == 'POST': if request.method == 'POST':
title = request.form['title'] title = request.form['title']
description = request.form['description'] description = request.form['description']
test_code = request.form['test_code'] test_code = request.form['test_code']
problem = Problem(title=title, description=description, test_code=test_code) problem = Problem(title=title, description=description, test_code=test_code)
db.session.add(problem) db.session.add(problem)
db.session.commit() db.session.commit()
return redirect(url_for('index')) return redirect(url_for('index'))
return render_template('new_problem.html') return render_template('new_problem.html')
@app.route('/problem/<folder>', methods=['GET', 'POST']) @app.route('/problem/<folder>', methods=['GET', 'POST'])
def view_problem(folder): def view_problem(folder):
db_path = Path(__file__).parent / 'database/problems.sqlite3' db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
c = conn.cursor() c = conn.cursor()
c.execute('SELECT folder, description,test_code , difficulty FROM problems WHERE folder = ?', (folder,)) c.execute('SELECT folder, description,test_code , difficulty FROM problems WHERE folder = ?', (folder,))
row = c.fetchone() row = c.fetchone()
conn.close() conn.close()
if not row: if not row:
return 'Problem not found', 404 return 'Problem not found', 404
problem = { problem = {
'folder': row[0], 'folder': row[0],
'description': row[1], 'description': row[1],
'difficulty': row[3], # now correct 'difficulty': row[3], # now correct
'test_code': row[2], # now correct 'test_code': row[2], # now correct
} }
result = None result = None
if request.method == 'POST': if request.method == 'POST':
user_code = request.form['user_code'] user_code = request.form['user_code']
username = request.form.get('username', '').strip() or 'Anonymous' username = request.form.get('username', '').strip() or 'Anonymous'
import tracemalloc import tracemalloc
tracemalloc.start() tracemalloc.start()
run_result = run_code_against_tests(user_code, problem['test_code']) run_result = run_code_against_tests(user_code, problem['test_code'])
current, peak = tracemalloc.get_traced_memory() current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop() tracemalloc.stop()
memory_used = peak // 1024 # in KB memory_used = peak // 1024 # in KB
# Try to get the last line number executed (even for successful runs) # Try to get the last line number executed (even for successful runs)
line_number = None line_number = None
try: try:
tree = ast.parse(user_code) tree = ast.parse(user_code)
# Find the highest line number in the AST (for multi-function/user code) # Find the highest line number in the AST (for multi-function/user code)
def get_max_lineno(node): def get_max_lineno(node):
max_lineno = getattr(node, 'lineno', 0) max_lineno = getattr(node, 'lineno', 0)
for child in ast.iter_child_nodes(node): for child in ast.iter_child_nodes(node):
max_lineno = max(max_lineno, get_max_lineno(child)) max_lineno = max(max_lineno, get_max_lineno(child))
return max_lineno return max_lineno
line_number = get_max_lineno(tree) line_number = get_max_lineno(tree)
except Exception: except Exception:
pass pass
# If there was an error, try to get the error line number from the traceback # If there was an error, try to get the error line number from the traceback
if run_result['error']: if run_result['error']:
tb = run_result['error'] tb = run_result['error']
import traceback import traceback
try: try:
tb_lines = traceback.extract_tb(traceback.TracebackException.from_string(tb).stack) tb_lines = traceback.extract_tb(traceback.TracebackException.from_string(tb).stack)
if tb_lines: if tb_lines:
line_number = tb_lines[-1].lineno line_number = tb_lines[-1].lineno
except Exception: except Exception:
pass pass
# ONLY log to leaderboard if the solution passed all tests # ONLY log to leaderboard if the solution passed all tests
if run_result['passed']: if run_result['passed']:
log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number) log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number)
result = run_result result = run_result
return render_template('problem.html', problem=problem, result=result) return render_template('problem.html', problem=problem, result=result)
@app.template_filter('markdown') @app.template_filter('markdown')
def markdown_filter(text): def markdown_filter(text):
return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists'])) return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists']))
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True) app.run(debug=True)

View File

@@ -1,384 +1,385 @@
import os import os
import time import time
import json import json
import sqlite3 import sqlite3
import threading import threading
import random import random
import tempfile import tempfile
import subprocess import subprocess
import sys import sys
import traceback import traceback
import io import io
from pathlib import Path from pathlib import Path
try: try:
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
WATCHDOG_AVAILABLE = True WATCHDOG_AVAILABLE = True
except ImportError: except ImportError:
WATCHDOG_AVAILABLE = False WATCHDOG_AVAILABLE = False
PROBLEMS_DIR = Path(__file__).parent / 'problems' PROBLEMS_DIR = Path(__file__).parent / 'problems'
DB_PATH = Path(__file__).parent / 'database/problems.sqlite3' DB_PATH = Path(__file__).parent / 'database/problems.sqlite3'
class ProblemScannerThread(threading.Thread): class ProblemScannerThread(threading.Thread):
def __init__(self, scan_interval=2): def __init__(self, scan_interval=2):
super().__init__(daemon=True) super().__init__(daemon=True)
self.scan_interval = scan_interval self.scan_interval = scan_interval
self.last_state = {} self.last_state = {}
self.observer = None self.observer = None
def create_table(self, conn): def create_table(self, conn):
c = conn.cursor() c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;') c.execute('PRAGMA journal_mode=WAL;')
c.execute('''CREATE TABLE IF NOT EXISTS problems ( c.execute('''CREATE TABLE IF NOT EXISTS problems (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
folder TEXT, folder TEXT,
description TEXT, description TEXT,
difficulty TEXT, difficulty TEXT,
test_code TEXT test_code TEXT
)''') )''')
conn.commit() conn.commit()
def scan(self): def scan(self):
problems = [] problems = []
if not PROBLEMS_DIR.exists(): if not PROBLEMS_DIR.exists():
print(f"Problems directory does not exist: {PROBLEMS_DIR}") print(f"Problems directory does not exist: {PROBLEMS_DIR}")
return problems return problems
for folder in PROBLEMS_DIR.iterdir(): for folder in PROBLEMS_DIR.iterdir():
if folder.is_dir(): if folder.is_dir():
# Dynamically find manifest file (manifest.json or manifets.json) # Dynamically find manifest file (manifest.json or manifets.json)
manifest_path = None manifest_path = None
for candidate in ["manifest.json", "manifets.json"]: for candidate in ["manifest.json", "manifets.json"]:
candidate_path = folder / candidate candidate_path = folder / candidate
if candidate_path.exists(): if candidate_path.exists():
manifest_path = candidate_path manifest_path = candidate_path
break break
desc_path = folder / 'description.md' desc_path = folder / 'description.md'
test_path = folder / 'test.py' test_path = folder / 'test.py'
# Check if required files exist # Check if required files exist
if manifest_path and desc_path.exists() and test_path.exists(): if manifest_path and desc_path.exists() and test_path.exists():
try: try:
with open(desc_path, 'r', encoding='utf-8') as f: with open(desc_path, 'r', encoding='utf-8') as f:
description = f.read() description = f.read()
with open(test_path, 'r', encoding='utf-8') as f: with open(test_path, 'r', encoding='utf-8') as f:
test_code = f.read() test_code = f.read()
with open(manifest_path, 'r', encoding='utf-8') as f: with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f) manifest = json.load(f)
difficulty = manifest.get('difficulty', 'unknown') difficulty = manifest.get('difficulty', 'unknown')
problems.append({ problems.append({
'folder': folder.name, 'folder': folder.name,
'description': description, 'description': description,
'test_code': test_code, 'test_code': test_code,
'difficulty': difficulty 'difficulty': difficulty
}) })
print(f"Found problem: {folder.name} ; Difficulty: {difficulty}") print(f"[ INFO ]: Found problem: {folder.name} ; Difficulty: {difficulty}")
except Exception as e: except Exception as e:
print(f"Error reading problem files for {folder.name}: {e}") print(f"[ ERROR ]: Error reading problem files for {folder.name}: {e}")
else: else:
missing_files = [] missing_files = []
if not manifest_path: if not manifest_path:
missing_files.append("manifest.json/manifets.json") missing_files.append("manifest.json/manifets.json")
if not desc_path.exists(): if not desc_path.exists():
missing_files.append("description.md") missing_files.append("description.md")
if not test_path.exists(): if not test_path.exists():
missing_files.append("test.py") missing_files.append("test.py")
print(f"Skipping {folder.name}: missing {', '.join(missing_files)}") print(f"[ SKIP ]: Skipping {folder.name}: missing {', '.join(missing_files)}")
print(f"Total problems found: {len(problems)}") print(f"[ INFO ]: Total problems found: {len(problems)}")
return problems return problems
def update_db(self, problems, retries=5): def update_db(self, problems, retries=5):
for attempt in range(retries): for attempt in range(retries):
try: try:
conn = sqlite3.connect(DB_PATH, timeout=5) conn = sqlite3.connect(DB_PATH, timeout=5)
c = conn.cursor() c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;') c.execute('PRAGMA journal_mode=WAL;')
# Clear existing problems # Clear existing problems
c.execute('DELETE FROM problems') c.execute('DELETE FROM problems')
# Insert new problems # Insert new problems
for p in problems: for p in problems:
c.execute('''INSERT INTO problems c.execute('''INSERT INTO problems
(folder, description, difficulty, test_code) (folder, description, difficulty, test_code)
VALUES (?, ?, ?, ?)''', VALUES (?, ?, ?, ?)''',
(p['folder'], p['description'], p['difficulty'], p['test_code'])) (p['folder'], p['description'], p['difficulty'], p['test_code']))
conn.commit() conn.commit()
print(f"Updated database with {len(problems)} problems") print(f"[ INFO ]: Updated database with {len(problems)} problems")
conn.close() conn.close()
return return
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
if 'locked' in str(e).lower(): if 'locked' in str(e).lower():
wait_time = 0.2 + random.random() * 0.3 wait_time = 0.2 + random.random() * 0.3
print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})") print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
time.sleep(wait_time) time.sleep(wait_time)
else: else:
print(f"Database error: {e}") print(f"[ ERROR ]: Database error: {e}")
raise raise
except Exception as e: except Exception as e:
print(f"Unexpected error updating database: {e}") print(f"[ ERROR ]: Unexpected error updating database: {e}")
raise raise
print('Failed to update problems DB after several retries due to lock.') print('[ FATAL ERROR ]: Failed to update problems DB after several retries due to lock.')
def rescan_and_update(self): def rescan_and_update(self):
print("Scanning for problems...") print("[ INFO ]: Scanning for problems...")
problems = self.scan() problems = self.scan()
self.update_db(problems) self.update_db(problems)
def run(self): def run(self):
print("Starting problem scanner...") print("[ INFO ]: Starting problem scanner...")
# Initial scan and table creation # Initial scan and table creation
try: try:
conn = sqlite3.connect(DB_PATH) conn = sqlite3.connect(DB_PATH)
self.create_table(conn) self.create_table(conn)
conn.close() conn.close()
print("Database initialized") print("[ INFO ]: Database initialized")
except Exception as e: except Exception as e:
print(f"Failed to initialize database: {e}") print(f"[ FATAL ERROR ]: Failed to initialize database: {e}")
return return
# Initial scan # Initial scan
self.rescan_and_update() self.rescan_and_update()
if WATCHDOG_AVAILABLE: if WATCHDOG_AVAILABLE:
print("Using watchdog for file monitoring") print("[ INFO ]: Using watchdog for file monitoring")
class Handler(FileSystemEventHandler): class Handler(FileSystemEventHandler):
def __init__(self, scanner): def __init__(self, scanner):
self.scanner = scanner self.scanner = scanner
self.last_event_time = 0 self.last_event_time = 0
def on_any_event(self, event): def on_any_event(self, event):
# Debounce events to avoid too many rescans # Debounce events to avoid too many rescans
now = time.time() now = time.time()
if now - self.last_event_time > 1: # Wait at least 1 second between rescans if now - self.last_event_time > 1: # Wait at least 1 second between rescans
self.last_event_time = now self.last_event_time = now
print(f"File system event: {event.event_type} - {event.src_path}") print(f"[ FSINFO ]: File system event: {event.event_type} - {event.src_path}")
self.scanner.rescan_and_update() self.scanner.rescan_and_update()
event_handler = Handler(self) event_handler = Handler(self)
self.observer = Observer() self.observer = Observer()
self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True) self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True)
self.observer.start() self.observer.start()
try: try:
while True: while True:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
print("Stopping problem scanner...") print("[ KBINT_INFO ]: Stopping problem scanner...")
finally: finally:
self.observer.stop() self.observer.stop()
self.observer.join() self.observer.join()
else: else:
print(f"Watchdog not available, using polling every {self.scan_interval}s") print(f"[ WARNING ]: Watchdog not available, using polling every {self.scan_interval}s")
# Fallback: poll every scan_interval seconds # Fallback: poll every scan_interval seconds
try: try:
while True: while True:
time.sleep(self.scan_interval) time.sleep(self.scan_interval)
self.rescan_and_update() self.rescan_and_update()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Stopping problem scanner...") print("[ KBINT_INFO ]: Stopping problem scanner...")
def start_problem_scanner(): def start_problem_scanner():
scanner = ProblemScannerThread() scanner = ProblemScannerThread()
scanner.start() scanner.start()
return scanner return scanner
# Flask model loading functions # Flask model loading functions
def load_problems_from_json(json_path): def load_problems_from_json(json_path):
"""Load problems from JSON file into Flask database""" """Load problems from JSON file into Flask database"""
if not os.path.exists(json_path): if not os.path.exists(json_path):
print(f"Problem JSON file not found: {json_path}") print(f"[ DEPRECATED_INFO ]: Problem JSON file not found: {json_path}")
return print("[ SUGGESTION ]: If you dont have this do not worry. Use mainfest.json!")
return
try:
with open(json_path, 'r', encoding='utf-8') as f: try:
problems = json.load(f) with open(json_path, 'r', encoding='utf-8') as f:
except Exception as e: problems = json.load(f)
print(f"Error reading JSON file: {e}") except Exception as e:
return print(f"[ ERROR ]: Error reading JSON file: {e}")
return
# This assumes you have imported the necessary Flask/SQLAlchemy components
try: # This assumes you have imported the necessary Flask/SQLAlchemy components
from models import db, Problem try:
from models import db, Problem
for p in problems:
# Check if problem already exists by title for p in problems:
existing = Problem.query.filter_by(title=p['title']).first() # Check if problem already exists by title
existing = Problem.query.filter_by(title=p['title']).first()
# Load test code from solution file if provided
test_code = '' # Load test code from solution file if provided
if 'solution' in p and os.path.exists(p['solution']): test_code = ''
try: if 'solution' in p and os.path.exists(p['solution']):
with open(p['solution'], 'r', encoding='utf-8') as sf: try:
test_code = sf.read() with open(p['solution'], 'r', encoding='utf-8') as sf:
except Exception as e: test_code = sf.read()
print(f"Error reading solution file for {p['title']}: {e}") except Exception as e:
print(f"[ FATAL ERROR ]: Error reading solution file for {p['title']}: {e}")
if existing:
existing.description = p['description'] if existing:
existing.test_code = test_code existing.description = p['description']
print(f"Updated problem: {p['title']}") existing.test_code = test_code
else: print(f"[ INFO ]: Updated problem: {p['title']}")
new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code) else:
db.session.add(new_problem) new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
print(f"Added new problem: {p['title']}") db.session.add(new_problem)
print(f"[ SUCCESS ]: Added new problem: {p['title']}")
db.session.commit()
print("Successfully updated problems from JSON") db.session.commit()
print("[ SUCCESS ]: Successfully updated problems from JSON")
except ImportError:
print("Flask models not available - skipping JSON load") except ImportError:
except Exception as e: print("[ FATAL IMPORT ERROR ]: Flask models not available - skipping JSON load @execptImportError")
print(f"Error loading problems from JSON: {e}") except Exception as e:
print(f"[ ERROR ]: Error loading problems from JSON: {e}")
def schedule_problem_reload(app, json_path, interval_hours=10):
"""Schedule periodic reloading of problems from JSON""" def schedule_problem_reload(app, json_path, interval_hours=10):
def reload_loop(): """Schedule periodic reloading of problems from JSON"""
while True: def reload_loop():
try: while True:
with app.app_context(): try:
load_problems_from_json(json_path) with app.app_context():
time.sleep(interval_hours * 3600) load_problems_from_json(json_path)
except Exception as e: time.sleep(interval_hours * 3600)
print(f"Error in problem reload loop: {e}") except Exception as e:
time.sleep(60) # Wait 1 minute before retrying print(f"[ FATAL ERROR ]: Error in problem reload loop: {e}")
time.sleep(60) # Wait 1 minute before retrying
t = threading.Thread(target=reload_loop, daemon=True)
t.start() t = threading.Thread(target=reload_loop, daemon=True)
t.start()
def run_code_against_tests(user_code, test_code, timeout=10):
""" def run_code_against_tests(user_code, test_code, timeout=10):
Execute user code against test code with proper error handling. """
Execute user code against test code with proper error handling.
Args:
user_code: The user's solution code Args:
test_code: The test code to validate the solution user_code: The user's solution code
timeout: Maximum execution time in seconds test_code: The test code to validate the solution
timeout: Maximum execution time in seconds
Returns:
dict: Result with passed, output, runtime, and error fields Returns:
""" dict: Result with passed, output, runtime, and error fields
if not user_code or not user_code.strip(): """
return { if not user_code or not user_code.strip():
'passed': False, return {
'output': '', 'passed': False,
'runtime': 0, 'output': '',
'error': 'No code provided' 'runtime': 0,
} 'error': 'No code provided'
}
if not test_code or not test_code.strip():
return { if not test_code or not test_code.strip():
'passed': False, return {
'output': '', 'passed': False,
'runtime': 0, 'output': '',
'error': 'No test code available' 'runtime': 0,
} 'error': 'No test code available'
}
start_time = time.perf_counter()
output = '' start_time = time.perf_counter()
error = None output = ''
passed = False error = None
temp_file = None passed = False
temp_file = None
try:
# Check if unittest is used in test_code try:
if 'unittest' in test_code: # Check if unittest is used in test_code
# Create temporary file with user code + test code if 'unittest' in test_code:
with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f: # Create temporary file with user code + test code
# Combine user code and test code with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
combined_code = f"{user_code}\n\n{test_code}" # Combine user code and test code
f.write(combined_code) combined_code = f"{user_code}\n\n{test_code}"
f.flush() f.write(combined_code)
temp_file = f.name f.flush()
temp_file = f.name
try:
# Run the file as a subprocess with timeout try:
proc = subprocess.run( # Run the file as a subprocess with timeout
[sys.executable, temp_file], proc = subprocess.run(
capture_output=True, [sys.executable, temp_file],
text=True, capture_output=True,
timeout=timeout, text=True,
encoding='utf-8' timeout=timeout,
) encoding='utf-8'
)
output = proc.stdout
if proc.stderr: output = proc.stdout
output += f"\nSTDERR:\n{proc.stderr}" if proc.stderr:
output += f"\nSTDERR:\n{proc.stderr}"
passed = proc.returncode == 0
if not passed: passed = proc.returncode == 0
error = f"Tests failed. Return code: {proc.returncode}\n{output}" if not passed:
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
except subprocess.TimeoutExpired:
passed = False except subprocess.TimeoutExpired:
error = f"Code execution timed out after {timeout} seconds" passed = False
output = "Execution timed out" error = f"Code execution timed out after {timeout} seconds"
output = "Execution timed out"
else:
# Direct execution approach for simple assert-based tests else:
local_ns = {} # Direct execution approach for simple assert-based tests
local_ns = {}
# Capture stdout
old_stdout = sys.stdout # Capture stdout
captured_output = io.StringIO() old_stdout = sys.stdout
sys.stdout = captured_output captured_output = io.StringIO()
sys.stdout = captured_output
try:
# Execute user code first try:
exec(user_code, {}, local_ns) # Execute user code first
exec(user_code, {}, local_ns)
# Execute test code in the same namespace
exec(test_code, local_ns, local_ns) # Execute test code in the same namespace
exec(test_code, local_ns, local_ns)
# If we get here without exceptions, tests passed
passed = True # If we get here without exceptions, tests passed
passed = True
except AssertionError as e:
passed = False except AssertionError as e:
error = f"Assertion failed: {str(e)}" passed = False
error = f"Assertion failed: {str(e)}"
except Exception as e:
passed = False except Exception as e:
error = f"Runtime error: {traceback.format_exc()}" passed = False
error = f"Runtime error: {traceback.format_exc()}"
finally:
output = captured_output.getvalue() finally:
sys.stdout = old_stdout output = captured_output.getvalue()
sys.stdout = old_stdout
except Exception as e:
passed = False except Exception as e:
error = f"Execution error: {traceback.format_exc()}" passed = False
error = f"Execution error: {traceback.format_exc()}"
finally:
# Clean up temporary file finally:
if temp_file and os.path.exists(temp_file): # Clean up temporary file
try: if temp_file and os.path.exists(temp_file):
os.unlink(temp_file) try:
except Exception as e: os.unlink(temp_file)
print(f"Warning: Could not delete temp file {temp_file}: {e}") except Exception as e:
print(f"[ FATAL WARNING ]: Could not delete temp file {temp_file}: {e}")
runtime = time.perf_counter() - start_time
runtime = time.perf_counter() - start_time
result = {
'passed': passed, result = {
'output': output.strip() if output else '', 'passed': passed,
'runtime': runtime, 'output': output.strip() if output else '',
'error': error if not passed else None 'runtime': runtime,
} 'error': error if not passed else None
}
print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s")
if error: print(f"[ TEST RESULT ]: passed={passed}, runtime={runtime:.3f}s")
print(f"Error: {error}") if error:
print(f"Error: {error}")
return result
return result