made netcams work again

This commit is contained in:
rattatwinko
2025-05-26 17:14:38 +02:00
parent 6da4cd2b40
commit 2887e2927c

View File

@@ -15,6 +15,7 @@ from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect, Q
from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter,
QPen, QBrush)
import time
import requests
class Config:
def __init__(self):
@@ -85,39 +86,167 @@ class CameraThread(QThread):
self.cap = None
self.mutex = QMutex()
self.frame_interval = 1.0 / 30 # Default to 30 FPS
self.reconnect_attempts = 3 # Number of reconnection attempts
self.reconnect_delay = 2 # Delay between reconnection attempts in seconds
def set_fps(self, fps):
"""Set the target FPS for frame capture"""
self.frame_interval = 1.0 / fps
def validate_url(self, url):
"""Validate and normalize URL format"""
try:
# Remove any whitespace
url = url.strip()
# Parse the URL to validate its components
parsed = urllib.parse.urlparse(url)
# Ensure scheme is present
if not parsed.scheme:
url = f"http://{url}"
parsed = urllib.parse.urlparse(url)
# Validate DroidCam URL
if ':4747' in url:
# Ensure the path ends with /video
base_url = f"{parsed.scheme}://{parsed.netloc}"
return f"{base_url}/video"
return url
except Exception as e:
print(f"URL validation error: {e}")
return None
def construct_camera_url(self, camera_info):
"""Construct proper camera URL with authentication if needed"""
try:
if isinstance(camera_info, dict):
url = camera_info.get('url', '')
else:
url = str(camera_info)
# Validate and normalize the URL
url = self.validate_url(url)
if not url:
return None
# Handle authentication if provided
if isinstance(camera_info, dict) and 'username' in camera_info and 'password' in camera_info:
parsed = urllib.parse.urlparse(url)
if '@' not in parsed.netloc:
auth = f"{urllib.parse.quote(camera_info['username'])}:{urllib.parse.quote(camera_info['password'])}"
netloc = f"{auth}@{parsed.netloc}"
url = parsed._replace(netloc=netloc).geturl()
return url
except Exception as e:
print(f"Error constructing camera URL: {e}")
return None
def connect_to_camera(self):
"""Attempt to connect to the camera with retry logic"""
for attempt in range(self.reconnect_attempts):
try:
# Clean up any existing connection
if self.cap is not None:
self.cap.release()
self.cap = None
if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
name = self.camera_info[4:]
detector = self.parent().detector if self.parent() else None
if not detector or name not in detector.network_cameras:
self.error_occurred.emit(self.camera_id, f"Network camera {name} not found")
return False
camera_info = detector.network_cameras[name]
url = self.construct_camera_url(camera_info)
if not url:
self.error_occurred.emit(self.camera_id, f"Invalid camera URL for {name}")
return False
print(f"Attempting to connect to network camera URL: {url}")
# For DroidCam, try to verify the endpoint is accessible first
if ':4747' in url:
try:
response = requests.get(url, timeout=2)
if response.status_code != 200:
print(f"DroidCam endpoint returned status {response.status_code}")
if attempt < self.reconnect_attempts - 1:
continue
return False
except requests.exceptions.RequestException as e:
print(f"Failed to connect to DroidCam: {e}")
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
# Create VideoCapture with the URL
self.cap = cv2.VideoCapture()
# Set buffer size to minimize latency
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Open the connection
if not self.cap.open(url):
print(f"Failed to open URL: {url}")
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
else:
# Local camera
self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info)
# Verify the connection is working
if not self.cap.isOpened():
print("Camera not opened")
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
# Test read a frame
ret, frame = self.cap.read()
if not ret or frame is None:
print("Failed to read test frame")
self.cap.release()
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
print(f"Successfully connected to camera")
return True
except Exception as e:
print(f"Connection attempt {attempt + 1} failed: {str(e)}")
if self.cap:
self.cap.release()
self.cap = None
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
else:
self.error_occurred.emit(self.camera_id, str(e))
return False
return False
def run(self):
"""Main thread loop"""
try:
# Connect to camera
if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
name = self.camera_info[4:]
detector = self.parent().detector if self.parent() else None
if detector and name in detector.network_cameras:
camera_info = detector.network_cameras[name]
if isinstance(camera_info, dict):
url = camera_info['url']
if 'username' in camera_info and 'password' in camera_info:
parsed = urllib.parse.urlparse(url)
netloc = f"{camera_info['username']}:{camera_info['password']}@{parsed.netloc}"
url = parsed._replace(netloc=netloc).geturl()
else:
url = camera_info
self.cap = cv2.VideoCapture(url)
else:
# Local camera
self.cap = cv2.VideoCapture(int(self.camera_info) if str(self.camera_info).isdigit() else self.camera_info)
if not self.cap.isOpened():
self.error_occurred.emit(self.camera_id, "Failed to open camera")
if not self.connect_to_camera():
self.error_occurred.emit(self.camera_id, "Failed to connect to camera after multiple attempts")
return
self.running = True
last_frame_time = time.time()
consecutive_failures = 0
while self.running:
self.mutex.lock()
@@ -136,11 +265,19 @@ class CameraThread(QThread):
self.mutex.unlock()
if ret:
consecutive_failures = 0 # Reset failure counter on success
self.frame_ready.emit(self.camera_id, frame)
last_frame_time = current_time
else:
self.error_occurred.emit(self.camera_id, "Failed to read frame")
break
consecutive_failures += 1
if consecutive_failures >= 5: # Try to reconnect after 5 consecutive failures
print(f"Multiple frame read failures, attempting to reconnect...")
self.cap.release()
if not self.connect_to_camera():
self.error_occurred.emit(self.camera_id, "Failed to reconnect to camera")
break
consecutive_failures = 0
time.sleep(0.1) # Small delay before next attempt
except Exception as e:
self.error_occurred.emit(self.camera_id, str(e))
@@ -197,57 +334,49 @@ class MultiCamYOLODetector:
except:
return False
def add_network_camera(self, name, camera_info):
def add_network_camera(self, name, url):
"""Add a network camera to the saved list"""
self.network_cameras[name] = camera_info
# Save to configuration immediately
self.config.settings['network_cameras'] = self.network_cameras
self.config.save_config()
self.network_cameras[name] = url
self.config.save_setting('network_cameras', self.network_cameras)
def remove_network_camera(self, name):
"""Remove a network camera from the saved list"""
if name in self.network_cameras:
del self.network_cameras[name]
# Save to configuration immediately
self.config.settings['network_cameras'] = self.network_cameras
self.config.save_config()
self.config.save_setting('network_cameras', self.network_cameras)
def scan_for_cameras(self, max_to_check=10):
"""Check for available cameras including network cameras"""
self.available_cameras = []
# Try numeric indices first (this works on all platforms)
# Check standard video devices
for i in range(max_to_check):
try:
cap = cv2.VideoCapture(i)
cap = cv2.VideoCapture(i, cv2.CAP_V4L2)
if cap.isOpened():
self.available_cameras.append(str(i))
cap.release()
except:
continue
# Platform-specific device path checks
if sys.platform.startswith('linux'):
if os.path.exists('/dev'):
for i in range(max_to_check):
device_path = f"/dev/video{i}"
if os.path.exists(device_path) and device_path not in self.available_cameras:
try:
cap = cv2.VideoCapture(device_path)
if cap.isOpened():
self.available_cameras.append(device_path)
cap.release()
except:
continue
# Check direct device paths
v4l_paths = [f"/dev/video{i}" for i in range(max_to_check)]
v4l_paths += [f"/dev/v4l/video{i}" for i in range(max_to_check)]
for path in v4l_paths:
if os.path.exists(path):
try:
cap = cv2.VideoCapture(path, cv2.CAP_V4L2)
if cap.isOpened():
if path not in self.available_cameras:
self.available_cameras.append(path)
cap.release()
except:
continue
# Add saved network cameras
for name, camera_info in self.network_cameras.items():
if isinstance(camera_info, dict):
url = camera_info.get('url', '')
self.available_cameras.append(f"net:{name}") # Use name instead of URL for better identification
else:
# Handle old format where camera_info was just the URL
self.available_cameras.append(f"net:{name}")
for name, url in self.network_cameras.items():
self.available_cameras.append(f"net:{name}")
return self.available_cameras
@@ -294,64 +423,76 @@ class MultiCamYOLODetector:
return False
def connect_cameras(self, camera_paths):
"""Connect to multiple cameras using threads"""
"""Connect to multiple cameras including network cameras"""
self.disconnect_cameras()
success = True
for i, cam_path in enumerate(camera_paths):
for cam_path in camera_paths:
try:
thread = CameraThread(i, cam_path)
thread.frame_ready.connect(self.handle_new_frame)
thread.error_occurred.connect(self.handle_camera_error)
thread.set_fps(self.target_fps)
self.camera_threads[i] = thread
self.cameras.append((cam_path, None)) # Store camera path for reference
thread.start()
if isinstance(cam_path, str):
if cam_path.startswith('net:'):
# Handle network camera
name = cam_path[4:] # Remove 'net:' prefix
if name in self.network_cameras:
url = self.network_cameras[name]
print(f"Connecting to network camera URL: {url}") # Debug print
cap = cv2.VideoCapture(url, cv2.CAP_ANY) # Use CAP_ANY for network streams
else:
print(f"Network camera {name} not found in saved cameras")
continue
elif cam_path.startswith('/dev/'):
# Handle device path
cap = cv2.VideoCapture(cam_path, cv2.CAP_V4L2)
else:
# Handle numeric index
cap = cv2.VideoCapture(int(cam_path), cv2.CAP_V4L2)
else:
cap = cv2.VideoCapture(int(cam_path), cv2.CAP_V4L2)
if not cap.isOpened():
print(f"Warning: Could not open camera {cam_path}")
continue
# Try to set properties but continue if they fail
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, self.target_fps)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)
except:
pass
self.cameras.append((cam_path, cap))
except Exception as e:
print(f"Error connecting to camera {cam_path}: {e}")
success = False
print(f"Error opening camera {cam_path}: {e}")
return success
return len(self.cameras) > 0
def disconnect_cameras(self):
"""Disconnect all cameras and stop threads"""
for thread in self.camera_threads.values():
thread.stop()
self.camera_threads.clear()
self.cameras.clear()
self.latest_frames.clear()
def handle_new_frame(self, camera_index, frame):
"""Handle new frame from camera thread"""
self.frame_lock.lock()
try:
# Apply YOLO detection
processed_frame = self.get_detections(frame)
self.latest_frames[camera_index] = processed_frame
finally:
self.frame_lock.unlock()
def handle_camera_error(self, camera_index, error_message):
"""Handle camera errors"""
print(f"Camera {camera_index} error: {error_message}")
# You might want to implement more sophisticated error handling here
"""Disconnect all cameras"""
for _, cam in self.cameras:
try:
cam.release()
except:
pass
self.cameras = []
def get_frames(self):
"""Get the latest frames from all cameras"""
self.frame_lock.lock()
try:
frames = []
for i in range(len(self.cameras)):
frame = self.latest_frames.get(i)
if frame is None:
# Create blank frame if no frame is available
"""Get frames from all cameras with error handling"""
frames = []
for i, (cam_path, cam) in enumerate(self.cameras):
try:
ret, frame = cam.read()
if not ret:
print(f"Warning: Could not read frame from camera {cam_path}")
frame = np.zeros((720, 1280, 3), dtype=np.uint8)
cv2.putText(frame, "No Signal", (480, 360),
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 2)
else:
frame = self.get_detections(frame)
frames.append(frame)
return frames
finally:
self.frame_lock.unlock()
except:
frame = np.zeros((720, 1280, 3), dtype=np.uint8)
frames.append(frame)
return frames
def get_detections(self, frame):
"""Perform YOLO object detection on a frame with error handling"""
@@ -798,26 +939,19 @@ class NetworkCameraDialog(QDialog):
QMessageBox.warning(self, "Error", "Please enter both name and URL")
return
# Ensure URL has proper format for DroidCam
if ':4747' in url:
if not url.endswith('/video'):
url = url.rstrip('/') + '/video'
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://' + url
if self.detector:
camera_info = {'url': url}
# Add authentication if enabled
if self.auth_checkbox.isChecked():
username = self.username_edit.text().strip()
password = self.password_edit.text().strip()
if username and password:
camera_info['username'] = username
camera_info['password'] = password
self.detector.add_network_camera(name, camera_info)
print(f"Adding network camera: {name} with URL: {url}") # Debug print
self.detector.add_network_camera(name, url)
self.load_cameras()
# Clear fields
self.name_edit.clear()
self.url_edit.clear()
self.username_edit.clear()
self.password_edit.clear()
self.auth_checkbox.setChecked(False)
def remove_camera(self):
"""Remove selected network camera"""