safer camerathread

This commit is contained in:
2025-11-01 18:23:32 +01:00
parent cffc59a285
commit 1300c41172

View File

@@ -1,19 +1,25 @@
import time import time
import urllib.parse import urllib.parse
from enum import Enum from enum import Enum
import logging
import traceback
from typing import Optional, Dict, Any
import cv2 import cv2
import numpy as np import numpy as np
import requests import requests
from PyQt5.QtCore import QThread, pyqtSignal, QMutex from PyQt5.QtCore import QThread, pyqtSignal, QMutex, QWaitCondition
# Optional: Try to import rtsp library for better RTSP handling
try: try:
import rtsp import rtsp
RTSP_LIB_AVAILABLE = True RTSP_LIB_AVAILABLE = True
except ImportError: except ImportError:
RTSP_LIB_AVAILABLE = False RTSP_LIB_AVAILABLE = False
print("rtsp library not available. Install with: pip install rtsp") logging.info("rtsp library not available. Install with: pip install rtsp")
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StreamType(Enum): class StreamType(Enum):
@@ -27,58 +33,123 @@ class StreamType(Enum):
class CameraThread(QThread): class CameraThread(QThread):
"""Enhanced thread class for handling various camera connections and frame grabbing""" # Signals
frame_ready = pyqtSignal(int, np.ndarray) frame_ready = pyqtSignal(int, np.ndarray)
error_occurred = pyqtSignal(int, str) error_occurred = pyqtSignal(int, str)
connection_status = pyqtSignal(int, bool, str) # camera_id, connected, message connection_status = pyqtSignal(int, bool, str) # camera_id, connected, message
stats_updated = pyqtSignal(int, dict) # camera_id, stats
def __init__(self, camera_id, camera_info, parent=None): def __init__(self, camera_id, camera_info, parent=None):
super().__init__(parent) super().__init__(parent)
self.camera_id = camera_id self.camera_id = camera_id
self.camera_info = camera_info self.camera_info = camera_info
self.running = False self.running = False
self.paused = False
self.cap = None self.cap = None
self.rtsp_client = None # For rtsp library client self.rtsp_client = None
self.mutex = QMutex() self.mutex = QMutex()
self.condition = QWaitCondition()
# Configuration with safe defaults
self.frame_interval = 1.0 / 30 # Default to 30 FPS self.frame_interval = 1.0 / 30 # Default to 30 FPS
self.reconnect_attempts = 5 self.max_reconnect_attempts = 10
self.reconnect_delay = 2 self.reconnect_delay = 2
self.stream_type = None self.reconnect_backoff = 1.5 # Exponential backoff factor
self.read_timeout = 5.0 self.read_timeout = 5.0
self.connection_timeout = 10 self.connection_timeout = 10
self.use_rtsp_lib = RTSP_LIB_AVAILABLE # Use rtsp library if available self.max_consecutive_failures = 15
self.health_check_interval = 5.0
# State tracking
self.stream_type = None
self.use_rtsp_lib = RTSP_LIB_AVAILABLE
self.last_successful_frame = 0
self.consecutive_failures = 0
self.total_failures = 0
self.total_frames = 0
self.last_health_check = 0
self.connection_attempts = 0
# Statistics
self.stats = {
'fps': 0,
'total_frames': 0,
'total_failures': 0,
'connection_attempts': 0,
'uptime': 0,
'start_time': 0,
'last_frame_time': 0
}
def set_fps(self, fps): def set_fps(self, fps):
"""Set the target FPS for frame capture""" """Set the target FPS for frame capture"""
if fps > 0: try:
self.frame_interval = 1.0 / fps if fps > 0 and fps <= 120: # Reasonable bounds
self.frame_interval = 1.0 / fps
logger.info(f"Camera {self.camera_id}: FPS set to {fps}")
else:
logger.warning(f"Camera {self.camera_id}: Invalid FPS value {fps}")
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error setting FPS: {e}")
def safe_emit(self, signal, *args):
try:
if self.isRunning():
signal.emit(*args)
except Exception as e:
logger.error(f"Camera {self.camera_id}: Signal emit failed: {e}")
def update_stats(self):
try:
current_time = time.time()
if self.stats['last_frame_time'] > 0:
time_diff = current_time - self.stats['last_frame_time']
if time_diff < 5: # Only update FPS if we have recent frames
self.stats['fps'] = 1.0 / time_diff if time_diff > 0 else 0
self.stats['total_frames'] = self.total_frames
self.stats['total_failures'] = self.total_failures
self.stats['connection_attempts'] = self.connection_attempts
self.stats['uptime'] = current_time - self.stats['start_time'] if self.stats['start_time'] > 0 else 0
self.safe_emit(self.stats_updated, self.camera_id, self.stats.copy())
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Stats update error: {e}")
def detect_stream_type(self, url_or_info): def detect_stream_type(self, url_or_info):
"""Detect the type of stream based on URL or camera info""" try:
if isinstance(url_or_info, (int, str)): if isinstance(url_or_info, (int, str)):
url_str = str(url_or_info) url_str = str(url_or_info).strip().lower()
if url_str.isdigit(): if url_str.isdigit():
return StreamType.LOCAL return StreamType.LOCAL
elif url_str.startswith('rtsp://'): elif url_str.startswith('rtsp://'):
return StreamType.RTSP return StreamType.RTSP
elif url_str.startswith('net:'): elif url_str.startswith('net:'):
return StreamType.NETWORK return StreamType.NETWORK
elif ':4747' in url_str or 'droidcam' in url_str.lower(): elif ':4747' in url_str or 'droidcam' in url_str:
return StreamType.DROIDCAM return StreamType.DROIDCAM
elif url_str.startswith(('http://', 'https://')): elif url_str.startswith(('http://', 'https://')):
return StreamType.HTTP_MJPEG return StreamType.HTTP_MJPEG
else: else:
return StreamType.IP_CAMERA # Try to parse as IP camera
if any(x in url_str for x in ['.', ':']):
return StreamType.IP_CAMERA
return StreamType.LOCAL # Fallback
return StreamType.NETWORK return StreamType.NETWORK
except Exception as e:
logger.error(f"Camera {self.camera_id}: Stream type detection failed: {e}")
return StreamType.IP_CAMERA # Safe fallback
@staticmethod @staticmethod
def validate_url(url): def validate_url(url):
"""Validate and normalize URL format""" """Safely validate and normalize URL format"""
try: try:
url = url.strip() if not url or not isinstance(url, str):
return None
url = url.strip()
if not url: if not url:
return None return None
@@ -99,11 +170,11 @@ class CameraThread(QThread):
return url return url
except Exception as e: except Exception as e:
print(f"URL validation error: {e}") logger.error(f"URL validation error: {e}")
return None return None
def construct_camera_url(self, camera_info): def construct_camera_url(self, camera_info):
"""Construct proper camera URL with authentication if needed""" """Safely construct proper camera URL with authentication if needed"""
try: try:
if isinstance(camera_info, dict): if isinstance(camera_info, dict):
url = camera_info.get('url', '') url = camera_info.get('url', '')
@@ -129,11 +200,35 @@ class CameraThread(QThread):
return url return url
except Exception as e: except Exception as e:
print(f"Error constructing camera URL: {e}") logger.error(f"Camera {self.camera_id}: Error constructing camera URL: {e}")
return None return None
def safe_capture_release(self):
"""Safely release OpenCV capture"""
try:
if self.cap is not None:
self.cap.release()
self.cap = None
logger.debug(f"Camera {self.camera_id}: Capture released")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Error releasing capture: {e}")
finally:
self.cap = None
def safe_rtsp_close(self):
"""Safely close RTSP client"""
try:
if self.rtsp_client is not None:
self.rtsp_client.close()
self.rtsp_client = None
logger.debug(f"Camera {self.camera_id}: RTSP client closed")
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Error closing RTSP client: {e}")
finally:
self.rtsp_client = None
def configure_capture(self, cap, stream_type): def configure_capture(self, cap, stream_type):
"""Configure VideoCapture object based on stream type""" """Safely configure VideoCapture object based on stream type"""
try: try:
# Common settings # Common settings
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
@@ -151,63 +246,69 @@ class CameraThread(QThread):
cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000)
cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000) cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)
logger.debug(f"Camera {self.camera_id}: Capture configured for {stream_type.value}")
except Exception as e: except Exception as e:
print(f"Warning: Could not configure capture settings: {e}") logger.warning(f"Camera {self.camera_id}: Could not configure capture settings: {e}")
def test_network_endpoint(self, url, timeout=3): def test_network_endpoint(self, url, timeout=3):
"""Test if a network endpoint is accessible""" """Safely test if a network endpoint is accessible"""
try: try:
response = requests.head(url, timeout=timeout, allow_redirects=True) response = requests.head(url, timeout=timeout, allow_redirects=True)
return response.status_code in [200, 401] accessible = response.status_code in [200, 401, 403] # 401/403 means it's there but needs auth
logger.debug(f"Camera {self.camera_id}: Network test for {url}: {accessible}")
return accessible
except requests.exceptions.RequestException: except requests.exceptions.RequestException:
try: try:
response = requests.get(url, timeout=timeout, stream=True) response = requests.get(url, timeout=timeout, stream=True)
response.close() response.close()
return response.status_code in [200, 401] accessible = response.status_code in [200, 401, 403]
except Exception: logger.debug(f"Camera {self.camera_id}: Network test (GET) for {url}: {accessible}")
return accessible
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Network test failed for {url}: {e}")
return False return False
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Network test error for {url}: {e}")
return False
def connect_rtsp_with_library(self, url): def connect_rtsp_with_library(self, url):
"""Connect to RTSP stream using the rtsp library""" """Safely connect to RTSP stream using the rtsp library"""
if not self.use_rtsp_lib:
return False
try: try:
print(f" Attempting connection with rtsp library...") logger.info(f"Camera {self.camera_id}: Attempting RTSP library connection...")
self.rtsp_client = rtsp.Client(rtsp_server_uri=url, verbose=False) self.rtsp_client = rtsp.Client(rtsp_server_uri=url, verbose=False)
# Test if connection works # Test if connection works
if self.rtsp_client.isOpened(): if self.rtsp_client.isOpened():
# Try to read a frame # Try to read a frame with timeout
frame = self.rtsp_client.read() start_time = time.time()
if frame is not None: while time.time() - start_time < self.read_timeout:
print(f" Successfully connected with rtsp library") frame = self.rtsp_client.read()
return True if frame is not None:
else: logger.info(f"Camera {self.camera_id}: Successfully connected with rtsp library")
print(f" Failed to read frame with rtsp library") return True
self.rtsp_client.close() time.sleep(0.1)
self.rtsp_client = None
else: logger.warning(f"Camera {self.camera_id}: Failed to connect with rtsp library")
print(f" rtsp library failed to open stream") self.safe_rtsp_close()
self.rtsp_client = None return False
except Exception as e: except Exception as e:
print(f" rtsp library error: {e}") logger.warning(f"Camera {self.camera_id}: RTSP library error: {e}")
if self.rtsp_client: self.safe_rtsp_close()
try: return False
self.rtsp_client.close()
except Exception:
pass
self.rtsp_client = None
return False
def connect_rtsp_with_opencv(self, url): def connect_rtsp_with_opencv(self, url):
"""Connect to RTSP stream using OpenCV with different transport protocols""" """Safely connect to RTSP stream using OpenCV with different transport protocols"""
import os import os
transports = ['tcp', 'udp', 'http'] transports = ['tcp', 'udp', 'http']
for transport in transports: for transport in transports:
try: try:
print(f" Trying RTSP with {transport.upper()} transport...") logger.info(f"Camera {self.camera_id}: Trying RTSP with {transport.upper()} transport...")
# Set FFMPEG options # Set FFMPEG options
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = ( os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
@@ -221,64 +322,55 @@ class CameraThread(QThread):
self.configure_capture(self.cap, StreamType.RTSP) self.configure_capture(self.cap, StreamType.RTSP)
if not self.cap.isOpened(): if not self.cap.isOpened():
print(f" Failed to open with {transport}") logger.debug(f"Camera {self.camera_id}: Failed to open with {transport}")
self.cap.release() self.safe_capture_release()
continue continue
# Try to read a frame # Try to read a frame with timeout
start_time = time.time() start_time = time.time()
while time.time() - start_time < 5: while time.time() - start_time < self.read_timeout:
ret, frame = self.cap.read() ret, frame = self.cap.read()
if ret and frame is not None and frame.size > 0: if ret and frame is not None and frame.size > 0:
print(f" Successfully connected with {transport.upper()}") logger.info(f"Camera {self.camera_id}: Successfully connected with {transport.upper()}")
return True return True
time.sleep(0.1) time.sleep(0.1)
print(f" Failed to read frame with {transport}") logger.debug(f"Camera {self.camera_id}: Failed to read frame with {transport}")
self.cap.release() self.safe_capture_release()
except Exception as e: except Exception as e:
print(f" Error with {transport}: {e}") logger.debug(f"Camera {self.camera_id}: Error with {transport}: {e}")
if self.cap: self.safe_capture_release()
self.cap.release()
self.cap = None
return False return False
def connect_to_camera(self): def connect_to_camera(self):
"""Attempt to connect to the camera with enhanced retry logic""" """Safely attempt to connect to the camera with enhanced retry logic"""
for attempt in range(self.reconnect_attempts): self.connection_attempts += 1
for attempt in range(self.max_reconnect_attempts):
try: try:
# Clean up existing connections # Clean up existing connections
if self.cap is not None: self.safe_capture_release()
try: self.safe_rtsp_close()
self.cap.release()
except Exception:
pass
self.cap = None
if self.rtsp_client is not None:
try:
self.rtsp_client.close()
except Exception:
pass
self.rtsp_client = None
# Determine camera source # Determine camera source
if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'): if isinstance(self.camera_info, str) and self.camera_info.startswith('net:'):
name = self.camera_info[4:] name = self.camera_info[4:]
detector = self.parent().detector if self.parent() else None detector = self.parent().detector if self.parent() else None
if not detector or name not in detector.network_cameras: if not detector or name not in getattr(detector, 'network_cameras', {}):
self.connection_status.emit(self.camera_id, False, f"Network camera {name} not found") self.safe_emit(self.connection_status, self.camera_id, False, f"Network camera {name} not found")
return False time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
continue
camera_info = detector.network_cameras[name] camera_info = detector.network_cameras[name]
url = self.construct_camera_url(camera_info) url = self.construct_camera_url(camera_info)
if not url: if not url:
self.connection_status.emit(self.camera_id, False, f"Invalid URL for {name}") self.safe_emit(self.connection_status, self.camera_id, False, f"Invalid URL for {name}")
return False time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
continue
self.stream_type = self.detect_stream_type(url) self.stream_type = self.detect_stream_type(url)
camera_source = url camera_source = url
@@ -287,8 +379,9 @@ class CameraThread(QThread):
if isinstance(self.camera_info, dict): if isinstance(self.camera_info, dict):
url = self.construct_camera_url(self.camera_info) url = self.construct_camera_url(self.camera_info)
if not url: if not url:
self.connection_status.emit(self.camera_id, False, "Invalid camera URL") self.safe_emit(self.connection_status, self.camera_id, False, "Invalid camera URL")
return False time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
continue
camera_source = url camera_source = url
self.stream_type = self.detect_stream_type(url) self.stream_type = self.detect_stream_type(url)
else: else:
@@ -298,227 +391,252 @@ class CameraThread(QThread):
if self.stream_type != StreamType.LOCAL: if self.stream_type != StreamType.LOCAL:
camera_source = self.validate_url(str(camera_source)) camera_source = self.validate_url(str(camera_source))
if not camera_source: if not camera_source:
self.connection_status.emit(self.camera_id, False, "Invalid camera source") self.safe_emit(self.connection_status, self.camera_id, False, "Invalid camera source")
return False time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
continue
print(f"Attempt {attempt + 1}/{self.reconnect_attempts}: Connecting to {self.stream_type.value} camera...") logger.info(f"Camera {self.camera_id}: Attempt {attempt + 1}/{self.max_reconnect_attempts} connecting to {self.stream_type.value}...")
# Test network endpoint for HTTP streams # Test network endpoint for HTTP streams
if self.stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM, StreamType.IP_CAMERA]: if self.stream_type in [StreamType.HTTP_MJPEG, StreamType.DROIDCAM, StreamType.IP_CAMERA]:
if not self.test_network_endpoint(camera_source): if not self.test_network_endpoint(camera_source):
print(f"Network endpoint not accessible") logger.warning(f"Camera {self.camera_id}: Network endpoint not accessible")
if attempt < self.reconnect_attempts - 1: time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
time.sleep(self.reconnect_delay) continue
continue
self.connection_status.emit(self.camera_id, False, "Network endpoint not accessible")
return False
# Connect based on stream type # Connect based on stream type
success = False
if self.stream_type == StreamType.LOCAL: if self.stream_type == StreamType.LOCAL:
self.cap = cv2.VideoCapture(int(camera_source)) try:
self.configure_capture(self.cap, self.stream_type) self.cap = cv2.VideoCapture(int(camera_source))
self.configure_capture(self.cap, self.stream_type)
if not self.cap.isOpened(): if self.cap.isOpened():
print("Failed to open local camera") # Test frame reading
if attempt < self.reconnect_attempts - 1: ret, frame = self.cap.read()
time.sleep(self.reconnect_delay) if ret and frame is not None:
continue success = True
return False except Exception as e:
logger.warning(f"Camera {self.camera_id}: Local camera error: {e}")
# Test frame reading
ret, frame = self.cap.read()
if not ret or frame is None:
print("Failed to read from local camera")
self.cap.release()
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
elif self.stream_type == StreamType.RTSP: elif self.stream_type == StreamType.RTSP:
# Try rtsp library first if available # Try rtsp library first if available
if self.use_rtsp_lib and self.connect_rtsp_with_library(camera_source): if self.use_rtsp_lib and self.connect_rtsp_with_library(camera_source):
self.connection_status.emit(self.camera_id, True, "Connected (rtsp lib)") success = True
return True elif self.connect_rtsp_with_opencv(camera_source):
success = True
# Fall back to OpenCV with different transports
if self.connect_rtsp_with_opencv(camera_source):
self.connection_status.emit(self.camera_id, True, "Connected (opencv)")
return True
print("All RTSP connection methods failed")
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
continue
return False
else: else:
# HTTP MJPEG, DroidCam, IP Camera # HTTP MJPEG, DroidCam, IP Camera
self.cap = cv2.VideoCapture(camera_source, cv2.CAP_FFMPEG) try:
self.configure_capture(self.cap, self.stream_type) self.cap = cv2.VideoCapture(camera_source, cv2.CAP_FFMPEG)
self.configure_capture(self.cap, self.stream_type)
if not self.cap.isOpened(): if self.cap.isOpened():
print("Failed to open stream") # Test frame reading with timeout
if attempt < self.reconnect_attempts - 1: start_time = time.time()
time.sleep(self.reconnect_delay) ret, frame = False, None
continue while time.time() - start_time < self.read_timeout:
return False ret, frame = self.cap.read()
if ret and frame is not None and frame.size > 0:
success = True
break
time.sleep(0.1)
except Exception as e:
logger.warning(f"Camera {self.camera_id}: Network camera error: {e}")
# Test frame reading if success:
start_time = time.time() logger.info(f"Camera {self.camera_id}: Successfully connected")
ret, frame = False, None self.safe_emit(self.connection_status, self.camera_id, True, "Connected")
while time.time() - start_time < self.read_timeout: self.consecutive_failures = 0
ret, frame = self.cap.read() return True
if ret and frame is not None and frame.size > 0: else:
break logger.warning(f"Camera {self.camera_id}: Connection attempt {attempt + 1} failed")
time.sleep(0.1) self.safe_capture_release()
self.safe_rtsp_close()
if not ret or frame is None or frame.size == 0: if attempt < self.max_reconnect_attempts - 1:
print("Failed to read frames") delay = self.reconnect_delay * (self.reconnect_backoff ** attempt)
self.cap.release() logger.info(f"Camera {self.camera_id}: Retrying in {delay:.1f}s...")
if attempt < self.reconnect_attempts - 1: time.sleep(delay)
time.sleep(self.reconnect_delay)
continue
return False
print(f"Successfully connected to camera")
self.connection_status.emit(self.camera_id, True, "Connected")
return True
except Exception as e: except Exception as e:
print(f"Connection attempt {attempt + 1} failed: {str(e)}") logger.error(f"Camera {self.camera_id}: Connection attempt {attempt + 1} error: {e}")
self.safe_capture_release()
self.safe_rtsp_close()
if self.cap: if attempt < self.max_reconnect_attempts - 1:
try: time.sleep(self.reconnect_delay * (self.reconnect_backoff ** attempt))
self.cap.release()
except Exception:
pass
self.cap = None
if self.rtsp_client:
try:
self.rtsp_client.close()
except Exception:
pass
self.rtsp_client = None
if attempt < self.reconnect_attempts - 1:
time.sleep(self.reconnect_delay)
else:
self.connection_status.emit(self.camera_id, False, str(e))
self.error_occurred.emit(self.camera_id, str(e))
return False
logger.error(f"Camera {self.camera_id}: All connection attempts failed")
self.safe_emit(self.connection_status, self.camera_id, False, "Connection failed")
self.safe_emit(self.error_occurred, self.camera_id, "Failed to connect after multiple attempts")
return False return False
def run(self): def run(self):
"""Main thread loop with enhanced error handling""" self.stats['start_time'] = time.time()
try: try:
logger.info(f"Camera {self.camera_id}: Thread starting")
if not self.connect_to_camera(): if not self.connect_to_camera():
self.error_occurred.emit(self.camera_id, "Failed to connect after multiple attempts") logger.error(f"Camera {self.camera_id}: Initial connection failed")
return return
self.running = True self.running = True
last_frame_time = time.time() last_frame_time = 0
consecutive_failures = 0 self.last_health_check = time.time()
last_reconnect_time = time.time()
while self.running: while self.running:
self.mutex.lock()
should_continue = self.running
self.mutex.unlock()
if not should_continue:
break
# Frame rate limiting
current_time = time.time()
if current_time - last_frame_time < self.frame_interval:
time.sleep(0.001)
continue
# Read frame based on connection type
try: try:
if self.rtsp_client: # Check if paused
# Using rtsp library if self.paused:
frame = self.rtsp_client.read() time.sleep(0.1)
ret = frame is not None continue
if ret:
# Convert PIL Image to numpy array # Frame rate limiting
frame = np.array(frame) current_time = time.time()
# Convert RGB to BGR for OpenCV compatibility if current_time - last_frame_time < self.frame_interval:
if len(frame.shape) == 3 and frame.shape[2] == 3: time.sleep(0.001)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) continue
else:
# Using OpenCV # Health check
ret, frame = self.cap.read() if current_time - self.last_health_check > self.health_check_interval:
if self.consecutive_failures > self.max_consecutive_failures / 2:
logger.warning(f"Camera {self.camera_id}: Health check failed, reconnecting...")
if not self.connect_to_camera():
break
self.last_health_check = current_time
# Read frame based on connection type
frame = None
ret = False
try:
if self.rtsp_client and self.rtsp_client.isOpened():
frame = self.rtsp_client.read()
ret = frame is not None
if ret:
# Convert PIL Image to numpy array if needed
if hasattr(frame, 'size'): # Likely PIL Image
frame = np.array(frame)
if len(frame.shape) == 3 and frame.shape[2] == 3:
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
elif self.cap and self.cap.isOpened():
ret, frame = self.cap.read()
else:
ret = False
except Exception as e:
logger.debug(f"Camera {self.camera_id}: Frame read error: {e}")
ret = False
if ret and frame is not None and frame.size > 0: if ret and frame is not None and frame.size > 0:
consecutive_failures = 0 # Validate frame
self.frame_ready.emit(self.camera_id, frame) if (isinstance(frame, np.ndarray) and
last_frame_time = current_time len(frame.shape) in [2, 3] and
frame.shape[0] > 0 and frame.shape[1] > 0):
self.consecutive_failures = 0
self.total_frames += 1
self.stats['last_frame_time'] = current_time
last_frame_time = current_time
self.safe_emit(self.frame_ready, self.camera_id, frame)
self.update_stats()
else:
self.handle_frame_failure()
else: else:
consecutive_failures += 1 self.handle_frame_failure()
if consecutive_failures >= 10: # Brief sleep to prevent CPU overload
if current_time - last_reconnect_time > 5: time.sleep(0.001)
print("Multiple failures, attempting reconnection...")
self.connection_status.emit(self.camera_id, False, "Reconnecting...")
if self.cap:
self.cap.release()
if self.rtsp_client:
self.rtsp_client.close()
if self.connect_to_camera():
consecutive_failures = 0
last_reconnect_time = current_time
else:
self.error_occurred.emit(self.camera_id, "Reconnection failed")
break
else:
consecutive_failures = 0
time.sleep(0.1)
except Exception as e: except Exception as e:
print(f"Error reading frame: {e}") logger.error(f"Camera {self.camera_id}: Main loop error: {e}")
consecutive_failures += 1 self.handle_frame_failure()
time.sleep(0.1) time.sleep(0.1) # Longer sleep on error
except Exception as e: except Exception as e:
self.error_occurred.emit(self.camera_id, f"Thread error: {str(e)}") logger.critical(f"Camera {self.camera_id}: Critical thread error: {e}")
self.safe_emit(self.error_occurred, self.camera_id, f"Thread crash: {str(e)}")
finally: finally:
logger.info(f"Camera {self.camera_id}: Thread stopping")
self.cleanup() self.cleanup()
def handle_frame_failure(self):
"""Handle frame reading failures with reconnection logic"""
self.consecutive_failures += 1
self.total_failures += 1
if self.consecutive_failures >= self.max_consecutive_failures:
logger.warning(f"Camera {self.camera_id}: Too many failures, attempting reconnection...")
self.safe_emit(self.connection_status, self.camera_id, False, "Reconnecting...")
if not self.connect_to_camera():
logger.error(f"Camera {self.camera_id}: Reconnection failed, stopping thread")
self.running = False
else:
self.consecutive_failures = 0
def stop(self): def stop(self):
"""Stop the thread safely""" """Safely stop the thread"""
logger.info(f"Camera {self.camera_id}: Stopping thread...")
self.mutex.lock() self.mutex.lock()
self.running = False self.running = False
self.mutex.unlock() self.mutex.unlock()
if not self.wait(5000): # Wake up thread if it's waiting
print(f"Warning: Camera thread {self.camera_id} did not stop gracefully") self.condition.wakeAll()
self.terminate()
if not self.wait(3000): # 3 second timeout
logger.warning(f"Camera {self.camera_id}: Thread did not stop gracefully, terminating...")
try:
self.terminate()
if not self.wait(1000):
logger.error(f"Camera {self.camera_id}: Thread termination failed")
except Exception as e:
logger.error(f"Camera {self.camera_id}: Error during termination: {e}")
else:
logger.info(f"Camera {self.camera_id}: Thread stopped gracefully")
def pause(self):
"""Pause frame capture"""
self.paused = True
logger.info(f"Camera {self.camera_id}: Paused")
def resume(self):
"""Resume frame capture"""
self.paused = False
logger.info(f"Camera {self.camera_id}: Resumed")
def cleanup(self): def cleanup(self):
"""Clean up camera resources""" """Comprehensive cleanup of all resources"""
print(f"Cleaning up camera {self.camera_id}") logger.info(f"Camera {self.camera_id}: Cleaning up resources...")
try:
if self.cap:
self.cap.release()
self.cap = None
except Exception as e:
print(f"Error during cap cleanup: {e}")
try: try:
if self.rtsp_client:
self.rtsp_client.close()
self.rtsp_client = None
except Exception as e:
print(f"Error during rtsp client cleanup: {e}")
finally:
self.running = False self.running = False
self.connection_status.emit(self.camera_id, False, "Disconnected") self.safe_capture_release()
self.safe_rtsp_close()
self.safe_emit(self.connection_status, self.camera_id, False, "Disconnected")
self.update_stats()
logger.info(f"Camera {self.camera_id}: Cleanup completed")
except Exception as e:
logger.error(f"Camera {self.camera_id}: Cleanup error: {e}")
def get_status(self) -> Dict[str, Any]:
"""Get current camera status"""
return {
'running': self.running,
'paused': self.paused,
'connected': (self.cap is not None and self.cap.isOpened()) or
(self.rtsp_client is not None and self.rtsp_client.isOpened()),
'stream_type': self.stream_type.value if self.stream_type else 'unknown',
'consecutive_failures': self.consecutive_failures,
'total_frames': self.total_frames,
'total_failures': self.total_failures,
'stats': self.stats.copy()
}