Clean up code and refactor it into separate modules

This commit is contained in:
2025-11-01 18:18:20 +01:00
parent 7752eaaf9d
commit cffc59a285
9 changed files with 1794 additions and 1350 deletions

0
CameraDisplay.py Normal file
View File

178
mucapy/AlertWorker.py Normal file
View File

@@ -0,0 +1,178 @@
import os
import shutil
import subprocess
import sys
import time
import wave
try:
    import simpleaudio as sa
except ImportError:
    sa = None
sa = None  # Deliberately disabled: simpleaudio caused playback instability; system players are preferred
from PyQt5.QtCore import QThread, pyqtSignal
class AlertWorker(QThread):
    """Worker thread that plays an alert WAV (4 repeats) without blocking the UI.

    Backend order: winsound on Windows (simpleaudio fallback), external system
    players on Unix (afplay/paplay/aplay/ffplay) with a simpleaudio fallback.
    Supports cooperative early stop via stop().
    """
    finished = pyqtSignal(bool, str)  # success, message

    def __init__(self, wav_path: str, parent=None):
        super().__init__(parent)
        self.wav_path = wav_path   # path to the WAV file to play
        self._stop = False         # cooperative stop flag checked between plays
        self._subproc = None       # external player process (Unix path)
        self._play_obj = None      # simpleaudio PlayObject, if that backend is used

    def stop(self):
        """Request the worker to stop early (best effort, never raises)."""
        try:
            self._stop = True
            if self._play_obj is not None:
                try:
                    self._play_obj.stop()
                except Exception:
                    pass
            if self._subproc is not None:
                try:
                    self._subproc.terminate()
                except Exception:
                    pass
        except Exception:
            pass

    def _find_unix_player(self):
        """Return (cmd_list, name) for an available player on Unix or (None, None)."""
        try:
            if sys.platform.startswith('darwin'):
                if shutil.which('afplay'):
                    return (['afplay'], 'afplay')
            # Linux and others
            if shutil.which('paplay'):
                return (['paplay'], 'paplay')
            if shutil.which('aplay'):
                return (['aplay', '-q'], 'aplay')
            if shutil.which('ffplay'):
                return (['ffplay', '-nodisp', '-autoexit', '-loglevel', 'error'], 'ffplay')
        except Exception:
            pass
        return (None, None)

    def _play_simpleaudio(self, error_prefix):
        """Play the WAV 4 times via simpleaudio and emit finished.

        error_prefix identifies the backend chain in the failure message so
        the caller can tell which path was taken. Always emits exactly once.
        """
        try:
            with wave.open(self.wav_path, 'rb') as wf:
                n_channels = max(1, wf.getnchannels())
                sampwidth = max(1, wf.getsampwidth())
                framerate = max(8000, wf.getframerate() or 44100)
                frames = wf.readframes(wf.getnframes())
            for _ in range(4):
                if self._stop:
                    break
                self._play_obj = sa.play_buffer(frames, n_channels, sampwidth, framerate)
                self._play_obj.wait_done()
                time.sleep(0.002)
            self.finished.emit(True, "Alert played")
        except Exception as e:
            self.finished.emit(False, f"Playback error ({error_prefix}): {e}")

    def run(self):
        """Resolve a backend for the current platform and play the alert."""
        try:
            if not os.path.exists(self.wav_path):
                self.finished.emit(False, f"File not found: {self.wav_path}")
                return
            # Windows path: prefer winsound (native, safe)
            if sys.platform.startswith('win'):
                ws_error = "unknown"
                try:
                    import winsound as _ws  # type: ignore
                    # Resolve flags safely even if some attributes are missing
                    SND_FILENAME = getattr(_ws, 'SND_FILENAME', 0x00020000)
                    SND_SYNC = getattr(_ws, 'SND_SYNC', 0x0000)  # 0 is synchronous by default
                    flags = SND_FILENAME | SND_SYNC
                    # Ensure PlaySound exists
                    play_fn = getattr(_ws, 'PlaySound', None)
                    if play_fn is None:
                        raise RuntimeError('winsound.PlaySound not available')
                    ws_failed = False
                    for _ in range(4):
                        if self._stop:
                            break
                        try:
                            play_fn(self.wav_path, flags)
                        except Exception as e:
                            # On failure, fall through to alternative backends
                            ws_error = str(e)
                            ws_failed = True
                            break
                        time.sleep(0.002)
                    if not ws_failed:
                        # Completed all plays, or was stopped early on purpose.
                        # (Fix: a stop request no longer drops into the
                        # fallback/failure path.)
                        self.finished.emit(True, "Alert played")
                        return
                except Exception as e:
                    ws_error = str(e)
                # winsound failed at some point; try simpleaudio as fallback
                if sa is not None:
                    self._play_simpleaudio("winsound fallback -> simpleaudio")
                else:
                    self.finished.emit(False, f"Audio backend not available (winsound failed: {ws_error})")
                return
            # Non-Windows: try external players first
            cmd, _ = self._find_unix_player()
            if cmd is not None:
                for _ in range(4):
                    if self._stop:
                        break
                    try:
                        self._subproc = subprocess.Popen(cmd + [self.wav_path], stdout=subprocess.DEVNULL,
                                                         stderr=subprocess.DEVNULL)
                        # Poll until done or stop requested
                        while True:
                            if self._stop:
                                try:
                                    self._subproc.terminate()
                                except Exception:
                                    pass
                                break
                            if self._subproc.poll() is not None:
                                break
                            time.sleep(0.01)
                    except Exception:
                        # Player failed to start; fall through to simpleaudio
                        cmd = None
                        break
                    finally:
                        self._subproc = None
                    time.sleep(0.002)
                if cmd is not None:
                    self.finished.emit(True, "Alert played")
                    return
            # Fallback: simpleaudio if available
            if sa is not None:
                self._play_simpleaudio("simpleaudio")
                return
            self.finished.emit(False, "No audio backend available (afplay/paplay/aplay/ffplay/simpleaudio)")
        except Exception as e:
            try:
                self.finished.emit(False, str(e))
            except Exception:
                pass

View File

