Files
mucapy/mucapy/main.py
rattatwinko e50ab9d03c
All checks were successful
Build MuCaPy Executable / build-and-package (push) Successful in 1m41s
systeminfo updated in the about section!
2025-05-30 22:32:32 +02:00

2377 lines
92 KiB
Python

import os
import sys
import cv2
import json
import urllib.parse
import numpy as np
import psutil # Add psutil import
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QPushButton, QComboBox, QSpinBox,
QFileDialog, QMessageBox, QMenu, QAction, QMenuBar,
QActionGroup, QSizePolicy, QGridLayout, QGroupBox,
QDockWidget, QScrollArea, QToolButton, QDialog,
QShortcut, QListWidget, QFormLayout, QLineEdit,
QCheckBox, QTabWidget, QListWidgetItem, QSplitter,
QProgressBar) # Add QProgressBar
from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect, QThread, pyqtSignal, QMutex, QObject
from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter,
QPen, QBrush)
import platform
import time
import requests
import subprocess
class Config:
    """Persistent application settings stored as JSON in a per-user directory.

    On construction the platform-specific config directory is created, the
    built-in defaults are installed in ``self.settings`` and any values found
    in the config file are layered on top of them.
    """

    def __init__(self):
        # Use platform-specific user directory for config
        home = os.path.expanduser('~')
        if sys.platform.startswith('win'):
            config_dir = os.path.join(os.environ.get('APPDATA', home), 'MuCaPy')
            pictures_dir = os.path.join(os.environ.get('USERPROFILE', home), 'Pictures', 'MuCaPy')
        else:
            config_dir = os.path.join(home, '.config', 'mucapy')
            pictures_dir = os.path.join(home, 'Pictures', 'MuCaPy')
        # Create config directory if it doesn't exist
        os.makedirs(config_dir, exist_ok=True)
        self.config_file = os.path.join(config_dir, 'config.json')
        self.settings = {
            'network_cameras': {},  # Store network cameras configuration
            'last_model_dir': '',
            'last_screenshot_dir': pictures_dir,
            'last_layout': 0,
            'last_fps': 10,
            'last_selected_cameras': [],
            'window_geometry': None,
            'confidence_threshold': 0.35,
        }
        self.load_config()

    def load_config(self):
        """Load configuration from the JSON file, keeping defaults for missing keys.

        A missing file is not an error. Unreadable or malformed files, and
        files whose top-level JSON value is not an object, are reported on
        stdout and leave the current settings untouched.
        """
        try:
            if not os.path.exists(self.config_file):
                return
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded_settings = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading config: {e}")
            return
        if not isinstance(loaded_settings, dict):
            # dict.update() would raise on (or silently mis-merge) a list.
            print("Error loading config: top-level JSON value is not an object")
            return
        # Update settings while preserving default values for new keys
        self.settings.update(loaded_settings)

    def save_config(self):
        """Save configuration to the JSON file without risking the old copy.

        The settings are serialised before any file is opened and written to
        a temporary sibling file that then replaces the real one, so a value
        that cannot be serialised (or a failed write) no longer truncates the
        existing configuration. Failures are reported on stdout.
        """
        try:
            payload = json.dumps(self.settings, indent=4)
            # Ensure the file's directory exists
            os.makedirs(os.path.dirname(self.config_file), exist_ok=True)
            tmp_path = self.config_file + '.tmp'
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, self.config_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving config: {e}")

    def save_setting(self, key, value):
        """Store ``value`` under ``key`` and write the whole configuration to disk."""
        self.settings[key] = value
        self.save_config()

    def load_setting(self, key, default=None):
        """Return the in-memory value for ``key``, or ``default`` if it is absent."""
        return self.settings.get(key, default)
class CameraThread(QThread):
    """Thread class for handling camera connections and frame grabbing.

    ``camera_info`` is either a local device (numeric index or path) or a
    string of the form ``"net:<name>"``; in the latter case the entry is
    looked up in ``self.parent().detector.network_cameras``.
    """

    # Emitted when a new frame is ready: (camera_index, frame)
    frame_ready = pyqtSignal(int, np.ndarray)
    # Emitted when an error occurs: (camera_index, error_message)
    error_occurred = pyqtSignal(int, str)

    def __init__(self, camera_id, camera_info, parent=None):
        super().__init__(parent)
        self.camera_id = camera_id
        self.camera_info = camera_info
        self.running = False
        self.cap = None
        self.mutex = QMutex()
        self.frame_interval = 1.0 / 30  # Default to 30 FPS
        self.reconnect_attempts = 3  # Number of reconnection attempts
        self.reconnect_delay = 2  # Delay between reconnection attempts in seconds

    def set_fps(self, fps):
        """Set the target FPS for frame capture.

        Non-positive values are ignored (the previous interval is kept)
        instead of raising ``ZeroDivisionError`` or disabling throttling.
        """
        if fps is None or fps <= 0:
            print(f"Ignoring invalid FPS value: {fps}")
            return
        self.frame_interval = 1.0 / fps

    def validate_url(self, url):
        """Validate and normalize URL format.

        Returns the normalized URL, or ``None`` if it could not be parsed.
        URLs without a scheme get ``http://`` prepended; URLs containing
        ``:4747`` are rewritten to ``<scheme>://<netloc>/video``.
        """
        try:
            # Remove any whitespace
            url = url.strip()
            # Parse the URL to validate its components
            parsed = urllib.parse.urlparse(url)
            # Ensure scheme is present
            if not parsed.scheme:
                url = f"http://{url}"
                parsed = urllib.parse.urlparse(url)
            # Validate DroidCam URL
            if ':4747' in url:
                # Ensure the path ends with /video
                base_url = f"{parsed.scheme}://{parsed.netloc}"
                return f"{base_url}/video"
            return url
        except Exception as e:
            print(f"URL validation error: {e}")
            return None

    def construct_camera_url(self, camera_info):
        """Construct proper camera URL with authentication if needed.

        ``camera_info`` may be a plain URL or a dict with a ``'url'`` key and
        optional ``'username'``/``'password'`` keys. Returns ``None`` when no
        usable URL can be built.
        """
        try:
            if isinstance(camera_info, dict):
                url = camera_info.get('url', '')
            else:
                url = str(camera_info)
            # Validate and normalize the URL
            url = self.validate_url(url)
            if not url:
                return None
            # Handle authentication if provided
            if isinstance(camera_info, dict) and 'username' in camera_info and 'password' in camera_info:
                parsed = urllib.parse.urlparse(url)
                if '@' not in parsed.netloc:
                    # safe='' so that '/', ':' and '@' inside the credentials are
                    # percent-encoded and cannot be mistaken for URL delimiters.
                    username = urllib.parse.quote(str(camera_info['username']), safe='')
                    password = urllib.parse.quote(str(camera_info['password']), safe='')
                    netloc = f"{username}:{password}@{parsed.netloc}"
                    url = parsed._replace(netloc=netloc).geturl()
            return url
        except Exception as e:
            print(f"Error constructing camera URL: {e}")
            return None

    def _droidcam_reachable(self, url):
        """Return True if the DroidCam endpoint answers with HTTP 200."""
        try:
            # stream=True: only the response headers are fetched. Without it
            # requests tries to download the entire body, which never finishes
            # if the endpoint serves a continuous video stream.
            response = requests.get(url, timeout=2, stream=True)
        except requests.exceptions.RequestException as e:
            print(f"Failed to connect to DroidCam: {e}")
            return False
        try:
            if response.status_code != 200:
                print(f"DroidCam endpoint returned status {response.status_code}")
                return False
            return True
        finally:
            # Release the probe connection before OpenCV opens its own.
            response.close()

    def _wait_before_retry(self, attempt):
        """Sleep before the next attempt; return False if ``attempt`` was the last one."""
        if attempt < self.reconnect_attempts - 1:
            time.sleep(self.reconnect_delay)
            return True
        return False

    def connect_to_camera(self):
        """Attempt to connect to the camera with retry logic.

        Returns True once a capture is open and a test frame was read,
        False after ``self.reconnect_attempts`` failed attempts or when a
        network camera is unknown or has an invalid URL.
        """
        for attempt in range(self.reconnect_attempts):
            try:
                # Clean up any existing connection
                if self.cap is not None:
                    self.cap.release()
                    self.cap = None
                if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
                    name = self.camera_info[4:]
                    detector = self.parent().detector if self.parent() else None
                    if not detector or name not in detector.network_cameras:
                        self.error_occurred.emit(self.camera_id, f"Network camera {name} not found")
                        return False
                    camera_info = detector.network_cameras[name]
                    url = self.construct_camera_url(camera_info)
                    if not url:
                        self.error_occurred.emit(self.camera_id, f"Invalid camera URL for {name}")
                        return False
                    print(f"Attempting to connect to network camera URL: {url}")
                    # For DroidCam, try to verify the endpoint is accessible first
                    if ':4747' in url and not self._droidcam_reachable(url):
                        if self._wait_before_retry(attempt):
                            continue
                        return False
                    # Create VideoCapture with the URL
                    self.cap = cv2.VideoCapture()
                    # Set buffer size to minimize latency
                    self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                    # Open the connection
                    if not self.cap.open(url):
                        print(f"Failed to open URL: {url}")
                        if self._wait_before_retry(attempt):
                            continue
                        return False
                else:
                    # Local camera
                    self.cap = cv2.VideoCapture(
                        int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info
                    )
                # Verify the connection is working
                if not self.cap.isOpened():
                    print("Camera not opened")
                    if self._wait_before_retry(attempt):
                        continue
                    return False
                # Test read a frame
                ret, frame = self.cap.read()
                if not ret or frame is None:
                    print("Failed to read test frame")
                    self.cap.release()
                    if self._wait_before_retry(attempt):
                        continue
                    return False
                print(f"Successfully connected to camera")
                return True
            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {str(e)}")
                if self.cap:
                    self.cap.release()
                    self.cap = None
                if not self._wait_before_retry(attempt):
                    self.error_occurred.emit(self.camera_id, str(e))
                    return False
        return False

    def run(self):
        """Main thread loop: connect, then read and emit frames until stopped."""
        try:
            if not self.connect_to_camera():
                self.error_occurred.emit(self.camera_id, "Failed to connect to camera after multiple attempts")
                return
            self.running = True
            last_frame_time = time.time()
            consecutive_failures = 0
            while self.running:
                ret, frame = False, None
                self.mutex.lock()
                try:
                    if not self.running:
                        break
                    # Check if enough time has passed since last frame
                    current_time = time.time()
                    frame_due = current_time - last_frame_time >= self.frame_interval
                    if frame_due:
                        ret, frame = self.cap.read()
                finally:
                    # Always release the mutex, even if read() raises; otherwise
                    # stop() would block forever on mutex.lock().
                    self.mutex.unlock()
                if not frame_due:
                    time.sleep(0.001)  # Small sleep to prevent CPU hogging
                    continue
                if ret:
                    consecutive_failures = 0  # Reset failure counter on success
                    self.frame_ready.emit(self.camera_id, frame)
                    last_frame_time = current_time
                else:
                    consecutive_failures += 1
                    if consecutive_failures >= 5:  # Try to reconnect after 5 consecutive failures
                        print(f"Multiple frame read failures, attempting to reconnect...")
                        # connect_to_camera() releases the stale capture itself.
                        if not self.connect_to_camera():
                            self.error_occurred.emit(self.camera_id, "Failed to reconnect to camera")
                            break
                        consecutive_failures = 0
                    time.sleep(0.1)  # Small delay before next attempt
        except Exception as e:
            self.error_occurred.emit(self.camera_id, str(e))
        finally:
            self.cleanup()

    def stop(self):
        """Stop the thread safely and block until ``run`` has returned."""
        self.mutex.lock()
        self.running = False
        self.mutex.unlock()
        self.wait()

    def cleanup(self):
        """Clean up camera resources"""
        if self.cap:
            self.cap.release()
            self.cap = None
        self.running = False
