stashcom
This commit is contained in:
253
src/utils.py
253
src/utils.py
@@ -5,8 +5,127 @@ import io
|
||||
import tempfile
|
||||
import subprocess
|
||||
import os
|
||||
import re
|
||||
import ast
|
||||
|
||||
def run_code_against_tests(user_code, test_code):
|
||||
# Security configuration
|
||||
ALLOWED_IMPORTS = {
|
||||
'math', 'random', 'datetime', 'json', 'collections', 'itertools',
|
||||
'functools', 'operator', 'copy', 'unittest', 're', 'string'
|
||||
}
|
||||
|
||||
DANGEROUS_PATTERNS = [
|
||||
r'import\s+os(?:\s|$|\.)',
|
||||
r'from\s+os\s+import',
|
||||
r'import\s+subprocess(?:\s|$|\.)',
|
||||
r'from\s+subprocess\s+import',
|
||||
r'import\s+sys(?:\s|$|\.)',
|
||||
r'from\s+sys\s+import',
|
||||
r'import\s+shutil(?:\s|$|\.)',
|
||||
r'from\s+shutil\s+import',
|
||||
r'import\s+pathlib(?:\s|$|\.)',
|
||||
r'from\s+pathlib\s+import',
|
||||
r'__import__\s*\(',
|
||||
r'exec\s*\(',
|
||||
r'eval\s*\(',
|
||||
r'compile\s*\(',
|
||||
r'open\s*\(',
|
||||
r'file\s*\(',
|
||||
r'input\s*\(',
|
||||
r'raw_input\s*\(',
|
||||
r'\.unlink\s*\(',
|
||||
r'\.remove\s*\(',
|
||||
r'\.rmdir\s*\(',
|
||||
r'\.rmtree\s*\(',
|
||||
r'\.delete\s*\(',
|
||||
r'\.kill\s*\(',
|
||||
r'\.terminate\s*\(',
|
||||
]
|
||||
|
||||
def validate_code_security(code):
|
||||
"""
|
||||
Validates code for security issues.
|
||||
Returns (is_safe, error_message)
|
||||
"""
|
||||
# Check for dangerous patterns
|
||||
for pattern in DANGEROUS_PATTERNS:
|
||||
if re.search(pattern, code, re.IGNORECASE):
|
||||
return False, f"Dangerous operation detected: {pattern}"
|
||||
|
||||
# Parse AST to check imports
|
||||
try:
|
||||
tree = ast.parse(code)
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
module_name = alias.name.split('.')[0]
|
||||
if module_name not in ALLOWED_IMPORTS:
|
||||
return False, f"Import not allowed: {module_name}"
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
module_name = node.module.split('.')[0]
|
||||
if module_name not in ALLOWED_IMPORTS:
|
||||
return False, f"Import not allowed: {module_name}"
|
||||
except SyntaxError as e:
|
||||
return False, f"Syntax error in code: {str(e)}"
|
||||
|
||||
return True, None
|
||||
|
||||
def create_restricted_globals():
|
||||
"""Create a restricted global namespace for code execution."""
|
||||
safe_builtins = {
|
||||
'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'dir',
|
||||
'divmod', 'enumerate', 'filter', 'float', 'format', 'frozenset',
|
||||
'hex', 'id', 'int', 'isinstance', 'issubclass', 'iter', 'len',
|
||||
'list', 'map', 'max', 'min', 'next', 'oct', 'ord', 'pow',
|
||||
'print', 'range', 'repr', 'reversed', 'round', 'set', 'slice',
|
||||
'sorted', 'str', 'sum', 'tuple', 'type', 'zip'
|
||||
}
|
||||
|
||||
restricted_globals = {
|
||||
'__builtins__': {name: __builtins__[name] for name in safe_builtins if name in __builtins__}
|
||||
}
|
||||
|
||||
# Add allowed modules
|
||||
for module in ALLOWED_IMPORTS:
|
||||
try:
|
||||
restricted_globals[module] = __import__(module)
|
||||
except ImportError:
|
||||
pass # Module not available
|
||||
|
||||
return restricted_globals
|
||||
|
||||
def run_code_against_tests(user_code, test_code, max_execution_time=5):
|
||||
"""
|
||||
Securely run user code against test code with safety restrictions.
|
||||
|
||||
Args:
|
||||
user_code: The user's solution code
|
||||
test_code: The test code to validate the solution
|
||||
max_execution_time: Maximum execution time in seconds (default: 5)
|
||||
|
||||
Returns:
|
||||
dict: Result containing passed, output, runtime, and error information
|
||||
"""
|
||||
# Validate security for both user code and test code
|
||||
user_safe, user_error = validate_code_security(user_code)
|
||||
if not user_safe:
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': f"Security violation in user code: {user_error}"
|
||||
}
|
||||
|
||||
test_safe, test_error = validate_code_security(test_code)
|
||||
if not test_safe:
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': f"Security violation in test code: {test_error}"
|
||||
}
|
||||
|
||||
local_ns = {}
|
||||
output = ''
|
||||
start = time.perf_counter()
|
||||
@@ -17,60 +136,83 @@ def run_code_against_tests(user_code, test_code):
|
||||
try:
|
||||
# Check if unittest is used in test_code
|
||||
if 'unittest' in test_code:
|
||||
# Write user code + test code to a temp file
|
||||
with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
|
||||
combined_code = f"{user_code}\n\n{test_code}"
|
||||
f.write(combined_code)
|
||||
f.flush()
|
||||
temp_file = f.name
|
||||
|
||||
# Run the file as a subprocess
|
||||
# Create temp file in a secure temporary directory
|
||||
temp_dir = tempfile.mkdtemp(prefix='secure_code_')
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
[sys.executable, temp_file],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
encoding='utf-8'
|
||||
)
|
||||
output = proc.stdout
|
||||
if proc.stderr:
|
||||
output += f"\n{proc.stderr}"
|
||||
temp_file = os.path.join(temp_dir, 'test_code.py')
|
||||
combined_code = f"{user_code}\n\n{test_code}"
|
||||
|
||||
passed = proc.returncode == 0
|
||||
if not passed:
|
||||
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
|
||||
else:
|
||||
# For successful unittest runs, the stderr contains the test results
|
||||
if proc.stderr and "OK" in proc.stderr:
|
||||
output = proc.stderr # Use stderr as the main output for unittest
|
||||
# Write to temp file with restricted permissions
|
||||
with open(temp_file, 'w', encoding='utf-8') as f:
|
||||
f.write(combined_code)
|
||||
os.chmod(temp_file, 0o600) # Read/write for owner only
|
||||
|
||||
# Run the file as a subprocess with additional security
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
[sys.executable, temp_file],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=max_execution_time,
|
||||
encoding='utf-8',
|
||||
cwd=temp_dir, # Run in the temporary directory
|
||||
env={'PYTHONPATH': ''} # Restrict Python path
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
passed = False
|
||||
error = "Code execution timed out after 10 seconds"
|
||||
output = "Execution timed out"
|
||||
|
||||
# Combine both stdout and stderr to capture all output
|
||||
combined_output = ""
|
||||
if proc.stdout:
|
||||
combined_output += proc.stdout
|
||||
if proc.stderr:
|
||||
if combined_output:
|
||||
combined_output += "\n" + proc.stderr
|
||||
else:
|
||||
combined_output = proc.stderr
|
||||
|
||||
output = combined_output
|
||||
passed = proc.returncode == 0
|
||||
|
||||
if not passed:
|
||||
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
passed = False
|
||||
error = f"Code execution timed out after {max_execution_time} seconds"
|
||||
output = "Execution timed out"
|
||||
|
||||
finally:
|
||||
# Secure cleanup of temporary directory and files
|
||||
try:
|
||||
if temp_file and os.path.exists(temp_file):
|
||||
os.chmod(temp_file, 0o600) # Ensure we can delete
|
||||
os.unlink(temp_file)
|
||||
if os.path.exists(temp_dir):
|
||||
os.rmdir(temp_dir)
|
||||
except Exception as cleanup_error:
|
||||
print(f"Warning: Could not clean up temp files: {cleanup_error}")
|
||||
else:
|
||||
# Capture stdout
|
||||
# Direct execution with restricted globals
|
||||
old_stdout = sys.stdout
|
||||
captured_output = io.StringIO()
|
||||
sys.stdout = captured_output
|
||||
|
||||
try:
|
||||
# Execute user code
|
||||
exec(user_code, {}, local_ns)
|
||||
# Create restricted execution environment
|
||||
restricted_globals = create_restricted_globals()
|
||||
|
||||
# Execute user code in restricted environment
|
||||
exec(user_code, restricted_globals, local_ns)
|
||||
|
||||
# Execute test code (should raise AssertionError if fail)
|
||||
exec(test_code, local_ns, local_ns)
|
||||
exec(test_code, {**restricted_globals, **local_ns}, local_ns)
|
||||
passed = True
|
||||
|
||||
except AssertionError as e:
|
||||
passed = False
|
||||
error = f"Assertion failed: {str(e)}"
|
||||
|
||||
except Exception as e:
|
||||
passed = False
|
||||
error = f"Runtime error: {traceback.format_exc()}"
|
||||
|
||||
finally:
|
||||
output = captured_output.getvalue()
|
||||
sys.stdout = old_stdout
|
||||
@@ -78,17 +220,14 @@ def run_code_against_tests(user_code, test_code):
|
||||
except Exception as e:
|
||||
passed = False
|
||||
error = f"Execution error: {traceback.format_exc()}"
|
||||
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if temp_file and os.path.exists(temp_file):
|
||||
try:
|
||||
os.unlink(temp_file)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not delete temp file {temp_file}: {e}")
|
||||
|
||||
runtime = time.perf_counter() - start
|
||||
|
||||
# Limit output size to prevent memory issues
|
||||
max_output_size = 10000 # 10KB limit
|
||||
if len(output) > max_output_size:
|
||||
output = output[:max_output_size] + "\n... (output truncated)"
|
||||
|
||||
result = {
|
||||
'passed': passed,
|
||||
'output': output.strip() if output else '',
|
||||
@@ -97,3 +236,27 @@ def run_code_against_tests(user_code, test_code):
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
# Example usage with additional safety wrapper
|
||||
def safe_code_runner(user_code, test_code):
|
||||
"""
|
||||
Additional wrapper for extra safety checks.
|
||||
"""
|
||||
# Additional length checks
|
||||
if len(user_code) > 50000: # 50KB limit
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': "User code too large (maximum 50KB allowed)"
|
||||
}
|
||||
|
||||
if len(test_code) > 10000: # 10KB limit for test code
|
||||
return {
|
||||
'passed': False,
|
||||
'output': '',
|
||||
'runtime': 0,
|
||||
'error': "Test code too large (maximum 10KB allowed)"
|
||||
}
|
||||
|
||||
return run_code_against_tests(user_code, test_code)
|
||||
Reference in New Issue
Block a user