Files
mucapy/web_server.py
2025-05-27 16:05:04 +02:00

788 lines
30 KiB
Python

import os
import cv2
import json
import numpy as np
from flask import Flask, Response, render_template, jsonify, request
from flask_cors import CORS
import threading
import time
import queue
import urllib.parse
import glob
import logging
# Configure logging
# Root logger: INFO and above, each record formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the camera classes and route handlers below.
logger = logging.getLogger(__name__)
# Flask application object; every @app.route handler in this file attaches to it.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
def find_yolo_model():
    """Search the working directory tree for a directory holding a YOLO model.

    A directory qualifies when it contains at least one weights file
    (``*.weights`` or ``*.onnx``), at least one ``*.cfg`` file and at least
    one ``*.names`` file. The search is recursive and relative to the
    current working directory.

    Returns:
        The qualifying directory, relative to the working directory
        (``'.'`` for the working directory itself), or ``None`` when no
        directory qualifies. When several qualify, the first one in sorted
        order is returned so the choice is stable between runs.
    """
    def containing_dirs(*patterns):
        # os.path.dirname() yields '' for files directly in the working
        # directory; normalise that to '.' for every file kind so the three
        # sets are compared on the same spelling.
        return {
            os.path.dirname(path) or '.'
            for pattern in patterns
            for path in glob.glob(pattern, recursive=True)
        }

    weights_dirs = containing_dirs('**/*.weights', '**/*.onnx')
    cfg_dirs = containing_dirs('**/*.cfg')
    names_dirs = containing_dirs('**/*.names')

    # Only directories that hold all three file kinds are usable.
    model_dirs = weights_dirs & cfg_dirs & names_dirs
    return min(model_dirs, default=None)