class MultiCamYOLODetector(QObject):
    """Holds the camera connections, the saved network cameras and the YOLO network.

    Settings (confidence threshold, network cameras, FPS, last model
    directory) are read from a ``Config`` instance at construction time.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.cameras = []
        self.camera_threads = {}  # Dictionary to store camera threads
        self.net = None
        self.classes = []
        self.colors = []
        self.target_fps = 10
        self.last_frame_time = 0
        self.frame_interval = 1.0 / self.target_fps
        self.available_cameras = []
        self.model_dir = ""
        self.cuda_available = self.check_cuda()
        self.config = Config()
        self.latest_frames = {}  # Store latest frames from each camera
        self.frame_lock = QMutex()  # Mutex for thread-safe frame access
        # Load settings
        self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35)
        self.network_cameras = self.config.load_setting('network_cameras', {})
        saved_fps = self.config.load_setting('last_fps', 10)
        # A stored value of 0 (or a non-number) would otherwise crash the
        # constructor with ZeroDivisionError/TypeError; keep the default then.
        if isinstance(saved_fps, (int, float)) and not isinstance(saved_fps, bool) and saved_fps > 0:
            self.target_fps = saved_fps
        self.frame_interval = 1.0 / self.target_fps
        # Load last used model if available
        last_model = self.config.load_setting('last_model_dir')
        if last_model and os.path.exists(last_model):
            self.load_yolo_model(last_model)

    def check_cuda(self):
        """Check if CUDA is available"""
        try:
            return cv2.cuda.getCudaEnabledDeviceCount() > 0
        except Exception:
            # OpenCV builds without the cuda module raise here.
            return False

    @staticmethod
    def _local_capture_backend():
        """Return the OpenCV backend for local devices: V4L2 on Linux, auto-detect elsewhere."""
        return cv2.CAP_V4L2 if sys.platform.startswith('linux') else cv2.CAP_ANY

    @staticmethod
    def _network_camera_url(camera_info):
        """Return the stream URL for a saved network camera entry.

        Entries are either a plain URL string or a dict with ``'url'`` and
        optional ``'username'``/``'password'``; credentials are embedded in
        the URL when present and not already part of it.
        """
        if not isinstance(camera_info, dict):
            return str(camera_info)
        url = camera_info.get('url', '')
        username = camera_info.get('username')
        password = camera_info.get('password')
        if url and username is not None and password is not None:
            parsed = urllib.parse.urlparse(url)
            if '@' not in parsed.netloc:
                auth = (f"{urllib.parse.quote(str(username), safe='')}:"
                        f"{urllib.parse.quote(str(password), safe='')}")
                url = parsed._replace(netloc=f"{auth}@{parsed.netloc}").geturl()
        return url

    def add_network_camera(self, name, url):
        """Add a network camera to the saved list"""
        self.network_cameras[name] = url
        self.config.save_setting('network_cameras', self.network_cameras)

    def remove_network_camera(self, name):
        """Remove a network camera from the saved list"""
        if name in self.network_cameras:
            del self.network_cameras[name]
            self.config.save_setting('network_cameras', self.network_cameras)

    def scan_for_cameras(self, max_to_check=10):
        """Check for available cameras including network cameras.

        Returns a list of identifiers: numeric indices as strings, device
        paths, and ``"net:<name>"`` for every saved network camera.
        """
        self.available_cameras = []
        backend = self._local_capture_backend()
        # Check standard video devices
        for i in range(max_to_check):
            cap = None
            try:
                cap = cv2.VideoCapture(i, backend)
                if cap.isOpened():
                    self.available_cameras.append(str(i))
            except Exception:
                pass
            finally:
                if cap is not None:
                    cap.release()
        # Check direct device paths
        for prefix in ("/dev/video", "/dev/v4l/video"):
            for i in range(max_to_check):
                path = f"{prefix}{i}"
                # /dev/videoN is the same device as index N, which is listed
                # as "N" above; comparing the path string alone never matched.
                if prefix == "/dev/video" and str(i) in self.available_cameras:
                    continue
                if path in self.available_cameras or not os.path.exists(path):
                    continue
                cap = None
                try:
                    cap = cv2.VideoCapture(path, backend)
                    if cap.isOpened():
                        self.available_cameras.append(path)
                except Exception:
                    pass
                finally:
                    if cap is not None:
                        cap.release()
        # Add saved network cameras
        for name in self.network_cameras:
            self.available_cameras.append(f"net:{name}")
        return self.available_cameras

    def load_yolo_model(self, model_dir):
        """Load YOLO model from selected directory with better error handling.

        The directory must contain a weights file (``.weights``/``.onnx``), a
        ``.cfg`` file and a ``.names`` file; the first of each is used.
        Returns True on success, False otherwise.
        """
        self.model_dir = model_dir
        try:
            # Find model files in the directory
            entries = os.listdir(model_dir)
            weights = [f for f in entries if f.endswith(('.weights', '.onnx'))]
            configs = [f for f in entries if f.endswith('.cfg')]
            classes = [f for f in entries if f.endswith('.names')]
            # NOTE(review): a .cfg is required even when the weights are .onnx — confirm that is intended.
            if not weights or not configs or not classes:
                return False
            # Use the first found files
            weights_path = os.path.join(model_dir, weights[0])
            config_path = os.path.join(model_dir, configs[0])
            classes_path = os.path.join(model_dir, classes[0])
            self.net = cv2.dnn.readNet(weights_path, config_path)
            # Set backend based on availability
            if self.cuda_available:
                try:
                    self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
                    self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
                except Exception:
                    # Fall back to CPU if CUDA fails
                    self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
                    self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
            else:
                self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
                self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
            with open(classes_path, 'r') as f:
                # splitlines() + strip() so CRLF files do not leave '\r' in the
                # labels; interior blank lines are kept to preserve class indices.
                self.classes = [line.strip() for line in f.read().strip().splitlines()]
            np.random.seed(42)
            self.colors = np.random.randint(0, 255, size=(len(self.classes), 3), dtype='uint8')
            return True
        except Exception as e:
            print(f"Error loading YOLO model: {e}")
            return False

    def connect_cameras(self, camera_paths):
        """Connect to multiple cameras including network cameras.

        Any previously opened cameras are released first. Returns True if at
        least one camera could be opened.
        """
        self.disconnect_cameras()
        backend = self._local_capture_backend()
        for cam_path in camera_paths:
            try:
                if isinstance(cam_path, str) and cam_path.startswith('net:'):
                    # Handle network camera
                    name = cam_path[4:]  # Remove 'net:' prefix
                    if name not in self.network_cameras:
                        print(f"Network camera {name} not found in saved cameras")
                        continue
                    # Saved entries may be a plain URL or a dict with credentials.
                    url = self._network_camera_url(self.network_cameras[name])
                    print(f"Connecting to network camera URL: {url}")  # Debug print
                    cap = cv2.VideoCapture(url, cv2.CAP_ANY)  # Use CAP_ANY for network streams
                elif isinstance(cam_path, str) and cam_path.startswith('/dev/'):
                    # Handle device path
                    cap = cv2.VideoCapture(cam_path, backend)
                else:
                    # Handle numeric index
                    cap = cv2.VideoCapture(int(cam_path), backend)
                if not cap.isOpened():
                    print(f"Warning: Could not open camera {cam_path}")
                    cap.release()
                    continue
                # Try to set properties but continue if they fail
                try:
                    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
                    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
                    cap.set(cv2.CAP_PROP_FPS, self.target_fps)
                    cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
                except Exception:
                    pass
                self.cameras.append((cam_path, cap))
            except Exception as e:
                print(f"Error opening camera {cam_path}: {e}")
        return len(self.cameras) > 0

    def disconnect_cameras(self):
        """Disconnect all cameras"""
        for _, cam in self.cameras:
            try:
                cam.release()
            except Exception:
                # Best effort: a failing release must not keep the others open.
                pass
        self.cameras = []

    def get_frames(self):
        """Get frames from all cameras with error handling.

        Returns one frame per connected camera; a black 1280x720 frame is
        substituted when a camera cannot be read.
        """
        frames = []
        for cam_path, cam in self.cameras:
            try:
                ret, frame = cam.read()
                if not ret:
                    print(f"Warning: Could not read frame from camera {cam_path}")
                    frame = np.zeros((720, 1280, 3), dtype=np.uint8)
                else:
                    # Only perform detection if net is loaded and detection is requested
                    parent_window = self.parent()
                    if parent_window and self.net is not None and parent_window.detection_enabled:
                        frame = self.get_detections(frame)
                frames.append(frame)
            except Exception as e:
                print(f"Error reading frame from camera {cam_path}: {e}")
                frames.append(np.zeros((720, 1280, 3), dtype=np.uint8))
        return frames

    def get_detections(self, frame):
        """Perform YOLO object detection on a frame with error handling.

        Boxes and labels are drawn onto ``frame`` in place; the same frame is
        returned. Errors are printed and leave the frame as it is.
        """
        if self.net is None:
            return frame
        try:
            blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
            self.net.setInput(blob)
            # Get output layer names compatible with different OpenCV versions
            try:
                output_layers = self.net.getUnconnectedOutLayersNames()
            except AttributeError:
                layer_names = self.net.getLayerNames()
                # flatten(): the indices may come back nested or flat depending on the version.
                out_ids = np.array(self.net.getUnconnectedOutLayers()).flatten()
                output_layers = [layer_names[int(i) - 1] for i in out_ids]
            outputs = self.net.forward(output_layers)
            boxes = []
            confidences = []
            class_ids = []
            frame_h, frame_w = frame.shape[0], frame.shape[1]
            scale = np.array([frame_w, frame_h, frame_w, frame_h])
            for output in outputs:
                for detection in output:
                    scores = detection[5:]
                    class_id = int(np.argmax(scores))
                    confidence = scores[class_id]
                    if confidence > self.confidence_threshold:  # Use configurable threshold
                        box = detection[0:4] * scale
                        (centerX, centerY, width, height) = box.astype('int')
                        x = int(centerX - (width / 2))
                        y = int(centerY - (height / 2))
                        boxes.append([x, y, int(width), int(height)])
                        confidences.append(float(confidence))
                        class_ids.append(class_id)
            indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4)
            if len(indices) > 0:
                for i in np.array(indices).flatten():
                    (x, y, w, h) = boxes[i]
                    color = [int(c) for c in self.colors[class_ids[i]]]
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                    text = f"{self.classes[class_ids[i]]}: {confidences[i]:.2f}"
                    cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        except Exception as e:
            print(f"Detection error: {e}")
        return frame
class CameraDisplay(QLabel):
    """Custom QLabel for displaying camera feed with fullscreen support.

    Double-clicking toggles a separate window mirroring this label's pixmap;
    ``take_screenshot`` saves the current pixmap as a PNG.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setAlignment(Qt.AlignCenter)
        self.setText("No camera feed")
        self.setStyleSheet("""
            QLabel {
                background-color: #1E1E1E;
                color: #DDD;
                border: 2px solid #444;
                border-radius: 4px;
            }
        """)
        self.setMinimumSize(320, 240)
        self.fullscreen_window = None
        self.cam_id = None
        self.fullscreen_timer = None
        self.config = Config()
        self.screenshot_dir = self.config.load_setting('screenshot_dir', os.path.expanduser('~/Pictures/MuCaPy'))
        self.camera_name = None
        # Create screenshot directory if it doesn't exist. An empty saved value
        # is left alone (take_screenshot asks for a directory in that case)
        # and a creation failure must not prevent the widget from being built.
        if self.screenshot_dir:
            try:
                os.makedirs(self.screenshot_dir, exist_ok=True)
            except OSError as e:
                print(f"Could not create screenshot directory: {e}")

    def set_cam_id(self, cam_id):
        """Set camera identifier for this display"""
        self.cam_id = cam_id

    def set_camera_name(self, name):
        """Set the camera name for display"""
        self.camera_name = name
        self.update()

    def take_screenshot(self):
        """Take a screenshot of the current frame.

        Does nothing when no pixmap is shown. Prompts for a directory when
        none is configured and reports the outcome in a message box.
        """
        if not self.pixmap():
            return
        # Ask for screenshot directory if not set
        if not self.screenshot_dir:
            dir_path = QFileDialog.getExistingDirectory(
                self,
                "Select Screenshot Directory",
                os.path.expanduser('~/Pictures'),
                QFileDialog.ShowDirsOnly | QFileDialog.DontResolveSymlinks
            )
            if not dir_path:
                return
            self.screenshot_dir = dir_path
            self.config.save_setting('screenshot_dir', dir_path)
            os.makedirs(dir_path, exist_ok=True)
        # Generate filename with timestamp
        timestamp = QDateTime.currentDateTime().toString('yyyy-MM-dd_hh-mm-ss')
        filename = f"camera_{self.cam_id}_{timestamp}.png"
        filepath = os.path.join(self.screenshot_dir, filename)
        # Save the image
        if self.pixmap().save(filepath):
            QMessageBox.information(self, "Success", f"Screenshot saved to:\n{filepath}")
        else:
            QMessageBox.critical(self, "Error", "Failed to save screenshot")

    def mouseDoubleClickEvent(self, event):
        """Handle double click to toggle fullscreen"""
        if self.pixmap() and not self.fullscreen_window:
            self.show_fullscreen()
        elif self.fullscreen_window:
            self.close_fullscreen()

    def show_fullscreen(self):
        """Show this camera in a new window"""
        if not self.pixmap():
            return
        self.fullscreen_window = QMainWindow(self.window())
        self.fullscreen_window.setWindowTitle(f"Camera {self.cam_id}")
        # Create central widget
        central_widget = QWidget()
        layout = QVBoxLayout(central_widget)
        # Create fullscreen label
        self.fullscreen_label = QLabel()
        self.fullscreen_label.setAlignment(Qt.AlignCenter)
        self.fullscreen_label.setMinimumSize(640, 480)  # Set minimum size
        self.fullscreen_label.setPixmap(self.pixmap().scaled(
            QSize(1280, 720),  # Default HD size
            Qt.KeepAspectRatio,
            Qt.SmoothTransformation
        ))
        layout.addWidget(self.fullscreen_label)
        self.fullscreen_window.setCentralWidget(central_widget)
        # Add ESC shortcut to close
        shortcut = QShortcut(QKeySequence(Qt.Key_Escape), self.fullscreen_window)
        shortcut.activated.connect(self.close_fullscreen)
        # Add screenshot shortcut (Ctrl+S)
        screenshot_shortcut = QShortcut(QKeySequence("Ctrl+S"), self.fullscreen_window)
        screenshot_shortcut.activated.connect(self.take_screenshot)
        # Update fullscreen image when main window updates
        self.fullscreen_timer = QTimer()
        self.fullscreen_timer.timeout.connect(
            lambda: self.update_fullscreen(self.fullscreen_label)
        )
        self.fullscreen_timer.start(30)
        # Set window size and show
        screen = QApplication.primaryScreen().availableGeometry()
        # resize() needs ints: "width * 0.8" is a float, and passing a float
        # to an int parameter raises TypeError on Python 3.10+.
        width = min(1280, int(screen.width() * 0.8))
        height = min(720, int(screen.height() * 0.8))
        self.fullscreen_window.resize(width, height)
        self.fullscreen_window.show()

    def update_fullscreen(self, label):
        """Update the fullscreen display"""
        if self.pixmap():
            label.setPixmap(self.pixmap().scaled(
                label.size(),
                Qt.KeepAspectRatio,
                Qt.SmoothTransformation
            ))

    def close_fullscreen(self):
        """Close the fullscreen window"""
        if self.fullscreen_window:
            if self.fullscreen_timer:
                self.fullscreen_timer.stop()
            self.fullscreen_window.close()
            self.fullscreen_window = None
            self.fullscreen_timer = None

    def paintEvent(self, event):
        """Override paint event to draw camera name overlay"""
        super().paintEvent(event)
        if self.camera_name and self.pixmap():
            painter = QPainter(self)
            painter.setRenderHint(QPainter.Antialiasing)
            # Draw semi-transparent background
            painter.setPen(Qt.NoPen)
            painter.setBrush(QBrush(QColor(0, 0, 0, 180)))
            rect = QRect(10, 10, painter.fontMetrics().width(self.camera_name) + 20, 30)
            painter.drawRoundedRect(rect, 5, 5)
            # Draw text
            painter.setPen(QPen(QColor(255, 255, 255)))
            painter.drawText(rect, Qt.AlignCenter, self.camera_name)
            # Finish painting explicitly instead of relying on garbage collection.
            painter.end()