@@ -0,0 +1,27 @@
import sys
from PyQt5.QtCore import QThread, pyqtSignal
class CameraScanThread(QThread):
    """Background thread that scans for available cameras.

    Emits scan_finished(cameras, names) where *names* is a best-effort
    camera -> friendly-name mapping populated only on Windows.
    """
    scan_finished = pyqtSignal(list, dict)

    def __init__(self, detector, max_to_check=10, parent=None):
        super().__init__(parent)
        self.detector = detector          # object providing scan_for_cameras()
        self.max_to_check = max_to_check  # upper bound on device indices probed

    def run(self):
        try:
            found = self.detector.scan_for_cameras(self.max_to_check)
            friendly = {}
            if sys.platform.startswith('win'):
                # Friendly names are Windows-only and strictly best-effort.
                try:
                    friendly = self.detector.get_camera_names_windows(found)
                except Exception as name_err:
                    print(f"Failed to get Windows camera names: {name_err}")
                    friendly = {}
            self.scan_finished.emit(found, friendly)
        except Exception as scan_err:
            print(f"CameraScanThread error: {scan_err}")
            self.scan_finished.emit([], {})

524
mucapy/CameraThread.py Normal file
View File

@@ -0,0 +1,524 @@
import time
import urllib.parse
from enum import Enum
import cv2
import numpy as np
import requests
from PyQt5.QtCore import QThread, pyqtSignal, QMutex
# Optional: Try to import rtsp library for better RTSP handling
try:
import rtsp
RTSP_LIB_AVAILABLE = True
except ImportError:
RTSP_LIB_AVAILABLE = False
print("rtsp library not available. Install with: pip install rtsp")
class StreamType(Enum):
    """Enum for different stream types (see CameraThread.detect_stream_type)."""
    LOCAL = "local"            # local device index, e.g. "0"
    RTSP = "rtsp"              # rtsp:// URL
    HTTP_MJPEG = "http_mjpeg"  # plain http(s):// stream
    DROIDCAM = "droidcam"      # DroidCam app stream (port 4747)
    IP_CAMERA = "ip_camera"    # other non-http URL-like source
    NETWORK = "network"        # saved network camera reference ("net:<name>")
