webrtc working

This commit is contained in:
2025-11-03 21:08:57 +01:00
parent 96335af6ee
commit 417e50983e
4 changed files with 587 additions and 0 deletions

434
mucapy/WebServer.py Normal file
View File

@@ -0,0 +1,434 @@
import asyncio
import logging
import time
import json
import threading
from aiohttp import web
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
from aiortc.contrib.media import MediaBlackhole
from av import VideoFrame
import cv2
import numpy as np
from typing import Dict, Set, Optional, List
import fractions
# Import the CameraThread from your existing code
from CameraThread import CameraThread
from PyQt5.QtCore import QMutex
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
@web.middleware
async def cors_middleware(request, handler):
"""Global CORS middleware to ensure preflight (OPTIONS) and all responses include CORS headers."""
cors_headers = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type,Authorization',
}
if request.method == 'OPTIONS':
preflight_headers = cors_headers.copy()
preflight_headers.update({
'Access-Control-Max-Age': '3600',
'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Requested-With',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
})
return web.Response(status=200, headers=preflight_headers)
try:
resp = await handler(request)
if resp is None:
resp = web.Response(status=204)
if isinstance(resp, web.StreamResponse):
for k, v in cors_headers.items():
if k not in resp.headers:
resp.headers[k] = v
return resp
except Exception as e:
logger.error(f"Error in request handler: {e}")
error_resp = web.Response(status=500, text=str(e))
for k, v in cors_headers.items():
error_resp.headers[k] = v
return error_resp
async def favicon_handler(request):
return web.Response(status=204)
class VideoStreamTrackFromFrames(VideoStreamTrack):
def __init__(self, camera_index: int):
super().__init__()
self.camera_index = camera_index
self.frame = None
self.frame_lock = asyncio.Lock()
async def recv(self):
pts, time_base = await self.next_timestamp()
async with self.frame_lock:
if self.frame is None:
# Create a test pattern to verify the stream is working
frame = self._create_test_pattern()
else:
frame = self.frame
# Convert BGR to RGB for WebRTC
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
video_frame = VideoFrame.from_ndarray(frame_rgb, format='rgb24')
video_frame.pts = pts
video_frame.time_base = time_base
return video_frame
def _create_test_pattern(self):
"""Create a test pattern with camera index for debugging"""
height, width = 480, 640
frame = np.zeros((height, width, 3), dtype=np.uint8)
# Add colored background based on camera index
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0)]
color = colors[self.camera_index % len(colors)]
frame[:, :] = color
# Add some text and shapes
cv2.rectangle(frame, (50, 50), (width-50, height-50), (255, 255, 255), 2)
cv2.putText(frame, f"Camera {self.camera_index}", (width//4, height//2),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.putText(frame, "Waiting for frames...", (width//4, height//2 + 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
cv2.putText(frame, f"Time: {time.strftime('%H:%M:%S')}", (width//4, height//2 + 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return frame
async def set_frame(self, frame):
async with self.frame_lock:
if frame is not None and isinstance(frame, np.ndarray):
# Make sure we have a valid frame with proper dimensions
if len(frame.shape) == 3 and frame.shape[2] == 3:
self.frame = frame.copy()
else:
logger.warning(f"Invalid frame format: {frame.shape if hasattr(frame, 'shape') else 'No shape'}")
self.frame = None
else:
self.frame = None
class WebRTCVideoServer:
def __init__(self, main_window=None):
self.tracks: Dict[int, VideoStreamTrackFromFrames] = {}
self.pcs: Set[RTCPeerConnection] = set()
self.main_window = main_window
self.camera_threads: Dict[int, CameraThread] = {}
self.latest_frames: Dict[int, np.ndarray] = {}
self.frame_lock = QMutex()
self.active_camera_sources: List[str] = []
self.app = web.Application(middlewares=[cors_middleware])
self.runner = None
self.site = None
self.is_running = False
self._loop = None
self._server_thread = None
self._frame_update_task = None
self._setup_routes()
logger.info("WebRTCVideoServer initialized")
def _setup_routes(self):
self.app.router.add_post(r'/offer/index{index:[0-9]+}', self.offer_handler)
self.app.router.add_post(r'/offer/{index:[0-9]+}', self.offer_handler)
self.app.router.add_get('/favicon.ico', favicon_handler)
self.app.router.add_get('/status', self.status_handler)
logger.info("Routes configured")
async def status_handler(self, request):
return web.json_response({
'status': 'running' if self.is_running else 'stopped',
'tracks': len(self.tracks),
'connections': len(self.pcs),
'active_cameras': list(self.tracks.keys()),
'camera_sources': self.active_camera_sources
})
async def offer_handler(self, request):
headers = {}
logger.info(f"New WebRTC offer from {request.remote}")
try:
params = await request.json()
sdp = params.get('sdp')
offer_type = params.get('type')
if not sdp or not offer_type:
return web.json_response({'error': 'Missing "sdp" or "type"'}, status=400, headers=headers)
try:
index = int(request.match_info.get('index', 0))
except Exception:
return web.json_response({'error': 'Invalid camera index'}, status=400, headers=headers)
# Check if this camera index exists
if index >= len(self.active_camera_sources):
return web.json_response({'error': f'Camera index {index} not available'}, status=404, headers=headers)
logger.info(f"WebRTC connection for camera index {index} (Source: {self.active_camera_sources[index]})")
offer = RTCSessionDescription(sdp=sdp, type=offer_type)
pc = RTCPeerConnection()
self.pcs.add(pc)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
logger.info(f"Connection state: {pc.connectionState}")
if pc.connectionState in ("failed", "closed"):
await self.cleanup_peer_connection(pc)
await pc.setRemoteDescription(offer)
# Create or get track for this camera index
if index not in self.tracks:
self.tracks[index] = VideoStreamTrackFromFrames(index)
logger.info(f"Created track for camera index {index}")
track = self.tracks[index]
# Try to attach to existing transceiver first
attached = False
for t in pc.getTransceivers():
if t.kind == 'video':
try:
if t.sender is not None:
await t.sender.replace_track(track)
attached = True
logger.info("Attached track to existing transceiver")
break
except Exception:
continue
if not attached:
try:
pc.addTrack(track)
logger.info('Added track via addTrack fallback')
except Exception as ex:
logger.debug(f'addTrack fallback failed: {ex}')
pc.addTransceiver(track, direction='sendonly')
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
response_data = {
'sdp': pc.localDescription.sdp,
'type': pc.localDescription.type
}
logger.info(f"Sending answer for camera index {index}")
return web.json_response(response_data, headers=headers)
except Exception as e:
logger.exception("Error handling WebRTC offer")
return web.json_response({'error': str(e)}, status=500, headers=headers)
def setup_cameras(self, camera_sources: List[str]):
"""Setup camera threads for WebRTC streaming"""
self.stop_cameras() # Stop any existing cameras
self.active_camera_sources = camera_sources
for i, source in enumerate(camera_sources):
try:
# Create camera thread with unique ID (offset by 1000 to avoid conflicts with main app)
thread_id = i + 1000
thread = CameraThread(thread_id, source, parent=None)
thread.set_fps(15) # Set reasonable FPS for streaming
thread.frame_ready.connect(self._on_frame_ready)
thread.error_occurred.connect(self._on_camera_error)
self.camera_threads[i] = thread
self.latest_frames[i] = None
thread.start()
logger.info(f"WebRTC camera thread started for source {i}: {source}")
except Exception as e:
logger.error(f"Failed to start WebRTC camera thread for source {i}: {e}")
def _on_frame_ready(self, cam_id, frame):
"""Handle frame from camera thread"""
# Adjust camera ID back to our index
actual_index = cam_id - 1000
if 0 <= actual_index < len(self.active_camera_sources):
self.frame_lock.lock()
try:
self.latest_frames[actual_index] = frame.copy()
finally:
self.frame_lock.unlock()
def _on_camera_error(self, cam_id, message):
"""Handle camera errors"""
actual_index = cam_id - 1000
logger.error(f"WebRTC camera {actual_index} error: {message}")
def get_frame(self, index: int) -> Optional[np.ndarray]:
"""Get frame for specific camera index"""
self.frame_lock.lock()
try:
frame = self.latest_frames.get(index)
if frame is not None:
return frame.copy()
finally:
self.frame_lock.unlock()
return None
def stop_cameras(self):
"""Stop all camera threads"""
for thread in self.camera_threads.values():
try:
thread.stop()
thread.wait(2000) # Wait up to 2 seconds for thread to finish
except Exception as e:
logger.error(f"Error stopping camera thread: {e}")
self.camera_threads.clear()
self.latest_frames.clear()
logger.info("WebRTC camera threads stopped")
async def cleanup_peer_connection(self, pc):
try:
self.pcs.discard(pc)
await pc.close()
logger.info("Cleaned up peer connection")
except Exception as e:
logger.error(f"Error cleaning up peer connection: {e}")
async def start_server(self):
"""Async method to start the server"""
if self.is_running:
logger.info("Server already running")
return
try:
logger.info("Starting WebRTC server...")
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.site = web.TCPSite(self.runner, '0.0.0.0', 1337)
await self.site.start()
self.is_running = True
# Start frame update loop
self._frame_update_task = asyncio.create_task(self._frame_update_loop())
logger.info("WebRTC server started successfully on port 1337")
except Exception as e:
logger.error(f"Failed to start server: {e}")
self.is_running = False
raise
async def _frame_update_loop(self):
"""Background task to update tracks with frames from camera threads"""
while self.is_running:
try:
# Update each track with its corresponding camera frame
for index in self.tracks.keys():
if index < len(self.active_camera_sources):
frame = self.get_frame(index)
if frame is not None and isinstance(frame, np.ndarray):
if len(frame.shape) == 3 and frame.shape[2] == 3:
# Check if frame is not just a black/empty frame
if frame.mean() > 10:
await self.tracks[index].set_frame(frame)
else:
logger.debug(f"Frame from camera {index} appears to be black/empty")
# Sleep to match camera FPS
await asyncio.sleep(0.033) # ~30 FPS
except Exception as e:
logger.error(f"Error in frame update loop: {e}")
await asyncio.sleep(1) # Wait longer on error
async def stop_server(self):
"""Async method to stop the server"""
logger.info("Stopping WebRTC server...")
self.is_running = False
# Stop frame update task
if self._frame_update_task:
self._frame_update_task.cancel()
try:
await self._frame_update_task
except asyncio.CancelledError:
pass
self._frame_update_task = None
# Stop camera threads
self.stop_cameras()
try:
# Close all peer connections
close_tasks = [self.cleanup_peer_connection(pc) for pc in list(self.pcs)]
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
if self.site:
await self.site.stop()
if self.runner:
await self.runner.cleanup()
# Clear all tracks
self.tracks.clear()
self.active_camera_sources.clear()
logger.info("WebRTC server stopped")
except Exception as e:
logger.error(f"Error stopping server: {e}")
def start(self, camera_sources: List[str] = None):
"""Synchronous start method for main thread"""
if self.is_running:
logger.info("Server already running")
return
# Setup cameras if sources provided
if camera_sources:
self.setup_cameras(camera_sources)
def run_server():
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self.start_server())
self._loop.run_forever()
except Exception as e:
logger.error(f"Server thread error: {e}")
self.is_running = False
finally:
if self._loop.is_running():
self._loop.stop()
self._loop.close()
self._server_thread = threading.Thread(target=run_server, daemon=True)
self._server_thread.start()
# Wait briefly for server to initialize
import time
for _ in range(10):
if self.is_running:
break
time.sleep(0.1)
def stop(self):
"""Synchronous stop method for main thread"""
if not self.is_running or not self._loop:
return
try:
# Schedule stop in server thread's event loop
future = asyncio.run_coroutine_threadsafe(self.stop_server(), self._loop)
future.result(timeout=5)
except Exception as e:
logger.error(f"Error stopping server: {e}")
finally:
self._server_thread = None
self._loop = None

View File

@@ -95,6 +95,9 @@ class MainWindow(QMainWindow):
self.hw_timer.timeout.connect(self.update_hardware_stats)
self.hw_timer.start(1000) # Update every second
self.webrtc_server = None # Will hold the server instance
self.webrtc_enabled = False # WebRTC server state
# Set dark theme style
style_file = getpath.resource_path("styling/mainwindow.qss")
try:
@@ -134,6 +137,44 @@ class MainWindow(QMainWindow):
# For the CPU Styling we have another one so we set this here!
self.sepstyleing = False
def start_webrtc_server(self):
"""Start the WebRTC server if not already running"""
if self.webrtc_server is None:
try:
from WebServer import WebRTCVideoServer # Updated class name
# Get the current camera sources from the detector
camera_sources = []
if hasattr(self.detector, 'cameras'):
camera_sources = self.detector.cameras
# Create and start the server with the same camera sources
self.webrtc_server = WebRTCVideoServer(main_window=self)
self.webrtc_server.start(camera_sources)
print("WebRTC Server started on port 1337")
except Exception as e:
QMessageBox.critical(self, "WebRTC Server Error", f"Failed to start server:\n{e}")
self.toggle_webserver_action.setChecked(False)
self.webrtc_server = None
def stop_webrtc_server(self):
"""Stop the WebRTC server"""
if self.webrtc_server is not None:
try:
self.webrtc_server.stop()
self.webrtc_server = None
print("WebRTC Server stopped")
except Exception as e:
QMessageBox.warning(self, "WebRTC Server Warning", f"Failed to stop server:\n{e}")
def set_webrtc_enabled(self, state: bool):
self.webrtc_enabled = state
if state:
# Only start server if cameras are active
if hasattr(self.detector, 'cameras') and self.detector.cameras:
self.start_webrtc_server()
else:
self.stop_webrtc_server()
def load_saved_settings(self):
"""Load saved settings from configuration"""
@@ -241,6 +282,14 @@ class MainWindow(QMainWindow):
self.toggle_alert_action.triggered.connect(self.set_alert_enabled)
view_menu.addAction(self.toggle_alert_action)
# Add WebServer toggle action
self.toggle_webserver_action = QAction('Enable Web Server', self)
self.toggle_webserver_action.setCheckable(True)
self.toggle_webserver_action.setChecked(False) # Default to off
self.toggle_webserver_action.setStatusTip('Enable WebRTC server for remote viewing')
self.toggle_webserver_action.triggered.connect(self.set_webrtc_enabled)
view_menu.addAction(self.toggle_webserver_action)
# Camera menu
self.camera_menu = menubar.addMenu('Cameras')
@@ -408,6 +457,10 @@ class MainWindow(QMainWindow):
QMessageBox.critical(self, "Error", "Failed to connect to cameras!")
return
# Start WebRTC server if enabled
if self.webrtc_enabled:
self.start_webrtc_server()
# Update UI
self.update_selection_labels()
self.start_btn.setEnabled(False)
@@ -693,6 +746,10 @@ class MainWindow(QMainWindow):
self.timer.stop()
self.detector.disconnect_cameras()
# Stop WebRTC server when cameras stop
if self.webrtc_enabled:
self.stop_webrtc_server()
# Update UI
self.start_btn.setEnabled(True)
self.stop_btn.setEnabled(False)
@@ -1078,6 +1135,12 @@ class MainWindow(QMainWindow):
def closeEvent(self, event):
"""Ensure background workers and timers are stopped to avoid crashes on exit."""
try:
try:
if self.webrtc_server is not None:
self.stop_webrtc_server()
except Exception:
pass
# Stop periodic timers
try:
if hasattr(self, 'timer') and self.timer is not None:

74
tac.html Normal file
View File

@@ -0,0 +1,74 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebRTC Test</title>
</head>
<body>
<h1>WebRTC Test (Camera 0)</h1>
<video id="remoteVideo" autoplay playsinline></video>
<script>
async function startWebRTC() {
const pc = new RTCPeerConnection();
// Request to receive a remote video track from the server.
// Without this recvonly transceiver the generated offer may contain no
// video m= section and the server cannot add a sendonly video track
// (aiortc will fail to compute directions). Adding a recvonly
// transceiver signals the client wants to receive video.
try {
pc.addTransceiver('video', {direction: 'recvonly'});
} catch (e) {
// Some browsers / older APIs may use a different signature; ignore failure
console.debug('addTransceiver failed, continuing:', e);
}
// Create video element
const remoteVideo = document.getElementById('remoteVideo');
pc.ontrack = (event) => {
console.log('ontrack event:', event);
remoteVideo.srcObject = event.streams[0];
// Log track info
const tracks = event.streams[0].getTracks();
tracks.forEach(t => console.log('Remote track:', t.kind, t.readyState, t.enabled));
// Try to play the video programmatically (some browsers require a gesture for autoplay with audio)
remoteVideo.play().then(()=>{
console.log('remoteVideo.play() succeeded');
}).catch(err=>{
console.warn('remoteVideo.play() failed:', err);
});
};
try {
// Create a data channel for testing (optional)
const channel = pc.createDataChannel("testChannel");
channel.onopen = () => console.log("Data channel open");
channel.onmessage = (e) => console.log("Received message:", e.data);
// Create SDP offer
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
// Send offer to server
const response = await fetch('http://localhost:1337/offer/0', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sdp: offer.sdp, type: offer.type })
});
const answer = await response.json();
console.log("Answer received from server:", answer);
// Set remote description
await pc.setRemoteDescription(answer);
} catch (err) {
console.error("WebRTC error:", err);
}
}
// Start WebRTC on page load
startWebRTC();
</script>
</body>
</html>

16
test_webrtc.py Normal file
View File

@@ -0,0 +1,16 @@
import asyncio
from mucapy.WebServer import VideoWebServer
async def main():
server = VideoWebServer()
await server.start_server()
print("Server started, press Ctrl+C to stop")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nStopping server...")
await server.stop_server()
if __name__ == "__main__":
asyncio.run(main())