class CollapsibleDock(QDockWidget):
    """Dock widget whose title bar carries a button that folds it to zero width."""

    def __init__(self, title, parent=None):
        super().__init__(title, parent)
        dock_features = (QDockWidget.DockWidgetClosable
                         | QDockWidget.DockWidgetMovable
                         | QDockWidget.DockWidgetFloatable)
        self.setFeatures(dock_features)

        # State used by collapse()/expand().
        self.collapsed = False
        self.original_size = None
        self.original_minimum_width = None

        # Arrow button that flips between the collapsed and expanded state.
        self.toggle_button = QToolButton()
        self.toggle_button.setIcon(QIcon.fromTheme("arrow-left"))
        self.toggle_button.setIconSize(QSize(16, 16))
        self.toggle_button.setStyleSheet("border: none;")
        self.toggle_button.clicked.connect(self.toggle_collapse)

        # Custom title bar: the toggle button on the left, stretch on the right.
        bar = QWidget()
        bar_layout = QHBoxLayout(bar)
        bar_layout.setContentsMargins(0, 0, 0, 0)
        bar_layout.setSpacing(0)
        bar_layout.addWidget(self.toggle_button)
        bar_layout.addStretch()
        self.setTitleBarWidget(bar)

    def toggle_collapse(self):
        """Switch to whichever state the dock is not currently in."""
        action = self.expand if self.collapsed else self.collapse
        action()

    def collapse(self):
        """Remember the current geometry and shrink the dock to zero width."""
        if self.collapsed:
            return
        self.original_size = self.size()
        self.original_minimum_width = self.minimumWidth()
        self.setMinimumWidth(0)
        self.setMaximumWidth(0)
        self.toggle_button.setIcon(QIcon.fromTheme("arrow-right"))
        self.collapsed = True

    def expand(self):
        """Lift the width limits and restore the size saved by collapse()."""
        if not self.collapsed:
            return
        self.setMinimumWidth(250)
        self.setMaximumWidth(16777215)  # Qt default maximum
        if self.original_size:
            self.resize(self.original_size)
        self.toggle_button.setIcon(QIcon.fromTheme("arrow-left"))
        self.collapsed = False
