diff --git a/app.py b/app.py
index 03d206f..7676b6e 100644
--- a/app.py
+++ b/app.py
@@ -1,5 +1,5 @@
from markupsafe import Markup
-from flask import Flask, render_template, request, redirect, url_for
+from flask import Flask, render_template, request, redirect, url_for, send_from_directory
import markdown as md
from models import db, Problem, Solution
@@ -28,6 +28,14 @@ def setup():
# Start the background thread to scan problems
start_problem_scanner()
+@app.route("/script.js")
+def script():
+ return send_from_directory("templates", "script.js")
+
+@app.route('/favicon.ico')
+def favicon():
+ return send_from_directory("templates", "favicon", "favicon.ico")
+
@app.route('/')
def index():
db_path = Path(__file__).parent / 'problems.sqlite3'
diff --git a/problem_scanner.py b/problem_scanner.py
index e557857..4c6460d 100644
--- a/problem_scanner.py
+++ b/problem_scanner.py
@@ -4,8 +4,12 @@ import json
import sqlite3
import threading
import random
-from pathlib import Path
+import tempfile
+import subprocess
import sys
+import traceback
+import io
+from pathlib import Path
try:
from watchdog.observers import Observer
@@ -37,6 +41,10 @@ class ProblemScannerThread(threading.Thread):
def scan(self):
problems = []
+ if not PROBLEMS_DIR.exists():
+ print(f"Problems directory does not exist: {PROBLEMS_DIR}")
+ return problems
+
for folder in PROBLEMS_DIR.iterdir():
if folder.is_dir():
# Dynamically find manifest file (manifest.json or manifets.json)
@@ -46,73 +54,323 @@ class ProblemScannerThread(threading.Thread):
if candidate_path.exists():
manifest_path = candidate_path
break
+
desc_path = folder / 'description.md'
test_path = folder / 'test.py'
- if manifest_path and test_path.exists():
- with open(desc_path, 'r') as f:
- description = f.read()
- with open(test_path, 'r') as f:
- test_code = f.read()
- problems.append({
- 'folder': folder.name,
- 'description': description,
- 'test_code': test_code
- })
+
+ # Check if required files exist
+ if manifest_path and desc_path.exists() and test_path.exists():
+ try:
+ with open(desc_path, 'r', encoding='utf-8') as f:
+ description = f.read()
+ with open(test_path, 'r', encoding='utf-8') as f:
+ test_code = f.read()
+
+ problems.append({
+ 'folder': folder.name,
+ 'description': description,
+ 'test_code': test_code
+ })
+ print(f"Found problem: {folder.name}")
+ except Exception as e:
+ print(f"Error reading problem files for {folder.name}: {e}")
+ else:
+ missing_files = []
+ if not manifest_path:
+ missing_files.append("manifest.json/manifets.json")
+ if not desc_path.exists():
+ missing_files.append("description.md")
+ if not test_path.exists():
+ missing_files.append("test.py")
+ print(f"Skipping {folder.name}: missing {', '.join(missing_files)}")
+
+ print(f"Total problems found: {len(problems)}")
return problems
def update_db(self, problems, retries=5):
for attempt in range(retries):
try:
- conn = sqlite3.connect(DB_PATH, timeout=2)
+ conn = sqlite3.connect(DB_PATH, timeout=5)
c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;')
+
+ # Clear existing problems
c.execute('DELETE FROM problems')
+
+ # Insert new problems
for p in problems:
c.execute('INSERT INTO problems (folder, description, test_code) VALUES (?, ?, ?)',
(p['folder'], p['description'], p['test_code']))
+
conn.commit()
+ print(f"Updated database with {len(problems)} problems")
conn.close()
return
+
except sqlite3.OperationalError as e:
- if 'locked' in str(e):
- time.sleep(0.2 + random.random() * 0.3)
+ if 'locked' in str(e).lower():
+ wait_time = 0.2 + random.random() * 0.3
+ print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
+ time.sleep(wait_time)
else:
+ print(f"Database error: {e}")
raise
+ except Exception as e:
+ print(f"Unexpected error updating database: {e}")
+ raise
+
print('Failed to update problems DB after several retries due to lock.')
def rescan_and_update(self):
+ print("Scanning for problems...")
problems = self.scan()
self.update_db(problems)
def run(self):
+ print("Starting problem scanner...")
+
# Initial scan and table creation
- conn = sqlite3.connect(DB_PATH)
- self.create_table(conn)
- conn.close()
+ try:
+ conn = sqlite3.connect(DB_PATH)
+ self.create_table(conn)
+ conn.close()
+ print("Database initialized")
+ except Exception as e:
+ print(f"Failed to initialize database: {e}")
+ return
+
+ # Initial scan
self.rescan_and_update()
+
if WATCHDOG_AVAILABLE:
+ print("Using watchdog for file monitoring")
+
class Handler(FileSystemEventHandler):
def __init__(self, scanner):
self.scanner = scanner
+ self.last_event_time = 0
+
def on_any_event(self, event):
- self.scanner.rescan_and_update()
+ # Debounce events to avoid too many rescans
+ now = time.time()
+ if now - self.last_event_time > 1: # Wait at least 1 second between rescans
+ self.last_event_time = now
+ print(f"File system event: {event.event_type} - {event.src_path}")
+ self.scanner.rescan_and_update()
+
event_handler = Handler(self)
self.observer = Observer()
self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True)
self.observer.start()
+
try:
while True:
time.sleep(1)
+ except KeyboardInterrupt:
+ print("Stopping problem scanner...")
finally:
self.observer.stop()
self.observer.join()
else:
+ print(f"Watchdog not available, using polling every {self.scan_interval}s")
# Fallback: poll every scan_interval seconds
- while True:
- time.sleep(self.scan_interval)
- self.rescan_and_update()
+ try:
+ while True:
+ time.sleep(self.scan_interval)
+ self.rescan_and_update()
+ except KeyboardInterrupt:
+ print("Stopping problem scanner...")
def start_problem_scanner():
scanner = ProblemScannerThread()
scanner.start()
return scanner
+
+# Flask model loading functions
+def load_problems_from_json(json_path):
+ """Load problems from JSON file into Flask database"""
+ if not os.path.exists(json_path):
+ print(f"Problem JSON file not found: {json_path}")
+ return
+
+ try:
+ with open(json_path, 'r', encoding='utf-8') as f:
+ problems = json.load(f)
+ except Exception as e:
+ print(f"Error reading JSON file: {e}")
+ return
+
+ # This assumes you have imported the necessary Flask/SQLAlchemy components
+ try:
+ from models import db, Problem
+
+ for p in problems:
+ # Check if problem already exists by title
+ existing = Problem.query.filter_by(title=p['title']).first()
+
+ # Load test code from solution file if provided
+ test_code = ''
+ if 'solution' in p and os.path.exists(p['solution']):
+ try:
+ with open(p['solution'], 'r', encoding='utf-8') as sf:
+ test_code = sf.read()
+ except Exception as e:
+ print(f"Error reading solution file for {p['title']}: {e}")
+
+ if existing:
+ existing.description = p['description']
+ existing.test_code = test_code
+ print(f"Updated problem: {p['title']}")
+ else:
+ new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
+ db.session.add(new_problem)
+ print(f"Added new problem: {p['title']}")
+
+ db.session.commit()
+ print("Successfully updated problems from JSON")
+
+ except ImportError:
+ print("Flask models not available - skipping JSON load")
+ except Exception as e:
+ print(f"Error loading problems from JSON: {e}")
+
+def schedule_problem_reload(app, json_path, interval_hours=10):
+ """Schedule periodic reloading of problems from JSON"""
+ def reload_loop():
+ while True:
+ try:
+ with app.app_context():
+ load_problems_from_json(json_path)
+ time.sleep(interval_hours * 3600)
+ except Exception as e:
+ print(f"Error in problem reload loop: {e}")
+ time.sleep(60) # Wait 1 minute before retrying
+
+ t = threading.Thread(target=reload_loop, daemon=True)
+ t.start()
+
+def run_code_against_tests(user_code, test_code, timeout=10):
+ """
+ Execute user code against test code with proper error handling.
+
+ Args:
+ user_code: The user's solution code
+ test_code: The test code to validate the solution
+ timeout: Maximum execution time in seconds
+
+ Returns:
+ dict: Result with passed, output, runtime, and error fields
+ """
+ if not user_code or not user_code.strip():
+ return {
+ 'passed': False,
+ 'output': '',
+ 'runtime': 0,
+ 'error': 'No code provided'
+ }
+
+ if not test_code or not test_code.strip():
+ return {
+ 'passed': False,
+ 'output': '',
+ 'runtime': 0,
+ 'error': 'No test code available'
+ }
+
+ start_time = time.perf_counter()
+ output = ''
+ error = None
+ passed = False
+ temp_file = None
+
+ try:
+ # Check if unittest is used in test_code
+ if 'unittest' in test_code:
+ # Create temporary file with user code + test code
+ with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
+ # Combine user code and test code
+ combined_code = f"{user_code}\n\n{test_code}"
+ f.write(combined_code)
+ f.flush()
+ temp_file = f.name
+
+ try:
+ # Run the file as a subprocess with timeout
+ proc = subprocess.run(
+ [sys.executable, temp_file],
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ encoding='utf-8'
+ )
+
+ output = proc.stdout
+ if proc.stderr:
+ output += f"\nSTDERR:\n{proc.stderr}"
+
+ passed = proc.returncode == 0
+ if not passed:
+ error = f"Tests failed. Return code: {proc.returncode}\n{output}"
+
+ except subprocess.TimeoutExpired:
+ passed = False
+ error = f"Code execution timed out after {timeout} seconds"
+ output = "Execution timed out"
+
+ else:
+ # Direct execution approach for simple assert-based tests
+ local_ns = {}
+
+ # Capture stdout
+ old_stdout = sys.stdout
+ captured_output = io.StringIO()
+ sys.stdout = captured_output
+
+ try:
+ # Execute user code first
+ exec(user_code, {}, local_ns)
+
+ # Execute test code in the same namespace
+ exec(test_code, local_ns, local_ns)
+
+ # If we get here without exceptions, tests passed
+ passed = True
+
+ except AssertionError as e:
+ passed = False
+ error = f"Assertion failed: {str(e)}"
+
+ except Exception as e:
+ passed = False
+ error = f"Runtime error: {traceback.format_exc()}"
+
+ finally:
+ output = captured_output.getvalue()
+ sys.stdout = old_stdout
+
+ except Exception as e:
+ passed = False
+ error = f"Execution error: {traceback.format_exc()}"
+
+ finally:
+ # Clean up temporary file
+ if temp_file and os.path.exists(temp_file):
+ try:
+ os.unlink(temp_file)
+ except Exception as e:
+ print(f"Warning: Could not delete temp file {temp_file}: {e}")
+
+ runtime = time.perf_counter() - start_time
+
+ result = {
+ 'passed': passed,
+ 'output': output.strip() if output else '',
+ 'runtime': runtime,
+ 'error': error if not passed else None
+ }
+
+ print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s")
+ if error:
+ print(f"Error: {error}")
+
+ return result
diff --git a/problems/reversedstring/test.py b/problems/reversedstring/test.py
index 413de68..da68ff1 100644
--- a/problems/reversedstring/test.py
+++ b/problems/reversedstring/test.py
@@ -1,19 +1,26 @@
import unittest
-#
-def revstring(x):
- return x[::-1]
-
-#
class TestSolution(unittest.TestCase):
def test_simple(self):
- x = "Hello World"
- self.assertEqual(revstring(x), "dlroW olleH") # Test simple string reversal
- self.assertEqual(revstring(""), "") # Test empty string
- self.assertEqual(revstring("a"), "a") # Test single character
- self.assertEqual(revstring("racecar"), "racecar") # Test palindrome
- self.assertEqual(revstring("12345"), "54321") # Test numbers as string
- self.assertEqual(revstring("!@# $%"), "%$ #@!") # Test special chars and spaces
+ test_cases = [
+ ("Hello World", "dlroW olleH"),
+ ("", ""),
+ ("a", "a"),
+ ("racecar", "racecar"),
+ ("12345", "54321"),
+ ("!@# $%", "%$ #@!")
+ ]
+
+ print("\n=== Function Output Test Results ===")
+ for input_val, expected in test_cases:
+ try:
+ actual = revstring(input_val) # pyright: ignore[reportUndefinedVariable]
+ status = "✓ PASS" if actual == expected else "✗ FAIL"
+ print(f"{status} | Input: '{input_val}' -> Got: '{actual}' | Expected: '{expected}'")
+ self.assertEqual(actual, expected)
+ except Exception as e:
+ print(f"✗ ERROR | Input: '{input_val}' -> Exception: {e}")
+ raise
if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
+ unittest.main(verbosity=2)
\ No newline at end of file
diff --git a/problems/sortlist/test.py b/problems/sortlist/test.py
index 3e32f1c..2c21403 100644
--- a/problems/sortlist/test.py
+++ b/problems/sortlist/test.py
@@ -2,15 +2,16 @@ import unittest
# This is the function the user is expected to write.
# Its a really simple one, the user can choose not to type tho.
-def sortlist(lst = [4,3,2,1]) -> list:
- return sorted(lst)
+
+#def sortlist(lst = [4,3,2,1]) -> list:
+ #return sorted(lst)
class TestSolution(unittest.TestCase):
def test_sort(self):
# define x as a empty array.
# this will be used for the functiun ; a empty var does not work.
self.x = []
- self.assertEqual(sortlist(self.x), sorted(self.x)) ## sort
+ self.assertEqual(sortlist(self.x), sorted(self.x)) # pyright: ignore[reportUndefinedVariable]
if __name__ == "__main__":
unittest.main()
\ No newline at end of file
diff --git a/static/index.css b/static/index.css
new file mode 100644
index 0000000..5e1af31
--- /dev/null
+++ b/static/index.css
@@ -0,0 +1,160 @@
+:root {
+ --bg: #f6f8fb;
+ --card: #fff;
+ --muted: #6b7280;
+ --accent: #2563eb;
+ --shadow: 0 4px 12px rgba(16, 24, 40, 0.06);
+ --radius: 8px;
+ --mono: 'JetBrains Mono', monospace;
+}
+* { box-sizing: border-box; margin: 0; padding: 0; }
+html, body {
+ height: 100%;
+}
+body {
+ font-family: Inter, sans-serif;
+ background: var(--bg);
+ color: #0f172a;
+ padding: 16px;
+ display: flex;
+ justify-content: center;
+ align-items: center;
+}
+.wrap {
+ width: 100%;
+ max-width: 1100px;
+}
+header {
+ margin-bottom: 14px;
+}
+header h1 {
+ font-size: 1.6rem;
+ color: #111827;
+}
+header p {
+ color: var(--muted);
+ font-size: 0.9rem;
+}
+.content {
+ display: grid;
+ grid-template-columns: 1fr 1fr;
+ gap: 12px;
+}
+.content.single-column {
+ grid-template-columns: 1fr;
+}
+.card {
+ background: var(--card);
+ border-radius: var(--radius);
+ box-shadow: var(--shadow);
+ padding: 12px;
+}
+/* Search/filter controls */
+.search-controls {
+ margin-bottom: 12px;
+ display: flex;
+ gap: 8px;
+}
+.search-input {
+ flex: 1;
+ padding: 6px 10px;
+ border: 1px solid #e5e7eb;
+ border-radius: 4px;
+ font-size: 0.9rem;
+}
+.filter-select {
+ padding: 6px 8px;
+ border: 1px solid #e5e7eb;
+ border-radius: 4px;
+ font-size: 0.9rem;
+ background: white;
+}
+/* Problems list */
+.problems-list .problem-item {
+ padding: 8px;
+ border-bottom: 1px solid #e5e7eb;
+}
+.problem-item:last-child {
+ border-bottom: none;
+}
+.problem-item a {
+ text-decoration: none;
+ color: #0077ff;
+ font-weight: 600;
+}
+/* Leaderboard */
+.leaderboard-head {
+ display: flex;
+ justify-content: space-between;
+ align-items: center;
+ margin-bottom: 6px;
+}
+.leaderboard-controls {
+ display: flex;
+ gap: 8px;
+ margin-bottom: 12px;
+}
+.leaderboard-table {
+ width: 100%;
+ border-collapse: collapse;
+ font-size: 0.9rem;
+}
+.leaderboard-table th,
+.leaderboard-table td {
+ padding: 6px 8px;
+ border-bottom: 1px solid #e5e7eb;
+ text-align: left;
+}
+.leaderboard-table th {
+ background: #f9fafb;
+ font-weight: 600;
+ color: var(--muted);
+}
+.leaderboard-table tr:hover {
+ background: #f3f4f6;
+}
+/* Sort indicators */
+.sortable {
+ cursor: pointer;
+ position: relative;
+ padding-right: 16px;
+}
+.sortable::after {
+ content: "↕";
+ position: absolute;
+ right: 4px;
+ top: 50%;
+ transform: translateY(-50%);
+ font-size: 0.8em;
+ opacity: 0.5;
+}
+.sort-asc::after {
+ content: "↑";
+ opacity: 1;
+}
+.sort-desc::after {
+ content: "↓";
+ opacity: 1;
+}
+/* Toggle button */
+.btn {
+ border: none;
+ background: transparent;
+ cursor: pointer;
+ color: var(--accent);
+ font-size: 0.85rem;
+ padding: 4px 6px;
+ border-radius: 4px;
+}
+.btn:hover {
+ background: rgba(37, 99, 235, 0.08);
+}
+.btn.active {
+ background: rgba(37, 99, 235, 0.15);
+}
+@media (max-width: 800px) {
+ .content { grid-template-columns: 1fr; }
+ .leaderboard-controls {
+ flex-direction: column;
+ }
+}
\ No newline at end of file
diff --git a/templates/favicon/favicon.ico b/templates/favicon/favicon.ico
new file mode 100644
index 0000000..1e4fe17
Binary files /dev/null and b/templates/favicon/favicon.ico differ
diff --git a/templates/index.html b/templates/index.html
index 173f5b5..cb2df4b 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -1,101 +1,102 @@
-
+
-
-
- Quick Problem Platform
-
-
+
+
+ Quick Problem Platform
+
+
+
+
+
+
Quick Problem Platform
-
-
+
+
+
+
+
+
+
+ Problems
+
+ {% for folder, description, test_code in problems %}
+
+ {% else %}
+
No problems yet.
+ {% endfor %}
+
-
+
diff --git a/templates/script.js b/templates/script.js
new file mode 100644
index 0000000..9e45856
--- /dev/null
+++ b/templates/script.js
@@ -0,0 +1,152 @@
+// Toggle leaderboard visibility
+const toggleBtn = document.getElementById('toggleLeaderboard');
+const leaderboardSection = document.getElementById('leaderboardSection');
+const contentContainer = document.getElementById('contentContainer');
+
+toggleBtn.addEventListener('click', () => {
+if (leaderboardSection.style.display === 'none') {
+ leaderboardSection.style.display = '';
+ toggleBtn.textContent = 'Hide';
+ contentContainer.classList.remove('single-column');
+} else {
+ leaderboardSection.style.display = 'none';
+ toggleBtn.textContent = 'Show';
+ contentContainer.classList.add('single-column');
+}
+});
+
+// Problem search functionality
+const problemSearch = document.getElementById('problemSearch');
+const problemsContainer = document.getElementById('problemsContainer');
+const problemItems = problemsContainer.querySelectorAll('.problem-item');
+
+problemSearch.addEventListener('input', () => {
+const searchTerm = problemSearch.value.toLowerCase();
+problemItems.forEach(item => {
+ const name = item.dataset.name.toLowerCase();
+ const desc = item.dataset.desc?.toLowerCase() || '';
+ if (name.includes(searchTerm) || desc.includes(searchTerm)) {
+ item.style.display = '';
+ } else {
+ item.style.display = 'none';
+ }
+});
+});
+
+// Leaderboard filtering and sorting
+const userSearch = document.getElementById('userSearch');
+const problemFilter = document.getElementById('problemFilter');
+const runtimeFilter = document.getElementById('runtimeFilter');
+const leaderboardBody = document.getElementById('leaderboardBody');
+const leaderboardRows = Array.from(leaderboardBody.querySelectorAll('tr'));
+const sortableHeaders = document.querySelectorAll('.sortable');
+
+// Current sort state
+let currentSort = {
+column: null,
+direction: 'asc'
+};
+
+// Filter leaderboard
+function filterLeaderboard() {
+const userTerm = userSearch.value.toLowerCase();
+const problemTerm = problemFilter.value.toLowerCase();
+const runtimeType = runtimeFilter.value;
+
+leaderboardRows.forEach(row => {
+ const user = row.dataset.user.toLowerCase();
+ const problem = row.dataset.problem.toLowerCase();
+ const runtime = parseFloat(row.dataset.runtime);
+ const showUser = user.includes(userTerm);
+ const showProblem = problem.includes(problemTerm);
+
+ let showRuntime = true;
+ if (runtimeType === 'best') {
+ // Find if this is the best runtime for this user+problem combo
+ const userProblemRows = leaderboardRows.filter(r =>
+ r.dataset.user === row.dataset.user &&
+ r.dataset.problem === row.dataset.problem
+ );
+ const bestRuntime = Math.min(...userProblemRows.map(r => parseFloat(r.dataset.runtime)));
+ showRuntime = runtime === bestRuntime;
+ } else if (runtimeType === 'worst') {
+ // Find if this is the worst runtime for this user+problem combo
+ const userProblemRows = leaderboardRows.filter(r =>
+ r.dataset.user === row.dataset.user &&
+ r.dataset.problem === row.dataset.problem
+ );
+ const worstRuntime = Math.max(...userProblemRows.map(r => parseFloat(r.dataset.runtime)));
+ showRuntime = runtime === worstRuntime;
+ }
+
+ if (showUser && showProblem && showRuntime) {
+ row.style.display = '';
+ } else {
+ row.style.display = 'none';
+ }
+});
+}
+
+// Sort leaderboard
+function sortLeaderboard(column, direction) {
+const rows = Array.from(leaderboardBody.querySelectorAll('tr'));
+const index = Array.from(document.querySelectorAll('th')).findIndex(th => th.dataset.sort === column);
+
+rows.sort((a, b) => {
+ let aValue = a.cells[index].textContent;
+ let bValue = b.cells[index].textContent;
+
+ // Special handling for numeric columns
+ if (column === 'runtime' || column === 'memory' || column === 'rank') {
+ aValue = parseFloat(aValue) || 0;
+ bValue = parseFloat(bValue) || 0;
+ return direction === 'asc' ? aValue - bValue : bValue - aValue;
+ }
+
+ // Special handling for timestamps
+ if (column === 'timestamp') {
+ aValue = new Date(aValue).getTime();
+ bValue = new Date(bValue).getTime();
+ return direction === 'asc' ? aValue - bValue : bValue - aValue;
+ }
+
+ // Default string comparison
+ aValue = aValue.toLowerCase();
+ bValue = bValue.toLowerCase();
+ if (aValue < bValue) return direction === 'asc' ? -1 : 1;
+ if (aValue > bValue) return direction === 'asc' ? 1 : -1;
+ return 0;
+});
+
+// Re-append rows in sorted order
+rows.forEach(row => leaderboardBody.appendChild(row));
+}
+
+// Set up event listeners
+userSearch.addEventListener('input', filterLeaderboard);
+problemFilter.addEventListener('input', filterLeaderboard);
+runtimeFilter.addEventListener('change', filterLeaderboard);
+
+// Set up sorting
+sortableHeaders.forEach(header => {
+header.addEventListener('click', () => {
+ const column = header.dataset.sort;
+
+ // Reset all sort indicators
+ sortableHeaders.forEach(h => {
+ h.classList.remove('sort-asc', 'sort-desc');
+ });
+
+ // Determine new sort direction
+ if (currentSort.column === column) {
+ currentSort.direction = currentSort.direction === 'asc' ? 'desc' : 'asc';
+ } else {
+ currentSort.column = column;
+ currentSort.direction = 'asc';
+ }
+
+ // Apply new sort
+ header.classList.add(`sort-${currentSort.direction}`);
+ sortLeaderboard(column, currentSort.direction);
+});
+});
\ No newline at end of file
diff --git a/utils.py b/utils.py
index 1437d67..cf20dc8 100644
--- a/utils.py
+++ b/utils.py
@@ -1,51 +1,99 @@
-
import sys
import traceback
import time
import io
+import tempfile
+import subprocess
+import os
def run_code_against_tests(user_code, test_code):
- import tempfile
- import subprocess
local_ns = {}
output = ''
start = time.perf_counter()
error = None
passed = False
+ temp_file = None
+
try:
# Check if unittest is used in test_code
if 'unittest' in test_code:
# Write user code + test code to a temp file
- with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False) as f:
- f.write(user_code + '\n' + test_code)
+ with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
+ combined_code = f"{user_code}\n\n{test_code}"
+ f.write(combined_code)
f.flush()
- f_name = f.name
+ temp_file = f.name
+
# Run the file as a subprocess
- proc = subprocess.run([sys.executable, f_name], capture_output=True, text=True, timeout=10)
- output = proc.stdout + proc.stderr
- passed = proc.returncode == 0
- error = None if passed else output
+ try:
+ proc = subprocess.run(
+ [sys.executable, temp_file],
+ capture_output=True,
+ text=True,
+ timeout=10,
+ encoding='utf-8'
+ )
+ output = proc.stdout
+ if proc.stderr:
+ output += f"\n{proc.stderr}"
+
+ passed = proc.returncode == 0
+ if not passed:
+ error = f"Tests failed. Return code: {proc.returncode}\n{output}"
+ else:
+ # For successful unittest runs, the stderr contains the test results
+ if proc.stderr and "OK" in proc.stderr:
+ output = proc.stderr # Use stderr as the main output for unittest
+
+ except subprocess.TimeoutExpired:
+ passed = False
+ error = "Code execution timed out after 10 seconds"
+ output = "Execution timed out"
+
else:
# Capture stdout
old_stdout = sys.stdout
- sys.stdout = mystdout = io.StringIO()
- # Execute user code
- exec(user_code, {}, local_ns)
- # Execute test code (should raise AssertionError if fail)
- exec(test_code, local_ns, local_ns)
- passed = True
+ captured_output = io.StringIO()
+ sys.stdout = captured_output
+
+ try:
+ # Execute user code
+ exec(user_code, {}, local_ns)
+ # Execute test code (should raise AssertionError if fail)
+ exec(test_code, local_ns, local_ns)
+ passed = True
+
+ except AssertionError as e:
+ passed = False
+ error = f"Assertion failed: {str(e)}"
+
+ except Exception as e:
+ passed = False
+ error = f"Runtime error: {traceback.format_exc()}"
+
+ finally:
+ output = captured_output.getvalue()
+ sys.stdout = old_stdout
+
except Exception as e:
passed = False
- error = traceback.format_exc()
+ error = f"Execution error: {traceback.format_exc()}"
+
finally:
- if 'mystdout' in locals():
- output = mystdout.getvalue() or output
- sys.stdout = old_stdout if 'old_stdout' in locals() else sys.stdout
+ # Clean up temporary file
+ if temp_file and os.path.exists(temp_file):
+ try:
+ os.unlink(temp_file)
+ except Exception as e:
+ print(f"Warning: Could not delete temp file {temp_file}: {e}")
+
runtime = time.perf_counter() - start
+
result = {
'passed': passed,
- 'output': output,
+ 'output': output.strip() if output else '',
'runtime': runtime,
'error': error if not passed else None
}
+
return result