13 Commits

Author SHA1 Message Date
4ae5f12633 Your fixed commit message here 2025-08-22 20:06:27 +02:00
1ef6bdfd80 creater 2025-08-20 19:48:45 +02:00
f28a6b36ef epileptic turtles 2025-08-18 21:59:03 +02:00
6c8f34acb9 fixed some shit ; added pagination 2025-08-18 21:52:54 +02:00
27f955151a hardnened 2025-08-18 21:16:46 +02:00
f3baec17e4 new problem and i added better support for sorting and a overall better script 2025-08-17 18:58:12 +02:00
996cfd2821 rewrote 2025-08-17 16:04:13 +02:00
2577c96258 Update .gitattributes 2025-08-17 13:45:16 +00:00
9532213adf update some bs
+ readme
+ some change in the bash script using ports
2025-08-17 15:43:11 +02:00
734ec1dc73 Update readme.md 2025-08-17 12:25:00 +00:00
e8e1b82d6b ignore stuff 2025-08-17 12:24:48 +02:00
8dd5fcbeb7 md for database as it was fucked up in windows script 2025-08-17 12:19:18 +02:00
b6ab591054 Merge pull request 'darkmode' (#2) from darkmode into main
Reviewed-on: #2
2025-08-17 09:50:57 +00:00
26 changed files with 3367 additions and 1112 deletions

12
.gitattributes vendored Normal file
View File

@@ -0,0 +1,12 @@
# Detect Markdown files
*.md linguist-language=Markdown
# Ignore CSS files for language detection
*.css linguist-vendored
# Normalize line endings to LF so the scripts run correctly on Linux
* text=auto eol=lf
*.png binary
*.jpg binary
*.gif binary
*.zip binary

902
qtc.py Normal file
View File

@@ -0,0 +1,902 @@
import sys
import json
import os
import re
import subprocess
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QTabWidget,
QLabel, QLineEdit, QComboBox, QTextEdit, QPushButton, QStatusBar,
QMessageBox, QDialog, QListWidget, QSplitter, QFrame, QSizePolicy,
QScrollArea, QFileDialog, QToolTip
)
from PyQt6.QtCore import Qt, QSize, pyqtSignal, QTimer
from PyQt6.QtGui import QFont, QTextOption, QSyntaxHighlighter, QTextCharFormat, QColor, QTextCursor, QKeyEvent, QTextDocument, QFontMetricsF
from PyQt6.QtWebEngineWidgets import QWebEngineView
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("ProblemCreator")
# Optional imports with proper error handling
OPTIONAL_DEPENDENCIES = {}
try:
import markdown
OPTIONAL_DEPENDENCIES['markdown'] = True
except ImportError:
logger.warning("Markdown library not available, using basic conversion")
OPTIONAL_DEPENDENCIES['markdown'] = False
try:
from pygments import lex
from pygments.lexers import PythonLexer
from pygments.styles import get_style_by_name
OPTIONAL_DEPENDENCIES['pygments'] = True
except ImportError:
logger.warning("Pygments not available, syntax highlighting will be disabled")
OPTIONAL_DEPENDENCIES['pygments'] = False
class CodeEditor(QTextEdit):
"""A code editor with syntax highlighting and advanced auto-indentation."""
def __init__(self, parent=None):
super().__init__(parent)
self.setFont(QFont("Consolas", 10))
self.indent_width = 4
# Tab stop distance is measured in pixels, so derive it from the font metrics
self.setTabStopDistance(QFontMetricsF(self.font()).horizontalAdvance(' ') * self.indent_width)
# Setup syntax highlighting if available
if OPTIONAL_DEPENDENCIES.get('pygments', False):
self.highlighter = PythonHighlighter(self.document())
# Tips for test case editing
self.tips = [
"💡 Tip: Use descriptive test method names like test_empty_input()",
"💡 Tip: Test edge cases like empty inputs, large inputs, and invalid inputs",
"💡 Tip: Use assertEqual for exact matches, assertTrue/False for boolean checks",
"💡 Tip: Test both valid and invalid inputs to ensure robustness",
"💡 Tip: Consider using setUp() method for common test setup",
"💡 Tip: Use parameterized tests if you have many similar test cases",
"💡 Tip: Make sure your tests are independent of each other",
"💡 Tip: Test not only for correct outputs but also for proper error handling"
]
self.current_tip_index = 0
self.tip_timer = QTimer(self)
self.tip_timer.timeout.connect(self.show_next_tip)
self.tip_timer.start(10000) # Show a new tip every 10 seconds
def show_next_tip(self):
    """Show the next tip in the main window's status bar."""
    window = self.window()
    if window is not None and hasattr(window, 'statusBar'):
        status_bar = window.statusBar()
        if status_bar:
            tip = self.tips[self.current_tip_index]
            status_bar.showMessage(tip)
            self.current_tip_index = (self.current_tip_index + 1) % len(self.tips)
def keyPressEvent(self, event: QKeyEvent):
"""Handle key press events for auto-indentation and pairing."""
key = event.key()
modifiers = event.modifiers()
cursor = self.textCursor()
# Tab key
if key == Qt.Key.Key_Tab:
if cursor.hasSelection():
self.indentSelection()
else:
# Insert spaces
cursor.insertText(" " * self.indent_width)
return
# Shift+Tab key
elif key == Qt.Key.Key_Backtab:
if cursor.hasSelection():
self.dedentSelection()
else:
self.dedentLine()
return
# Return key
elif key == Qt.Key.Key_Return:
# Get current line
cursor.movePosition(QTextCursor.MoveOperation.StartOfLine)
cursor.movePosition(QTextCursor.MoveOperation.EndOfLine, QTextCursor.MoveMode.KeepAnchor)
line_text = cursor.selectedText()
# Calculate indentation
indent = len(line_text) - len(line_text.lstrip())
# Check if line ends with colon
ends_with_colon = line_text.rstrip().endswith(':')
# Insert newline with indentation
cursor = self.textCursor()
cursor.insertText("\n" + " " * indent)
# Add extra indentation if line ended with colon
if ends_with_colon:
cursor.insertText(" " * self.indent_width)
return
# Auto-pairing
elif key == Qt.Key.Key_ParenLeft:
cursor.insertText("()")
cursor.movePosition(QTextCursor.MoveOperation.Left)
self.setTextCursor(cursor)
return
elif key == Qt.Key.Key_BracketLeft:
cursor.insertText("[]")
cursor.movePosition(QTextCursor.MoveOperation.Left)
self.setTextCursor(cursor)
return
elif key == Qt.Key.Key_BraceLeft:
cursor.insertText("{}")
cursor.movePosition(QTextCursor.MoveOperation.Left)
self.setTextCursor(cursor)
return
elif key == Qt.Key.Key_QuoteDbl:
cursor.insertText('""')
cursor.movePosition(QTextCursor.MoveOperation.Left)
self.setTextCursor(cursor)
return
elif key == Qt.Key.Key_Apostrophe:
cursor.insertText("''")
cursor.movePosition(QTextCursor.MoveOperation.Left)
self.setTextCursor(cursor)
return
elif key == Qt.Key.Key_Colon and modifiers == Qt.KeyboardModifier.NoModifier:
# Check if we're at the end of the line
cursor.movePosition(QTextCursor.MoveOperation.EndOfLine)
if self.textCursor().position() == cursor.position():
cursor.insertText(":")
return
# Default behavior
super().keyPressEvent(event)
def indentSelection(self):
"""Indent all selected lines."""
cursor = self.textCursor()
start = cursor.selectionStart()
end = cursor.selectionEnd()
# Move to start of selection
cursor.setPosition(start)
cursor.movePosition(QTextCursor.MoveOperation.StartOfLine)
# Indent each line in selection
while cursor.position() <= end:
cursor.insertText(" " * self.indent_width)
end += self.indent_width
if not cursor.movePosition(QTextCursor.MoveOperation.Down):
break
# Restore selection
cursor.setPosition(start)
cursor.setPosition(end, QTextCursor.MoveMode.KeepAnchor)
self.setTextCursor(cursor)
def dedentSelection(self):
"""Dedent all selected lines."""
cursor = self.textCursor()
start = cursor.selectionStart()
end = cursor.selectionEnd()
# Move to start of selection
cursor.setPosition(start)
cursor.movePosition(QTextCursor.MoveOperation.StartOfLine)
# Dedent each line in selection
while cursor.position() <= end:
# Check for spaces at beginning of line
line_start = cursor.position()
cursor.movePosition(QTextCursor.MoveOperation.EndOfLine, QTextCursor.MoveMode.KeepAnchor)
line_text = cursor.selectedText()
# Count leading spaces
leading_spaces = min(len(line_text) - len(line_text.lstrip()), self.indent_width)
if leading_spaces > 0:
# Remove leading spaces
cursor.setPosition(line_start)
cursor.movePosition(QTextCursor.MoveOperation.Right, QTextCursor.MoveMode.KeepAnchor, leading_spaces)
cursor.removeSelectedText()
end -= leading_spaces
if not cursor.movePosition(QTextCursor.MoveOperation.Down):
break
# Restore selection
cursor.setPosition(max(0, start - self.indent_width))
cursor.setPosition(max(0, end - self.indent_width), QTextCursor.MoveMode.KeepAnchor)
self.setTextCursor(cursor)
def dedentLine(self):
"""Dedent the current line."""
cursor = self.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.StartOfLine)
# Check for spaces at beginning of line
line_start = cursor.position()
cursor.movePosition(QTextCursor.MoveOperation.EndOfLine, QTextCursor.MoveMode.KeepAnchor)
line_text = cursor.selectedText()
# Count leading spaces
leading_spaces = min(len(line_text) - len(line_text.lstrip()), self.indent_width)
if leading_spaces > 0:
# Remove leading spaces
cursor.setPosition(line_start)
cursor.movePosition(QTextCursor.MoveOperation.Right, QTextCursor.MoveMode.KeepAnchor, leading_spaces)
cursor.removeSelectedText()
class PythonHighlighter(QSyntaxHighlighter):
"""Syntax highlighter for Python code using Pygments."""
def __init__(self, document):
super().__init__(document)
self._setup_formats()
def _setup_formats(self):
"""Setup text formats for different token types."""
self.formats = {}
# Define syntax highlighting formats
keyword_format = QTextCharFormat()
keyword_format.setForeground(QColor("#0000FF"))
keyword_format.setFontWeight(QFont.Weight.Bold)
self.formats['keyword'] = keyword_format
string_format = QTextCharFormat()
string_format.setForeground(QColor("#008000"))
self.formats['string'] = string_format
comment_format = QTextCharFormat()
comment_format.setForeground(QColor("#808080"))
comment_format.setFontItalic(True)
self.formats['comment'] = comment_format
function_format = QTextCharFormat()
function_format.setForeground(QColor("#000080"))
function_format.setFontWeight(QFont.Weight.Bold)
self.formats['function'] = function_format
number_format = QTextCharFormat()
number_format.setForeground(QColor("#FF8C00"))
self.formats['number'] = number_format
# Python keywords
self.keywords = [
'and', 'as', 'assert', 'break', 'class', 'continue', 'def', 'del',
'elif', 'else', 'except', 'False', 'finally', 'for', 'from', 'global',
'if', 'import', 'in', 'is', 'lambda', 'None', 'nonlocal', 'not', 'or',
'pass', 'raise', 'return', 'True', 'try', 'while', 'with', 'yield'
]
# unittest keywords
self.unittest_keywords = [
'TestCase', 'setUp', 'tearDown', 'setUpClass', 'tearDownClass',
'assertEqual', 'assertTrue', 'assertFalse', 'assertRaises',
'assertAlmostEqual', 'assertNotEqual', 'assertIn', 'assertNotIn',
'assertIs', 'assertIsNot', 'assertIsNone', 'assertIsNotNone',
'assertIsInstance', 'assertNotIsInstance', 'assertDictEqual',
'assertListEqual', 'assertTupleEqual', 'assertSetEqual',
'assertSequenceEqual', 'assertMultiLineEqual', 'assertGreater',
'assertGreaterEqual', 'assertLess', 'assertLessEqual', 'assertRegex',
'assertNotRegex', 'assertCountEqual'
]
def highlightBlock(self, text):
"""Apply syntax highlighting to the current text block."""
# Check if we should use pygments
if OPTIONAL_DEPENDENCIES.get('pygments', False):
self._highlight_with_pygments(text)
else:
self._highlight_with_basic_rules(text)
def _highlight_with_pygments(self, text):
    """Use pygments for syntax highlighting if available."""
    try:
        block = self.currentBlock()
        start_pos = block.position()
        end_pos = start_pos + len(text)
        full_text = self.document().toPlainText()
        # Walk the token stream while tracking a running offset, so repeated
        # token values map to the correct positions (str.find would find the
        # first occurrence every time).
        offset = 0
        for token, value in lex(full_text, PythonLexer()):
            token_start = offset
            offset += len(value)
            token_end = token_start + len(value)
            # Skip tokens that do not overlap the current block
            if token_end <= start_pos or token_start >= end_pos:
                continue
            # Clamp the token span to the current block
            local_start = max(token_start, start_pos) - start_pos
            local_len = min(token_end, end_pos) - max(token_start, start_pos)
            token_str = str(token)
            # Apply the appropriate format
            if 'Keyword' in token_str:
                self.setFormat(local_start, local_len, self.formats['keyword'])
            elif 'String' in token_str:
                self.setFormat(local_start, local_len, self.formats['string'])
            elif 'Comment' in token_str:
                self.setFormat(local_start, local_len, self.formats['comment'])
            elif 'Name' in token_str and 'Function' in token_str:
                self.setFormat(local_start, local_len, self.formats['function'])
            elif 'Number' in token_str:
                self.setFormat(local_start, local_len, self.formats['number'])
    except Exception as e:
        logger.error(f"Error during pygments highlighting: {e}")
        # Fall back to basic highlighting
        self._highlight_with_basic_rules(text)
def _highlight_with_basic_rules(self, text):
"""Use basic rules for syntax highlighting."""
# Highlight keywords
for keyword in self.keywords + self.unittest_keywords:
pattern = r'\b' + keyword + r'\b'
index = 0
while index < len(text):
index = text.find(keyword, index)
if index == -1:
break
# Check if it's really a word (not part of another word)
if (index == 0 or not text[index-1].isalnum()) and \
(index + len(keyword) >= len(text) or not text[index + len(keyword)].isalnum()):
if keyword in self.keywords:
self.setFormat(index, len(keyword), self.formats['keyword'])
else:
self.setFormat(index, len(keyword), self.formats['function'])
index += len(keyword)
# Highlight strings (re is imported at module level)
string_pattern = re.compile(r'(\".*?\")|(\'.*?\')')
for match in string_pattern.finditer(text):
start, end = match.span()
self.setFormat(start, end - start, self.formats['string'])
# Highlight comments
comment_pattern = re.compile(r'#.*')
for match in comment_pattern.finditer(text):
start, end = match.span()
self.setFormat(start, end - start, self.formats['comment'])
# Highlight numbers
number_pattern = re.compile(r'\b\d+\b')
for match in number_pattern.finditer(text):
start, end = match.span()
self.setFormat(start, end - start, self.formats['number'])
class MarkdownEditor(QWidget):
"""A markdown editor with live preview."""
def __init__(self, parent=None):
super().__init__(parent)
# Create split view
self.splitter = QSplitter(Qt.Orientation.Horizontal)
layout = QVBoxLayout(self)
layout.addWidget(self.splitter)
# Left side - text editor
self.editor = QTextEdit()
self.editor.setFont(QFont("Consolas", 10))
self.editor.textChanged.connect(self.update_preview)
self.splitter.addWidget(self.editor)
# Right side - preview
self.preview = QWebEngineView()
self.splitter.addWidget(self.preview)
# Set initial sizes
self.splitter.setSizes([400, 400])
def update_preview(self):
"""Update the markdown preview."""
# Get the markdown text
markdown_text = self.editor.toPlainText()
# Convert to HTML
html_content = self._markdown_to_html(markdown_text)
# Update the preview
self.preview.setHtml(html_content)
def _markdown_to_html(self, text):
    """Convert markdown text to HTML."""
    if OPTIONAL_DEPENDENCIES.get('markdown', False):
        # Use the markdown library if available
        html = markdown.markdown(text)
    else:
        # Fallback: a minimal regex-based conversion for headings,
        # emphasis, and inline code
        html = text
        html = re.sub(r'^### (.*)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)
        html = re.sub(r'^## (.*)$', r'<h2>\1</h2>', html, flags=re.MULTILINE)
        html = re.sub(r'^# (.*)$', r'<h1>\1</h1>', html, flags=re.MULTILINE)
        html = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
        html = re.sub(r'\*(.+?)\*', r'<em>\1</em>', html)
        html = re.sub(r'`(.+?)`', r'<code>\1</code>', html)
        html = html.replace("\n", "<br>")
    # Wrap in proper HTML structure
    return f"""
    <html>
    <head>
    <style>
    body {{
        font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto,
            Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
        padding: 20px;
        line-height: 1.6;
        color: #333;
    }}
    code {{
        background-color: #f6f8fa;
        padding: 2px 6px;
        border-radius: 4px;
        font-family: 'Consolas', monospace;
    }}
    pre {{
        background-color: #f6f8fa;
        padding: 12px;
        border-radius: 6px;
        overflow: auto;
    }}
    pre code {{
        background: none;
        padding: 0;
    }}
    blockquote {{
        border-left: 4px solid #ddd;
        margin-left: 0;
        padding-left: 16px;
        color: #666;
    }}
    </style>
    </head>
    <body>
    {html}
    </body>
    </html>
    """
def setPlainText(self, text):
"""Set the editor text."""
self.editor.setPlainText(text)
def toPlainText(self):
"""Get the editor text."""
return self.editor.toPlainText()
class LoadProblemDialog(QDialog):
"""Dialog for loading existing problems."""
def __init__(self, problems, parent=None):
super().__init__(parent)
self.setWindowTitle("Load Existing Problem")
self.setModal(True)
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Select a problem to load:")
label.setFont(QFont("Arial", 10, QFont.Weight.Bold))
layout.addWidget(label)
self.list_widget = QListWidget()
self.list_widget.addItems(sorted(problems))
layout.addWidget(self.list_widget)
button_layout = QHBoxLayout()
self.load_button = QPushButton("Load")
self.load_button.clicked.connect(self.accept)
button_layout.addWidget(self.load_button)
self.cancel_button = QPushButton("Cancel")
self.cancel_button.clicked.connect(self.reject)
button_layout.addWidget(self.cancel_button)
layout.addLayout(button_layout)
def selected_problem(self):
"""Get the selected problem name."""
items = self.list_widget.selectedItems()
return items[0].text() if items else None
class ProblemCreatorApp(QMainWindow):
"""Main application for creating coding problems."""
def __init__(self):
super().__init__()
self.setWindowTitle("Coding Problem Creator")
self.setGeometry(100, 100, 1200, 900)
# Set default paths
self.base_path = Path("src/problems")
# Initialize UI
self.create_widgets()
self.statusBar().showMessage("Ready to create a new problem...")
def create_widgets(self):
"""Create all UI widgets."""
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Main layout
main_layout = QVBoxLayout(central_widget)
# Create tab widget
self.tab_widget = QTabWidget()
main_layout.addWidget(self.tab_widget)
# Problem Info tab
self.info_tab = QWidget()
self.tab_widget.addTab(self.info_tab, "Problem Info")
self.create_info_tab()
# Markdown Description tab
self.markdown_tab = QWidget()
self.tab_widget.addTab(self.markdown_tab, "Markdown Description")
self.create_markdown_tab()
# Test Code tab
self.test_tab = QWidget()
self.tab_widget.addTab(self.test_tab, "Test Code")
self.create_test_tab()
# Buttons at the bottom
button_layout = QHBoxLayout()
self.create_button = QPushButton("Create Problem")
self.create_button.clicked.connect(self.create_problem)
button_layout.addWidget(self.create_button)
self.clear_button = QPushButton("Clear All")
self.clear_button.clicked.connect(self.clear_all)
button_layout.addWidget(self.clear_button)
self.load_button = QPushButton("Load Existing")
self.load_button.clicked.connect(self.load_existing)
button_layout.addWidget(self.load_button)
main_layout.addLayout(button_layout)
def create_info_tab(self):
"""Create the Problem Info tab."""
layout = QVBoxLayout(self.info_tab)
# Title
title_label = QLabel("Coding Problem Creator")
title_font = QFont("Arial", 16, QFont.Weight.Bold)
title_label.setFont(title_font)
layout.addWidget(title_label)
# Problem Name
name_layout = QHBoxLayout()
name_label = QLabel("Problem Name:")
name_label.setFont(QFont("Arial", 10, QFont.Weight.Bold))
name_layout.addWidget(name_label)
self.problem_name = QLineEdit()
self.problem_name.setFont(QFont("Arial", 10))
name_layout.addWidget(self.problem_name)
layout.addLayout(name_layout)
# Difficulty
difficulty_layout = QHBoxLayout()
difficulty_label = QLabel("Difficulty:")
difficulty_label.setFont(QFont("Arial", 10, QFont.Weight.Bold))
difficulty_layout.addWidget(difficulty_label)
self.difficulty = QComboBox()
self.difficulty.addItems(["easy", "medium", "hard"])
self.difficulty.setCurrentText("medium")
difficulty_layout.addWidget(self.difficulty)
difficulty_layout.addStretch()
layout.addLayout(difficulty_layout)
# Plain Text Description
desc_label = QLabel("Plain Text Description:")
desc_label.setFont(QFont("Arial", 10, QFont.Weight.Bold))
layout.addWidget(desc_label)
self.description_text = QTextEdit()
self.description_text.setFont(QFont("Arial", 10))
self.description_text.setAcceptRichText(False)
layout.addWidget(self.description_text)
def create_markdown_tab(self):
"""Create the Markdown Description tab."""
layout = QVBoxLayout(self.markdown_tab)
self.description_editor = MarkdownEditor()
layout.addWidget(self.description_editor)
def create_test_tab(self):
"""Create the Test Code tab."""
layout = QVBoxLayout(self.test_tab)
# Add tips label
tips_label = QLabel("💡 Tips for writing good test cases will appear in the status bar")
tips_label.setFont(QFont("Arial", 9))
tips_label.setStyleSheet("color: #666; padding: 5px;")
layout.addWidget(tips_label)
self.test_code_editor = CodeEditor()
layout.addWidget(self.test_code_editor)
# Insert template code
self._insert_template_code()
def _insert_template_code(self):
    """Insert template test code into the editor."""
    template_code = '''import unittest

class TestSolution(unittest.TestCase):
    def test_example_case(self):
        """Test the provided example case."""
        solution = Solution()
        result = solution.solve("input")
        self.assertEqual(result, "expected_output")

    def test_edge_case_empty_input(self):
        """Test with empty input."""
        solution = Solution()
        result = solution.solve("")
        self.assertEqual(result, "")

    def test_edge_case_large_input(self):
        """Test with a large input to check performance."""
        solution = Solution()
        large_input = "a" * 1000
        result = solution.solve(large_input)
        self.assertTrue(result)  # Adjust based on expected behavior

if __name__ == "__main__":
    unittest.main()
'''
    self.test_code_editor.setPlainText(template_code)
def validate_inputs(self):
"""Validate all form inputs."""
if not self.problem_name.text().strip():
QMessageBox.critical(self, "Error", "Problem name is required!")
return False
if not self.description_text.toPlainText().strip():
QMessageBox.critical(self, "Error", "Plain text description is required!")
return False
if not self.description_editor.toPlainText().strip():
QMessageBox.critical(self, "Error", "Markdown description is required!")
return False
test_code = self.test_code_editor.toPlainText().strip()
# Heuristic check for near-empty test code (explicit grouping for clarity)
if not test_code or ("pass" in test_code and len(test_code) < 100):
reply = QMessageBox.question(
self,
"Confirm",
"The test code seems minimal. Are you sure you want to proceed?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.No:
return False
# Validate problem name (should be filesystem-safe)
name = self.problem_name.text().strip()
if not name.replace("_", "").replace("-", "").replace(" ", "").isalnum():
QMessageBox.critical(
self,
"Error",
"Problem name should only contain letters, numbers, spaces, hyphens, and underscores!"
)
return False
return True
def create_problem(self):
"""Create a new problem from the form data."""
if not self.validate_inputs():
return
try:
# Get values
problem_name = self.problem_name.text().strip()
description_text = self.description_text.toPlainText().strip() # Plain text
description_md = self.description_editor.toPlainText().strip() # Markdown
difficulty = self.difficulty.currentText()
test_code = self.test_code_editor.toPlainText().strip()
# Create safe folder name (replace spaces with underscores)
folder_name = problem_name.replace(" ", "_").lower()
# Create directory structure
problem_path = self.base_path / folder_name
# Create directories if they don't exist
problem_path.mkdir(parents=True, exist_ok=True)
# Create manifest.json - Include both description fields
manifest = {
"title": problem_name,
"description": description_text, # Plain text description
"description_md": f"problems/{folder_name}/description.md", # Markdown file path
"test_code": f"problems/{folder_name}/test.py",
"difficulty": difficulty
}
manifest_path = problem_path / "manifest.json"
with open(manifest_path, 'w', encoding='utf-8') as f:
json.dump(manifest, f, indent=4, ensure_ascii=False)
# Create description.md
description_md_path = problem_path / "description.md"
with open(description_md_path, 'w', encoding='utf-8') as f:
f.write(description_md)
# Create test.py
test_py_path = problem_path / "test.py"
with open(test_py_path, 'w', encoding='utf-8') as f:
f.write(test_code)
self.statusBar().showMessage(f"✓ Problem '{problem_name}' created successfully in {problem_path}")
logger.info(f"Created problem: {problem_name} at {problem_path}")
reply = QMessageBox.question(
self,
"Success",
f"Problem '{problem_name}' created successfully!\n\n"
f"Location: {problem_path}\n\n"
"Would you like to open the folder?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.Yes:
self.open_folder(problem_path)
except Exception as e:
error_msg = f"Error creating problem: {str(e)}"
self.statusBar().showMessage(error_msg)
logger.error(error_msg)
QMessageBox.critical(self, "Error", error_msg)
def open_folder(self, path):
    """Cross-platform folder opening."""
    try:
        if sys.platform == "win32":
            os.startfile(path)
        elif sys.platform == "darwin":  # macOS
            subprocess.run(["open", str(path)], check=False)
        else:  # Linux and other Unix-like
            subprocess.run(["xdg-open", str(path)], check=False)
    except Exception as e:
        error_msg = f"Could not open folder: {str(e)}"
        logger.warning(error_msg)
        QMessageBox.warning(self, "Warning", error_msg)
def clear_all(self):
"""Clear all form fields."""
reply = QMessageBox.question(
self,
"Confirm",
"Clear all fields?",
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.Yes:
self.problem_name.clear()
self.description_text.clear()
self.description_editor.setPlainText("")
self.difficulty.setCurrentText("medium")
self.test_code_editor.clear()
# Re-insert template
self._insert_template_code()
self.statusBar().showMessage("All fields cleared.")
logger.info("Cleared all form fields")
def load_existing(self):
"""Load an existing problem for editing."""
try:
if not self.base_path.exists():
QMessageBox.warning(self, "Warning", "No problems directory found!")
return
# Get list of existing problems
problems = [d.name for d in self.base_path.iterdir() if d.is_dir()]
if not problems:
QMessageBox.information(self, "Info", "No existing problems found!")
return
# Create and show selection dialog
dialog = LoadProblemDialog(problems, self)
if dialog.exec() == QDialog.DialogCode.Accepted:
selected_problem = dialog.selected_problem()
if selected_problem:
self.load_problem_data(selected_problem)
except Exception as e:
error_msg = f"Error loading existing problems: {str(e)}"
logger.error(error_msg)
QMessageBox.critical(self, "Error", error_msg)
def load_problem_data(self, problem_name):
"""Load problem data into the form."""
try:
problem_path = self.base_path / problem_name
manifest_path = problem_path / "manifest.json"
test_path = problem_path / "test.py"
desc_path = problem_path / "description.md"
# Load manifest
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
# Load test code
test_code = ""
if test_path.exists():
with open(test_path, 'r', encoding='utf-8') as f:
test_code = f.read()
# Load markdown description
description_md = ""
if desc_path.exists():
with open(desc_path, 'r', encoding='utf-8') as f:
description_md = f.read()
# Load plain text description from manifest
description_text = manifest.get('description', '')
# Populate fields
self.problem_name.setText(manifest["title"])
self.description_text.setPlainText(description_text)
self.description_editor.setPlainText(description_md)
self.difficulty.setCurrentText(manifest["difficulty"])
self.test_code_editor.setPlainText(test_code)
self.statusBar().showMessage(f"Loaded problem: {problem_name}")
logger.info(f"Loaded problem: {problem_name}")
except Exception as e:
error_msg = f"Error loading problem data: {str(e)}"
logger.error(error_msg)
QMessageBox.critical(self, "Error", error_msg)
def main():
"""Main application entry point."""
app = QApplication(sys.argv)
# Set application style
app.setStyle('Fusion')
window = ProblemCreatorApp()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

178
readme.md
View File

@@ -1,20 +1,25 @@
## under construction
# QPP - Quick Problem Platform
### Like LeetCode, but more lightweight
This is a lightweight, LeetCode-inspired problem-solving platform. You can run the server locally, contribute problems, and write unit tests.
---
## Getting Started
Run the provided bash/_batch_ script to start the server.
---
## File Structure for Problems
Create a folder inside `/problems/` named after your problem. Each folder **must** contain:
* `manifest.json` # Don't worry, you can change the info anytime to reload
* `test.py`
* `description.md`
**Example `manifest.json`:**
```json
{
"title": "Title of the Problem",
@@ -23,40 +28,147 @@ In this folder create ```manifest.json, test.py, description.md```
"difficulty": "easy || medium || hard",
"test_code": "problems/problempath/test.py"
}
```
> This structure is mandatory but ensures the easiest workflow.
---
## Writing Problem Descriptions
* Use **simple and easy-to-understand language**. Avoid overly technical explanations.
* Syntax:
* Normal Markdown
* Start headings with `##` (looks cleaner than `#`)
* Include cross-links to resources, e.g., [W3Schools](https://www.w3schools.com/) or [Python Docs](https://docs.python.org/3/)
* Good formatting is always appreciated
---
## Developing & Running Locally
To run the backend during development:
```bash
python -m flask --app ./src/app.py run --host=0.0.0.0 --port=5000
```
For production testing:
**Linux:**
```bash
python -m gunicorn -w 4 -b 0.0.0.0:8000 src.app:app
```
**Windows:**
```bat
:: Create database folder if missing
if not exist .\src\database md .\src\database
python -m waitress --listen=0.0.0.0:8000 src.app:app
```
> Ensure all required packages are installed via `requirements.txt`. Python is versatile enough for this small backend.
---
### Migrating Legacy Code
When removing or refactoring legacy code:
1. Check if the code is used anywhere; if critical, branch into a testing branch first.
2. Ensure essential functionality is preserved.
---
## Committing Changes
**We need frontend people!** I have no idea how the frontend works; if you are interested, please help.
* Ensure your editor uses **LF** line endings (`\n`) instead of CRLF.
* To automatically fix CRLF on commit:
```bash
git config core.autocrlf input
git add --renormalize .
git commit -m "Major Change"
```
* Recommended workflow:
1. Fork
2. Make changes
3. Submit a PR
4. Review & merge
> Using WSL with VS Code for development is recommended for consistent line endings on Windows.
---
## Writing Unit Tests
Follow this convention when writing unittests. **Implement the function first, then write tests.**
### Example: Phone Number Validation
**Function (`phone_validation.py`):**
```python
import re
def is_valid_phone_number(phone_number: str) -> bool:
"""Return True if phone_number matches '123-456-7890' format."""
return bool(re.search(r"^(\d{3}-){2}\d{4}$", phone_number))
```
**Unit Test (`test_phone_validation.py`):**
```python
"""
@TESTSAMPLE.PY / NAME THIS "test.py" in your actual project
"""
import unittest
from phone_validation import is_valid_phone_number
" )) First Point from the List "
# def sortlist(lst = [4,3,2,1]) -> list:
# return sorted(lst)
class TestPhoneNumberRegex(unittest.TestCase):
")) This is a 'easy' Test, if you want you can write more defined ones."
class TestSolution(unittest.TestCase):
def test_sort(self):
self.x = []
self.assertEqual(sortlist(self.x), sorted(self.x)) # pyright: ignore[reportUndefinedVariable] <- This is only here so that pyright doesnt complain ; NOT NECCESARY!
def test_if_valid(self):
test_cases = [
("123-456-7890", True),
("111-222-3333", True),
("abc-def-ghij", False),
("1234567890", False),
("123-45-67890", False),
("12-3456-7890", False),
("", False),
]
print("\nPHONE NUMBER VALIDATION TEST RESULTS")
for phone, expected in test_cases:
try:
actual = is_valid_phone_number(phone)
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | Input: '{phone}' -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | Input: '{phone}' -> Exception: {e}")
raise
if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)
```
### ✅ Unit Test Guidelines
1. **Class Naming:** `Test<FunctionOrModuleName>`
2. **Method Naming:** `test_<what_is_being_tested>`
3. **Use tuples** `(input, expected_output)` for test cases
4. Include **edge cases** (empty strings, wrong formats)
5. **Print results** clearly for easier debugging
6. **Catch exceptions** and display failing input before raising
### What has changed for ease of use
If you want an easy way to create or edit problems, have a look at the Qt program (run `python qtc.py` from the repo root; it needs the PyQt6 packages from `requirements.txt`). It acts as a lightweight "VS Code" for this platform. Since the tool is still relatively new, we suggest reviewing the generated files in a regular editor afterwards.

requirements.txt
View File

@@ -1,5 +1,11 @@
Flask>=3.0
Flask-SQLAlchemy>=3.1
Markdown>=3.6
MarkupSafe>=2.1
watchdog>=4.0
Flask>=3.0
Flask-SQLAlchemy>=3.1
Flask-Caching>=2.3.1
Markdown>=3.6
MarkupSafe>=2.1
watchdog>=4.0
gunicorn>=23.0.0
waitress>=3.0.2
pygments>=2.19.2
pyqt6>=6.9.1
PyQt6-WebEngine>=6.9.0

View File

@@ -1,17 +1,25 @@
#!/bin/bash
set -e # exit if any command fails
# Ensure QPP/database directory exists
mkdir -p src/database
python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
export FLASK_APP=src.app
export FLASK_ENV=production
flask run --host=0.0.0.0 --port=5000
#!/bin/bash
set -e # exit if any command fails
# Ensure src/database directory exists
mkdir -p src/database
# Create virtual environment if it doesn't exist
if [ ! -d "venv" ]; then
python -m venv venv
fi
source venv/bin/activate
# Upgrade pip and install dependencies
pip install --upgrade pip
pip install -r requirements.txt
# Export environment variables
export FLASK_APP=src.app
export FLASK_ENV=production
# Run with Gunicorn
echo "Starting Flask app with Gunicorn..."
exec gunicorn -w 4 -b 0.0.0.0:8000 src.app:app

View File

@@ -1 +1,3 @@
python -m flask --app .\src\app.py run --host=0.0.0.0 --port=5000
:: make db directory and then launch the server
if not exist .\src\database md .\src\database
python -m waitress --listen=0.0.0.0:8000 src.app:app

731
src/JavaScript/script.js Normal file
View File

@@ -0,0 +1,731 @@
/**
 * -------------------------------------------------------------------
 * Please read as a developer:
 * @file script.js
 * @author rattatwinko
 * @description This is the JavaScript "frontend" file for the website.
 *              It handles nearly all frontend logic and interaction, so
 *              be cautious when changing it: the code is complex and
 *              hard to refactor.
 * @license MIT
 * You can freely modify this file and distribute it as you wish.
 *
 * @todo
 * - [ ] Refactor this file into smaller modules.
 * -------------------------------------------------------------------
 * CHANGELOG:
 * aug18@21:51 -> pagination for leaderboard; some refactoring.
 */
document.addEventListener("DOMContentLoaded", () => {
"use strict";
// Utility functions
const utils = {
safeLocalStorage: {
getItem(key) {
try {
return localStorage.getItem(key);
} catch (e) {
console.warn("localStorage.getItem failed:", e);
return null;
}
},
setItem(key, value) {
try {
localStorage.setItem(key, value);
return true;
} catch (e) {
console.warn("localStorage.setItem failed:", e);
return false;
}
}
},
debounce(func, wait) {
let timeout;
return function executedFunction(...args) {
const later = () => {
clearTimeout(timeout);
func.apply(this, args);
};
clearTimeout(timeout);
timeout = setTimeout(later, wait);
};
},
throttle(func, limit) {
let inThrottle;
return function executedFunction(...args) {
if (!inThrottle) {
func.apply(this, args);
inThrottle = true;
setTimeout(() => inThrottle = false, limit);
}
};
}
};
// Dark Mode Manager
class DarkModeManager {
constructor() {
this.darkModeToggle = document.getElementById("darkModeToggle");
this.html = document.documentElement;
this.init();
}
init() {
this.loadSavedPreference();
this.attachEventListeners();
}
loadSavedPreference() {
const savedDarkMode = utils.safeLocalStorage.getItem("darkMode");
if (
savedDarkMode === "true" ||
(savedDarkMode === null &&
window.matchMedia("(prefers-color-scheme: dark)").matches)
) {
this.html.classList.add("dark");
}
}
attachEventListeners() {
this.darkModeToggle?.addEventListener("click", () => {
this.html.classList.toggle("dark");
utils.safeLocalStorage.setItem("darkMode", this.html.classList.contains("dark"));
});
}
}
// Problem Manager
class ProblemManager {
constructor() {
this.problemSearch = document.getElementById("problemSearch");
this.problemsContainer = document.getElementById("problemsContainer");
this.problemsPagination = document.getElementById("problemsPagination");
this.problemsPrevBtn = document.getElementById("problemsPrevBtn");
this.problemsNextBtn = document.getElementById("problemsNextBtn");
this.problemsPaginationInfo = document.getElementById("problemsPaginationInfo");
this.difficultyFilter = document.getElementById("difficultyFilter");
this.sortProblems = document.getElementById("sortProblems");
this.allProblemItems = [];
this.filteredProblemItems = [];
this.currentPage = 1;
this.itemsPerPage = 5;
this.problemSort = { column: "alpha", direction: "asc" };
this.problemDescriptionPopover = null;
this.manifestCache = new Map();
this.init();
}
init() {
if (!this.problemsContainer) return;
this.initializeProblemItems();
this.attachEventListeners();
this.injectPopoverCSS();
this.attachProblemHoverEvents();
}
initializeProblemItems() {
this.allProblemItems = Array.from(
this.problemsContainer.querySelectorAll(".problem-item") || []
);
this.filteredProblemItems = this.allProblemItems.map(this.getProblemData);
this.updatePagination();
}
getProblemData = (item) => ({
element: item,
name: item.dataset.name?.toLowerCase() || "",
desc: item.dataset.desc?.toLowerCase() || "",
difficulty: item.dataset.difficulty || "",
});
updatePagination() {
const totalPages = Math.ceil(this.filteredProblemItems.length / this.itemsPerPage);
const startIndex = (this.currentPage - 1) * this.itemsPerPage;
const endIndex = startIndex + this.itemsPerPage;
// Hide all items first
this.allProblemItems.forEach((item) => {
item.style.display = "none";
});
// Show current page items
this.filteredProblemItems.slice(startIndex, endIndex).forEach((item) => {
item.element.style.display = "";
});
// Update pagination controls
if (this.problemsPrevBtn) this.problemsPrevBtn.disabled = this.currentPage <= 1;
if (this.problemsNextBtn) this.problemsNextBtn.disabled = this.currentPage >= totalPages;
if (this.problemsPaginationInfo) {
this.problemsPaginationInfo.textContent =
totalPages > 0
? `Page ${this.currentPage} of ${totalPages}`
: "No problems found";
}
this.setupPaginationLayout();
}
setupPaginationLayout() {
if (this.problemsPagination) {
Object.assign(this.problemsPagination.style, {
display: "flex",
justifyContent: "center",
position: "absolute",
left: "0",
right: "0",
bottom: "0",
margin: "0 auto",
width: "100%",
background: "inherit",
borderTop: "1px solid var(--border)",
padding: "12px 0"
});
// Style the pagination buttons and info text
const prevBtn = this.problemsPagination.querySelector('#problemsPrevBtn');
const nextBtn = this.problemsPagination.querySelector('#problemsNextBtn');
const infoText = this.problemsPagination.querySelector('#problemsPaginationInfo');
if (prevBtn) prevBtn.style.marginRight = '10px';
if (nextBtn) nextBtn.style.marginLeft = '10px';
if (infoText) infoText.style.marginTop = '2px';
this.problemsPagination.classList.remove("hidden");
}
if (this.problemsContainer?.parentElement) {
Object.assign(this.problemsContainer.parentElement.style, {
position: "relative",
paddingBottom: "56px"
});
}
}
showProblemDescription = async (item) => {
this.hideProblemDescription();
const folder = item.querySelector('a')?.getAttribute('href')?.split('/').pop();
if (!folder) return;
try {
let manifest = this.manifestCache.get(folder);
if (!manifest) {
// Try localStorage cache first
const cacheKey = `problem_manifest_${folder}`;
const cached = utils.safeLocalStorage.getItem(cacheKey);
if (cached) {
manifest = JSON.parse(cached);
this.manifestCache.set(folder, manifest);
} else {
// Fetch from API
const response = await fetch(`/api/problem_manifest/${encodeURIComponent(folder)}`);
manifest = response.ok ? await response.json() : { description: 'No description.' };
this.manifestCache.set(folder, manifest);
utils.safeLocalStorage.setItem(cacheKey, JSON.stringify(manifest));
}
}
this.createPopover(manifest.description || 'No description.', item);
} catch (error) {
console.warn("Failed to load problem description:", error);
this.createPopover('No description available.', item);
}
};
createPopover(description, item) {
this.problemDescriptionPopover = document.createElement("div");
this.problemDescriptionPopover.className = "problem-desc-popover";
this.problemDescriptionPopover.textContent = description;
document.body.appendChild(this.problemDescriptionPopover);
const rect = item.getBoundingClientRect();
Object.assign(this.problemDescriptionPopover.style, {
position: "fixed",
left: `${rect.left + window.scrollX}px`,
top: `${rect.bottom + window.scrollY + 6}px`,
zIndex: "1000",
minWidth: `${rect.width}px`
});
}
hideProblemDescription = () => {
if (this.problemDescriptionPopover) {
this.problemDescriptionPopover.remove();
this.problemDescriptionPopover = null;
}
};
attachProblemHoverEvents() {
this.allProblemItems.forEach((item) => {
item.addEventListener("mouseenter", () => this.showProblemDescription(item));
item.addEventListener("mouseleave", this.hideProblemDescription);
item.addEventListener("mousemove", this.handleMouseMove);
});
}
handleMouseMove = utils.throttle((e) => {
if (this.problemDescriptionPopover) {
this.problemDescriptionPopover.style.left = `${e.clientX + 10}px`;
}
}, 16); // ~60fps
sortProblemItems(column, direction) {
this.filteredProblemItems.sort((a, b) => {
let valueA, valueB;
switch (column) {
case "alpha":
valueA = a.name;
valueB = b.name;
break;
case "difficulty":
const difficultyOrder = { easy: 1, medium: 2, hard: 3 };
valueA = difficultyOrder[a.difficulty] || 0;
valueB = difficultyOrder[b.difficulty] || 0;
break;
default:
return 0;
}
let comparison = 0;
if (typeof valueA === "number" && typeof valueB === "number") {
comparison = valueA - valueB;
} else {
comparison = valueA < valueB ? -1 : valueA > valueB ? 1 : 0;
}
return direction === "asc" ? comparison : -comparison;
});
}
attachEventListeners() {
this.problemsPrevBtn?.addEventListener("click", () => {
if (this.currentPage > 1) {
this.currentPage--;
this.updatePagination();
}
});
this.problemsNextBtn?.addEventListener("click", () => {
const totalPages = Math.ceil(this.filteredProblemItems.length / this.itemsPerPage);
if (this.currentPage < totalPages) {
this.currentPage++;
this.updatePagination();
}
});
this.problemSearch?.addEventListener("input", utils.debounce(() => {
this.filterProblems();
this.currentPage = 1;
this.updatePagination();
}, 300));
this.difficultyFilter?.addEventListener("change", () => {
this.filterProblems();
this.currentPage = 1;
this.updatePagination();
});
this.sortProblems?.addEventListener("change", () => {
const value = this.sortProblems.value;
if (value === "alpha" || value === "difficulty") {
if (this.problemSort.column === value) {
this.problemSort.direction = this.problemSort.direction === "asc" ? "desc" : "asc";
} else {
this.problemSort.column = value;
this.problemSort.direction = "asc";
}
this.sortProblemItems(this.problemSort.column, this.problemSort.direction);
this.currentPage = 1;
this.updatePagination();
}
});
}
filterProblems() {
const searchTerm = (this.problemSearch?.value || "").toLowerCase().trim();
const difficulty = this.difficultyFilter?.value || "all";
this.filteredProblemItems = this.allProblemItems
.map(this.getProblemData)
.filter(item => {
const matchesSearch = !searchTerm ||
item.name.includes(searchTerm) ||
item.desc.includes(searchTerm);
const matchesDifficulty = difficulty === "all" ||
item.difficulty === difficulty;
return matchesSearch && matchesDifficulty;
});
}
injectPopoverCSS() {
if (document.getElementById("problem-desc-popover-style")) return;
const style = document.createElement("style");
style.id = "problem-desc-popover-style";
style.textContent = `
.problem-desc-popover {
background: var(--card, #fff);
color: var(--text, #222);
border: 1px solid var(--border, #e5e7eb);
border-radius: 8px;
box-shadow: 0 4px 16px rgba(16,24,40,0.13);
padding: 12px 16px;
font-size: 0.98rem;
max-width: 350px;
min-width: 180px;
pointer-events: none;
opacity: 0.97;
transition: opacity 0.2s;
word-break: break-word;
}
html.dark .problem-desc-popover {
background: var(--card, #1e293b);
color: var(--text, #f1f5f9);
border: 1px solid var(--border, #334155);
}
`;
document.head.appendChild(style);
}
destroy() {
// Clean up event listeners and resources
this.hideProblemDescription();
this.manifestCache.clear();
}
}
// Leaderboard Manager
class LeaderboardManager {
constructor() {
this.problemFilter = document.getElementById("problemFilter");
this.runtimeFilter = document.getElementById("runtimeFilter");
this.leaderboardBody = document.getElementById("leaderboardBody");
this.sortableHeaders = document.querySelectorAll(".sortable");
this.rankInfoBtn = document.getElementById("rankInfoBtn");
this.rankingExplanation = document.getElementById("rankingExplanation");
this.currentSort = { column: "rank", direction: "asc" };
this.allRows = [];
this.filteredRows = [];
this.currentPage = 1;
this.itemsPerPage = 5;
this.leaderboardPagination = document.createElement("div");
this.leaderboardPagination.className = "pagination-controls";
this.leaderboardPagination.style.display = "flex";
this.leaderboardPagination.style.justifyContent = "center";
this.leaderboardPagination.style.position = "absolute";
this.leaderboardPagination.style.left = 0;
this.leaderboardPagination.style.right = 0;
this.leaderboardPagination.style.bottom = 0;
this.leaderboardPagination.style.margin = "0 auto 0 auto";
this.leaderboardPagination.style.width = "100%";
this.leaderboardPagination.style.background = "inherit";
this.leaderboardPagination.style.borderTop = "1px solid var(--border)";
this.leaderboardPagination.style.padding = "12px 0";
this.leaderboardPagination.innerHTML = `
<button class="pagination-btn" id="leaderboardPrevBtn" disabled style="margin-right:10px;">← Previous</button>
<span class="pagination-info" id="leaderboardPaginationInfo" style="margin-top:2px;">Page 1 of 1</span>
<button class="pagination-btn" id="leaderboardNextBtn" disabled style="margin-left:10px;">Next →</button>
`;
this.leaderboardPrevBtn = this.leaderboardPagination.querySelector("#leaderboardPrevBtn");
this.leaderboardNextBtn = this.leaderboardPagination.querySelector("#leaderboardNextBtn");
this.leaderboardPaginationInfo = this.leaderboardPagination.querySelector("#leaderboardPaginationInfo");
this.init();
}
init() {
if (!this.leaderboardBody || this.leaderboardBody.children.length === 0) return;
this.initializeRows();
this.attachEventListeners();
this.filterLeaderboard();
this.setInitialSortIndicator();
// Insert pagination controls after leaderboard table
const leaderboardContainer = document.getElementById("leaderboardContainer");
if (leaderboardContainer && !leaderboardContainer.contains(this.leaderboardPagination)) {
leaderboardContainer.appendChild(this.leaderboardPagination);
// Ensure parent card is relatively positioned and has enough bottom padding
const leaderboardCard = leaderboardContainer.closest('.card');
if (leaderboardCard) {
leaderboardCard.style.position = "relative";
leaderboardCard.style.paddingBottom = "56px";
}
}
// Also ensure the parent card (section.card) contains the controls for correct layout
const leaderboardCard = leaderboardContainer?.closest('.card');
if (leaderboardCard && !leaderboardCard.contains(this.leaderboardPagination)) {
leaderboardCard.appendChild(this.leaderboardPagination);
}
}
initializeRows() {
this.allRows = Array.from(this.leaderboardBody.querySelectorAll("tr")).map((row, index) => ({
element: row,
user: row.dataset.user || "",
problem: row.dataset.problem || "",
runtime: parseFloat(row.dataset.runtime) || 0,
memory: parseFloat(row.dataset.memory) || 0,
timestamp: new Date(row.dataset.timestamp || Date.now()).getTime(),
language: row.dataset.language || "",
originalIndex: index,
}));
}
filterLeaderboard() {
const problemTerm = (this.problemFilter?.value || "").toLowerCase().trim();
const runtimeType = this.runtimeFilter?.value || "all";
// Filter rows
this.filteredRows = this.allRows.filter((rowData) => {
let visible = true;
if (problemTerm) {
visible = rowData.problem.toLowerCase().includes(problemTerm);
}
return visible;
});
// Apply runtime filter (best/worst per user per problem)
if (runtimeType === "best" || runtimeType === "worst") {
const userProblemGroups = {};
this.filteredRows.forEach((rowData) => {
const key = `${rowData.user}::${rowData.problem}`;
if (!userProblemGroups[key]) userProblemGroups[key] = [];
userProblemGroups[key].push(rowData);
});
this.filteredRows = Object.values(userProblemGroups).flatMap((group) => {
if (group.length <= 1) return group;
group.sort((a, b) => a.runtime - b.runtime);
const keepIndex = runtimeType === "best" ? 0 : group.length - 1;
return [group[keepIndex]];
});
}
this.currentPage = 1;
this.updateLeaderboardPagination();
}
updateLeaderboardPagination() {
const totalPages = Math.ceil(this.filteredRows.length / this.itemsPerPage) || 1;
if (this.currentPage > totalPages) this.currentPage = totalPages;
const startIndex = (this.currentPage - 1) * this.itemsPerPage;
const endIndex = startIndex + this.itemsPerPage;
// Hide all rows first
this.allRows.forEach((rowData) => {
rowData.element.style.display = "none";
});
// Show only current page rows
this.filteredRows.slice(startIndex, endIndex).forEach((rowData) => {
rowData.element.style.display = "";
});
// Update pagination controls
if (this.leaderboardPrevBtn) this.leaderboardPrevBtn.disabled = this.currentPage <= 1;
if (this.leaderboardNextBtn) this.leaderboardNextBtn.disabled = this.currentPage >= totalPages;
if (this.leaderboardPaginationInfo) {
this.leaderboardPaginationInfo.textContent =
totalPages > 0 ? `Page ${this.currentPage} of ${totalPages}` : "No entries found";
}
// Always show and center pagination at the bottom of the leaderboard card
if (this.leaderboardPagination) {
this.leaderboardPagination.classList.remove("hidden");
this.leaderboardPagination.style.display = "flex";
this.leaderboardPagination.style.justifyContent = "center";
this.leaderboardPagination.style.position = "absolute";
this.leaderboardPagination.style.left = 0;
this.leaderboardPagination.style.right = 0;
this.leaderboardPagination.style.bottom = 0;
this.leaderboardPagination.style.margin = "0 auto 0 auto";
this.leaderboardPagination.style.width = "100%";
this.leaderboardPagination.style.background = "inherit";
this.leaderboardPagination.style.borderTop = "1px solid var(--border)";
this.leaderboardPagination.style.padding = "12px 0";
}
// Make sure the parent leaderboard card is relatively positioned
const leaderboardContainer = document.getElementById("leaderboardContainer");
if (leaderboardContainer && leaderboardContainer.parentElement) {
leaderboardContainer.parentElement.style.position = "relative";
leaderboardContainer.parentElement.style.paddingBottom = "56px";
}
// Recalculate ranks for visible rows
this.calculateOverallRanking();
}
calculateOverallRanking() {
// Only consider visible rows (current page)
const visibleRows = this.filteredRows.slice(
(this.currentPage - 1) * this.itemsPerPage,
(this.currentPage - 1) * this.itemsPerPage + this.itemsPerPage
);
if (visibleRows.length === 0) return;
// Group submissions by problem to find the best performance for each
const problemBests = {};
visibleRows.forEach((rowData) => {
const problem = rowData.problem;
if (!problemBests[problem]) {
problemBests[problem] = {
bestRuntime: Infinity,
bestMemory: Infinity,
};
}
problemBests[problem].bestRuntime = Math.min(
problemBests[problem].bestRuntime,
rowData.runtime
);
problemBests[problem].bestMemory = Math.min(
problemBests[problem].bestMemory,
rowData.memory
);
});
// Calculate normalized scores for each submission
visibleRows.forEach((rowData) => {
const problemBest = problemBests[rowData.problem];
const runtimeScore =
problemBest.bestRuntime > 0
? rowData.runtime / problemBest.bestRuntime
: 1;
const memoryScore =
problemBest.bestMemory > 0
? rowData.memory / problemBest.bestMemory
: 1;
rowData.overallScore = runtimeScore * 0.7 + memoryScore * 0.3;
});
// Sort by overall score (lower is better), then by timestamp (earlier is better for ties)
visibleRows.sort((a, b) => {
const scoreDiff = a.overallScore - b.overallScore;
if (Math.abs(scoreDiff) > 0.000001) return scoreDiff;
return a.timestamp - b.timestamp;
});
// Reorder DOM elements and update ranks
const fragment = document.createDocumentFragment();
visibleRows.forEach((rowData, index) => {
fragment.appendChild(rowData.element);
// Update rank cell
const rankCell = rowData.element.cells[0];
if (rankCell) rankCell.textContent = index + 1 + (this.currentPage - 1) * this.itemsPerPage;
// Update rank classes
rowData.element.className = rowData.element.className.replace(/\brank-\d+\b/g, "");
if (index === 0) rowData.element.classList.add("rank-1");
else if (index < 3) rowData.element.classList.add("rank-top3");
});
this.leaderboardBody.appendChild(fragment);
}
attachEventListeners() {
// Sorting event listeners
this.sortableHeaders.forEach((header) => {
header.addEventListener("click", () => {
const column = header.dataset.sort;
if (!column) return;
// Remove sorting classes from all headers
this.sortableHeaders.forEach((h) => h.classList.remove("sort-asc", "sort-desc"));
// Toggle sort direction
if (this.currentSort.column === column) {
this.currentSort.direction = this.currentSort.direction === "asc" ? "desc" : "asc";
} else {
this.currentSort.column = column;
this.currentSort.direction = "asc";
}
// Add sorting class to current header
header.classList.add(`sort-${this.currentSort.direction}`);
// Sort filteredRows
this.filteredRows.sort((a, b) => {
let valueA = a[column];
let valueB = b[column];
if (typeof valueA === "string") valueA = valueA.toLowerCase();
if (typeof valueB === "string") valueB = valueB.toLowerCase();
let comparison = 0;
if (typeof valueA === "number" && typeof valueB === "number") {
comparison = valueA - valueB;
} else {
comparison = valueA < valueB ? -1 : valueA > valueB ? 1 : 0;
}
return this.currentSort.direction === "asc" ? comparison : -comparison;
});
this.currentPage = 1;
this.updateLeaderboardPagination();
});
});
// Filter event listeners with debouncing
this.problemFilter?.addEventListener("input", utils.debounce(() => {
this.filterLeaderboard();
}, 300));
this.runtimeFilter?.addEventListener("change", () => this.filterLeaderboard());
// Pagination event listeners
this.leaderboardPrevBtn?.addEventListener("click", () => {
if (this.currentPage > 1) {
this.currentPage--;
this.updateLeaderboardPagination();
}
});
this.leaderboardNextBtn?.addEventListener("click", () => {
const totalPages = Math.ceil(this.filteredRows.length / this.itemsPerPage) || 1;
if (this.currentPage < totalPages) {
this.currentPage++;
this.updateLeaderboardPagination();
}
});
// Rank info popout
this.rankInfoBtn?.addEventListener("click", (e) => {
e.preventDefault();
this.rankingExplanation?.classList.toggle("active");
this.rankInfoBtn?.classList.toggle("active");
});
// Close ranking explanation when clicking outside
document.addEventListener("click", (e) => {
if (
this.rankingExplanation?.classList.contains("active") &&
!this.rankingExplanation.contains(e.target) &&
!this.rankInfoBtn?.contains(e.target)
) {
this.rankingExplanation.classList.remove("active");
this.rankInfoBtn?.classList.remove("active");
}
});
}
setInitialSortIndicator() {
const defaultHeader = document.querySelector('[data-sort="rank"]');
if (defaultHeader) {
defaultHeader.classList.add("sort-asc");
}
}
}
// Initialize all managers
const darkModeManager = new DarkModeManager();
const problemManager = new ProblemManager();
const leaderboardManager = new LeaderboardManager();
// Apply dark mode to dynamically created elements
const applyDarkModeToElements = () => {
// Any additional dark mode styling for dynamically created elements can go here
};
// Watch for dark mode changes
const darkModeObserver = new MutationObserver(applyDarkModeToElements);
darkModeObserver.observe(document.documentElement, {
attributes: true,
attributeFilter: ["class"],
});
// Cleanup on page unload
window.addEventListener("beforeunload", () => {
problemManager.destroy();
darkModeObserver.disconnect();
});
});


@@ -1,136 +1,166 @@
from markupsafe import Markup
from flask import Flask, render_template, request, redirect, url_for, send_from_directory
import markdown as md
import ast
from src.models import db, Problem, Solution
from src.utils import run_code_against_tests
from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard
import os
## from problem_loader import load_problems_from_json, schedule_problem_reload
from src.problem_scanner import start_problem_scanner
import sqlite3
from pathlib import Path
app = Flask(__name__)
BASE_DIR = Path(__file__).parent
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}"
print(f">>>>>>>>>>>>>>>>>>>>< Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
db.init_app(app)
@app.before_request
def setup():
db.create_all()
create_leaderboard_table() # Ensure leaderboard table exists
# Problems are now loaded from manifests by the background scanner. No need to load problems.json.
# Start the background thread to scan problems
start_problem_scanner()
@app.route("/script.js")
def script():
return send_from_directory("templates", "script.js")
@app.route('/favicon.ico')
def favicon():
return send_from_directory("templates", "favicon.ico")
@app.route('/')
def index():
db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path)
c = conn.cursor()
# NOTE: column order matters here; an earlier version mixed up the test_code and difficulty columns
c.execute('SELECT folder, description, test_code, difficulty FROM problems')
problems = c.fetchall()
conn.close()
# Get leaderboard entries
leaderboard = get_leaderboard()
# Map folder to title for display
problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems}
return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles)
@app.route('/problem/new', methods=['GET', 'POST'])
def new_problem():
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
test_code = request.form['test_code']
problem = Problem(title=title, description=description, test_code=test_code)
db.session.add(problem)
db.session.commit()
return redirect(url_for('index'))
return render_template('new_problem.html')
@app.route('/problem/<folder>', methods=['GET', 'POST'])
def view_problem(folder):
db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute('SELECT folder, description, test_code, difficulty FROM problems WHERE folder = ?', (folder,))
row = c.fetchone()
conn.close()
if not row:
return 'Problem not found', 404
problem = {
'folder': row[0],
'description': row[1],
'difficulty': row[3], # now correct
'test_code': row[2], # now correct
}
result = None
if request.method == 'POST':
user_code = request.form['user_code']
username = request.form.get('username', '').strip() or 'Anonymous'
import tracemalloc
tracemalloc.start()
run_result = run_code_against_tests(user_code, problem['test_code'])
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
memory_used = peak // 1024 # in KB
# Try to get the last line number executed (even for successful runs)
line_number = None
try:
tree = ast.parse(user_code)
# Find the highest line number in the AST (for multi-function/user code)
def get_max_lineno(node):
max_lineno = getattr(node, 'lineno', 0)
for child in ast.iter_child_nodes(node):
max_lineno = max(max_lineno, get_max_lineno(child))
return max_lineno
line_number = get_max_lineno(tree)
except Exception:
pass
# If there was an error, try to get the error line number from the traceback
if run_result['error']:
tb = run_result['error']
import traceback
try:
tb_lines = traceback.extract_tb(traceback.TracebackException.from_string(tb).stack)
if tb_lines:
line_number = tb_lines[-1].lineno
except Exception:
pass
# ONLY log to leaderboard if the solution passed all tests
if run_result['passed']:
log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number)
result = run_result
return render_template('problem.html', problem=problem, result=result)
@app.template_filter('markdown')
def markdown_filter(text):
return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists']))
if __name__ == '__main__':
app.run(debug=True)
# API endpoint to get problem manifest (description) by folder
from markupsafe import Markup
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, jsonify
from flask_caching import Cache
import markdown as md
import ast
from src.models import db, Problem, Solution
from src.utils import run_code_against_tests
from src.leaderboard import create_leaderboard_table, log_leaderboard, get_leaderboard
import os
from src.problem_scanner import start_problem_scanner
import sqlite3
from pathlib import Path
# Config cache
config = {
"DEBUG": True,
"CACHE_TYPE": "SimpleCache",
"CACHE_DEFAULT_TIMEOUT": 300
}
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
BASE_DIR = Path(__file__).parent
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{BASE_DIR / 'database' / 'db.sqlite3'}"
print(f"[ INFO ] : Using database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
db.init_app(app)
@app.before_request
def setup():
db.create_all()
create_leaderboard_table() # Ensure leaderboard table exists
# Problems are loaded from manifests by the background scanner, which runs on a separate thread. No need to load problems.json.
# Start the background thread to scan problems
start_problem_scanner()
@app.route('/api/problem_manifest/<folder>')
def api_problem_manifest(folder):
# Try to load manifest.json from the problem folder
import json
manifest_path = BASE_DIR / 'problems' / folder / 'manifest.json'
if not manifest_path.exists():
return jsonify({'error': 'Manifest not found'}), 404
try:
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
return jsonify(manifest)
except Exception as e:
return jsonify({'error': str(e)}), 500
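# Quick request sketch for the manifest endpoint above (illustration only;
# assumes the Flask dev server is running locally on its default port 5000):
#
#   import json, urllib.request
#   with urllib.request.urlopen("http://127.0.0.1:5000/api/problem_manifest/PrimeNumber") as resp:
#       manifest = json.load(resp)
#   manifest.get("difficulty")   # -> "medium" for the bundled PrimeNumber problem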
# Routes for serving the JavaScript files
@app.route('/JavaScript/<path:filename>')
@cache.cached(timeout=300)
def serve_js(filename):
return send_from_directory('JavaScript', filename)
@app.route("/script.js")
@cache.cached(timeout=300)
def script():
return send_from_directory("JavaScript", "script.js")
@app.route('/favicon.ico')
@cache.cached()
def favicon():
return send_from_directory("templates", "favicon.ico")
@app.route('/')
@cache.cached(timeout=300)
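# NOTE: caching the rendered index for 300s means newly scanned problems and
# fresh leaderboard entries may take up to five minutes to appear.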
def index():
db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path)
c = conn.cursor()
# NOTE: column order matters here; an earlier version mixed up the test_code and difficulty columns
c.execute('SELECT folder, description, test_code, difficulty FROM problems')
problems = c.fetchall()
conn.close()
# Get leaderboard entries
leaderboard = get_leaderboard()
# Map folder to title for display
problem_titles = {folder: folder.replace('_', ' ').title() for folder, _, _, _ in problems}
return render_template('index.html', problems=problems, leaderboard=leaderboard, problem_titles=problem_titles)
@app.route('/problem/new', methods=['GET', 'POST'])
def new_problem():
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
test_code = request.form['test_code']
problem = Problem(title=title, description=description, test_code=test_code)
db.session.add(problem)
db.session.commit()
return redirect(url_for('index'))
return render_template('new_problem.html')
@app.route('/problem/<folder>', methods=['GET', 'POST'])
def view_problem(folder):
db_path = Path(__file__).parent / 'database/problems.sqlite3'
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute('SELECT folder, description, test_code, difficulty FROM problems WHERE folder = ?', (folder,))
row = c.fetchone()
conn.close()
if not row:
return 'Problem not found', 404
problem = {
'folder': row[0],
'description': row[1],
'difficulty': row[3], # now correct
'test_code': row[2], # now correct
}
result = None
if request.method == 'POST':
user_code = request.form['user_code']
username = request.form.get('username', '').strip() or 'Anonymous'
import tracemalloc
tracemalloc.start()
run_result = run_code_against_tests(user_code, problem['test_code'])
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
memory_used = peak // 1024 # in KB
# Try to get the last line number executed (even for successful runs)
line_number = None
try:
tree = ast.parse(user_code)
# Find the highest line number in the AST (for multi-function/user code)
def get_max_lineno(node):
max_lineno = getattr(node, 'lineno', 0)
for child in ast.iter_child_nodes(node):
max_lineno = max(max_lineno, get_max_lineno(child))
return max_lineno
line_number = get_max_lineno(tree)
except Exception:
pass
# If there was an error, parse the error line number from the traceback text
# (run_result['error'] is a plain string; traceback.TracebackException has no
# from_string() method, which the earlier code assumed)
if run_result['error']:
import re
matches = re.findall(r'line (\d+)', run_result['error'])
if matches:
line_number = int(matches[-1])
# ONLY log to leaderboard if the solution passed all tests
if run_result['passed']:
log_leaderboard(username, problem['folder'], run_result['runtime'], memory_used, line_number)
result = run_result
return render_template('problem.html', problem=problem, result=result)
@app.template_filter('markdown')
def markdown_filter(text):
return Markup(md.markdown(text or '', extensions=['extra', 'sane_lists']))
if __name__ == '__main__':
app.run(debug=True)

222
src/cache.py Normal file

@@ -0,0 +1,222 @@
"""
High-performance in-memory caching module with LRU eviction policy.
"""
import time
from typing import Any, Callable, Optional, Dict, List, Tuple
import threading
import functools
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CacheEntry:
"""Represents a single cache entry with metadata."""
__slots__ = ('value', 'timestamp', 'expires_at', 'hits')
def __init__(self, value: Any, timeout: int):
self.value = value
self.timestamp = time.time()
self.expires_at = self.timestamp + timeout
self.hits = 0
def is_expired(self) -> bool:
"""Check if the cache entry has expired."""
return time.time() >= self.expires_at
def hit(self) -> None:
"""Increment the hit counter."""
self.hits += 1
class FastMemoryCache:
"""
High-performance in-memory cache with LRU eviction policy.
Thread-safe and optimized for frequent reads.
"""
def __init__(self, max_size: int = 1000, default_timeout: int = 300):
"""
Initialize the cache.
Args:
max_size: Maximum number of items to store in cache
default_timeout: Default expiration time in seconds
"""
self.max_size = max_size
self.default_timeout = default_timeout
self._cache: Dict[str, CacheEntry] = {}
self._lock = threading.RLock()
self._hits = 0
self._misses = 0
self._evictions = 0
# Start background cleaner thread
self._cleaner_thread = threading.Thread(target=self._clean_expired, daemon=True)
self._cleaner_thread.start()
def get(self, key: str) -> Optional[Any]:
"""
Get a value from the cache.
Args:
key: Cache key
Returns:
Cached value or None if not found/expired
"""
with self._lock:
entry = self._cache.get(key)
if entry is None:
self._misses += 1
return None
if entry.is_expired():
del self._cache[key]
self._misses += 1
self._evictions += 1
return None
entry.hit()
self._hits += 1
return entry.value
def set(self, key: str, value: Any, timeout: Optional[int] = None) -> None:
"""
Set a value in the cache.
Args:
key: Cache key
value: Value to cache
timeout: Optional timeout in seconds (uses default if None)
"""
if timeout is None:
timeout = self.default_timeout
with self._lock:
# Evict if cache is full (LRU policy)
if len(self._cache) >= self.max_size and key not in self._cache:
self._evict_lru()
self._cache[key] = CacheEntry(value, timeout)
def delete(self, key: str) -> bool:
"""
Delete a key from the cache.
Args:
key: Cache key to delete
Returns:
True if key was deleted, False if not found
"""
with self._lock:
if key in self._cache:
del self._cache[key]
self._evictions += 1
return True
return False
def clear(self) -> None:
"""Clear all items from the cache."""
with self._lock:
# Count the entries before clearing; counting afterwards would always add zero.
self._evictions += len(self._cache)
self._cache.clear()
def _evict_lru(self) -> None:
"""Evict the least recently used item from the cache."""
if not self._cache:
return
# Evict the entry with the fewest hits (an LFU approximation of LRU)
lru_key = min(self._cache.keys(), key=lambda k: self._cache[k].hits)
del self._cache[lru_key]
self._evictions += 1
def _clean_expired(self) -> None:
"""Background thread to clean expired entries."""
while True:
time.sleep(60) # Clean every minute
with self._lock:
expired_keys = [
key for key, entry in self._cache.items()
if entry.is_expired()
]
for key in expired_keys:
del self._cache[key]
self._evictions += 1
if expired_keys:
logger.info(f"Cleaned {len(expired_keys)} expired cache entries")
def get_stats(self) -> Dict[str, Any]:
"""
Get cache statistics.
Returns:
Dictionary with cache statistics
"""
with self._lock:
return {
'size': len(self._cache),
'hits': self._hits,
'misses': self._misses,
'hit_ratio': self._hits / (self._hits + self._misses) if (self._hits + self._misses) > 0 else 0,
'evictions': self._evictions,
'max_size': self.max_size
}
def keys(self) -> List[str]:
"""Get all cache keys."""
with self._lock:
return list(self._cache.keys())
# Global cache instance
cache = FastMemoryCache(max_size=2000, default_timeout=300)
def cached(timeout: Optional[int] = None, unless: Optional[Callable] = None):
"""
Decorator for caching function results.
Args:
timeout: Cache timeout in seconds
unless: Callable that returns True to bypass cache
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Bypass cache if unless condition is met
if unless and unless():
return func(*args, **kwargs)
# Create cache key from function name and arguments
key_parts = [func.__module__, func.__name__]
key_parts.extend(str(arg) for arg in args)
key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
key = "|".join(key_parts)
# Try to get from cache
cached_result = cache.get(key)
if cached_result is not None:
logger.info(f"Cache hit for {func.__name__}")
return cached_result
# Call function and cache result
result = func(*args, **kwargs)
cache.set(key, result, timeout)
logger.info(f"Cache miss for {func.__name__}, caching result")
return result
return wrapper
return decorator
def cache_clear() -> None:
"""Clear the entire cache."""
cache.clear()
logger.info("Cache cleared")
def cache_stats() -> Dict[str, Any]:
"""Get cache statistics."""
return cache.get_stats()
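# Minimal usage sketch of the cached decorator (illustration only; the
# hypothetical slow_lookup stands in for an expensive DB or file read):
#
#   @cached(timeout=60)
#   def slow_lookup(problem_id: str) -> str:
#       return problem_id.upper()
#
#   slow_lookup("two_sum")   # first call: cache miss, result computed and stored
#   slow_lookup("two_sum")   # second call: served from the cache
#   cache_stats()            # e.g. {'size': 1, 'hits': 1, 'misses': 1, ...}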


@@ -1,384 +1,385 @@
import os
import time
import json
import sqlite3
import threading
import random
import tempfile
import subprocess
import sys
import traceback
import io
from pathlib import Path
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
WATCHDOG_AVAILABLE = True
except ImportError:
WATCHDOG_AVAILABLE = False
PROBLEMS_DIR = Path(__file__).parent / 'problems'
DB_PATH = Path(__file__).parent / 'database/problems.sqlite3'
class ProblemScannerThread(threading.Thread):
def __init__(self, scan_interval=2):
super().__init__(daemon=True)
self.scan_interval = scan_interval
self.last_state = {}
self.observer = None
def create_table(self, conn):
c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;')
c.execute('''CREATE TABLE IF NOT EXISTS problems (
id INTEGER PRIMARY KEY AUTOINCREMENT,
folder TEXT,
description TEXT,
difficulty TEXT,
test_code TEXT
)''')
conn.commit()
def scan(self):
problems = []
if not PROBLEMS_DIR.exists():
print(f"Problems directory does not exist: {PROBLEMS_DIR}")
return problems
for folder in PROBLEMS_DIR.iterdir():
if folder.is_dir():
# Dynamically find manifest file (manifest.json or manifets.json)
manifest_path = None
for candidate in ["manifest.json", "manifets.json"]:
candidate_path = folder / candidate
if candidate_path.exists():
manifest_path = candidate_path
break
desc_path = folder / 'description.md'
test_path = folder / 'test.py'
# Check if required files exist
if manifest_path and desc_path.exists() and test_path.exists():
try:
with open(desc_path, 'r', encoding='utf-8') as f:
description = f.read()
with open(test_path, 'r', encoding='utf-8') as f:
test_code = f.read()
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
difficulty = manifest.get('difficulty', 'unknown')
problems.append({
'folder': folder.name,
'description': description,
'test_code': test_code,
'difficulty': difficulty
})
print(f"Found problem: {folder.name} ; Difficulty: {difficulty}")
except Exception as e:
print(f"Error reading problem files for {folder.name}: {e}")
else:
missing_files = []
if not manifest_path:
missing_files.append("manifest.json/manifets.json")
if not desc_path.exists():
missing_files.append("description.md")
if not test_path.exists():
missing_files.append("test.py")
print(f"Skipping {folder.name}: missing {', '.join(missing_files)}")
print(f"Total problems found: {len(problems)}")
return problems
def update_db(self, problems, retries=5):
for attempt in range(retries):
try:
conn = sqlite3.connect(DB_PATH, timeout=5)
c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;')
# Clear existing problems
c.execute('DELETE FROM problems')
# Insert new problems
for p in problems:
c.execute('''INSERT INTO problems
(folder, description, difficulty, test_code)
VALUES (?, ?, ?, ?)''',
(p['folder'], p['description'], p['difficulty'], p['test_code']))
conn.commit()
print(f"Updated database with {len(problems)} problems")
conn.close()
return
except sqlite3.OperationalError as e:
if 'locked' in str(e).lower():
wait_time = 0.2 + random.random() * 0.3
print(f"Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
time.sleep(wait_time)
else:
print(f"Database error: {e}")
raise
except Exception as e:
print(f"Unexpected error updating database: {e}")
raise
print('Failed to update problems DB after several retries due to lock.')
def rescan_and_update(self):
print("Scanning for problems...")
problems = self.scan()
self.update_db(problems)
def run(self):
print("Starting problem scanner...")
# Initial scan and table creation
try:
conn = sqlite3.connect(DB_PATH)
self.create_table(conn)
conn.close()
print("Database initialized")
except Exception as e:
print(f"Failed to initialize database: {e}")
return
# Initial scan
self.rescan_and_update()
if WATCHDOG_AVAILABLE:
print("Using watchdog for file monitoring")
class Handler(FileSystemEventHandler):
def __init__(self, scanner):
self.scanner = scanner
self.last_event_time = 0
def on_any_event(self, event):
# Debounce events to avoid too many rescans
now = time.time()
if now - self.last_event_time > 1: # Wait at least 1 second between rescans
self.last_event_time = now
print(f"File system event: {event.event_type} - {event.src_path}")
self.scanner.rescan_and_update()
event_handler = Handler(self)
self.observer = Observer()
self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True)
self.observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping problem scanner...")
finally:
self.observer.stop()
self.observer.join()
else:
print(f"Watchdog not available, using polling every {self.scan_interval}s")
# Fallback: poll every scan_interval seconds
try:
while True:
time.sleep(self.scan_interval)
self.rescan_and_update()
except KeyboardInterrupt:
print("Stopping problem scanner...")
def start_problem_scanner():
scanner = ProblemScannerThread()
scanner.start()
return scanner
# Flask model loading functions
def load_problems_from_json(json_path):
"""Load problems from JSON file into Flask database"""
if not os.path.exists(json_path):
print(f"Problem JSON file not found: {json_path}")
return
try:
with open(json_path, 'r', encoding='utf-8') as f:
problems = json.load(f)
except Exception as e:
print(f"Error reading JSON file: {e}")
return
# This assumes you have imported the necessary Flask/SQLAlchemy components
try:
from models import db, Problem
for p in problems:
# Check if problem already exists by title
existing = Problem.query.filter_by(title=p['title']).first()
# Load test code from solution file if provided
test_code = ''
if 'solution' in p and os.path.exists(p['solution']):
try:
with open(p['solution'], 'r', encoding='utf-8') as sf:
test_code = sf.read()
except Exception as e:
print(f"Error reading solution file for {p['title']}: {e}")
if existing:
existing.description = p['description']
existing.test_code = test_code
print(f"Updated problem: {p['title']}")
else:
new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
db.session.add(new_problem)
print(f"Added new problem: {p['title']}")
db.session.commit()
print("Successfully updated problems from JSON")
except ImportError:
print("Flask models not available - skipping JSON load")
except Exception as e:
print(f"Error loading problems from JSON: {e}")
def schedule_problem_reload(app, json_path, interval_hours=10):
"""Schedule periodic reloading of problems from JSON"""
def reload_loop():
while True:
try:
with app.app_context():
load_problems_from_json(json_path)
time.sleep(interval_hours * 3600)
except Exception as e:
print(f"Error in problem reload loop: {e}")
time.sleep(60) # Wait 1 minute before retrying
t = threading.Thread(target=reload_loop, daemon=True)
t.start()
def run_code_against_tests(user_code, test_code, timeout=10):
"""
Execute user code against test code with proper error handling.
Args:
user_code: The user's solution code
test_code: The test code to validate the solution
timeout: Maximum execution time in seconds
Returns:
dict: Result with passed, output, runtime, and error fields
"""
if not user_code or not user_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': 'No code provided'
}
if not test_code or not test_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': 'No test code available'
}
start_time = time.perf_counter()
output = ''
error = None
passed = False
temp_file = None
try:
# Check if unittest is used in test_code
if 'unittest' in test_code:
# Create temporary file with user code + test code
with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
# Combine user code and test code
combined_code = f"{user_code}\n\n{test_code}"
f.write(combined_code)
f.flush()
temp_file = f.name
try:
# Run the file as a subprocess with timeout
proc = subprocess.run(
[sys.executable, temp_file],
capture_output=True,
text=True,
timeout=timeout,
encoding='utf-8'
)
output = proc.stdout
if proc.stderr:
output += f"\nSTDERR:\n{proc.stderr}"
passed = proc.returncode == 0
if not passed:
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
except subprocess.TimeoutExpired:
passed = False
error = f"Code execution timed out after {timeout} seconds"
output = "Execution timed out"
else:
# Direct execution approach for simple assert-based tests
local_ns = {}
# Capture stdout
old_stdout = sys.stdout
captured_output = io.StringIO()
sys.stdout = captured_output
try:
# Execute user code first
exec(user_code, {}, local_ns)
# Execute test code in the same namespace
exec(test_code, local_ns, local_ns)
# If we get here without exceptions, tests passed
passed = True
except AssertionError as e:
passed = False
error = f"Assertion failed: {str(e)}"
except Exception as e:
passed = False
error = f"Runtime error: {traceback.format_exc()}"
finally:
output = captured_output.getvalue()
sys.stdout = old_stdout
except Exception as e:
passed = False
error = f"Execution error: {traceback.format_exc()}"
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.unlink(temp_file)
except Exception as e:
print(f"Warning: Could not delete temp file {temp_file}: {e}")
runtime = time.perf_counter() - start_time
result = {
'passed': passed,
'output': output.strip() if output else '',
'runtime': runtime,
'error': error if not passed else None
}
print(f"Test execution result: passed={passed}, runtime={runtime:.3f}s")
if error:
print(f"Error: {error}")
return result
import os
import time
import json
import sqlite3
import threading
import random
import tempfile
import subprocess
import sys
import traceback
import io
from pathlib import Path
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
WATCHDOG_AVAILABLE = True
except ImportError:
WATCHDOG_AVAILABLE = False
PROBLEMS_DIR = Path(__file__).parent / 'problems'
DB_PATH = Path(__file__).parent / 'database/problems.sqlite3'
class ProblemScannerThread(threading.Thread):
def __init__(self, scan_interval=2):
super().__init__(daemon=True)
self.scan_interval = scan_interval
self.last_state = {}
self.observer = None
def create_table(self, conn):
c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;')
c.execute('''CREATE TABLE IF NOT EXISTS problems (
id INTEGER PRIMARY KEY AUTOINCREMENT,
folder TEXT,
description TEXT,
difficulty TEXT,
test_code TEXT
)''')
conn.commit()
def scan(self):
problems = []
if not PROBLEMS_DIR.exists():
print(f"Problems directory does not exist: {PROBLEMS_DIR}")
return problems
for folder in PROBLEMS_DIR.iterdir():
if folder.is_dir():
# Dynamically find manifest file (manifest.json or manifests.json)
manifest_path = None
for candidate in ["manifest.json", "manifests.json"]:
candidate_path = folder / candidate
if candidate_path.exists():
manifest_path = candidate_path
break
desc_path = folder / 'description.md'
test_path = folder / 'test.py'
# Check if required files exist
if manifest_path and desc_path.exists() and test_path.exists():
try:
with open(desc_path, 'r', encoding='utf-8') as f:
description = f.read()
with open(test_path, 'r', encoding='utf-8') as f:
test_code = f.read()
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
difficulty = manifest.get('difficulty', 'unknown')
problems.append({
'folder': folder.name,
'description': description,
'test_code': test_code,
'difficulty': difficulty
})
print(f"[ INFO ]: Found problem: {folder.name} ; Difficulty: {difficulty}")
except Exception as e:
print(f"[ ERROR ]: Error reading problem files for {folder.name}: {e}")
else:
missing_files = []
if not manifest_path:
missing_files.append("manifest.json/manifets.json")
if not desc_path.exists():
missing_files.append("description.md")
if not test_path.exists():
missing_files.append("test.py")
print(f"[ SKIP ]: Skipping {folder.name}: missing {', '.join(missing_files)}")
print(f"[ INFO ]: Total problems found: {len(problems)}")
return problems
def update_db(self, problems, retries=5):
for attempt in range(retries):
try:
conn = sqlite3.connect(DB_PATH, timeout=5)
c = conn.cursor()
c.execute('PRAGMA journal_mode=WAL;')
# Clear existing problems
c.execute('DELETE FROM problems')
# Insert new problems
for p in problems:
c.execute('''INSERT INTO problems
(folder, description, difficulty, test_code)
VALUES (?, ?, ?, ?)''',
(p['folder'], p['description'], p['difficulty'], p['test_code']))
conn.commit()
print(f"[ INFO ]: Updated database with {len(problems)} problems")
conn.close()
return
except sqlite3.OperationalError as e:
if 'locked' in str(e).lower():
wait_time = 0.2 + random.random() * 0.3
print(f"[ WARNING ]: Database locked, retrying in {wait_time:.2f}s (attempt {attempt + 1})")
time.sleep(wait_time)
else:
print(f"[ ERROR ]: Database error: {e}")
raise
except Exception as e:
print(f"[ ERROR ]: Unexpected error updating database: {e}")
raise
print('[ FATAL ERROR ]: Failed to update problems DB after several retries due to lock.')
def rescan_and_update(self):
print("[ INFO ]: Scanning for problems...")
problems = self.scan()
self.update_db(problems)
def run(self):
print("[ INFO ]: Starting problem scanner...")
# Initial scan and table creation
try:
conn = sqlite3.connect(DB_PATH)
self.create_table(conn)
conn.close()
print("[ INFO ]: Database initialized")
except Exception as e:
print(f"[ FATAL ERROR ]: Failed to initialize database: {e}")
return
# Initial scan
self.rescan_and_update()
if WATCHDOG_AVAILABLE:
print("[ INFO ]: Using watchdog for file monitoring")
class Handler(FileSystemEventHandler):
def __init__(self, scanner):
self.scanner = scanner
self.last_event_time = 0
def on_any_event(self, event):
# Debounce events to avoid too many rescans
now = time.time()
if now - self.last_event_time > 1: # Wait at least 1 second between rescans
self.last_event_time = now
print(f"[ FSINFO ]: File system event: {event.event_type} - {event.src_path}")
self.scanner.rescan_and_update()
event_handler = Handler(self)
self.observer = Observer()
self.observer.schedule(event_handler, str(PROBLEMS_DIR), recursive=True)
self.observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("[ KBINT_INFO ]: Stopping problem scanner...")
finally:
self.observer.stop()
self.observer.join()
else:
print(f"[ WARNING ]: Watchdog not available, using polling every {self.scan_interval}s")
# Fallback: poll every scan_interval seconds
try:
while True:
time.sleep(self.scan_interval)
self.rescan_and_update()
except KeyboardInterrupt:
print("[ KBINT_INFO ]: Stopping problem scanner...")
def start_problem_scanner():
scanner = ProblemScannerThread()
scanner.start()
return scanner
# Flask model loading functions
def load_problems_from_json(json_path):
"""Load problems from JSON file into Flask database"""
if not os.path.exists(json_path):
print(f"[ DEPRECATED_INFO ]: Problem JSON file not found: {json_path}")
print("[ SUGGESTION ]: If you dont have this do not worry. Use mainfest.json!")
return
try:
with open(json_path, 'r', encoding='utf-8') as f:
problems = json.load(f)
except Exception as e:
print(f"[ ERROR ]: Error reading JSON file: {e}")
return
# This assumes you have imported the necessary Flask/SQLAlchemy components
try:
from models import db, Problem
for p in problems:
# Check if problem already exists by title
existing = Problem.query.filter_by(title=p['title']).first()
# Load test code from solution file if provided
test_code = ''
if 'solution' in p and os.path.exists(p['solution']):
try:
with open(p['solution'], 'r', encoding='utf-8') as sf:
test_code = sf.read()
except Exception as e:
print(f"[ FATAL ERROR ]: Error reading solution file for {p['title']}: {e}")
if existing:
existing.description = p['description']
existing.test_code = test_code
print(f"[ INFO ]: Updated problem: {p['title']}")
else:
new_problem = Problem(title=p['title'], description=p['description'], test_code=test_code)
db.session.add(new_problem)
print(f"[ SUCCESS ]: Added new problem: {p['title']}")
db.session.commit()
print("[ SUCCESS ]: Successfully updated problems from JSON")
except ImportError:
print("[ FATAL IMPORT ERROR ]: Flask models not available - skipping JSON load @execptImportError")
except Exception as e:
print(f"[ ERROR ]: Error loading problems from JSON: {e}")
def schedule_problem_reload(app, json_path, interval_hours=10):
"""Schedule periodic reloading of problems from JSON"""
def reload_loop():
while True:
try:
with app.app_context():
load_problems_from_json(json_path)
time.sleep(interval_hours * 3600)
except Exception as e:
print(f"[ FATAL ERROR ]: Error in problem reload loop: {e}")
time.sleep(60) # Wait 1 minute before retrying
t = threading.Thread(target=reload_loop, daemon=True)
t.start()
def run_code_against_tests(user_code, test_code, timeout=10):
"""
Execute user code against test code with proper error handling.
Args:
user_code: The user's solution code
test_code: The test code to validate the solution
timeout: Maximum execution time in seconds
Returns:
dict: Result with passed, output, runtime, and error fields
"""
if not user_code or not user_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': 'No code provided'
}
if not test_code or not test_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': 'No test code available'
}
start_time = time.perf_counter()
output = ''
error = None
passed = False
temp_file = None
try:
# Check if unittest is used in test_code
if 'unittest' in test_code:
# Create temporary file with user code + test code
with tempfile.NamedTemporaryFile('w+', suffix='.py', delete=False, encoding='utf-8') as f:
# Combine user code and test code
combined_code = f"{user_code}\n\n{test_code}"
f.write(combined_code)
f.flush()
temp_file = f.name
try:
# Run the file as a subprocess with timeout
proc = subprocess.run(
[sys.executable, temp_file],
capture_output=True,
text=True,
timeout=timeout,
encoding='utf-8'
)
output = proc.stdout
if proc.stderr:
output += f"\nSTDERR:\n{proc.stderr}"
passed = proc.returncode == 0
if not passed:
error = f"Tests failed. Return code: {proc.returncode}\n{output}"
except subprocess.TimeoutExpired:
passed = False
error = f"Code execution timed out after {timeout} seconds"
output = "Execution timed out"
else:
# Direct execution approach for simple assert-based tests
local_ns = {}
# Capture stdout
old_stdout = sys.stdout
captured_output = io.StringIO()
sys.stdout = captured_output
try:
# Execute user code first
exec(user_code, {}, local_ns)
# Execute test code in the same namespace
exec(test_code, local_ns, local_ns)
# If we get here without exceptions, tests passed
passed = True
except AssertionError as e:
passed = False
error = f"Assertion failed: {str(e)}"
except Exception as e:
passed = False
error = f"Runtime error: {traceback.format_exc()}"
finally:
output = captured_output.getvalue()
sys.stdout = old_stdout
except Exception as e:
passed = False
error = f"Execution error: {traceback.format_exc()}"
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.unlink(temp_file)
except Exception as e:
print(f"[ FATAL WARNING ]: Could not delete temp file {temp_file}: {e}")
runtime = time.perf_counter() - start_time
result = {
'passed': passed,
'output': output.strip() if output else '',
'runtime': runtime,
'error': error if not passed else None
}
print(f"[ TEST RESULT ]: passed={passed}, runtime={runtime:.3f}s")
if error:
print(f"Error: {error}")
return result
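# Quick usage sketch for run_code_against_tests (toy inputs, illustration only):
#
#   result = run_code_against_tests(
#       "def add(a, b):\n    return a + b",   # user solution
#       "assert add(2, 3) == 5",              # assert-based test, so the direct-exec path runs
#   )
#   result['passed']    # -> True; result['runtime'] holds wall-clock seconds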


@@ -0,0 +1,50 @@
## 🏷️ Problem: Lost & Found Office
You are designing a system for a **Lost-and-Found office**.
* People can **report lost items**, where each item is mapped to the owner's name.
* People can later **claim their item**.
* If the item is not found, return `"No item found!"`.
---
### Function Signature
```python
class LostAndFound:
def __init__(self):
pass
def add_item(self, owner: str, item: str) -> None:
"""
Stores the item with the owner's name.
"""
def claim_item(self, owner: str) -> str:
"""
Returns the owner's item if it exists, otherwise
returns 'No item found!'.
"""
```
---
### Example
```python
office = LostAndFound()
office.add_item("Alice", "Umbrella")
office.add_item("Bob", "Backpack")
print(office.claim_item("Alice")) # Output: "Umbrella"
print(office.claim_item("Alice")) # Output: "No item found!"
print(office.claim_item("Charlie")) # Output: "No item found!"
```
---
### Constraints
* `1 <= len(owner), len(item) <= 100`
* You may assume only **strings** are used for owner and item.
* An owner can only have **one item** at a time.
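---
For orientation, a minimal dict-based sketch that satisfies the example above (it mirrors the commented reference solution kept in the problem's test file):
```python
class LostAndFound:
    def __init__(self):
        self.items = {}  # hashmap: owner -> item

    def add_item(self, owner: str, item: str) -> None:
        self.items[owner] = item  # re-reporting overwrites the previous item

    def claim_item(self, owner: str) -> str:
        # pop removes the entry, so a second claim returns the fallback
        return self.items.pop(owner, "No item found!")
```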


@@ -0,0 +1,7 @@
{
"title": "Hashmaps",
"description": "DSA - Hashmap. With Lost & Found",
"description_md": "problems/Hashmaps/description.md",
"test_code": "problems/Hashmaps/test.py",
"difficulty": "hard"
}


@@ -0,0 +1,54 @@
#class LostAndFound:
# def __init__(self):
# self.items = {} # hashmap: owner -> item
#
# def add_item(self, owner: str, item: str) -> None:
# self.items[owner] = item
#
# def claim_item(self, owner: str) -> str:
# return self.items.pop(owner, "No item found!")
import unittest
class TestLostAndFound(unittest.TestCase):
def test_basic(self):
office = LostAndFound()
office.add_item("Alice", "Umbrella")
office.add_item("Bob", "Backpack")
test_cases = [
("Alice", "Umbrella", "First claim for Alice"),
("Alice", "No item found!", "Alice claims again (should fail)"),
("Charlie", "No item found!", "Charlie never added an item"),
]
print("\nTEST: Basic LostAndFound Behavior")
for name, expected, description in test_cases:
try:
actual = office.claim_item(name)
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | {description} | Input: {name} -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | {description} | Input: {name} -> Exception: {e}")
raise
def test_overwrite_item(self):
office = LostAndFound()
office.add_item("Bob", "Hat")
office.add_item("Bob", "Shoes") # overwrite
print("\nTEST: Overwriting Items")
try:
actual = office.claim_item("Bob")
expected = "Shoes"
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | Overwritten item claim | Input: Bob -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | Overwritten item claim | Input: Bob -> Exception: {e}")
raise
if __name__ == "__main__":
unittest.main(verbosity=2)


@@ -0,0 +1,90 @@
# Prime Number Function Checker
You are asked to **write a function** that checks if a number is a **prime number**.
### What is a Prime Number?
* A **prime number** is a whole number greater than `1`.
* It has only **two divisors**: `1` and the number itself.
* Example:
* `7` → Prime (divisible only by `1` and `7`)
* `8` → Not Prime (divisible by `1, 2, 4, 8`)
Numbers less than or equal to `1` are **not prime**.
📖 More info: [Wikipedia](https://en.wikipedia.org/wiki/Prime_number)
---
### Function Signature
```python
def check_prime(number: int) -> bool:
```
* **Input**:
* `number` → an integer
* **Output**:
* `True` → if the number is prime
* `False` → if the number is not prime
---
### Example 1
**Input:**
```python
check_prime(2)
```
**Output:**
```
True
```
---
### Example 2
**Input:**
```python
check_prime(4)
```
**Output:**
```
False
```
---
### Example 3
**Input:**
```python
check_prime(13)
```
**Output:**
```
True
```
---
**_Don't worry, you do NOT need to write these function calls in your solution. QPP checks them automatically._**
### Hint
Try using the **modulo operator `%`** to check if one number divides evenly into another.
If any number between `2` and `n-1` divides your number evenly, then it's **not prime**.
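A minimal trial-division sketch following this hint (one possible approach; note the explicit check for numbers less than or equal to `1`):
```python
def check_prime(number: int) -> bool:
    if number <= 1:
        return False  # 0, 1 and negatives are not prime by definition
    for i in range(2, number):
        if number % i == 0:
            return False  # found a divisor other than 1 and the number itself
    return True
```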


@@ -0,0 +1,7 @@
{
"title": "Prime Number Checker",
"description": "Determine if a given number is a prime number",
"description_md": "problems/PrimeNumber/description.md",
"test_code": "problems/PrimeNumber/test.py",
"difficulty": "medium"
}


@@ -0,0 +1,33 @@
import unittest
# Reference solution (kept commented out):
# def check_prime(number : int) -> bool:
# for i in range(2, int(number)):
# if int(number) % i == 0:
# return False
# return True
class TestPrimeNumber(unittest.TestCase):
def test_prime_function(self):
test_cases = [
(2,True),
(3,True),
(4,False),
(6,False),
(1,False)
]
print("\nFUNCTION OUTPUT TEST RESULTS")
for input_val, expected in test_cases:
try:
actual = check_prime(input_val)
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | Input: '{input_val}' -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | Input: '{input_val}' -> Exception: {e}")
raise
if __name__ == "__main__":
unittest.main(verbosity=2)


@@ -1,13 +1,13 @@
## Reverse a List
Write a function called `reverse_list` that takes a list as input and returns the list in reverse order.
You are **not allowed** to just use Pythons built-in `.reverse()` method or slicing (`[::-1]`) try to reverse it manually for practice.
You are **allowed** to just use Pythons built-in `.reverse()` method or slicing (`[::-1]`), try to reverse it manually for practice.
### Function Signature:
```python
def reverse_list(lst):
# your code here
# your code here
```
### Requirements


@@ -1,33 +1,33 @@
import re
import unittest
## def is_valid_phone_number(phone_number : str):
## return bool(re.search(r"^(\d{3}-){2}\d{4}$", phone_number))
import unittest
class TestPhoneNumberRegex(unittest.TestCase):
def test_if_valid(self):
test_cases = [
("123-456-7890", True), # Valid format
("111-222-3333", True), # Another valid format
("abc-def-ghij", False), # Letters instead of digits
("1234567890", False), # Missing dashes
("123-45-67890", False), # Wrong grouping
("12-3456-7890", False), # Wrong grouping again
("", False), # Empty string
]
print("\nPHONE NUMBER VALIDATION TEST RESULTS")
for phone, expected in test_cases:
try:
actual = is_valid_phone_number(phone) # pyright: ignore[reportUndefinedVariable]
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | Input: '{phone}' -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | Input: '{phone}' -> Exception: {e}")
raise
if __name__ == "__main__":
import re
import unittest
## def is_valid_phone_number(phone_number : str):
## return bool(re.search(r"^(\d{3}-){2}\d{4}$", phone_number))
import unittest
class TestPhoneNumberRegex(unittest.TestCase):
def test_if_valid(self):
test_cases = [
("123-456-7890", True), # Valid format
("111-222-3333", True), # Another valid format
("abc-def-ghij", False), # Letters instead of digits
("1234567890", False), # Missing dashes
("123-45-67890", False), # Wrong grouping
("12-3456-7890", False), # Wrong grouping again
("", False), # Empty string
]
print("\nPHONE NUMBER VALIDATION TEST RESULTS")
for phone, expected in test_cases:
try:
actual = is_valid_phone_number(phone) # pyright: ignore[reportUndefinedVariable]
status = "✓ PASS" if actual == expected else "✗ FAIL"
print(f"{status} | Input: '{phone}' -> Got: {actual} | Expected: {expected}")
self.assertEqual(actual, expected)
except Exception as e:
print(f"✗ ERROR | Input: '{phone}' -> Exception: {e}")
raise
if __name__ == "__main__":
unittest.main(verbosity=2)


@@ -1 +1,14 @@
this is a easy sorting problem **it is solvable in less than 2 seconds**
## Sorting a List
In this problem you are given a task: **sort a list of, _say_, apples**.
## Function Signature:
```python
def sortlist(lst: list) -> list:
return # Your solution
```
Using the type annotation gives you an idea of what to return. You may freely choose to annotate or not; the Python interpreter does not enforce type annotations.
Sorting manually may be tedious. Look at the [PyDocs](https://docs.python.org/3/howto/sorting.html#sorting-basics)
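A minimal sketch using the built-in `sorted` from the linked docs (one possible solution):
```python
def sortlist(lst: list) -> list:
    # sorted() returns a new list in ascending order, leaving lst untouched
    return sorted(lst)
```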


@@ -1,17 +1,26 @@
import unittest
# This is the function the user is expected to write.
# It's a really simple one; the user can choose not to add type hints.
#def sortlist(lst = [4,3,2,1]) -> list:
#return sorted(lst)
class TestSolution(unittest.TestCase):
def test_sort(self):
# define x as an empty list.
# this will be used for the function; an uninitialized variable does not work.
self.x = []
self.assertEqual(sortlist(self.x), sorted(self.x)) # pyright: ignore[reportUndefinedVariable]
test_cases=[
([3,2,1],[1,2,3]),
([4,3,2,1],[1,2,3,4])
]
print("\n Function Output Test Results: ")
for input_val, expected in test_cases:
try:
actual = sortlist(input_val) # pyright: ignore[reportUndefinedVariable]
status = "PASS" if actual == expected else "FAIL"
print(f"{status} | Input: '{input_val}' -> Got: '{actual}' | Expected: '{expected}'")
self.assertEqual(actual,expected)
except Exception as e:
print(f"ERROR | Input: '{input_val}' -> Exception: {e}")
raise
if __name__ == "__main__":
unittest.main()
unittest.main(verbosity=2)

Binary file not shown (image updated: 15 KiB before → 9.4 KiB after).


@@ -1,403 +0,0 @@
document.addEventListener("DOMContentLoaded", () => {
// Dark mode functionality
const darkModeToggle = document.getElementById("darkModeToggle");
const html = document.documentElement;
// Load saved dark mode preference
const savedDarkMode = localStorage.getItem("darkMode");
if (
savedDarkMode === "true" ||
(savedDarkMode === null &&
// detect if the user already has a dark mode enabled in the system settings ( works for all systems )
window.matchMedia("(prefers-color-scheme: dark)").matches)
) {
html.classList.add("dark");
}
darkModeToggle?.addEventListener("click", () => {
html.classList.toggle("dark");
localStorage.setItem("darkMode", html.classList.contains("dark"));
});
// Problem search and pagination
const problemSearch = document.getElementById("problemSearch");
const problemsContainer = document.getElementById("problemsContainer");
const problemsPagination = document.getElementById("problemsPagination");
const problemsPrevBtn = document.getElementById("problemsPrevBtn");
const problemsNextBtn = document.getElementById("problemsNextBtn");
const problemsPaginationInfo = document.getElementById(
"problemsPaginationInfo",
);
let allProblemItems = [];
let filteredProblemItems = [];
let currentPage = 1;
const itemsPerPage = 5;
// Initialize problem items
function initializeProblemItems() {
allProblemItems = Array.from(
problemsContainer?.querySelectorAll(".problem-item") || [],
);
filteredProblemItems = [...allProblemItems];
updatePagination();
}
function updatePagination() {
const totalPages = Math.ceil(filteredProblemItems.length / itemsPerPage);
const startIndex = (currentPage - 1) * itemsPerPage;
const endIndex = startIndex + itemsPerPage;
// Hide all items first
allProblemItems.forEach((item) => {
item.style.display = "none";
});
// Show current page items
filteredProblemItems.slice(startIndex, endIndex).forEach((item) => {
item.style.display = "";
});
// Update pagination controls
if (problemsPrevBtn) problemsPrevBtn.disabled = currentPage <= 1;
if (problemsNextBtn) problemsNextBtn.disabled = currentPage >= totalPages;
if (problemsPaginationInfo) {
problemsPaginationInfo.textContent =
totalPages > 0
? `Page ${currentPage} of ${totalPages}`
: "No problems found";
}
// Hide pagination if not needed
if (problemsPagination) {
problemsPagination.classList.toggle("hidden", totalPages <= 1);
}
}
function filterProblems() {
const term = problemSearch?.value.toLowerCase().trim() || "";
filteredProblemItems = allProblemItems.filter((item) => {
const name = item.dataset.name?.toLowerCase() || "";
const desc = item.dataset.desc?.toLowerCase() || "";
return !term || name.includes(term) || desc.includes(term);
});
currentPage = 1;
updatePagination();
}
// Event listeners for pagination
problemsPrevBtn?.addEventListener("click", () => {
if (currentPage > 1) {
currentPage--;
updatePagination();
}
});
problemsNextBtn?.addEventListener("click", () => {
const totalPages = Math.ceil(filteredProblemItems.length / itemsPerPage);
if (currentPage < totalPages) {
currentPage++;
updatePagination();
}
});
problemSearch?.addEventListener("input", filterProblems);
// Initialize problems pagination
if (problemsContainer) {
initializeProblemItems();
}
// Leaderboard functionality
const problemFilter = document.getElementById("problemFilter");
const runtimeFilter = document.getElementById("runtimeFilter");
const leaderboardBody = document.getElementById("leaderboardBody");
const sortableHeaders = document.querySelectorAll(".sortable");
let currentSort = { column: "rank", direction: "asc" };
let allRows = [];
// Initialize rows array
function initializeRows() {
allRows = Array.from(leaderboardBody.querySelectorAll("tr")).map((row) => {
return {
element: row,
user: row.dataset.user || "",
problem: row.dataset.problem || "",
runtime: parseFloat(row.dataset.runtime) || 0,
memory: parseFloat(row.dataset.memory) || 0,
timestamp: new Date(row.dataset.timestamp || Date.now()).getTime(),
language: row.dataset.language || "",
originalIndex: Array.from(leaderboardBody.children).indexOf(row),
};
});
}
function updateRankClasses() {
const visibleRows = allRows.filter(
(row) => row.element.style.display !== "none",
);
visibleRows.forEach((rowData, index) => {
const rank = index + 1;
const row = rowData.element;
// Update rank cell
const rankCell = row.cells[0];
if (rankCell) rankCell.textContent = rank;
// Update rank classes
row.className = row.className.replace(/\brank-\d+\b/g, "");
if (rank === 1) row.classList.add("rank-1");
else if (rank <= 3) row.classList.add("rank-top3");
});
}
function calculateOverallRanking() {
const visibleRows = allRows.filter(
(row) => row.element.style.display !== "none",
);
if (visibleRows.length === 0) return;
// Group submissions by problem to find the best performance for each
const problemBests = {};
visibleRows.forEach((rowData) => {
const problem = rowData.problem;
if (!problemBests[problem]) {
problemBests[problem] = {
bestRuntime: Infinity,
bestMemory: Infinity,
};
}
problemBests[problem].bestRuntime = Math.min(
problemBests[problem].bestRuntime,
rowData.runtime,
);
problemBests[problem].bestMemory = Math.min(
problemBests[problem].bestMemory,
rowData.memory,
);
});
// Calculate normalized scores for each submission
visibleRows.forEach((rowData) => {
const problemBest = problemBests[rowData.problem];
// Prevent division by zero
const runtimeScore =
problemBest.bestRuntime > 0
? rowData.runtime / problemBest.bestRuntime
: 1;
const memoryScore =
problemBest.bestMemory > 0
? rowData.memory / problemBest.bestMemory
: 1;
// Weighted overall score (70% runtime, 30% memory)
rowData.overallScore = runtimeScore * 0.7 + memoryScore * 0.3;
});
// Sort by overall score (lower is better), then by timestamp (earlier is better for ties)
visibleRows.sort((a, b) => {
const scoreDiff = a.overallScore - b.overallScore;
if (Math.abs(scoreDiff) > 0.000001) return scoreDiff; // Use small epsilon for float comparison
// If scores are essentially equal, prefer earlier submission
return a.timestamp - b.timestamp;
});
// Reorder DOM elements and update ranks
visibleRows.forEach((rowData, index) => {
leaderboardBody.appendChild(rowData.element);
});
updateRankClasses();
}
function filterLeaderboard() {
const problemTerm = (problemFilter?.value || "").toLowerCase().trim();
const runtimeType = runtimeFilter?.value || "all";
// Reset all rows to visible first
allRows.forEach((rowData) => {
rowData.element.style.display = "";
});
// Apply problem filter
if (problemTerm) {
allRows.forEach((rowData) => {
const problemMatch = rowData.problem
.toLowerCase()
.includes(problemTerm);
if (!problemMatch) {
rowData.element.style.display = "none";
}
});
}
// Apply runtime filter (best/worst per user per problem)
if (runtimeType === "best" || runtimeType === "worst") {
const userProblemGroups = {};
// Group by user + problem combination
allRows.forEach((rowData) => {
if (rowData.element.style.display === "none") return;
const key = `${rowData.user}::${rowData.problem}`;
if (!userProblemGroups[key]) {
userProblemGroups[key] = [];
}
userProblemGroups[key].push(rowData);
});
// Hide all except best/worst for each user-problem combination
Object.values(userProblemGroups).forEach((group) => {
if (group.length <= 1) return;
// Sort by runtime
group.sort((a, b) => a.runtime - b.runtime);
const keepIndex = runtimeType === "best" ? 0 : group.length - 1;
group.forEach((rowData, index) => {
if (index !== keepIndex) {
rowData.element.style.display = "none";
}
});
});
}
calculateOverallRanking();
}
function getCellValue(rowData, column) {
switch (column) {
case "rank":
return parseInt(rowData.element.cells[0]?.textContent) || 0;
case "user":
return rowData.user.toLowerCase();
case "problem":
return rowData.problem.toLowerCase();
case "runtime":
return rowData.runtime;
case "memory":
return rowData.memory;
case "timestamp":
return rowData.timestamp;
case "language":
return rowData.language.toLowerCase();
default:
return "";
}
}
function sortLeaderboard(column, direction) {
if (column === "rank") {
calculateOverallRanking();
return;
}
const visibleRows = allRows.filter(
(row) => row.element.style.display !== "none",
);
visibleRows.sort((a, b) => {
const valueA = getCellValue(a, column);
const valueB = getCellValue(b, column);
let comparison = 0;
if (typeof valueA === "number" && typeof valueB === "number") {
comparison = valueA - valueB;
} else {
comparison = valueA < valueB ? -1 : valueA > valueB ? 1 : 0;
}
return direction === "asc" ? comparison : -comparison;
});
// Reorder DOM elements
visibleRows.forEach((rowData) => {
leaderboardBody.appendChild(rowData.element);
});
updateRankClasses();
}
// Event listeners for sorting
sortableHeaders.forEach((header) => {
header.addEventListener("click", () => {
const column = header.dataset.sort;
if (!column) return;
// Remove sorting classes from all headers
sortableHeaders.forEach((h) =>
h.classList.remove("sort-asc", "sort-desc"),
);
// Toggle sort direction
if (currentSort.column === column) {
currentSort.direction =
currentSort.direction === "asc" ? "desc" : "asc";
} else {
currentSort.column = column;
currentSort.direction = "asc";
}
// Add sorting class to current header
header.classList.add(`sort-${currentSort.direction}`);
sortLeaderboard(column, currentSort.direction);
});
});
// Filter event listeners
problemFilter?.addEventListener("input", filterLeaderboard);
runtimeFilter?.addEventListener("change", filterLeaderboard);
// Rank info popout
const rankInfoBtn = document.getElementById("rankInfoBtn");
const rankingExplanation = document.getElementById("rankingExplanation");
rankInfoBtn?.addEventListener("click", (e) => {
e.preventDefault();
rankingExplanation?.classList.toggle("active");
rankInfoBtn?.classList.toggle("active");
});
// Close ranking explanation when clicking outside
document.addEventListener("click", (e) => {
if (
rankingExplanation?.classList.contains("active") &&
!rankingExplanation.contains(e.target) &&
!rankInfoBtn?.contains(e.target)
) {
rankingExplanation.classList.remove("active");
rankInfoBtn?.classList.remove("active");
}
});
// Initialize everything
if (leaderboardBody && leaderboardBody.children.length > 0) {
initializeRows();
calculateOverallRanking();
// Set initial sort indicator
const defaultHeader = document.querySelector('[data-sort="rank"]');
if (defaultHeader) {
defaultHeader.classList.add("sort-asc");
}
}
// Apply dark mode to dynamically created elements
function applyDarkModeToElements() {
const isDark = html.classList.contains("dark");
// Any additional dark mode styling for dynamically created elements can go here
}
// Watch for dark mode changes
new MutationObserver(applyDarkModeToElements).observe(html, {
attributes: true,
attributeFilter: ["class"],
});
});


@@ -1,6 +1,26 @@
<!doctype html>
<html lang="en" class="">
<head>
<script>
/*
This fix is for users with photosensitive epilepsy.
It prevents the flash of the light theme while navigating between pages.
The same snippet is used in both the problem pages and this file.
---
This changes nothing if the user's system is set to a light theme.
*/
(function() {
try {
var dark = localStorage.getItem("darkMode");
if (
dark === "true" ||
(dark === null && window.matchMedia("(prefers-color-scheme: dark)").matches)
) {
document.documentElement.classList.add("dark");
}
} catch (e) {}
})();
</script>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>Quick Problem Platform</title>
@@ -69,7 +89,7 @@
<section class="card" id="leaderboardSection">
<div class="leaderboard-head">
<h2 style="font-size:1.1rem;margin:0">Leaderboard
<span id="rankInfoBtn" title="How ranking works"></span>
<!--<span id="rankInfoBtn" title="How ranking works"></span>-->
</h2>
</div>
<div class="leaderboard-controls">
@@ -141,6 +161,6 @@
</section>
</div>
</div>
<script src="{{ url_for('static', filename='script.js') }}"></script>
<script src="{{ url_for('serve_js', filename='script.js') }}"></script>
</body>
</html>


@@ -1,6 +1,26 @@
<!doctype html>
<html lang="en" class="">
<head>
<script>
/*
This fix is for users with photosensitive epilepsy.
It prevents the flash of the light theme while navigating between pages.
The same snippet is used in both the index and this file.
---
This changes nothing if the user's system is set to a light theme.
*/
(function() {
try {
var dark = localStorage.getItem("darkMode");
if (
dark === "true" ||
(dark === null && window.matchMedia("(prefers-color-scheme: dark)").matches)
) {
document.documentElement.classList.add("dark");
}
} catch (e) {}
})();
</script>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{{ problem.title }} - Coding Problem</title>

View File

@@ -7,97 +7,302 @@ import subprocess
import os
import re
import ast
import signal
import resource
import shlex
import hashlib
import platform
from contextlib import contextmanager
# Security configuration - Expanded whitelist
ALLOWED_IMPORTS = {
'math', 'random', 'datetime', 'json', 'collections', 'itertools',
    'functools', 'operator', 'copy', 'unittest', 're', 'string', 'pyfiglet',
'decimal', 'fractions', 'statistics', 'textwrap', 'unicodedata',
'base64', 'binascii', 'struct', 'array', 'heapq', 'bisect'
}
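# Illustrative sketch (an editorial addition, not part of the runner's
# control flow): the AST check further down compares only the first dotted
# component of an import against ALLOWED_IMPORTS.
def _is_import_allowed(module_path):
    """Hypothetical helper showing the whitelist semantics."""
    return module_path.split('.')[0] in ALLOWED_IMPORTS

assert _is_import_allowed('collections.abc')   # 'collections' is whitelisted
assert not _is_import_allowed('numpy.linalg')  # 'numpy' is not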
# Enhanced dangerous patterns with more comprehensive coverage
DANGEROUS_PATTERNS = [
# System/OS operations
r'import\s+os(?:\s|$|\.)', r'from\s+os\s+import',
r'import\s+subprocess(?:\s|$|\.)', r'from\s+subprocess\s+import',
r'import\s+sys(?:\s|$|\.)', r'from\s+sys\s+import',
r'import\s+shutil(?:\s|$|\.)', r'from\s+shutil\s+import',
r'import\s+pathlib(?:\s|$|\.)', r'from\s+pathlib\s+import',
r'import\s+tempfile(?:\s|$|\.)', r'from\s+tempfile\s+import',
r'import\s+glob(?:\s|$|\.)', r'from\s+glob\s+import',
r'import\s+platform(?:\s|$|\.)', r'from\s+platform\s+import',
# Network operations
r'import\s+socket(?:\s|$|\.)', r'from\s+socket\s+import',
r'import\s+urllib(?:\s|$|\.)', r'from\s+urllib\s+import',
r'import\s+requests(?:\s|$|\.)', r'from\s+requests\s+import',
r'import\s+http(?:\s|$|\.)', r'from\s+http\s+import',
r'import\s+ftplib(?:\s|$|\.)', r'from\s+ftplib\s+import',
r'import\s+smtplib(?:\s|$|\.)', r'from\s+smtplib\s+import',
# Dynamic execution
r'__import__\s*\(', r'exec\s*\(', r'eval\s*\(', r'compile\s*\(',
r'globals\s*\(', r'locals\s*\(', r'vars\s*\(', r'dir\s*\(',
r'getattr\s*\(', r'setattr\s*\(', r'delattr\s*\(', r'hasattr\s*\(',
# File operations
r'open\s*\(', r'file\s*\(', r'input\s*\(', r'raw_input\s*\(',
# Destructive operations
r'\.unlink\s*\(', r'\.remove\s*\(', r'\.rmdir\s*\(', r'\.rmtree\s*\(',
r'\.delete\s*\(', r'\.kill\s*\(', r'\.terminate\s*\(',
# Threading and multiprocessing
r'import\s+threading(?:\s|$|\.)', r'from\s+threading\s+import',
r'import\s+multiprocessing(?:\s|$|\.)', r'from\s+multiprocessing\s+import',
r'import\s+asyncio(?:\s|$|\.)', r'from\s+asyncio\s+import',
# Memory and resource manipulation
r'import\s+gc(?:\s|$|\.)', r'from\s+gc\s+import',
r'import\s+resource(?:\s|$|\.)', r'from\s+resource\s+import',
r'import\s+ctypes(?:\s|$|\.)', r'from\s+ctypes\s+import',
# Code introspection
r'import\s+inspect(?:\s|$|\.)', r'from\s+inspect\s+import',
r'import\s+types(?:\s|$|\.)', r'from\s+types\s+import',
# Pickle and serialization security risks
r'import\s+pickle(?:\s|$|\.)', r'from\s+pickle\s+import',
r'import\s+marshal(?:\s|$|\.)', r'from\s+marshal\s+import',
# System exit
r'exit\s*\(', r'quit\s*\(', r'sys\.exit\s*\(',
    # Dunder methods are dangerous if misused. Classes are allowed here,
    # specifically the __init__ constructor, but __del__ is not:
r'__del__\s*\(',
# Import tricks
r'importlib', r'imp\s', r'pkgutil',
]
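# Illustrative check of the pattern anchors (a sketch using the `re` module
# imported above): the `(?:\s|$|\.)` tail keeps both `import os` and
# `import os.path` blocked without also flagging names that merely start
# with "os", e.g. `import ossuary`.
_os_pattern = r'import\s+os(?:\s|$|\.)'
assert re.search(_os_pattern, 'import os', re.IGNORECASE)
assert re.search(_os_pattern, 'import os.path', re.IGNORECASE)
assert not re.search(_os_pattern, 'import ossuary', re.IGNORECASE)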
# Maximum resource limits
MAX_MEMORY_MB = 100 # 100MB memory limit
MAX_CPU_TIME = 5 # 5 seconds CPU time
MAX_OUTPUT_SIZE = 10000 # 10KB output limit
MAX_CODE_SIZE = 50000 # 50KB code limit
MAX_TEST_SIZE = 10000 # 10KB test limit
class SecurityViolationError(Exception):
"""Raised when a security violation is detected."""
pass
class ResourceLimitError(Exception):
"""Raised when resource limits are exceeded."""
pass
@contextmanager
def resource_limits():
"""Context manager to set resource limits."""
# Set memory limit (in bytes)
if hasattr(resource, 'RLIMIT_AS'):
try:
resource.setrlimit(resource.RLIMIT_AS, (MAX_MEMORY_MB * 1024 * 1024, MAX_MEMORY_MB * 1024 * 1024))
except (OSError, ValueError):
pass # Ignore if we can't set memory limits
# Set CPU time limit
if hasattr(resource, 'RLIMIT_CPU'):
try:
resource.setrlimit(resource.RLIMIT_CPU, (MAX_CPU_TIME, MAX_CPU_TIME))
except (OSError, ValueError):
pass # Ignore if we can't set CPU limits
# Set file descriptor limit
if hasattr(resource, 'RLIMIT_NOFILE'):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (10, 10))
except (OSError, ValueError):
pass
try:
yield
finally:
# Reset limits (though this won't matter much in subprocess)
pass
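# Usage sketch (a hypothetical helper, not called by the runner): apply the
# limits above around a callable. Note the limits are process-wide and are
# not reset afterwards, so this is intended for short-lived worker processes
# on Unix, where the `resource` module actually enforces rlimits.
def _limited_call(fn, *args, **kwargs):
    """Run `fn` with the resource limits defined above (best effort)."""
    with resource_limits():
        return fn(*args, **kwargs)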
def validate_code_security(code):
"""
    Enhanced security validation for code.
Returns (is_safe, error_message)
"""
    if not isinstance(code, str):
        return False, "Code must be a string"
    if len(code.strip()) == 0:
        return False, "Code cannot be empty"
# Check code size limits
if len(code) > MAX_CODE_SIZE:
return False, f"Code too large (maximum {MAX_CODE_SIZE} bytes allowed)"
# Check for null bytes and other binary content
if '\x00' in code:
return False, "Code contains null bytes"
# Check for dangerous patterns with case-insensitive matching
for pattern in DANGEROUS_PATTERNS:
matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE)
if matches:
return False, f"Dangerous operation detected: {pattern} (matched: {matches[0] if matches else 'unknown'})"
# Check for excessive nesting (possible DoS)
nesting_level = 0
max_nesting = 20
for char in code:
if char in '([{':
nesting_level += 1
if nesting_level > max_nesting:
return False, f"Excessive nesting detected (max {max_nesting} levels)"
elif char in ')]}':
nesting_level = max(0, nesting_level - 1)
# Parse AST with enhanced validation
try:
tree = ast.parse(code)
# Check for dangerous AST nodes
for node in ast.walk(tree):
# Import validation
if isinstance(node, ast.Import):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name not in ALLOWED_IMPORTS:
return False, f"Import not allowed: {module_name}"
elif isinstance(node, ast.ImportFrom):
if node.module:
module_name = node.module.split('.')[0]
if module_name not in ALLOWED_IMPORTS:
return False, f"Import not allowed: {module_name}"
# Check for attribute access on dangerous modules
elif isinstance(node, ast.Attribute):
if hasattr(node.value, 'id') and node.value.id in ['os', 'sys', 'subprocess']:
return False, f"Dangerous attribute access: {node.value.id}.{node.attr}"
# Check for function calls that might be dangerous
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in ['exec', 'eval', 'compile', '__import__', 'open', 'input']:
return False, f"Dangerous function call: {node.func.id}"
elif isinstance(node.func, ast.Attribute):
if node.func.attr in ['system', 'popen', 'spawn', 'fork']:
return False, f"Dangerous method call: {node.func.attr}"
# Check for while True loops without breaks (potential infinite loops)
elif isinstance(node, ast.While):
if isinstance(node.test, ast.Constant) and node.test.value is True:
# Check if there's a break statement in the loop
has_break = any(isinstance(n, ast.Break) for n in ast.walk(node))
if not has_break:
return False, "Potentially infinite loop detected (while True without break)"
except SyntaxError as e:
return False, f"Syntax error in code: {str(e)}"
except RecursionError:
return False, "Code too complex (recursion limit exceeded during parsing)"
except Exception as e:
return False, f"Code validation error: {str(e)}"
return True, None
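# Quick self-check of the validator (illustrative; the literals below stand
# in for hypothetical user submissions):
_ok, _err = validate_code_security("import math\nprint(math.sqrt(2))")
assert _ok and _err is None
_ok, _err = validate_code_security("import subprocess")
assert not _ok and "Dangerous operation" in _err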
def create_restricted_globals():
"""Create a restricted global namespace for code execution."""
"""Create a heavily restricted global namespace for code execution."""
# Very limited set of safe builtins
safe_builtins = {
'abs', 'all', 'any', 'bin', 'bool', 'chr', 'dict', 'enumerate',
'filter', 'float', 'format', 'frozenset', 'hex', 'int', 'isinstance',
'issubclass', 'iter', 'len', 'list', 'map', 'max', 'min', 'next',
'oct', 'ord', 'pow', 'print', 'range', 'repr', 'reversed', 'round',
'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip'
}
# Create restricted builtins dict with error-raising versions of dangerous functions
restricted_builtins = {}
for name in safe_builtins:
        if name in (__builtins__ if isinstance(__builtins__, dict) else dir(__builtins__)):
if isinstance(__builtins__, dict):
restricted_builtins[name] = __builtins__[name]
else:
restricted_builtins[name] = getattr(__builtins__, name)
# Add error-raising versions of dangerous functions
def raise_security_error(name):
def _error(*args, **kwargs):
raise SecurityViolationError(f"Access to '{name}' is not permitted")
return _error
dangerous_builtins = ['exec', 'eval', 'compile', '__import__', 'open', 'input', 'globals', 'locals', 'vars']
for name in dangerous_builtins:
restricted_builtins[name] = raise_security_error(name)
restricted_globals = {
        '__builtins__': restricted_builtins,
'__name__': '__restricted__',
'__doc__': None,
}
    # Add allowed modules with error handling
    for module in ALLOWED_IMPORTS:
        try:
            imported_module = __import__(module)
            restricted_globals[module] = imported_module
        except ImportError:
            pass  # Module not available, skip
return restricted_globals
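# Sandbox in action (illustrative; the strings below are hypothetical
# submissions, not platform code): whitelisted modules resolve normally,
# while stubbed builtins raise SecurityViolationError.
_sandbox = create_restricted_globals()
exec("result = max(1, 2) + math.floor(2.5)", _sandbox)
assert _sandbox['result'] == 4
try:
    exec("open('somefile')", _sandbox)
    raise AssertionError("open() should have been blocked")
except SecurityViolationError:
    pass  # expected: 'open' is replaced with an error-raising stub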
def create_secure_temp_environment():
"""Create a secure temporary directory with restricted permissions."""
temp_dir = tempfile.mkdtemp(prefix='secure_code_exec_')
# Set restrictive permissions on the directory
try:
os.chmod(temp_dir, 0o700) # Only owner can read/write/execute
except OSError:
pass # Best effort
return temp_dir
def cleanup_temp_environment(temp_dir):
"""Securely clean up temporary directory and all contents."""
if not temp_dir or not os.path.exists(temp_dir):
return
try:
# Recursively remove all files and subdirectories
for root, dirs, files in os.walk(temp_dir, topdown=False):
for name in files:
file_path = os.path.join(root, name)
try:
os.chmod(file_path, 0o600) # Ensure we can delete
os.unlink(file_path)
except OSError:
pass
for name in dirs:
dir_path = os.path.join(root, name)
try:
os.chmod(dir_path, 0o700) # Ensure we can delete
os.rmdir(dir_path)
except OSError:
pass
os.rmdir(temp_dir)
except Exception as e:
# Log warning but don't fail
print(f"Warning: Could not fully clean up temp directory {temp_dir}: {e}", file=sys.stderr)
def run_code_against_tests(user_code, test_code, max_execution_time=5):
"""
    Securely run user code against test code with enhanced safety restrictions.
Args:
user_code: The user's solution code
@@ -107,7 +312,19 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
Returns:
dict: Result containing passed, output, runtime, and error information
"""
    # Input validation
if not isinstance(user_code, str) or not isinstance(test_code, str):
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "Both user_code and test_code must be strings"
}
# Validate execution time limit
max_execution_time = min(max(1, int(max_execution_time)), MAX_CPU_TIME)
# Enhanced security validation
user_safe, user_error = validate_code_security(user_code)
if not user_safe:
return {
@@ -126,20 +343,30 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
'error': f"Security violation in test code: {test_error}"
}
# Additional test code size validation
if len(test_code) > MAX_TEST_SIZE:
return {
'passed': False,
'output': '',
'runtime': 0,
'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)"
}
local_ns = {}
output = ''
start = time.perf_counter()
error = None
passed = False
temp_file = None
temp_dir = None
try:
# Check if unittest is used in test_code
if 'unittest' in test_code:
            # Create secure temp environment
            temp_dir = create_secure_temp_environment()
            temp_file = os.path.join(temp_dir, 'test_code.py')
            try:
combined_code = f"{user_code}\n\n{test_code}"
# Write to temp file with restricted permissions
@@ -147,19 +374,65 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
f.write(combined_code)
os.chmod(temp_file, 0o600) # Read/write for owner only
                # Prepare secure environment variables
secure_env = {
'PYTHONPATH': '',
'PYTHONDONTWRITEBYTECODE': '1',
'PYTHONUNBUFFERED': '1',
'PATH': '/usr/bin:/bin', # Minimal PATH
}
# Add current Python executable path if needed
python_dir = os.path.dirname(sys.executable)
if python_dir not in secure_env['PATH']:
secure_env['PATH'] = f"{python_dir}:{secure_env['PATH']}"
# Run with subprocess and comprehensive security measures
try:
# Create a wrapper script for additional security
wrapper_code = f"""
import sys
import signal
import resource
def timeout_handler(signum, frame):
raise TimeoutError("Execution timed out")
# Set up timeout handler
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm({max_execution_time})
try:
    # Set resource limits in the child process (best effort; Unix only)
    for _name, _value in (("RLIMIT_AS", {MAX_MEMORY_MB} * 1024 * 1024),
                          ("RLIMIT_CPU", {MAX_CPU_TIME})):
        if hasattr(resource, _name):
            try:
                resource.setrlimit(getattr(resource, _name), (_value, _value))
            except (OSError, ValueError):
                pass
    exec(open(r'{temp_file}').read())
except Exception as e:
print(f"Error: {{e}}", file=sys.stderr)
sys.exit(1)
finally:
signal.alarm(0)
"""
wrapper_file = os.path.join(temp_dir, 'wrapper.py')
with open(wrapper_file, 'w', encoding='utf-8') as f:
f.write(wrapper_code)
os.chmod(wrapper_file, 0o600)
            # The wrapper script above is prepared but not yet wired in;
            # the test file is executed directly for now
            proc = subprocess.run(
                [sys.executable, temp_file],  # Direct execution for now
capture_output=True,
text=True,
                timeout=max_execution_time + 1,  # Add buffer for subprocess overhead
encoding='utf-8',
                cwd=temp_dir,  # Run in the temporary directory
                env=secure_env,
# Additional security on Unix systems
preexec_fn=os.setpgrp if hasattr(os, 'setpgrp') else None
)
            # Combine stdout and stderr to capture all output
combined_output = ""
if proc.stdout:
combined_output += proc.stdout
@@ -169,29 +442,32 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
else:
combined_output = proc.stderr
# Limit output size
if len(combined_output) > MAX_OUTPUT_SIZE:
combined_output = combined_output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"
output = combined_output
passed = proc.returncode == 0
            if not passed:
                error = f"Tests failed. Return code: {proc.returncode}"
                if output.strip():
                    error += f"\nOutput: {output}"
except subprocess.TimeoutExpired:
passed = False
error = f"Code execution timed out after {max_execution_time} seconds"
output = "Execution timed out"
except Exception as e:
passed = False
error = f"Subprocess execution error: {str(e)}"
finally:
                # Secure cleanup
                cleanup_temp_environment(temp_dir)
else:
            # Direct execution with heavily restricted globals
old_stdout = sys.stdout
captured_output = io.StringIO()
sys.stdout = captured_output
@@ -200,34 +476,62 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
# Create restricted execution environment
restricted_globals = create_restricted_globals()
            # Set up timeout for direct execution
            def timeout_handler(signum, frame):
                raise TimeoutError("Execution timed out")
            if hasattr(signal, 'SIGALRM'):
                old_handler = signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(max_execution_time)
            try:
                # Execute user code in restricted environment
                exec(user_code, restricted_globals, local_ns)
                # Execute test code (should raise AssertionError on failure)
                exec(test_code, {**restricted_globals, **local_ns}, local_ns)
                passed = True
            finally:
                if hasattr(signal, 'SIGALRM'):
                    signal.alarm(0)  # Cancel alarm
                    signal.signal(signal.SIGALRM, old_handler)  # Restore handler
except TimeoutError:
passed = False
error = f"Code execution timed out after {max_execution_time} seconds"
except SecurityViolationError as e:
passed = False
error = f"Security violation: {str(e)}"
except AssertionError as e:
passed = False
error = f"Assertion failed: {str(e)}"
except MemoryError:
passed = False
error = "Memory limit exceeded"
except RecursionError:
passed = False
error = "Maximum recursion depth exceeded"
except Exception as e:
passed = False
error = f"Runtime error: {traceback.format_exc()}"
error = f"Runtime error: {str(e)}"
# Don't include full traceback for security
finally:
output = captured_output.getvalue()
sys.stdout = old_stdout
# Limit output size
if len(output) > MAX_OUTPUT_SIZE:
output = output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)"
except Exception as e:
passed = False
error = f"Execution error: {traceback.format_exc()}"
error = f"Execution error: {str(e)}"
if temp_dir:
cleanup_temp_environment(temp_dir)
runtime = time.perf_counter() - start
result = {
'passed': passed,
'output': output.strip() if output else '',
@@ -237,26 +541,51 @@ def run_code_against_tests(user_code, test_code, max_execution_time=5):
return result
# Example usage with additional safety wrapper
def safe_code_runner(user_code, test_code):
"""
    Enhanced safety wrapper with comprehensive security checks.
"""
    # Input validation
    if not isinstance(user_code, str) or not isinstance(test_code, str):
        return {
            'passed': False,
            'output': '',
            'runtime': 0,
            'error': "Both user_code and test_code must be strings"
        }
    # Enhanced length checks
    if len(user_code) > MAX_CODE_SIZE:
        return {
            'passed': False,
            'output': '',
            'runtime': 0,
            'error': f"User code too large (maximum {MAX_CODE_SIZE} bytes allowed)"
        }
if len(test_code) > MAX_TEST_SIZE:
return {
'passed': False,
'output': '',
'runtime': 0,
'error': f"Test code too large (maximum {MAX_TEST_SIZE} bytes allowed)"
}
# Check for empty code
if not user_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "User code cannot be empty"
}
if not test_code.strip():
return {
'passed': False,
'output': '',
'runtime': 0,
'error': "Test code cannot be empty"
}
return run_code_against_tests(user_code, test_code, MAX_CPU_TIME)
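# End-to-end example (illustrative; the solution and tests below are
# hypothetical submissions):
if __name__ == '__main__':
    _solution = "def add(a, b):\n    return a + b\n"
    _tests = "assert add(2, 3) == 5\nassert add(-1, 1) == 0\n"
    _result = safe_code_runner(_solution, _tests)
    print(_result['passed'], _result['error'])  # expected: True None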