class AboutWindow(QDialog):
    """Modal "About" dialog showing the app title, version and system information.

    Only the entries named in ``self.important_keys`` are visible initially;
    the toggle button in the group header shows or hides the rest.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("About Multi-Camera YOLO Detection")
        self.setWindowIcon(QIcon.fromTheme("help-about"))
        self.resize(450, 420)
        self.setWindowModality(Qt.ApplicationModal)
        self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)
        layout = QVBoxLayout()
        layout.setAlignment(Qt.AlignTop)
        layout.setSpacing(20)
        # App icon
        icon_label = QLabel()
        icon_label.setPixmap(QIcon.fromTheme("camera-web").pixmap(64, 64))
        icon_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(icon_label)
        # Title
        title_label = QLabel("MuCaPy - 1")
        title_label.setStyleSheet("font-size: 18px; font-weight: bold;")
        title_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(title_label)
        # Version label
        version_label = QLabel("Version 1.0")
        version_label.setAlignment(Qt.AlignCenter)
        layout.addWidget(version_label)
        # Get system info
        info = self.get_system_info()
        self.important_keys = ["Python", "OpenCV", "Memory", "CUDA"]
        self.full_labels = {}
        # === System Info Group ===
        self.sysinfo_box = QGroupBox()
        sysinfo_main_layout = QVBoxLayout()
        sysinfo_main_layout.setContentsMargins(8, 8, 8, 8)
        # Header layout: title + triangle button
        header_layout = QHBoxLayout()
        header_label = QLabel("System Information")
        header_label.setStyleSheet("font-weight: bold;")
        header_layout.addWidget(header_label)
        header_layout.addStretch()
        self.toggle_btn = QToolButton()
        # The button text was an empty string in both states, which left the
        # borderless button invisible; use triangles as the header comment describes.
        self.toggle_btn.setText("▸")
        self.toggle_btn.setCheckable(True)
        self.toggle_btn.setChecked(False)
        self.toggle_btn.setStyleSheet("""
            QToolButton {
                border: none;
                background: transparent;
                font-size: 14px;
                color: #DDD;
            }
        """)
        self.toggle_btn.toggled.connect(self.toggle_expand)
        header_layout.addWidget(self.toggle_btn)
        sysinfo_main_layout.addLayout(header_layout)
        # Details layout
        self.sysinfo_layout = QVBoxLayout()
        self.sysinfo_layout.setSpacing(5)
        for key, value in info.items():
            if key == "MemoryGB":
                # Raw number backing the "Memory" entry; not shown on its own.
                continue
            label = QLabel(f"{key}: {value}")
            self.style_label(label, key, value)
            self.sysinfo_layout.addWidget(label)
            self.full_labels[key] = label
            if key not in self.important_keys:
                label.setVisible(False)
        sysinfo_main_layout.addLayout(self.sysinfo_layout)
        self.sysinfo_box.setLayout(sysinfo_main_layout)
        layout.addWidget(self.sysinfo_box)
        # Close button
        close_btn = QPushButton("Close")
        close_btn.clicked.connect(self.accept)
        close_btn.setFixedWidth(100)
        layout.addWidget(close_btn, alignment=Qt.AlignCenter)
        self.setStyleSheet("""
            QDialog {
                background-color: #2D2D2D;
                color: #DDD;
            }
            QLabel {
                color: #DDD;
            }
            QGroupBox {
                border: 1px solid #555;
                border-radius: 6px;
                margin-top: 10px;
                padding: 4px;
                background-color: #252525;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                left: 10px;
                padding: 0 3px;
                color: #DDD;
            }
            QPushButton {
                background-color: #3A3A3A;
                color: #DDD;
                border: 1px solid #555;
                border-radius: 4px;
                padding: 5px;
                min-width: 80px;
            }
            QPushButton:hover {
                background-color: #4A4A4A;
            }
        """)
        self.setLayout(layout)

    def toggle_expand(self, checked):
        """Show (``checked``) or hide the non-important entries and flip the arrow."""
        for key, label in self.full_labels.items():
            if key not in self.important_keys:
                label.setVisible(checked)
        self.toggle_btn.setText("▾" if checked else "▸")

    def style_label(self, label, key, value):
        """Colour ``label`` according to which entry ``key`` it shows and its ``value``."""
        if key == "Python":
            label.setStyleSheet("color: #7FDBFF;")
        elif key == "OpenCV":
            label.setStyleSheet("color: #FF851B;")
        elif key == "CUDA":
            label.setStyleSheet("color: green;" if value == "Yes" else "color: red;")
        elif key == "NumPy":
            label.setStyleSheet("color: #B10DC9;")
        elif key == "Requests":
            label.setStyleSheet("color: #0074D9;")
        elif key == "Memory":
            try:
                # value looks like "<n> GB RAM"
                ram = int(value.split()[0])
                if ram < 8:
                    label.setStyleSheet("color: red;")
                elif ram < 16:
                    label.setStyleSheet("color: yellow;")
                elif ram < 32:
                    label.setStyleSheet("color: lightgreen;")
                else:
                    label.setStyleSheet("color: #90EE90;")
            except (AttributeError, IndexError, ValueError):
                label.setStyleSheet("color: gray;")
        elif key == "CPU Usage":
            try:
                usage = float(value.strip('%'))
                if usage > 80:
                    label.setStyleSheet("color: red;")
                elif usage > 50:
                    label.setStyleSheet("color: yellow;")
                else:
                    label.setStyleSheet("color: lightgreen;")
            except (AttributeError, ValueError):
                label.setStyleSheet("color: gray;")
        elif key in ("CPU Cores", "Logical CPUs"):
            label.setStyleSheet("color: lightgreen;")
        elif key in ("CPU", "Architecture", "OS"):
            label.setStyleSheet("color: lightgray;")
        else:
            label.setStyleSheet("color: #DDD;")

    def get_system_info(self):
        """Collect version, memory and CPU information as an ordered dict of strings/numbers.

        The insertion order is the display order used by ``__init__``.
        """
        info = {}
        info['Python'] = sys.version.split()[0]
        info['OS'] = f"{platform.system()} {platform.release()}"
        info['Architecture'] = platform.machine()
        info['OpenCV'] = cv2.__version__
        try:
            cuda_available = cv2.cuda.getCudaEnabledDeviceCount() > 0
        except Exception:
            # OpenCV builds without the cuda module must not break the dialog.
            cuda_available = False
        info['CUDA'] = "Yes" if cuda_available else "No"
        info['NumPy'] = np.__version__
        info['Requests'] = requests.__version__
        mem = psutil.virtual_memory()
        info['MemoryGB'] = mem.total // (1024**3)
        info['Memory'] = f"{info['MemoryGB']} GB RAM"
        info['CPU Cores'] = psutil.cpu_count(logical=False)
        info['Logical CPUs'] = psutil.cpu_count(logical=True)
        # With no interval the first call returns a meaningless 0.0, so
        # sample over a short window instead.
        info['CPU Usage'] = f"{psutil.cpu_percent(interval=0.1)}%"
        try:
            if sys.platform == "win32":
                info['CPU'] = platform.processor()
            elif sys.platform == "linux":
                # The first line of lscpu is the "Architecture" row, not the
                # CPU model; look for the "Model name" row instead. LC_ALL=C
                # keeps the row labels in English.
                output = subprocess.check_output(
                    ["lscpu"], env={**os.environ, "LC_ALL": "C"}
                ).decode(errors="replace")
                model = ""
                for line in output.splitlines():
                    if line.lower().startswith("model name"):
                        model = line.split(":", 1)[1].strip()
                        break
                info['CPU'] = model or platform.processor() or "Unknown"
            elif sys.platform == "darwin":
                info['CPU'] = subprocess.check_output(
                    ["sysctl", "-n", "machdep.cpu.brand_string"]
                ).decode().strip()
        except (OSError, IndexError, subprocess.SubprocessError):
            info['CPU'] = "Unknown"
        return info
class NetworkCameraDialog(QDialog):
    """Modal dialog for adding and removing saved network cameras.

    Cameras are stored through ``parent.detector`` (``add_network_camera`` /
    ``remove_network_camera``); without a parent the dialog is inert.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle("Network Camera Settings")
        self.setModal(True)
        self.resize(500, 400)
        layout = QVBoxLayout(self)
        # Instructions label
        instructions = QLabel(
            "Enter network camera details:\n"
            "- For DroidCam: Use the IP and port shown in the app\n"
            " Example: http://192.168.1.100:4747/video\n"
            "- For other IP cameras: Enter the full stream URL\n"
            "- Enable authentication if the camera requires username/password"
        )
        instructions.setWordWrap(True)
        layout.addWidget(instructions)
        # Camera list
        self.camera_list = QListWidget()
        layout.addWidget(self.camera_list)
        # Input fields
        form_layout = QFormLayout()
        # Name and URL
        self.name_edit = QLineEdit()
        self.url_edit = QLineEdit()
        form_layout.addRow("Name:", self.name_edit)
        form_layout.addRow("URL:", self.url_edit)
        # Authentication group
        auth_group = QGroupBox("Authentication")
        auth_layout = QVBoxLayout()
        self.auth_checkbox = QCheckBox("Enable Authentication")
        self.auth_checkbox.stateChanged.connect(self.toggle_auth_fields)
        auth_layout.addWidget(self.auth_checkbox)
        auth_form = QFormLayout()
        self.username_edit = QLineEdit()
        self.password_edit = QLineEdit()
        self.password_edit.setEchoMode(QLineEdit.Password)
        auth_form.addRow("Username:", self.username_edit)
        auth_form.addRow("Password:", self.password_edit)
        auth_layout.addLayout(auth_form)
        auth_group.setLayout(auth_layout)
        form_layout.addRow(auth_group)
        layout.addLayout(form_layout)
        # Initially disable auth fields
        self.username_edit.setEnabled(False)
        self.password_edit.setEnabled(False)
        # Buttons
        btn_layout = QHBoxLayout()
        add_btn = QPushButton("Add Camera")
        add_btn.clicked.connect(self.add_camera)
        remove_btn = QPushButton("Remove Camera")
        remove_btn.clicked.connect(self.remove_camera)
        close_btn = QPushButton("Close")
        close_btn.clicked.connect(self.accept)
        btn_layout.addWidget(add_btn)
        btn_layout.addWidget(remove_btn)
        btn_layout.addWidget(close_btn)
        layout.addLayout(btn_layout)
        self.detector = parent.detector if parent else None
        self.load_cameras()

    def toggle_auth_fields(self, state):
        """Enable/disable authentication fields based on checkbox state"""
        enabled = state == Qt.Checked
        self.username_edit.setEnabled(enabled)
        self.password_edit.setEnabled(enabled)
        if not enabled:
            self.username_edit.clear()
            self.password_edit.clear()

    def load_cameras(self):
        """Load saved network cameras into the list.

        Each item carries the camera name in ``Qt.UserRole`` so that removal
        does not have to parse it back out of the display text.
        """
        if not self.detector:
            return
        self.camera_list.clear()
        for name, camera_info in self.detector.network_cameras.items():
            if isinstance(camera_info, dict):
                url = camera_info.get('url', '')
                has_auth = camera_info.get('username') is not None
                display_text = f"{name} ({url})"
                if has_auth:
                    display_text += " [Auth]"
            else:
                # Handle old format where camera_info was just the URL
                display_text = f"{name} ({camera_info})"
            item = QListWidgetItem(display_text)
            item.setData(Qt.UserRole, name)
            self.camera_list.addItem(item)

    def add_camera(self):
        """Add a new network camera.

        With authentication enabled the camera is saved as a dict with
        ``'url'``, ``'username'`` and ``'password'``; otherwise it is saved as
        the plain URL string.
        """
        name = self.name_edit.text().strip()
        url = self.url_edit.text().strip()
        if not name or not url:
            QMessageBox.warning(self, "Error", "Please enter both name and URL")
            return
        # Ensure URL has proper format for DroidCam
        if ':4747' in url:
            if not url.endswith('/video'):
                url = url.rstrip('/') + '/video'
            if not url.startswith(('http://', 'https://')):
                url = 'http://' + url
        # The credentials entered in the dialog used to be discarded; store
        # them alongside the URL when authentication is enabled.
        if self.auth_checkbox.isChecked():
            username = self.username_edit.text().strip()
            if not username:
                QMessageBox.warning(self, "Error", "Please enter a username or disable authentication")
                return
            camera_info = {
                'url': url,
                'username': username,
                'password': self.password_edit.text(),
            }
        else:
            camera_info = url
        if self.detector:
            print(f"Adding network camera: {name} with URL: {url}")  # Debug print
            self.detector.add_network_camera(name, camera_info)
            self.load_cameras()
            self.name_edit.clear()
            self.url_edit.clear()
            self.username_edit.clear()
            self.password_edit.clear()

    def remove_camera(self):
        """Remove selected network camera"""
        current = self.camera_list.currentItem()
        if not current:
            return
        # Prefer the stored name: splitting the display text on " (" breaks
        # for camera names that themselves contain " (".
        name = current.data(Qt.UserRole)
        if name is None:
            name = current.text().split(" (")[0]
        if self.detector:
            self.detector.remove_network_camera(name)
            self.load_cameras()