class YOLODetector:
    """YOLO object detector built on OpenCV's ``cv2.dnn`` module.

    Holds the loaded network, its class names and one colour per class, and
    draws detections onto frames. Also provides helpers for probing local
    cameras.

    NOTE(review): CameraManager passes one detector instance to every
    CameraStream, so ``detect`` can run on several capture threads at once.
    Confirm that sharing a single ``cv2.dnn`` network this way is safe.
    """

    # File suffixes recognised when auto-detecting model files.
    WEIGHTS_SUFFIXES = ('.weights', '.onnx')
    CONFIG_SUFFIX = '.cfg'
    CLASSES_SUFFIX = '.names'

    # Resolves to the same logger object as a module-level
    # ``logging.getLogger(__name__)`` in this module.
    _log = logging.getLogger(__name__)

    def __init__(self):
        self.net = None
        self.classes = []
        self.colors = []
        self.confidence_threshold = 0.35
        self.cuda_available = self.check_cuda()
        self.model_loaded = False
        self.current_model = None

    def check_cuda(self):
        """Return True when OpenCV reports at least one CUDA-enabled device."""
        try:
            return cv2.cuda.getCudaEnabledDeviceCount() > 0
        except Exception:
            # Builds without the cuda module (or a failing query) mean "no CUDA".
            return False

    @classmethod
    def _list_model_files(cls, directory):
        """Return ``(weights, configs, classes)`` file-name lists for *directory*.

        Names are sorted so that "first match" is the same file on every run.
        """
        entries = sorted(os.listdir(directory))
        weights = [name for name in entries if name.endswith(cls.WEIGHTS_SUFFIXES)]
        configs = [name for name in entries if name.endswith(cls.CONFIG_SUFFIX)]
        classes = [name for name in entries if name.endswith(cls.CLASSES_SUFFIX)]
        return weights, configs, classes

    @staticmethod
    def _read_class_names(classes_path):
        """Read one class label per line, trimming surrounding whitespace.

        Blank lines at the start or end of the file are dropped; lines in
        the middle are kept so that line position still equals class index.
        """
        with open(classes_path, 'r') as f:
            return [line.strip() for line in f.read().strip().splitlines()]

    def scan_for_model(self):
        """Auto-scan the current directory for YOLO model files and load them.

        Returns:
            True when weights, config and class files were found *and* the
            model loaded successfully, False otherwise.
        """
        try:
            weights, configs, classes = self._list_model_files('.')
            if weights and configs and classes:
                # Report the real outcome of the load, not just "files found".
                return self.load_yolo_model('.', weights[0], configs[0], classes[0])
            return False
        except Exception as e:
            self._log.error("Error scanning for model: %s", e)
            return False

    def load_yolo_model(self, model_dir, weights_file=None, config_file=None, classes_file=None):
        """Load a YOLO model from *model_dir*.

        Args:
            model_dir: Directory containing the model files.
            weights_file: Weights file name inside *model_dir*. When omitted,
                weights, config and classes files are all auto-detected by
                suffix.
            config_file: Config file name inside *model_dir*.
            classes_file: Class-names file name inside *model_dir*.

        Returns:
            True on success. False when auto-detection finds no complete set
            of files (the current state is left untouched) or when loading
            raises (``model_loaded`` is then cleared).
        """
        try:
            if not weights_file:
                weights, configs, classes = self._list_model_files(model_dir)
                if not (weights and configs and classes):
                    return False
                weights_file = weights[0]
                config_file = configs[0]
                classes_file = classes[0]

            weights_path = os.path.join(model_dir, weights_file)
            config_path = os.path.join(model_dir, config_file)
            classes_path = os.path.join(model_dir, classes_file)

            net = cv2.dnn.readNet(weights_path, config_path)
            if self.cuda_available:
                try:
                    net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
                    net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
                except Exception:
                    # CUDA was reported but cannot be selected: run on the CPU.
                    net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
                    net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

            class_names = self._read_class_names(classes_path)
            # A private RandomState seeded with 42 yields the same colours as
            # seeding the global generator, without disturbing it.
            colors = np.random.RandomState(42).randint(
                0, 255, size=(len(class_names), 3), dtype='uint8'
            )

            # Publish the new model only once every part of it is ready, so
            # detect() never sees a new network paired with old class names.
            self.net = net
            self.classes = class_names
            self.colors = colors
            self.current_model = weights_file
            self.model_loaded = True
            return True
        except Exception as e:
            self._log.error("Error loading model: %s", e)
            self.model_loaded = False
            return False

    def get_camera_resolution(self, cap):
        """Request common resolutions on *cap* and return what it reports.

        Resolutions are requested from largest to smallest; the first
        request after which the capture reports a positive width and height
        wins.

        Returns:
            ``(width, height)`` as ints; ``(640, 480)`` when nothing usable
            is reported or the query raises.
        """
        try:
            # Common resolutions to try
            resolutions = [
                (1920, 1080),  # Full HD
                (1280, 720),   # HD
                (800, 600),    # SVGA
                (640, 480),    # VGA
            ]
            best_width = 640
            best_height = 480
            for width, height in resolutions:
                cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
                cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
                actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
                actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
                if actual_width > 0 and actual_height > 0:
                    best_width = actual_width
                    best_height = actual_height
                    break
            return int(best_width), int(best_height)
        except Exception:
            return 640, 480

    def _probe_camera(self, source):
        """Open *source* briefly; return its info dict, or None if unusable."""
        cap = None
        try:
            cap = cv2.VideoCapture(source)
            if not cap.isOpened():
                return None
            width, height = self.get_camera_resolution(cap)
            return {'id': source, 'width': width, 'height': height}
        except Exception:
            return None
        finally:
            # Release on every path so a failed probe never keeps the device.
            if cap is not None:
                cap.release()

    def scan_cameras(self):
        """Scan for available cameras, skipping video0.

        Probes numeric indices 1-9 first, then the existing
        ``/dev/video1`` .. ``/dev/video9`` device paths.

        Returns:
            A list of ``{'id', 'width', 'height'}`` dicts, one per source
            that could be opened. ``id`` is the int index or the device path.
        """
        cameras = []
        # Start from video1 since video0 is often empty or system camera
        for i in range(1, 10):
            info = self._probe_camera(i)
            if info is not None:
                cameras.append(info)
        # Check device paths
        for i in range(1, 10):
            path = f"/dev/video{i}"
            if os.path.exists(path):
                info = self._probe_camera(path)
                if info is not None:
                    cameras.append(info)
        return cameras

    def detect(self, frame):
        """Run object detection on *frame* and draw the results onto it.

        Args:
            frame: Image array as produced by ``cv2.VideoCapture.read``
                (only ``frame.shape[:2]`` and OpenCV drawing calls are used).

        Returns:
            The same frame object, annotated in place. It is returned
            unmodified when no model is loaded or detection raises.
        """
        if self.net is None or not self.model_loaded:
            return frame
        try:
            height, width = frame.shape[:2]
            blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False)
            self.net.setInput(blob)
            try:
                layer_names = self.net.getLayerNames()
                output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
            except Exception:
                # Fall back to asking the network for the names directly.
                output_layers = self.net.getUnconnectedOutLayersNames()
            outputs = self.net.forward(output_layers)

            # Process detections
            boxes = []
            confidences = []
            class_ids = []
            for output in outputs:
                for detection in output:
                    # Entries 0-3 are used as the box, entries 5+ as class scores.
                    scores = detection[5:]
                    class_id = np.argmax(scores)
                    confidence = scores[class_id]
                    if confidence > self.confidence_threshold:
                        # Convert YOLO coords to screen coords
                        center_x = int(detection[0] * width)
                        center_y = int(detection[1] * height)
                        w = int(detection[2] * width)
                        h = int(detection[3] * height)
                        # Get top-left corner
                        x = max(0, int(center_x - w/2))
                        y = max(0, int(center_y - h/2))
                        boxes.append([x, y, w, h])
                        confidences.append(float(confidence))
                        class_ids.append(class_id)

            # Apply non-maximum suppression
            indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4)
            if len(indices) > 0:
                for i in np.asarray(indices).flatten():
                    try:
                        (x, y, w, h) = boxes[i]
                        # Ensure coordinates are within frame bounds
                        x = max(0, min(x, width - 1))
                        y = max(0, min(y, height - 1))
                        w = min(w, width - x)
                        h = min(h, height - y)
                        color = [int(c) for c in self.colors[class_ids[i]]]
                        cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                        # Draw label with background
                        text = f"{self.classes[class_ids[i]]}: {confidences[i]:.2f}"
                        font_scale = 0.5
                        font = cv2.FONT_HERSHEY_SIMPLEX
                        thickness = 1
                        (text_w, text_h), baseline = cv2.getTextSize(text, font, font_scale, thickness)
                        # Draw background rectangle for text
                        cv2.rectangle(frame, (x, y - text_h - baseline - 5), (x + text_w, y), color, -1)
                        # Draw text
                        cv2.putText(frame, text, (x, y - 5), font, font_scale, (255, 255, 255), thickness)
                    except Exception as e:
                        # One bad detection must not stop the others being drawn.
                        self._log.error("Error drawing detection %s: %s", i, e)
                        continue
            return frame
        except Exception as e:
            self._log.error("Detection error: %s", e)
            return frame