class CameraThread(QThread):
    """Enhanced thread class for handling various camera connections and frame grabbing.

    Supports local devices, RTSP (via the optional ``rtsp`` library or OpenCV),
    HTTP MJPEG, DroidCam and generic IP cameras, with retry/reconnect logic.
    Emits frame_ready per captured frame and connection_status on state changes.
    """
    frame_ready = pyqtSignal(int, np.ndarray)      # camera_id, BGR frame
    error_occurred = pyqtSignal(int, str)          # camera_id, error message
    connection_status = pyqtSignal(int, bool, str) # camera_id, connected, message
    def __init__(self, camera_id, camera_info, parent=None):
        super().__init__(parent)
        self.camera_id = camera_id        # integer id used in all emitted signals
        self.camera_info = camera_info    # int index, URL string, "net:<name>", or dict with url/credentials
        self.running = False
        self.cap = None                   # cv2.VideoCapture when the OpenCV path is used
        self.rtsp_client = None           # For rtsp library client
        self.mutex = QMutex()             # guards reads of self.running in the capture loop
        self.frame_interval = 1.0 / 30    # Default to 30 FPS
        self.reconnect_attempts = 5       # max tries in connect_to_camera
        self.reconnect_delay = 2          # seconds between connection attempts
        self.stream_type = None           # StreamType, set during connect
        self.read_timeout = 5.0           # seconds to wait for a first frame on HTTP streams
        self.connection_timeout = 10
        self.use_rtsp_lib = RTSP_LIB_AVAILABLE  # Use rtsp library if available
    def set_fps(self, fps):
        """Set the target FPS for frame capture (ignored if fps <= 0)."""
        if fps > 0:
            self.frame_interval = 1.0 / fps
    def detect_stream_type(self, url_or_info):
        """Detect the type of stream based on URL or camera info.

        Order matters: "net:" and rtsp:// are checked before the generic
        http/port heuristics. Unknown inputs default to NETWORK.
        """
        if isinstance(url_or_info, (int, str)):
            url_str = str(url_or_info)
            if url_str.isdigit():
                return StreamType.LOCAL
            elif url_str.startswith('rtsp://'):
                return StreamType.RTSP
            elif url_str.startswith('net:'):
                return StreamType.NETWORK
            elif ':4747' in url_str or 'droidcam' in url_str.lower():
                return StreamType.DROIDCAM
            elif url_str.startswith(('http://', 'https://')):
                return StreamType.HTTP_MJPEG
            else:
                return StreamType.IP_CAMERA
        return StreamType.NETWORK
    @staticmethod
    def validate_url(url):
        """Validate and normalize URL format; returns the URL or None if invalid."""
        try:
            url = url.strip()
            if not url:
                return None
            # Parse the URL; scheme-less inputs are assumed to be http
            if not url.startswith(('http://', 'https://', 'rtsp://', 'rtmp://')):
                url = f"http://{url}"
            parsed = urllib.parse.urlparse(url)
            if not parsed.netloc:
                return None
            # Special handling for DroidCam: its stream lives at /video
            if ':4747' in url and not url.endswith('/video'):
                base_url = f"{parsed.scheme}://{parsed.netloc}"
                return f"{base_url}/video"
            return url
        except Exception as e:
            print(f"URL validation error: {e}")
            return None
    def construct_camera_url(self, camera_info):
        """Construct proper camera URL, embedding credentials when provided.

        camera_info may be a dict with 'url'/'username'/'password' keys or a
        plain URL string. Returns None on invalid input.
        """
        try:
            if isinstance(camera_info, dict):
                url = camera_info.get('url', '')
                username = camera_info.get('username', '')
                password = camera_info.get('password', '')
            else:
                url = str(camera_info)
                username = ''
                password = ''
            url = self.validate_url(url)
            if not url:
                return None
            # Handle authentication: inject user:pass@ unless already present
            if username and password:
                parsed = urllib.parse.urlparse(url)
                if '@' not in parsed.netloc:
                    auth = f"{urllib.parse.quote(username)}:{urllib.parse.quote(password)}"
                    netloc = f"{auth}@{parsed.netloc}"
                    url = urllib.parse.urlunparse(parsed._replace(netloc=netloc))
            return url
        except Exception as e:
            print(f"Error constructing camera URL: {e}")
            return None
    def configure_capture(self, cap, stream_type):
        """Configure VideoCapture object based on stream type (best effort)."""
        try:
            # Common settings: keep the buffer small to reduce latency
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            if stream_type == StreamType.LOCAL:
                cap.set(cv2.CAP_PROP_FPS, 30)
            elif stream_type in [StreamType.RTSP, StreamType.IP_CAMERA]:
                # RTSP/IP camera optimizations
                cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
                cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000)
                cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)
            elif stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM]:
                cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000)
                cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)
        except Exception as e:
            print(f"Warning: Could not configure capture settings: {e}")
    def test_network_endpoint(self, url, timeout=3):
        """Test if a network endpoint is accessible (200 or 401 counts as reachable)."""
        try:
            response = requests.head(url, timeout=timeout, allow_redirects=True)
            return response.status_code in [200, 401]
        except requests.exceptions.RequestException:
            # Some cameras reject HEAD; retry with a streamed GET
            try:
                response = requests.get(url, timeout=timeout, stream=True)
                response.close()
                return response.status_code in [200, 401]
            except Exception:
                return False
    def connect_rtsp_with_library(self, url):
        """Connect to RTSP stream using the rtsp library; returns True on success."""
        try:
            print(f" Attempting connection with rtsp library...")
            self.rtsp_client = rtsp.Client(rtsp_server_uri=url, verbose=False)
            # Test if connection works
            if self.rtsp_client.isOpened():
                # Try to read a frame
                frame = self.rtsp_client.read()
                if frame is not None:
                    print(f" Successfully connected with rtsp library")
                    return True
                else:
                    print(f" Failed to read frame with rtsp library")
                    self.rtsp_client.close()
                    self.rtsp_client = None
            else:
                print(f" rtsp library failed to open stream")
                self.rtsp_client = None
        except Exception as e:
            print(f" rtsp library error: {e}")
            if self.rtsp_client:
                try:
                    self.rtsp_client.close()
                except Exception:
                    pass
            self.rtsp_client = None
        return False
    def connect_rtsp_with_opencv(self, url):
        """Connect to RTSP stream using OpenCV, trying TCP, then UDP, then HTTP transport."""
        import os
        transports = ['tcp', 'udp', 'http']
        for transport in transports:
            try:
                print(f" Trying RTSP with {transport.upper()} transport...")
                # Set FFMPEG options via env var (read by cv2's FFMPEG backend)
                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
                    f"rtsp_transport;{transport}|"
                    f"timeout;5000000|"
                    f"stimeout;5000000|"
                    f"buffer_size;1024000"
                )
                self.cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
                self.configure_capture(self.cap, StreamType.RTSP)
                if not self.cap.isOpened():
                    print(f" Failed to open with {transport}")
                    self.cap.release()
                    continue
                # Try to read a frame within 5 seconds
                start_time = time.time()
                while time.time() - start_time < 5:
                    ret, frame = self.cap.read()
                    if ret and frame is not None and frame.size > 0:
                        print(f" Successfully connected with {transport.upper()}")
                        return True
                    time.sleep(0.1)
                print(f" Failed to read frame with {transport}")
                self.cap.release()
            except Exception as e:
                print(f" Error with {transport}: {e}")
                if self.cap:
                    self.cap.release()
                self.cap = None
        return False
    def connect_to_camera(self):
        """Attempt to connect to the camera with enhanced retry logic.

        Resolves self.camera_info to a concrete source, detects the stream
        type, then dispatches to the matching backend. Retries up to
        self.reconnect_attempts times; emits connection_status throughout.
        Returns True when connected.
        """
        for attempt in range(self.reconnect_attempts):
            try:
                # Clean up existing connections before each attempt
                if self.cap is not None:
                    try:
                        self.cap.release()
                    except Exception:
                        pass
                    self.cap = None
                if self.rtsp_client is not None:
                    try:
                        self.rtsp_client.close()
                    except Exception:
                        pass
                    self.rtsp_client = None
                # Determine camera source
                if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
                    # "net:<name>" references a saved camera on the parent's detector
                    name = self.camera_info[4:]
                    detector = self.parent().detector if self.parent() else None
                    if not detector or name not in detector.network_cameras:
                        self.connection_status.emit(self.camera_id, False, f"Network camera {name} not found")
                        return False
                    camera_info = detector.network_cameras[name]
                    url = self.construct_camera_url(camera_info)
                    if not url:
                        self.connection_status.emit(self.camera_id, False, f"Invalid URL for {name}")
                        return False
                    self.stream_type = self.detect_stream_type(url)
                    camera_source = url
                else:
                    if isinstance(self.camera_info, dict):
                        url = self.construct_camera_url(self.camera_info)
                        if not url:
                            self.connection_status.emit(self.camera_id, False, "Invalid camera URL")
                            return False
                        camera_source = url
                        self.stream_type = self.detect_stream_type(url)
                    else:
                        camera_source = self.camera_info
                        self.stream_type = self.detect_stream_type(camera_source)
                        if self.stream_type != StreamType.LOCAL:
                            camera_source = self.validate_url(str(camera_source))
                            if not camera_source:
                                self.connection_status.emit(self.camera_id, False, "Invalid camera source")
                                return False
                print(f"Attempt {attempt + 1}/{self.reconnect_attempts}: Connecting to {self.stream_type.value} camera...")
                # Test network endpoint for HTTP streams before opening a capture
                if self.stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM, StreamType.IP_CAMERA]:
                    if not self.test_network_endpoint(camera_source):
                        print(f"Network endpoint not accessible")
                        if attempt < self.reconnect_attempts - 1:
                            time.sleep(self.reconnect_delay)
                            continue
                        self.connection_status.emit(self.camera_id, False, "Network endpoint not accessible")
                        return False
                # Connect based on stream type
                if self.stream_type == StreamType.LOCAL:
                    self.cap = cv2.VideoCapture(int(camera_source))
                    self.configure_capture(self.cap, self.stream_type)
                    if not self.cap.isOpened():
                        print("Failed to open local camera")
                        if attempt < self.reconnect_attempts - 1:
                            time.sleep(self.reconnect_delay)
                            continue
                        return False
                    # Test frame reading
                    ret, frame = self.cap.read()
                    if not ret or frame is None:
                        print("Failed to read from local camera")
                        self.cap.release()
                        if attempt < self.reconnect_attempts - 1:
                            time.sleep(self.reconnect_delay)
                            continue
                        return False
                elif self.stream_type == StreamType.RTSP:
                    # Try rtsp library first if available
                    if self.use_rtsp_lib and self.connect_rtsp_with_library(camera_source):
                        self.connection_status.emit(self.camera_id, True, "Connected (rtsp lib)")
                        return True
                    # Fall back to OpenCV with different transports
                    if self.connect_rtsp_with_opencv(camera_source):
                        self.connection_status.emit(self.camera_id, True, "Connected (opencv)")
                        return True
                    print("All RTSP connection methods failed")
                    if attempt < self.reconnect_attempts - 1:
                        time.sleep(self.reconnect_delay)
                        continue
                    return False
                else:
                    # HTTP MJPEG, DroidCam, IP Camera
                    self.cap = cv2.VideoCapture(camera_source, cv2.CAP_FFMPEG)
                    self.configure_capture(self.cap, self.stream_type)
                    if not self.cap.isOpened():
                        print("Failed to open stream")
                        if attempt < self.reconnect_attempts - 1:
                            time.sleep(self.reconnect_delay)
                            continue
                        return False
                    # Test frame reading with a timeout (streams can be slow to start)
                    start_time = time.time()
                    ret, frame = False, None
                    while time.time() - start_time < self.read_timeout:
                        ret, frame = self.cap.read()
                        if ret and frame is not None and frame.size > 0:
                            break
                        time.sleep(0.1)
                    if not ret or frame is None or frame.size == 0:
                        print("Failed to read frames")
                        self.cap.release()
                        if attempt < self.reconnect_attempts - 1:
                            time.sleep(self.reconnect_delay)
                            continue
                        return False
                print(f"Successfully connected to camera")
                self.connection_status.emit(self.camera_id, True, "Connected")
                return True
            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {str(e)}")
                if self.cap:
                    try:
                        self.cap.release()
                    except Exception:
                        pass
                    self.cap = None
                if self.rtsp_client:
                    try:
                        self.rtsp_client.close()
                    except Exception:
                        pass
                    self.rtsp_client = None
                if attempt < self.reconnect_attempts - 1:
                    time.sleep(self.reconnect_delay)
                else:
                    # Last attempt: surface the error to listeners
                    self.connection_status.emit(self.camera_id, False, str(e))
                    self.error_occurred.emit(self.camera_id, str(e))
                    return False
        return False
    def run(self):
        """Main thread loop: connect, then grab/emit frames with rate limiting
        and automatic reconnection after repeated read failures."""
        try:
            if not self.connect_to_camera():
                self.error_occurred.emit(self.camera_id, "Failed to connect after multiple attempts")
                return
            self.running = True
            last_frame_time = time.time()
            consecutive_failures = 0
            last_reconnect_time = time.time()
            while self.running:
                # Re-check the stop flag under the mutex (stop() may flip it)
                self.mutex.lock()
                should_continue = self.running
                self.mutex.unlock()
                if not should_continue:
                    break
                # Frame rate limiting
                current_time = time.time()
                if current_time - last_frame_time < self.frame_interval:
                    time.sleep(0.001)
                    continue
                # Read frame based on connection type
                try:
                    if self.rtsp_client:
                        # Using rtsp library
                        frame = self.rtsp_client.read()
                        ret = frame is not None
                        if ret:
                            # Convert PIL Image to numpy array
                            frame = np.array(frame)
                            # Convert RGB to BGR for OpenCV compatibility
                            if len(frame.shape) == 3 and frame.shape[2] == 3:
                                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    else:
                        # Using OpenCV
                        ret, frame = self.cap.read()
                    if ret and frame is not None and frame.size > 0:
                        consecutive_failures = 0
                        self.frame_ready.emit(self.camera_id, frame)
                        last_frame_time = current_time
                    else:
                        consecutive_failures += 1
                        if consecutive_failures >= 10:
                            # Only attempt a reconnect if the last one was >5s ago
                            if current_time - last_reconnect_time > 5:
                                print("Multiple failures, attempting reconnection...")
                                self.connection_status.emit(self.camera_id, False, "Reconnecting...")
                                if self.cap:
                                    self.cap.release()
                                if self.rtsp_client:
                                    self.rtsp_client.close()
                                if self.connect_to_camera():
                                    consecutive_failures = 0
                                    last_reconnect_time = current_time
                                else:
                                    self.error_occurred.emit(self.camera_id, "Reconnection failed")
                                    break
                            else:
                                consecutive_failures = 0
                        time.sleep(0.1)
                except Exception as e:
                    print(f"Error reading frame: {e}")
                    consecutive_failures += 1
                    time.sleep(0.1)
        except Exception as e:
            self.error_occurred.emit(self.camera_id, f"Thread error: {str(e)}")
        finally:
            self.cleanup()
    def stop(self):
        """Stop the thread safely; force-terminate if it does not exit within 5s."""
        self.mutex.lock()
        self.running = False
        self.mutex.unlock()
        if not self.wait(5000):
            print(f"Warning: Camera thread {self.camera_id} did not stop gracefully")
            self.terminate()
    def cleanup(self):
        """Clean up camera resources and broadcast the disconnected state."""
        print(f"Cleaning up camera {self.camera_id}")
        try:
            if self.cap:
                self.cap.release()
                self.cap = None
        except Exception as e:
            print(f"Error during cap cleanup: {e}")
        try:
            if self.rtsp_client:
                self.rtsp_client.close()
                self.rtsp_client = None
        except Exception as e:
            print(f"Error during rtsp client cleanup: {e}")
        finally:
            self.running = False
            self.connection_status.emit(self.camera_id, False, "Disconnected")