class CameraSelectorDialog(QDialog):
    """Modal dialog for choosing, ordering and testing cameras.

    The left pane lists local and network cameras as checkable rows; the
    right pane previews the checked cameras and can be reordered by drag and
    drop. Every change to the selection is written to the detector's config
    under ``last_selected_cameras``.

    Camera ids stored in ``Qt.UserRole`` are strings: a numeric index
    (``"0"``), a device path (``"/dev/video0"``) or ``"net:<name>"`` for a
    network camera.
    """

    def __init__(self, parent=None):
        """Build the dialog and restore the last saved selection.

        Args:
            parent: Optional owner widget; its ``detector`` attribute (if a
                parent is given) supplies the network cameras and config.
        """
        super().__init__(parent)
        self.setWindowTitle("Camera Selector")
        self.setModal(True)
        self.resize(800, 600)  # Increased size for better visibility
        self.detector = parent.detector if parent else None
        self.selected_cameras = []

        # Main layout
        layout = QVBoxLayout(self)

        # Instructions with better formatting
        instructions = QLabel(
            "Camera Selection Guide:\n\n"
            "• Local Cameras: Built-in and USB cameras\n"
            "• Network Cameras: IP cameras, DroidCam, etc.\n"
            "• Use checkboxes to select/deselect cameras\n"
            "• Double-click a camera to test the connection\n"
            "• Selected cameras will appear in the preview below"
        )
        instructions.setStyleSheet("QLabel { background-color: #2A2A2A; padding: 10px; border-radius: 4px; }")
        instructions.setWordWrap(True)
        layout.addWidget(instructions)

        # Split view for cameras
        splitter = QSplitter(Qt.Horizontal)

        # Left side - Available Cameras
        left_widget = QWidget()
        left_layout = QVBoxLayout(left_widget)

        # Local Cameras Group
        local_group = QGroupBox("Local Cameras")
        local_layout = QVBoxLayout()
        self.local_list = QListWidget()
        self.local_list.setSelectionMode(QListWidget.ExtendedSelection)
        local_layout.addWidget(self.local_list)
        local_group.setLayout(local_layout)
        left_layout.addWidget(local_group)

        # Network Cameras Group
        network_group = QGroupBox("Network Cameras")
        network_layout = QVBoxLayout()
        self.network_list = QListWidget()
        self.network_list.setSelectionMode(QListWidget.ExtendedSelection)
        network_layout.addWidget(self.network_list)
        network_group.setLayout(network_layout)
        left_layout.addWidget(network_group)

        # Camera management buttons
        btn_layout = QHBoxLayout()
        refresh_btn = QPushButton("Refresh")
        refresh_btn.clicked.connect(self.refresh_cameras)
        add_net_btn = QPushButton("Add Network Camera")
        add_net_btn.clicked.connect(self.show_network_dialog)
        btn_layout.addWidget(refresh_btn)
        btn_layout.addWidget(add_net_btn)
        left_layout.addLayout(btn_layout)
        splitter.addWidget(left_widget)

        # Right side - Selected Cameras Preview
        right_widget = QWidget()
        right_layout = QVBoxLayout(right_widget)
        preview_label = QLabel("Selected Cameras Preview")
        preview_label.setStyleSheet("font-weight: bold;")
        right_layout.addWidget(preview_label)
        self.preview_list = QListWidget()
        self.preview_list.setDragDropMode(QListWidget.InternalMove)
        self.preview_list.setSelectionMode(QListWidget.ExtendedSelection)
        right_layout.addWidget(self.preview_list)

        # Preview controls
        preview_btn_layout = QHBoxLayout()
        remove_btn = QPushButton("Remove Selected")
        remove_btn.clicked.connect(self.remove_selected)
        clear_btn = QPushButton("Clear All")
        clear_btn.clicked.connect(self.clear_selection)
        preview_btn_layout.addWidget(remove_btn)
        preview_btn_layout.addWidget(clear_btn)
        right_layout.addLayout(preview_btn_layout)
        splitter.addWidget(right_widget)
        layout.addWidget(splitter)

        # Bottom buttons
        bottom_layout = QHBoxLayout()
        select_all_btn = QPushButton("Select All")
        select_all_btn.clicked.connect(self.select_all)
        test_selected_btn = QPushButton("Test Selected")
        test_selected_btn.clicked.connect(self.test_selected_cameras)
        ok_btn = QPushButton("OK")
        ok_btn.clicked.connect(self.accept)
        cancel_btn = QPushButton("Cancel")
        cancel_btn.clicked.connect(self.reject)
        bottom_layout.addWidget(select_all_btn)
        bottom_layout.addWidget(test_selected_btn)
        bottom_layout.addStretch()
        bottom_layout.addWidget(ok_btn)
        bottom_layout.addWidget(cancel_btn)
        layout.addLayout(bottom_layout)

        # Connect signals: any check-state change rebuilds the preview.
        self.local_list.itemChanged.connect(self.update_preview)
        self.network_list.itemChanged.connect(self.update_preview)
        self.preview_list.model().rowsMoved.connect(self.update_camera_order)

        # Set splitter sizes
        splitter.setSizes([400, 400])

        # Initial camera refresh
        self.refresh_cameras()

        # Restore last selection if available
        if self.detector:
            last_selected = self.detector.config.load_setting('last_selected_cameras', [])
            if last_selected:
                self.restore_selection(last_selected)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _make_checkable_item(text, cam_id):
        """Return an unchecked, user-checkable row carrying *cam_id*."""
        item = QListWidgetItem(text)
        item.setData(Qt.UserRole, cam_id)
        item.setFlags(item.flags() | Qt.ItemIsUserCheckable)
        item.setCheckState(Qt.Unchecked)
        return item

    def _source_items(self):
        """Return every row of the local list followed by the network list."""
        rows = []
        for source in (self.local_list, self.network_list):
            rows.extend(source.item(i) for i in range(source.count()))
        return rows

    def _add_local_if_available(self, source, label, cam_id, kind):
        """Probe one local capture source and list it when it opens.

        Args:
            source: Index or device path passed to ``cv2.VideoCapture``.
            label: Text shown in the list.
            cam_id: Identifier stored in ``Qt.UserRole``.
            kind: ``"camera"`` or ``"device"``; used only in the error message.
        """
        cap = None
        try:
            cap = cv2.VideoCapture(source)
            if cap.isOpened():
                self.local_list.addItem(self._make_checkable_item(label, cam_id))
        except Exception as e:
            print(f"Error checking {kind} {source}: {e}")
        finally:
            # Always hand the device back, even when probing failed.
            if cap is not None:
                cap.release()

    def _capture_source(self, cam_id):
        """Translate a camera id into the argument for ``cv2.VideoCapture``.

        Raises:
            LookupError: if *cam_id* names a network camera that is not
                configured (or no detector is available).
        """
        if not cam_id.startswith('net:'):
            return int(cam_id) if cam_id.isdigit() else cam_id

        name = cam_id[4:]
        cameras = self.detector.network_cameras if self.detector else {}
        camera_info = cameras.get(name)
        if camera_info is None:
            raise LookupError(f"unknown network camera '{name}'")
        if not isinstance(camera_info, dict):
            # Old format: the stored value is just the URL itself.
            return camera_info

        url = camera_info['url']
        username = camera_info.get('username')
        password = camera_info.get('password')
        # Embed credentials only when both are really set; a stored None
        # would otherwise end up as the literal text "None" in the URL.
        if username is not None and password is not None:
            parsed = urllib.parse.urlparse(url)
            netloc = f"{username}:{password}@{parsed.netloc}"
            url = parsed._replace(netloc=netloc).geturl()
        return url

    def _probe_camera(self, cam_id):
        """Try to open *cam_id* and read one frame; return a result line."""
        cap = None
        try:
            cap = cv2.VideoCapture(self._capture_source(cam_id))
            if not cap.isOpened():
                return f"{cam_id}: Failed to connect"
            ret, _frame = cap.read()
            if ret:
                return f"{cam_id}: Connection successful"
            return f"⚠️ {cam_id}: Connected but no frame received"
        except Exception as e:
            return f"{cam_id}: Error - {str(e)}"
        finally:
            if cap is not None:
                cap.release()

    # ------------------------------------------------------------------
    # Public behaviour
    # ------------------------------------------------------------------
    def refresh_cameras(self):
        """Refresh both local and network camera lists.

        Cameras that were checked before the refresh and still exist are
        re-checked afterwards. Clearing a ``QListWidget`` emits no
        ``itemChanged`` signal, so without this the preview pane, the
        ``selected_cameras`` list and the saved config would keep showing a
        selection that no longer matches the checkboxes.
        """
        previously_checked = [
            item.data(Qt.UserRole)
            for item in self._source_items()
            if item.checkState() == Qt.Checked
        ]

        self.local_list.clear()
        self.network_list.clear()

        if self.detector:
            # Add local cameras
            for i in range(10):  # Check first 10 indices
                self._add_local_if_available(i, f"Camera {i}", str(i), "camera")

            # Check device paths
            if os.path.exists('/dev'):
                for i in range(10):
                    device_path = f"/dev/video{i}"
                    if os.path.exists(device_path):
                        self._add_local_if_available(
                            device_path, os.path.basename(device_path), device_path, "device"
                        )

            # Add network cameras
            for name, camera_info in self.detector.network_cameras.items():
                if isinstance(camera_info, dict):
                    url = camera_info.get('url', '')
                    display_text = f"{name} ({url})"
                    if camera_info.get('username') is not None:
                        display_text += " 🔒"
                else:
                    display_text = f"{name} ({camera_info})"
                self.network_list.addItem(
                    self._make_checkable_item(display_text, f"net:{name}")
                )

            self.restore_selection(previously_checked)

        # Only resync when something had been selected: on the very first
        # refresh nothing is checked, and rewriting the (empty) selection
        # here would wipe the saved one before __init__ can restore it.
        if previously_checked:
            self.update_preview()

    def restore_selection(self, last_selected):
        """Restore previous camera selection.

        Args:
            last_selected: Collection of camera ids to check; ids with no
                matching row are ignored.
        """
        for item in self._source_items():
            if item.data(Qt.UserRole) in last_selected:
                item.setCheckState(Qt.Checked)

    def update_preview(self):
        """Update the preview list with currently selected cameras."""
        self.preview_list.clear()
        self.selected_cameras = []

        sources = ((self.local_list, "📷"), (self.network_list, "🌐"))
        for source, icon in sources:
            for i in range(source.count()):
                item = source.item(i)
                if item.checkState() != Qt.Checked:
                    continue
                cam_id = item.data(Qt.UserRole)
                preview_item = QListWidgetItem(f"{icon} {item.text()}")
                preview_item.setData(Qt.UserRole, cam_id)
                self.preview_list.addItem(preview_item)
                self.selected_cameras.append(cam_id)

        # Save the current selection to config
        if self.detector:
            self.detector.config.save_setting('last_selected_cameras', self.selected_cameras)

    def update_camera_order(self):
        """Update the camera order based on preview list order."""
        self.selected_cameras = [
            self.preview_list.item(i).data(Qt.UserRole)
            for i in range(self.preview_list.count())
        ]
        # Save the new order
        if self.detector:
            self.detector.config.save_setting('last_selected_cameras', self.selected_cameras)

    def select_all(self):
        """Select all cameras in both lists."""
        for item in self._source_items():
            item.setCheckState(Qt.Checked)

    def clear_selection(self):
        """Clear all selections."""
        for item in self._source_items():
            item.setCheckState(Qt.Unchecked)

    def remove_selected(self):
        """Remove selected items from the preview list.

        The ids are snapshotted before anything is unchecked. Unchecking a
        source row fires ``itemChanged`` -> ``update_preview``, which clears
        the preview list and destroys its items; reading the remaining
        selected preview items afterwards would touch deleted Qt objects.
        """
        cam_ids = [item.data(Qt.UserRole) for item in self.preview_list.selectedItems()]
        for item in self._source_items():
            if item.data(Qt.UserRole) in cam_ids:
                item.setCheckState(Qt.Unchecked)

    def test_selected_cameras(self):
        """Test connection to selected cameras and show a result summary."""
        selected_cameras = [
            self.preview_list.item(i).data(Qt.UserRole)
            for i in range(self.preview_list.count())
        ]
        if not selected_cameras:
            QMessageBox.warning(self, "Warning", "No cameras selected to test!")
            return

        # Create progress dialog
        progress = QMessageBox(self)
        progress.setIcon(QMessageBox.Information)
        progress.setWindowTitle("Testing Cameras")
        progress.setText("Testing camera connections...\nThis may take a few seconds.")
        progress.setStandardButtons(QMessageBox.NoButton)
        progress.show()
        QApplication.processEvents()

        # Test each camera; the progress box is closed whatever happens.
        try:
            results = [self._probe_camera(cam_id) for cam_id in selected_cameras]
        finally:
            progress.close()

        # Show results
        result_dialog = QMessageBox(self)
        result_dialog.setWindowTitle("Camera Test Results")
        result_dialog.setText("\n".join(results))
        result_dialog.setIcon(QMessageBox.Information)
        result_dialog.exec_()

    def show_network_dialog(self):
        """Show the network camera configuration dialog."""
        dialog = NetworkCameraDialog(self)
        if dialog.exec_() == QDialog.Accepted:
            self.refresh_cameras()