class CameraStream:
    """Background capture thread for one local or network camera.

    Frames are read from a ``cv2.VideoCapture``, optionally annotated by the
    detector, JPEG-encoded and placed in a bounded queue that ``get_frame``
    drains. Network cameras are identified by an id of the form
    ``'net:<name>'`` and resolved through ``camera_manager.network_cameras``.
    """

    def __init__(self, camera_id, detector):
        self.camera_id = camera_id
        self.cap = None
        self.frame_queue = queue.Queue(maxsize=10)
        self.running = False
        self.thread = None
        self.lock = threading.Lock()
        self.detector = detector
        self.is_network_camera = isinstance(camera_id, str) and camera_id.startswith('net:')
        self.last_frame_time = time.time()
        self.frame_timeout = 5.0  # Timeout after 5 seconds without frames
        self.reconnect_interval = 5.0  # Try to reconnect every 5 seconds
        self.last_reconnect_attempt = 0
        self.connection_retries = 0
        self.max_retries = 3

    @staticmethod
    def _build_network_url(camera_info):
        """Return the URL to open for a network-camera configuration entry.

        A plain string entry is returned unchanged. A dict entry has its
        ``'url'`` normalised (DroidCam ``/video`` suffix, default ``http://``
        scheme) and, when both ``'username'`` and ``'password'`` are set,
        the percent-encoded credentials inserted into the host part.
        """
        if not isinstance(camera_info, dict):
            return camera_info
        url = camera_info['url']
        # Handle DroidCam URL formatting
        if ':4747' in url and not url.endswith('/video'):
            url = url.rstrip('/') + '/video'
        # Add a default scheme only when none is present, so URLs such as
        # rtsp://... are not turned into http://rtsp://...
        if '://' not in url:
            url = 'http://' + url
        username = camera_info.get('username')
        password = camera_info.get('password')
        if username is not None and password is not None:
            parsed = urllib.parse.urlparse(url)
            # Drop credentials already embedded in the URL, then percent-encode
            # the configured ones so characters like '@' or ':' stay valid.
            host = parsed.netloc.rpartition('@')[2]
            userinfo = "{}:{}".format(
                urllib.parse.quote(str(username), safe=''),
                urllib.parse.quote(str(password), safe=''),
            )
            url = parsed._replace(netloc=f"{userinfo}@{host}").geturl()
        return url

    @staticmethod
    def _redact_url(url):
        """Return *url* with any embedded credentials replaced by ``***``."""
        parsed = urllib.parse.urlparse(url)
        if '@' not in parsed.netloc:
            return url
        host = parsed.netloc.rpartition('@')[2]
        return parsed._replace(netloc=f"***@{host}").geturl()

    def _release_capture(self):
        """Release the current capture object, if any, and forget it."""
        if self.cap:
            self.cap.release()
        self.cap = None

    def _open_capture(self):
        """Open the capture device or URL and store it in ``self.cap``.

        Raises:
            RuntimeError: when the network camera is not configured or the
                device/URL cannot be opened.
        """
        # Never leave a previous (closed or broken) capture object behind.
        self._release_capture()
        if self.is_network_camera:
            # Handle network camera
            name = self.camera_id[4:]  # Remove 'net:' prefix
            camera_info = camera_manager.network_cameras.get(name)
            if not camera_info:
                raise RuntimeError(f"Network camera {name} not found")
            url = self._build_network_url(camera_info)
            # Log and report only the redacted form: the real URL may
            # contain the camera password.
            safe_url = self._redact_url(url)
            logger.info(f"Connecting to network camera: {safe_url}")
            cap = cv2.VideoCapture(url)
            # Wait for connection
            retry_count = 0
            while not cap.isOpened() and retry_count < 3:
                time.sleep(1)
                retry_count += 1
                cap.open(url)
            if not cap.isOpened():
                cap.release()
                raise RuntimeError(f"Failed to connect to network camera at {safe_url}")
        else:
            # Handle local camera
            cap = cv2.VideoCapture(self.camera_id)
            if not cap.isOpened():
                cap.release()
                raise RuntimeError(f"Failed to open camera {self.camera_id}")
            # Set camera properties
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
            cap.set(cv2.CAP_PROP_FPS, 30)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        self.cap = cap
        # Restart the no-frame timer so a fresh connection is not timed out
        # because of how long the previous one had been silent.
        self.last_frame_time = time.time()

    def start(self):
        """Open the camera and start the capture thread.

        Returns:
            True when the stream was started; False when it is already
            running or the camera could not be opened.
        """
        if self.running:
            return False
        try:
            self._open_capture()
            self.running = True
            self.connection_retries = 0
            self.thread = threading.Thread(target=self._capture_loop)
            self.thread.daemon = True
            self.thread.start()
            return True
        except Exception as e:
            logger.error(f"Error starting camera {self.camera_id}: {e}")
            self.running = False
            self._release_capture()
            self.connection_retries += 1
            return False

    def _reconnect(self):
        """Re-open the capture from inside the capture thread.

        ``start()`` cannot be used for this: it returns immediately while
        ``running`` is True and would spawn a second capture thread.

        Returns:
            True on success; False on failure (the retry counter is bumped).
        """
        try:
            self._open_capture()
        except Exception as e:
            logger.error(f"Error starting camera {self.camera_id}: {e}")
            self._release_capture()
            self.connection_retries += 1
            return False
        self.connection_retries = 0
        return True

    def stop(self):
        """Stop the capture thread, release the camera and drop queued frames."""
        self.running = False
        # Joining the current thread would raise; only wait for another thread.
        if self.thread and self.thread is not threading.current_thread():
            self.thread.join()
        self.thread = None
        self._release_capture()
        while not self.frame_queue.empty():
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

    def _capture_loop(self):
        """Read, annotate, encode and queue frames until stopped.

        When the capture is missing or closed, a reconnect is attempted at
        most every ``reconnect_interval`` seconds; after ``max_retries``
        consecutive failures the loop gives up and clears ``running``.
        """
        while self.running:
            try:
                if not self.cap or not self.cap.isOpened():
                    current_time = time.time()
                    if current_time - self.last_reconnect_attempt >= self.reconnect_interval:
                        if self.connection_retries < self.max_retries:
                            logger.info(f"Attempting to reconnect camera {self.camera_id}")
                            self.last_reconnect_attempt = current_time
                            if self._reconnect():
                                logger.info(f"Successfully reconnected camera {self.camera_id}")
                            else:
                                logger.warning(f"Failed to reconnect camera {self.camera_id}")
                        else:
                            logger.error(f"Max reconnection attempts reached for camera {self.camera_id}")
                            self.running = False
                            break
                    time.sleep(1)
                    continue

                ret, frame = self.cap.read()
                if not ret:
                    current_time = time.time()
                    if current_time - self.last_frame_time > self.frame_timeout:
                        logger.warning(f"No frames received from camera {self.camera_id} for {self.frame_timeout} seconds")
                        # Dropping the capture sends the next iteration into
                        # the reconnect branch above.
                        self._release_capture()
                    time.sleep(0.1)
                    continue
                self.last_frame_time = time.time()

                # Apply object detection
                if self.detector and self.detector.net is not None:
                    frame = self.detector.detect(frame)

                # Convert to JPEG
                encoded, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
                if not encoded:
                    # Nothing usable to publish for this frame.
                    continue
                payload = jpeg.tobytes()

                # Update queue
                try:
                    self.frame_queue.put_nowait(payload)
                except queue.Full:
                    # Make room by discarding the oldest queued frame.
                    try:
                        self.frame_queue.get_nowait()
                        self.frame_queue.put_nowait(payload)
                    except (queue.Empty, queue.Full):
                        pass
            except Exception as e:
                logger.error(f"Error in capture loop for camera {self.camera_id}: {e}")
                self._release_capture()
                time.sleep(1)

    def get_frame(self):
        """Return the next queued JPEG frame as bytes, or None if none is waiting."""
        try:
            return self.frame_queue.get_nowait()
        except queue.Empty:
            return None
