Files
QPP/src/utils.py
2025-08-14 22:05:20 +02:00

262 lines
9.0 KiB
Python

import ast
import builtins
import contextlib
import io
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
# Security configuration

# Top-level module names that submitted code is permitted to import.
ALLOWED_IMPORTS = {
    'math', 'random', 'datetime', 'json',
    'collections', 'itertools', 'functools', 'operator',
    'copy', 'unittest', 're', 'string',
}

# Regexes for importing modules that reach the OS, processes or filesystem.
_IMPORT_PATTERNS = [
    r'import\s+os(?:\s|$|\.)',
    r'from\s+os\s+import',
    r'import\s+subprocess(?:\s|$|\.)',
    r'from\s+subprocess\s+import',
    r'import\s+sys(?:\s|$|\.)',
    r'from\s+sys\s+import',
    r'import\s+shutil(?:\s|$|\.)',
    r'from\s+shutil\s+import',
    r'import\s+pathlib(?:\s|$|\.)',
    r'from\s+pathlib\s+import',
]

# Regexes for calls that load code dynamically, open files or read input.
_CALL_PATTERNS = [
    r'__import__\s*\(',
    r'exec\s*\(',
    r'eval\s*\(',
    r'compile\s*\(',
    r'open\s*\(',
    r'file\s*\(',
    r'input\s*\(',
    r'raw_input\s*\(',
]

# Regexes for method calls whose names suggest deletion or process control.
_METHOD_PATTERNS = [
    r'\.unlink\s*\(',
    r'\.remove\s*\(',
    r'\.rmdir\s*\(',
    r'\.rmtree\s*\(',
    r'\.delete\s*\(',
    r'\.kill\s*\(',
    r'\.terminate\s*\(',
]

# Full blocklist, in the order the patterns are tried against submitted code.
DANGEROUS_PATTERNS = [*_IMPORT_PATTERNS, *_CALL_PATTERNS, *_METHOD_PATTERNS]
def validate_code_security(code, allowed_imports=None, dangerous_patterns=None):
    """
    Validate code against a regex blocklist and an import allowlist.

    Args:
        code: Python source text to inspect.
        allowed_imports: Optional collection of top-level module names that
            may be imported. Defaults to the module-level ALLOWED_IMPORTS.
        dangerous_patterns: Optional iterable of regex strings; each is
            searched case-insensitively anywhere in the source text
            (including comments and string literals). Defaults to the
            module-level DANGEROUS_PATTERNS.

    Returns:
        (is_safe, error_message): (True, None) when no check objects,
        otherwise (False, <description of the first problem found>).
    """
    if allowed_imports is None:
        allowed_imports = ALLOWED_IMPORTS
    if dangerous_patterns is None:
        dangerous_patterns = DANGEROUS_PATTERNS

    # Check for dangerous patterns
    for pattern in dangerous_patterns:
        if re.search(pattern, code, re.IGNORECASE):
            return False, f"Dangerous operation detected: {pattern}"

    # Parse AST to check imports
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return False, f"Syntax error in code: {str(e)}"
    except (ValueError, RecursionError) as e:
        # ast.parse rejects some inputs (e.g. embedded null bytes on older
        # interpreters, pathologically nested expressions) with these types
        # rather than SyntaxError; report them instead of crashing the caller.
        return False, f"Invalid code: {e}"

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            requested = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            if node.level:
                # Relative imports never refer to an allowlisted top-level
                # module (and `from . import x` has no module name at all),
                # so they must not slip past the allowlist.
                target = '.' * node.level + (node.module or '')
                return False, f"Import not allowed: {target}"
            requested = [node.module]
        else:
            continue

        for dotted_name in requested:
            module_name = dotted_name.split('.')[0]
            if module_name not in allowed_imports:
                return False, f"Import not allowed: {module_name}"

    return True, None
def create_restricted_globals(allowed_modules=None):
    """
    Create a restricted global namespace for code execution.

    Args:
        allowed_modules: Optional collection of module names to pre-import
            into the namespace and to permit in `import` statements.
            Defaults to the module-level ALLOWED_IMPORTS.

    Returns:
        dict: A namespace whose '__builtins__' entry is a dict holding only
        the whitelisted builtins plus a guarded `__import__`, and which maps
        each importable allowed module name to the imported module.

    NOTE(review): exception classes and `__build_class__` are not in the
    whitelist, so code run in this namespace cannot `raise ValueError(...)`
    or define classes -- confirm this is intended. Trimming `__builtins__`
    is also not, on its own, a hardened sandbox; confirm the threat model.
    """
    if allowed_modules is None:
        allowed_modules = ALLOWED_IMPORTS
    allowed_roots = frozenset(allowed_modules)

    safe_builtins = (
        'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'dir',
        'divmod', 'enumerate', 'filter', 'float', 'format', 'frozenset',
        'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', 'len',
        'list', 'map', 'max', 'min', 'next', 'oct', 'ord', 'pow',
        'print', 'range', 'repr', 'reversed', 'round', 'set', 'slice',
        'sorted', 'str', 'sum', 'tuple', 'type', 'zip',
    )

    def guarded_import(name, module_globals=None, module_locals=None,
                       fromlist=(), level=0):
        """Import `name` only if it is absolute and its root is allowed."""
        if level != 0 or name.split('.')[0] not in allowed_roots:
            raise ImportError(f"Import not allowed: {name}")
        return builtins.__import__(
            name, module_globals, module_locals, fromlist, level
        )

    # Look names up on the `builtins` module: the `__builtins__` global is a
    # dict in imported modules but the module object itself under __main__,
    # so subscripting it only works by accident of how this file is loaded.
    exposed_builtins = {
        name: getattr(builtins, name)
        for name in safe_builtins
        if hasattr(builtins, name)
    }
    # Without an `__import__` entry every `import` statement fails at run
    # time, including imports of modules the allowlist permits.
    exposed_builtins['__import__'] = guarded_import

    restricted_globals = {'__builtins__': exposed_builtins}

    # Add allowed modules
    for module in allowed_roots:
        try:
            restricted_globals[module] = __import__(module)
        except ImportError:
            # Module not available on this interpreter; deliberately skipped.
            continue
    return restricted_globals