class MainWindow(QMainWindow):
def __init__(self):
    """Create the main window: config, detector, theme, widgets and timers.

    Saved settings are loaded into the detector before the widgets exist
    and pushed into the widgets at the very end.
    """
    super().__init__()
    self.setWindowTitle("MuCaPy - V1")
    self.setGeometry(100, 100, 1200, 800)

    # Configuration and default state
    self.config = Config()
    self.current_layout = 0  # single camera layout unless settings say otherwise
    self.detector = MultiCamYOLODetector(self)  # the window acts as parent
    self.camera_settings = {}
    self.detection_enabled = True  # detection toggle flag

    # Saved settings go into the detector first
    self.load_saved_settings()

    # Hardware monitor refreshes once per second
    self.hw_timer = QTimer()
    self.hw_timer.timeout.connect(self.update_hardware_stats)
    self.hw_timer.start(1000)

    # Dark theme stylesheet (text kept verbatim)
    self.setStyleSheet("""
QMainWindow, QWidget {
background-color: #2D2D2D;
color: #DDD;
}
QLabel {
color: #DDD;
}
QPushButton {
background-color: #3A3A3A;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
QPushButton:hover {
background-color: #4A4A4A;
}
QPushButton:pressed {
background-color: #2A2A2A;
}
QPushButton:disabled {
background-color: #2A2A2A;
color: #777;
}
QComboBox, QSpinBox {
background-color: #3A3A3A;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 3px;
}
QGroupBox {
border: 1px solid #555;
border-radius: 4px;
margin-top: 10px;
padding-top: 15px;
background-color: #252525;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 3px;
color: #DDD;
}
QMenuBar {
background-color: #252525;
color: #DDD;
}
QMenuBar::item {
background-color: transparent;
padding: 5px 10px;
}
QMenuBar::item:selected {
background-color: #3A3A3A;
}
QMenu {
background-color: #252525;
border: 1px solid #444;
color: #DDD;
}
QMenu::item:selected {
background-color: #3A3A3A;
}
QScrollArea {
border: none;
}
QDockWidget {
titlebar-close-icon: url(none);
titlebar-normal-icon: url(none);
}
QDockWidget::title {
background: #252525;
padding-left: 5px;
}
QToolButton {
background-color: transparent;
border: none;
}
""")

    # Matching palette for widgets that ignore the stylesheet
    palette = self.palette()
    role_colors = (
        (palette.Window, QColor(45, 45, 45)),
        (palette.WindowText, QColor(221, 221, 221)),
        (palette.Base, QColor(35, 35, 35)),
        (palette.AlternateBase, QColor(45, 45, 45)),
        (palette.ToolTipBase, QColor(221, 221, 221)),
        (palette.ToolTipText, QColor(221, 221, 221)),
        (palette.Text, QColor(221, 221, 221)),
        (palette.Button, QColor(58, 58, 58)),
        (palette.ButtonText, QColor(221, 221, 221)),
        (palette.BrightText, Qt.red),
        (palette.Link, QColor(42, 130, 218)),
        (palette.Highlight, QColor(42, 130, 218)),
        (palette.HighlightedText, Qt.black),
    )
    for role, color in role_colors:
        palette.setColor(role, color)
    self.setPalette(palette)

    # Widgets, menus and the feed timer, then reflect the saved settings
    self.init_ui()
    self.create_menus()
    self.init_timer()
    self.apply_saved_settings()