class CameraManager:
    """Registry of running camera streams and configured network cameras.

    ``cameras`` maps a camera id to its running ``CameraStream``;
    ``network_cameras`` maps a network camera name to either a URL string
    or a ``{'url', 'username', 'password'}`` dict. One ``YOLODetector`` is
    created here and shared with every stream.
    """

    def __init__(self):
        self.cameras = {}
        self.network_cameras = {}
        self.lock = threading.Lock()
        self.detector = YOLODetector()

        # Prefer the directory named by YOLO_MODEL_DIR when it exists,
        # otherwise fall back to scanning for one.
        configured_dir = os.getenv('YOLO_MODEL_DIR')
        if configured_dir and os.path.exists(configured_dir):
            model_dir = configured_dir
        else:
            model_dir = find_yolo_model()

        if not model_dir:
            print("No YOLO model found in current directory")
            return
        print(f"Found YOLO model in directory: {model_dir}")
        self.detector.load_yolo_model(model_dir)

    def add_camera(self, camera_id):
        """Start and register a stream for *camera_id*.

        Returns True when a new stream was started, False when the id is
        already registered or the stream failed to start.
        """
        with self.lock:
            if camera_id in self.cameras:
                return False
            stream = CameraStream(camera_id, self.detector)
            if not stream.start():
                return False
            self.cameras[camera_id] = stream
            return True

    def remove_camera(self, camera_id):
        """Stop and unregister the stream for *camera_id*, if there is one."""
        with self.lock:
            if camera_id not in self.cameras:
                return
            self.cameras[camera_id].stop()
            del self.cameras[camera_id]

    def get_camera(self, camera_id):
        """Return the registered stream for *camera_id*, or None."""
        try:
            return self.cameras[camera_id]
        except KeyError:
            return None

    def get_all_cameras(self):
        """Return the ids of all registered streams as a list."""
        return [*self.cameras]

    def add_network_camera(self, name, url, username=None, password=None):
        """Store the configuration of a network camera under *name*.

        The entry is a dict only when both credentials are given; otherwise
        just the URL string is stored. Always returns True.
        """
        if username and password:
            entry = {
                'url': url,
                'username': username,
                'password': password,
            }
        else:
            entry = url
        self.network_cameras[name] = entry
        return True

    def remove_network_camera(self, name):
        """Forget network camera *name*, stopping its stream if running.

        Returns True when the name was configured, False otherwise.
        """
        if name not in self.network_cameras:
            return False
        stream_id = f"net:{name}"
        if stream_id in self.cameras:
            self.remove_camera(stream_id)
        del self.network_cameras[name]
        return True
