# File: QPP/problem_scanner.py
# Language: Python (377 lines, 13 KiB)

import os
import time
import json
import sqlite3
import threading
import random
import tempfile
import subprocess
import sys
import traceback
import io
from pathlib import Path
# watchdog is an optional dependency: when it cannot be imported the scanner
# falls back to periodic polling, so only record whether the import worked.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
except ImportError:
    WATCHDOG_AVAILABLE = False
else:
    WATCHDOG_AVAILABLE = True

# Both the problems folder and the SQLite file live next to this module.
_MODULE_DIR = Path(__file__).parent
PROBLEMS_DIR = _MODULE_DIR / 'problems'
DB_PATH = _MODULE_DIR / 'problems.sqlite3'
class ProblemScannerThread(threading.Thread):
    """Daemon thread that mirrors the on-disk problem folders into SQLite.

    Each sub-folder of the problems directory that contains a manifest,
    ``description.md`` and ``test.py`` becomes one row of the ``problems``
    table.  After an initial scan the thread keeps the table fresh, using
    watchdog file-system events when ``WATCHDOG_AVAILABLE`` is true and
    periodic polling otherwise.

    Args:
        scan_interval: Seconds between rescans in polling mode.
        problems_dir: Directory to scan; defaults to the module-level
            ``PROBLEMS_DIR``.
        db_path: SQLite file to update; defaults to the module-level
            ``DB_PATH``.
    """

    def __init__(self, scan_interval=2, problems_dir=None, db_path=None):
        super().__init__(daemon=True)
        self.scan_interval = scan_interval
        self.last_state = {}
        self.observer = None
        # The module constants are only consulted when no override is given.
        self.problems_dir = PROBLEMS_DIR if problems_dir is None else Path(problems_dir)
        self.db_path = DB_PATH if db_path is None else Path(db_path)
        # Set by stop(); the monitoring loops wait on it instead of sleeping.
        self._stop_event = threading.Event()

    def stop(self):
        """Ask the monitoring loop started by run() to exit."""
        self._stop_event.set()

    def create_table(self, conn):
        """Switch *conn* to WAL journaling and create ``problems`` if missing."""
        c = conn.cursor()
        c.execute('PRAGMA journal_mode=WAL;')
        c.execute('''CREATE TABLE IF NOT EXISTS problems (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            folder TEXT,
            description TEXT,
            test_code TEXT
        )''')
        conn.commit()

    def scan(self):
        """Collect every complete problem folder under the problems directory.

        Returns:
            A list of dicts with ``folder``, ``description`` and ``test_code``
            keys, ordered by folder path.  Folders missing a required file, or
            whose files cannot be read, are reported on stdout and skipped.
        """
        problems = []
        root = self.problems_dir
        if not root.is_dir():
            print(f"Problems directory does not exist: {root}")
            return problems
        # Sorted so that row order is stable from one rescan to the next.
        for folder in sorted(root.iterdir()):
            if not folder.is_dir():
                continue
            # The misspelled "manifets.json" is accepted as an alternative name.
            manifest_path = None
            for candidate in ("manifest.json", "manifets.json"):
                candidate_path = folder / candidate
                if candidate_path.exists():
                    manifest_path = candidate_path
                    break
            desc_path = folder / 'description.md'
            test_path = folder / 'test.py'
            if manifest_path and desc_path.exists() and test_path.exists():
                try:
                    with open(desc_path, 'r', encoding='utf-8') as f:
                        description = f.read()
                    with open(test_path, 'r', encoding='utf-8') as f:
                        test_code = f.read()
                    problems.append({
                        'folder': folder.name,
                        'description': description,
                        'test_code': test_code,
                    })
                    print(f"Found problem: {folder.name}")
                except Exception as e:
                    print(f"Error reading problem files for {folder.name}: {e}")
            else:
                missing_files = []
                if not manifest_path:
                    missing_files.append("manifest.json/manifets.json")
                if not desc_path.exists():
                    missing_files.append("description.md")
                if not test_path.exists():
                    missing_files.append("test.py")
                print(f"Skipping {folder.name}: missing {', '.join(missing_files)}")
        print(f"Total problems found: {len(problems)}")
        return problems

    def update_db(self, problems, retries=5):
        """Replace the contents of the ``problems`` table with *problems*.

        The delete and the inserts are committed together.  A "locked"
        ``sqlite3.OperationalError`` is retried up to *retries* times with a
        short random pause; after the last failed attempt a message is printed
        and the method returns without raising.

        Raises:
            sqlite3.OperationalError: For database errors other than a lock.
            Exception: Any other failure, re-raised after being reported.
        """
        for attempt in range(retries):
            try:
                conn = sqlite3.connect(self.db_path, timeout=5)
                try:
                    c = conn.cursor()
                    c.execute('PRAGMA journal_mode=WAL;')
                    # Clear existing problems, then insert the fresh set.
                    c.execute('DELETE FROM problems')
                    for p in problems:
                        c.execute(
                            'INSERT INTO problems (folder, description, test_code) VALUES (?, ?, ?)',
                            (p['folder'], p['description'], p['test_code']),
                        )
                    conn.commit()
                finally:
                    # Always close, including on failed attempts, so a retry
                    # does not leave the previous connection open.
                    conn.close()
            except sqlite3.OperationalError as e:
                if 'locked' in str(e).lower():
                    wait_time = 0.2 + random.random() * 0.3
                    print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
                    time.sleep(wait_time)
                else:
                    print(f"Database error: {e}")
                    raise
            except Exception as e:
                print(f"Unexpected error updating database: {e}")
                raise
            else:
                print(f"Updated database with {len(problems)} problems")
                return
        print('Failed to update problems DB after several retries due to lock.')

    def rescan_and_update(self):
        """Scan the problems directory and write the result to the database."""
        print("Scanning for problems...")
        problems = self.scan()
        self.update_db(problems)

    def _rescan_safely(self):
        """Run rescan_and_update(), reporting errors instead of raising them."""
        try:
            self.rescan_and_update()
        except Exception as e:
            # One failed rescan must not end the long-running monitor loop.
            print(f"Problem rescan failed: {e}")

    def _watch_with_observer(self):
        """Rescan on watchdog events until stop() is called.

        Returns:
            False if the observer could not be started (the caller then falls
            back to polling), True once monitoring has ended.
        """
        changed = threading.Event()

        class Handler(FileSystemEventHandler):
            def on_any_event(self, event):
                # Only flag the change here; the scanner loop below does the
                # rescan, so a burst of events is coalesced without losing
                # the final state.
                if not changed.is_set():
                    print(f"File system event: {event.event_type} - {event.src_path}")
                changed.set()

        self.observer = Observer()
        try:
            self.observer.schedule(Handler(), str(self.problems_dir), recursive=True)
            self.observer.start()
        except OSError as e:
            print(f"Could not start file monitoring: {e}")
            self.observer = None
            return False
        try:
            # Wake once per second, so there is at most one rescan per second.
            while not self._stop_event.wait(1):
                if changed.is_set():
                    # Clear first: events arriving during the rescan set the
                    # flag again and trigger another pass.
                    changed.clear()
                    self._rescan_safely()
        except KeyboardInterrupt:
            print("Stopping problem scanner...")
        finally:
            self.observer.stop()
            self.observer.join()
        return True

    def _poll_until_stopped(self):
        """Rescan every ``scan_interval`` seconds until stop() is called."""
        try:
            while not self._stop_event.wait(self.scan_interval):
                self._rescan_safely()
        except KeyboardInterrupt:
            print("Stopping problem scanner...")

    def run(self):
        """Thread body: initialize the table, scan once, then monitor."""
        print("Starting problem scanner...")
        # Table creation must succeed before anything is written.
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                self.create_table(conn)
            finally:
                conn.close()
            print("Database initialized")
        except Exception as e:
            print(f"Failed to initialize database: {e}")
            return
        # Initial scan
        self._rescan_safely()
        if WATCHDOG_AVAILABLE:
            print("Using watchdog for file monitoring")
            if self._watch_with_observer():
                return
            print(f"Falling back to polling every {self.scan_interval}s")
        else:
            print(f"Watchdog not available, using polling every {self.scan_interval}s")
        self._poll_until_stopped()