def load_saved_settings(self):
    """Load saved settings from configuration into the detector.

    Reads ``model_dir``, ``fps`` and ``layout`` from ``self.config``.
    Malformed values fall back to the defaults (10 FPS, layout 0) instead
    of aborting start-up: a stored FPS of 0 would otherwise divide by zero
    when computing the frame interval, and a non-numeric value would raise
    from ``int()``.
    """
    def as_int(value, fallback):
        """Convert *value* to int, returning *fallback* when impossible."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return fallback

    # Load model directory
    model_dir = self.config.load_setting('model_dir')
    if isinstance(model_dir, str) and model_dir and os.path.exists(model_dir):
        self.detector.load_yolo_model(model_dir)

    # Load FPS setting; anything below 1 is not a usable frame rate.
    fps = as_int(self.config.load_setting('fps', 10), 10)
    if fps < 1:
        fps = 10
    self.detector.target_fps = fps
    self.detector.frame_interval = 1.0 / self.detector.target_fps

    # Load layout setting
    self.current_layout = as_int(self.config.load_setting('layout', 0), 0)
def apply_saved_settings(self):
    """Push the already-loaded settings into whichever widgets exist.

    Each widget is looked up with ``hasattr`` so the method is harmless when
    called before the corresponding part of the UI has been built.
    """
    widgets_present = {
        attr: hasattr(self, attr)
        for attr in ('fps_spin', 'layout_combo', 'model_label')
    }

    if widgets_present['fps_spin']:
        self.fps_spin.setValue(self.detector.target_fps)

    if widgets_present['layout_combo']:
        self.layout_combo.setCurrentIndex(self.current_layout)

    if widgets_present['model_label'] and self.detector.model_dir:
        folder_name = os.path.basename(self.detector.model_dir)
        self.model_label.setText(f"Model: {folder_name}")
def create_menus(self):
    """Build the File, Model, View and Cameras menus of the menu bar."""
    menubar = self.menuBar()

    def add_action(menu, text, slot, shortcut=None, status_tip=None):
        """Create a plain action, wire it to *slot* and append it to *menu*."""
        action = QAction(text, self)
        if shortcut is not None:
            action.setShortcut(shortcut)
        if status_tip is not None:
            action.setStatusTip(status_tip)
        action.triggered.connect(slot)
        menu.addAction(action)
        return action

    def add_toggle(menu, text, shortcut, slot):
        """Create an initially checked, checkable action in *menu*."""
        action = QAction(text, self)
        action.setCheckable(True)
        action.setChecked(True)
        action.setShortcut(shortcut)
        action.triggered.connect(slot)
        menu.addAction(action)
        return action

    def add_icon_action(menu, text, theme_icon, slot):
        """Create an action with a theme icon in *menu*."""
        action = QAction(text, self)
        action.setIcon(QIcon.fromTheme(theme_icon))
        action.triggered.connect(slot)
        menu.addAction(action)
        return action

    # --- File menu ---
    file_menu = menubar.addMenu('File')
    add_action(file_menu, 'Save Settings...', self.save_settings_to_file,
               shortcut='Ctrl+S', status_tip='Save current settings to a file')
    add_action(file_menu, 'Load Settings...', self.load_settings_from_file,
               shortcut='Ctrl+O', status_tip='Load settings from a file')
    file_menu.addSeparator()
    add_action(file_menu, 'Export Screenshots Directory...', self.open_screenshots_directory,
               status_tip='Open the screenshots directory')
    file_menu.addSeparator()
    add_action(file_menu, "About", self.show_menu)
    file_menu.addSeparator()
    add_action(file_menu, 'Exit', self.close,
               shortcut='Ctrl+Q', status_tip='Exit application')

    # --- Model menu ---
    model_menu = menubar.addMenu('Model')
    add_action(model_menu, 'Load Model Directory...', self.load_model_directory)

    # --- View menu ---
    view_menu = menubar.addMenu('View')
    self.toggle_sidebar_action = add_toggle(
        view_menu, 'Show Sidebar', 'Ctrl+B', self.toggle_sidebar_visibility)
    self.toggle_detection_action = add_toggle(
        view_menu, 'Enable Detection', 'Ctrl+D', self.toggle_detection)

    # --- Cameras menu ---
    self.camera_menu = menubar.addMenu('Cameras')
    add_icon_action(self.camera_menu, 'Select Cameras...', 'camera-web',
                    self.show_camera_selector)
    self.camera_menu.addSeparator()
    add_icon_action(self.camera_menu, 'Network Camera Settings...', 'network-wireless',
                    self.show_network_camera_dialog)
    self.camera_menu.addSeparator()

    # One sub-menu per camera type
    self.local_camera_menu = QMenu('Local Cameras', self)
    self.network_camera_menu = QMenu('Network Cameras', self)
    for submenu in (self.local_camera_menu, self.network_camera_menu):
        self.camera_menu.addMenu(submenu)

    # Non-exclusive groups: several cameras may be checked at once
    self.local_camera_group = QActionGroup(self)
    self.local_camera_group.setExclusive(False)
    self.network_camera_group = QActionGroup(self)
    self.network_camera_group.setExclusive(False)

    # Fill the sub-menus for the first time
    self.populate_camera_menu()
def populate_camera_menu(self):
    """Populate the camera menus with the currently available cameras.

    Rescans via ``self.detector.scan_for_cameras()`` and rebuilds the
    "Local Cameras" and "Network Cameras" sub-menus. Cameras that were
    checked before the rebuild and are still present stay checked.

    The old actions are explicitly removed from their ``QActionGroup`` and
    scheduled for deletion. They are parented to the window, so
    ``QMenu.clear()`` alone only detaches them from the menu: they would
    remain in the groups, where an invisible but still-checked action keeps
    feeding ``start_detection`` and the selection label, and they would
    pile up on every refresh.
    """
    menus_and_groups = (
        (self.local_camera_menu, self.local_camera_group),
        (self.network_camera_menu, self.network_camera_group),
    )

    # Tear down the previous entries, remembering what was selected.
    previously_checked = []
    for menu, group in menus_and_groups:
        for action in group.actions():
            if action.isChecked():
                previously_checked.append(action.data())
            group.removeAction(action)
            # deleteLater (not an immediate delete): this method may be
            # running inside the triggered signal of one of these actions.
            action.deleteLater()
        # Refresh/placeholder entries are window-owned too; separators
        # belong to the menu and are deleted by clear() itself.
        leftovers = [a for a in menu.actions() if not a.isSeparator()]
        menu.clear()
        for action in leftovers:
            action.deleteLater()

    # Add refresh action (local menu only)
    refresh_action = QAction('Refresh List', self)
    refresh_action.triggered.connect(self.populate_camera_menu)
    self.local_camera_menu.addAction(refresh_action)
    self.local_camera_menu.addSeparator()

    available_cams = self.detector.scan_for_cameras()
    local_cams_found = False
    network_cams_found = False

    for cam_path in available_cams:
        if cam_path.startswith('net:'):
            # Network camera: show the saved name without the prefix
            display_name = cam_path[4:]
            menu, group = self.network_camera_menu, self.network_camera_group
            network_cams_found = True
        else:
            # Local camera: device basename or "Camera N"
            if cam_path.startswith('/dev/'):
                display_name = os.path.basename(cam_path)
            else:
                display_name = f"Camera {cam_path}"
            menu, group = self.local_camera_menu, self.local_camera_group
            local_cams_found = True

        action = QAction(display_name, self)
        action.setCheckable(True)
        action.setData(cam_path)
        action.setChecked(cam_path in previously_checked)
        group.addAction(action)
        menu.addAction(action)

    # Add placeholder text if no cameras found
    if not local_cams_found:
        no_local = QAction('No local cameras found', self)
        no_local.setEnabled(False)
        self.local_camera_menu.addAction(no_local)
    if not network_cams_found:
        no_net = QAction('No network cameras found', self)
        no_net.setEnabled(False)
        self.network_camera_menu.addAction(no_net)

    # Update the camera label
    self.update_selection_labels()
def update_selection_labels(self):
    """Show the names of all checked cameras in the sidebar label.

    Local cameras are listed first, then network cameras; the label reads
    "Selected Cameras: None" when nothing is checked.
    """
    chosen_names = []
    for group in (self.local_camera_group, self.network_camera_group):
        chosen_names.extend(
            action.text() for action in group.actions() if action.isChecked()
        )

    summary = ', '.join(chosen_names) if chosen_names else "None"
    self.cameras_label.setText(f"Selected Cameras: {summary}")
def start_detection(self):
    """Connect to the checked cameras and start the feed timer.

    Shows an error dialog and returns early when no model directory is set,
    no camera is checked, or the detector fails to connect.
    """
    detector = self.detector
    if not detector.model_dir:
        QMessageBox.critical(self, "Error", "No model directory selected!")
        return

    # Collect the checked cameras, local ones first
    chosen = []
    for group in (self.local_camera_group, self.network_camera_group):
        chosen.extend(
            action.data() for action in group.actions() if action.isChecked()
        )
    if not chosen:
        QMessageBox.critical(self, "Error", "No cameras selected!")
        return

    # Take the frame rate from the spin box
    detector.target_fps = self.fps_spin.value()
    detector.frame_interval = 1.0 / detector.target_fps

    if not detector.connect_cameras(chosen):
        QMessageBox.critical(self, "Error", "Failed to connect to cameras!")
        return

    # Reflect the running state in the UI
    self.update_selection_labels()
    self.start_btn.setEnabled(False)
    self.stop_btn.setEnabled(True)
    self.fps_spin.setEnabled(False)

    # Timer interval in milliseconds for the requested frame rate
    interval_ms = int(1000 / detector.target_fps)
    self.timer.start(interval_ms)
def show_camera_selector(self):
    """Show a simplified, tabbed camera picker and apply the choice.

    The dialog starts with the cameras that are currently checked in the
    camera menus; on OK the menu actions are updated to match the dialog
    and the selection label is refreshed. Cancel changes nothing.
    """
    dialog = QDialog(self)
    dialog.setWindowTitle("Select Cameras")
    dialog.setModal(True)
    outer_layout = QVBoxLayout(dialog)

    def make_tab(heading):
        """Return a (page, list widget) pair for one camera type."""
        page = QWidget()
        page_layout = QVBoxLayout(page)
        listing = QListWidget()
        page_layout.addWidget(QLabel(heading))
        page_layout.addWidget(listing)
        return page, listing

    def rows_of(listing):
        """Return all items of *listing* in display order."""
        return [listing.item(row) for row in range(listing.count())]

    # One tab per camera type
    tabs = QTabWidget()
    local_tab, local_list = make_tab("Available Local Cameras:")
    network_tab, network_list = make_tab("Available Network Cameras:")
    tabs.addTab(local_tab, "Local Cameras")
    tabs.addTab(network_tab, "Network Cameras")
    outer_layout.addWidget(tabs)

    # Fill the lists from a fresh scan
    for cam_path in self.detector.scan_for_cameras():
        is_network = cam_path.startswith('net:')
        if is_network:
            caption = cam_path[4:]
        elif cam_path.startswith('/dev/'):
            caption = os.path.basename(cam_path)
        else:
            caption = f"Camera {cam_path}"
        entry = QListWidgetItem(caption)
        entry.setData(Qt.UserRole, cam_path)
        entry.setFlags(entry.flags() | Qt.ItemIsUserCheckable)
        entry.setCheckState(Qt.Unchecked)
        target_list = network_list if is_network else local_list
        target_list.addItem(entry)

    pairs = (
        (local_list, self.local_camera_group),
        (network_list, self.network_camera_group),
    )

    # Pre-check the rows whose menu action is currently checked
    for listing, group in pairs:
        for action in group.actions():
            if not action.isChecked():
                continue
            for entry in rows_of(listing):
                if entry.data(Qt.UserRole) == action.data():
                    entry.setCheckState(Qt.Checked)

    # OK / Cancel
    button_row = QHBoxLayout()
    ok_btn = QPushButton("OK")
    cancel_btn = QPushButton("Cancel")
    button_row.addWidget(ok_btn)
    button_row.addWidget(cancel_btn)
    outer_layout.addLayout(button_row)
    ok_btn.clicked.connect(dialog.accept)
    cancel_btn.clicked.connect(dialog.reject)

    if dialog.exec_() != QDialog.Accepted:
        return

    # Reset every menu action, then re-check the ones chosen in the dialog
    for _listing, group in pairs:
        for action in group.actions():
            action.setChecked(False)

    for listing, group in pairs:
        for entry in rows_of(listing):
            if entry.checkState() != Qt.Checked:
                continue
            cam_path = entry.data(Qt.UserRole)
            for action in group.actions():
                if action.data() == cam_path:
                    action.setChecked(True)

    self.update_selection_labels()
def load_model_directory(self):
    """Ask for a model directory and load the YOLO model from it.

    The dialog opens at the previously saved ``model_dir`` (home directory
    if none). On success the label and the saved setting are updated; on
    failure an error dialog is shown. Cancelling does nothing.
    """
    start_dir = self.config.load_setting('model_dir', QDir.homePath())
    dialog_options = QFileDialog.ShowDirsOnly | QFileDialog.DontResolveSymlinks
    chosen_dir = QFileDialog.getExistingDirectory(
        self, "Select Model Directory", start_dir, dialog_options
    )
    if not chosen_dir:
        return

    if not self.detector.load_yolo_model(chosen_dir):
        QMessageBox.critical(self, "Error", "Failed to load model from selected directory")
        return

    self.model_label.setText(f"Model: {os.path.basename(chosen_dir)}")
    self.config.save_setting('model_dir', chosen_dir)
    QMessageBox.information(self, "Success", "Model loaded successfully!")
def init_ui(self):
    """Initialize the user interface with collapsible sidebar.

    Builds, in order: the sidebar (model, camera, settings and start/stop
    sections inside a scroll area), the 2x2 grid of camera displays, and the
    hardware monitor with one progress bar per logical CPU.

    ``psutil.cpu_count()`` returns ``None`` when the count cannot be
    determined; in that case a single per-core bar is created instead of
    failing in ``range(None)``.
    """
    main_widget = QWidget()
    main_layout = QHBoxLayout()

    # Create collapsible sidebar
    self.sidebar = CollapsibleDock("Controls")
    self.sidebar.setMinimumWidth(250)

    # Sidebar content
    sidebar_content = QWidget()
    sidebar_layout = QVBoxLayout()

    # Model section
    model_group = QGroupBox("Model")
    model_layout = QVBoxLayout()
    self.model_label = QLabel("Model: Not loaded")
    model_layout.addWidget(self.model_label)
    load_model_btn = QPushButton("Load Model Directory...")
    load_model_btn.clicked.connect(self.load_model_directory)
    model_layout.addWidget(load_model_btn)
    model_group.setLayout(model_layout)
    sidebar_layout.addWidget(model_group)

    # Camera section
    camera_group = QGroupBox("Cameras")
    camera_layout = QVBoxLayout()
    self.cameras_label = QLabel("Selected Cameras: None")
    camera_layout.addWidget(self.cameras_label)
    refresh_cams_btn = QPushButton("Refresh Camera List")
    refresh_cams_btn.clicked.connect(self.populate_camera_menu)
    camera_layout.addWidget(refresh_cams_btn)
    camera_group.setLayout(camera_layout)
    sidebar_layout.addWidget(camera_group)

    # Settings section
    settings_group = QGroupBox("Settings")
    settings_layout = QVBoxLayout()

    # FPS control
    fps_layout = QHBoxLayout()
    fps_layout.addWidget(QLabel("FPS:"))
    self.fps_spin = QSpinBox()
    self.fps_spin.setRange(1, 60)
    self.fps_spin.setValue(10)
    fps_layout.addWidget(self.fps_spin)
    settings_layout.addLayout(fps_layout)

    # Layout selection
    layout_layout = QHBoxLayout()
    layout_layout.addWidget(QLabel("Layout:"))
    self.layout_combo = QComboBox()
    self.layout_combo.addItems(["1 Camera", "2 Cameras", "3 Cameras", "4 Cameras", "Grid Layout"])
    self.layout_combo.currentIndexChanged.connect(self.change_camera_layout)
    layout_layout.addWidget(self.layout_combo)
    settings_layout.addLayout(layout_layout)

    # Add screenshot button to settings
    screenshot_btn = QPushButton("Take Screenshot")
    screenshot_btn.clicked.connect(self.take_screenshot)
    settings_layout.addWidget(screenshot_btn)
    settings_group.setLayout(settings_layout)
    sidebar_layout.addWidget(settings_group)

    # Control buttons
    btn_layout = QHBoxLayout()
    self.start_btn = QPushButton("Start")
    self.start_btn.clicked.connect(self.start_detection)
    btn_layout.addWidget(self.start_btn)
    self.stop_btn = QPushButton("Stop")
    self.stop_btn.clicked.connect(self.stop_detection)
    self.stop_btn.setEnabled(False)  # nothing to stop until detection runs
    btn_layout.addWidget(self.stop_btn)
    sidebar_layout.addLayout(btn_layout)

    # Add stretch to push everything up
    sidebar_layout.addStretch()
    sidebar_content.setLayout(sidebar_layout)

    # Add scroll area to sidebar
    scroll = QScrollArea()
    scroll.setWidget(sidebar_content)
    scroll.setWidgetResizable(True)
    scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
    self.sidebar.setWidget(scroll)
    self.addDockWidget(Qt.LeftDockWidgetArea, self.sidebar)

    # Main display area
    self.display_area = QWidget()
    self.display_layout = QGridLayout()
    self.camera_displays = []

    # Initially create 4 camera displays in a 2x2 grid
    for i in range(4):
        display = CameraDisplay()
        display.set_cam_id(i + 1)
        self.camera_displays.append(display)
        self.display_layout.addWidget(display, i // 2, i % 2)
    self.display_area.setLayout(self.display_layout)
    main_layout.addWidget(self.display_area)

    # Hardware Monitor section (added to the sidebar below the stretch)
    hw_monitor_group = QGroupBox("Hardware Monitor")
    hw_monitor_layout = QVBoxLayout()

    # CPU Usage
    cpu_layout = QHBoxLayout()
    cpu_layout.addWidget(QLabel("CPU Usage:"))
    self.cpu_progress = QProgressBar()
    self.cpu_progress.setRange(0, 100)
    self.cpu_progress.setTextVisible(True)
    self.cpu_progress.setFormat("%p%")
    self.cpu_progress.setStyleSheet("""
QProgressBar {
border: 1px solid #555;
border-radius: 2px;
text-align: center;
background-color: #2A2A2A;
}
QProgressBar::chunk {
background-color: #3A6EA5;
width: 1px;
}
""")
    cpu_layout.addWidget(self.cpu_progress)
    hw_monitor_layout.addLayout(cpu_layout)

    # Per-core CPU Usage
    cores_layout = QGridLayout()
    self.core_bars = []
    # cpu_count() may be None when undetermined; fall back to one bar.
    num_cores = psutil.cpu_count() or 1
    for i in range(num_cores):
        core_label = QLabel(f"Core {i}:")
        core_bar = QProgressBar()
        core_bar.setRange(0, 100)
        core_bar.setTextVisible(True)
        core_bar.setFormat("%p%")
        core_bar.setStyleSheet("""
QProgressBar {
border: 1px solid #555;
border-radius: 2px;
text-align: center;
background-color: #2A2A2A;
max-height: 12px;
}
QProgressBar::chunk {
background-color: #3A6EA5;
width: 1px;
}
""")
        cores_layout.addWidget(core_label, i, 0)
        cores_layout.addWidget(core_bar, i, 1)
        self.core_bars.append(core_bar)
    hw_monitor_layout.addLayout(cores_layout)
    hw_monitor_group.setLayout(hw_monitor_layout)
    sidebar_layout.addWidget(hw_monitor_group)

    main_widget.setLayout(main_layout)
    self.setCentralWidget(main_widget)

    # Start with sidebar expanded
    self.sidebar.expand()
def change_camera_layout(self, index):
    """Rearrange the camera displays for the chosen layout.

    Args:
        index: Layout combo index. 0-3 show one to four cameras; 4 (the
            grid entry) and anything above also show all four in a 2x2 grid.
    """
    grid = self.display_layout
    displays = self.camera_displays

    # Detach every widget currently in the grid, last slot first
    for slot in range(grid.count() - 1, -1, -1):
        grid.itemAt(slot).widget().setParent(None)

    visible_count = 4 if index >= 4 else index + 1

    # (display index, row, column[, row span, column span]) per layout
    placements_by_count = {
        1: [(0, 0, 0, 1, 2)],
        2: [(0, 0, 0), (1, 0, 1)],
        3: [(0, 0, 0), (1, 0, 1), (2, 1, 0, 1, 2)],
        4: [(n, n // 2, n % 2) for n in range(4)],
    }
    for display_index, *cell in placements_by_count.get(visible_count, []):
        grid.addWidget(displays[display_index], *cell)

    # Hide the displays the layout does not use
    for position, display in enumerate(displays):
        display.setVisible(position < visible_count)
def init_timer(self):
    """Create the feed timer and hook it to ``update_feeds``.

    The timer is only created here; it is not started.
    """
    feed_timer = QTimer()
    feed_timer.timeout.connect(self.update_feeds)
    self.timer = feed_timer
def stop_detection(self):
    """Halt the feed timer, release the cameras and reset the UI."""
    self.timer.stop()
    self.detector.disconnect_cameras()

    # Back to the idle control state
    control_states = (
        (self.start_btn, True),
        (self.stop_btn, False),
        (self.fps_spin, True),
    )
    for control, enabled in control_states:
        control.setEnabled(enabled)

    # Placeholder look for every display (stylesheet text kept verbatim)
    idle_style = """