61
mucapy/Config.py Normal file
View File

@@ -0,0 +1,61 @@
import os
import json
import sys
class Config:
    """Persistent application settings stored as JSON in a per-user directory.

    On Windows the file lives under %APPDATA%\\MuCaPy, elsewhere under
    ~/.config/mucapy. Unknown keys on disk are kept; missing keys fall
    back to the defaults declared in __init__.
    """
    def __init__(self):
        # Use platform-specific user directory for config
        if sys.platform.startswith('win'):
            config_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'MuCaPy')
            pictures_dir = os.path.join(os.environ.get('USERPROFILE', os.path.expanduser('~')), 'Pictures', 'MuCaPy')
        else:
            config_dir = os.path.join(os.path.expanduser('~'), '.config', 'mucapy')
            pictures_dir = os.path.join(os.path.expanduser('~'), 'Pictures', 'MuCaPy')
        # Create config directory if it doesn't exist
        os.makedirs(config_dir, exist_ok=True)
        self.config_file = os.path.join(config_dir, 'config.json')
        # Defaults; values loaded from disk override these
        self.settings = {
            'network_cameras': {},  # Store network cameras configuration
            'last_model_dir': '',
            'last_screenshot_dir': pictures_dir,
            'last_layout': 0,
            'last_fps': 10,
            'last_selected_cameras': [],
            'window_geometry': None,
            'confidence_threshold': 0.35,
        }
        self.load_config()
    def load_config(self):
        """Load configuration from JSON file, preserving defaults for new keys."""
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r') as f:
                    loaded_settings = json.load(f)
                # Update settings while preserving default values for new keys
                self.settings.update(loaded_settings)
        except Exception as e:
            print(f"Error loading config: {e}")
    def save_config(self):
        """Save configuration to JSON file (best effort; errors are logged).

        Fix: the previous nested ``except FileNotFoundError: pass`` silently
        discarded failed saves; all errors are now logged by the outer handler.
        """
        try:
            # Ensure the file's directory exists
            os.makedirs(os.path.dirname(self.config_file), exist_ok=True)
            with open(self.config_file, 'w') as f:
                json.dump(self.settings, f, indent=4)
        except Exception as e:
            print(f"Error saving config: {e}")
    def save_setting(self, key, value):
        """Set a single setting and persist the whole configuration immediately."""
        self.settings[key] = value
        self.save_config()
    def load_setting(self, key, default=None):
        """Return a setting value, or *default* when the key is absent."""
        return self.settings.get(key, default)

