dirty shitt

This commit is contained in:
rattatwinko
2025-05-26 16:41:58 +02:00
parent c264acac29
commit b80dd3f7d7

View File

@@ -11,9 +11,10 @@ from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout
QDockWidget, QScrollArea, QToolButton, QDialog, QDockWidget, QScrollArea, QToolButton, QDialog,
QShortcut, QListWidget, QFormLayout, QLineEdit, QShortcut, QListWidget, QFormLayout, QLineEdit,
QCheckBox, QTabWidget, QListWidgetItem, QSplitter) QCheckBox, QTabWidget, QListWidgetItem, QSplitter)
from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect, QThread, pyqtSignal, QMutex
from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter, from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter,
QPen, QBrush) QPen, QBrush)
import time
class Config: class Config:
def __init__(self): def __init__(self):
@@ -61,9 +62,99 @@ class Config:
"""Load a setting from configuration""" """Load a setting from configuration"""
return self.settings.get(key, default) return self.settings.get(key, default)
class CameraThread(QThread):
"""Thread class for handling camera connections and frame grabbing"""
frame_ready = pyqtSignal(int, np.ndarray) # Signal to emit when new frame is ready (camera_index, frame)
error_occurred = pyqtSignal(int, str) # Signal to emit when error occurs (camera_index, error_message)
def __init__(self, camera_id, camera_info, parent=None):
super().__init__(parent)
self.camera_id = camera_id
self.camera_info = camera_info
self.running = False
self.cap = None
self.mutex = QMutex()
self.frame_interval = 1.0 / 30 # Default to 30 FPS
def set_fps(self, fps):
"""Set the target FPS for frame capture"""
self.frame_interval = 1.0 / fps
def run(self):
"""Main thread loop"""
try:
# Connect to camera
if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
name = self.camera_info[4:]
detector = self.parent().detector if self.parent() else None
if detector and name in detector.network_cameras:
camera_info = detector.network_cameras[name]
if isinstance(camera_info, dict):
url = camera_info['url']
if 'username' in camera_info and 'password' in camera_info:
parsed = urllib.parse.urlparse(url)
netloc = f"{camera_info['username']}:{camera_info['password']}@{parsed.netloc}"
url = parsed._replace(netloc=netloc).geturl()
else:
url = camera_info
self.cap = cv2.VideoCapture(url)
else:
# Local camera
self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info)
if not self.cap.isOpened():
self.error_occurred.emit(self.camera_id, "Failed to open camera")
return
self.running = True
last_frame_time = time.time()
while self.running:
self.mutex.lock()
if not self.running:
self.mutex.unlock()
break
# Check if enough time has passed since last frame
current_time = time.time()
if current_time - last_frame_time < self.frame_interval:
self.mutex.unlock()
time.sleep(0.001) # Small sleep to prevent CPU hogging
continue
ret, frame = self.cap.read()
self.mutex.unlock()
if ret:
self.frame_ready.emit(self.camera_id, frame)
last_frame_time = current_time
else:
self.error_occurred.emit(self.camera_id, "Failed to read frame")
break
except Exception as e:
self.error_occurred.emit(self.camera_id, str(e))
finally:
self.cleanup()
def stop(self):
"""Stop the thread safely"""
self.mutex.lock()
self.running = False
self.mutex.unlock()
self.wait()
def cleanup(self):
"""Clean up camera resources"""
if self.cap:
self.cap.release()
self.running = False
class MultiCamYOLODetector: class MultiCamYOLODetector:
def __init__(self): def __init__(self):
self.cameras = [] self.cameras = []
self.camera_threads = {} # Dictionary to store camera threads
self.net = None self.net = None
self.classes = [] self.classes = []
self.colors = [] self.colors = []
@@ -74,6 +165,8 @@ class MultiCamYOLODetector:
self.model_dir = "" self.model_dir = ""
self.cuda_available = self.check_cuda() self.cuda_available = self.check_cuda()
self.config = Config() self.config = Config()
self.latest_frames = {} # Store latest frames from each camera
self.frame_lock = QMutex() # Mutex for thread-safe frame access
# Load settings # Load settings
self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35) self.confidence_threshold = self.config.load_setting('confidence_threshold', 0.35)
@@ -190,66 +283,64 @@ class MultiCamYOLODetector:
return False return False
def connect_cameras(self, camera_paths): def connect_cameras(self, camera_paths):
"""Connect to multiple cameras including network cameras with authentication""" """Connect to multiple cameras using threads"""
self.disconnect_cameras() self.disconnect_cameras()
success = True
for cam_path in camera_paths: for i, cam_path in enumerate(camera_paths):
try: try:
if isinstance(cam_path, str): thread = CameraThread(i, cam_path)
if cam_path.startswith('net:'): thread.frame_ready.connect(self.handle_new_frame)
# Handle network camera thread.error_occurred.connect(self.handle_camera_error)
camera_name = cam_path[4:] # Remove 'net:' prefix to get camera name thread.set_fps(self.target_fps)
camera_info = self.network_cameras.get(camera_name) self.camera_threads[i] = thread
self.cameras.append((cam_path, None)) # Store camera path for reference
if isinstance(camera_info, dict): thread.start()
url = camera_info['url']
# Add authentication to URL if provided
if 'username' in camera_info and 'password' in camera_info:
# Parse URL and add authentication
parsed = urllib.parse.urlparse(url)
netloc = f"{camera_info['username']}:{camera_info['password']}@{parsed.netloc}"
url = parsed._replace(netloc=netloc).geturl()
else:
# Handle old format where camera_info was just the URL
url = camera_info
cap = cv2.VideoCapture(url)
elif cam_path.startswith('/dev/'):
# Handle device path
cap = cv2.VideoCapture(cam_path)
else:
# Handle numeric index
cap = cv2.VideoCapture(int(cam_path))
else:
cap = cv2.VideoCapture(int(cam_path))
if not cap.isOpened():
print(f"Warning: Could not open camera {cam_path}")
continue
# Try to set properties but continue if they fail
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, self.target_fps)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
except:
pass
self.cameras.append((cam_path, cap))
except Exception as e: except Exception as e:
print(f"Error opening camera {cam_path}: {e}") print(f"Error connecting to camera {cam_path}: {e}")
success = False
return len(self.cameras) > 0 return success
def disconnect_cameras(self): def disconnect_cameras(self):
"""Disconnect all cameras""" """Disconnect all cameras and stop threads"""
for _, cam in self.cameras: for thread in self.camera_threads.values():
try: thread.stop()
cam.release() self.camera_threads.clear()
except: self.cameras.clear()
pass self.latest_frames.clear()
self.cameras = []
def handle_new_frame(self, camera_index, frame):
"""Handle new frame from camera thread"""
self.frame_lock.lock()
try:
# Apply YOLO detection
processed_frame = self.get_detections(frame)
self.latest_frames[camera_index] = processed_frame
finally:
self.frame_lock.unlock()
def handle_camera_error(self, camera_index, error_message):
"""Handle camera errors"""
print(f"Camera {camera_index} error: {error_message}")
# You might want to implement more sophisticated error handling here
def get_frames(self):
"""Get the latest frames from all cameras"""
self.frame_lock.lock()
try:
frames = []
for i in range(len(self.cameras)):
frame = self.latest_frames.get(i)
if frame is None:
# Create blank frame if no frame is available
frame = np.zeros((720, 1280, 3), dtype=np.uint8)
cv2.putText(frame, "No Signal", (480, 360),
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 2)
frames.append(frame)
return frames
finally:
self.frame_lock.unlock()
def get_detections(self, frame): def get_detections(self, frame):
"""Perform YOLO object detection on a frame with error handling""" """Perform YOLO object detection on a frame with error handling"""
@@ -303,24 +394,6 @@ class MultiCamYOLODetector:
print(f"Detection error: {e}") print(f"Detection error: {e}")
return frame return frame
def get_frames(self):
"""Get frames from all cameras with error handling"""
frames = []
for i, (cam_path, cam) in enumerate(self.cameras):
try:
ret, frame = cam.read()
if not ret:
print(f"Warning: Could not read frame from camera {cam_path}")
frame = np.zeros((720, 1280, 3), dtype=np.uint8)
else:
frame = self.get_detections(frame)
frames.append(frame)
except:
frame = np.zeros((720, 1280, 3), dtype=np.uint8)
frames.append(frame)
return frames
class CameraDisplay(QLabel): class CameraDisplay(QLabel):
"""Custom QLabel for displaying camera feed with fullscreen support""" """Custom QLabel for displaying camera feed with fullscreen support"""