ты мать эта говно и пидарас
This commit is contained in:
655
web_server.py
Normal file
655
web_server.py
Normal file
@@ -0,0 +1,655 @@
|
||||
import os
|
||||
import cv2
|
||||
import json
|
||||
import numpy as np
|
||||
from flask import Flask, Response, render_template, jsonify, request
|
||||
from flask_cors import CORS
|
||||
import threading
|
||||
import time
|
||||
import queue
|
||||
import urllib.parse
|
||||
import glob
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app) # Enable CORS for all routes
|
||||
|
||||
def find_yolo_model():
|
||||
"""Scan current directory and subdirectories for YOLO model files"""
|
||||
# Look for common YOLO file patterns
|
||||
weights_files = glob.glob('**/*.weights', recursive=True) + glob.glob('**/*.onnx', recursive=True)
|
||||
cfg_files = glob.glob('**/*.cfg', recursive=True)
|
||||
names_files = glob.glob('**/*.names', recursive=True)
|
||||
|
||||
# Find directories containing all required files
|
||||
model_dirs = set()
|
||||
for weights in weights_files:
|
||||
directory = os.path.dirname(weights)
|
||||
if not directory:
|
||||
directory = '.'
|
||||
|
||||
# Check if this directory has all required files
|
||||
has_cfg = any(cfg for cfg in cfg_files if os.path.dirname(cfg) == directory)
|
||||
has_names = any(names for names in names_files if os.path.dirname(names) == directory)
|
||||
|
||||
if has_cfg and has_names:
|
||||
model_dirs.add(directory)
|
||||
|
||||
# Return the first valid directory found, or None
|
||||
return next(iter(model_dirs), None)
|
||||
|
||||
class YOLODetector:
|
||||
def __init__(self):
|
||||
self.net = None
|
||||
self.classes = []
|
||||
self.colors = []
|
||||
self.confidence_threshold = 0.35
|
||||
self.cuda_available = self.check_cuda()
|
||||
self.model_loaded = False
|
||||
self.current_model = None
|
||||
|
||||
def check_cuda(self):
|
||||
"""Check if CUDA is available"""
|
||||
try:
|
||||
count = cv2.cuda.getCudaEnabledDeviceCount()
|
||||
return count > 0
|
||||
except:
|
||||
return False
|
||||
|
||||
def scan_for_model(self):
|
||||
"""Auto-scan for YOLO model files in current directory"""
|
||||
try:
|
||||
# Look for model files in current directory
|
||||
weights = [f for f in os.listdir('.') if f.endswith(('.weights', '.onnx'))]
|
||||
configs = [f for f in os.listdir('.') if f.endswith('.cfg')]
|
||||
classes = [f for f in os.listdir('.') if f.endswith('.names')]
|
||||
|
||||
if weights and configs and classes:
|
||||
self.load_yolo_model('.', weights[0], configs[0], classes[0])
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error scanning for model: {e}")
|
||||
return False
|
||||
|
||||
def load_yolo_model(self, model_dir, weights_file=None, config_file=None, classes_file=None):
|
||||
"""Load YOLO model with specified files or auto-detect"""
|
||||
try:
|
||||
if not weights_file:
|
||||
weights = [f for f in os.listdir(model_dir) if f.endswith(('.weights', '.onnx'))]
|
||||
configs = [f for f in os.listdir(model_dir) if f.endswith('.cfg')]
|
||||
classes = [f for f in os.listdir(model_dir) if f.endswith('.names')]
|
||||
|
||||
if not (weights and configs and classes):
|
||||
return False
|
||||
|
||||
weights_file = weights[0]
|
||||
config_file = configs[0]
|
||||
classes_file = classes[0]
|
||||
|
||||
weights_path = os.path.join(model_dir, weights_file)
|
||||
config_path = os.path.join(model_dir, config_file)
|
||||
classes_path = os.path.join(model_dir, classes_file)
|
||||
|
||||
self.net = cv2.dnn.readNet(weights_path, config_path)
|
||||
self.current_model = weights_file
|
||||
|
||||
if self.cuda_available:
|
||||
try:
|
||||
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
|
||||
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
|
||||
except:
|
||||
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
|
||||
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
|
||||
|
||||
with open(classes_path, 'r') as f:
|
||||
self.classes = f.read().strip().split('\n')
|
||||
|
||||
np.random.seed(42)
|
||||
self.colors = np.random.randint(0, 255, size=(len(self.classes), 3), dtype='uint8')
|
||||
self.model_loaded = True
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error loading model: {e}")
|
||||
self.model_loaded = False
|
||||
return False
|
||||
|
||||
def get_camera_resolution(self, cap):
|
||||
"""Get the optimal resolution for a camera"""
|
||||
try:
|
||||
# Common resolutions to try
|
||||
resolutions = [
|
||||
(1920, 1080), # Full HD
|
||||
(1280, 720), # HD
|
||||
(800, 600), # SVGA
|
||||
(640, 480) # VGA
|
||||
]
|
||||
|
||||
best_width = 640
|
||||
best_height = 480
|
||||
|
||||
for width, height in resolutions:
|
||||
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
||||
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
||||
actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
|
||||
actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
|
||||
|
||||
if actual_width > 0 and actual_height > 0:
|
||||
best_width = actual_width
|
||||
best_height = actual_height
|
||||
break
|
||||
|
||||
return int(best_width), int(best_height)
|
||||
except:
|
||||
return 640, 480
|
||||
|
||||
def scan_cameras(self):
|
||||
"""Scan for available cameras, skipping video0"""
|
||||
cameras = []
|
||||
|
||||
# Start from video1 since video0 is often empty or system camera
|
||||
for i in range(1, 10):
|
||||
try:
|
||||
cap = cv2.VideoCapture(i)
|
||||
if cap.isOpened():
|
||||
# Get optimal resolution
|
||||
width, height = self.get_camera_resolution(cap)
|
||||
cameras.append({
|
||||
'id': i,
|
||||
'width': width,
|
||||
'height': height
|
||||
})
|
||||
cap.release()
|
||||
except:
|
||||
continue
|
||||
|
||||
# Check device paths
|
||||
for i in range(1, 10):
|
||||
path = f"/dev/video{i}"
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
cap = cv2.VideoCapture(path)
|
||||
if cap.isOpened():
|
||||
width, height = self.get_camera_resolution(cap)
|
||||
cameras.append({
|
||||
'id': path,
|
||||
'width': width,
|
||||
'height': height
|
||||
})
|
||||
cap.release()
|
||||
except:
|
||||
continue
|
||||
|
||||
return cameras
|
||||
|
||||
def detect(self, frame):
|
||||
"""Perform object detection on frame"""
|
||||
if self.net is None or not self.model_loaded:
|
||||
return frame
|
||||
|
||||
try:
|
||||
height, width = frame.shape[:2]
|
||||
blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False)
|
||||
self.net.setInput(blob)
|
||||
|
||||
try:
|
||||
layer_names = self.net.getLayerNames()
|
||||
output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
|
||||
except:
|
||||
output_layers = self.net.getUnconnectedOutLayersNames()
|
||||
|
||||
outputs = self.net.forward(output_layers)
|
||||
|
||||
# Process detections
|
||||
boxes = []
|
||||
confidences = []
|
||||
class_ids = []
|
||||
|
||||
for output in outputs:
|
||||
for detection in output:
|
||||
scores = detection[5:]
|
||||
class_id = np.argmax(scores)
|
||||
confidence = scores[class_id]
|
||||
|
||||
if confidence > self.confidence_threshold:
|
||||
# Convert YOLO coords to screen coords
|
||||
center_x = int(detection[0] * width)
|
||||
center_y = int(detection[1] * height)
|
||||
w = int(detection[2] * width)
|
||||
h = int(detection[3] * height)
|
||||
|
||||
# Get top-left corner
|
||||
x = max(0, int(center_x - w/2))
|
||||
y = max(0, int(center_y - h/2))
|
||||
|
||||
boxes.append([x, y, w, h])
|
||||
confidences.append(float(confidence))
|
||||
class_ids.append(class_id)
|
||||
|
||||
# Apply non-maximum suppression
|
||||
indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confidence_threshold, 0.4)
|
||||
|
||||
if len(indices) > 0:
|
||||
for i in indices.flatten():
|
||||
try:
|
||||
(x, y, w, h) = boxes[i]
|
||||
# Ensure coordinates are within frame bounds
|
||||
x = max(0, min(x, width - 1))
|
||||
y = max(0, min(y, height - 1))
|
||||
w = min(w, width - x)
|
||||
h = min(h, height - y)
|
||||
|
||||
color = [int(c) for c in self.colors[class_ids[i]]]
|
||||
cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
|
||||
|
||||
# Draw label with background
|
||||
text = f"{self.classes[class_ids[i]]}: {confidences[i]:.2f}"
|
||||
font_scale = 0.5
|
||||
font = cv2.FONT_HERSHEY_SIMPLEX
|
||||
thickness = 1
|
||||
(text_w, text_h), baseline = cv2.getTextSize(text, font, font_scale, thickness)
|
||||
|
||||
# Draw background rectangle for text
|
||||
cv2.rectangle(frame, (x, y - text_h - baseline - 5), (x + text_w, y), color, -1)
|
||||
# Draw text
|
||||
cv2.putText(frame, text, (x, y - 5), font, font_scale, (255, 255, 255), thickness)
|
||||
except Exception as e:
|
||||
print(f"Error drawing detection {i}: {e}")
|
||||
continue
|
||||
|
||||
return frame
|
||||
|
||||
except Exception as e:
|
||||
print(f"Detection error: {e}")
|
||||
return frame
|
||||
|
||||
class CameraStream:
|
||||
def __init__(self, camera_id, detector):
|
||||
self.camera_id = camera_id
|
||||
self.cap = None
|
||||
self.frame_queue = queue.Queue(maxsize=10)
|
||||
self.running = False
|
||||
self.thread = None
|
||||
self.lock = threading.Lock()
|
||||
self.detector = detector
|
||||
self.is_network_camera = isinstance(camera_id, str) and camera_id.startswith('net:')
|
||||
self.last_frame_time = time.time()
|
||||
self.frame_timeout = 5.0 # Timeout after 5 seconds without frames
|
||||
self.reconnect_interval = 5.0 # Try to reconnect every 5 seconds
|
||||
self.last_reconnect_attempt = 0
|
||||
|
||||
def start(self):
|
||||
"""Start the camera stream"""
|
||||
if self.running:
|
||||
return
|
||||
|
||||
try:
|
||||
if self.is_network_camera:
|
||||
# Handle network camera
|
||||
name = self.camera_id[4:] # Remove 'net:' prefix
|
||||
camera_info = camera_manager.network_cameras.get(name)
|
||||
if not camera_info:
|
||||
raise Exception(f"Network camera {name} not found")
|
||||
|
||||
if isinstance(camera_info, dict):
|
||||
url = camera_info['url']
|
||||
# Handle DroidCam URL formatting
|
||||
if ':4747' in url and not url.endswith('/video'):
|
||||
url = url.rstrip('/') + '/video'
|
||||
if not url.startswith(('http://', 'https://')):
|
||||
url = 'http://' + url
|
||||
|
||||
if 'username' in camera_info and 'password' in camera_info:
|
||||
parsed = urllib.parse.urlparse(url)
|
||||
netloc = f"{camera_info['username']}:{camera_info['password']}@{parsed.netloc}"
|
||||
url = parsed._replace(netloc=netloc).geturl()
|
||||
else:
|
||||
url = camera_info
|
||||
|
||||
logger.info(f"Connecting to network camera: {url}")
|
||||
self.cap = cv2.VideoCapture(url)
|
||||
else:
|
||||
# Handle local camera
|
||||
self.cap = cv2.VideoCapture(self.camera_id)
|
||||
|
||||
if not self.cap.isOpened():
|
||||
raise Exception(f"Failed to open camera {self.camera_id}")
|
||||
|
||||
# Set camera properties
|
||||
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
|
||||
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
|
||||
self.cap.set(cv2.CAP_PROP_FPS, 30)
|
||||
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
||||
|
||||
self.running = True
|
||||
self.thread = threading.Thread(target=self._capture_loop)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting camera {self.camera_id}: {e}")
|
||||
if self.cap:
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
"""Stop the camera stream"""
|
||||
self.running = False
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
if self.cap:
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
|
||||
while not self.frame_queue.empty():
|
||||
try:
|
||||
self.frame_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
def _capture_loop(self):
|
||||
"""Main capture loop with automatic reconnection"""
|
||||
while self.running:
|
||||
try:
|
||||
if not self.cap or not self.cap.isOpened():
|
||||
current_time = time.time()
|
||||
if current_time - self.last_reconnect_attempt >= self.reconnect_interval:
|
||||
logger.info(f"Attempting to reconnect camera {self.camera_id}")
|
||||
self.last_reconnect_attempt = current_time
|
||||
self.start()
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
current_time = time.time()
|
||||
if current_time - self.last_frame_time > self.frame_timeout:
|
||||
logger.warning(f"No frames received from camera {self.camera_id} for {self.frame_timeout} seconds")
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
self.last_frame_time = time.time()
|
||||
|
||||
# Apply object detection
|
||||
if self.detector and self.detector.net is not None:
|
||||
frame = self.detector.detect(frame)
|
||||
|
||||
# Convert to JPEG
|
||||
_, jpeg = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
||||
|
||||
# Update queue
|
||||
try:
|
||||
self.frame_queue.put_nowait(jpeg.tobytes())
|
||||
except queue.Full:
|
||||
try:
|
||||
self.frame_queue.get_nowait()
|
||||
self.frame_queue.put_nowait(jpeg.tobytes())
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in capture loop for camera {self.camera_id}: {e}")
|
||||
if self.cap:
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
time.sleep(1)
|
||||
|
||||
def get_frame(self):
|
||||
"""Get the latest frame"""
|
||||
try:
|
||||
return self.frame_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
class CameraManager:
|
||||
def __init__(self):
|
||||
self.cameras = {}
|
||||
self.network_cameras = {}
|
||||
self.lock = threading.Lock()
|
||||
self.detector = YOLODetector()
|
||||
|
||||
# Auto-scan for model directory
|
||||
model_dir = os.getenv('YOLO_MODEL_DIR')
|
||||
if not model_dir or not os.path.exists(model_dir):
|
||||
model_dir = find_yolo_model()
|
||||
|
||||
if model_dir:
|
||||
print(f"Found YOLO model in directory: {model_dir}")
|
||||
self.detector.load_yolo_model(model_dir)
|
||||
else:
|
||||
print("No YOLO model found in current directory")
|
||||
|
||||
def add_camera(self, camera_id):
|
||||
"""Add a camera to the manager"""
|
||||
with self.lock:
|
||||
if camera_id not in self.cameras:
|
||||
camera = CameraStream(camera_id, self.detector)
|
||||
if camera.start():
|
||||
self.cameras[camera_id] = camera
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_camera(self, camera_id):
|
||||
"""Remove a camera from the manager"""
|
||||
with self.lock:
|
||||
if camera_id in self.cameras:
|
||||
self.cameras[camera_id].stop()
|
||||
del self.cameras[camera_id]
|
||||
|
||||
def get_camera(self, camera_id):
|
||||
"""Get a camera by ID"""
|
||||
return self.cameras.get(camera_id)
|
||||
|
||||
def get_all_cameras(self):
|
||||
"""Get list of all camera IDs"""
|
||||
return list(self.cameras.keys())
|
||||
|
||||
def add_network_camera(self, name, url, username=None, password=None):
|
||||
"""Add a network camera"""
|
||||
camera_info = {
|
||||
'url': url,
|
||||
'username': username,
|
||||
'password': password
|
||||
} if username and password else url
|
||||
|
||||
self.network_cameras[name] = camera_info
|
||||
return True
|
||||
|
||||
def remove_network_camera(self, name):
|
||||
"""Remove a network camera"""
|
||||
if name in self.network_cameras:
|
||||
camera_id = f"net:{name}"
|
||||
if camera_id in self.cameras:
|
||||
self.remove_camera(camera_id)
|
||||
del self.network_cameras[name]
|
||||
return True
|
||||
return False
|
||||
|
||||
camera_manager = CameraManager()
|
||||
|
||||
def gen_frames(camera_id):
|
||||
"""Generator function for camera frames"""
|
||||
camera = camera_manager.get_camera(camera_id)
|
||||
if not camera:
|
||||
return
|
||||
|
||||
while True:
|
||||
frame = camera.get_frame()
|
||||
if frame is not None:
|
||||
yield (b'--frame\r\n'
|
||||
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
|
||||
else:
|
||||
time.sleep(0.01)
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
"""Serve the main page"""
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/video_feed/<path:camera_id>')
|
||||
def video_feed(camera_id):
|
||||
"""Video streaming route"""
|
||||
# Handle both local and network cameras
|
||||
if camera_id.startswith('net:'):
|
||||
camera_id = camera_id # Keep as string for network cameras
|
||||
else:
|
||||
try:
|
||||
camera_id = int(camera_id) # Convert to int for local cameras
|
||||
except ValueError:
|
||||
camera_id = camera_id # Keep as string if not convertible
|
||||
|
||||
return Response(gen_frames(camera_id),
|
||||
mimetype='multipart/x-mixed-replace; boundary=frame')
|
||||
|
||||
@app.route('/cameras')
|
||||
def get_cameras():
|
||||
"""Get list of available cameras"""
|
||||
# Scan for local cameras silently
|
||||
cameras = scan_cameras_silently()
|
||||
|
||||
# Add network cameras
|
||||
for name, info in camera_manager.network_cameras.items():
|
||||
url = info['url'] if isinstance(info, dict) else info
|
||||
cameras.append({
|
||||
'id': f'net:{name}',
|
||||
'type': 'network',
|
||||
'name': f'{name} ({url})'
|
||||
})
|
||||
|
||||
# Add status information for active cameras
|
||||
for camera in cameras:
|
||||
camera_stream = camera_manager.get_camera(camera['id'])
|
||||
if camera_stream:
|
||||
camera['active'] = True
|
||||
camera['status'] = 'connected' if camera_stream.cap and camera_stream.cap.isOpened() else 'reconnecting'
|
||||
else:
|
||||
camera['active'] = False
|
||||
camera['status'] = 'disconnected'
|
||||
|
||||
return jsonify(cameras)
|
||||
|
||||
@app.route('/add_camera/<path:camera_id>')
|
||||
def add_camera(camera_id):
|
||||
"""Add a camera to the stream"""
|
||||
if camera_id.startswith('net:'):
|
||||
camera_id = camera_id # Keep as string for network cameras
|
||||
else:
|
||||
try:
|
||||
camera_id = int(camera_id) # Convert to int for local cameras
|
||||
except ValueError:
|
||||
camera_id = camera_id # Keep as string if not convertible
|
||||
|
||||
success = camera_manager.add_camera(camera_id)
|
||||
return jsonify({'success': success})
|
||||
|
||||
@app.route('/remove_camera/<path:camera_id>')
|
||||
def remove_camera(camera_id):
|
||||
"""Remove a camera from the stream"""
|
||||
camera_manager.remove_camera(camera_id)
|
||||
return jsonify({'success': True})
|
||||
|
||||
@app.route('/network_cameras', methods=['POST'])
|
||||
def add_network_camera():
|
||||
"""Add a network camera"""
|
||||
data = request.json
|
||||
name = data.get('name')
|
||||
url = data.get('url')
|
||||
username = data.get('username')
|
||||
password = data.get('password')
|
||||
|
||||
if not name or not url:
|
||||
return jsonify({'success': False, 'error': 'Name and URL required'})
|
||||
|
||||
try:
|
||||
success = camera_manager.add_network_camera(name, url, username, password)
|
||||
if success:
|
||||
# Try to connect to the camera to verify it works
|
||||
camera_id = f"net:{name}"
|
||||
test_success = camera_manager.add_camera(camera_id)
|
||||
if test_success:
|
||||
camera_manager.remove_camera(camera_id) # Remove test connection
|
||||
else:
|
||||
camera_manager.remove_network_camera(name)
|
||||
return jsonify({'success': False, 'error': 'Failed to connect to camera'})
|
||||
return jsonify({'success': success})
|
||||
except Exception as e:
|
||||
return jsonify({'success': False, 'error': str(e)})
|
||||
|
||||
@app.route('/load_model', methods=['POST'])
|
||||
def load_model():
|
||||
"""Load YOLO model"""
|
||||
data = request.json
|
||||
model_dir = data.get('model_dir')
|
||||
|
||||
if not model_dir or not os.path.exists(model_dir):
|
||||
return jsonify({'success': False, 'error': 'Invalid model directory'})
|
||||
|
||||
success = camera_manager.detector.load_yolo_model(model_dir)
|
||||
return jsonify({'success': success})
|
||||
|
||||
# Reduce camera scanning noise
|
||||
def scan_cameras_silently():
|
||||
"""Scan for cameras while suppressing OpenCV warnings"""
|
||||
import contextlib
|
||||
with open(os.devnull, 'w') as devnull:
|
||||
with contextlib.redirect_stderr(devnull):
|
||||
cameras = []
|
||||
# Check device paths first
|
||||
for i in range(10):
|
||||
device_path = f"/dev/video{i}"
|
||||
if os.path.exists(device_path):
|
||||
try:
|
||||
cap = cv2.VideoCapture(device_path)
|
||||
if cap.isOpened():
|
||||
cameras.append({
|
||||
'id': device_path,
|
||||
'type': 'local',
|
||||
'name': f'Camera {i} ({device_path})'
|
||||
})
|
||||
cap.release()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error checking device {device_path}: {e}")
|
||||
|
||||
# Check numeric indices
|
||||
for i in range(2): # Only check first two indices to reduce noise
|
||||
try:
|
||||
cap = cv2.VideoCapture(i)
|
||||
if cap.isOpened():
|
||||
cameras.append({
|
||||
'id': str(i),
|
||||
'type': 'local',
|
||||
'name': f'Camera {i}'
|
||||
})
|
||||
cap.release()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error checking camera {i}: {e}")
|
||||
|
||||
return cameras
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Create templates directory if it doesn't exist
|
||||
os.makedirs('templates', exist_ok=True)
|
||||
|
||||
# Load model from environment variable if available
|
||||
model_dir = os.getenv('YOLO_MODEL_DIR')
|
||||
if model_dir and os.path.exists(model_dir):
|
||||
camera_manager.detector.load_yolo_model(model_dir)
|
||||
|
||||
# Check if running with Gunicorn
|
||||
if os.environ.get('GUNICORN_CMD_ARGS') is not None:
|
||||
# Running with Gunicorn, let it handle the server
|
||||
pass
|
||||
else:
|
||||
# Development server warning
|
||||
logger.warning("Running in development mode. Use Gunicorn for production!")
|
||||
app.run(host='0.0.0.0', port=5000, threaded=True)
|
||||
Reference in New Issue
Block a user