246
mucapy/PopoutWindow.py Normal file
View File

@@ -0,0 +1,246 @@
from PyQt5.QtCore import Qt, QTimer, QDateTime, QRect, QEvent
from PyQt5.QtGui import (QImage, QPixmap, QColor, QKeySequence, QPainter,
QPen, QBrush)
from PyQt5.QtWidgets import (QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QScrollArea, QToolButton, QShortcut)
class PopoutWindow(QMainWindow):
    """Enhanced popout window with zoom, pan, overlays and guard-friendly controls.

    Mirrors the pixmap of *source_display* on a timer; it never owns the
    camera feed itself. Overlays (timestamp, grid) are painted onto a copy.
    """
    def __init__(self, source_display: QLabel, cam_id=None, parent=None):
        super().__init__(parent)
        self.setWindowTitle(f"Camera {cam_id}" if cam_id is not None else "Camera")
        self.source_display = source_display  # QLabel providing pixmap updates
        self.cam_id = cam_id
        self.zoom_factor = 1.0   # current zoom, clamped to [min_zoom, max_zoom]
        self.min_zoom = 0.2
        self.max_zoom = 5.0
        self.paused = False      # when True, refresh_frame() is a no-op
        self.show_grid = False
        self.show_timestamp = True
        self.setMinimumSize(640, 480)
        # Drag-to-pan state
        self.dragging = False
        self.last_mouse_pos = None
        # Central area: toolbar + scrollable image label
        central = QWidget()
        vbox = QVBoxLayout(central)
        vbox.setContentsMargins(4, 4, 4, 4)
        vbox.setSpacing(4)
        # Toolbar with guard-friendly controls
        toolbar = QHBoxLayout()
        self.btn_zoom_in = QToolButton()
        self.btn_zoom_in.setText("+")
        self.btn_zoom_out = QToolButton()
        self.btn_zoom_out.setText("-")
        self.btn_zoom_reset = QToolButton()
        self.btn_zoom_reset.setText("100%")
        self.btn_pause = QToolButton()
        self.btn_pause.setText("Pause")
        self.btn_snapshot = QToolButton()
        self.btn_snapshot.setText("Snapshot")
        self.btn_grid = QToolButton()
        self.btn_grid.setText("Grid")
        self.btn_time = QToolButton()
        self.btn_time.setText("Time")
        self.btn_full = QToolButton()
        self.btn_full.setText("Fullscreen")
        for b in [self.btn_zoom_out, self.btn_zoom_in, self.btn_zoom_reset, self.btn_pause, self.btn_snapshot,
                  self.btn_grid, self.btn_time, self.btn_full]:
            toolbar.addWidget(b)
        toolbar.addStretch(1)
        vbox.addLayout(toolbar)
        # Scroll area for panning when zoomed
        self.image_label = QLabel()
        self.image_label.setAlignment(Qt.AlignCenter)
        self.scroll = QScrollArea()
        self.scroll.setWidget(self.image_label)
        self.scroll.setWidgetResizable(True)
        vbox.addWidget(self.scroll, 1)
        self.setCentralWidget(central)
        # Shortcuts
        QShortcut(QKeySequence("+"), self, activated=self.zoom_in)
        QShortcut(QKeySequence("-"), self, activated=self.zoom_out)
        QShortcut(QKeySequence("0"), self, activated=self.reset_zoom)
        QShortcut(QKeySequence(Qt.Key_Escape), self, activated=self.close)
        QShortcut(QKeySequence("F"), self, activated=self.toggle_fullscreen)
        QShortcut(QKeySequence("Ctrl+S"), self, activated=self.take_snapshot)
        QShortcut(QKeySequence("Space"), self, activated=self.toggle_pause)
        QShortcut(QKeySequence("G"), self, activated=self.toggle_grid)
        QShortcut(QKeySequence("T"), self, activated=self.toggle_timestamp)
        # Connect buttons
        self.btn_zoom_in.clicked.connect(self.zoom_in)
        self.btn_zoom_out.clicked.connect(self.zoom_out)
        self.btn_zoom_reset.clicked.connect(self.reset_zoom)
        self.btn_pause.clicked.connect(self.toggle_pause)
        self.btn_snapshot.clicked.connect(self.take_snapshot)
        self.btn_grid.clicked.connect(self.toggle_grid)
        self.btn_time.clicked.connect(self.toggle_timestamp)
        self.btn_full.clicked.connect(self.toggle_fullscreen)
        # Timer to refresh from source display (~25 fps)
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.refresh_frame)
        self.timer.start(40)
        # Mouse wheel zoom support
        self.image_label.installEventFilter(self)
        # Initial render
        self.refresh_frame()
    def closeEvent(self, event):
        # Stop the refresh timer so no updates fire on a dying widget
        if hasattr(self, 'timer') and self.timer:
            self.timer.stop()
        return super().closeEvent(event)
    def toggle_fullscreen(self):
        """Switch between fullscreen and windowed mode, updating the button label."""
        if self.isFullScreen():
            self.showNormal()
            self.btn_full.setText("Fullscreen")
        else:
            self.showFullScreen()
            self.btn_full.setText("Windowed")
    def toggle_pause(self):
        """Freeze/resume the display (the source feed keeps running)."""
        self.paused = not self.paused
        self.btn_pause.setText("Resume" if self.paused else "Pause")
    def toggle_grid(self):
        self.show_grid = not self.show_grid
    def toggle_timestamp(self):
        self.show_timestamp = not self.show_timestamp
    def take_snapshot(self):
        # Prefer using source_display method if available; otherwise no-op
        if hasattr(self.source_display, 'take_screenshot'):
            self.source_display.take_screenshot()
            return
    def current_pixmap(self):
        """Return the source display's current pixmap (may be None)."""
        pm = self.source_display.pixmap()
        return pm
    def refresh_frame(self):
        """Copy the source pixmap, paint overlays, apply zoom, and show it."""
        if self.paused:
            return
        pm = self.current_pixmap()
        if not pm:
            return
        # Create a copy to draw overlays without touching original
        image = pm.toImage().convertToFormat(QImage.Format_ARGB32)
        painter = QPainter(image)
        painter.setRenderHint(QPainter.Antialiasing)
        # Timestamp overlay
        if self.show_timestamp:
            ts = QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss')
            text = ts
            metrics = painter.fontMetrics()
            w = metrics.width(text) + 14
            h = metrics.height() + 8
            rect = QRect(10, 10, w, h)
            painter.setPen(Qt.NoPen)
            painter.setBrush(QBrush(QColor(0, 0, 0, 160)))
            painter.drawRoundedRect(rect, 6, 6)
            painter.setPen(QPen(QColor(255, 255, 255)))
            painter.drawText(rect, Qt.AlignCenter, text)
        # Grid overlay (rule-of-thirds)
        if self.show_grid:
            painter.setPen(QPen(QColor(255, 255, 255, 120), 1))
            img_w = image.width()
            img_h = image.height()
            for i in range(1, 3):
                x = int(img_w * i / 3)
                y = int(img_h * i / 3)
                painter.drawLine(x, 0, x, img_h)
                painter.drawLine(0, y, img_w, y)
        painter.end()
        composed = QPixmap.fromImage(image)
        if self.zoom_factor != 1.0:
            target_w = int(composed.width() * self.zoom_factor)
            target_h = int(composed.height() * self.zoom_factor)
            composed = composed.scaled(target_w, target_h, Qt.KeepAspectRatio, Qt.SmoothTransformation)
        self.image_label.setPixmap(composed)
        # Update cursor based on ability to pan at this zoom/size
        self.update_cursor()
    def zoom_in(self):
        self.set_zoom(self.zoom_factor * 1.2)
    def zoom_out(self):
        self.set_zoom(self.zoom_factor / 1.2)
    def reset_zoom(self):
        self.set_zoom(1.0)
    def set_zoom(self, z):
        """Clamp *z* to the zoom range and re-render if it actually changed."""
        z = max(self.min_zoom, min(self.max_zoom, z))
        if abs(z - self.zoom_factor) > 1e-4:
            self.zoom_factor = z
            self.refresh_frame()
            self.update_cursor()
    def can_pan(self):
        # Allow panning when the pixmap is larger than the viewport (zoomed)
        if not self.image_label.pixmap():
            return False
        vp = self.scroll.viewport().size()
        pm = self.image_label.pixmap().size()
        return pm.width() > vp.width() or pm.height() > vp.height()
    def update_cursor(self):
        # Open/closed hand while pannable; plain arrow otherwise
        if self.can_pan():
            self.image_label.setCursor(Qt.OpenHandCursor if not self.dragging else Qt.ClosedHandCursor)
        else:
            self.image_label.setCursor(Qt.ArrowCursor)
    def eventFilter(self, obj, event):
        """Handle wheel-zoom and drag-to-pan gestures on the image label."""
        if obj is self.image_label:
            # Mouse wheel zoom centered on cursor
            if event.type() == QEvent.Wheel:
                delta = event.angleDelta().y()
                if delta > 0:
                    self.zoom_in()
                else:
                    self.zoom_out()
                return True
            # Start drag
            if event.type() == QEvent.MouseButtonPress and event.button() == Qt.LeftButton and self.can_pan():
                self.dragging = True
                self.last_mouse_pos = event.pos()
                self.update_cursor()
                return True
            # Dragging: scroll by the mouse delta
            if event.type() == QEvent.MouseMove and self.dragging and self.last_mouse_pos is not None:
                delta = event.pos() - self.last_mouse_pos
                hbar = self.scroll.horizontalScrollBar()
                vbar = self.scroll.verticalScrollBar()
                hbar.setValue(hbar.value() - delta.x())
                vbar.setValue(vbar.value() - delta.y())
                self.last_mouse_pos = event.pos()
                return True
            # End drag
            if event.type() == QEvent.MouseButtonRelease and event.button() == Qt.LeftButton:
                if self.dragging:
                    self.dragging = False
                    self.last_mouse_pos = None
                    self.update_cursor()
                    return True
            if event.type() == QEvent.Enter or event.type() == QEvent.Leave:
                # Update cursor when entering/leaving the label
                if event.type() == QEvent.Leave:
                    self.dragging = False
                    self.last_mouse_pos = None
                self.update_cursor()
        return super().eventFilter(obj, event)

