diff --git a/CameraDisplay.py b/CameraDisplay.py
new file mode 100644
index 0000000..e69de29
diff --git a/mucapy/AlertWorker.py b/mucapy/AlertWorker.py
new file mode 100644
index 0000000..dca223d
--- /dev/null
+++ b/mucapy/AlertWorker.py
@@ -0,0 +1,179 @@
+import shutil
+import wave
+try:
+    import simpleaudio as sa
+except ImportError:
+    sa = None
+sa = None  # Force simpleaudio off for now; it has caused playback problems
+import os
+import time
+import sys
+import subprocess
+from PyQt5.QtCore import QThread, pyqtSignal
+
+
+class AlertWorker(QThread):
+    """Worker thread to play an alert sound safely without blocking UI.
+    Uses winsound on Windows, external system players on Unix (afplay/paplay/aplay/ffplay),
+    and falls back to simpleaudio if available. Supports cooperative stop.
+    """
+    finished = pyqtSignal(bool, str)  # success, message
+
+    def __init__(self, wav_path: str, parent=None):
+        super().__init__(parent)
+        self.wav_path = wav_path
+        self._stop = False
+        self._subproc = None
+        self._play_obj = None
+
+    def stop(self):
+        """Request the worker to stop early."""
+        try:
+            self._stop = True
+            if self._play_obj is not None:
+                try:
+                    self._play_obj.stop()
+                except Exception:
+                    pass
+            if self._subproc is not None:
+                try:
+                    self._subproc.terminate()
+                except Exception:
+                    pass
+        except Exception:
+            pass
+
+    def _find_unix_player(self):
+        """Return (cmd_list, name) for an available player on Unix or (None, None)."""
+        try:
+            if sys.platform.startswith('darwin'):
+                if shutil.which('afplay'):
+                    return (['afplay'], 'afplay')
+            # Linux and others
+            if shutil.which('paplay'):
+                return (['paplay'], 'paplay')
+            if shutil.which('aplay'):
+                return (['aplay', '-q'], 'aplay')
+            if shutil.which('ffplay'):
+                return (['ffplay', '-nodisp', '-autoexit', '-loglevel', 'error'], 'ffplay')
+        except Exception:
+            pass
+        return (None, None)
+
+    def run(self):
+        try:
+            if not os.path.exists(self.wav_path):
+                self.finished.emit(False, f"File not found: {self.wav_path}")
+                return
+
+            # Windows path: prefer winsound (native, safe)
+            if sys.platform.startswith('win'):
+                ws_error = "unknown"
+                try:
+                    import winsound as _ws  # type: ignore
+                    # Resolve flags safely even if some attributes are missing
+                    SND_FILENAME = getattr(_ws, 'SND_FILENAME', 0x00020000)
+                    SND_SYNC = getattr(_ws, 'SND_SYNC', 0x0000)  # 0 is synchronous by default
+                    flags = SND_FILENAME | SND_SYNC
+                    # Ensure PlaySound exists
+                    play_fn = getattr(_ws, 'PlaySound', None)
+                    if play_fn is None:
+                        raise RuntimeError('winsound.PlaySound not available')
+                    for _ in range(4):
+                        if self._stop:
+                            break
+                        try:
+                            play_fn(self.wav_path, flags)
+                        except Exception as e:
+                            # On failure, break to try alternative backends
+                            ws_error = str(e)
+                            break
+                        time.sleep(0.002)
+                    else:
+                        # Completed all 4 plays
+                        self.finished.emit(True, "Alert played")
+                        return
+                    # If here, winsound failed at some point; continue to fallbacks
+                except Exception as e:
+                    ws_error = str(e)
+                # Try simpleaudio on Windows as fallback
+                if sa is not None:
+                    try:
+                        with wave.open(self.wav_path, 'rb') as wf:
+                            n_channels = max(1, wf.getnchannels())
+                            sampwidth = max(1, wf.getsampwidth())
+                            framerate = max(8000, wf.getframerate() or 44100)
+                            frames = wf.readframes(wf.getnframes())
+                            for _ in range(4):
+                                if self._stop:
+                                    break
+                                self._play_obj = sa.play_buffer(frames, n_channels, sampwidth, framerate)
+                                self._play_obj.wait_done()
+                                time.sleep(0.002)
+                            self.finished.emit(True, "Alert played")
+                            return
+                    except Exception as e2:
+                        self.finished.emit(False, f"Playback error (winsound fallback -> simpleaudio): {e2}")
+                        return
+                else:
self.finished.emit(False, f"Audio backend not available (winsound failed: {ws_error})") + return + + # Non-Windows: try external players first + cmd, name = self._find_unix_player() + if cmd is not None: + for _ in range(4): + if self._stop: + break + try: + self._subproc = subprocess.Popen(cmd + [self.wav_path], stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) + # Poll until done or stop requested + while True: + if self._stop: + try: + self._subproc.terminate() + except Exception: + pass + break + ret = self._subproc.poll() + if ret is not None: + break + time.sleep(0.01) + except Exception as e: + # Try next backend + cmd = None + break + finally: + self._subproc = None + time.sleep(0.002) + if cmd is not None: + self.finished.emit(True, "Alert played") + return + + # Fallback: simpleaudio if available + if sa is not None: + try: + with wave.open(self.wav_path, 'rb') as wf: + n_channels = max(1, wf.getnchannels()) + sampwidth = max(1, wf.getsampwidth()) + framerate = max(8000, wf.getframerate() or 44100) + frames = wf.readframes(wf.getnframes()) + for _ in range(4): + if self._stop: + break + self._play_obj = sa.play_buffer(frames, n_channels, sampwidth, framerate) + self._play_obj.wait_done() + time.sleep(0.002) + self.finished.emit(True, "Alert played") + return + except Exception as e: + self.finished.emit(False, f"Playback error (simpleaudio): {e}") + return + + self.finished.emit(False, "No audio backend available (afplay/paplay/aplay/ffplay/simpleaudio)") + except Exception as e: + try: + self.finished.emit(False, str(e)) + except Exception: + pass \ No newline at end of file diff --git a/mucapy/CameraScanThread.py b/mucapy/CameraScanThread.py new file mode 100644 index 0000000..cc787dc --- /dev/null +++ b/mucapy/CameraScanThread.py @@ -0,0 +1,27 @@ +import sys + +from PyQt5.QtCore import QThread, pyqtSignal + + +class CameraScanThread(QThread): + scan_finished = pyqtSignal(list, dict) + + def __init__(self, detector, max_to_check=10, parent=None): + super().__init__(parent) + self.detector = detector + self.max_to_check = max_to_check + + def run(self): + try: + cams = self.detector.scan_for_cameras(self.max_to_check) + names = {} + if sys.platform.startswith('win'): + try: + names = self.detector.get_camera_names_windows(cams) + except Exception as e: + print(f"Failed to get Windows camera names: {e}") + names = {} + self.scan_finished.emit(cams, names) + except Exception as e: + print(f"CameraScanThread error: {e}") + self.scan_finished.emit([], {}) \ No newline at end of file diff --git a/mucapy/CameraThread.py b/mucapy/CameraThread.py new file mode 100644 index 0000000..653df85 --- /dev/null +++ b/mucapy/CameraThread.py @@ -0,0 +1,524 @@ +import time +import urllib.parse +from enum import Enum + +import cv2 +import numpy as np +import requests +from PyQt5.QtCore import QThread, pyqtSignal, QMutex + +# Optional: Try to import rtsp library for better RTSP handling +try: + import rtsp + RTSP_LIB_AVAILABLE = True +except ImportError: + RTSP_LIB_AVAILABLE = False + print("rtsp library not available. 
Install with: pip install rtsp") + + +class StreamType(Enum): + """Enum for different stream types""" + LOCAL = "local" + RTSP = "rtsp" + HTTP_MJPEG = "http_mjpeg" + DROIDCAM = "droidcam" + IP_CAMERA = "ip_camera" + NETWORK = "network" + + +class CameraThread(QThread): + """Enhanced thread class for handling various camera connections and frame grabbing""" + frame_ready = pyqtSignal(int, np.ndarray) + error_occurred = pyqtSignal(int, str) + connection_status = pyqtSignal(int, bool, str) # camera_id, connected, message + + def __init__(self, camera_id, camera_info, parent=None): + super().__init__(parent) + self.camera_id = camera_id + self.camera_info = camera_info + self.running = False + self.cap = None + self.rtsp_client = None # For rtsp library client + self.mutex = QMutex() + self.frame_interval = 1.0 / 30 # Default to 30 FPS + self.reconnect_attempts = 5 + self.reconnect_delay = 2 + self.stream_type = None + self.read_timeout = 5.0 + self.connection_timeout = 10 + self.use_rtsp_lib = RTSP_LIB_AVAILABLE # Use rtsp library if available + + def set_fps(self, fps): + """Set the target FPS for frame capture""" + if fps > 0: + self.frame_interval = 1.0 / fps + + def detect_stream_type(self, url_or_info): + """Detect the type of stream based on URL or camera info""" + if isinstance(url_or_info, (int, str)): + url_str = str(url_or_info) + + if url_str.isdigit(): + return StreamType.LOCAL + elif url_str.startswith('rtsp://'): + return StreamType.RTSP + elif url_str.startswith('net:'): + return StreamType.NETWORK + elif ':4747' in url_str or 'droidcam' in url_str.lower(): + return StreamType.DROIDCAM + elif url_str.startswith(('http://', 'https://')): + return StreamType.HTTP_MJPEG + else: + return StreamType.IP_CAMERA + + return StreamType.NETWORK + + @staticmethod + def validate_url(url): + """Validate and normalize URL format""" + try: + url = url.strip() + + if not url: + return None + + # Parse the URL + if not url.startswith(('http://', 'https://', 'rtsp://', 'rtmp://')): + url = f"http://{url}" + + parsed = urllib.parse.urlparse(url) + + if not parsed.netloc: + return None + + # Special handling for DroidCam + if ':4747' in url and not url.endswith('/video'): + base_url = f"{parsed.scheme}://{parsed.netloc}" + return f"{base_url}/video" + + return url + + except Exception as e: + print(f"URL validation error: {e}") + return None + + def construct_camera_url(self, camera_info): + """Construct proper camera URL with authentication if needed""" + try: + if isinstance(camera_info, dict): + url = camera_info.get('url', '') + username = camera_info.get('username', '') + password = camera_info.get('password', '') + else: + url = str(camera_info) + username = '' + password = '' + + url = self.validate_url(url) + if not url: + return None + + # Handle authentication + if username and password: + parsed = urllib.parse.urlparse(url) + if '@' not in parsed.netloc: + auth = f"{urllib.parse.quote(username)}:{urllib.parse.quote(password)}" + netloc = f"{auth}@{parsed.netloc}" + url = urllib.parse.urlunparse(parsed._replace(netloc=netloc)) + + return url + + except Exception as e: + print(f"Error constructing camera URL: {e}") + return None + + def configure_capture(self, cap, stream_type): + """Configure VideoCapture object based on stream type""" + try: + # Common settings + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + if stream_type == StreamType.LOCAL: + cap.set(cv2.CAP_PROP_FPS, 30) + + elif stream_type in [StreamType.RTSP, StreamType.IP_CAMERA]: + # RTSP/IP camera optimizations + 
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) + cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) + cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000) + + elif stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM]: + cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) + cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000) + + except Exception as e: + print(f"Warning: Could not configure capture settings: {e}") + + def test_network_endpoint(self, url, timeout=3): + """Test if a network endpoint is accessible""" + try: + response = requests.head(url, timeout=timeout, allow_redirects=True) + return response.status_code in [200, 401] + except requests.exceptions.RequestException: + try: + response = requests.get(url, timeout=timeout, stream=True) + response.close() + return response.status_code in [200, 401] + except Exception: + return False + + def connect_rtsp_with_library(self, url): + """Connect to RTSP stream using the rtsp library""" + try: + print(f" Attempting connection with rtsp library...") + self.rtsp_client = rtsp.Client(rtsp_server_uri=url, verbose=False) + + # Test if connection works + if self.rtsp_client.isOpened(): + # Try to read a frame + frame = self.rtsp_client.read() + if frame is not None: + print(f" Successfully connected with rtsp library") + return True + else: + print(f" Failed to read frame with rtsp library") + self.rtsp_client.close() + self.rtsp_client = None + else: + print(f" rtsp library failed to open stream") + self.rtsp_client = None + + except Exception as e: + print(f" rtsp library error: {e}") + if self.rtsp_client: + try: + self.rtsp_client.close() + except Exception: + pass + self.rtsp_client = None + + return False + + def connect_rtsp_with_opencv(self, url): + """Connect to RTSP stream using OpenCV with different transport protocols""" + import os + + transports = ['tcp', 'udp', 'http'] + + for transport in transports: + try: + print(f" Trying RTSP with {transport.upper()} transport...") + + # Set FFMPEG options + os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = ( + f"rtsp_transport;{transport}|" + f"timeout;5000000|" + f"stimeout;5000000|" + f"buffer_size;1024000" + ) + + self.cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG) + self.configure_capture(self.cap, StreamType.RTSP) + + if not self.cap.isOpened(): + print(f" Failed to open with {transport}") + self.cap.release() + continue + + # Try to read a frame + start_time = time.time() + while time.time() - start_time < 5: + ret, frame = self.cap.read() + if ret and frame is not None and frame.size > 0: + print(f" Successfully connected with {transport.upper()}") + return True + time.sleep(0.1) + + print(f" Failed to read frame with {transport}") + self.cap.release() + + except Exception as e: + print(f" Error with {transport}: {e}") + if self.cap: + self.cap.release() + self.cap = None + + return False + + def connect_to_camera(self): + """Attempt to connect to the camera with enhanced retry logic""" + for attempt in range(self.reconnect_attempts): + try: + # Clean up existing connections + if self.cap is not None: + try: + self.cap.release() + except Exception: + pass + self.cap = None + + if self.rtsp_client is not None: + try: + self.rtsp_client.close() + except Exception: + pass + self.rtsp_client = None + + # Determine camera source + if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'): + name = self.camera_info[4:] + detector = self.parent().detector if self.parent() else None + + if not detector or name not in detector.network_cameras: + 
self.connection_status.emit(self.camera_id, False, f"Network camera {name} not found") + return False + + camera_info = detector.network_cameras[name] + url = self.construct_camera_url(camera_info) + + if not url: + self.connection_status.emit(self.camera_id, False, f"Invalid URL for {name}") + return False + + self.stream_type = self.detect_stream_type(url) + camera_source = url + + else: + if isinstance(self.camera_info, dict): + url = self.construct_camera_url(self.camera_info) + if not url: + self.connection_status.emit(self.camera_id, False, "Invalid camera URL") + return False + camera_source = url + self.stream_type = self.detect_stream_type(url) + else: + camera_source = self.camera_info + self.stream_type = self.detect_stream_type(camera_source) + + if self.stream_type != StreamType.LOCAL: + camera_source = self.validate_url(str(camera_source)) + if not camera_source: + self.connection_status.emit(self.camera_id, False, "Invalid camera source") + return False + + print(f"Attempt {attempt + 1}/{self.reconnect_attempts}: Connecting to {self.stream_type.value} camera...") + + # Test network endpoint for HTTP streams + if self.stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM, StreamType.IP_CAMERA]: + if not self.test_network_endpoint(camera_source): + print(f"Network endpoint not accessible") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + self.connection_status.emit(self.camera_id, False, "Network endpoint not accessible") + return False + + # Connect based on stream type + if self.stream_type == StreamType.LOCAL: + self.cap = cv2.VideoCapture(int(camera_source)) + self.configure_capture(self.cap, self.stream_type) + + if not self.cap.isOpened(): + print("Failed to open local camera") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + # Test frame reading + ret, frame = self.cap.read() + if not ret or frame is None: + print("Failed to read from local camera") + self.cap.release() + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + elif self.stream_type == StreamType.RTSP: + # Try rtsp library first if available + if self.use_rtsp_lib and self.connect_rtsp_with_library(camera_source): + self.connection_status.emit(self.camera_id, True, "Connected (rtsp lib)") + return True + + # Fall back to OpenCV with different transports + if self.connect_rtsp_with_opencv(camera_source): + self.connection_status.emit(self.camera_id, True, "Connected (opencv)") + return True + + print("All RTSP connection methods failed") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + else: + # HTTP MJPEG, DroidCam, IP Camera + self.cap = cv2.VideoCapture(camera_source, cv2.CAP_FFMPEG) + self.configure_capture(self.cap, self.stream_type) + + if not self.cap.isOpened(): + print("Failed to open stream") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + # Test frame reading + start_time = time.time() + ret, frame = False, None + while time.time() - start_time < self.read_timeout: + ret, frame = self.cap.read() + if ret and frame is not None and frame.size > 0: + break + time.sleep(0.1) + + if not ret or frame is None or frame.size == 0: + print("Failed to read frames") + self.cap.release() + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + print(f"Successfully connected to 
camera") + self.connection_status.emit(self.camera_id, True, "Connected") + return True + + except Exception as e: + print(f"Connection attempt {attempt + 1} failed: {str(e)}") + + if self.cap: + try: + self.cap.release() + except Exception: + pass + self.cap = None + + if self.rtsp_client: + try: + self.rtsp_client.close() + except Exception: + pass + self.rtsp_client = None + + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + else: + self.connection_status.emit(self.camera_id, False, str(e)) + self.error_occurred.emit(self.camera_id, str(e)) + return False + + return False + + def run(self): + """Main thread loop with enhanced error handling""" + try: + if not self.connect_to_camera(): + self.error_occurred.emit(self.camera_id, "Failed to connect after multiple attempts") + return + + self.running = True + last_frame_time = time.time() + consecutive_failures = 0 + last_reconnect_time = time.time() + + while self.running: + self.mutex.lock() + should_continue = self.running + self.mutex.unlock() + + if not should_continue: + break + + # Frame rate limiting + current_time = time.time() + if current_time - last_frame_time < self.frame_interval: + time.sleep(0.001) + continue + + # Read frame based on connection type + try: + if self.rtsp_client: + # Using rtsp library + frame = self.rtsp_client.read() + ret = frame is not None + if ret: + # Convert PIL Image to numpy array + frame = np.array(frame) + # Convert RGB to BGR for OpenCV compatibility + if len(frame.shape) == 3 and frame.shape[2] == 3: + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + else: + # Using OpenCV + ret, frame = self.cap.read() + + if ret and frame is not None and frame.size > 0: + consecutive_failures = 0 + self.frame_ready.emit(self.camera_id, frame) + last_frame_time = current_time + else: + consecutive_failures += 1 + + if consecutive_failures >= 10: + if current_time - last_reconnect_time > 5: + print("Multiple failures, attempting reconnection...") + self.connection_status.emit(self.camera_id, False, "Reconnecting...") + + if self.cap: + self.cap.release() + if self.rtsp_client: + self.rtsp_client.close() + + if self.connect_to_camera(): + consecutive_failures = 0 + last_reconnect_time = current_time + else: + self.error_occurred.emit(self.camera_id, "Reconnection failed") + break + else: + consecutive_failures = 0 + + time.sleep(0.1) + + except Exception as e: + print(f"Error reading frame: {e}") + consecutive_failures += 1 + time.sleep(0.1) + + except Exception as e: + self.error_occurred.emit(self.camera_id, f"Thread error: {str(e)}") + + finally: + self.cleanup() + + def stop(self): + """Stop the thread safely""" + self.mutex.lock() + self.running = False + self.mutex.unlock() + + if not self.wait(5000): + print(f"Warning: Camera thread {self.camera_id} did not stop gracefully") + self.terminate() + + def cleanup(self): + """Clean up camera resources""" + print(f"Cleaning up camera {self.camera_id}") + try: + if self.cap: + self.cap.release() + self.cap = None + except Exception as e: + print(f"Error during cap cleanup: {e}") + + try: + if self.rtsp_client: + self.rtsp_client.close() + self.rtsp_client = None + except Exception as e: + print(f"Error during rtsp client cleanup: {e}") + + finally: + self.running = False + self.connection_status.emit(self.camera_id, False, "Disconnected") \ No newline at end of file diff --git a/mucapy/Config.py b/mucapy/Config.py new file mode 100644 index 0000000..e21cc54 --- /dev/null +++ b/mucapy/Config.py @@ -0,0 +1,61 @@ +import os +import 
json +import sys +class Config: + def __init__(self): + # Use platform-specific user directory for config + if sys.platform.startswith('win'): + config_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'MuCaPy') + pictures_dir = os.path.join(os.environ.get('USERPROFILE', os.path.expanduser('~')), 'Pictures', 'MuCaPy') + else: + config_dir = os.path.join(os.path.expanduser('~'), '.config', 'mucapy') + pictures_dir = os.path.join(os.path.expanduser('~'), 'Pictures', 'MuCaPy') + + # Create config directory if it doesn't exist + os.makedirs(config_dir, exist_ok=True) + + self.config_file = os.path.join(config_dir, 'config.json') + self.settings = { + 'network_cameras': {}, # Store network cameras configuration + 'last_model_dir': '', + 'last_screenshot_dir': pictures_dir, + 'last_layout': 0, + 'last_fps': 10, + 'last_selected_cameras': [], + 'window_geometry': None, + 'confidence_threshold': 0.35, + } + self.load_config() + + def load_config(self): + """Load configuration from JSON file""" + try: + if os.path.exists(self.config_file): + with open(self.config_file, 'r') as f: + loaded_settings = json.load(f) + # Update settings while preserving default values for new keys + self.settings.update(loaded_settings) + except Exception as e: + print(f"Error loading config: {e}") + + def save_config(self): + """Save configuration to JSON file""" + try: + # Ensure the file's directory exists + os.makedirs(os.path.dirname(self.config_file), exist_ok=True) + try: + with open(self.config_file, 'w') as f: + json.dump(self.settings, f, indent=4) + except FileNotFoundError: + pass + except Exception as e: + print(f"Error saving config: {e}") + + def save_setting(self, key, value): + """Save a setting to configuration""" + self.settings[key] = value + self.save_config() + + def load_setting(self, key, default=None): + """Load a setting from configuration""" + return self.settings.get(key, default) diff --git a/mucapy/PopoutWindow.py b/mucapy/PopoutWindow.py new file mode 100644 index 0000000..473f09a --- /dev/null +++ b/mucapy/PopoutWindow.py @@ -0,0 +1,246 @@ +from PyQt5.QtCore import Qt, QTimer, QDateTime, QRect, QEvent +from PyQt5.QtGui import (QImage, QPixmap, QColor, QKeySequence, QPainter, + QPen, QBrush) +from PyQt5.QtWidgets import (QMainWindow, QVBoxLayout, QHBoxLayout, + QWidget, QLabel, QScrollArea, QToolButton, QShortcut) + +class PopoutWindow(QMainWindow): + """Enhanced popout window with zoom, pan, overlays and guard-friendly controls""" + + def __init__(self, source_display: QLabel, cam_id=None, parent=None): + super().__init__(parent) + self.setWindowTitle(f"Camera {cam_id}" if cam_id is not None else "Camera") + self.source_display = source_display # QLabel providing pixmap updates + self.cam_id = cam_id + self.zoom_factor = 1.0 + self.min_zoom = 0.2 + self.max_zoom = 5.0 + self.paused = False + self.show_grid = False + self.show_timestamp = True + self.setMinimumSize(640, 480) + # Drag-to-pan state + self.dragging = False + self.last_mouse_pos = None + + # Central area: toolbar + scrollable image label + central = QWidget() + vbox = QVBoxLayout(central) + vbox.setContentsMargins(4, 4, 4, 4) + vbox.setSpacing(4) + + # Toolbar with guard-friendly controls + toolbar = QHBoxLayout() + self.btn_zoom_in = QToolButton() + self.btn_zoom_in.setText("+") + self.btn_zoom_out = QToolButton() + self.btn_zoom_out.setText("-") + self.btn_zoom_reset = QToolButton() + self.btn_zoom_reset.setText("100%") + self.btn_pause = QToolButton() + self.btn_pause.setText("Pause") + 
self.btn_snapshot = QToolButton() + self.btn_snapshot.setText("Snapshot") + self.btn_grid = QToolButton() + self.btn_grid.setText("Grid") + self.btn_time = QToolButton() + self.btn_time.setText("Time") + self.btn_full = QToolButton() + self.btn_full.setText("Fullscreen") + + for b in [self.btn_zoom_out, self.btn_zoom_in, self.btn_zoom_reset, self.btn_pause, self.btn_snapshot, + self.btn_grid, self.btn_time, self.btn_full]: + toolbar.addWidget(b) + toolbar.addStretch(1) + vbox.addLayout(toolbar) + + # Scroll area for panning when zoomed + self.image_label = QLabel() + self.image_label.setAlignment(Qt.AlignCenter) + self.scroll = QScrollArea() + self.scroll.setWidget(self.image_label) + self.scroll.setWidgetResizable(True) + vbox.addWidget(self.scroll, 1) + + self.setCentralWidget(central) + + # Shortcuts + QShortcut(QKeySequence("+"), self, activated=self.zoom_in) + QShortcut(QKeySequence("-"), self, activated=self.zoom_out) + QShortcut(QKeySequence("0"), self, activated=self.reset_zoom) + QShortcut(QKeySequence(Qt.Key_Escape), self, activated=self.close) + QShortcut(QKeySequence("F"), self, activated=self.toggle_fullscreen) + QShortcut(QKeySequence("Ctrl+S"), self, activated=self.take_snapshot) + QShortcut(QKeySequence("Space"), self, activated=self.toggle_pause) + QShortcut(QKeySequence("G"), self, activated=self.toggle_grid) + QShortcut(QKeySequence("T"), self, activated=self.toggle_timestamp) + + # Connect buttons + self.btn_zoom_in.clicked.connect(self.zoom_in) + self.btn_zoom_out.clicked.connect(self.zoom_out) + self.btn_zoom_reset.clicked.connect(self.reset_zoom) + self.btn_pause.clicked.connect(self.toggle_pause) + self.btn_snapshot.clicked.connect(self.take_snapshot) + self.btn_grid.clicked.connect(self.toggle_grid) + self.btn_time.clicked.connect(self.toggle_timestamp) + self.btn_full.clicked.connect(self.toggle_fullscreen) + + # Timer to refresh from source display + self.timer = QTimer(self) + self.timer.timeout.connect(self.refresh_frame) + self.timer.start(40) + + # Mouse wheel zoom support + self.image_label.installEventFilter(self) + + # Initial render + self.refresh_frame() + + def closeEvent(self, event): + if hasattr(self, 'timer') and self.timer: + self.timer.stop() + return super().closeEvent(event) + + def toggle_fullscreen(self): + if self.isFullScreen(): + self.showNormal() + self.btn_full.setText("Fullscreen") + else: + self.showFullScreen() + self.btn_full.setText("Windowed") + + def toggle_pause(self): + self.paused = not self.paused + self.btn_pause.setText("Resume" if self.paused else "Pause") + + def toggle_grid(self): + self.show_grid = not self.show_grid + + def toggle_timestamp(self): + self.show_timestamp = not self.show_timestamp + + def take_snapshot(self): + # Prefer using source_display method if available + if hasattr(self.source_display, 'take_screenshot'): + self.source_display.take_screenshot() + return + + def current_pixmap(self): + pm = self.source_display.pixmap() + return pm + + def refresh_frame(self): + if self.paused: + return + pm = self.current_pixmap() + if not pm: + return + # Create a copy to draw overlays without touching original + image = pm.toImage().convertToFormat(QImage.Format_ARGB32) + painter = QPainter(image) + painter.setRenderHint(QPainter.Antialiasing) + + # Timestamp overlay + if self.show_timestamp: + ts = QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss') + text = ts + metrics = painter.fontMetrics() + w = metrics.width(text) + 14 + h = metrics.height() + 8 + rect = QRect(10, 10, w, h) + painter.setPen(Qt.NoPen) 
+ painter.setBrush(QBrush(QColor(0, 0, 0, 160))) + painter.drawRoundedRect(rect, 6, 6) + painter.setPen(QPen(QColor(255, 255, 255))) + painter.drawText(rect, Qt.AlignCenter, text) + + # Grid overlay (rule-of-thirds) + if self.show_grid: + painter.setPen(QPen(QColor(255, 255, 255, 120), 1)) + img_w = image.width() + img_h = image.height() + for i in range(1, 3): + x = int(img_w * i / 3) + y = int(img_h * i / 3) + painter.drawLine(x, 0, x, img_h) + painter.drawLine(0, y, img_w, y) + painter.end() + + composed = QPixmap.fromImage(image) + if self.zoom_factor != 1.0: + target_w = int(composed.width() * self.zoom_factor) + target_h = int(composed.height() * self.zoom_factor) + composed = composed.scaled(target_w, target_h, Qt.KeepAspectRatio, Qt.SmoothTransformation) + self.image_label.setPixmap(composed) + # Update cursor based on ability to pan at this zoom/size + self.update_cursor() + + def zoom_in(self): + self.set_zoom(self.zoom_factor * 1.2) + + def zoom_out(self): + self.set_zoom(self.zoom_factor / 1.2) + + def reset_zoom(self): + self.set_zoom(1.0) + + def set_zoom(self, z): + z = max(self.min_zoom, min(self.max_zoom, z)) + if abs(z - self.zoom_factor) > 1e-4: + self.zoom_factor = z + self.refresh_frame() + self.update_cursor() + + def can_pan(self): + # Allow panning when the pixmap is larger than the viewport (zoomed) + if not self.image_label.pixmap(): + return False + vp = self.scroll.viewport().size() + pm = self.image_label.pixmap().size() + return pm.width() > vp.width() or pm.height() > vp.height() + + def update_cursor(self): + if self.can_pan(): + self.image_label.setCursor(Qt.OpenHandCursor if not self.dragging else Qt.ClosedHandCursor) + else: + self.image_label.setCursor(Qt.ArrowCursor) + + def eventFilter(self, obj, event): + if obj is self.image_label: + # Mouse wheel zoom centered on cursor + if event.type() == QEvent.Wheel: + delta = event.angleDelta().y() + if delta > 0: + self.zoom_in() + else: + self.zoom_out() + return True + # Start drag + if event.type() == QEvent.MouseButtonPress and event.button() == Qt.LeftButton and self.can_pan(): + self.dragging = True + self.last_mouse_pos = event.pos() + self.update_cursor() + return True + # Dragging + if event.type() == QEvent.MouseMove and self.dragging and self.last_mouse_pos is not None: + delta = event.pos() - self.last_mouse_pos + hbar = self.scroll.horizontalScrollBar() + vbar = self.scroll.verticalScrollBar() + hbar.setValue(hbar.value() - delta.x()) + vbar.setValue(vbar.value() - delta.y()) + self.last_mouse_pos = event.pos() + return True + # End drag + if event.type() == QEvent.MouseButtonRelease and event.button() == Qt.LeftButton: + if self.dragging: + self.dragging = False + self.last_mouse_pos = None + self.update_cursor() + return True + if event.type() == QEvent.Enter or event.type() == QEvent.Leave: + # Update cursor when entering/leaving the label + if event.type() == QEvent.Leave: + self.dragging = False + self.last_mouse_pos = None + self.update_cursor() + return super().eventFilter(obj, event) \ No newline at end of file diff --git a/mucapy/YoloClass.py b/mucapy/YoloClass.py new file mode 100644 index 0000000..32b0df6 --- /dev/null +++ b/mucapy/YoloClass.py @@ -0,0 +1,468 @@ +import numpy as np +import cv2 +import time +import platform +import os +import subprocess +from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QDateTime, QRect, QThread, pyqtSignal, QMutex, QObject, QEvent +from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter, + QPen, QBrush) +from PyQt5.QtWidgets 
import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, + QWidget, QLabel, QPushButton, QComboBox, QSpinBox, + QFileDialog, QMessageBox, QMenu, QAction, QActionGroup, QGridLayout, QGroupBox, + QDockWidget, QScrollArea, QToolButton, QDialog, + QShortcut, QListWidget, QFormLayout, QLineEdit, + QCheckBox, QTabWidget, QListWidgetItem, QSplitter, + QProgressBar, QSizePolicy) + +from CameraThread import CameraThread +from Config import Config +import sys +from CameraScanThread import CameraScanThread +class MultiCamYOLODetector(QObject): + cameras_scanned = pyqtSignal(list, dict) # Emits (available_cameras, index_to_name) + + def __init__(self, parent=None): + super().__init__(parent) + self.cameras = [] + self.camera_threads = {} # Dictionary to store camera threads + self.net = None + self.classes = [] + self.colors = [] + self.target_fps = 10 + self.last_frame_time = 0 + self.frame_interval = 1.0 / self.target_fps + self.available_cameras = [] + self.model_dir = "" + self.cuda_available = self.check_cuda() + self.config = Config() + self.latest_frames = {} # Store latest frames from each camera + self.frame_lock = QMutex() # Mutex for thread-safe frame access + self.scan_thread = None # Background scanner thread + self.camera_names = {} # Mapping index->friendly name (best effort) + + # Load settings + self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35) + self.network_cameras = self.config.load_setting('network_cameras', {}) + self.target_fps = self.config.load_setting('last_fps', 10) + self.frame_interval = 1.0 / self.target_fps + + # Load last used model if available + last_model = self.config.load_setting('last_model_dir') + if last_model and os.path.exists(last_model): + self.load_yolo_model(last_model) + + def check_cuda(self): + """Check if CUDA is available""" + try: + count = cv2.cuda.getCudaEnabledDeviceCount() + return count > 0 + except: + return False + + def add_network_camera(self, name, url): + """Add a network camera to the saved list""" + self.network_cameras[name] = url + self.config.save_setting('network_cameras', self.network_cameras) + + def remove_network_camera(self, name): + """Remove a network camera from the saved list""" + if name in self.network_cameras: + del self.network_cameras[name] + self.config.save_setting('network_cameras', self.network_cameras) + + def get_platform_backend(self): + """Get appropriate video capture backend for current platform""" + try: + if sys.platform.startswith('win'): + return cv2.CAP_DSHOW + elif sys.platform.startswith('darwin'): + return cv2.CAP_AVFOUNDATION + else: + return cv2.CAP_V4L2 + except Exception: + # Fallback to auto-detect if constants are missing + return cv2.CAP_ANY + + def get_camera_names_windows(self, cams): + """Get camera names on Windows using DirectShow (COM).""" + names = {} + + import platform + if platform.system().lower() != "windows": + for c in cams: + names[c] = None + return names + + try: + import comtypes + from comtypes import GUID, POINTER, HRESULT, COMMETHOD, BSTR + from ctypes import c_ulong, byref + from comtypes.automation import VARIANT + + # GUIDs + CLSID_SystemDeviceEnum = GUID("{62BE5D10-60EB-11D0-BD3B-00A0C911CE86}") + CLSID_VideoInputDeviceCategory = GUID("{860BB310-5D01-11D0-BD3B-00A0C911CE86}") + IID_ICreateDevEnum = GUID("{29840822-5B84-11D0-BD3B-00A0C911CE86}") + IID_IPropertyBag = GUID("{55272A00-42CB-11CE-8135-00AA004BB851}") + + # Interfaces + class IEnumMoniker(comtypes.IUnknown): + _iid_ = GUID("{00000102-0000-0000-C000-000000000046}") + 
_methods_ = [ + COMMETHOD([], HRESULT, 'Next', + (['in'], c_ulong, 'celt'), + (['out'], POINTER(POINTER(comtypes.IUnknown)), 'rgelt'), + (['out'], POINTER(c_ulong), 'pceltFetched')), + ] + + class IPropertyBag(comtypes.IUnknown): + _iid_ = IID_IPropertyBag + _methods_ = [ + COMMETHOD([], HRESULT, 'Read', + (['in'], BSTR, 'pszPropName'), + (['in', 'out'], POINTER(VARIANT), 'pVar'), + (['in'], POINTER(comtypes.IUnknown), 'pErrorLog')), + ] + + class ICreateDevEnum(comtypes.IUnknown): + _iid_ = IID_ICreateDevEnum + _methods_ = [ + COMMETHOD([], HRESULT, "CreateClassEnumerator", + (['in'], POINTER(GUID), 'clsidDeviceClass'), + (['out'], POINTER(POINTER(IEnumMoniker)), 'ppEnumMoniker')) + ] + + comtypes.CoInitialize() + dev_enum = comtypes.CoCreateInstance( + CLSID_SystemDeviceEnum, + interface=ICreateDevEnum + ) + + enum_moniker = POINTER(IEnumMoniker)() + hr = dev_enum.CreateClassEnumerator( + CLSID_VideoInputDeviceCategory, # pass GUID directly, no byref + byref(enum_moniker) # output pointer is byref + ) + + if hr != 0 or not enum_moniker: + raise RuntimeError("No video devices found") + + device_names = [] + fetched = c_ulong() + moniker = POINTER(comtypes.IUnknown)() + while enum_moniker.Next(1, byref(moniker), byref(fetched)) == 0: + prop_bag = moniker.BindToStorage(None, None, IPropertyBag) + if prop_bag: + name_var = VARIANT() + if prop_bag.Read("FriendlyName", byref(name_var), None) == 0: + device_names.append(str(name_var.value)) + moniker = POINTER(comtypes.IUnknown)() # release + + # map to cams + idx_only = [c for c in cams if not c.startswith("net:") and not c.startswith("/dev/")] + for i, cam in enumerate(idx_only): + names[cam] = device_names[i] if i < len(device_names) else None + + except Exception as e: + print(f"get_camera_names_windows failed: {e}") + for c in cams: + names[c] = None + + return names + + def start_camera_scan(self, max_to_check=10): + """Start background camera scan; emits cameras_scanned when done.""" + try: + if self.scan_thread and self.scan_thread.isRunning(): + # Already scanning; ignore + return False + self.scan_thread = CameraScanThread(self, max_to_check) + self.scan_thread.scan_finished.connect(self._on_scan_finished) + self.scan_thread.start() + return True + except Exception as e: + print(f"Failed to start camera scan: {e}") + return False + + def _on_scan_finished(self, cams, names): + # Store and forward via public signal + self.available_cameras = cams or [] + self.camera_names = names or {} + self.cameras_scanned.emit(self.available_cameras, self.camera_names) + + def scan_for_cameras_windows(self, max_to_check=10): + """Enhanced camera detection for Windows with multiple backend support""" + windows_cameras = [] + backends_to_try = [ + (cv2.CAP_DSHOW, "DSHOW"), + (cv2.CAP_MSMF, "MSMF"), + (cv2.CAP_ANY, "ANY") + ] + for backend, backend_name in backends_to_try: + print(f"Trying {backend_name} backend...") + for i in range(max_to_check): + try: + cap = cv2.VideoCapture(i, backend) + if cap.isOpened(): + ret, frame = cap.read() + if ret and frame is not None: + camera_id = f"{backend_name.lower()}:{i}" + if str(i) not in windows_cameras: + windows_cameras.append(str(i)) + print(f"Found camera {i} via {backend_name}") + cap.release() + else: + cap.release() + except Exception as e: + print(f"Error checking camera {i} with {backend_name}: {e}") + continue + return windows_cameras + + def scan_for_cameras(self, max_to_check=10): + """Check for available cameras with platform-specific backends""" + self.available_cameras = [] + + print(f"Scanning 
for cameras on {sys.platform}...") + + # Platform-specific detection + if sys.platform.startswith('win'): + cameras_found = self.scan_for_cameras_windows(max_to_check) + self.available_cameras.extend(cameras_found) + else: + # Linux/Unix/macOS detection + backend = cv2.CAP_AVFOUNDATION if sys.platform.startswith('darwin') else cv2.CAP_V4L2 + for i in range(max_to_check): + try: + cap = cv2.VideoCapture(i, backend) + if cap.isOpened(): + ret, frame = cap.read() + if ret and frame is not None: + self.available_cameras.append(str(i)) + cap.release() + except Exception as e: + print(f"Error checking camera {i}: {e}") + continue + + # Linux device paths + if sys.platform.startswith('linux'): + v4l_paths = [f"/dev/video{i}" for i in range(max_to_check)] + for path in v4l_paths: + if os.path.exists(path): + try: + cap = cv2.VideoCapture(path, cv2.CAP_V4L2) + if cap.isOpened() and path not in self.available_cameras: + self.available_cameras.append(path) + cap.release() + except Exception as e: + print(f"Error checking device {path}: {e}") + + # Add network cameras + network_count = 0 + for name, url in self.network_cameras.items(): + self.available_cameras.append(f"net:{name}") + network_count += 1 + + print( + f"Scan complete: Found {len(self.available_cameras) - network_count} local and {network_count} network cameras") + return self.available_cameras + + def load_yolo_model(self, model_dir): + """Load YOLO model from selected directory with better error handling""" + self.model_dir = model_dir + try: + # Find model files in the directory + weights = [f for f in os.listdir(model_dir) if f.endswith(('.weights', '.onnx'))] + configs = [f for f in os.listdir(model_dir) if f.endswith('.cfg')] + classes = [f for f in os.listdir(model_dir) if f.endswith('.names')] + + if not weights or not configs or not classes: + return False + + # Use the first found files + weights_path = os.path.join(model_dir, weights[0]) + config_path = os.path.join(model_dir, configs[0]) + classes_path = os.path.join(model_dir, classes[0]) + + self.net = cv2.dnn.readNet(weights_path, config_path) + + # Set backend based on availability + if self.cuda_available: + try: + self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA) + self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA) + except: + # Fall back to CPU if CUDA fails + self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV) + self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) + else: + self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV) + self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) + + try: + with open(classes_path, 'r') as f: + self.classes = f.read().strip().split('\n') + except FileNotFoundError: + pass + + np.random.seed(42) + self.colors = np.random.randint(0, 255, size=(len(self.classes), 3), dtype='uint8') + return True + except Exception as e: + print(f"Error loading YOLO model: {e}") + return False + + def connect_cameras(self, camera_paths): + """Connect to multiple cameras using background threads for smooth UI""" + self.disconnect_cameras() + + # Prepare internal state + self.cameras = [] # store identifiers/paths only + self.latest_frames = {} + + # Start one CameraThread per camera + for cam_index, cam_path in enumerate(camera_paths): + try: + thread = CameraThread(cam_index, cam_path, parent=self.parent()) + thread.set_fps(self.target_fps) + thread.frame_ready.connect(self._on_frame_ready) + thread.error_occurred.connect(self._on_camera_error) + self.camera_threads[cam_index] = thread + self.cameras.append(cam_path) + 
self.latest_frames[cam_index] = None + thread.start() + print(f"Started camera thread for {cam_path}") + except Exception as e: + print(f"Error starting camera thread for {cam_path}: {e}") + + success_count = len(self.camera_threads) + print(f"Camera connection summary: {success_count}/{len(camera_paths)} camera threads started") + return success_count > 0 + + def disconnect_cameras(self): + """Disconnect all cameras (stop threads)""" + # Stop and remove threads + for idx, thread in list(self.camera_threads.items()): + try: + thread.stop() + except Exception: + pass + try: + thread.deleteLater() + except Exception: + pass + self.camera_threads.clear() + self.cameras = [] + # Clear cached frames + self.frame_lock.lock() + try: + self.latest_frames = {} + finally: + self.frame_lock.unlock() + + def _on_frame_ready(self, cam_id, frame): + """Cache latest frame from a camera thread (non-blocking for UI).""" + self.frame_lock.lock() + try: + # Store a copy to avoid data races if producer reuses buffers + self.latest_frames[cam_id] = frame.copy() + finally: + self.frame_lock.unlock() + + def _on_camera_error(self, cam_id, message): + print(f"Camera {cam_id} error: {message}") + + def get_frames(self): + """Return latest frames without blocking the GUI thread.""" + frames = [] + # Snapshot current frames under lock + self.frame_lock.lock() + try: + for i, _ in enumerate(self.cameras): + frm = self.latest_frames.get(i) + if frm is None: + frames.append(np.zeros((720, 1280, 3), dtype=np.uint8)) + else: + frames.append(frm.copy()) + finally: + self.frame_lock.unlock() + + # Optionally run detection on the copies + parent_window = self.parent() + if parent_window and self.net is not None and parent_window.detection_enabled: + processed = [] + for f in frames: + try: + processed.append(self.get_detections(f)) + except Exception: + processed.append(f) + return processed + + return frames + + def get_detections(self, frame): + """Perform YOLO object detection on a frame with error handling""" + if self.net is None: + return frame + + try: + blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False) + self.net.setInput(blob) + + # Get output layer names compatible with different OpenCV versions + try: + layer_names = self.net.getLayerNames() + output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()] + except: + output_layers = self.net.getUnconnectedOutLayersNames() + + outputs = self.net.forward(output_layers) + + boxes = [] + confidences = [] + class_ids = [] + + for output in outputs: + for detection in output: + scores = detection[5:] + class_id = np.argmax(scores) + confidence = scores[class_id] + + if confidence > self.confidence_threshold: # Use configurable threshold + box = detection[0:4] * np.array([frame.shape[1], frame.shape[0], + frame.shape[1], frame.shape[0]]) + (centerX, centerY, width, height) = box.astype('int') + x = int(centerX - (width / 2)) + y = int(centerY - (height / 2)) + + boxes.append([x, y, int(width), int(height)]) + confidences.append(float(confidence)) + class_ids.append(class_id) + + indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4) + + person_detected = False + if len(indices) > 0: + for i in indices.flatten(): + (x, y, w, h) = boxes[i] + color = [int(c) for c in self.colors[class_ids[i]]] + cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2) + cls_name = self.classes[class_ids[i]] if 0 <= class_ids[i] < len(self.classes) else str( + class_ids[i]) + text = f"{cls_name}: {confidences[i]:.2f}" 
+ cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) + if not person_detected and str(cls_name).lower() == 'person': + person_detected = True + # Auto-trigger alert if a person is detected on any camera and alerts are enabled + try: + if person_detected: + parent_window = self.parent() + if parent_window is not None: + # trigger_alert() has its own internal guards (enabled, cooldown, playing) + parent_window.trigger_alert() + except Exception: + pass + except Exception as e: + print(f"Detection error: {e}") + + return frame \ No newline at end of file diff --git a/mucapy/main.py b/mucapy/main.py index 42a3e46..0da20a5 100644 --- a/mucapy/main.py +++ b/mucapy/main.py @@ -1,4 +1,10 @@ import json +from PopoutWindow import PopoutWindow +from Config import Config +from CameraThread import CameraThread +from CameraScanThread import CameraScanThread +from AlertWorker import AlertWorker +from YoloClass import MultiCamYOLODetector try: import winreg except ImportError: @@ -15,6 +21,7 @@ import cv2 import numpy as np import psutil # Add psutil import import requests + try: from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QDateTime, QRect, QThread, pyqtSignal, QMutex, QObject, QEvent from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter, @@ -43,6 +50,7 @@ import todopackage.todo as todo # This shit will fail eventually | Or not IDK # Audio alert dependencies import wave + try: import simpleaudio as sa except Exception: @@ -74,1112 +82,23 @@ def bytes_to_human(n: int) -> str: return f"{val:.1f} {symbols[i]}" return f"{val:.2f} {symbols[i]}" - class getpath: + @staticmethod def resource_path(relative_path): - base_path = getattr(sys, '_MEIPASS', os.path.abspath(".")) + base_path = os.path.dirname(os.path.abspath(__file__)) return os.path.join(base_path, relative_path) -class Config: - def __init__(self): - # Use platform-specific user directory for config - if sys.platform.startswith('win'): - config_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'MuCaPy') - pictures_dir = os.path.join(os.environ.get('USERPROFILE', os.path.expanduser('~')), 'Pictures', 'MuCaPy') - else: - config_dir = os.path.join(os.path.expanduser('~'), '.config', 'mucapy') - pictures_dir = os.path.join(os.path.expanduser('~'), 'Pictures', 'MuCaPy') - - # Create config directory if it doesn't exist - os.makedirs(config_dir, exist_ok=True) - - self.config_file = os.path.join(config_dir, 'config.json') - self.settings = { - 'network_cameras': {}, # Store network cameras configuration - 'last_model_dir': '', - 'last_screenshot_dir': pictures_dir, - 'last_layout': 0, - 'last_fps': 10, - 'last_selected_cameras': [], - 'window_geometry': None, - 'confidence_threshold': 0.35, - } - self.load_config() - - def load_config(self): - """Load configuration from JSON file""" - try: - if os.path.exists(self.config_file): - with open(self.config_file, 'r') as f: - loaded_settings = json.load(f) - # Update settings while preserving default values for new keys - self.settings.update(loaded_settings) - except Exception as e: - print(f"Error loading config: {e}") - - def save_config(self): - """Save configuration to JSON file""" - try: - # Ensure the file's directory exists - os.makedirs(os.path.dirname(self.config_file), exist_ok=True) - try: - with open(self.config_file, 'w') as f: - json.dump(self.settings, f, indent=4) - except FileNotFoundError: - pass - except Exception as e: - print(f"Error saving config: {e}") - - def save_setting(self, key, value): - """Save a 
setting to configuration""" - self.settings[key] = value - self.save_config() - - def load_setting(self, key, default=None): - """Load a setting from configuration""" - return self.settings.get(key, default) -class CameraThread(QThread): - """Thread class for handling camera connections and frame grabbing""" - frame_ready = pyqtSignal(int, np.ndarray) # Signal to emit when new frame is ready (camera_index, frame) - error_occurred = pyqtSignal(int, str) # Signal to emit when error occurs (camera_index, error_message) - - def __init__(self, camera_id, camera_info, parent=None): - super().__init__(parent) - self.camera_id = camera_id - self.camera_info = camera_info - self.running = False - self.cap = None - self.mutex = QMutex() - self.frame_interval = 1.0 / 30 # Default to 30 FPS - self.reconnect_attempts = 3 # Number of reconnection attempts - self.reconnect_delay = 2 # Delay between reconnection attempts in seconds - - def set_fps(self, fps): - """Set the target FPS for frame capture""" - self.frame_interval = 1.0 / fps - - def validate_url(self, url): - """Validate and normalize URL format""" - try: - # Remove any whitespace - url = url.strip() - - # Parse the URL to validate its components - parsed = urllib.parse.urlparse(url) - - # Ensure scheme is present - if not parsed.scheme: - url = f"http://{url}" - parsed = urllib.parse.urlparse(url) - - # Validate DroidCam URL - if ':4747' in url: - # Ensure the path ends with /video - base_url = f"{parsed.scheme}://{parsed.netloc}" - return f"{base_url}/video" - - return url - except Exception as e: - print(f"URL validation error: {e}") - return None - - def construct_camera_url(self, camera_info): - """Construct proper camera URL with authentication if needed""" - try: - if isinstance(camera_info, dict): - url = camera_info.get('url', '') - else: - url = str(camera_info) - - # Validate and normalize the URL - url = self.validate_url(url) - if not url: - return None - - # Handle authentication if provided - if isinstance(camera_info, dict) and 'username' in camera_info and 'password' in camera_info: - parsed = urllib.parse.urlparse(url) - if '@' not in parsed.netloc: - auth = f"{urllib.parse.quote(camera_info['username'])}:{urllib.parse.quote(camera_info['password'])}" - netloc = f"{auth}@{parsed.netloc}" - url = parsed._replace(netloc=netloc).geturl() - - return url - except Exception as e: - print(f"Error constructing camera URL: {e}") - return None - - def connect_to_camera(self): - """Attempt to connect to the camera with retry logic""" - for attempt in range(self.reconnect_attempts): - try: - # Clean up any existing connection - if self.cap is not None: - self.cap.release() - self.cap = None - - if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'): - name = self.camera_info[4:] - detector = self.parent().detector if self.parent() else None - if not detector or name not in detector.network_cameras: - self.error_occurred.emit(self.camera_id, f"Network camera {name} not found") - return False - - camera_info = detector.network_cameras[name] - url = self.construct_camera_url(camera_info) - - if not url: - self.error_occurred.emit(self.camera_id, f"Invalid camera URL for {name}") - return False - - print(f"Attempting to connect to network camera URL: {url}") - - # For DroidCam, try to verify the endpoint is accessible first - if ':4747' in url: - try: - response = requests.get(url, timeout=2) - if response.status_code != 200: - print(f"DroidCam endpoint returned status {response.status_code}") - if attempt < 
self.reconnect_attempts - 1: - continue - return False - except requests.exceptions.RequestException as e: - print(f"Failed to connect to DroidCam: {e}") - if attempt < self.reconnect_attempts - 1: - time.sleep(self.reconnect_delay) - continue - return False - - # Create VideoCapture with the URL - self.cap = cv2.VideoCapture() - # Set buffer size to minimize latency - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) - - # Open the connection - if not self.cap.open(url): - print(f"Failed to open URL: {url}") - if attempt < self.reconnect_attempts - 1: - time.sleep(self.reconnect_delay) - continue - return False - - else: - # Local camera - self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info) - - # Verify the connection is working - if not self.cap.isOpened(): - print("Camera not opened") - if attempt < self.reconnect_attempts - 1: - time.sleep(self.reconnect_delay) - continue - return False - - # Test read a frame - ret, frame = self.cap.read() - if not ret or frame is None: - print("Failed to read test frame") - self.cap.release() - if attempt < self.reconnect_attempts - 1: - time.sleep(self.reconnect_delay) - continue - return False - - print(f"Successfully connected to camera") - return True - - except Exception as e: - print(f"Connection attempt {attempt + 1} failed: {str(e)}") - - if self.cap: - self.cap.release() - self.cap = None - - if attempt < self.reconnect_attempts - 1: - time.sleep(self.reconnect_delay) - else: - self.error_occurred.emit(self.camera_id, str(e)) - return False - - return False - - def run(self): - """Main thread loop""" - try: - if not self.connect_to_camera(): - self.error_occurred.emit(self.camera_id, "Failed to connect to camera after multiple attempts") - return - - self.running = True - last_frame_time = time.time() - consecutive_failures = 0 - - while self.running: - self.mutex.lock() - if not self.running: - self.mutex.unlock() - break - - # Check if enough time has passed since last frame - current_time = time.time() - if current_time - last_frame_time < self.frame_interval: - self.mutex.unlock() - time.sleep(0.001) # Small sleep to prevent CPU hogging - continue - - ret, frame = self.cap.read() - self.mutex.unlock() - - if ret: - consecutive_failures = 0 # Reset failure counter on success - self.frame_ready.emit(self.camera_id, frame) - last_frame_time = current_time - else: - consecutive_failures += 1 - if consecutive_failures >= 5: # Try to reconnect after 5 consecutive failures - print(f"Multiple frame read failures, attempting to reconnect...") - self.cap.release() - if not self.connect_to_camera(): - self.error_occurred.emit(self.camera_id, "Failed to reconnect to camera") - break - consecutive_failures = 0 - time.sleep(0.1) # Small delay before next attempt - - except Exception as e: - self.error_occurred.emit(self.camera_id, str(e)) - - finally: - self.cleanup() - - def stop(self): - """Stop the thread safely""" - self.mutex.lock() - self.running = False - self.mutex.unlock() - self.wait() - - def cleanup(self): - """Clean up camera resources""" - if self.cap: - self.cap.release() - self.running = False -class CameraScanThread(QThread): - scan_finished = pyqtSignal(list, dict) - def __init__(self, detector, max_to_check=10, parent=None): - super().__init__(parent) - self.detector = detector - self.max_to_check = max_to_check - def run(self): - try: - cams = self.detector.scan_for_cameras(self.max_to_check) - names = {} - if sys.platform.startswith('win'): - try: - names = 
self.detector.get_camera_names_windows(cams) - except Exception as e: - print(f"Failed to get Windows camera names: {e}") - names = {} - self.scan_finished.emit(cams, names) - except Exception as e: - print(f"CameraScanThread error: {e}") - self.scan_finished.emit([], {}) - -class AlertWorker(QThread): - """Worker thread to play an alert sound safely without blocking UI. - Uses winsound on Windows, external system players on Unix (afplay/paplay/aplay/ffplay), - and falls back to simpleaudio if available. Supports cooperative stop. - """ - finished = pyqtSignal(bool, str) # success, message - - def __init__(self, wav_path: str, parent=None): - super().__init__(parent) - self.wav_path = wav_path - self._stop = False - self._subproc = None - self._play_obj = None - - def stop(self): - """Request the worker to stop early.""" - try: - self._stop = True - if self._play_obj is not None: - try: - self._play_obj.stop() - except Exception: - pass - if self._subproc is not None: - try: - self._subproc.terminate() - except Exception: - pass - except Exception: - pass - - def _find_unix_player(self): - """Return (cmd_list, name) for an available player on Unix or (None, None).""" - try: - if sys.platform.startswith('darwin'): - if shutil.which('afplay'): - return (['afplay'], 'afplay') - # Linux and others - if shutil.which('paplay'): - return (['paplay'], 'paplay') - if shutil.which('aplay'): - return (['aplay', '-q'], 'aplay') - if shutil.which('ffplay'): - return (['ffplay', '-nodisp', '-autoexit', '-loglevel', 'error'], 'ffplay') - except Exception: - pass - return (None, None) - - def run(self): - try: - if not os.path.exists(self.wav_path): - self.finished.emit(False, f"File not found: {self.wav_path}") - return - - # Windows path: prefer winsound (native, safe) - if sys.platform.startswith('win'): - ws_error = "unknown" - try: - import winsound as _ws # type: ignore - # Resolve flags safely even if some attributes are missing - SND_FILENAME = getattr(_ws, 'SND_FILENAME', 0x00020000) - SND_SYNC = getattr(_ws, 'SND_SYNC', 0x0000) # 0 is synchronous by default - flags = SND_FILENAME | SND_SYNC - # Ensure PlaySound exists - play_fn = getattr(_ws, 'PlaySound', None) - if play_fn is None: - raise RuntimeError('winsound.PlaySound not available') - for _ in range(4): - if self._stop: - break - try: - play_fn(self.wav_path, flags) - except Exception as e: - # On failure, break to try alternative backends - ws_error = str(e) - break - time.sleep(0.002) - else: - # Completed all 4 plays - self.finished.emit(True, "Alert played") - return - # If here, winsound failed at some point; continue to fallbacks - except Exception as e: - ws_error = str(e) - # Try simpleaudio on Windows as fallback - if sa is not None: - try: - with wave.open(self.wav_path, 'rb') as wf: - n_channels = max(1, wf.getnchannels()) - sampwidth = max(1, wf.getsampwidth()) - framerate = max(8000, wf.getframerate() or 44100) - frames = wf.readframes(wf.getnframes()) - for _ in range(4): - if self._stop: - break - self._play_obj = sa.play_buffer(frames, n_channels, sampwidth, framerate) - self._play_obj.wait_done() - time.sleep(0.002) - self.finished.emit(True, "Alert played") - return - except Exception as e2: - self.finished.emit(False, f"Playback error (winsound fallback -> simpleaudio): {e2}") - return - else: - self.finished.emit(False, f"Audio backend not available (winsound failed: {ws_error})") - return - - # Non-Windows: try external players first - cmd, name = self._find_unix_player() - if cmd is not None: - for _ in range(4): - if 
self._stop: - break - try: - self._subproc = subprocess.Popen(cmd + [self.wav_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - # Poll until done or stop requested - while True: - if self._stop: - try: - self._subproc.terminate() - except Exception: - pass - break - ret = self._subproc.poll() - if ret is not None: - break - time.sleep(0.01) - except Exception as e: - # Try next backend - cmd = None - break - finally: - self._subproc = None - time.sleep(0.002) - if cmd is not None: - self.finished.emit(True, "Alert played") - return - - # Fallback: simpleaudio if available - if sa is not None: - try: - with wave.open(self.wav_path, 'rb') as wf: - n_channels = max(1, wf.getnchannels()) - sampwidth = max(1, wf.getsampwidth()) - framerate = max(8000, wf.getframerate() or 44100) - frames = wf.readframes(wf.getnframes()) - for _ in range(4): - if self._stop: - break - self._play_obj = sa.play_buffer(frames, n_channels, sampwidth, framerate) - self._play_obj.wait_done() - time.sleep(0.002) - self.finished.emit(True, "Alert played") - return - except Exception as e: - self.finished.emit(False, f"Playback error (simpleaudio): {e}") - return - - self.finished.emit(False, "No audio backend available (afplay/paplay/aplay/ffplay/simpleaudio)") - except Exception as e: - try: - self.finished.emit(False, str(e)) - except Exception: - pass - -class MultiCamYOLODetector(QObject): - cameras_scanned = pyqtSignal(list, dict) # Emits (available_cameras, index_to_name) - def __init__(self, parent=None): - super().__init__(parent) - self.cameras = [] - self.camera_threads = {} # Dictionary to store camera threads - self.net = None - self.classes = [] - self.colors = [] - self.target_fps = 10 - self.last_frame_time = 0 - self.frame_interval = 1.0 / self.target_fps - self.available_cameras = [] - self.model_dir = "" - self.cuda_available = self.check_cuda() - self.config = Config() - self.latest_frames = {} # Store latest frames from each camera - self.frame_lock = QMutex() # Mutex for thread-safe frame access - self.scan_thread = None # Background scanner thread - self.camera_names = {} # Mapping index->friendly name (best effort) - - # Load settings - self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35) - self.network_cameras = self.config.load_setting('network_cameras', {}) - self.target_fps = self.config.load_setting('last_fps', 10) - self.frame_interval = 1.0 / self.target_fps - - # Load last used model if available - last_model = self.config.load_setting('last_model_dir') - if last_model and os.path.exists(last_model): - self.load_yolo_model(last_model) - - def check_cuda(self): - """Check if CUDA is available""" - try: - count = cv2.cuda.getCudaEnabledDeviceCount() - return count > 0 - except: - return False - - def add_network_camera(self, name, url): - """Add a network camera to the saved list""" - self.network_cameras[name] = url - self.config.save_setting('network_cameras', self.network_cameras) - - def remove_network_camera(self, name): - """Remove a network camera from the saved list""" - if name in self.network_cameras: - del self.network_cameras[name] - self.config.save_setting('network_cameras', self.network_cameras) - - def get_platform_backend(self): - """Get appropriate video capture backend for current platform""" - try: - if sys.platform.startswith('win'): - return cv2.CAP_DSHOW - elif sys.platform.startswith('darwin'): - return cv2.CAP_AVFOUNDATION - else: - return cv2.CAP_V4L2 - except Exception: - # Fallback to auto-detect if constants are missing 
- return cv2.CAP_ANY - - def get_camera_names_windows(self, cams): - """Best-effort map of camera index -> device friendly name on Windows.""" - names = {} - try: - # Query camera-like PnP devices - ps_cmd = ( - "Get-CimInstance Win32_PnPEntity | " - "Where-Object { $_.PNPClass -in @('Camera','Image') -and $_.Status -eq 'OK' } | " - "Select-Object -ExpandProperty Name" - ) - result = subprocess.run([ - 'powershell', '-NoProfile', '-ExecutionPolicy', 'Bypass', ps_cmd - ], capture_output=True, text=True, timeout=5) - lines = [l.strip() for l in result.stdout.splitlines() if l.strip()] - # Assign names in order to numeric indices - idx_only = [c for c in cams if not c.startswith('net:') and not c.startswith('/dev/')] - for i, cam in enumerate(idx_only): - try: - names[cam] = lines[i] if i < len(lines) else None - except Exception: - names[cam] = None - except Exception as e: - print(f"get_camera_names_windows failed: {e}") - return names - - def start_camera_scan(self, max_to_check=10): - """Start background camera scan; emits cameras_scanned when done.""" - try: - if self.scan_thread and self.scan_thread.isRunning(): - # Already scanning; ignore - return False - self.scan_thread = CameraScanThread(self, max_to_check) - self.scan_thread.scan_finished.connect(self._on_scan_finished) - self.scan_thread.start() - return True - except Exception as e: - print(f"Failed to start camera scan: {e}") - return False - - def _on_scan_finished(self, cams, names): - # Store and forward via public signal - self.available_cameras = cams or [] - self.camera_names = names or {} - self.cameras_scanned.emit(self.available_cameras, self.camera_names) - - def scan_for_cameras_windows(self, max_to_check=10): - """Enhanced camera detection for Windows with multiple backend support""" - windows_cameras = [] - backends_to_try = [ - (cv2.CAP_DSHOW, "DSHOW"), - (cv2.CAP_MSMF, "MSMF"), - (cv2.CAP_ANY, "ANY") - ] - for backend, backend_name in backends_to_try: - print(f"Trying {backend_name} backend...") - for i in range(max_to_check): - try: - cap = cv2.VideoCapture(i, backend) - if cap.isOpened(): - ret, frame = cap.read() - if ret and frame is not None: - camera_id = f"{backend_name.lower()}:{i}" - if str(i) not in windows_cameras: - windows_cameras.append(str(i)) - print(f"Found camera {i} via {backend_name}") - cap.release() - else: - cap.release() - except Exception as e: - print(f"Error checking camera {i} with {backend_name}: {e}") - continue - return windows_cameras - - def scan_for_cameras(self, max_to_check=10): - """Check for available cameras with platform-specific backends""" - self.available_cameras = [] - - print(f"Scanning for cameras on {sys.platform}...") - - # Platform-specific detection - if sys.platform.startswith('win'): - cameras_found = self.scan_for_cameras_windows(max_to_check) - self.available_cameras.extend(cameras_found) - else: - # Linux/Unix/macOS detection - backend = cv2.CAP_AVFOUNDATION if sys.platform.startswith('darwin') else cv2.CAP_V4L2 - for i in range(max_to_check): - try: - cap = cv2.VideoCapture(i, backend) - if cap.isOpened(): - ret, frame = cap.read() - if ret and frame is not None: - self.available_cameras.append(str(i)) - cap.release() - except Exception as e: - print(f"Error checking camera {i}: {e}") - continue - - # Linux device paths - if sys.platform.startswith('linux'): - v4l_paths = [f"/dev/video{i}" for i in range(max_to_check)] - for path in v4l_paths: - if os.path.exists(path): - try: - cap = cv2.VideoCapture(path, cv2.CAP_V4L2) - if cap.isOpened() and path not in 
self.available_cameras: - self.available_cameras.append(path) - cap.release() - except Exception as e: - print(f"Error checking device {path}: {e}") - - # Add network cameras - network_count = 0 - for name, url in self.network_cameras.items(): - self.available_cameras.append(f"net:{name}") - network_count += 1 - - print(f"Scan complete: Found {len(self.available_cameras) - network_count} local and {network_count} network cameras") - return self.available_cameras - - def load_yolo_model(self, model_dir): - """Load YOLO model from selected directory with better error handling""" - self.model_dir = model_dir - try: - # Find model files in the directory - weights = [f for f in os.listdir(model_dir) if f.endswith(('.weights', '.onnx'))] - configs = [f for f in os.listdir(model_dir) if f.endswith('.cfg')] - classes = [f for f in os.listdir(model_dir) if f.endswith('.names')] - - if not weights or not configs or not classes: - return False - - # Use the first found files - weights_path = os.path.join(model_dir, weights[0]) - config_path = os.path.join(model_dir, configs[0]) - classes_path = os.path.join(model_dir, classes[0]) - - self.net = cv2.dnn.readNet(weights_path, config_path) - - # Set backend based on availability - if self.cuda_available: - try: - self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA) - self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA) - except: - # Fall back to CPU if CUDA fails - self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV) - self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) - else: - self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV) - self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU) - - try: - with open(classes_path, 'r') as f: - self.classes = f.read().strip().split('\n') - except FileNotFoundError: - pass - - np.random.seed(42) - self.colors = np.random.randint(0, 255, size=(len(self.classes), 3), dtype='uint8') - return True - except Exception as e: - print(f"Error loading YOLO model: {e}") - return False - - def connect_cameras(self, camera_paths): - """Connect to multiple cameras using background threads for smooth UI""" - self.disconnect_cameras() - - # Prepare internal state - self.cameras = [] # store identifiers/paths only - self.latest_frames = {} - - # Start one CameraThread per camera - for cam_index, cam_path in enumerate(camera_paths): - try: - thread = CameraThread(cam_index, cam_path, parent=self.parent()) - thread.set_fps(self.target_fps) - thread.frame_ready.connect(self._on_frame_ready) - thread.error_occurred.connect(self._on_camera_error) - self.camera_threads[cam_index] = thread - self.cameras.append(cam_path) - self.latest_frames[cam_index] = None - thread.start() - print(f"Started camera thread for {cam_path}") - except Exception as e: - print(f"Error starting camera thread for {cam_path}: {e}") - - success_count = len(self.camera_threads) - print(f"Camera connection summary: {success_count}/{len(camera_paths)} camera threads started") - return success_count > 0 - - def disconnect_cameras(self): - """Disconnect all cameras (stop threads)""" - # Stop and remove threads - for idx, thread in list(self.camera_threads.items()): - try: - thread.stop() - except Exception: - pass - try: - thread.deleteLater() - except Exception: - pass - self.camera_threads.clear() - self.cameras = [] - # Clear cached frames - self.frame_lock.lock() - try: - self.latest_frames = {} - finally: - self.frame_lock.unlock() - - def _on_frame_ready(self, cam_id, frame): - """Cache latest frame from a camera thread (non-blocking for UI).""" - 
self.frame_lock.lock() - try: - # Store a copy to avoid data races if producer reuses buffers - self.latest_frames[cam_id] = frame.copy() - finally: - self.frame_lock.unlock() - - def _on_camera_error(self, cam_id, message): - print(f"Camera {cam_id} error: {message}") - - def get_frames(self): - """Return latest frames without blocking the GUI thread.""" - frames = [] - # Snapshot current frames under lock - self.frame_lock.lock() - try: - for i, _ in enumerate(self.cameras): - frm = self.latest_frames.get(i) - if frm is None: - frames.append(np.zeros((720, 1280, 3), dtype=np.uint8)) - else: - frames.append(frm.copy()) - finally: - self.frame_lock.unlock() - - # Optionally run detection on the copies - parent_window = self.parent() - if parent_window and self.net is not None and parent_window.detection_enabled: - processed = [] - for f in frames: - try: - processed.append(self.get_detections(f)) - except Exception: - processed.append(f) - return processed - - return frames - - def get_detections(self, frame): - """Perform YOLO object detection on a frame with error handling""" - if self.net is None: - return frame - - try: - blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False) - self.net.setInput(blob) - - # Get output layer names compatible with different OpenCV versions - try: - layer_names = self.net.getLayerNames() - output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()] - except: - output_layers = self.net.getUnconnectedOutLayersNames() - - outputs = self.net.forward(output_layers) - - boxes = [] - confidences = [] - class_ids = [] - - for output in outputs: - for detection in output: - scores = detection[5:] - class_id = np.argmax(scores) - confidence = scores[class_id] - - if confidence > self.confidence_threshold: # Use configurable threshold - box = detection[0:4] * np.array([frame.shape[1], frame.shape[0], - frame.shape[1], frame.shape[0]]) - (centerX, centerY, width, height) = box.astype('int') - x = int(centerX - (width / 2)) - y = int(centerY - (height / 2)) - - boxes.append([x, y, int(width), int(height)]) - confidences.append(float(confidence)) - class_ids.append(class_id) - - indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4) - - person_detected = False - if len(indices) > 0: - for i in indices.flatten(): - (x, y, w, h) = boxes[i] - color = [int(c) for c in self.colors[class_ids[i]]] - cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2) - cls_name = self.classes[class_ids[i]] if 0 <= class_ids[i] < len(self.classes) else str(class_ids[i]) - text = f"{cls_name}: {confidences[i]:.2f}" - cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) - if not person_detected and str(cls_name).lower() == 'person': - person_detected = True - # Auto-trigger alert if a person is detected on any camera and alerts are enabled - try: - if person_detected: - parent_window = self.parent() - if parent_window is not None: - # trigger_alert() has its own internal guards (enabled, cooldown, playing) - parent_window.trigger_alert() - except Exception: - pass - except Exception as e: - print(f"Detection error: {e}") - - return frame -class PopoutWindow(QMainWindow): - """Enhanced popout window with zoom, pan, overlays and guard-friendly controls""" - def __init__(self, source_display: QLabel, cam_id=None, parent=None): - super().__init__(parent) - self.setWindowTitle(f"Camera {cam_id}" if cam_id is not None else "Camera") - self.source_display = source_display # QLabel providing pixmap 
updates - self.cam_id = cam_id - self.zoom_factor = 1.0 - self.min_zoom = 0.2 - self.max_zoom = 5.0 - self.paused = False - self.show_grid = False - self.show_timestamp = True - self.setMinimumSize(640, 480) - # Drag-to-pan state - self.dragging = False - self.last_mouse_pos = None - - # Central area: toolbar + scrollable image label - central = QWidget() - vbox = QVBoxLayout(central) - vbox.setContentsMargins(4, 4, 4, 4) - vbox.setSpacing(4) - - # Toolbar with guard-friendly controls - toolbar = QHBoxLayout() - self.btn_zoom_in = QToolButton() - self.btn_zoom_in.setText("+") - self.btn_zoom_out = QToolButton() - self.btn_zoom_out.setText("-") - self.btn_zoom_reset = QToolButton() - self.btn_zoom_reset.setText("100%") - self.btn_pause = QToolButton() - self.btn_pause.setText("Pause") - self.btn_snapshot = QToolButton() - self.btn_snapshot.setText("Snapshot") - self.btn_grid = QToolButton() - self.btn_grid.setText("Grid") - self.btn_time = QToolButton() - self.btn_time.setText("Time") - self.btn_full = QToolButton() - self.btn_full.setText("Fullscreen") - - for b in [self.btn_zoom_out, self.btn_zoom_in, self.btn_zoom_reset, self.btn_pause, self.btn_snapshot, self.btn_grid, self.btn_time, self.btn_full]: - toolbar.addWidget(b) - toolbar.addStretch(1) - vbox.addLayout(toolbar) - - # Scroll area for panning when zoomed - self.image_label = QLabel() - self.image_label.setAlignment(Qt.AlignCenter) - self.scroll = QScrollArea() - self.scroll.setWidget(self.image_label) - self.scroll.setWidgetResizable(True) - vbox.addWidget(self.scroll, 1) - - self.setCentralWidget(central) - - # Shortcuts - QShortcut(QKeySequence("+"), self, activated=self.zoom_in) - QShortcut(QKeySequence("-"), self, activated=self.zoom_out) - QShortcut(QKeySequence("0"), self, activated=self.reset_zoom) - QShortcut(QKeySequence(Qt.Key_Escape), self, activated=self.close) - QShortcut(QKeySequence("F"), self, activated=self.toggle_fullscreen) - QShortcut(QKeySequence("Ctrl+S"), self, activated=self.take_snapshot) - QShortcut(QKeySequence("Space"), self, activated=self.toggle_pause) - QShortcut(QKeySequence("G"), self, activated=self.toggle_grid) - QShortcut(QKeySequence("T"), self, activated=self.toggle_timestamp) - - # Connect buttons - self.btn_zoom_in.clicked.connect(self.zoom_in) - self.btn_zoom_out.clicked.connect(self.zoom_out) - self.btn_zoom_reset.clicked.connect(self.reset_zoom) - self.btn_pause.clicked.connect(self.toggle_pause) - self.btn_snapshot.clicked.connect(self.take_snapshot) - self.btn_grid.clicked.connect(self.toggle_grid) - self.btn_time.clicked.connect(self.toggle_timestamp) - self.btn_full.clicked.connect(self.toggle_fullscreen) - - # Timer to refresh from source display - self.timer = QTimer(self) - self.timer.timeout.connect(self.refresh_frame) - self.timer.start(40) - - # Mouse wheel zoom support - self.image_label.installEventFilter(self) - - # Initial render - self.refresh_frame() - - def closeEvent(self, event): - if hasattr(self, 'timer') and self.timer: - self.timer.stop() - return super().closeEvent(event) - - def toggle_fullscreen(self): - if self.isFullScreen(): - self.showNormal() - self.btn_full.setText("Fullscreen") - else: - self.showFullScreen() - self.btn_full.setText("Windowed") - - def toggle_pause(self): - self.paused = not self.paused - self.btn_pause.setText("Resume" if self.paused else "Pause") - - def toggle_grid(self): - self.show_grid = not self.show_grid - - def toggle_timestamp(self): - self.show_timestamp = not self.show_timestamp - - def take_snapshot(self): - # Prefer using 
source_display method if available - if hasattr(self.source_display, 'take_screenshot'): - self.source_display.take_screenshot() - return - - def current_pixmap(self): - pm = self.source_display.pixmap() - return pm - - def refresh_frame(self): - if self.paused: - return - pm = self.current_pixmap() - if not pm: - return - # Create a copy to draw overlays without touching original - image = pm.toImage().convertToFormat(QImage.Format_ARGB32) - painter = QPainter(image) - painter.setRenderHint(QPainter.Antialiasing) - - # Timestamp overlay - if self.show_timestamp: - ts = QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss') - text = ts - metrics = painter.fontMetrics() - w = metrics.width(text) + 14 - h = metrics.height() + 8 - rect = QRect(10, 10, w, h) - painter.setPen(Qt.NoPen) - painter.setBrush(QBrush(QColor(0, 0, 0, 160))) - painter.drawRoundedRect(rect, 6, 6) - painter.setPen(QPen(QColor(255, 255, 255))) - painter.drawText(rect, Qt.AlignCenter, text) - - # Grid overlay (rule-of-thirds) - if self.show_grid: - painter.setPen(QPen(QColor(255, 255, 255, 120), 1)) - img_w = image.width() - img_h = image.height() - for i in range(1, 3): - x = int(img_w * i / 3) - y = int(img_h * i / 3) - painter.drawLine(x, 0, x, img_h) - painter.drawLine(0, y, img_w, y) - painter.end() - - composed = QPixmap.fromImage(image) - if self.zoom_factor != 1.0: - target_w = int(composed.width() * self.zoom_factor) - target_h = int(composed.height() * self.zoom_factor) - composed = composed.scaled(target_w, target_h, Qt.KeepAspectRatio, Qt.SmoothTransformation) - self.image_label.setPixmap(composed) - # Update cursor based on ability to pan at this zoom/size - self.update_cursor() - - def zoom_in(self): - self.set_zoom(self.zoom_factor * 1.2) - - def zoom_out(self): - self.set_zoom(self.zoom_factor / 1.2) - - def reset_zoom(self): - self.set_zoom(1.0) - - def set_zoom(self, z): - z = max(self.min_zoom, min(self.max_zoom, z)) - if abs(z - self.zoom_factor) > 1e-4: - self.zoom_factor = z - self.refresh_frame() - self.update_cursor() - - def can_pan(self): - # Allow panning when the pixmap is larger than the viewport (zoomed) - if not self.image_label.pixmap(): - return False - vp = self.scroll.viewport().size() - pm = self.image_label.pixmap().size() - return pm.width() > vp.width() or pm.height() > vp.height() - - def update_cursor(self): - if self.can_pan(): - self.image_label.setCursor(Qt.OpenHandCursor if not self.dragging else Qt.ClosedHandCursor) - else: - self.image_label.setCursor(Qt.ArrowCursor) - - def eventFilter(self, obj, event): - if obj is self.image_label: - # Mouse wheel zoom centered on cursor - if event.type() == QEvent.Wheel: - delta = event.angleDelta().y() - if delta > 0: - self.zoom_in() - else: - self.zoom_out() - return True - # Start drag - if event.type() == QEvent.MouseButtonPress and event.button() == Qt.LeftButton and self.can_pan(): - self.dragging = True - self.last_mouse_pos = event.pos() - self.update_cursor() - return True - # Dragging - if event.type() == QEvent.MouseMove and self.dragging and self.last_mouse_pos is not None: - delta = event.pos() - self.last_mouse_pos - hbar = self.scroll.horizontalScrollBar() - vbar = self.scroll.verticalScrollBar() - hbar.setValue(hbar.value() - delta.x()) - vbar.setValue(vbar.value() - delta.y()) - self.last_mouse_pos = event.pos() - return True - # End drag - if event.type() == QEvent.MouseButtonRelease and event.button() == Qt.LeftButton: - if self.dragging: - self.dragging = False - self.last_mouse_pos = None - self.update_cursor() - 
return True - if event.type() == QEvent.Enter or event.type() == QEvent.Leave: - # Update cursor when entering/leaving the label - if event.type() == QEvent.Leave: - self.dragging = False - self.last_mouse_pos = None - self.update_cursor() - return super().eventFilter(obj, event) - class CameraDisplay(QLabel): """Custom QLabel for displaying camera feed with fullscreen support""" + def __init__(self, parent=None): super().__init__(parent) self.setAlignment(Qt.AlignCenter) self.setText("No camera feed") - + self.get_camera_display_style = getpath.resource_path("styling/camera_display.qss") try: - with open(self.get_camera_display_style,"r") as cdst: + with open(self.get_camera_display_style, "r") as cdst: self.setStyleSheet(cdst.read()) except FileNotFoundError: pass @@ -1191,25 +110,25 @@ class CameraDisplay(QLabel): self.config = Config() self.screenshot_dir = self.config.load_setting('screenshot_dir', os.path.expanduser('~/Pictures/MuCaPy')) self.camera_name = None - + # Create screenshot directory if it doesn't exist if not os.path.exists(self.screenshot_dir): os.makedirs(self.screenshot_dir, exist_ok=True) - + def set_cam_id(self, cam_id): """Set camera identifier for this display""" self.cam_id = cam_id - + def set_camera_name(self, name): """Set the camera name for display""" self.camera_name = name self.update() - + def take_screenshot(self): """Take a screenshot of the current frame""" if not self.pixmap(): return - + # Ask for screenshot directory if not set if not self.screenshot_dir: dir_path = QFileDialog.getExistingDirectory( @@ -1225,25 +144,25 @@ class CameraDisplay(QLabel): os.makedirs(dir_path, exist_ok=True) else: return - + # Generate filename with timestamp timestamp = QDateTime.currentDateTime().toString('yyyy-MM-dd_hh-mm-ss') filename = f"camera_{self.cam_id}_{timestamp}.png" filepath = os.path.join(self.screenshot_dir, filename) - + # Save the image if self.pixmap().save(filepath): QMessageBox.information(self, "Success", f"Screenshot saved to:\n{filepath}") else: QMessageBox.critical(self, "Error", "Failed to save screenshot") - + def mouseDoubleClickEvent(self, event): """Handle double click to toggle fullscreen""" if self.pixmap() and not self.fullscreen_window: self.show_fullscreen() elif self.fullscreen_window: self.close_fullscreen() - + def show_fullscreen(self): """Show this camera in a new window (enhanced popout)""" if not self.pixmap(): @@ -1255,7 +174,7 @@ class CameraDisplay(QLabel): self.fullscreen_window.resize(min(1280, int(screen.width() * 0.9)), min(720, int(screen.height() * 0.9))) self.fullscreen_window.show() # ESC shortcut already handled inside PopoutWindow - + def update_fullscreen(self, label): """Kept for backward compatibility; PopoutWindow manages its own refresh.""" if self.pixmap(): @@ -1264,31 +183,34 @@ class CameraDisplay(QLabel): Qt.KeepAspectRatio, Qt.SmoothTransformation )) - + def close_fullscreen(self): """Close the fullscreen window""" if self.fullscreen_window: self.fullscreen_window.close() self.fullscreen_window = None - + def paintEvent(self, event): """Override paint event to draw camera name overlay""" super().paintEvent(event) if self.camera_name and self.pixmap(): painter = QPainter(self) painter.setRenderHint(QPainter.Antialiasing) - + # Draw semi-transparent background painter.setPen(Qt.NoPen) painter.setBrush(QBrush(QColor(0, 0, 0, 180))) rect = QRect(10, 10, painter.fontMetrics().width(self.camera_name) + 20, 30) painter.drawRoundedRect(rect, 5, 5) - + # Draw text painter.setPen(QPen(QColor(255, 255, 255))) 
painter.drawText(rect, Qt.AlignCenter, self.camera_name) + + class CollapsibleDock(QDockWidget): """Custom dock widget with collapse/expand functionality""" + def __init__(self, title, parent=None): super().__init__(title, parent) self.setFeatures(QDockWidget.DockWidgetClosable | @@ -1300,7 +222,7 @@ class CollapsibleDock(QDockWidget): self.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Expanding) # Ensure the dock paints its own background (prevents visual bleed/clip) self.setAttribute(Qt.WA_StyledBackground, True) - + # Create a widget for the title bar that contains both toggle button and close button title_widget = QWidget() title_layout = QHBoxLayout(title_widget) @@ -1308,29 +230,29 @@ class CollapsibleDock(QDockWidget): title_layout.setSpacing(0) # Ensure title bar doesn't force tiny width title_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Fixed) - + self.toggle_button = QToolButton() self.toggle_button.setIcon(QIcon.fromTheme("arrow-left")) self.toggle_button.setIconSize(QSize(16, 16)) self.toggle_button.setStyleSheet("border: none;") self.toggle_button.clicked.connect(self.toggle_collapse) - + title_layout.addWidget(self.toggle_button) title_layout.addStretch() - + self.setTitleBarWidget(title_widget) self.collapsed = False self.original_size = None self.original_minimum_width = None self.original_maximum_width = None - + def toggle_collapse(self): """Toggle between collapsed and expanded states""" if self.collapsed: self.expand() else: self.collapse() - + def collapse(self): """Collapse the dock widget (fully hide).""" if not self.collapsed: @@ -1341,7 +263,7 @@ class CollapsibleDock(QDockWidget): self.setVisible(False) self.toggle_button.setIcon(QIcon.fromTheme("arrow-right")) self.collapsed = True - + def expand(self): """Expand (show) the dock widget""" if self.collapsed: @@ -1359,6 +281,8 @@ class CollapsibleDock(QDockWidget): self.raise_() self.toggle_button.setIcon(QIcon.fromTheme("arrow-left")) self.collapsed = False + + class AboutWindow(QDialog): def __init__(self, parent=None): global todo_style_path @@ -1415,7 +339,7 @@ class AboutWindow(QDialog): self.toggle_btn.setChecked(False) toggle_btn_style = getpath.resource_path("styling/togglebtnabout.qss") try: - with open(toggle_btn_style,"r") as tgbstyle: + with open(toggle_btn_style, "r") as tgbstyle: self.toggle_btn.setStyleSheet(tgbstyle.read()) except FileNotFoundError: pass @@ -1457,7 +381,7 @@ class AboutWindow(QDialog): # Set Styling for About Section style_file = getpath.resource_path("styling/about.qss") try: - with open(style_file,"r") as aboutstyle: + with open(style_file, "r") as aboutstyle: self.setStyleSheet(aboutstyle.read()) except FileNotFoundError: pass @@ -1474,7 +398,7 @@ class AboutWindow(QDialog): # TODO: Fix this xD ; Fixing a TODO lol try: todo_style_path = getpath.resource_path("styling/todostyle.qss") - with open(todo_style_path,"r") as tdf: + with open(todo_style_path, "r") as tdf: todo_label.setStyleSheet(tdf.read()) # here we have our wonderfull fix if True == True: @@ -1492,7 +416,7 @@ class AboutWindow(QDialog): todo_archive_label.setWordWrap(True) todo_archive_label.setAlignment(Qt.AlignLeft) todo_archive_label.setStyleSheet("color: #02d1fa ;font-style: italic;") - + self.info_obj = todo info_text = self.get_info_text() info_label = QLabel(f"
{info_text}")
@@ -1577,12 +501,12 @@ class AboutWindow(QDialog):
# If we are on Linux, also record the Qt/XDG session environment variable
if platform.system() == "Linux":
- info["XDG_ENVIROMENT_TYPE "] = initQT.getenv(self) # get the stupid env var of qt
+ info["XDG_ENVIROMENT_TYPE "] = initQT.getenv(self) # get the stupid env var of qt
else:
pass
mem = psutil.virtual_memory()
- info['MemoryGB'] = mem.total // (1024**3)
+ info['MemoryGB'] = mem.total // (1024 ** 3)
info['Memory'] = f"{info['MemoryGB']} GB RAM"
info['CPU Cores'] = psutil.cpu_count(logical=False)
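# A minimal standalone sketch of the hardware summary assembled above, assuming
# only that psutil is installed; hardware_summary() and the returned dict keys
# are illustrative names, not part of this diff.
import psutil

def hardware_summary():
    """Return a small dict mirroring the About-window memory/CPU fields."""
    mem = psutil.virtual_memory()
    total_gb = mem.total // (1024 ** 3)  # whole gibibytes, as in the code above
    return {
        'Memory': f"{total_gb} GB RAM",
        'CPU Cores': psutil.cpu_count(logical=False),  # physical cores only
    }

# e.g. print(hardware_summary())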
@@ -1610,7 +534,7 @@ class AboutWindow(QDialog):
return "Invalid TODO format."
except Exception as e:
return f"Error retrieving TODO: {e}"
-
+
def get_info_text(self):
try:
info_text = self.info_obj.todo.getinfo()
@@ -1620,11 +544,11 @@ class AboutWindow(QDialog):
return "Invalid"
except Exception as e:
return f"fuck you => {e}"
-
+
def get_archive_text(self):
try:
todo_archive_text = self.todo_archive_object.todo.getarchive()
- if isinstance(todo_archive_text,str):
+ if isinstance(todo_archive_text, str):
return todo_archive_text.strip()
else:
return "invalid format??"
@@ -1634,7 +558,7 @@ class AboutWindow(QDialog):
def get_cam_text(self):
try:
cam_text = self.camobj.todo.getcams()
- if isinstance(cam_text,str):
+ if isinstance(cam_text, str):
return cam_text.strip()
else:
return "invalid cam format"
@@ -1643,42 +567,42 @@ class AboutWindow(QDialog):
class NetworkCameraDialog(QDialog):
-
+
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Network Camera Settings")
self.setModal(True)
self.resize(500, 400)
-
+
layout = QVBoxLayout(self)
-
+
# Instructions label
instructions = QLabel(todo.todo.get_instructions_CaSeDi_QLabel())
instructions.setWordWrap(True)
layout.addWidget(instructions)
-
+
# Camera list
self.camera_list = QListWidget()
layout.addWidget(self.camera_list)
-
+
# Input fields
form_layout = QFormLayout()
-
+
# Name and URL
self.name_edit = QLineEdit()
self.url_edit = QLineEdit()
form_layout.addRow("Name:", self.name_edit)
form_layout.addRow("URL:", self.url_edit)
-
+
# Authentication group
auth_group = QGroupBox("Authentication")
auth_layout = QVBoxLayout()
-
+
self.auth_checkbox = QCheckBox("Enable Authentication")
self.auth_checkbox.stateChanged.connect(self.toggle_auth_fields)
auth_layout.addWidget(self.auth_checkbox)
-
+
auth_form = QFormLayout()
self.username_edit = QLineEdit()
self.password_edit = QLineEdit()
@@ -1686,16 +610,16 @@ class NetworkCameraDialog(QDialog):
auth_form.addRow("Username:", self.username_edit)
auth_form.addRow("Password:", self.password_edit)
auth_layout.addLayout(auth_form)
-
+
auth_group.setLayout(auth_layout)
form_layout.addRow(auth_group)
-
+
layout.addLayout(form_layout)
-
+
# Initially disable auth fields
self.username_edit.setEnabled(False)
self.password_edit.setEnabled(False)
-
+
# Buttons
btn_layout = QHBoxLayout()
add_btn = QPushButton("Add Camera")
@@ -1704,15 +628,15 @@ class NetworkCameraDialog(QDialog):
remove_btn.clicked.connect(self.remove_camera)
close_btn = QPushButton("Close")
close_btn.clicked.connect(self.accept)
-
+
btn_layout.addWidget(add_btn)
btn_layout.addWidget(remove_btn)
btn_layout.addWidget(close_btn)
layout.addLayout(btn_layout)
-
+
self.detector = parent.detector if parent else None
self.load_cameras()
-
+
def toggle_auth_fields(self, state):
"""Enable/disable authentication fields based on checkbox state"""
enabled = state == Qt.Checked
@@ -1721,12 +645,12 @@ class NetworkCameraDialog(QDialog):
if not enabled:
self.username_edit.clear()
self.password_edit.clear()
-
+
def load_cameras(self):
"""Load saved network cameras into the list"""
if not self.detector:
return
-
+
self.camera_list.clear()
for name, camera_info in self.detector.network_cameras.items():
if isinstance(camera_info, dict):
@@ -1739,40 +663,42 @@ class NetworkCameraDialog(QDialog):
# Handle old format where camera_info was just the URL
display_text = f"{name} ({camera_info})"
self.camera_list.addItem(display_text)
-
+
def add_camera(self):
"""Add a new network camera"""
name = self.name_edit.text().strip()
url = self.url_edit.text().strip()
-
+
if not name or not url:
QMessageBox.warning(self, "Error", "Please enter both name and URL")
return
-
+
# Ensure URL has proper format for DroidCam
if ':4747' in url:
if not url.endswith('/video'):
url = url.rstrip('/') + '/video'
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://' + url
-
+
if self.detector:
print(f"Adding network camera: {name} with URL: {url}") # Debug print
self.detector.add_network_camera(name, url)
self.load_cameras()
self.name_edit.clear()
self.url_edit.clear()
-
+
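# Hedged sketch of the DroidCam URL normalization performed in add_camera()
# above, pulled out as a pure function so it is easy to unit-test;
# normalize_droidcam_url is an illustrative name, not part of this diff.
def normalize_droidcam_url(url: str) -> str:
    """Append /video for DroidCam's default port and ensure an http scheme."""
    url = url.strip()
    if ':4747' in url and not url.endswith('/video'):
        url = url.rstrip('/') + '/video'
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    return url

# e.g. normalize_droidcam_url('192.168.1.20:4747') -> 'http://192.168.1.20:4747/video'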
def remove_camera(self):
"""Remove selected network camera"""
current = self.camera_list.currentItem()
if not current:
return
-
+
name = current.text().split(" (")[0]
if self.detector:
self.detector.remove_network_camera(name)
self.load_cameras()
+
+
class CameraSelectorDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
@@ -1780,13 +706,13 @@ class CameraSelectorDialog(QDialog):
self.setModal(True)
self.resize(900, 650) # Increased size for better visibility
self.setSizeGripEnabled(True)
-
+
self.detector = parent.detector if parent else None
self.selected_cameras = []
-
+
# Main layout
layout = QVBoxLayout(self)
-
+
# Instructions with better formatting
instructions = QLabel(todo.todo.get_instructions_CaSeDi_QLabel())
print(todo.todo.get_instructions_CaSeDi_QLabel())
@@ -1794,17 +720,17 @@ class CameraSelectorDialog(QDialog):
instructions.setStyleSheet("QLabel { background-color: #2A2A2A; padding: 10px; border-radius: 4px; }")
instructions.setWordWrap(True)
layout.addWidget(instructions)
-
+
# Split view for cameras
splitter = QSplitter(Qt.Horizontal)
splitter.setChildrenCollapsible(False)
splitter.setHandleWidth(6)
-
+
# Left side - Available Cameras
left_widget = QWidget()
left_layout = QVBoxLayout(left_widget)
left_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
-
+
# Local Cameras Group
local_group = QGroupBox("Local Cameras")
local_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
@@ -1815,7 +741,7 @@ class CameraSelectorDialog(QDialog):
local_layout.addWidget(self.local_list)
local_group.setLayout(local_layout)
left_layout.addWidget(local_group)
-
+
# Network Cameras Group
network_group = QGroupBox("Network Cameras")
network_group.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
@@ -1826,61 +752,61 @@ class CameraSelectorDialog(QDialog):
network_layout.addWidget(self.network_list)
network_group.setLayout(network_layout)
left_layout.addWidget(network_group)
-
+
# Camera management buttons
btn_layout = QHBoxLayout()
self.refresh_btn = QPushButton("Refresh")
self.refresh_btn.clicked.connect(self.refresh_cameras)
add_net_btn = QPushButton("Add Network Camera")
add_net_btn.clicked.connect(self.show_network_dialog)
-
+
btn_layout.addWidget(self.refresh_btn)
btn_layout.addWidget(add_net_btn)
left_layout.addLayout(btn_layout)
-
+
# Make lists expand and buttons stay minimal in left pane
left_layout.setStretch(0, 1)
left_layout.setStretch(1, 1)
left_layout.setStretch(2, 0)
-
+
splitter.addWidget(left_widget)
splitter.setStretchFactor(0, 1)
-
+
# Right side - Selected Cameras Preview
right_widget = QWidget()
right_layout = QVBoxLayout(right_widget)
right_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
-
+
preview_label = QLabel("Selected Cameras Preview")
preview_label.setStyleSheet("font-weight: bold;")
right_layout.addWidget(preview_label)
-
+
self.preview_list = QListWidget()
self.preview_list.setDragDropMode(QListWidget.InternalMove)
self.preview_list.setSelectionMode(QListWidget.ExtendedSelection)
self.preview_list.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
right_layout.addWidget(self.preview_list)
-
+
# Preview controls
preview_btn_layout = QHBoxLayout()
remove_btn = QPushButton("Remove Selected")
remove_btn.clicked.connect(self.remove_selected)
clear_btn = QPushButton("Clear All")
clear_btn.clicked.connect(self.clear_selection)
-
+
preview_btn_layout.addWidget(remove_btn)
preview_btn_layout.addWidget(clear_btn)
right_layout.addLayout(preview_btn_layout)
-
+
# Make preview list expand and buttons stay minimal in right pane
right_layout.setStretch(0, 0)
right_layout.setStretch(1, 1)
right_layout.setStretch(2, 0)
-
+
splitter.addWidget(right_widget)
splitter.setStretchFactor(1, 1)
layout.addWidget(splitter)
-
+
# Bottom buttons
bottom_layout = QHBoxLayout()
select_all_btn = QPushButton("Select All")
@@ -1889,38 +815,38 @@ class CameraSelectorDialog(QDialog):
ok_btn.clicked.connect(self.accept)
cancel_btn = QPushButton("Cancel")
cancel_btn.clicked.connect(self.reject)
-
+
bottom_layout.addWidget(select_all_btn)
bottom_layout.addStretch()
bottom_layout.addWidget(ok_btn)
bottom_layout.addWidget(cancel_btn)
layout.addLayout(bottom_layout)
-
+
# Connect signals
self.local_list.itemChanged.connect(self.update_preview)
self.network_list.itemChanged.connect(self.update_preview)
self.preview_list.model().rowsMoved.connect(self.update_camera_order)
-
+
# Set splitter sizes
splitter.setSizes([450, 450])
-
+
# Initial camera refresh
self.refresh_cameras()
-
+
# Restore last selection if available
if self.detector:
last_selected = self.detector.config.load_setting('last_selected_cameras', [])
if last_selected:
self.restore_selection(last_selected)
-
+
def refresh_cameras(self):
"""Refresh both local and network camera lists asynchronously"""
self.local_list.clear()
self.network_list.clear()
-
+
if not self.detector:
return
-
+
# Show placeholders and disable refresh while scanning
self.refresh_btn.setEnabled(False)
scanning_item_local = QListWidgetItem("Scanning for cameras…")
@@ -1929,27 +855,27 @@ class CameraSelectorDialog(QDialog):
scanning_item_net = QListWidgetItem("Loading network cameras…")
scanning_item_net.setFlags(Qt.NoItemFlags)
self.network_list.addItem(scanning_item_net)
-
+
# Start background scan
started = self.detector.start_camera_scan(10)
if not started:
# If a scan is already running, we'll just wait for its signal
pass
-
+
# Connect once to update lists when scan completes
try:
self.detector.cameras_scanned.disconnect(self._on_scan_finished_dialog)
except Exception:
pass
self.detector.cameras_scanned.connect(self._on_scan_finished_dialog)
-
+
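# Minimal sketch of the disconnect-then-connect idiom used above, which keeps
# repeated refreshes from stacking duplicate slots on cameras_scanned;
# connect_once is an illustrative helper, not part of this diff.
def connect_once(signal, slot):
    """Detach `slot` if it is already attached, then connect it exactly once."""
    try:
        signal.disconnect(slot)
    except TypeError:
        pass  # PyQt5 raises TypeError when the slot was not connected yet
    signal.connect(slot)

# e.g. connect_once(self.detector.cameras_scanned, self._on_scan_finished_dialog)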
def _on_scan_finished_dialog(self, cams, names):
# Re-enable refresh
self.refresh_btn.setEnabled(True)
# Rebuild lists
self.local_list.clear()
self.network_list.clear()
-
+
# Local cameras
for cam_path in cams:
if cam_path.startswith('net:'):
@@ -1965,7 +891,7 @@ class CameraSelectorDialog(QDialog):
item.setFlags(item.flags() | Qt.ItemIsUserCheckable)
item.setCheckState(Qt.Unchecked)
self.local_list.addItem(item)
-
+
# Network cameras
for name, camera_info in self.detector.network_cameras.items():
if isinstance(camera_info, dict):
@@ -1981,7 +907,7 @@ class CameraSelectorDialog(QDialog):
item.setFlags(item.flags() | Qt.ItemIsUserCheckable)
item.setCheckState(Qt.Unchecked)
self.network_list.addItem(item)
-
+
def restore_selection(self, last_selected):
"""Restore previous camera selection"""
for cam_id in last_selected:
@@ -1990,18 +916,18 @@ class CameraSelectorDialog(QDialog):
item = self.local_list.item(i)
if item.data(Qt.UserRole) == cam_id:
item.setCheckState(Qt.Checked)
-
+
# Check network cameras
for i in range(self.network_list.count()):
item = self.network_list.item(i)
if item.data(Qt.UserRole) == cam_id:
item.setCheckState(Qt.Checked)
-
+
def update_preview(self):
"""Update the preview list with currently selected cameras"""
self.preview_list.clear()
self.selected_cameras = []
-
+
# Get selected local cameras
for i in range(self.local_list.count()):
item = self.local_list.item(i)
@@ -2011,7 +937,7 @@ class CameraSelectorDialog(QDialog):
preview_item.setData(Qt.UserRole, cam_id)
self.preview_list.addItem(preview_item)
self.selected_cameras.append(cam_id)
-
+
# Get selected network cameras
for i in range(self.network_list.count()):
item = self.network_list.item(i)
@@ -2021,36 +947,36 @@ class CameraSelectorDialog(QDialog):
preview_item.setData(Qt.UserRole, cam_id)
self.preview_list.addItem(preview_item)
self.selected_cameras.append(cam_id)
-
+
# Save the current selection to config
if self.detector:
self.detector.config.save_setting('last_selected_cameras', self.selected_cameras)
-
+
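# Hedged sketch of the checked-item collection pattern used by update_preview()
# above: walk a QListWidget and keep the Qt.UserRole payload of every checked
# row; checked_ids is an illustrative helper, not part of this diff.
from PyQt5.QtCore import Qt

def checked_ids(list_widget):
    """Return the UserRole data of all checked items, top to bottom."""
    ids = []
    for i in range(list_widget.count()):
        item = list_widget.item(i)
        if item.checkState() == Qt.Checked:
            ids.append(item.data(Qt.UserRole))
    return ids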
def update_camera_order(self):
"""Update the camera order based on preview list order"""
self.selected_cameras = []
for i in range(self.preview_list.count()):
item = self.preview_list.item(i)
self.selected_cameras.append(item.data(Qt.UserRole))
-
+
# Save the new order
if self.detector:
self.detector.config.save_setting('last_selected_cameras', self.selected_cameras)
-
+
def select_all(self):
"""Select all cameras in both lists"""
for i in range(self.local_list.count()):
self.local_list.item(i).setCheckState(Qt.Checked)
for i in range(self.network_list.count()):
self.network_list.item(i).setCheckState(Qt.Checked)
-
+
def clear_selection(self):
"""Clear all selections"""
for i in range(self.local_list.count()):
self.local_list.item(i).setCheckState(Qt.Unchecked)
for i in range(self.network_list.count()):
self.network_list.item(i).setCheckState(Qt.Unchecked)
-
+
def remove_selected(self):
"""Remove selected items from the preview list"""
selected_items = self.preview_list.selectedItems()
@@ -2063,33 +989,36 @@ class CameraSelectorDialog(QDialog):
for i in range(self.network_list.count()):
if self.network_list.item(i).data(Qt.UserRole) == cam_id:
self.network_list.item(i).setCheckState(Qt.Unchecked)
-
+
# Camera connection tests removed for performance reasons per user request.
def test_selected_cameras(self):
"""Deprecated: Camera tests are disabled to improve performance."""
- QMessageBox.information(self, "Camera Tests Disabled", "Camera connectivity tests have been removed to speed up the application.")
+ QMessageBox.information(self, "Camera Tests Disabled",
+ "Camera connectivity tests have been removed to speed up the application.")
return
-
+
def show_network_dialog(self):
"""Show the network camera configuration dialog"""
dialog = NetworkCameraDialog(self)
if dialog.exec_() == QDialog.Accepted:
self.refresh_cameras()
+
+
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("PySec")
self.setGeometry(100, 100, 1200, 800)
-
+
# Initialize configuration
self.config = Config()
-
+
# Initialize default values
self.current_layout = 0 # Default to single camera layout
self.detector = MultiCamYOLODetector(self) # Pass self as parent
self.camera_settings = {}
self.detection_enabled = True # Add detection toggle flag
-
+
# Alert system state
self.alert_enabled = bool(self.config.load_setting('alert_enabled', True))
self._alert_playing = False
@@ -2098,7 +1027,7 @@ class MainWindow(QMainWindow):
self._cooldown_timer.setSingleShot(True)
self._cooldown_timer.timeout.connect(self._on_cooldown_finished)
self._alert_worker = None
-
+
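# Hedged sketch of the alert cooldown guard wired up above: a single-shot
# QTimer keeps the alert from retriggering until the cooldown expires.
# AlertGate, trigger() and cooldown_ms are illustrative, not the exact
# MainWindow API, and the timer only fires inside a running Qt event loop.
from PyQt5.QtCore import QObject, QTimer

class AlertGate(QObject):
    def __init__(self, cooldown_ms=5000, parent=None):
        super().__init__(parent)
        self._cooldown_ms = cooldown_ms
        self._cooldown = QTimer(self)
        self._cooldown.setSingleShot(True)

    def trigger(self, play_fn):
        """Run play_fn unless the cooldown window is still active."""
        if self._cooldown.isActive():
            return
        play_fn()
        self._cooldown.start(self._cooldown_ms)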
# Load saved settings first
self.load_saved_settings()
@@ -2108,7 +1037,7 @@ class MainWindow(QMainWindow):
self.hw_timer = QTimer()
self.hw_timer.timeout.connect(self.update_hardware_stats)
self.hw_timer.start(1000) # Update every second
-
+
# Set dark theme style
style_file = getpath.resource_path("styling/mainwindow.qss")
try:
@@ -2133,16 +1062,16 @@ class MainWindow(QMainWindow):
palette.setColor(palette.Highlight, QColor(42, 130, 218))
palette.setColor(palette.HighlightedText, Qt.black)
self.setPalette(palette)
-
+
# Initialize UI elements
self.init_ui()
-
+
# Create menus
self.create_menus()
-
+
# Initialize timer
self.init_timer()
-
+
# Apply saved settings to UI
self.apply_saved_settings()
@@ -2155,81 +1084,81 @@ class MainWindow(QMainWindow):
model_dir = self.config.load_setting('model_dir')
if model_dir and os.path.exists(model_dir):
self.detector.load_yolo_model(model_dir)
-
+
# Load FPS setting
fps = self.config.load_setting('fps', 10)
self.detector.target_fps = int(fps)
self.detector.frame_interval = 1.0 / self.detector.target_fps
-
+
# Load layout setting
self.current_layout = int(self.config.load_setting('layout', 0))
-
+
def apply_saved_settings(self):
"""Apply loaded settings to UI elements"""
if hasattr(self, 'fps_spin'):
self.fps_spin.setValue(self.detector.target_fps)
-
+
if hasattr(self, 'layout_combo'):
self.layout_combo.setCurrentIndex(self.current_layout)
-
+
if hasattr(self, 'model_label') and self.detector.model_dir:
self.model_label.setText(f"Model: {os.path.basename(self.detector.model_dir)}")
-
+
# Ensure alert UI reflects backend availability and state
try:
self._update_alert_ui()
except Exception:
pass
-
+
def create_menus(self):
menubar = self.menuBar()
-
+
# File Menu
file_menu = menubar.addMenu('File')
-
+
# Save Settings action
save_settings_action = QAction('Save Settings...', self)
save_settings_action.setShortcut('Ctrl+S')
save_settings_action.setStatusTip('Save current settings to a file')
save_settings_action.triggered.connect(self.save_settings_to_file)
file_menu.addAction(save_settings_action)
-
+
# Load Settings action
load_settings_action = QAction('Load Settings...', self)
load_settings_action.setShortcut('Ctrl+O')
load_settings_action.setStatusTip('Load settings from a file')
load_settings_action.triggered.connect(self.load_settings_from_file)
file_menu.addAction(load_settings_action)
-
+
file_menu.addSeparator()
-
+
# Export Screenshots Directory action
export_screenshots_action = QAction('Export Screenshots Directory...', self)
export_screenshots_action.setStatusTip('Open the screenshots directory')
export_screenshots_action.triggered.connect(self.open_screenshots_directory)
file_menu.addAction(export_screenshots_action)
-
+
file_menu.addSeparator()
-
+
about_action = QAction("About", self)
about_action.triggered.connect(self.show_menu)
file_menu.addAction(about_action)
-
+
file_menu.addSeparator()
-
+
# Exit action
exit_action = QAction('Exit', self)
exit_action.setShortcut('Ctrl+Q')
exit_action.setStatusTip('Exit application')
exit_action.triggered.connect(self.close)
file_menu.addAction(exit_action)
-
+
# Model menu
model_menu = menubar.addMenu('Model')
load_model_action = QAction('Load Model Directory...', self)
load_model_action.triggered.connect(self.load_model_directory)
model_menu.addAction(load_model_action)
-
+
# View menu
view_menu = menubar.addMenu('View')
self.toggle_sidebar_action = QAction('Show Sidebar', self)
@@ -2238,7 +1167,7 @@ class MainWindow(QMainWindow):
self.toggle_sidebar_action.setShortcut('Ctrl+B')
self.toggle_sidebar_action.triggered.connect(self.toggle_sidebar_visibility)
view_menu.addAction(self.toggle_sidebar_action)
-
+
# Add toggle detection action
self.toggle_detection_action = QAction('Enable Detection', self)
self.toggle_detection_action.setCheckable(True)
@@ -2254,59 +1183,59 @@ class MainWindow(QMainWindow):
self.toggle_alert_action.setStatusTip('Play an audible alert when a person is detected')
self.toggle_alert_action.triggered.connect(self.set_alert_enabled)
view_menu.addAction(self.toggle_alert_action)
-
+
# Camera menu
self.camera_menu = menubar.addMenu('Cameras')
-
+
# Add Camera Selector action
select_cameras_action = QAction('Select Cameras...', self)
select_cameras_action.setIcon(QIcon.fromTheme('camera-web'))
select_cameras_action.triggered.connect(self.show_camera_selector)
self.camera_menu.addAction(select_cameras_action)
-
+
self.camera_menu.addSeparator()
-
+
# Add Network Camera Settings action
network_camera_action = QAction('Network Camera Settings...', self)
network_camera_action.setIcon(QIcon.fromTheme('network-wireless'))
network_camera_action.triggered.connect(self.show_network_camera_dialog)
self.camera_menu.addAction(network_camera_action)
-
+
self.camera_menu.addSeparator()
-
+
# Create camera groups
self.local_camera_menu = QMenu('Local Cameras', self)
self.network_camera_menu = QMenu('Network Cameras', self)
self.camera_menu.addMenu(self.local_camera_menu)
self.camera_menu.addMenu(self.network_camera_menu)
-
+
# Create action groups for each camera type
self.local_camera_group = QActionGroup(self)
self.local_camera_group.setExclusive(False)
self.network_camera_group = QActionGroup(self)
self.network_camera_group.setExclusive(False)
-
+
# Initial population
self.populate_camera_menu()
-
+
# Sync sidebar toggle label/state with current visibility
try:
self._on_sidebar_visibility_changed(self.sidebar.isVisible())
except Exception:
pass
-
+
def populate_camera_menu(self):
"""Populate the camera menu with available cameras asynchronously"""
# Clear existing camera actions
self.local_camera_menu.clear()
self.network_camera_menu.clear()
-
+
# Add refresh action to both menus
refresh_action = QAction('Refresh List', self)
refresh_action.triggered.connect(self.populate_camera_menu)
self.local_camera_menu.addAction(refresh_action)
self.local_camera_menu.addSeparator()
-
+
# Show scanning placeholders
scanning_local = QAction('Scanning...', self)
scanning_local.setEnabled(False)
@@ -2314,7 +1243,7 @@ class MainWindow(QMainWindow):
scanning_net = QAction('Loading network cameras...', self)
scanning_net.setEnabled(False)
self.network_camera_menu.addAction(scanning_net)
-
+
# Start background scan
started = self.detector.start_camera_scan(10)
# Connect handler to build menus on completion
@@ -2323,20 +1252,20 @@ class MainWindow(QMainWindow):
except Exception:
pass
self.detector.cameras_scanned.connect(self._on_cameras_scanned_menu)
-
+
def _on_cameras_scanned_menu(self, available_cams, names):
# Rebuild menus with results
self.local_camera_menu.clear()
self.network_camera_menu.clear()
-
+
refresh_action = QAction('Refresh List', self)
refresh_action.triggered.connect(self.populate_camera_menu)
self.local_camera_menu.addAction(refresh_action)
self.local_camera_menu.addSeparator()
-
+
local_cams_found = False
network_cams_found = False
-
+
for cam_path in available_cams:
if cam_path.startswith('net:'):
# Network camera
@@ -2360,77 +1289,77 @@ class MainWindow(QMainWindow):
self.local_camera_group.addAction(action)
self.local_camera_menu.addAction(action)
local_cams_found = True
-
+
# Add placeholder text if no cameras found
if not local_cams_found:
no_local = QAction('No local cameras found', self)
no_local.setEnabled(False)
self.local_camera_menu.addAction(no_local)
-
+
if not network_cams_found:
no_net = QAction('No network cameras found', self)
no_net.setEnabled(False)
self.network_camera_menu.addAction(no_net)
-
+
# Update the camera label
self.update_selection_labels()
-
+
def update_selection_labels(self):
"""Update the model and camera selection labels"""
selected_cams = []
-
+
# Check local cameras
for action in self.local_camera_group.actions():
if action.isChecked():
selected_cams.append(action.text())
-
+
# Check network cameras
for action in self.network_camera_group.actions():
if action.isChecked():
selected_cams.append(action.text())
-
+
if selected_cams:
self.cameras_label.setText(f"Selected Cameras: {', '.join(selected_cams)}")
else:
self.cameras_label.setText("Selected Cameras: None")
-
+
def start_detection(self):
"""Start the detection process (camera feed can run without a model)"""
# Get selected cameras
selected_cameras = []
-
+
# Get local cameras
for action in self.local_camera_group.actions():
if action.isChecked():
selected_cameras.append(action.data())
-
+
# Get network cameras
for action in self.network_camera_group.actions():
if action.isChecked():
selected_cameras.append(action.data())
-
+
if not selected_cameras:
QMessageBox.critical(self, "Error", "No cameras selected!")
return
-
+
# Set FPS
self.detector.target_fps = self.fps_spin.value()
self.detector.frame_interval = 1.0 / self.detector.target_fps
-
+
# Connect to cameras
if not self.detector.connect_cameras(selected_cameras):
QMessageBox.critical(self, "Error", "Failed to connect to cameras!")
return
-
+
# Update UI
self.update_selection_labels()
self.start_btn.setEnabled(False)
self.stop_btn.setEnabled(True)
self.fps_spin.setEnabled(False)
-
+
# Start timer
self.timer.start(int(1000 / self.detector.target_fps))
-
+
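# Small sketch of the FPS-to-timer-interval conversion used by start_detection()
# above; fps_to_interval_ms is an illustrative name, not part of this diff.
def fps_to_interval_ms(fps: int) -> int:
    """QTimer polling interval in milliseconds for a target frame rate."""
    return int(1000 / max(1, fps))

# e.g. fps_to_interval_ms(10) -> 100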
def show_camera_selector(self):
"""Show the advanced CameraSelectorDialog (async scanning)"""
dialog = CameraSelectorDialog(self)
@@ -2450,17 +1379,17 @@ class MainWindow(QMainWindow):
if action.data() in selected:
action.setChecked(True)
self.update_selection_labels()
-
+
def load_model_directory(self):
"""Open file dialog to select model directory"""
last_dir = self.config.load_setting('model_dir', QDir.homePath())
model_dir = QFileDialog.getExistingDirectory(
- self,
- "Select Model Directory",
+ self,
+ "Select Model Directory",
last_dir,
QFileDialog.ShowDirsOnly | QFileDialog.DontResolveSymlinks
)
-
+
if model_dir:
if self.detector.load_yolo_model(model_dir):
self.model_label.setText(f"Model: {os.path.basename(model_dir)}")
@@ -2468,12 +1397,12 @@ class MainWindow(QMainWindow):
QMessageBox.information(self, "Success", "Model loaded successfully!")
else:
QMessageBox.critical(self, "Error", "Failed to load model from selected directory")
-
+
def init_ui(self):
"""Initialize the user interface with collapsible sidebar"""
main_widget = QWidget()
main_layout = QHBoxLayout()
-
+
# Create collapsible sidebar
self.sidebar = CollapsibleDock("Controls")
# Constrain sidebar width to prevent overexpansion from long labels/content
@@ -2485,45 +1414,45 @@ class MainWindow(QMainWindow):
except Exception:
pass
self.sidebar.visibilityChanged.connect(self._on_sidebar_visibility_changed)
-
+
# Sidebar content
sidebar_content = QWidget()
sidebar_layout = QVBoxLayout()
-
+
# Model section
model_group = QGroupBox("Model")
model_layout = QVBoxLayout()
-
+
self.model_label = QLabel("Model: Not loaded")
model_layout.addWidget(self.model_label)
-
+
load_model_btn = QPushButton("Load Model Directory...")
load_model_btn.clicked.connect(self.load_model_directory)
model_layout.addWidget(load_model_btn)
-
+
model_group.setLayout(model_layout)
sidebar_layout.addWidget(model_group)
-
+
# Camera section
camera_group = QGroupBox("Cameras")
camera_layout = QVBoxLayout()
-
+
self.cameras_label = QLabel("Selected Cameras: None")
self.cameras_label.setWordWrap(True)
self.cameras_label.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
camera_layout.addWidget(self.cameras_label)
-
+
refresh_cams_btn = QPushButton("Refresh Camera List")
refresh_cams_btn.clicked.connect(self.populate_camera_menu)
camera_layout.addWidget(refresh_cams_btn)
-
+
camera_group.setLayout(camera_layout)
sidebar_layout.addWidget(camera_group)
-
+
# Settings section
settings_group = QGroupBox("Settings")
settings_layout = QVBoxLayout()
-
+
# FPS control
fps_layout = QHBoxLayout()
fps_layout.addWidget(QLabel("FPS:"))
@@ -2532,7 +1461,7 @@ class MainWindow(QMainWindow):
self.fps_spin.setValue(10)
fps_layout.addWidget(self.fps_spin)
settings_layout.addLayout(fps_layout)
-
+
# Layout selection
layout_layout = QHBoxLayout()
layout_layout.addWidget(QLabel("Layout:"))
@@ -2541,35 +1470,35 @@ class MainWindow(QMainWindow):
self.layout_combo.currentIndexChanged.connect(self.change_camera_layout)
layout_layout.addWidget(self.layout_combo)
settings_layout.addLayout(layout_layout)
-
+
# Button enablement determined dynamically based on backend availability
-
+
# Add screenshot button to settings
screenshot_btn = QPushButton("Take Screenshot")
screenshot_btn.clicked.connect(self.take_screenshot)
settings_layout.addWidget(screenshot_btn)
-
+
settings_group.setLayout(settings_layout)
sidebar_layout.addWidget(settings_group)
-
+
# Control buttons
btn_layout = QHBoxLayout()
self.start_btn = QPushButton("Start")
self.start_btn.clicked.connect(self.start_detection)
btn_layout.addWidget(self.start_btn)
-
+
self.stop_btn = QPushButton("Stop")
self.stop_btn.clicked.connect(self.stop_detection)
self.stop_btn.setEnabled(False)
btn_layout.addWidget(self.stop_btn)
-
+
sidebar_layout.addLayout(btn_layout)
-
+
# Add stretch to push everything up
sidebar_layout.addStretch()
-
+
sidebar_content.setLayout(sidebar_layout)
-
+
# Add scroll area to sidebar
scroll = QScrollArea()
scroll.setWidget(sidebar_content)
@@ -2578,24 +1507,24 @@ class MainWindow(QMainWindow):
# Ensure scroll area doesn't request excessive width
scroll.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Expanding)
self.sidebar.setWidget(scroll)
-
+
self.addDockWidget(Qt.LeftDockWidgetArea, self.sidebar)
-
+
# Main display area
self.display_area = QWidget()
self.display_layout = QGridLayout()
self.camera_displays = []
-
+
# Initially create 4 camera displays
for i in range(4):
display = CameraDisplay()
- display.set_cam_id(i+1)
+ display.set_cam_id(i + 1)
self.camera_displays.append(display)
- self.display_layout.addWidget(display, i//2, i%2)
-
+ self.display_layout.addWidget(display, i // 2, i % 2)
+
self.display_area.setLayout(self.display_layout)
main_layout.addWidget(self.display_area)
-
+
# Hardware Monitor section
hw_monitor_group = QGroupBox("Hardware Monitor")
hw_monitor_layout = QVBoxLayout()
@@ -2611,7 +1540,7 @@ class MainWindow(QMainWindow):
# Set Styling from cpu progress QSS file
style_file = getpath.resource_path("styling/cpu_progress.qss")
try:
- with open(style_file,"r") as cpu_progress_style:
+ with open(style_file, "r") as cpu_progress_style:
self.cpu_progress.setStyleSheet(cpu_progress_style.read())
except FileNotFoundError:
pass
@@ -2647,7 +1576,7 @@ class MainWindow(QMainWindow):
core_file = getpath.resource_path("styling/core_bar.qss")
try:
- with open(core_file,"r") as core_bar_styling:
+ with open(core_file, "r") as core_bar_styling:
core_bar.setStyleSheet(core_bar_styling.read())
except FileNotFoundError:
pass
@@ -2659,26 +1588,26 @@ class MainWindow(QMainWindow):
hw_monitor_group.setLayout(hw_monitor_layout)
sidebar_layout.addWidget(hw_monitor_group)
-
+
main_widget.setLayout(main_layout)
self.setCentralWidget(main_widget)
-
+
# Start with sidebar expanded
self.sidebar.expand()
-
+
def change_camera_layout(self, index):
"""Change the camera display layout"""
# Clear the layout
for i in reversed(range(self.display_layout.count())):
self.display_layout.itemAt(i).widget().setParent(None)
-
+
num_cameras = index + 1 if index < 4 else 4
-
+
if index == 4: # Grid layout
rows = 2
cols = 2
for i in range(4):
- self.display_layout.addWidget(self.camera_displays[i], i//cols, i%cols)
+ self.display_layout.addWidget(self.camera_displays[i], i // cols, i % cols)
else:
if num_cameras == 1:
self.display_layout.addWidget(self.camera_displays[0], 0, 0, 1, 2)
@@ -2691,33 +1620,33 @@ class MainWindow(QMainWindow):
self.display_layout.addWidget(self.camera_displays[2], 1, 0, 1, 2)
elif num_cameras == 4:
for i in range(4):
- self.display_layout.addWidget(self.camera_displays[i], i//2, i%2)
-
+ self.display_layout.addWidget(self.camera_displays[i], i // 2, i % 2)
+
# Hide unused displays
for i, display in enumerate(self.camera_displays):
display.setVisible(i < num_cameras)
-
+
def init_timer(self):
"""Initialize the timer for updating camera feeds"""
self.timer = QTimer()
self.timer.timeout.connect(self.update_feeds)
-
+
def stop_detection(self):
"""Stop the detection process"""
self.timer.stop()
self.detector.disconnect_cameras()
-
+
# Update UI
self.start_btn.setEnabled(True)
self.stop_btn.setEnabled(False)
self.fps_spin.setEnabled(True)
-
+
# Clear displays
for display in self.camera_displays:
display.setText("No camera feed")
cleardisplaypath = getpath.resource_path("styling/cleardisplay.qss")
try:
- with open(cleardisplaypath,"r") as cdstyle:
+ with open(cleardisplaypath, "r") as cdstyle:
display.setStyleSheet(cdstyle.read())
except FileNotFoundError:
pass
@@ -2725,23 +1654,23 @@ class MainWindow(QMainWindow):
def update_feeds(self):
"""Update the camera feeds in the display"""
frames = self.detector.get_frames()
-
+
for i, (cam_path, frame) in enumerate(zip(self.detector.cameras, frames)):
if i >= len(self.camera_displays):
break
-
+
# Convert frame to QImage
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
bytes_per_line = ch * w
qt_image = QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format_RGB888)
-
+
# Scale while maintaining aspect ratio
pixmap = QPixmap.fromImage(qt_image)
display = self.camera_displays[i]
- display.setPixmap(pixmap.scaled(display.width(), display.height(),
- Qt.KeepAspectRatio, Qt.SmoothTransformation))
-
+ display.setPixmap(pixmap.scaled(display.width(), display.height(),
+ Qt.KeepAspectRatio, Qt.SmoothTransformation))
+
# Update camera name
cam_path = cam_path[0] if isinstance(cam_path, tuple) else cam_path
if isinstance(cam_path, str):
@@ -2755,28 +1684,28 @@ class MainWindow(QMainWindow):
else:
# For numeric indices, show Camera N
display.set_camera_name(f"Camera {cam_path}")
-
+
def take_screenshot(self):
"""Take screenshot of active camera displays"""
active_displays = [d for d in self.camera_displays if d.isVisible() and d.pixmap()]
if not active_displays:
QMessageBox.warning(self, "Warning", "No active camera displays to capture!")
return
-
+
for display in active_displays:
display.take_screenshot()
-
+
def show_menu(self):
about = AboutWindow(self) # Pass self as parent
about.exec_() # Use exec_() for modal dialog
-
+
def show_network_camera_dialog(self):
"""Show the network camera management dialog"""
dialog = NetworkCameraDialog(self)
dialog.exec_()
# Refresh camera list after dialog closes
self.populate_camera_menu()
-
+
def save_settings_to_file(self):
"""Save current settings to a JSON file"""
file_path, _ = QFileDialog.getSaveFileName(
@@ -2785,7 +1714,7 @@ class MainWindow(QMainWindow):
os.path.expanduser("~"),
"JSON Files (*.json)"
)
-
+
if file_path:
try:
settings = {
@@ -2795,13 +1724,13 @@ class MainWindow(QMainWindow):
'network_cameras': self.detector.network_cameras,
'confidence_threshold': self.detector.confidence_threshold
}
-
+
with open(file_path, 'w') as f:
json.dump(settings, f, indent=4)
QMessageBox.information(self, "Success", "Settings saved successfully!")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to save settings: {str(e)}")
-
+
def load_settings_from_file(self):
"""Load settings from a JSON file"""
file_path, _ = QFileDialog.getOpenFileName(
@@ -2810,41 +1739,41 @@ class MainWindow(QMainWindow):
os.path.expanduser("~"),
"JSON Files (*.json)"
)
-
+
if file_path:
try:
with open(file_path, 'r') as f:
settings = json.load(f)
-
+
# Apply loaded settings
if 'model_dir' in settings and os.path.exists(settings['model_dir']):
self.detector.load_yolo_model(settings['model_dir'])
self.model_label.setText(f"Model: {os.path.basename(settings['model_dir'])}")
-
+
if 'fps' in settings:
self.fps_spin.setValue(settings['fps'])
-
+
if 'layout' in settings:
self.layout_combo.setCurrentIndex(settings['layout'])
-
+
if 'network_cameras' in settings:
self.detector.network_cameras = settings['network_cameras']
self.populate_camera_menu()
-
+
if 'confidence_threshold' in settings:
self.detector.confidence_threshold = settings['confidence_threshold']
-
+
QMessageBox.information(self, "Success", "Settings loaded successfully!")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to load settings: {str(e)}")
-
+
def open_screenshots_directory(self):
"""Open the screenshots directory in the system's file explorer"""
screenshot_dir = self.config.load_setting('screenshot_dir', os.path.expanduser('~/Pictures/MuCaPy'))
-
+
if not os.path.exists(screenshot_dir):
os.makedirs(screenshot_dir, exist_ok=True)
-
+
# Open directory using the appropriate command for the OS
try:
if sys.platform.startswith('win'):
@@ -2870,7 +1799,7 @@ class MainWindow(QMainWindow):
self.sidebar.collapse()
self.toggle_sidebar_action.setText('Show Sidebar')
self.toggle_sidebar_action.setChecked(False)
-
+
def _on_sidebar_visibility_changed(self, visible):
"""Keep menu action in sync with actual sidebar visibility."""
if hasattr(self, 'toggle_sidebar_action'):
@@ -2888,7 +1817,7 @@ class MainWindow(QMainWindow):
# Update overall CPU usage
cpu_percent = psutil.cpu_percent()
self.cpu_progress.setValue(int(cpu_percent))
-
+
# Update per-core CPU usage
per_core = psutil.cpu_percent(percpu=True)
for i, usage in enumerate(per_core):
@@ -2910,7 +1839,8 @@ class MainWindow(QMainWindow):
rss_h = bytes_to_human(rss)
avail_h = bytes_to_human(available)
self.mem_progress.setFormat(f"{rss_h}")
- self.mem_progress.setToolTip(f"Python Resident Set Size: {rss_h}\nAvailable: {avail_h}\nTotal RAM: {bytes_to_human(total_ram)}")
+ self.mem_progress.setToolTip(
+ f"Python Resident Set Size: {rss_h}\nAvailable: {avail_h}\nTotal RAM: {bytes_to_human(total_ram)}")
except Exception:
pass
@@ -2925,7 +1855,7 @@ class MainWindow(QMainWindow):
if self.sepstyleing == False:
u60 = getpath.resource_path("styling/bar/u60.qss")
try:
- with open(u60,"r") as u60_style:
+ with open(u60, "r") as u60_style:
bar.setStyleSheet(u60_style.read())
except FileNotFoundError:
print("Styling for CPU U60 not found!")
@@ -2933,7 +1863,7 @@ class MainWindow(QMainWindow):
else:
u60seperate = getpath.resource_path("styling/bar/seperate/u60.qss")
try:
- with open(u60seperate,"r") as u60_seperate_styling:
+ with open(u60seperate, "r") as u60_seperate_styling:
bar.setStyleSheet(u60_seperate_styling.read())
except FileNotFoundError:
print("No Seperate Styling! Generate one!")
@@ -2944,7 +1874,7 @@ class MainWindow(QMainWindow):
if self.sepstyleing == False:
u85 = getpath.resource_path("styling/bar/a85.qss")
try:
- with open(u85,"r") as a85_styling:
+ with open(u85, "r") as a85_styling:
bar.setStyleSheet(a85_styling.read())
except FileNotFoundError:
print("Styling for CPU u85 not found")
@@ -2952,7 +1882,7 @@ class MainWindow(QMainWindow):
else:
u85sep = getpath.resource_path("styling/bar/seperate/a85.qss")
try:
- with open(u85sep,"r") as u85style_sep:
+ with open(u85sep, "r") as u85style_sep:
bar.setStyleSheet(u85style_sep.read())
except FileNotFoundError:
print("No Seperate File Found for U85")
@@ -2971,12 +1901,11 @@ class MainWindow(QMainWindow):
else:
else_file_seperate = getpath.resource_path("styling/bar/seperate/else.qss")
try:
- with open(else_file_seperate,"r") as efs:
+ with open(else_file_seperate, "r") as efs:
bar.setStyleSheet(efs.read())
except FileNotFoundError:
print("No Sepearte Styling found")
pass
-
def toggle_detection(self):
"""Toggle detection enabled/disabled"""
@@ -3033,7 +1962,8 @@ class MainWindow(QMainWindow):
tip.append("Alert is playing")
if not self.alert_enabled:
tip.append("Alert disabled")
- self.toggle_alert_action.setStatusTip("; ".join(tip) if tip else "Play an audible alert when a person is detected")
+ self.toggle_alert_action.setStatusTip(
+ "; ".join(tip) if tip else "Play an audible alert when a person is detected")
except Exception:
pass
@@ -3144,6 +2074,7 @@ class MainWindow(QMainWindow):
except Exception:
pass
+
class initQT:
"""
This is a QOL change. If you prefer to do things the hard way, remove the function calls in the
@@ -3152,10 +2083,11 @@ class initQT:
This is not needed on Windows, as it does this automatically (at least I think).
If something that is supposed to happen isn't happening, step through this class with a debugger!
"""
+
def __init__(self):
- self.session_type = None # This is for QT #
- #--------------------#
- self.env = os.environ.copy() # This is for CV2 #
+ self.session_type = None # This is for QT #
+ #--------------------#
+ self.env = os.environ.copy() # This is for CV2 #
def getenv(self):
# If the OS is Linux get Qts Session Type
@@ -3165,9 +2097,9 @@ class initQT:
else:
# If theres no Type then Exit 1
print(
- "No XDG Session Type found!"
- "echo $XDG_SESSION_TYPE"
- "Run this command in bash!"
+ "No XDG Session Type found!"
+ "echo $XDG_SESSION_TYPE"
+ "Run this command in bash!"
)
pass
@@ -3178,11 +2110,12 @@ class initQT:
else:
# If this fails then just exit with 1
print(
- "Setting the XDG_SESSION_TYPE failed!"
- f"export XDG_SESSION_TYPE={self.session_type}"
- "run this command in bash"
+ "Setting the XDG_SESSION_TYPE failed!"
+ f"export XDG_SESSION_TYPE={self.session_type}"
+ "run this command in bash"
)
pass
+
@staticmethod
def shutupCV():
# This needs some fixing as this only works before importing CV2 ; too much refactoring work tho!
@@ -3191,12 +2124,15 @@ class initQT:
else:
pass
+
"""
This is where the Windows-specific code starts. If you try to modify any of it, good luck:
the code is fragile but usually works, and it has not been verified in production.
Here we just try to set the Windows title bar to dark mode, using the window's HWND handle.
"""
+
+
def is_windows_darkmode() -> bool:
if platform.system() != "Windows":
return False
@@ -3212,6 +2148,7 @@ def is_windows_darkmode() -> bool:
print(f"Could not read Windows registry for dark mode: {e}")
return False
+
class darkmodechildren(QApplication):
def notify(self, receiver, event):
# Only handle top-level windows
@@ -3220,6 +2157,7 @@ class darkmodechildren(QApplication):
set_dark_titlebar(receiver)
return super().notify(receiver, event)
+
def set_dark_titlebar(widget: QWidget):
"""Apply dark titlebar on Windows to any top-level window."""
if platform.system() != "Windows":
@@ -3275,4 +2213,4 @@ if __name__ == "__main__":
sys.exit(app.exec_())
except Exception as e:
print(f"Exit Exception with: {e}")
- pass
\ No newline at end of file
+ pass
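The dark-titlebar handling above (is_windows_darkmode, darkmodechildren, set_dark_titlebar) ultimately comes down to a single dwmapi call on the window's HWND. Below is a minimal sketch of that call, assuming ctypes and the DWMWA_USE_IMMERSIVE_DARK_MODE attribute (id 20 on current Windows builds, 19 on some older Windows 10 builds); the helper name is hypothetical and this is not the patch's actual implementation.

# Minimal sketch, not the patch's implementation: enable a dark title bar for a
# Qt top-level window through dwmapi. Attribute ids 20/19 correspond to
# DWMWA_USE_IMMERSIVE_DARK_MODE on newer/older Windows builds.
import ctypes
import platform


def apply_dark_titlebar_sketch(widget) -> bool:
    """Hypothetical helper: returns True if the dark title bar was applied."""
    if platform.system() != "Windows":
        return False
    hwnd = int(widget.winId())   # HWND of the top-level Qt window
    enabled = ctypes.c_int(1)    # 1 = use the dark title bar
    for attribute in (20, 19):   # try the current attribute id first
        result = ctypes.windll.dwmapi.DwmSetWindowAttribute(
            hwnd, attribute, ctypes.byref(enabled), ctypes.sizeof(enabled))
        if result == 0:          # S_OK
            return True
    return False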
diff --git a/requirements.txt b/requirements.txt
index 9610c6e..c213556 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,6 @@ numpy==2.2.6
PyQt5==5.15.11
requests==2.32.3
psutil==7.0.0
-pytest==8.4.0
\ No newline at end of file
+pytest==8.4.0
+comtypes==1.4.13
+rtsp==1.1.12
\ No newline at end of file
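
For clarity on the settings round-trip added above: save_settings_to_file serialises five keys (model_dir, fps, layout, network_cameras, confidence_threshold) with json.dump, and load_settings_from_file applies whichever of them are present. The sketch below shows the shape of such a file; every concrete value is invented, and the structure of the network_cameras entries is an assumption rather than something taken from the patch.

# Illustrative only: the shape of the JSON settings file used by
# save_settings_to_file / load_settings_from_file. All values here are
# invented, and the network_cameras layout is an assumption.
import json

example_settings = {
    "model_dir": "/path/to/yolo/model_dir",           # hypothetical path
    "fps": 10,                                        # matches the spin box default
    "layout": 4,                                      # index into the layout combo box
    "network_cameras": ["rtsp://192.0.2.10/stream"],  # assumed list-of-URLs layout
    "confidence_threshold": 0.5,                      # assumed value
}

with open("mucapy_settings.json", "w") as f:          # hypothetical filename
    json.dump(example_settings, f, indent=4)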