# Module-wide manager instance used by the route handlers and by CameraStream.
# Creating it at import time also runs the YOLO model lookup performed in
# CameraManager.__init__.
camera_manager = CameraManager()
def gen_frames(camera_id):
    """Yield multipart JPEG chunks for the stream registered as *camera_id*.

    Yields nothing when no stream is registered under the id. Otherwise
    frames are yielded for as long as that same stream object remains
    registered with the camera manager.

    Args:
        camera_id: Id under which the stream is registered.

    Yields:
        ``bytes`` parts for a ``multipart/x-mixed-replace; boundary=frame``
        response.
    """
    camera = camera_manager.get_camera(camera_id)
    if not camera:
        return
    # Stop once the stream has been removed (or replaced). A removed stream
    # produces no more frames, so without this check the loop would never
    # reach a yield again and would keep polling forever.
    while camera_manager.get_camera(camera_id) is camera:
        frame = camera.get_frame()
        if frame is None:
            # No new frame yet: back off briefly instead of busy-waiting.
            time.sleep(0.01)
            continue
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/')
def index():
    """Render and return the ``index.html`` template as the landing page."""
    page = render_template('index.html')
    return page
@app.route('/video_feed/<path:camera_id>')
def video_feed(camera_id):
    """Stream the frames of *camera_id* as a multipart MJPEG response.

    Ids starting with ``'net:'`` are used as given. Any other id is
    converted to an int when it parses as one, and kept as a string
    otherwise.
    """
    is_network_id = camera_id.startswith('net:')
    if not is_network_id:
        try:
            camera_id = int(camera_id)  # numeric index of a local camera
        except ValueError:
            pass  # e.g. a device path: keep the string form
    frames = gen_frames(camera_id)
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/scan_cameras')
def scan_cameras_route():
    """Return the result of a local camera scan as ``{'cameras': [...]}``."""
    return jsonify({'cameras': scan_cameras_silently()})