468
mucapy/YoloClass.py Normal file
View File

@@ -0,0 +1,468 @@
import numpy as np
import cv2
import time
import platform
import os
import subprocess
from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QDateTime, QRect, QThread, pyqtSignal, QMutex, QObject, QEvent
from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter,
QPen, QBrush)
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QPushButton, QComboBox, QSpinBox,
QFileDialog, QMessageBox, QMenu, QAction, QActionGroup, QGridLayout, QGroupBox,
QDockWidget, QScrollArea, QToolButton, QDialog,
QShortcut, QListWidget, QFormLayout, QLineEdit,
QCheckBox, QTabWidget, QListWidgetItem, QSplitter,
QProgressBar, QSizePolicy)
from CameraThread import CameraThread
from Config import Config
import sys
from CameraScanThread import CameraScanThread
class MultiCamYOLODetector(QObject):
cameras_scanned = pyqtSignal(list, dict) # Emits (available_cameras, index_to_name)
    def __init__(self, parent=None):
        """Initialize detector state, load persisted settings, and reload the last model.

        Order matters: Config is created first, then settings are read from it,
        and the last-used YOLO model (if still on disk) is loaded last.
        """
        super().__init__(parent)
        self.cameras = []
        self.camera_threads = {}  # Dictionary to store camera threads
        self.net = None           # cv2 DNN network, set by load_yolo_model
        self.classes = []
        self.colors = []
        self.target_fps = 10
        self.last_frame_time = 0
        self.frame_interval = 1.0 / self.target_fps
        self.available_cameras = []
        self.model_dir = ""
        self.cuda_available = self.check_cuda()
        self.config = Config()
        self.latest_frames = {}  # Store latest frames from each camera
        self.frame_lock = QMutex()  # Mutex for thread-safe frame access
        self.scan_thread = None  # Background scanner thread
        self.camera_names = {}  # Mapping index->friendly name (best effort)
        # Load settings (overrides the defaults assigned above)
        self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35)
        self.network_cameras = self.config.load_setting('network_cameras', {})
        self.target_fps = self.config.load_setting('last_fps', 10)
        self.frame_interval = 1.0 / self.target_fps
        # Load last used model if available
        last_model = self.config.load_setting('last_model_dir')
        if last_model and os.path.exists(last_model):
            self.load_yolo_model(last_model)