QLabel {
background-color: #1E1E1E;
color: #DDD;
border: 2px solid #444;
border-radius: 4px;
}
"""
    for display in self.camera_displays:
        display.setText("No camera feed")
        display.setStyleSheet(idle_style)
def update_feeds(self):
    """Draw the latest frame of every connected camera into its display.

    Frames are paired with ``self.detector.cameras`` in order; cameras
    beyond the number of available displays are ignored.
    """
    latest_frames = self.detector.get_frames()
    feeds = zip(self.camera_displays, self.detector.cameras, latest_frames)

    for display, source, frame in feeds:
        # OpenCV delivers BGR; Qt expects RGB
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        height, width, channels = rgb.shape
        image = QImage(rgb.data, width, height, channels * width, QImage.Format_RGB888)

        # Fit the display while keeping the aspect ratio
        scaled = QPixmap.fromImage(image).scaled(
            display.width(), display.height(),
            Qt.KeepAspectRatio, Qt.SmoothTransformation
        )
        display.setPixmap(scaled)

        # Work out the caption; entries may be (path, ...) tuples
        if isinstance(source, tuple):
            source = source[0]
        if not isinstance(source, str):
            continue
        if source.startswith('net:'):
            # Network cameras are shown under their saved name
            caption = source[4:]
        elif source.startswith('/dev/'):
            # Device paths are shown as the device name
            caption = os.path.basename(source)
        else:
            # Numeric indices are shown as "Camera N"
            caption = f"Camera {source}"
        display.set_camera_name(caption)
def take_screenshot(self):
    """Save a screenshot of every visible display that shows a frame.

    Warns the user instead when no display qualifies.
    """
    capturable = []
    for display in self.camera_displays:
        if display.isVisible() and display.pixmap():
            capturable.append(display)

    if len(capturable) == 0:
        QMessageBox.warning(self, "Warning", "No active camera displays to capture!")
        return

    for display in capturable:
        display.take_screenshot()
def show_menu(self):
    """Open the About window as a modal dialog parented to this window."""
    about_dialog = AboutWindow(self)  # this window acts as the parent
    about_dialog.exec_()  # exec_() runs the dialog modally
def show_network_camera_dialog(self):
    """Run the network camera management dialog, then rebuild the camera menu.

    The dialog is executed modally with this window as its parent. Once it
    returns, the camera menu is repopulated.
    """
    manager = NetworkCameraDialog(self)
    manager.exec_()

    # Rebuild the camera list now that the dialog has closed
    self.populate_camera_menu()
def save_settings_to_file(self):
    """Export the current settings to a JSON file chosen by the user.

    A save-file dialog (starting in the home directory, filtered to
    ``*.json``) asks for the destination; cancelling it does nothing.
    The exported object holds the detector's model directory, the FPS
    spinner value, the layout combo index, the detector's network cameras
    and its confidence threshold. Success or failure is reported in a
    message box.
    """
    file_path, _ = QFileDialog.getSaveFileName(
        self,
        "Save Settings",
        os.path.expanduser("~"),
        "JSON Files (*.json)"
    )
    if not file_path:
        return

    try:
        settings = {
            'model_dir': self.detector.model_dir,
            'fps': self.fps_spin.value(),
            'layout': self.layout_combo.currentIndex(),
            'network_cameras': self.detector.network_cameras,
            'confidence_threshold': self.detector.confidence_threshold
        }
        # Serialize before opening the file: opening with 'w' truncates the
        # target, so a value json cannot encode would otherwise leave an
        # existing file emptied or half-written.
        payload = json.dumps(settings, indent=4)
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        QMessageBox.information(self, "Success", "Settings saved successfully!")
    except Exception as e:
        # UI boundary: report any failure to the user instead of raising
        QMessageBox.critical(self, "Error", f"Failed to save settings: {str(e)}")
def load_settings_from_file(self):
    """Import settings from a JSON file chosen by the user and apply them.

    An open-file dialog (starting in the home directory, filtered to
    ``*.json``) asks for the source; cancelling it does nothing. Only keys
    present in the file are applied:

    * ``model_dir`` - if it is a string naming an existing path, the YOLO
      model is loaded from it and the model label is updated.
    * ``fps`` / ``layout`` - set on the FPS spinner / layout combo box.
    * ``network_cameras`` - replaces the detector's network cameras and
      repopulates the camera menu.
    * ``confidence_threshold`` - set on the detector.

    Success or failure is reported in a message box. A file whose top-level
    JSON value is not an object is rejected as a failure.
    """
    file_path, _ = QFileDialog.getOpenFileName(
        self,
        "Load Settings",
        os.path.expanduser("~"),
        "JSON Files (*.json)"
    )
    if not file_path:
        return

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            settings = json.load(f)

        # Anything but a JSON object cannot carry settings; without this
        # check such a file would be reported as loaded successfully
        # although nothing was applied.
        if not isinstance(settings, dict):
            raise ValueError("settings file must contain a JSON object")

        # Apply loaded settings
        model_dir = settings.get('model_dir')
        # A null/non-string model_dir would make os.path.exists raise and
        # abort the remaining settings, so it is skipped instead.
        if isinstance(model_dir, str) and os.path.exists(model_dir):
            self.detector.load_yolo_model(model_dir)
            self.model_label.setText(f"Model: {os.path.basename(model_dir)}")
        if 'fps' in settings:
            self.fps_spin.setValue(settings['fps'])
        if 'layout' in settings:
            self.layout_combo.setCurrentIndex(settings['layout'])
        if 'network_cameras' in settings:
            self.detector.network_cameras = settings['network_cameras']
            self.populate_camera_menu()
        if 'confidence_threshold' in settings:
            self.detector.confidence_threshold = settings['confidence_threshold']
        QMessageBox.information(self, "Success", "Settings loaded successfully!")
    except Exception as e:
        # UI boundary: report any failure to the user instead of raising
        QMessageBox.critical(self, "Error", f"Failed to load settings: {str(e)}")
def open_screenshots_directory(self):
    """Open the screenshots directory in the system's file explorer.

    The directory is read from the ``screenshot_dir`` setting, falling back
    to ``~/Pictures/MuCaPy``, and is created if it does not exist. It is
    then opened with ``os.startfile`` on Windows, ``open`` on macOS and
    ``xdg-open`` elsewhere. Any failure, including failure to create the
    directory, is reported in a warning dialog.
    """
    # NOTE(review): the defaults in Config.__init__ define
    # 'last_screenshot_dir', not 'screenshot_dir' - confirm which key the
    # screenshot code actually writes.
    screenshot_dir = self.config.load_setting('screenshot_dir', os.path.expanduser('~/Pictures/MuCaPy'))

    try:
        # Creating the directory is inside the try block so that an OSError
        # (e.g. permission denied) reaches the warning dialog below.
        if not os.path.exists(screenshot_dir):
            os.makedirs(screenshot_dir, exist_ok=True)

        # Open directory using the appropriate command for the OS
        if sys.platform.startswith('win'):
            os.startfile(screenshot_dir)
        elif sys.platform.startswith('darwin'):  # macOS
            subprocess.run(['open', screenshot_dir])
        else:  # Linux and other Unix-like
            subprocess.run(['xdg-open', screenshot_dir])
    except Exception as e:
        QMessageBox.warning(self, "Warning", f"Could not open directory: {str(e)}")
def toggle_sidebar_visibility(self):
    """Expand or collapse the sidebar to match the toggle action's state.

    A checked action expands the sidebar; an unchecked one collapses it.
    """
    show_sidebar = self.toggle_sidebar_action.isChecked()
    apply_state = self.sidebar.expand if show_sidebar else self.sidebar.collapse
    apply_state()
def update_hardware_stats(self):
    """Refresh the CPU usage bars and colour them by load.

    The overall usage from ``psutil.cpu_percent()`` goes to
    ``self.cpu_progress``; the per-core figures go to ``self.core_bars``
    (cores without a bar, or bars without a core, are ignored). Each bar's
    chunk is then coloured by its value: below 60 blue (#3A6EA5), below 85
    amber (#E5A823), otherwise red (#A23535).

    The style sheet is built from one template and is only re-applied when
    it differs from the bar's current one.
    """
    # Update overall CPU usage
    self.cpu_progress.setValue(int(psutil.cpu_percent()))

    # Update per-core CPU usage
    for bar, usage in zip(self.core_bars, psutil.cpu_percent(percpu=True)):
        bar.setValue(int(usage))

    # One template for all three load levels; only the chunk colour varies
    style_template = (
        "\n"
        "QProgressBar {\n"
        "border: 1px solid #555;\n"
        "border-radius: 2px;\n"
        "text-align: center;\n"
        "background-color: #2A2A2A;\n"
        "}\n"
        "QProgressBar::chunk {\n"
        "background-color: %s;\n"
        "width: 1px;\n"
        "}\n"
    )

    # Set color based on usage
    for bar in [self.cpu_progress] + self.core_bars:
        value = bar.value()
        if value < 60:
            chunk_color = "#3A6EA5"
        elif value < 85:
            chunk_color = "#E5A823"
        else:
            chunk_color = "#A23535"

        sheet = style_template % chunk_color
        # Skip the call when the bar already carries this exact sheet
        if bar.styleSheet() != sheet:
            bar.setStyleSheet(sheet)
def toggle_detection(self):
    """Enable or disable detection according to the toggle action.

    ``self.detection_enabled`` mirrors the checked state of
    ``self.toggle_detection_action``. With detection disabled, both the
    Start and Stop buttons are disabled. With detection enabled, exactly
    one of them is enabled, depending on whether the feed timer is active:
    Start while idle, Stop while running. This matches the idle state set
    by ``stop_detection`` (Start enabled, Stop disabled) instead of
    enabling both buttons at once.
    """
    self.detection_enabled = self.toggle_detection_action.isChecked()

    # The feed timer is the running indicator (stop_detection stops it).
    # Fall back to "not running" if the timer has not been created yet.
    timer = getattr(self, "timer", None)
    running = timer is not None and timer.isActive()

    self.start_btn.setEnabled(self.detection_enabled and not running)
    self.stop_btn.setEnabled(self.detection_enabled and running)
if __name__ == "__main__":
    # Script entry point: set up the environment, build the Qt application
    # and main window, then hand control to the Qt event loop.

    # Set the env before the QApplication is constructed
    os.environ.update({"XDG_SESSION_TYPE": "xcb"})

    app = QApplication(sys.argv)
    # Set application style to Fusion for better dark mode support
    app.setStyle("Fusion")

    window = MainWindow()
    window.show()

    # Exit the process with the event loop's return code
    exit_code = app.exec_()
    sys.exit(exit_code)