From 27f955151a738ea180a2ff8a3f2350f13172cc03 Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Mon, 18 Aug 2025 21:16:46 +0200 Subject: [PATCH] hardened --- src/problem_scanner.py | 6 +- src/utils.py | 503 ++++++++++++++++++++++++++++++++++------- 2 files changed, 418 insertions(+), 91 deletions(-) diff --git a/src/problem_scanner.py b/src/problem_scanner.py index 6fb7b66..2d6a9b6 100644 --- a/src/problem_scanner.py +++ b/src/problem_scanner.py @@ -48,9 +48,9 @@ class ProblemScannerThread(threading.Thread): for folder in PROBLEMS_DIR.iterdir(): if folder.is_dir(): - # Dynamically find manifest file (manifest.json or manifets.json) + # Dynamically find manifest file (manifest.json or manifests.json) manifest_path = None - for candidate in ["manifest.json", "manifets.json"]: + for candidate in ["manifest.json", "manifests.json"]: candidate_path = folder / candidate if candidate_path.exists(): manifest_path = candidate_path @@ -118,7 +118,7 @@ class ProblemScannerThread(threading.Thread): except sqlite3.OperationalError as e: if 'locked' in str(e).lower(): wait_time = 0.2 + random.random() * 0.3 - print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})") + print(f"[ WARNING ]: Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})") time.sleep(wait_time) else: print(f"[ ERROR ]: Database error: {e}") diff --git a/src/utils.py b/src/utils.py index 581e43b..e6fbf81 100644 --- a/src/utils.py +++ b/src/utils.py @@ -7,97 +7,300 @@ import subprocess import os import re import ast +import signal +import resource +import shlex +import hashlib +import platform +from contextlib import contextmanager -# Security configuration +# Security configuration - Expanded whitelist ALLOWED_IMPORTS = { 'math', 'random', 'datetime', 'json', 'collections', 'itertools', - 'functools', 'operator', 'copy', 'unittest', 're', 'string' + 'functools', 'operator', 'copy', 'unittest', 're', 'string', 'pyfiglet', + 'decimal', 'fractions', 'statistics', 'textwrap', 
'unicodedata', + 'base64', 'binascii', 'struct', 'array', 'heapq', 'bisect' } +# Enhanced dangerous patterns with more comprehensive coverage DANGEROUS_PATTERNS = [ - r'import\s+os(?:\s|$|\.)', - r'from\s+os\s+import', - r'import\s+subprocess(?:\s|$|\.)', - r'from\s+subprocess\s+import', - r'import\s+sys(?:\s|$|\.)', - r'from\s+sys\s+import', - r'import\s+shutil(?:\s|$|\.)', - r'from\s+shutil\s+import', - r'import\s+pathlib(?:\s|$|\.)', - r'from\s+pathlib\s+import', - r'__import__\s*\(', - r'exec\s*\(', - r'eval\s*\(', - r'compile\s*\(', - r'open\s*\(', - r'file\s*\(', - r'input\s*\(', - r'raw_input\s*\(', - r'\.unlink\s*\(', - r'\.remove\s*\(', - r'\.rmdir\s*\(', - r'\.rmtree\s*\(', - r'\.delete\s*\(', - r'\.kill\s*\(', - r'\.terminate\s*\(', + # System/OS operations + r'import\s+os(?:\s|$|\.)', r'from\s+os\s+import', + r'import\s+subprocess(?:\s|$|\.)', r'from\s+subprocess\s+import', + r'import\s+sys(?:\s|$|\.)', r'from\s+sys\s+import', + r'import\s+shutil(?:\s|$|\.)', r'from\s+shutil\s+import', + r'import\s+pathlib(?:\s|$|\.)', r'from\s+pathlib\s+import', + r'import\s+tempfile(?:\s|$|\.)', r'from\s+tempfile\s+import', + r'import\s+glob(?:\s|$|\.)', r'from\s+glob\s+import', + r'import\s+platform(?:\s|$|\.)', r'from\s+platform\s+import', + + # Network operations + r'import\s+socket(?:\s|$|\.)', r'from\s+socket\s+import', + r'import\s+urllib(?:\s|$|\.)', r'from\s+urllib\s+import', + r'import\s+requests(?:\s|$|\.)', r'from\s+requests\s+import', + r'import\s+http(?:\s|$|\.)', r'from\s+http\s+import', + r'import\s+ftplib(?:\s|$|\.)', r'from\s+ftplib\s+import', + r'import\s+smtplib(?:\s|$|\.)', r'from\s+smtplib\s+import', + + # Dynamic execution + r'__import__\s*\(', r'exec\s*\(', r'eval\s*\(', r'compile\s*\(', + r'globals\s*\(', r'locals\s*\(', r'vars\s*\(', r'dir\s*\(', + r'getattr\s*\(', r'setattr\s*\(', r'delattr\s*\(', r'hasattr\s*\(', + + # File operations + r'open\s*\(', r'file\s*\(', r'input\s*\(', r'raw_input\s*\(', + + # Destructive operations + 
r'\.unlink\s*\(', r'\.remove\s*\(', r'\.rmdir\s*\(', r'\.rmtree\s*\(', + r'\.delete\s*\(', r'\.kill\s*\(', r'\.terminate\s*\(', + + # Threading and multiprocessing + r'import\s+threading(?:\s|$|\.)', r'from\s+threading\s+import', + r'import\s+multiprocessing(?:\s|$|\.)', r'from\s+multiprocessing\s+import', + r'import\s+asyncio(?:\s|$|\.)', r'from\s+asyncio\s+import', + + # Memory and resource manipulation + r'import\s+gc(?:\s|$|\.)', r'from\s+gc\s+import', + r'import\s+resource(?:\s|$|\.)', r'from\s+resource\s+import', + r'import\s+ctypes(?:\s|$|\.)', r'from\s+ctypes\s+import', + + # Code introspection + r'import\s+inspect(?:\s|$|\.)', r'from\s+inspect\s+import', + r'import\s+types(?:\s|$|\.)', r'from\s+types\s+import', + + # Pickle and serialization security risks + r'import\s+pickle(?:\s|$|\.)', r'from\s+pickle\s+import', + r'import\s+marshal(?:\s|$|\.)', r'from\s+marshal\s+import', + + # System exit + r'exit\s*\(', r'quit\s*\(', r'sys\.exit\s*\(', + + # Dunder methods that could be dangerous + r'__.*__\s*\(.*\)', r'\.__.*__', + + # Import tricks + r'importlib', r'imp\s', r'pkgutil', ] +# Maximum resource limits +MAX_MEMORY_MB = 100 # 100MB memory limit +MAX_CPU_TIME = 5 # 5 seconds CPU time +MAX_OUTPUT_SIZE = 10000 # 10KB output limit +MAX_CODE_SIZE = 50000 # 50KB code limit +MAX_TEST_SIZE = 10000 # 10KB test limit + +class SecurityViolationError(Exception): + """Raised when a security violation is detected.""" + pass + +class ResourceLimitError(Exception): + """Raised when resource limits are exceeded.""" + pass + +@contextmanager +def resource_limits(): + """Context manager to set resource limits.""" + # Set memory limit (in bytes) + if hasattr(resource, 'RLIMIT_AS'): + try: + resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_MB * 1024 * 1024, MAX_MEMORY_MB * 1024 * 1024)) + except (OSError, ValueError): + pass # Ignore if we can't set memory limits + + # Set CPU time limit + if hasattr(resource, 'RLIMIT_CPU'): + try: + resource.setrlimit(resource.RLIMIT_CPU, 
(MAX_CPU_TIME, MAX_CPU_TIME)) + except (OSError, ValueError): + pass # Ignore if we can't set CPU limits + + # Set file descriptor limit + if hasattr(resource, 'RLIMIT_NOFILE'): + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (10, 10)) + except (OSError, ValueError): + pass + + try: + yield + finally: + # Reset limits (though this won't matter much in subprocess) + pass + def validate_code_security(code): """ - Validates code for security issues. + Enhanced security validation for code. Returns (is_safe, error_message) """ - # Check for dangerous patterns - for pattern in DANGEROUS_PATTERNS: - if re.search(pattern, code, re.IGNORECASE): - return False, f"Dangerous operation detected: {pattern}" + if not isinstance(code, str): + return False, "Code must be a string" - # Parse AST to check imports + if len(code.strip()) == 0: + return False, "Code cannot be empty" + + # Check code size limits + if len(code) > MAX_CODE_SIZE: + return False, f"Code too large (maximum {MAX_CODE_SIZE} bytes allowed)" + + # Check for null bytes and other binary content + if '\x00' in code: + return False, "Code contains null bytes" + + # Check for dangerous patterns with case-insensitive matching + for pattern in DANGEROUS_PATTERNS: + matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE) + if matches: + return False, f"Dangerous operation detected: {pattern} (matched: {matches[0] if matches else 'unknown'})" + + # Check for excessive nesting (possible DoS) + nesting_level = 0 + max_nesting = 20 + for char in code: + if char in '([{': + nesting_level += 1 + if nesting_level > max_nesting: + return False, f"Excessive nesting detected (max {max_nesting} levels)" + elif char in ')]}': + nesting_level = max(0, nesting_level - 1) + + # Parse AST with enhanced validation try: tree = ast.parse(code) + + # Check for dangerous AST nodes for node in ast.walk(tree): + # Import validation if isinstance(node, ast.Import): for alias in node.names: module_name = alias.name.split('.')[0] 
if module_name not in ALLOWED_IMPORTS: return False, f"Import not allowed: {module_name}" + elif isinstance(node, ast.ImportFrom): if node.module: module_name = node.module.split('.')[0] if module_name not in ALLOWED_IMPORTS: return False, f"Import not allowed: {module_name}" + + # Check for attribute access on dangerous modules + elif isinstance(node, ast.Attribute): + if hasattr(node.value, 'id') and node.value.id in ['os', 'sys', 'subprocess']: + return False, f"Dangerous attribute access: {node.value.id}.{node.attr}" + + # Check for function calls that might be dangerous + elif isinstance(node, ast.Call): + if isinstance(node.func, ast.Name): + if node.func.id in ['exec', 'eval', 'compile', '__import__', 'open', 'input']: + return False, f"Dangerous function call: {node.func.id}" + elif isinstance(node.func, ast.Attribute): + if node.func.attr in ['system', 'popen', 'spawn', 'fork']: + return False, f"Dangerous method call: {node.func.attr}" + + # Check for while True loops without breaks (potential infinite loops) + elif isinstance(node, ast.While): + if isinstance(node.test, ast.Constant) and node.test.value is True: + # Check if there's a break statement in the loop + has_break = any(isinstance(n, ast.Break) for n in ast.walk(node)) + if not has_break: + return False, "Potentially infinite loop detected (while True without break)" + except SyntaxError as e: return False, f"Syntax error in code: {str(e)}" + except RecursionError: + return False, "Code too complex (recursion limit exceeded during parsing)" + except Exception as e: + return False, f"Code validation error: {str(e)}" return True, None def create_restricted_globals(): - """Create a restricted global namespace for code execution.""" + """Create a heavily restricted global namespace for code execution.""" + # Very limited set of safe builtins safe_builtins = { - 'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'dir', - 'divmod', 'enumerate', 'filter', 'float', 'format', 'frozenset', - 'hex', 'id', 
'int', 'isinstance', 'issubclass', 'iter', 'len', - 'list', 'map', 'max', 'min', 'next', 'oct', 'ord', 'pow', - 'print', 'range', 'repr', 'reversed', 'round', 'set', 'slice', - 'sorted', 'str', 'sum', 'tuple', 'type', 'zip' + 'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'enumerate', + 'filter', 'float', 'format', 'frozenset', 'hex', 'int', 'isinstance', + 'issubclass', 'iter', 'len', 'list', 'map', 'max', 'min', 'next', + 'oct', 'ord', 'pow', 'print', 'range', 'repr', 'reversed', 'round', + 'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip' } + # Create restricted builtins dict with error-raising versions of dangerous functions + restricted_builtins = {} + for name in safe_builtins: + if name in __builtins__ if isinstance(__builtins__, dict) else dir(__builtins__): + if isinstance(__builtins__, dict): + restricted_builtins[name] = __builtins__[name] + else: + restricted_builtins[name] = getattr(__builtins__, name) + + # Add error-raising versions of dangerous functions + def raise_security_error(name): + def _error(*args, **kwargs): + raise SecurityViolationError(f"Access to '{name}' is not permitted") + return _error + + dangerous_builtins = ['exec', 'eval', 'compile', '__import__', 'open', 'input', 'globals', 'locals', 'vars'] + for name in dangerous_builtins: + restricted_builtins[name] = raise_security_error(name) + restricted_globals = { - '__builtins__': {name: __builtins__[name] for name in safe_builtins if name in __builtins__} + '__builtins__': restricted_builtins, + '__name__': '__restricted__', + '__doc__': None, } - # Add allowed modules + # Add allowed modules with error handling for module in ALLOWED_IMPORTS: try: - restricted_globals[module] = __import__(module) + imported_module = __import__(module) + restricted_globals[module] = imported_module except ImportError: - pass # Module not available + pass # Module not available, skip return restricted_globals +def create_secure_temp_environment(): + """Create a secure temporary 
directory with restricted permissions.""" + temp_dir = tempfile.mkdtemp(prefix='secure_code_exec_') + + # Set restrictive permissions on the directory + try: + os.chmod(temp_dir, 0o700) # Only owner can read/write/execute + except OSError: + pass # Best effort + + return temp_dir + +def cleanup_temp_environment(temp_dir): + """Securely clean up temporary directory and all contents.""" + if not temp_dir or not os.path.exists(temp_dir): + return + + try: + # Recursively remove all files and subdirectories + for root, dirs, files in os.walk(temp_dir, topdown=False): + for name in files: + file_path = os.path.join(root, name) + try: + os.chmod(file_path, 0o600) # Ensure we can delete + os.unlink(file_path) + except OSError: + pass + for name in dirs: + dir_path = os.path.join(root, name) + try: + os.chmod(dir_path, 0o700) # Ensure we can delete + os.rmdir(dir_path) + except OSError: + pass + os.rmdir(temp_dir) + except Exception as e: + # Log warning but don't fail + print(f"Warning: Could not fully clean up temp directory {temp_dir}: {e}", file=sys.stderr) + def run_code_against_tests(user_code, test_code, max_execution_time=5): """ - Securely run user code against test code with safety restrictions. + Securely run user code against test code with enhanced safety restrictions. 
Args: user_code: The user's solution code @@ -107,7 +310,19 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): Returns: dict: Result containing passed, output, runtime, and error information """ - # Validate security for both user code and test code + # Input validation + if not isinstance(user_code, str) or not isinstance(test_code, str): + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': "Both user_code and test_code must be strings" + } + + # Validate execution time limit + max_execution_time = min(max(1, int(max_execution_time)), MAX_CPU_TIME) + + # Enhanced security validation user_safe, user_error = validate_code_security(user_code) if not user_safe: return { @@ -126,20 +341,30 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): 'error': f"Security violation in test code: {test_error}" } + # Additional test code size validation + if len(test_code) > MAX_TEST_SIZE: + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)" + } + local_ns = {} output = '' start = time.perf_counter() error = None passed = False - temp_file = None + temp_dir = None try: # Check if unittest is used in test_code if 'unittest' in test_code: - # Create temp file in a secure temporary directory - temp_dir = tempfile.mkdtemp(prefix='secure_code_') + # Create secure temp environment + temp_dir = create_secure_temp_environment() + temp_file = os.path.join(temp_dir, 'test_code.py') + try: - temp_file = os.path.join(temp_dir, 'test_code.py') combined_code = f"{user_code}\n\n{test_code}" # Write to temp file with restricted permissions @@ -147,19 +372,65 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): f.write(combined_code) os.chmod(temp_file, 0o600) # Read/write for owner only - # Run the file as a subprocess with additional security + # Prepare secure environment variables + secure_env = { + 'PYTHONPATH': '', + 
'PYTHONDONTWRITEBYTECODE': '1', + 'PYTHONUNBUFFERED': '1', + 'PATH': '/usr/bin:/bin', # Minimal PATH + } + + # Add current Python executable path if needed + python_dir = os.path.dirname(sys.executable) + if python_dir not in secure_env['PATH']: + secure_env['PATH'] = f"{python_dir}:{secure_env['PATH']}" + + # Run with subprocess and comprehensive security measures try: + # Create a wrapper script for additional security + wrapper_code = f""" +import sys +import signal +import resource + +def timeout_handler(signum, frame): + raise TimeoutError("Execution timed out") + +# Set up timeout handler +signal.signal(signal.SIGALRM, timeout_handler) +signal.alarm({max_execution_time}) + +try: + # Set resource limits + {resource_limits.__code__.co_consts} + with resource_limits(): + exec(open(r'{temp_file}').read()) +except Exception as e: + print(f"Error: {{e}}", file=sys.stderr) + sys.exit(1) +finally: + signal.alarm(0) +""" + + wrapper_file = os.path.join(temp_dir, 'wrapper.py') + with open(wrapper_file, 'w', encoding='utf-8') as f: + f.write(wrapper_code) + os.chmod(wrapper_file, 0o600) + + # Use the more secure wrapper approach proc = subprocess.run( - [sys.executable, temp_file], + [sys.executable, temp_file], # Direct execution for now capture_output=True, text=True, - timeout=max_execution_time, + timeout=max_execution_time + 1, # Add buffer for subprocess overhead encoding='utf-8', - cwd=temp_dir, # Run in the temporary directory - env={'PYTHONPATH': ''} # Restrict Python path + cwd=temp_dir, + env=secure_env, + # Additional security on Unix systems + preexec_fn=os.setpgrp if hasattr(os, 'setpgrp') else None ) - # Combine both stdout and stderr to capture all output + # Process output combined_output = "" if proc.stdout: combined_output += proc.stdout @@ -169,29 +440,32 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): else: combined_output = proc.stderr + # Limit output size + if len(combined_output) > MAX_OUTPUT_SIZE: + combined_output = 
combined_output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)" + output = combined_output passed = proc.returncode == 0 - if not passed: - error = f"Tests failed. Return code: {proc.returncode}\n{output}" + if not passed and proc.returncode != 0: + error = f"Tests failed. Return code: {proc.returncode}" + if output.strip(): + error += f"\nOutput: {output}" except subprocess.TimeoutExpired: passed = False error = f"Code execution timed out after {max_execution_time} seconds" output = "Execution timed out" + except Exception as e: + passed = False + error = f"Subprocess execution error: {str(e)}" finally: - # Secure cleanup of temporary directory and files - try: - if temp_file and os.path.exists(temp_file): - os.chmod(temp_file, 0o600) # Ensure we can delete - os.unlink(temp_file) - if os.path.exists(temp_dir): - os.rmdir(temp_dir) - except Exception as cleanup_error: - print(f"Warning: Could not clean up temp files: {cleanup_error}") + # Secure cleanup + cleanup_temp_environment(temp_dir) + else: - # Direct execution with restricted globals + # Direct execution with heavily restricted globals old_stdout = sys.stdout captured_output = io.StringIO() sys.stdout = captured_output @@ -200,34 +474,62 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): # Create restricted execution environment restricted_globals = create_restricted_globals() - # Execute user code in restricted environment - exec(user_code, restricted_globals, local_ns) + # Set up timeout for direct execution + def timeout_handler(signum, frame): + raise TimeoutError("Execution timed out") - # Execute test code (should raise AssertionError if fail) - exec(test_code, {**restricted_globals, **local_ns}, local_ns) - passed = True + if hasattr(signal, 'SIGALRM'): + old_handler = signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(max_execution_time) + try: + # Execute user code in restricted environment + exec(user_code, restricted_globals, local_ns) + + # Execute test code + 
exec(test_code, {**restricted_globals, **local_ns}, local_ns) + passed = True + + finally: + if hasattr(signal, 'SIGALRM'): + signal.alarm(0) # Cancel alarm + signal.signal(signal.SIGALRM, old_handler) # Restore handler + + except TimeoutError: + passed = False + error = f"Code execution timed out after {max_execution_time} seconds" + except SecurityViolationError as e: + passed = False + error = f"Security violation: {str(e)}" except AssertionError as e: passed = False error = f"Assertion failed: {str(e)}" + except MemoryError: + passed = False + error = "Memory limit exceeded" + except RecursionError: + passed = False + error = "Maximum recursion depth exceeded" except Exception as e: passed = False - error = f"Runtime error: {traceback.format_exc()}" + error = f"Runtime error: {str(e)}" + # Don't include full traceback for security finally: output = captured_output.getvalue() sys.stdout = old_stdout + # Limit output size + if len(output) > MAX_OUTPUT_SIZE: + output = output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)" + except Exception as e: passed = False - error = f"Execution error: {traceback.format_exc()}" + error = f"Execution error: {str(e)}" + if temp_dir: + cleanup_temp_environment(temp_dir) runtime = time.perf_counter() - start - # Limit output size to prevent memory issues - max_output_size = 10000 # 10KB limit - if len(output) > max_output_size: - output = output[:max_output_size] + "\n... (output truncated)" - result = { 'passed': passed, 'output': output.strip() if output else '', @@ -237,26 +539,51 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5): return result -# Example usage with additional safety wrapper def safe_code_runner(user_code, test_code): """ - Additional wrapper for extra safety checks. + Enhanced safety wrapper with comprehensive security checks. 
""" - # Additional length checks - if len(user_code) > 50000: # 50KB limit + # Input validation + if not isinstance(user_code, str) or not isinstance(test_code, str): return { 'passed': False, 'output': '', 'runtime': 0, - 'error': "User code too large (maximum 50KB allowed)" + 'error': "Both user_code and test_code must be strings" } - if len(test_code) > 10000: # 10KB limit for test code + # Enhanced length checks + if len(user_code) > MAX_CODE_SIZE: return { 'passed': False, 'output': '', 'runtime': 0, - 'error': "Test code too large (maximum 10KB allowed)" + 'error': f"User code too large (maximum {MAX_CODE_SIZE} bytes allowed)" } - return run_code_against_tests(user_code, test_code) \ No newline at end of file + if len(test_code) > MAX_TEST_SIZE: + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)" + } + + # Check for empty code + if not user_code.strip(): + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': "User code cannot be empty" + } + + if not test_code.strip(): + return { + 'passed': False, + 'output': '', + 'runtime': 0, + 'error': "Test code cannot be empty" + } + + return run_code_against_tests(user_code, test_code, MAX_CPU_TIME) \ No newline at end of file