def check_cuda(self):
"""Check if CUDA is available"""
try:
count = cv2.cuda.getCudaEnabledDeviceCount()
return count > 0
except:
return False
def add_network_camera(self, name, url):
"""Add a network camera to the saved list"""
self.network_cameras[name] = url
self.config.save_setting('network_cameras', self.network_cameras)
def remove_network_camera(self, name):
"""Remove a network camera from the saved list"""
if name in self.network_cameras:
del self.network_cameras[name]
self.config.save_setting('network_cameras', self.network_cameras)
def get_platform_backend(self):
"""Get appropriate video capture backend for current platform"""
try:
if sys.platform.startswith('win'):
return cv2.CAP_DSHOW
elif sys.platform.startswith('darwin'):
return cv2.CAP_AVFOUNDATION
else:
return cv2.CAP_V4L2
except Exception:
# Fallback to auto-detect if constants are missing
return cv2.CAP_ANY
    def get_camera_names_windows(self, cams):
        """Get camera names on Windows using DirectShow (COM).

        Returns a dict mapping each entry of *cams* to a friendly device name
        or None. On non-Windows platforms, or on any COM failure, every camera
        maps to None. Enumeration order is assumed to match OpenCV's device
        index order — TODO confirm; DirectShow does not guarantee this.
        """
        names = {}
        import platform
        if platform.system().lower() != "windows":
            for c in cams:
                names[c] = None
            return names
        try:
            import comtypes
            from comtypes import GUID, POINTER, HRESULT, COMMETHOD, BSTR
            from ctypes import c_ulong, byref
            from comtypes.automation import VARIANT
            # GUIDs (DirectShow system device enumerator + video input category)
            CLSID_SystemDeviceEnum = GUID("{62BE5D10-60EB-11D0-BD3B-00A0C911CE86}")
            CLSID_VideoInputDeviceCategory = GUID("{860BB310-5D01-11D0-BD3B-00A0C911CE86}")
            IID_ICreateDevEnum = GUID("{29840822-5B84-11D0-BD3B-00A0C911CE86}")
            IID_IPropertyBag = GUID("{55272A00-42CB-11CE-8135-00AA004BB851}")
            # Interfaces (minimal method tables — only what is called below)
            class IEnumMoniker(comtypes.IUnknown):
                _iid_ = GUID("{00000102-0000-0000-C000-000000000046}")
                _methods_ = [
                    COMMETHOD([], HRESULT, 'Next',
                              (['in'], c_ulong, 'celt'),
                              (['out'], POINTER(POINTER(comtypes.IUnknown)), 'rgelt'),
                              (['out'], POINTER(c_ulong), 'pceltFetched')),
                ]
            class IPropertyBag(comtypes.IUnknown):
                _iid_ = IID_IPropertyBag
                _methods_ = [
                    COMMETHOD([], HRESULT, 'Read',
                              (['in'], BSTR, 'pszPropName'),
                              (['in', 'out'], POINTER(VARIANT), 'pVar'),
                              (['in'], POINTER(comtypes.IUnknown), 'pErrorLog')),
                ]
            class ICreateDevEnum(comtypes.IUnknown):
                _iid_ = IID_ICreateDevEnum
                _methods_ = [
                    COMMETHOD([], HRESULT, "CreateClassEnumerator",
                              (['in'], POINTER(GUID), 'clsidDeviceClass'),
                              (['out'], POINTER(POINTER(IEnumMoniker)), 'ppEnumMoniker'))
                ]
            comtypes.CoInitialize()
            dev_enum = comtypes.CoCreateInstance(
                CLSID_SystemDeviceEnum,
                interface=ICreateDevEnum
            )
            enum_moniker = POINTER(IEnumMoniker)()
            hr = dev_enum.CreateClassEnumerator(
                CLSID_VideoInputDeviceCategory,  # pass GUID directly, no byref
                byref(enum_moniker)  # output pointer is byref
            )
            if hr != 0 or not enum_moniker:
                raise RuntimeError("No video devices found")
            device_names = []
            fetched = c_ulong()
            moniker = POINTER(comtypes.IUnknown)()
            # Walk the moniker enumerator and read each device's FriendlyName
            while enum_moniker.Next(1, byref(moniker), byref(fetched)) == 0:
                prop_bag = moniker.BindToStorage(None, None, IPropertyBag)
                if prop_bag:
                    name_var = VARIANT()
                    if prop_bag.Read("FriendlyName", byref(name_var), None) == 0:
                        device_names.append(str(name_var.value))
                moniker = POINTER(comtypes.IUnknown)()  # release
            # map to cams: pair enumerated names with local device entries only
            idx_only = [c for c in cams if not c.startswith("net:") and not c.startswith("/dev/")]
            for i, cam in enumerate(idx_only):
                names[cam] = device_names[i] if i < len(device_names) else None
        except Exception as e:
            print(f"get_camera_names_windows failed: {e}")
            for c in cams:
                names[c] = None
        return names
