fixed the tests and done some frontend shit
This commit is contained in:
@@ -4,8 +4,12 @@ import json
|
||||
import sqlite3
|
||||
import threading
|
||||
import random
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
import subprocess
|
||||
import sys
|
||||
import traceback
|
||||
import io
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
from watchdog.observers import Observer
|
||||
@@ -37,6 +41,10 @@ class ProblemScannerThread(threading.Thread):
|
||||
|
||||
def scan(self):
|
||||
problems = []
|
||||
if not PROBLEMS_DIR.exists():
|
||||
print(f"Problems directory does not exist: {PROBLEMS_DIR}")
|
||||
return problems
|
||||
|
||||
for folder in PROBLEMS_DIR.iterdir():
|
||||
if folder.is_dir():
|
||||
# Dynamically find manifest file (manifest.json or manifets.json)
|
||||
@@ -46,73 +54,323 @@ class ProblemScannerThread(threading.Thread):
|
||||
if candidate_path.exists():
|
||||
manifest_path = candidate_path
|
||||
break
|
||||
|
||||
desc_path = folder / 'description.md'
|
||||
test_path = folder / 'test.py'
|
||||
if manifest_path and test_path.exists():
|
||||
with open(desc_path, 'r') as f:
|
||||
description = f.read()
|
||||
with open(test_path, 'r') as f:
|
||||
test_code = f.read()
|
||||
problems.append({
|
||||
'folder': folder.name,
|
||||
'description': description,
|
||||
'test_code': test_code
|
||||
})
|
||||
|
||||
# Check if required files exist
|
||||
if manifest_path and desc_path.exists() and test_path.exists():
|
||||
try:
|
||||
with open(desc_path, 'r', encoding='utf-8') as f:
|
||||
description = f.read()
|
||||
with open(test_path, 'r', encoding='utf-8') as f:
|
||||
test_code = f.read()
|
||||
|
||||
problems.append({
|
||||
'folder': folder.name,
|
||||
'description': description,
|
||||
'test_code': test_code
|
||||
})
|
||||
print(f"Found problem: {folder.name}")
|
||||
except Exception as e:
|
||||
print(f"Error reading problem files for {folder.name}: {e}")
|
||||
else:
|
||||
missing_files = []
|
||||
if not manifest_path:
|
||||
missing_files.append("manifest.json/manifets.json")
|
||||
if not desc_path.exists():
|
||||
missing_files.append("description.md")
|
||||
if not test_path.exists():
|
||||
missing_files.append("test.py")
|
||||
print(f"Skipping {folder.name}: missing {', '.join(missing_files)}")
|
||||
|
||||
print(f"Total problems found: {len(problems)}")
|
||||
return problems
|
||||
|
||||
def update_db(self, problems, retries=5):
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
conn = sqlite3.connect(DB_PATH, timeout=2)
|
||||
conn = sqlite3.connect(DB_PATH, timeout=5)
|
||||
c = conn.cursor()
|
||||
c.execute('PRAGMA journal_mode=WAL;')
|
||||
|
||||
# Clear existing problems
|
||||
c.execute('DELETE FROM problems')
|
||||
|
||||
# Insert new problems
|
||||
for p in problems:
|
||||
c.execute('INSERT INTO problems (folder, description, test_code) VALUES (?, ?, ?)',
|
||||
(p['folder'], p['description'], p['test_code']))
|
||||
|
||||
conn.commit()
|
||||
print(f"Updated database with {len(problems)} problems")
|
||||
conn.close()
|
||||
return
|
||||
|
||||
except sqlite3.OperationalError as e:
|
||||
if 'locked' in str(e):
|
||||
time.sleep(0.2 + random.random() * 0.3)
|
||||
if 'locked' in str(e).lower():
|
||||
wait_time = 0.2 + random.random() * 0.3
|
||||
print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
|
||||
time.sleep(wait_time)
|
||||
else:
|
||||
print(f"Database error: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"Unexpected error updating database: {e}")
|
||||
raise
|
||||
|
||||
print('Failed to update problems DB after several retries due to lock.')
|
||||
|
||||
def rescan_and_update(self):
|
||||
print("Scanning for problems...")
|
||||
problems = self.scan()
|
||||
self.update_db(problems)
|
||||
|
||||
def run(self):
|
||||
print("Starting problem scanner...")
|
||||
|
||||
# Initial scan and table creation
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
self.create_table(conn)
|
||||
conn.close()
|
||||
try:
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
self.create_table(conn)
|
||||
conn.close()
|
||||
print("Database initialized")
|
||||
except Exception as e:
|
||||
print(f"Failed to initialize database: {e}")
|
||||
return
|
||||
|
||||
# Initial scan
|
||||
self.rescan_and_update()
|
||||
|
||||
if WATCHDOG_AVAILABLE:
|
||||
print("Using watchdog for file monitoring")
|
||||
|
||||
class Handler(FileSystemEventHandler):
|
||||
def __init__(self, scanner):
|
||||
self.scanner = scanner
|
||||
self.last_event_time = 0
|
||||
|
||||
def on_any_event(self, event):
|
||||
self.scanner.rescan_and_update()
|
||||
# Debounce events to avoid too many rescans
|
||||
now = time.time()
|
||||
if now - self.last_event_time > 1: # Wait at least 1 second between rescans
|
||||
self.last_event_time = now
|
||||
print(f"File system event: {event.event_type} - {event.src_path}")
|
||||
self.scanner.rescan_and_update()
|
||||
|
||||
event_handler = Handler(self)
|
||||
self.observer = Observer()
|
||||
self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True)
|
||||
self.observer.start()
|
||||
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping problem scanner...")
|
||||
finally:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
else:
|
||||
print(f"Watchdog not available, using polling every {self.scan_interval}s")
|
||||
# Fallback: poll every scan_interval seconds
|
||||
while True:
|
||||
time.sleep(self.scan_interval)
|
||||
self.rescan_and_update()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(self.scan_interval)
|
||||
self.rescan_and_update()
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping problem scanner...")
|
||||
|
||||
def start_problem_scanner():
|
||||
scanner = ProblemScannerThread()
|
||||
scanner.start()
|
||||
return scanner
|
||||
|
||||
# Flask model loading functions
|
||||
def load_problems_from_json(json_path):
|
||||
"""Load problems from JSON file into Flask database"""
|
||||
if not os.path.exists(json_path):
|
||||
print(f"Problem JSON file not found: {json_path}")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(json_path, 'r', encoding='utf-8') as f:
|
||||
problems = json.load(f)
|
||||
except Exception as e:
|
||||
print(f"Error reading JSON file: {e}")
|
||||
return
|
||||
|
||||
# This assumes you have imported the necessary Flask/SQLAlchemy components
|
||||
try:
|
||||
from models import db, Problem
|
||||
|
||||
for p in problems:
|
||||
# Check if problem already exists by title
|
||||
existing = Problem.query.filter_by(title=p['title']).first()
|
||||
|
||||
# Load test code from solution file if provided
|
||||
test_code = ''
|
||||
if 'solution' in p and os.path.exists(p['solution']):
|
||||
try:
|
||||
with open(p['solution'], 'r', encoding='utf-8') as sf:
|
||||
test_code = sf.read()
|
||||
except Exception as e:
|
||||
print(f"Error reading solution file for {p['title']}: {e}")
|
||||
|
||||
if existing:
|
||||
existing.description = p['description']
|
||||
existing.test_code = test_code
|
||||
print(f"Updated problem: {p['title']}")
|
||||
else:
|
||||
new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
|
||||
db.session.add(new_problem)
|
||||
print(f"Added new problem: {p['title']}")
|
||||
|
||||
db.session.commit()
|
||||
print("Successfully updated problems from JSON")
|
||||
|
||||
except ImportError:
|
||||
print("Flask models not available - skipping JSON load")
|
||||
except Exception as e:
|
||||
print(f"Error loading problems from JSON: {e}")
|
||||
|
||||
def schedule_problem_reload(app, json_path, interval_hours=10):
|
||||
"""Schedule periodic reloading of problems from JSON"""
|
||||
def reload_loop():
|
||||
while True:
|
||||
try:
|
||||
with app.app_context():
|
||||
load_problems_from_json(json_path)
|
||||
time.sleep(interval_hours * 3600)
|
||||
except Exception as e:
|
||||
print(f"Error in problem reload loop: {e}")
|
||||
time.sleep(60) # Wait 1 minute before retrying
|
||||
|
||||
t = threading.Thread(target=reload_loop, daemon=True)
|
||||
t.start()
|
||||
|
||||
def run_code_against_tests(user_code, test_code, timeout=10):
|
||||
"""
|
||||
Execute user code against test code with proper error handling.
|
||||
|
||||
Args:
|
||||
user_code: The user's solution code
|
||||
test_code: The test code to validate the solution
|
||||
timeout: Maximum execution time in seconds
|
||||
|
||||
Returns:
|
||||
dict: Result with passed, output, runtime, and error fields
|
||||
"""
|
||||
if not user_code or not user_code.strip():
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': 'No code provided'
|
||||
}
|
||||
|
||||
if not test_code or not test_code.strip():
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': 'No test code available'
|
||||
}
|
||||
|
||||
start_time = time.perf_counter()
|
||||
output = ''
|
||||
error = None
|
||||
passed = False
|
||||
temp_file = None
|
||||
|
||||
try:
|
||||
# Check if unittest is used in test_code
|
||||
if 'unittest' in test_code:
|
||||
# Create temporary file with user code + test code
|
||||
with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
|
||||
# Combine user code and test code
|
||||
combined_code = f"{user_code}\n\n{test_code}"
|
||||
f.write(combined_code)
|
||||
f.flush()
|
||||
temp_file = f.name
|
||||
|
||||
try:
|
||||
# Run the file as a subprocess with timeout
|
||||
proc = subprocess.run(
|
||||
[sys.executable, temp_file],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout,
|
||||
encoding='utf-8'
|
||||
)
|
||||
|
||||
output = proc.stdout
|
||||
if proc.stderr:
|
||||
output += f"\nSTDERR:\n{proc.stderr}"
|
||||
|
||||
passed = proc.returncode == 0
|
||||
if not passed:
|
||||
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
passed = False
|
||||
error = f"Code execution timed out after {timeout} seconds"
|
||||
output = "Execution timed out"
|
||||
|
||||
else:
|
||||
# Direct execution approach for simple assert-based tests
|
||||
local_ns = {}
|
||||
|
||||
# Capture stdout
|
||||
old_stdout = sys.stdout
|
||||
captured_output = io.StringIO()
|
||||
sys.stdout = captured_output
|
||||
|
||||
try:
|
||||
# Execute user code first
|
||||
exec(user_code, {}, local_ns)
|
||||
|
||||
# Execute test code in the same namespace
|
||||
exec(test_code, local_ns, local_ns)
|
||||
|
||||
# If we get here without exceptions, tests passed
|
||||
passed = True
|
||||
|
||||
except AssertionError as e:
|
||||
passed = False
|
||||
error = f"Assertion failed: {str(e)}"
|
||||
|
||||
except Exception as e:
|
||||
passed = False
|
||||
error = f"Runtime error: {traceback.format_exc()}"
|
||||
|
||||
finally:
|
||||
output = captured_output.getvalue()
|
||||
sys.stdout = old_stdout
|
||||
|
||||
except Exception as e:
|
||||
passed = False
|
||||
error = f"Execution error: {traceback.format_exc()}"
|
||||
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if temp_file and os.path.exists(temp_file):
|
||||
try:
|
||||
os.unlink(temp_file)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not delete temp file {temp_file}: {e}")
|
||||
|
||||
runtime = time.perf_counter() - start_time
|
||||
|
||||
result = {
|
||||
'passed': passed,
|
||||
'output': output.strip() if output else '',
|
||||
'runtime': runtime,
|
||||
'error': error if not passed else None
|
||||
}
|
||||
|
||||
print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s")
|
||||
if error:
|
||||
print(f"Error: {error}")
|
||||
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user