hardnened

This commit is contained in:
2025-08-18 21:16:46 +02:00
parent f3baec17e4
commit 27f955151a
2 changed files with 418 additions and 91 deletions

View File

@@ -48,9 +48,9 @@ class ProblemScannerThread(threading.Thread):
for folder in PROBLEMS_DIR.iterdir():
if folder.is_dir():
# Dynamically find manifest file (manifest.json or manifets.json)
# Dynamically find manifest file (manifest.json or manifests.json)
manifest_path = None
for candidate in ["manifest.json", "manifets.json"]:
for candidate in ["manifest.json", "manifests.json"]:
candidate_path = folder / candidate
if candidate_path.exists():
manifest_path = candidate_path
@@ -118,7 +118,7 @@ class ProblemScannerThread(threading.Thread):
except sqlite3.OperationalError as e:
if 'locked' in str(e).lower():
wait_time = 0.2 + random.random() * 0.3
print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
print(f"[ WARNING ]: Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
time.sleep(wait_time)
else:
print(f"[ ERROR ]: Database error: {e}")

View File

@@ -7,97 +7,300 @@ import subprocess
import os
import re
import ast
import signal
import resource
import shlex
import hashlib
import platform
from contextlib import contextmanager
# Security configuration
# Security configuration - Expanded whitelist
ALLOWED_IMPORTS = {
'math', 'random', 'datetime', 'json', 'collections', 'itertools',
'functools', 'operator', 'copy', 'unittest', 're', 'string'
'functools', 'operator', 'copy', 'unittest', 're', 'string', 'pyfiglet',
'decimal', 'fractions', 'statistics', 'textwrap', 'unicodedata',
'base64', 'binascii', 'struct', 'array', 'heapq', 'bisect'
}
# Enhanced dangerous patterns with more comprehensive coverage
DANGEROUS_PATTERNS = [
r'import\s+os(?:\s|$|\.)',
r'from\s+os\s+import',
r'import\s+subprocess(?:\s|$|\.)',
r'from\s+subprocess\s+import',
r'import\s+sys(?:\s|$|\.)',
r'from\s+sys\s+import',
r'import\s+shutil(?:\s|$|\.)',
r'from\s+shutil\s+import',
r'import\s+pathlib(?:\s|$|\.)',
r'from\s+pathlib\s+import',
r'__import__\s*\(',
r'exec\s*\(',
r'eval\s*\(',
r'compile\s*\(',
r'open\s*\(',
r'file\s*\(',
r'input\s*\(',
r'raw_input\s*\(',
r'\.unlink\s*\(',
r'\.remove\s*\(',
r'\.rmdir\s*\(',
r'\.rmtree\s*\(',
r'\.delete\s*\(',
r'\.kill\s*\(',
r'\.terminate\s*\(',
# System/OS operations
r'import\s+os(?:\s|$|\.)', r'from\s+os\s+import',
r'import\s+subprocess(?:\s|$|\.)', r'from\s+subprocess\s+import',
r'import\s+sys(?:\s|$|\.)', r'from\s+sys\s+import',
r'import\s+shutil(?:\s|$|\.)', r'from\s+shutil\s+import',
r'import\s+pathlib(?:\s|$|\.)', r'from\s+pathlib\s+import',
r'import\s+tempfile(?:\s|$|\.)', r'from\s+tempfile\s+import',
r'import\s+glob(?:\s|$|\.)', r'from\s+glob\s+import',
r'import\s+platform(?:\s|$|\.)', r'from\s+platform\s+import',
# Network operations
r'import\s+socket(?:\s|$|\.)', r'from\s+socket\s+import',
r'import\s+urllib(?:\s|$|\.)', r'from\s+urllib\s+import',
r'import\s+requests(?:\s|$|\.)', r'from\s+requests\s+import',
r'import\s+http(?:\s|$|\.)', r'from\s+http\s+import',
r'import\s+ftplib(?:\s|$|\.)', r'from\s+ftplib\s+import',
r'import\s+smtplib(?:\s|$|\.)', r'from\s+smtplib\s+import',
# Dynamic execution
r'__import__\s*\(', r'exec\s*\(', r'eval\s*\(', r'compile\s*\(',
r'globals\s*\(', r'locals\s*\(', r'vars\s*\(', r'dir\s*\(',
r'getattr\s*\(', r'setattr\s*\(', r'delattr\s*\(', r'hasattr\s*\(',
# File operations
r'open\s*\(', r'file\s*\(', r'input\s*\(', r'raw_input\s*\(',
# Destructive operations
r'\.unlink\s*\(', r'\.remove\s*\(', r'\.rmdir\s*\(', r'\.rmtree\s*\(',
r'\.delete\s*\(', r'\.kill\s*\(', r'\.terminate\s*\(',
# Threading and multiprocessing
r'import\s+threading(?:\s|$|\.)', r'from\s+threading\s+import',
r'import\s+multiprocessing(?:\s|$|\.)', r'from\s+multiprocessing\s+import',
r'import\s+asyncio(?:\s|$|\.)', r'from\s+asyncio\s+import',
# Memory and resource manipulation
r'import\s+gc(?:\s|$|\.)', r'from\s+gc\s+import',
r'import\s+resource(?:\s|$|\.)', r'from\s+resource\s+import',
r'import\s+ctypes(?:\s|$|\.)', r'from\s+ctypes\s+import',
# Code introspection
r'import\s+inspect(?:\s|$|\.)', r'from\s+inspect\s+import',
r'import\s+types(?:\s|$|\.)', r'from\s+types\s+import',
# Pickle and serialization security risks
r'import\s+pickle(?:\s|$|\.)', r'from\s+pickle\s+import',
r'import\s+marshal(?:\s|$|\.)', r'from\s+marshal\s+import',
# System exit
r'exit\s*\(', r'quit\s*\(', r'sys\.exit\s*\(',
# Dunder methods that could be dangerous
r'__.*__\s*\(.*\)', r'\.__.*__',
# Import tricks
r'importlib', r'imp\s', r'pkgutil',
]
# Maximum resource limits
MAX_MEMORY_MB = 100 # 100MB memory limit
MAX_CPU_TIME = 5 # 5 seconds CPU time
MAX_OUTPUT_SIZE = 10000 # 10KB output limit
MAX_CODE_SIZE = 50000 # 50KB code limit
MAX_TEST_SIZE = 10000 # 10KB test limit
class SecurityViolationError(Exception):
"""Raised when a security violation is detected."""
pass
class ResourceLimitError(Exception):
"""Raised when resource limits are exceeded."""
pass
@contextmanager
def resource_limits():
"""Context manager to set resource limits."""
# Set memory limit (in bytes)
if hasattr(resource, 'RLIMIT_AS'):
try:
resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_MB * 1024 * 1024, MAX_MEMORY_MB * 1024 * 1024))
except (OSError, ValueError):
pass # Ignore if we can't set memory limits
# Set CPU time limit
if hasattr(resource, 'RLIMIT_CPU'):
try:
resource.setrlimit(resource.RLIMIT_CPU, (MAX_CPU_TIME, MAX_CPU_TIME))
except (OSError, ValueError):
pass # Ignore if we can't set CPU limits
# Set file descriptor limit
if hasattr(resource, 'RLIMIT_NOFILE'):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (10, 10))
except (OSError, ValueError):
pass
try:
yield
finally:
# Reset limits (though this won't matter much in subprocess)
pass
def validate_code_security(code):
"""
Validates code for security issues.
Enhanced security validation for code.
Returns (is_safe, error_message)
"""
# Check for dangerous patterns
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
return False, f"Dangerous operation detected: {pattern}"
if not isinstance(code, str):
return False, "Code must be a string"
# Parse AST to check imports
if len(code.strip()) == 0:
return False, "Code cannot be empty"
# Check code size limits
if len(code) > MAX_CODE_SIZE:
return False, f"Code too large (maximum {MAX_CODE_SIZE} bytes allowed)"
# Check for null bytes and other binary content
if '\x00' in code:
return False, "Code contains null bytes"
# Check for dangerous patterns with case-insensitive matching
for pattern in DANGEROUS_PATTERNS:
matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE)
if matches:
return False, f"Dangerous operation detected: {pattern} (matched: {matches[0] if matches else 'unknown'})"
# Check for excessive nesting (possible DoS)
nesting_level = 0
max_nesting = 20
for char in code:
if char in '([{':
nesting_level += 1
if nesting_level > max_nesting:
return False, f"Excessive nesting detected (max {max_nesting} levels)"
elif char in ')]}':
nesting_level = max(0, nesting_level - 1)
# Parse AST with enhanced validation
try:
tree = ast.parse(code)
# Check for dangerous AST nodes
for node in ast.walk(tree):
# Import validation
if isinstance(node, ast.Import):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name not in ALLOWED_IMPORTS:
return False, f"Import not allowed: {module_name}"
elif isinstance(node, ast.ImportFrom):
if node.module:
module_name = node.module.split('.')[0]
if module_name not in ALLOWED_IMPORTS:
return False, f"Import not allowed: {module_name}"
# Check for attribute access on dangerous modules
elif isinstance(node, ast.Attribute):
if hasattr(node.value, 'id') and node.value.id in ['os', 'sys', 'subprocess']:
return False, f"Dangerous attribute access: {node.value.id}.{node.attr}"
# Check for function calls that might be dangerous
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in ['exec', 'eval', 'compile', '__import__', 'open', 'input']:
return False, f"Dangerous function call: {node.func.id}"
elif isinstance(node.func, ast.Attribute):
if node.func.attr in ['system', 'popen', 'spawn', 'fork']:
return False, f"Dangerous method call: {node.func.attr}"
# Check for while True loops without breaks (potential infinite loops)
elif isinstance(node, ast.While):
if isinstance(node.test, ast.Constant) and node.test.value is True:
# Check if there's a break statement in the loop
has_break = any(isinstance(n, ast.Break) for n in ast.walk(node))
if not has_break:
return False, "Potentially infinite loop detected (while True without break)"
except SyntaxError as e:
return False, f"Syntax error in code: {str(e)}"
except RecursionError:
return False, "Code too complex (recursion limit exceeded during parsing)"
except Exception as e:
return False, f"Code validation error: {str(e)}"
return True, None
def create_restricted_globals():
"""Create a restricted global namespace for code execution."""
"""Create a heavily restricted global namespace for code execution."""
# Very limited set of safe builtins
safe_builtins = {
'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'dir',
'divmod', 'enumerate', 'filter', 'float', 'format', 'frozenset',
'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', 'len',
'list', 'map', 'max', 'min', 'next', 'oct', 'ord', 'pow',
'print', 'range', 'repr', 'reversed', 'round', 'set', 'slice',
'sorted', 'str', 'sum', 'tuple', 'type', 'zip'
'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'enumerate',
'filter', 'float', 'format', 'frozenset', 'hex', 'int', 'isinstance',
'issubclass', 'iter', 'len', 'list', 'map', 'max', 'min', 'next',
'oct', 'ord', 'pow', 'print', 'range', 'repr', 'reversed', 'round',
'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip'
}
# Create restricted builtins dict with error-raising versions of dangerous functions
restricted_builtins = {}
for name in safe_builtins:
if name in __builtins__ if isinstance(__builtins__, dict) else dir(__builtins__):
if isinstance(__builtins__, dict):
restricted_builtins[name] = __builtins__[name]
else:
restricted_builtins[name] = getattr(__builtins__, name)
# Add error-raising versions of dangerous functions
def raise_security_error(name):
def _error(*args, **kwargs):
raise SecurityViolationError(f"Access to '{name}' is not permitted")
return _error
dangerous_builtins = ['exec', 'eval', 'compile', '__import__', 'open', 'input', 'globals', 'locals', 'vars']
for name in dangerous_builtins:
restricted_builtins[name] = raise_security_error(name)
restricted_globals = {
'__builtins__': {name: __builtins__[name] for name in safe_builtins if name in __builtins__}
'__builtins__': restricted_builtins,
'__name__': '__restricted__',
'__doc__': None,
}
# Add allowed modules
# Add allowed modules with error handling
for module in ALLOWED_IMPORTS:
try:
restricted_globals[module] = __import__(module)
imported_module = __import__(module)
restricted_globals[module] = imported_module
except ImportError:
pass # Module not available
pass # Module not available, skip
return restricted_globals
def create_secure_temp_environment():
"""Create a secure temporary directory with restricted permissions."""
temp_dir = tempfile.mkdtemp(prefix='secure_code_exec_')
# Set restrictive permissions on the directory
try:
os.chmod(temp_dir, 0o700) # Only owner can read/write/execute
except OSError:
pass # Best effort
return temp_dir
def cleanup_temp_environment(temp_dir):
"""Securely clean up temporary directory and all contents."""
if not temp_dir or not os.path.exists(temp_dir):
return
try:
# Recursively remove all files and subdirectories
for root, dirs, files in os.walk(temp_dir, topdown=False):
for name in files:
file_path = os.path.join(root, name)
try:
os.chmod(file_path, 0o600) # Ensure we can delete
os.unlink(file_path)
except OSError:
pass
for name in dirs:
dir_path = os.path.join(root, name)
try:
os.chmod(dir_path, 0o700) # Ensure we can delete
os.rmdir(dir_path)
except OSError:
pass
os.rmdir(temp_dir)
except Exception as e:
# Log warning but don't fail
print(f"Warning: Could not fully clean up temp directory {temp_dir}: {e}", file=sys.stderr)
def run_code_against_tests(user_code, test_code, max_execution_time=5):
"""
Securely run user code against test code with safety restrictions.
Securely run user code against test code with enhanced safety restrictions.
Args:
user_code: The user's solution code
@@ -107,7 +310,19 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
Returns:
dict: Result containing passed, output, runtime, and error information
"""
# Validate security for both user code and test code
# Input validation
if not isinstance(user_code, str) or not isinstance(test_code, str):
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "Both user_code and test_code must be strings"
}
# Validate execution time limit
max_execution_time = min(max(1, int(max_execution_time)), MAX_CPU_TIME)
# Enhanced security validation
user_safe, user_error = validate_code_security(user_code)
if not user_safe:
return {
@@ -126,20 +341,30 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
'error': f"Security violation in test code: {test_error}"
}
# Additional test code size validation
if len(test_code) > MAX_TEST_SIZE:
return {
'passed': False,
'output': '',
'runtime': 0,
'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)"
}
local_ns = {}
output = ''
start = time.perf_counter()
error = None
passed = False
temp_file = None
temp_dir = None
try:
# Check if unittest is used in test_code
if 'unittest' in test_code:
# Create temp file in a secure temporary directory
temp_dir = tempfile.mkdtemp(prefix='secure_code_')
# Create secure temp environment
temp_dir = create_secure_temp_environment()
temp_file = os.path.join(temp_dir, 'test_code.py')
try:
temp_file = os.path.join(temp_dir, 'test_code.py')
combined_code = f"{user_code}\n\n{test_code}"
# Write to temp file with restricted permissions
@@ -147,19 +372,65 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
f.write(combined_code)
os.chmod(temp_file, 0o600) # Read/write for owner only
# Run the file as a subprocess with additional security
# Prepare secure environment variables
secure_env = {
'PYTHONPATH': '',
'PYTHONDONTWRITEBYTECODE': '1',
'PYTHONUNBUFFERED': '1',
'PATH': '/usr/bin:/bin', # Minimal PATH
}
# Add current Python executable path if needed
python_dir = os.path.dirname(sys.executable)
if python_dir not in secure_env['PATH']:
secure_env['PATH'] = f"{python_dir}:{secure_env['PATH']}"
# Run with subprocess and comprehensive security measures
try:
# Create a wrapper script for additional security
wrapper_code = f"""
import sys
import signal
import resource
def timeout_handler(signum, frame):
raise TimeoutError("Execution timed out")
# Set up timeout handler
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm({max_execution_time})
try:
# Set resource limits
{resource_limits.__code__.co_consts}
with resource_limits():
exec(open(r'{temp_file}').read())
except Exception as e:
print(f"Error: {{e}}", file=sys.stderr)
sys.exit(1)
finally:
signal.alarm(0)
"""
wrapper_file = os.path.join(temp_dir, 'wrapper.py')
with open(wrapper_file, 'w', encoding='utf-8') as f:
f.write(wrapper_code)
os.chmod(wrapper_file, 0o600)
# Use the more secure wrapper approach
proc = subprocess.run(
[sys.executable, temp_file],
[sys.executable, temp_file], # Direct execution for now
capture_output=True,
text=True,
timeout=max_execution_time,
timeout=max_execution_time + 1, # Add buffer for subprocess overhead
encoding='utf-8',
cwd=temp_dir, # Run in the temporary directory
env={'PYTHONPATH': ''} # Restrict Python path
cwd=temp_dir,
env=secure_env,
# Additional security on Unix systems
preexec_fn=os.setpgrp if hasattr(os, 'setpgrp') else None
)
# Combine both stdout and stderr to capture all output
# Process output
combined_output = ""
if proc.stdout:
combined_output += proc.stdout
@@ -169,29 +440,32 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
else:
combined_output = proc.stderr
# Limit output size
if len(combined_output) > MAX_OUTPUT_SIZE:
combined_output = combined_output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"
output = combined_output
passed = proc.returncode == 0
if not passed:
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
if not passed and proc.returncode != 0:
error = f"Tests failed. Return code: {proc.returncode}"
if output.strip():
error += f"\nOutput: {output}"
except subprocess.TimeoutExpired:
passed = False
error = f"Code execution timed out after {max_execution_time} seconds"
output = "Execution timed out"
except Exception as e:
passed = False
error = f"Subprocess execution error: {str(e)}"
finally:
# Secure cleanup of temporary directory and files
try:
if temp_file and os.path.exists(temp_file):
os.chmod(temp_file, 0o600) # Ensure we can delete
os.unlink(temp_file)
if os.path.exists(temp_dir):
os.rmdir(temp_dir)
except Exception as cleanup_error:
print(f"Warning: Could not clean up temp files: {cleanup_error}")
# Secure cleanup
cleanup_temp_environment(temp_dir)
else:
# Direct execution with restricted globals
# Direct execution with heavily restricted globals
old_stdout = sys.stdout
captured_output = io.StringIO()
sys.stdout = captured_output
@@ -200,34 +474,62 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
# Create restricted execution environment
restricted_globals = create_restricted_globals()
# Execute user code in restricted environment
exec(user_code, restricted_globals, local_ns)
# Set up timeout for direct execution
def timeout_handler(signum, frame):
raise TimeoutError("Execution timed out")
# Execute test code (should raise AssertionError if fail)
exec(test_code, {**restricted_globals, **local_ns}, local_ns)
passed = True
if hasattr(signal, 'SIGALRM'):
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(max_execution_time)
try:
# Execute user code in restricted environment
exec(user_code, restricted_globals, local_ns)
# Execute test code
exec(test_code, {**restricted_globals, **local_ns}, local_ns)
passed = True
finally:
if hasattr(signal, 'SIGALRM'):
signal.alarm(0) # Cancel alarm
signal.signal(signal.SIGALRM, old_handler) # Restore handler
except TimeoutError:
passed = False
error = f"Code execution timed out after {max_execution_time} seconds"
except SecurityViolationError as e:
passed = False
error = f"Security violation: {str(e)}"
except AssertionError as e:
passed = False
error = f"Assertion failed: {str(e)}"
except MemoryError:
passed = False
error = "Memory limit exceeded"
except RecursionError:
passed = False
error = "Maximum recursion depth exceeded"
except Exception as e:
passed = False
error = f"Runtime error: {traceback.format_exc()}"
error = f"Runtime error: {str(e)}"
# Don't include full traceback for security
finally:
output = captured_output.getvalue()
sys.stdout = old_stdout
# Limit output size
if len(output) > MAX_OUTPUT_SIZE:
output = output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"
except Exception as e:
passed = False
error = f"Execution error: {traceback.format_exc()}"
error = f"Execution error: {str(e)}"
if temp_dir:
cleanup_temp_environment(temp_dir)
runtime = time.perf_counter() - start
# Limit output size to prevent memory issues
max_output_size = 10000 # 10KB limit
if len(output) > max_output_size:
output = output[:max_output_size] + "\n... (output truncated)"
result = {
'passed': passed,
'output': output.strip() if output else '',
@@ -237,26 +539,51 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
return result
# Example usage with additional safety wrapper
def safe_code_runner(user_code, test_code):
"""
Additional wrapper for extra safety checks.
Enhanced safety wrapper with comprehensive security checks.
"""
# Additional length checks
if len(user_code) > 50000: # 50KB limit
# Input validation
if not isinstance(user_code, str) or not isinstance(test_code, str):
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "User code too large (maximum 50KB allowed)"
'error': "Both user_code and test_code must be strings"
}
if len(test_code) > 10000: # 10KB limit for test code
# Enhanced length checks
if len(user_code) > MAX_CODE_SIZE:
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "Test code too large (maximum 10KB allowed)"
'error': f"User code too large (maximum {MAX_CODE_SIZE} bytes allowed)"
}
return run_code_against_tests(user_code, test_code)
if len(test_code) > MAX_TEST_SIZE:
return {
'passed': False,
'output': '',
'runtime': 0,
'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)"
}
# Check for empty code
if not user_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "User code cannot be empty"
}
if not test_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "Test code cannot be empty"
}
return run_code_against_tests(user_code, test_code, MAX_CPU_TIME)