@app.route('/cameras')
def get_cameras():
    """List local and network cameras together with their stream status.

    Returns:
        JSON ``{'cameras': [...]}``. Every entry carries ``id``, ``type``,
        ``name``, ``active`` and ``status`` (``'connected'``,
        ``'reconnecting'`` or ``'disconnected'``); network entries also
        carry default ``width`` and ``height`` values.
    """
    def find_stream(listed_id):
        # Scan results use string ids ('0', '/dev/video0'), whereas a stream
        # added through /add_camera with a numeric id is registered under
        # the int form. Try the id as listed first, then its int form.
        stream = camera_manager.get_camera(listed_id)
        if stream is None and isinstance(listed_id, str) and not listed_id.startswith('net:'):
            try:
                stream = camera_manager.get_camera(int(listed_id))
            except ValueError:
                pass
        return stream

    # Get local cameras
    cameras = scan_cameras_silently()

    # Add network cameras
    for name, info in list(camera_manager.network_cameras.items()):
        url = info['url'] if isinstance(info, dict) else info
        cameras.append({
            'id': f'net:{name}',
            'type': 'network',
            'name': f'{name} ({url})',
            'width': 1280,  # Default width for network cameras
            'height': 720   # Default height for network cameras
        })

    # Keep streaming local cameras in the list even when the scan did not
    # report them (a device that is already in use may fail to open again
    # while it is being probed).
    listed_ids = {str(camera['id']) for camera in cameras}
    for active_id in camera_manager.get_all_cameras():
        key = str(active_id)
        if key in listed_ids or key.startswith('net:'):
            continue
        cameras.append({
            'id': key,
            'type': 'local',
            'name': f'Camera {key}'
        })
        listed_ids.add(key)

    # Add status information for active cameras
    for camera in cameras:
        camera_stream = find_stream(camera['id'])
        if camera_stream:
            # Read cap once: the capture thread may reset it at any time.
            cap = camera_stream.cap
            camera['active'] = True
            camera['status'] = 'connected' if cap and cap.isOpened() else 'reconnecting'
        else:
            camera['active'] = False
            camera['status'] = 'disconnected'
    return jsonify({'cameras': cameras})
@app.route('/add_camera/<path:camera_id>')
def add_camera(camera_id):
    """Start streaming from *camera_id* and report whether that worked.

    Ids starting with ``'net:'`` are used as given. Any other id is
    converted to an int when it parses as one, and kept as a string
    otherwise.
    """
    is_network_id = camera_id.startswith('net:')
    if not is_network_id:
        try:
            camera_id = int(camera_id)  # numeric index of a local camera
        except ValueError:
            pass  # e.g. a device path: keep the string form
    added = camera_manager.add_camera(camera_id)
    return jsonify({'success': added})
@app.route('/remove_camera/<path:camera_id>')
def remove_camera(camera_id):
    """Stop streaming from *camera_id*.

    The id is normalised exactly as in the ``/add_camera`` route: ids
    starting with ``'net:'`` stay strings, any other id becomes an int when
    it parses as one. Without this, a camera added as ``/add_camera/0`` is
    registered under the int ``0`` and would never be found here.

    Returns:
        JSON ``{'success': True}``, whether or not a stream was registered.
    """
    if not camera_id.startswith('net:'):
        try:
            camera_id = int(camera_id)
        except ValueError:
            pass  # e.g. a device path: keep the string form
    camera_manager.remove_camera(camera_id)
    return jsonify({'success': True})