def run_code_against_tests(user_code, test_code, max_execution_time=5):
    """
    Run user code against test code after validating both.

    Both sources are first checked with validate_code_security. If the test
    code mentions 'unittest', user and test code are concatenated into one
    script and run in a child interpreter; otherwise both are exec'd
    in-process inside the namespace from create_restricted_globals.

    Args:
        user_code: The user's solution code
        test_code: The test code to validate the solution
        max_execution_time: Maximum execution time in seconds (default: 5).
            NOTE(review): only the subprocess path enforces this; code on
            the in-process path runs with no time limit.

    Returns:
        dict: 'passed' (bool), 'output' (captured text, stripped and capped
        at 10000 characters plus a truncation marker), 'runtime' (seconds,
        0 when validation rejects the code) and 'error' (None when passed).
    """
    # Validate security for both user code and test code
    for origin, code in (('user', user_code), ('test', test_code)):
        is_safe, reason = validate_code_security(code)
        if not is_safe:
            return {
                'passed': False,
                'output': '',
                'runtime': 0,
                'error': f"Security violation in {origin} code: {reason}"
            }

    output = ''
    error = None
    passed = False
    start = time.perf_counter()
    try:
        # Check if unittest is used in test_code
        if 'unittest' in test_code:
            passed, output, error = _run_as_subprocess(
                user_code, test_code, max_execution_time
            )
        else:
            passed, output, error = _run_in_process(user_code, test_code)
    except Exception:
        passed = False
        error = f"Execution error: {traceback.format_exc()}"
    runtime = time.perf_counter() - start

    # Limit output size to prevent memory issues
    max_output_size = 10000  # 10KB limit
    if len(output) > max_output_size:
        output = output[:max_output_size] + "\n... (output truncated)"

    return {
        'passed': passed,
        'output': output.strip() if output else '',
        'runtime': runtime,
        'error': error if not passed else None
    }


def _run_as_subprocess(user_code, test_code, max_execution_time):
    """Run user + test code as one script in a child interpreter.

    Returns a (passed, output, error) tuple.
    """
    temp_dir = tempfile.mkdtemp(prefix='secure_code_')
    try:
        temp_file = os.path.join(temp_dir, 'test_code.py')
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write(f"{user_code}\n\n{test_code}")
        os.chmod(temp_file, 0o600)  # Read/write for owner only

        # Start from an empty environment, but keep SYSTEMROOT when present:
        # the subprocess docs require it for child processes on Windows.
        child_env = {'PYTHONPATH': ''}
        if 'SYSTEMROOT' in os.environ:
            child_env['SYSTEMROOT'] = os.environ['SYSTEMROOT']

        try:
            proc = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=max_execution_time,
                encoding='utf-8',
                cwd=temp_dir,  # Run in the temporary directory
                env=child_env
            )
        except subprocess.TimeoutExpired:
            return (
                False,
                "Execution timed out",
                f"Code execution timed out after {max_execution_time} seconds",
            )

        # Combine stdout and stderr, newline-separated, skipping empty ones.
        output = "\n".join(
            part for part in (proc.stdout, proc.stderr) if part
        )
        passed = proc.returncode == 0
        error = None
        if not passed:
            error = f"Tests failed. Return code: {proc.returncode}\n{output}"
        return passed, output, error
    finally:
        # Remove the whole tree: the child may have left extra files behind,
        # which would make a plain os.rmdir fail and leak the directory.
        try:
            shutil.rmtree(temp_dir)
        except OSError as cleanup_error:
            print(f"Warning: Could not clean up temp files: {cleanup_error}")


def _run_in_process(user_code, test_code):
    """Exec user code, then test code, in one restricted namespace.

    Returns a (passed, output, error) tuple; output is whatever the code
    printed to stdout.
    """
    captured_output = io.StringIO()
    passed = False
    error = None
    with contextlib.redirect_stdout(captured_output):
        try:
            # A single dict serves as both globals and locals. With separate
            # dicts, top-level definitions land in the locals while functions
            # resolve names through the globals, so user functions could not
            # call each other or recurse.
            namespace = create_restricted_globals()
            exec(user_code, namespace)
            # Execute test code (should raise AssertionError if fail)
            exec(test_code, namespace)
            passed = True
        except AssertionError as e:
            error = f"Assertion failed: {str(e)}"
        except Exception:
            error = f"Runtime error: {traceback.format_exc()}"
    return passed, captured_output.getvalue(), error
# Entry-point wrapper: size guards first, then the real runner.
def safe_code_runner(user_code, test_code):
    """
    Reject oversized submissions, otherwise delegate to run_code_against_tests.

    User code longer than 50000 characters or test code longer than 10000
    characters yields a failed result dict without running anything; the
    user-code limit is checked first.
    """
    size_limits = (
        (user_code, 50000, "User code too large (maximum 50KB allowed)"),
        (test_code, 10000, "Test code too large (maximum 10KB allowed)"),
    )
    for source, max_length, message in size_limits:
        if len(source) > max_length:
            return {
                'passed': False,
                'output': '',
                'runtime': 0,
                'error': message
            }
    return run_code_against_tests(user_code, test_code)