From 2887e2927ca26e1c44086fea99c575fdeeb2001a Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Mon, 26 May 2025 17:14:38 +0200 Subject: [PATCH] made netcams work again --- mucapy/main.py | 366 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 250 insertions(+), 116 deletions(-) diff --git a/mucapy/main.py b/mucapy/main.py index fe9da44..ed4fba0 100644 --- a/mucapy/main.py +++ b/mucapy/main.py @@ -15,6 +15,7 @@ from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect, Q from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter, QPen, QBrush) import time +import requests class Config: def __init__(self): @@ -85,39 +86,167 @@ class CameraThread(QThread): self.cap = None self.mutex = QMutex() self.frame_interval = 1.0 / 30 # Default to 30 FPS + self.reconnect_attempts = 3 # Number of reconnection attempts + self.reconnect_delay = 2 # Delay between reconnection attempts in seconds def set_fps(self, fps): """Set the target FPS for frame capture""" self.frame_interval = 1.0 / fps + def validate_url(self, url): + """Validate and normalize URL format""" + try: + # Remove any whitespace + url = url.strip() + + # Parse the URL to validate its components + parsed = urllib.parse.urlparse(url) + + # Ensure scheme is present + if not parsed.scheme: + url = f"http://{url}" + parsed = urllib.parse.urlparse(url) + + # Validate DroidCam URL + if ':4747' in url: + # Ensure the path ends with /video + base_url = f"{parsed.scheme}://{parsed.netloc}" + return f"{base_url}/video" + + return url + except Exception as e: + print(f"URL validation error: {e}") + return None + + def construct_camera_url(self, camera_info): + """Construct proper camera URL with authentication if needed""" + try: + if isinstance(camera_info, dict): + url = camera_info.get('url', '') + else: + url = str(camera_info) + + # Validate and normalize the URL + url = self.validate_url(url) + if not url: + return None + + # Handle authentication if provided + if isinstance(camera_info, dict) and 'username' in camera_info and 'password' in camera_info: + parsed = urllib.parse.urlparse(url) + if '@' not in parsed.netloc: + auth = f"{urllib.parse.quote(camera_info['username'])}:{urllib.parse.quote(camera_info['password'])}" + netloc = f"{auth}@{parsed.netloc}" + url = parsed._replace(netloc=netloc).geturl() + + return url + except Exception as e: + print(f"Error constructing camera URL: {e}") + return None + + def connect_to_camera(self): + """Attempt to connect to the camera with retry logic""" + for attempt in range(self.reconnect_attempts): + try: + # Clean up any existing connection + if self.cap is not None: + self.cap.release() + self.cap = None + + if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'): + name = self.camera_info[4:] + detector = self.parent().detector if self.parent() else None + if not detector or name not in detector.network_cameras: + self.error_occurred.emit(self.camera_id, f"Network camera {name} not found") + return False + + camera_info = detector.network_cameras[name] + url = self.construct_camera_url(camera_info) + + if not url: + self.error_occurred.emit(self.camera_id, f"Invalid camera URL for {name}") + return False + + print(f"Attempting to connect to network camera URL: {url}") + + # For DroidCam, try to verify the endpoint is accessible first + if ':4747' in url: + try: + response = requests.get(url, timeout=2) + if response.status_code != 200: + print(f"DroidCam endpoint returned status {response.status_code}") + if attempt < self.reconnect_attempts - 1: + continue + return False + except requests.exceptions.RequestException as e: + print(f"Failed to connect to DroidCam: {e}") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + # Create VideoCapture with the URL + self.cap = cv2.VideoCapture() + # Set buffer size to minimize latency + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + # Open the connection + if not self.cap.open(url): + print(f"Failed to open URL: {url}") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + else: + # Local camera + self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info) + + # Verify the connection is working + if not self.cap.isOpened(): + print("Camera not opened") + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + # Test read a frame + ret, frame = self.cap.read() + if not ret or frame is None: + print("Failed to read test frame") + self.cap.release() + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + continue + return False + + print(f"Successfully connected to camera") + return True + + except Exception as e: + print(f"Connection attempt {attempt + 1} failed: {str(e)}") + if self.cap: + self.cap.release() + self.cap = None + + if attempt < self.reconnect_attempts - 1: + time.sleep(self.reconnect_delay) + else: + self.error_occurred.emit(self.camera_id, str(e)) + return False + + return False + def run(self): """Main thread loop""" try: - # Connect to camera - if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'): - name = self.camera_info[4:] - detector = self.parent().detector if self.parent() else None - if detector and name in detector.network_cameras: - camera_info = detector.network_cameras[name] - if isinstance(camera_info, dict): - url = camera_info['url'] - if 'username' in camera_info and 'password' in camera_info: - parsed = urllib.parse.urlparse(url) - netloc = f"{camera_info['username']}:{camera_info['password']}@{parsed.netloc}" - url = parsed._replace(netloc=netloc).geturl() - else: - url = camera_info - self.cap = cv2.VideoCapture(url) - else: - # Local camera - self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info) - - if not self.cap.isOpened(): - self.error_occurred.emit(self.camera_id, "Failed to open camera") + if not self.connect_to_camera(): + self.error_occurred.emit(self.camera_id, "Failed to connect to camera after multiple attempts") return self.running = True last_frame_time = time.time() + consecutive_failures = 0 while self.running: self.mutex.lock() @@ -136,11 +265,19 @@ class CameraThread(QThread): self.mutex.unlock() if ret: + consecutive_failures = 0 # Reset failure counter on success self.frame_ready.emit(self.camera_id, frame) last_frame_time = current_time else: - self.error_occurred.emit(self.camera_id, "Failed to read frame") - break + consecutive_failures += 1 + if consecutive_failures >= 5: # Try to reconnect after 5 consecutive failures + print(f"Multiple frame read failures, attempting to reconnect...") + self.cap.release() + if not self.connect_to_camera(): + self.error_occurred.emit(self.camera_id, "Failed to reconnect to camera") + break + consecutive_failures = 0 + time.sleep(0.1) # Small delay before next attempt except Exception as e: self.error_occurred.emit(self.camera_id, str(e)) @@ -197,57 +334,49 @@ class MultiCamYOLODetector: except: return False - def add_network_camera(self, name, camera_info): + def add_network_camera(self, name, url): """Add a network camera to the saved list""" - self.network_cameras[name] = camera_info - # Save to configuration immediately - self.config.settings['network_cameras'] = self.network_cameras - self.config.save_config() + self.network_cameras[name] = url + self.config.save_setting('network_cameras', self.network_cameras) def remove_network_camera(self, name): """Remove a network camera from the saved list""" if name in self.network_cameras: del self.network_cameras[name] - # Save to configuration immediately - self.config.settings['network_cameras'] = self.network_cameras - self.config.save_config() + self.config.save_setting('network_cameras', self.network_cameras) def scan_for_cameras(self, max_to_check=10): """Check for available cameras including network cameras""" self.available_cameras = [] - # Try numeric indices first (this works on all platforms) + # Check standard video devices for i in range(max_to_check): try: - cap = cv2.VideoCapture(i) + cap = cv2.VideoCapture(i, cv2.CAP_V4L2) if cap.isOpened(): self.available_cameras.append(str(i)) cap.release() except: continue - # Platform-specific device path checks - if sys.platform.startswith('linux'): - if os.path.exists('/dev'): - for i in range(max_to_check): - device_path = f"/dev/video{i}" - if os.path.exists(device_path) and device_path not in self.available_cameras: - try: - cap = cv2.VideoCapture(device_path) - if cap.isOpened(): - self.available_cameras.append(device_path) - cap.release() - except: - continue + # Check direct device paths + v4l_paths = [f"/dev/video{i}" for i in range(max_to_check)] + v4l_paths += [f"/dev/v4l/video{i}" for i in range(max_to_check)] + + for path in v4l_paths: + if os.path.exists(path): + try: + cap = cv2.VideoCapture(path, cv2.CAP_V4L2) + if cap.isOpened(): + if path not in self.available_cameras: + self.available_cameras.append(path) + cap.release() + except: + continue # Add saved network cameras - for name, camera_info in self.network_cameras.items(): - if isinstance(camera_info, dict): - url = camera_info.get('url', '') - self.available_cameras.append(f"net:{name}") # Use name instead of URL for better identification - else: - # Handle old format where camera_info was just the URL - self.available_cameras.append(f"net:{name}") + for name, url in self.network_cameras.items(): + self.available_cameras.append(f"net:{name}") return self.available_cameras @@ -294,64 +423,76 @@ class MultiCamYOLODetector: return False def connect_cameras(self, camera_paths): - """Connect to multiple cameras using threads""" + """Connect to multiple cameras including network cameras""" self.disconnect_cameras() - success = True - for i, cam_path in enumerate(camera_paths): + for cam_path in camera_paths: try: - thread = CameraThread(i, cam_path) - thread.frame_ready.connect(self.handle_new_frame) - thread.error_occurred.connect(self.handle_camera_error) - thread.set_fps(self.target_fps) - self.camera_threads[i] = thread - self.cameras.append((cam_path, None)) # Store camera path for reference - thread.start() + if isinstance(cam_path, str): + if cam_path.startswith('net:'): + # Handle network camera + name = cam_path[4:] # Remove 'net:' prefix + if name in self.network_cameras: + url = self.network_cameras[name] + print(f"Connecting to network camera URL: {url}") # Debug print + cap = cv2.VideoCapture(url, cv2.CAP_ANY) # Use CAP_ANY for network streams + else: + print(f"Network camera {name} not found in saved cameras") + continue + elif cam_path.startswith('/dev/'): + # Handle device path + cap = cv2.VideoCapture(cam_path, cv2.CAP_V4L2) + else: + # Handle numeric index + cap = cv2.VideoCapture(int(cam_path), cv2.CAP_V4L2) + else: + cap = cv2.VideoCapture(int(cam_path), cv2.CAP_V4L2) + + if not cap.isOpened(): + print(f"Warning: Could not open camera {cam_path}") + continue + + # Try to set properties but continue if they fail + try: + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + cap.set(cv2.CAP_PROP_FPS, self.target_fps) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 2) + except: + pass + + self.cameras.append((cam_path, cap)) except Exception as e: - print(f"Error connecting to camera {cam_path}: {e}") - success = False + print(f"Error opening camera {cam_path}: {e}") - return success + return len(self.cameras) > 0 def disconnect_cameras(self): - """Disconnect all cameras and stop threads""" - for thread in self.camera_threads.values(): - thread.stop() - self.camera_threads.clear() - self.cameras.clear() - self.latest_frames.clear() - - def handle_new_frame(self, camera_index, frame): - """Handle new frame from camera thread""" - self.frame_lock.lock() - try: - # Apply YOLO detection - processed_frame = self.get_detections(frame) - self.latest_frames[camera_index] = processed_frame - finally: - self.frame_lock.unlock() - - def handle_camera_error(self, camera_index, error_message): - """Handle camera errors""" - print(f"Camera {camera_index} error: {error_message}") - # You might want to implement more sophisticated error handling here + """Disconnect all cameras""" + for _, cam in self.cameras: + try: + cam.release() + except: + pass + self.cameras = [] def get_frames(self): - """Get the latest frames from all cameras""" - self.frame_lock.lock() - try: - frames = [] - for i in range(len(self.cameras)): - frame = self.latest_frames.get(i) - if frame is None: - # Create blank frame if no frame is available + """Get frames from all cameras with error handling""" + frames = [] + for i, (cam_path, cam) in enumerate(self.cameras): + try: + ret, frame = cam.read() + if not ret: + print(f"Warning: Could not read frame from camera {cam_path}") frame = np.zeros((720, 1280, 3), dtype=np.uint8) - cv2.putText(frame, "No Signal", (480, 360), - cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 2) + else: + frame = self.get_detections(frame) frames.append(frame) - return frames - finally: - self.frame_lock.unlock() + except: + frame = np.zeros((720, 1280, 3), dtype=np.uint8) + frames.append(frame) + + return frames def get_detections(self, frame): """Perform YOLO object detection on a frame with error handling""" @@ -798,26 +939,19 @@ class NetworkCameraDialog(QDialog): QMessageBox.warning(self, "Error", "Please enter both name and URL") return + # Ensure URL has proper format for DroidCam + if ':4747' in url: + if not url.endswith('/video'): + url = url.rstrip('/') + '/video' + if not url.startswith('http://') and not url.startswith('https://'): + url = 'http://' + url + if self.detector: - camera_info = {'url': url} - - # Add authentication if enabled - if self.auth_checkbox.isChecked(): - username = self.username_edit.text().strip() - password = self.password_edit.text().strip() - if username and password: - camera_info['username'] = username - camera_info['password'] = password - - self.detector.add_network_camera(name, camera_info) + print(f"Adding network camera: {name} with URL: {url}") # Debug print + self.detector.add_network_camera(name, url) self.load_cameras() - - # Clear fields self.name_edit.clear() self.url_edit.clear() - self.username_edit.clear() - self.password_edit.clear() - self.auth_checkbox.setChecked(False) def remove_camera(self): """Remove selected network camera"""