@app.route('/network_cameras', methods=['POST'])
def add_network_camera():
    """Register a network camera after verifying that it can be opened.

    Expects a JSON object with ``name`` and ``url`` and, optionally,
    ``username`` and ``password``.

    Returns:
        JSON ``{'success': bool}`` with an ``error`` message on failure.
        When the connection test fails, the configuration stored before
        the request (if any) is put back.
    """
    # A missing or malformed JSON body is treated like an empty object so
    # the request gets the normal validation error instead of a crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name')
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')
    if not name or not url:
        return jsonify({'success': False, 'error': 'Name and URL required'})
    camera_id = f"net:{name}"
    try:
        # The connection test below cannot run while a stream with this id
        # is active; previously that case was mistaken for a connection
        # failure and the working configuration was deleted.
        if camera_manager.get_camera(camera_id) is not None:
            return jsonify({
                'success': False,
                'error': 'Camera is currently streaming; remove it before reconfiguring'
            })
        previous = camera_manager.network_cameras.get(name)
        success = camera_manager.add_network_camera(name, url, username, password)
        if success:
            # Try to connect to the camera to verify it works
            test_success = camera_manager.add_camera(camera_id)
            if test_success:
                camera_manager.remove_camera(camera_id)  # Remove test connection
            else:
                if previous is None:
                    camera_manager.remove_network_camera(name)
                else:
                    # Keep the configuration that existed before this request.
                    camera_manager.network_cameras[name] = previous
                return jsonify({'success': False, 'error': 'Failed to connect to camera'})
        return jsonify({'success': success})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/load_model', methods=['POST'])
def load_model():
    """Load a YOLO model from the directory named in the request.

    Expects a JSON object with ``model_dir``.

    Returns:
        JSON ``{'success': bool}``; ``error`` is set when the directory is
        missing from the request or is not an existing directory.
    """
    # A missing or malformed JSON body yields the normal validation error.
    data = request.get_json(silent=True)
    model_dir = data.get('model_dir') if isinstance(data, dict) else None
    # Accept only a path string naming a real directory. Anything else
    # would only fail inside the loader, which clears the loaded flag of
    # the currently working model.
    # NOTE(review): model_dir comes straight from the client and is used as
    # a filesystem path; consider restricting it to an allowed base directory.
    if not isinstance(model_dir, str) or not model_dir or not os.path.isdir(model_dir):
        return jsonify({'success': False, 'error': 'Invalid model directory'})
    success = camera_manager.detector.load_yolo_model(model_dir)
    return jsonify({'success': success})
@app.route('/check_model')
def check_model():
    """Report whether a model is loaded, its name, and CUDA availability."""
    status = {
        'model_loaded': camera_manager.detector.model_loaded,
        'model_name': camera_manager.detector.current_model,
        'cuda_available': camera_manager.detector.cuda_available,
    }
    return jsonify(status)
@app.route('/model_info')
def model_info():
    """Return the detector's status, class list and confidence threshold."""
    detector = camera_manager.detector
    # Response key -> detector attribute, in the order the keys are emitted.
    fields = (
        ('model_loaded', 'model_loaded'),
        ('model_name', 'current_model'),
        ('cuda_available', 'cuda_available'),
        ('classes', 'classes'),
        ('confidence_threshold', 'confidence_threshold'),
    )
    info = {key: getattr(detector, attribute) for key, attribute in fields}
    return jsonify(info)
@app.route('/update_model_settings', methods=['POST'])
def update_model_settings():
    """Update detector settings from a JSON request body.

    Currently only ``confidence_threshold`` (a number in ``[0, 1]``) is
    understood.

    Returns:
        JSON ``{'success': True}`` when the threshold was applied, otherwise
        ``{'success': False, 'error': 'Invalid settings'}``.
    """
    # A missing or malformed JSON body falls through to "Invalid settings".
    data = request.get_json(silent=True)
    if isinstance(data, dict) and 'confidence_threshold' in data:
        try:
            threshold = float(data['confidence_threshold'])
        except (TypeError, ValueError):
            # TypeError covers null, lists and objects; ValueError covers
            # strings that are not numbers.
            threshold = None
        if threshold is not None and 0 <= threshold <= 1:
            camera_manager.detector.confidence_threshold = threshold
            return jsonify({'success': True})
    return jsonify({'success': False, 'error': 'Invalid settings'})
@app.route('/network_cameras/list')
def list_network_cameras():
    """Return every configured network camera; passwords are never included."""
    def describe(name, info):
        # Plain-string entries are cameras stored without credentials.
        if not isinstance(info, dict):
            return {
                'name': name,
                'url': info,
                'has_auth': False
            }
        return {
            'name': name,
            'url': info['url'],
            'username': info.get('username'),
            'has_auth': bool(info.get('username') and info.get('password'))
        }

    cameras = [
        describe(name, info)
        for name, info in camera_manager.network_cameras.items()
    ]
    return jsonify({'cameras': cameras})
@app.route('/network_cameras/remove/<name>', methods=['POST'])
def remove_network_camera(name):
    """Delete network camera *name*; ``success`` tells whether it existed."""
    return jsonify({'success': camera_manager.remove_network_camera(name)})
