Files
mucapy/mucapy/seperate/CameraThread.py
rattatwinko b313fc7629 seoerated
2025-05-27 20:47:53 +02:00

244 lines
10 KiB
Python

import os
import sys
import cv2
import json
import urllib.parse
import numpy as np
from PyQt5.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QPushButton, QComboBox, QSpinBox,
QFileDialog, QMessageBox, QMenu, QAction, QMenuBar,
QActionGroup, QSizePolicy, QGridLayout, QGroupBox,
QDockWidget, QScrollArea, QToolButton, QDialog,
QShortcut, QListWidget, QFormLayout, QLineEdit,
QCheckBox, QTabWidget, QListWidgetItem, QSplitter)
from PyQt5.QtCore import Qt, QTimer, QDir, QSize, QSettings, QDateTime, QRect, QThread, pyqtSignal, QMutex
from PyQt5.QtGui import (QImage, QPixmap, QIcon, QColor, QKeySequence, QPainter,
QPen, QBrush)
import time
import requests
import subprocess
class CameraThread(QThread):
    """Worker thread that connects to a camera and emits captured frames.

    ``camera_info`` is either a local source (a device index, or any string
    ``cv2.VideoCapture`` accepts) or a string of the form ``"net:<name>"``.
    In the latter case ``<name>`` is looked up in
    ``self.parent().detector.network_cameras`` to obtain the stream URL.

    Signals:
        frame_ready(int, np.ndarray): camera id and the frame just read.
        error_occurred(int, str): camera id and a human readable message.
    """

    frame_ready = pyqtSignal(int, np.ndarray)  # (camera_index, frame)
    error_occurred = pyqtSignal(int, str)  # (camera_index, error_message)

    def __init__(self, camera_id, camera_info, parent=None):
        """Store the camera description; no connection is made here."""
        super().__init__(parent)
        self.camera_id = camera_id
        self.camera_info = camera_info
        self.running = False
        self.cap = None
        self.mutex = QMutex()
        self.frame_interval = 1.0 / 30  # Default to 30 FPS
        self.reconnect_attempts = 3  # Number of (re)connection attempts
        self.reconnect_delay = 2  # Seconds to wait between attempts
        # Set by stop() so that a stop issued while run() is still connecting
        # is not lost when run() later flips ``running`` to True.
        self._stop_requested = False

    def set_fps(self, fps):
        """Set the target capture rate in frames per second.

        Non-positive or non-numeric values are ignored (the current rate is
        kept) instead of raising ZeroDivisionError or silently disabling
        throttling with a negative interval.
        """
        try:
            usable = fps > 0
        except TypeError:
            usable = False
        if not usable:
            print(f"Ignoring invalid FPS value: {fps!r}")
            return
        self.frame_interval = 1.0 / fps

    @staticmethod
    def _is_droidcam_url(url):
        """Return True when *url* addresses port 4747 (DroidCam's default)."""
        try:
            return urllib.parse.urlparse(url).port == 4747
        except ValueError:
            # Malformed netloc or port: cannot be classified as DroidCam.
            return False

    @staticmethod
    def _redact_credentials(url):
        """Return *url* with any ``user:password@`` part masked for logging."""
        try:
            parsed = urllib.parse.urlparse(url)
        except ValueError:
            return "<unparseable url>"
        if '@' not in parsed.netloc:
            return url
        host = parsed.netloc.rsplit('@', 1)[1]
        return parsed._replace(netloc=f"***@{host}").geturl()

    def validate_url(self, url):
        """Normalize a camera URL.

        Surrounding whitespace is stripped, ``http://`` is prepended when no
        ``scheme://`` prefix is present, and URLs on port 4747 are rewritten
        to ``<scheme>://<netloc>/video``.

        Returns:
            The normalized URL, or None when *url* is empty or unusable.
        """
        try:
            url = url.strip()
            if not url:
                # Otherwise this would turn into the truthy string "http://".
                return None
            # Test for "://" rather than a parsed scheme: urlparse may read
            # "host:4747" as scheme "host" with path "4747".
            if '://' not in url:
                url = f"http://{url}"
            parsed = urllib.parse.urlparse(url)
            if self._is_droidcam_url(url):
                return f"{parsed.scheme}://{parsed.netloc}/video"
            return url
        except (AttributeError, TypeError, ValueError) as e:
            print(f"URL validation error: {e}")
            return None

    def construct_camera_url(self, camera_info):
        """Build the final stream URL, embedding credentials when supplied.

        Args:
            camera_info: a dict with a ``'url'`` key and optional
                ``'username'`` / ``'password'`` keys, or any object whose
                ``str()`` is the URL.

        Returns:
            The URL string, or None when no valid URL could be built.
        """
        try:
            if isinstance(camera_info, dict):
                raw_url = camera_info.get('url', '')
                username = camera_info.get('username')
                password = camera_info.get('password')
            else:
                raw_url = str(camera_info)
                username = password = None

            url = self.validate_url(raw_url)
            if not url:
                return None

            if username:
                parsed = urllib.parse.urlparse(url)
                # Do not override credentials already present in the URL.
                if '@' not in parsed.netloc:
                    # safe='' so '/', ':' and '@' inside the credentials
                    # cannot corrupt the netloc.
                    user = urllib.parse.quote(str(username), safe='')
                    secret = urllib.parse.quote(str(password or ''), safe='')
                    netloc = f"{user}:{secret}@{parsed.netloc}"
                    url = parsed._replace(netloc=netloc).geturl()
            return url
        except Exception as e:
            print(f"Error constructing camera URL: {e}")
            return None

    def _release_capture(self):
        """Release the current capture, if any, and forget it."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def _resolve_network_url(self, name):
        """Look up network camera *name* on the parent's detector.

        Emits ``error_occurred`` and returns None when the camera is unknown
        or its URL is invalid; both are configuration errors, so the caller
        does not retry them.
        """
        detector = getattr(self.parent(), 'detector', None)
        cameras = getattr(detector, 'network_cameras', None)
        if not cameras or name not in cameras:
            self.error_occurred.emit(self.camera_id, f"Network camera {name} not found")
            return None
        url = self.construct_camera_url(cameras[name])
        if not url:
            self.error_occurred.emit(self.camera_id, f"Invalid camera URL for {name}")
            return None
        return url

    def _probe_droidcam(self, url):
        """Return True when the DroidCam endpoint answers with HTTP 200."""
        try:
            # stream=True: only the headers are fetched. The body is an
            # endless video stream, so a plain get() would not return.
            response = requests.get(url, stream=True, timeout=2)
        except requests.exceptions.RequestException as e:
            print(f"Failed to connect to DroidCam: {e}")
            return False
        try:
            status = response.status_code
        finally:
            response.close()
        if status != 200:
            print(f"DroidCam endpoint returned status {status}")
            return False
        return True

    def _open_network_capture(self, url):
        """Open *url* with OpenCV and store it in ``self.cap``; return success."""
        shown_url = self._redact_credentials(url)
        print(f"Attempting to connect to network camera URL: {shown_url}")
        if self._is_droidcam_url(url) and not self._probe_droidcam(url):
            return False
        capture = cv2.VideoCapture()
        if not capture.open(url):
            print(f"Failed to open URL: {shown_url}")
            capture.release()
            return False
        # Keep the buffer small to minimize latency. Requested after open()
        # because there is no backend to configure before the capture opens.
        capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.cap = capture
        return True

    def _open_local_capture(self):
        """Create a capture for a local device index or path."""
        source = self.camera_info
        if str(source).isdigit():
            source = int(source)
        self.cap = cv2.VideoCapture(source)
        return True

    def _verify_capture(self):
        """Return True when ``self.cap`` is open and delivers a test frame."""
        if not self.cap.isOpened():
            print("Camera not opened")
            return False
        ret, frame = self.cap.read()
        if not ret or frame is None:
            print("Failed to read test frame")
            return False
        return True

    def connect_to_camera(self):
        """Attempt to connect to the camera with retry logic.

        Makes up to ``reconnect_attempts`` attempts and sleeps
        ``reconnect_delay`` seconds after every failed attempt except the
        last. Gives up early when stop() has been requested or when the
        network camera configuration is invalid.

        Returns:
            True when ``self.cap`` is open and a test frame was read,
            otherwise False (``self.cap`` is then None).
        """
        for attempt in range(self.reconnect_attempts):
            if self._stop_requested:
                return False
            is_last_attempt = attempt >= self.reconnect_attempts - 1
            try:
                # Clean up any existing connection
                self._release_capture()

                source = self.camera_info
                if isinstance(source, str) and source.startswith('net:'):
                    url = self._resolve_network_url(source[4:])
                    if url is None:
                        return False
                    opened = self._open_network_capture(url)
                else:
                    opened = self._open_local_capture()

                if opened and self._verify_capture():
                    print("Successfully connected to camera")
                    return True
            except Exception as e:
                print(f"Connection attempt {attempt + 1} failed: {str(e)}")
                if is_last_attempt:
                    self.error_occurred.emit(self.camera_id, str(e))

            # Every failure path ends here: drop the half-open capture and
            # back off before the next attempt.
            self._release_capture()
            if not is_last_attempt:
                time.sleep(self.reconnect_delay)
        return False

    def run(self):
        """Main thread loop: connect, then emit frames until stopped.

        Frames are read at most once per ``frame_interval`` seconds. After
        five consecutive failed reads a reconnect is attempted; if that
        fails the loop ends. Resources are always released on exit.
        """
        try:
            if not self.connect_to_camera():
                if not self._stop_requested:
                    self.error_occurred.emit(self.camera_id, "Failed to connect to camera after multiple attempts")
                return

            # Publish the running state atomically with respect to stop(), so
            # a stop issued during the connection phase is honored.
            self.mutex.lock()
            try:
                if self._stop_requested:
                    return
                self.running = True
            finally:
                self.mutex.unlock()

            # Monotonic clock: pacing must not be disturbed by wall-clock jumps.
            last_frame_time = time.monotonic()
            consecutive_failures = 0

            while self.running:
                ret, frame = False, None
                self.mutex.lock()
                try:
                    if not self.running:
                        break
                    current_time = time.monotonic()
                    frame_due = current_time - last_frame_time >= self.frame_interval
                    if frame_due:
                        ret, frame = self.cap.read()
                finally:
                    self.mutex.unlock()

                if not frame_due:
                    time.sleep(0.001)  # Small sleep to prevent CPU hogging
                    continue

                if ret:
                    consecutive_failures = 0  # Reset failure counter on success
                    self.frame_ready.emit(self.camera_id, frame)
                    last_frame_time = current_time
                    continue

                consecutive_failures += 1
                if consecutive_failures >= 5:
                    print("Multiple frame read failures, attempting to reconnect...")
                    # connect_to_camera() releases the stale capture itself.
                    if not self.connect_to_camera():
                        if not self._stop_requested:
                            self.error_occurred.emit(self.camera_id, "Failed to reconnect to camera")
                        break
                    consecutive_failures = 0
                time.sleep(0.1)  # Small delay before next attempt
        except Exception as e:
            self.error_occurred.emit(self.camera_id, str(e))
        finally:
            self.cleanup()

    def stop(self):
        """Ask the thread to stop and block until it has finished."""
        self.mutex.lock()
        try:
            self._stop_requested = True
            self.running = False
        finally:
            self.mutex.unlock()
        self.wait()
        # The thread has finished (or never ran); allow a later start().
        self._stop_requested = False

    def cleanup(self):
        """Release the camera and mark the thread as not running."""
        self._release_capture()
        self.running = False