def start_camera_scan(self, max_to_check=10):
"""Start background camera scan; emits cameras_scanned when done."""
try:
if self.scan_thread and self.scan_thread.isRunning():
# Already scanning; ignore
return False
self.scan_thread = CameraScanThread(self, max_to_check)
self.scan_thread.scan_finished.connect(self._on_scan_finished)
self.scan_thread.start()
return True
except Exception as e:
print(f"Failed to start camera scan: {e}")
return False
def _on_scan_finished(self, cams, names):
# Store and forward via public signal
self.available_cameras = cams or []
self.camera_names = names or {}
self.cameras_scanned.emit(self.available_cameras, self.camera_names)
def scan_for_cameras_windows(self, max_to_check=10):
"""Enhanced camera detection for Windows with multiple backend support"""
windows_cameras = []
backends_to_try = [
(cv2.CAP_DSHOW, "DSHOW"),
(cv2.CAP_MSMF, "MSMF"),
(cv2.CAP_ANY, "ANY")
]
for backend, backend_name in backends_to_try:
print(f"Trying {backend_name} backend...")
for i in range(max_to_check):
try:
cap = cv2.VideoCapture(i, backend)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
camera_id = f"{backend_name.lower()}:{i}"
if str(i) not in windows_cameras:
windows_cameras.append(str(i))
print(f"Found camera {i} via {backend_name}")
cap.release()
else:
cap.release()
except Exception as e:
print(f"Error checking camera {i} with {backend_name}: {e}")
continue
return windows_cameras
def scan_for_cameras(self, max_to_check=10):
"""Check for available cameras with platform-specific backends"""
self.available_cameras = []
print(f"Scanning for cameras on {sys.platform}...")
# Platform-specific detection
if sys.platform.startswith('win'):
cameras_found = self.scan_for_cameras_windows(max_to_check)
self.available_cameras.extend(cameras_found)
else:
# Linux/Unix/macOS detection
backend = cv2.CAP_AVFOUNDATION if sys.platform.startswith('darwin') else cv2.CAP_V4L2
for i in range(max_to_check):
try:
cap = cv2.VideoCapture(i, backend)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
self.available_cameras.append(str(i))
cap.release()
except Exception as e:
print(f"Error checking camera {i}: {e}")
continue
# Linux device paths
if sys.platform.startswith('linux'):
v4l_paths = [f"/dev/video{i}" for i in range(max_to_check)]
for path in v4l_paths:
if os.path.exists(path):
try:
cap = cv2.VideoCapture(path, cv2.CAP_V4L2)
if cap.isOpened() and path not in self.available_cameras:
self.available_cameras.append(path)
cap.release()
except Exception as e:
print(f"Error checking device {path}: {e}")
# Add network cameras
network_count = 0
for name, url in self.network_cameras.items():
self.available_cameras.append(f"net:{name}")
network_count += 1
print(
f"Scan complete: Found {len(self.available_cameras) - network_count} local and {network_count} network cameras")
return self.available_cameras
def load_yolo_model(self, model_dir):
"""Load YOLO model from selected directory with better error handling"""
self.model_dir = model_dir
try:
# Find model files in the directory
weights = [f for f in os.listdir(model_dir) if f.endswith(('.weights', '.onnx'))]
configs = [f for f in os.listdir(model_dir) if f.endswith('.cfg')]
classes = [f for f in os.listdir(model_dir) if f.endswith('.names')]
if not weights or not configs or not classes:
return False
# Use the first found files
weights_path = os.path.join(model_dir, weights[0])
config_path = os.path.join(model_dir, configs[0])
classes_path = os.path.join(model_dir, classes[0])
self.net = cv2.dnn.readNet(weights_path, config_path)
# Set backend based on availability
if self.cuda_available:
try:
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
except:
# Fall back to CPU if CUDA fails
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
else:
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
try:
with open(classes_path, 'r') as f:
self.classes = f.read().strip().split('\n')
except FileNotFoundError:
pass
np.random.seed(42)
self.colors = np.random.randint(0, 255, size=(len(self.classes), 3), dtype='uint8')
return True
except Exception as e:
print(f"Error loading YOLO model: {e}")
return False
def connect_cameras(self, camera_paths):
"""Connect to multiple cameras using background threads for smooth UI"""
self.disconnect_cameras()
# Prepare internal state
self.cameras = [] # store identifiers/paths only
self.latest_frames = {}
# Start one CameraThread per camera
for cam_index, cam_path in enumerate(camera_paths):
try:
thread = CameraThread(cam_index, cam_path, parent=self.parent())
thread.set_fps(self.target_fps)
thread.frame_ready.connect(self._on_frame_ready)
thread.error_occurred.connect(self._on_camera_error)
self.camera_threads[cam_index] = thread
self.cameras.append(cam_path)
self.latest_frames[cam_index] = None
thread.start()
print(f"Started camera thread for {cam_path}")
except Exception as e:
print(f"Error starting camera thread for {cam_path}: {e}")
success_count = len(self.camera_threads)
print(f"Camera connection summary: {success_count}/{len(camera_paths)} camera threads started")
return success_count > 0
def disconnect_cameras(self):
"""Disconnect all cameras (stop threads)"""
# Stop and remove threads
for idx, thread in list(self.camera_threads.items()):
try:
thread.stop()
except Exception:
pass
try:
thread.deleteLater()
except Exception:
pass
self.camera_threads.clear()
self.cameras = []
# Clear cached frames
self.frame_lock.lock()
try:
self.latest_frames = {}
finally:
self.frame_lock.unlock()
def _on_frame_ready(self, cam_id, frame):
"""Cache latest frame from a camera thread (non-blocking for UI)."""
self.frame_lock.lock()
try:
# Store a copy to avoid data races if producer reuses buffers
self.latest_frames[cam_id] = frame.copy()
finally:
self.frame_lock.unlock()
def _on_camera_error(self, cam_id, message):
print(f"Camera {cam_id} error: {message}")
def get_frames(self):
"""Return latest frames without blocking the GUI thread."""
frames = []
# Snapshot current frames under lock
self.frame_lock.lock()
try:
for i, _ in enumerate(self.cameras):
frm = self.latest_frames.get(i)
if frm is None:
frames.append(np.zeros((720, 1280, 3), dtype=np.uint8))
else:
frames.append(frm.copy())
finally:
self.frame_lock.unlock()
# Optionally run detection on the copies
parent_window = self.parent()
if parent_window and self.net is not None and parent_window.detection_enabled:
processed = []
for f in frames:
try:
processed.append(self.get_detections(f))
except Exception:
processed.append(f)
return processed
return frames
def get_detections(self, frame):
"""Perform YOLO object detection on a frame with error handling"""
if self.net is None:
return frame
try:
blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
self.net.setInput(blob)
# Get output layer names compatible with different OpenCV versions
try:
layer_names = self.net.getLayerNames()
output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
except:
output_layers = self.net.getUnconnectedOutLayersNames()
outputs = self.net.forward(output_layers)
boxes = []
confidences = []
class_ids = []
for output in outputs:
for detection in output:
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
if confidence > self.confidence_threshold: # Use configurable threshold
box = detection[0:4] * np.array([frame.shape[1], frame.shape[0],
frame.shape[1], frame.shape[0]])
(centerX, centerY, width, height) = box.astype('int')
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
class_ids.append(class_id)
indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4)
person_detected = False
if len(indices) > 0:
for i in indices.flatten():
(x, y, w, h) = boxes[i]
color = [int(c) for c in self.colors[class_ids[i]]]
cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
cls_name = self.classes[class_ids[i]] if 0 <= class_ids[i] < len(self.classes) else str(
class_ids[i])
text = f"{cls_name}: {confidences[i]:.2f}"
cv2.putText(frame, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
if not person_detected and str(cls_name).lower() == 'person':
person_detected = True
# Auto-trigger alert if a person is detected on any camera and alerts are enabled
try:
if person_detected:
parent_window = self.parent()
if parent_window is not None:
# trigger_alert() has its own internal guards (enabled, cooldown, playing)
parent_window.trigger_alert()
except Exception:
pass
except Exception as e:
print(f"Detection error: {e}")
return frame

File diff suppressed because it is too large Load Diff

View File

@@ -4,3 +4,5 @@ PyQt5==5.15.11
requests==2.32.3
psutil==7.0.0
pytest==8.4.0
comtypes==1.4.13
rtsp==1.1.12