def start_problem_scanner():
    """Create a ProblemScannerThread with default settings, start it, and return it."""
    worker = ProblemScannerThread()
    worker.start()
    return worker
# Flask model loading functions
def load_problems_from_json(json_path):
    """Load problems from a JSON file into the Flask database.

    Each entry is matched to an existing ``Problem`` by ``title``; a match has
    its description and test code overwritten, otherwise a new row is added.
    When an entry has a ``solution`` path that exists on disk, that file's
    contents become the test code; otherwise the test code is an empty string.

    Every failure is reported on stdout rather than raised.  If the database
    work fails, the session is rolled back.

    Args:
        json_path: Path to the JSON file to load.

    Returns:
        None.
    """
    if not os.path.exists(json_path):
        print(f"Problem JSON file not found: {json_path}")
        return
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            problems = json.load(f)
    except Exception as e:
        print(f"Error reading JSON file: {e}")
        return
    # The Flask/SQLAlchemy models are imported lazily so this module can be
    # loaded without them.
    try:
        from models import db, Problem
    except ImportError:
        print("Flask models not available - skipping JSON load")
        return
    except Exception as e:
        print(f"Error loading problems from JSON: {e}")
        return
    try:
        for p in problems:
            # Check if problem already exists by title
            existing = Problem.query.filter_by(title=p['title']).first()
            # Load test code from solution file if provided
            test_code = ''
            if 'solution' in p and os.path.exists(p['solution']):
                try:
                    with open(p['solution'], 'r', encoding='utf-8') as sf:
                        test_code = sf.read()
                except Exception as e:
                    print(f"Error reading solution file for {p['title']}: {e}")
            if existing:
                existing.description = p['description']
                existing.test_code = test_code
                print(f"Updated problem: {p['title']}")
            else:
                new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
                db.session.add(new_problem)
                print(f"Added new problem: {p['title']}")
        db.session.commit()
        print("Successfully updated problems from JSON")
    except Exception as e:
        print(f"Error loading problems from JSON: {e}")
        # Discard the half-applied changes so the session is usable again.
        # The rollback is guarded because it can fail for the same reason
        # the load did.
        try:
            db.session.rollback()
        except Exception as rollback_error:
            print(f"Error rolling back problem import: {rollback_error}")