@app.route('/camera_settings/<path:camera_id>', methods=['GET', 'POST'])
def camera_settings(camera_id):
    """Get or update the capture settings of an active camera.

    The id is normalised exactly as in the ``/add_camera`` route, so a
    camera added with a numeric id is found under its int form.

    GET returns ``width``, ``height``, ``fps``, ``is_network`` and
    ``status``. POST accepts a JSON object with any of ``width`` +
    ``height``, ``fps`` and ``reconnect`` (truthy forces the stream to be
    stopped and started again).

    Returns:
        JSON with ``success`` and either ``settings`` (GET) or ``error``.
    """
    if not camera_id.startswith('net:'):
        try:
            camera_id = int(camera_id)
        except ValueError:
            pass  # e.g. a device path: keep the string form
    camera = camera_manager.get_camera(camera_id)
    if not camera:
        return jsonify({'success': False, 'error': 'Camera not found'})

    if request.method == 'POST':
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Invalid settings'})
        try:
            # Read cap once: the capture thread may reset it at any time.
            cap = camera.cap
            if 'width' in data and 'height' in data:
                width = int(data['width'])
                height = int(data['height'])
                if cap:
                    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
                    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
            if 'fps' in data:
                fps = int(data['fps'])
                if cap:
                    cap.set(cv2.CAP_PROP_FPS, fps)
            if data.get('reconnect'):
                # Force camera reconnection
                camera.stop()
                time.sleep(1)  # Wait for cleanup
                if not camera.start():
                    # Report the failure instead of claiming success.
                    return jsonify({'success': False, 'error': 'Failed to reconnect camera'})
            return jsonify({'success': True})
        except Exception as e:
            logger.error(f"Error updating camera settings: {e}")
            return jsonify({'success': False, 'error': str(e)})

    cap = camera.cap
    if not cap:
        return jsonify({'success': False, 'error': 'Camera not connected'})
    settings = {
        'width': int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        'height': int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        'fps': int(cap.get(cv2.CAP_PROP_FPS)),
        'is_network': camera.is_network_camera,
        'status': 'connected' if cap.isOpened() else 'disconnected'
    }
    return jsonify({'success': True, 'settings': settings})
# Reduce camera scanning noise
def scan_cameras_silently():
    """Scan for local cameras with Python-level stderr output silenced.

    Existing ``/dev/video0`` .. ``/dev/video9`` device paths are probed
    first, then the numeric indices 0 and 1.

    Returns:
        A list of ``{'id', 'type', 'name'}`` dicts, one per source that
        could be opened. ``id`` is the device path or the index as a string.

    NOTE(review): ``contextlib.redirect_stderr`` swaps ``sys.stderr`` only;
    confirm whether OpenCV's own warnings go through ``sys.stderr`` or are
    written straight to the process's stderr descriptor.
    """
    import contextlib

    def can_open(source):
        # Release on every path so a probe never keeps the device, whether
        # or not it opened and even if the check itself raises.
        cap = None
        try:
            cap = cv2.VideoCapture(source)
            return cap.isOpened()
        finally:
            if cap is not None:
                cap.release()

    cameras = []
    with open(os.devnull, 'w') as devnull, contextlib.redirect_stderr(devnull):
        # Check device paths first
        for i in range(10):
            device_path = f"/dev/video{i}"
            if not os.path.exists(device_path):
                continue
            try:
                if can_open(device_path):
                    cameras.append({
                        'id': device_path,
                        'type': 'local',
                        'name': f'Camera {i} ({device_path})'
                    })
            except Exception as e:
                logger.debug(f"Error checking device {device_path}: {e}")
        # Check numeric indices
        for i in range(2):  # Only check first two indices to reduce noise
            try:
                if can_open(i):
                    cameras.append({
                        'id': str(i),
                        'type': 'local',
                        'name': f'Camera {i}'
                    })
            except Exception as e:
                logger.debug(f"Error checking camera {i}: {e}")
    return cameras
# Script entry point: runs only when this file is executed directly.
if __name__ == '__main__':
    # Create templates directory if it doesn't exist
    os.makedirs('templates', exist_ok=True)
    # Load model from environment variable if available
    # NOTE(review): CameraManager.__init__ already loads from YOLO_MODEL_DIR
    # when it names an existing path, so this looks like a repeat load.
    model_dir = os.getenv('YOLO_MODEL_DIR')
    if model_dir and os.path.exists(model_dir):
        camera_manager.detector.load_yolo_model(model_dir)
    # Check if running with Gunicorn
    if os.environ.get('GUNICORN_CMD_ARGS') is not None:
        # Running with Gunicorn, let it handle the server
        pass
    else:
        # Development server warning
        logger.warning("Running in development mode. Use Gunicorn for production!")
        # Listens on all interfaces, port 5000, handling requests in threads.
        app.run(host='0.0.0.0', port=5000, threaded=True)