def schedule_problem_reload(app, json_path, interval_hours=10):
    """Start a daemon thread that reloads problems from JSON in a loop.

    Each pass calls load_problems_from_json(json_path) inside
    ``app.app_context()`` and then sleeps for ``interval_hours`` hours.  If
    anything in a pass raises, the error is printed and the next pass starts
    after 60 seconds.
    """
    def reload_loop():
        while True:
            try:
                with app.app_context():
                    load_problems_from_json(json_path)
                time.sleep(interval_hours * 3600)
            except Exception as exc:
                print(f"Error in problem reload loop: {exc}")
                # Short back-off before the next attempt.
                time.sleep(60)

    threading.Thread(target=reload_loop, daemon=True).start()
def _run_tests_in_subprocess(user_code, test_code, timeout):
    """Run user code followed by test code as one script in a child interpreter.

    Returns a ``(passed, output, error)`` tuple; ``passed`` is true when the
    child exits with return code 0.
    """
    script_path = None
    try:
        with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
            # Record the path before writing so cleanup still runs if the
            # write fails.
            script_path = f.name
            f.write(f"{user_code}\n\n{test_code}")
        try:
            proc = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=timeout,
                encoding='utf-8',
                # Undecodable bytes printed by the child must not abort the run.
                errors='replace',
            )
        except subprocess.TimeoutExpired:
            return False, "Execution timed out", f"Code execution timed out after {timeout} seconds"
        output = proc.stdout
        if proc.stderr:
            output += f"\nSTDERR:\n{proc.stderr}"
        passed = proc.returncode == 0
        error = None if passed else f"Tests failed. Return code: {proc.returncode}\n{output}"
        return passed, output, error
    finally:
        # Clean up temporary file
        if script_path and os.path.exists(script_path):
            try:
                os.unlink(script_path)
            except Exception as e:
                print(f"Warning: Could not delete temp file {script_path}: {e}")


def _run_tests_in_process(user_code, test_code):
    """Exec user code and then assert-style test code in one shared namespace.

    Returns a ``(passed, output, error)`` tuple; ``output`` is whatever the
    code printed to stdout.
    """
    # SECURITY: this runs submitted code inside the current process with no
    # sandbox and no timeout.  Flagged for review if user_code is untrusted.
    namespace = {}
    captured = io.StringIO()
    passed = False
    error = None
    previous_stdout = sys.stdout
    sys.stdout = captured
    try:
        # One dict serves as both globals and locals, so functions defined by
        # the user code can call each other and recurse.
        exec(user_code, namespace)
        exec(test_code, namespace)
        # Reaching this point without an exception means the tests passed.
        passed = True
    except AssertionError as e:
        error = f"Assertion failed: {str(e)}"
    except SystemExit as e:
        # exit()/sys.exit() in submitted code must not propagate to the caller.
        error = f"Runtime error: code exited with SystemExit({e.code!r})"
    except Exception:
        error = f"Runtime error: {traceback.format_exc()}"
    finally:
        sys.stdout = previous_stdout
    return passed, captured.getvalue(), error


def run_code_against_tests(user_code, test_code, timeout=10):
    """
    Execute user code against test code with proper error handling.

    Test code containing the text ``unittest`` is combined with the user code
    into a temporary script and run in a subprocess; any other test code is
    executed in-process as plain assert statements.

    Args:
        user_code: The user's solution code
        test_code: The test code to validate the solution
        timeout: Maximum execution time in seconds (applies to the
            subprocess path only)
    Returns:
        dict: Result with passed, output, runtime, and error fields
    """
    if not user_code or not user_code.strip():
        return {
            'passed': False,
            'output': '',
            'runtime': 0,
            'error': 'No code provided'
        }
    if not test_code or not test_code.strip():
        return {
            'passed': False,
            'output': '',
            'runtime': 0,
            'error': 'No test code available'
        }
    start_time = time.perf_counter()
    output = ''
    error = None
    passed = False
    try:
        if 'unittest' in test_code:
            passed, output, error = _run_tests_in_subprocess(user_code, test_code, timeout)
        else:
            passed, output, error = _run_tests_in_process(user_code, test_code)
    except Exception:
        passed = False
        error = f"Execution error: {traceback.format_exc()}"
    runtime = time.perf_counter() - start_time
    result = {
        'passed': passed,
        'output': output.strip() if output else '',
        'runtime': runtime,
        'error': error if not passed else None
    }
    print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s")
    if error:
        print(f"Error: {error}")
    return result