From 417e50983e3d9d9d02166e17233890fb8c25e886 Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Mon, 3 Nov 2025 21:08:57 +0100 Subject: [PATCH] webrtc working --- mucapy/WebServer.py | 434 ++++++++++++++++++++++++++++++++++++++++++++ mucapy/main.py | 63 +++++++ tac.html | 74 ++++++++ test_webrtc.py | 16 ++ 4 files changed, 587 insertions(+) create mode 100644 mucapy/WebServer.py create mode 100644 tac.html create mode 100644 test_webrtc.py diff --git a/mucapy/WebServer.py b/mucapy/WebServer.py new file mode 100644 index 0000000..11d3740 --- /dev/null +++ b/mucapy/WebServer.py @@ -0,0 +1,434 @@ +import asyncio +import logging +import time +import json +import threading +from aiohttp import web +from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack +from aiortc.contrib.media import MediaBlackhole +from av import VideoFrame +import cv2 +import numpy as np +from typing import Dict, Set, Optional, List +import fractions + +# Import the CameraThread from your existing code +from CameraThread import CameraThread +from PyQt5.QtCore import QMutex + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +@web.middleware +async def cors_middleware(request, handler): + """Global CORS middleware to ensure preflight (OPTIONS) and all responses include CORS headers.""" + cors_headers = { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET,POST,PUT,DELETE,OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type,Authorization', + } + + if request.method == 'OPTIONS': + preflight_headers = cors_headers.copy() + preflight_headers.update({ + 'Access-Control-Max-Age': '3600', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Requested-With', + 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', + }) + return web.Response(status=200, headers=preflight_headers) + + try: + resp = await handler(request) + if resp is None: + resp = web.Response(status=204) + if isinstance(resp, web.StreamResponse): + for k, v in cors_headers.items(): + if k not in resp.headers: + resp.headers[k] = v + return resp + except Exception as e: + logger.error(f"Error in request handler: {e}") + error_resp = web.Response(status=500, text=str(e)) + for k, v in cors_headers.items(): + error_resp.headers[k] = v + return error_resp + + +async def favicon_handler(request): + return web.Response(status=204) + + +class VideoStreamTrackFromFrames(VideoStreamTrack): + def __init__(self, camera_index: int): + super().__init__() + self.camera_index = camera_index + self.frame = None + self.frame_lock = asyncio.Lock() + + async def recv(self): + pts, time_base = await self.next_timestamp() + + async with self.frame_lock: + if self.frame is None: + # Create a test pattern to verify the stream is working + frame = self._create_test_pattern() + else: + frame = self.frame + + # Convert BGR to RGB for WebRTC + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + video_frame = VideoFrame.from_ndarray(frame_rgb, format='rgb24') + video_frame.pts = pts + video_frame.time_base = time_base + + return video_frame + + def _create_test_pattern(self): + """Create a test pattern with camera index for debugging""" + height, width = 480, 640 + frame = np.zeros((height, width, 3), dtype=np.uint8) + + # Add colored background based on camera index + colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0)] + color = colors[self.camera_index % len(colors)] + frame[:, :] = color + + # Add some text and shapes + cv2.rectangle(frame, (50, 50), (width-50, height-50), (255, 255, 255), 2) + cv2.putText(frame, f"Camera {self.camera_index}", (width//4, height//2), + cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) + cv2.putText(frame, "Waiting for frames...", (width//4, height//2 + 40), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1) + cv2.putText(frame, f"Time: {time.strftime('%H:%M:%S')}", (width//4, height//2 + 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + return frame + + async def set_frame(self, frame): + async with self.frame_lock: + if frame is not None and isinstance(frame, np.ndarray): + # Make sure we have a valid frame with proper dimensions + if len(frame.shape) == 3 and frame.shape[2] == 3: + self.frame = frame.copy() + else: + logger.warning(f"Invalid frame format: {frame.shape if hasattr(frame, 'shape') else 'No shape'}") + self.frame = None + else: + self.frame = None + + +class WebRTCVideoServer: + def __init__(self, main_window=None): + self.tracks: Dict[int, VideoStreamTrackFromFrames] = {} + self.pcs: Set[RTCPeerConnection] = set() + self.main_window = main_window + self.camera_threads: Dict[int, CameraThread] = {} + self.latest_frames: Dict[int, np.ndarray] = {} + self.frame_lock = QMutex() + self.active_camera_sources: List[str] = [] + self.app = web.Application(middlewares=[cors_middleware]) + self.runner = None + self.site = None + self.is_running = False + self._loop = None + self._server_thread = None + self._frame_update_task = None + self._setup_routes() + logger.info("WebRTCVideoServer initialized") + + def _setup_routes(self): + self.app.router.add_post(r'/offer/index{index:[0-9]+}', self.offer_handler) + self.app.router.add_post(r'/offer/{index:[0-9]+}', self.offer_handler) + self.app.router.add_get('/favicon.ico', favicon_handler) + self.app.router.add_get('/status', self.status_handler) + logger.info("Routes configured") + + async def status_handler(self, request): + return web.json_response({ + 'status': 'running' if self.is_running else 'stopped', + 'tracks': len(self.tracks), + 'connections': len(self.pcs), + 'active_cameras': list(self.tracks.keys()), + 'camera_sources': self.active_camera_sources + }) + + async def offer_handler(self, request): + headers = {} + logger.info(f"New WebRTC offer from {request.remote}") + + try: + params = await request.json() + sdp = params.get('sdp') + offer_type = params.get('type') + if not sdp or not offer_type: + return web.json_response({'error': 'Missing "sdp" or "type"'}, status=400, headers=headers) + + try: + index = int(request.match_info.get('index', 0)) + except Exception: + return web.json_response({'error': 'Invalid camera index'}, status=400, headers=headers) + + # Check if this camera index exists + if index >= len(self.active_camera_sources): + return web.json_response({'error': f'Camera index {index} not available'}, status=404, headers=headers) + + logger.info(f"WebRTC connection for camera index {index} (Source: {self.active_camera_sources[index]})") + + offer = RTCSessionDescription(sdp=sdp, type=offer_type) + pc = RTCPeerConnection() + self.pcs.add(pc) + + @pc.on("connectionstatechange") + async def on_connectionstatechange(): + logger.info(f"Connection state: {pc.connectionState}") + if pc.connectionState in ("failed", "closed"): + await self.cleanup_peer_connection(pc) + + await pc.setRemoteDescription(offer) + + # Create or get track for this camera index + if index not in self.tracks: + self.tracks[index] = VideoStreamTrackFromFrames(index) + logger.info(f"Created track for camera index {index}") + + track = self.tracks[index] + + # Try to attach to existing transceiver first + attached = False + for t in pc.getTransceivers(): + if t.kind == 'video': + try: + if t.sender is not None: + await t.sender.replace_track(track) + attached = True + logger.info("Attached track to existing transceiver") + break + except Exception: + continue + + if not attached: + try: + pc.addTrack(track) + logger.info('Added track via addTrack fallback') + except Exception as ex: + logger.debug(f'addTrack fallback failed: {ex}') + pc.addTransceiver(track, direction='sendonly') + + answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + + response_data = { + 'sdp': pc.localDescription.sdp, + 'type': pc.localDescription.type + } + + logger.info(f"Sending answer for camera index {index}") + return web.json_response(response_data, headers=headers) + + except Exception as e: + logger.exception("Error handling WebRTC offer") + return web.json_response({'error': str(e)}, status=500, headers=headers) + + def setup_cameras(self, camera_sources: List[str]): + """Setup camera threads for WebRTC streaming""" + self.stop_cameras() # Stop any existing cameras + + self.active_camera_sources = camera_sources + + for i, source in enumerate(camera_sources): + try: + # Create camera thread with unique ID (offset by 1000 to avoid conflicts with main app) + thread_id = i + 1000 + thread = CameraThread(thread_id, source, parent=None) + thread.set_fps(15) # Set reasonable FPS for streaming + thread.frame_ready.connect(self._on_frame_ready) + thread.error_occurred.connect(self._on_camera_error) + + self.camera_threads[i] = thread + self.latest_frames[i] = None + thread.start() + + logger.info(f"WebRTC camera thread started for source {i}: {source}") + except Exception as e: + logger.error(f"Failed to start WebRTC camera thread for source {i}: {e}") + + def _on_frame_ready(self, cam_id, frame): + """Handle frame from camera thread""" + # Adjust camera ID back to our index + actual_index = cam_id - 1000 + if 0 <= actual_index < len(self.active_camera_sources): + self.frame_lock.lock() + try: + self.latest_frames[actual_index] = frame.copy() + finally: + self.frame_lock.unlock() + + def _on_camera_error(self, cam_id, message): + """Handle camera errors""" + actual_index = cam_id - 1000 + logger.error(f"WebRTC camera {actual_index} error: {message}") + + def get_frame(self, index: int) -> Optional[np.ndarray]: + """Get frame for specific camera index""" + self.frame_lock.lock() + try: + frame = self.latest_frames.get(index) + if frame is not None: + return frame.copy() + finally: + self.frame_lock.unlock() + return None + + def stop_cameras(self): + """Stop all camera threads""" + for thread in self.camera_threads.values(): + try: + thread.stop() + thread.wait(2000) # Wait up to 2 seconds for thread to finish + except Exception as e: + logger.error(f"Error stopping camera thread: {e}") + + self.camera_threads.clear() + self.latest_frames.clear() + logger.info("WebRTC camera threads stopped") + + async def cleanup_peer_connection(self, pc): + try: + self.pcs.discard(pc) + await pc.close() + logger.info("Cleaned up peer connection") + except Exception as e: + logger.error(f"Error cleaning up peer connection: {e}") + + async def start_server(self): + """Async method to start the server""" + if self.is_running: + logger.info("Server already running") + return + + try: + logger.info("Starting WebRTC server...") + self.runner = web.AppRunner(self.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, '0.0.0.0', 1337) + await self.site.start() + self.is_running = True + + # Start frame update loop + self._frame_update_task = asyncio.create_task(self._frame_update_loop()) + + logger.info("WebRTC server started successfully on port 1337") + + except Exception as e: + logger.error(f"Failed to start server: {e}") + self.is_running = False + raise + + async def _frame_update_loop(self): + """Background task to update tracks with frames from camera threads""" + while self.is_running: + try: + # Update each track with its corresponding camera frame + for index in self.tracks.keys(): + if index < len(self.active_camera_sources): + frame = self.get_frame(index) + if frame is not None and isinstance(frame, np.ndarray): + if len(frame.shape) == 3 and frame.shape[2] == 3: + # Check if frame is not just a black/empty frame + if frame.mean() > 10: + await self.tracks[index].set_frame(frame) + else: + logger.debug(f"Frame from camera {index} appears to be black/empty") + + # Sleep to match camera FPS + await asyncio.sleep(0.033) # ~30 FPS + + except Exception as e: + logger.error(f"Error in frame update loop: {e}") + await asyncio.sleep(1) # Wait longer on error + + async def stop_server(self): + """Async method to stop the server""" + logger.info("Stopping WebRTC server...") + self.is_running = False + + # Stop frame update task + if self._frame_update_task: + self._frame_update_task.cancel() + try: + await self._frame_update_task + except asyncio.CancelledError: + pass + self._frame_update_task = None + + # Stop camera threads + self.stop_cameras() + + try: + # Close all peer connections + close_tasks = [self.cleanup_peer_connection(pc) for pc in list(self.pcs)] + if close_tasks: + await asyncio.gather(*close_tasks, return_exceptions=True) + + if self.site: + await self.site.stop() + if self.runner: + await self.runner.cleanup() + + # Clear all tracks + self.tracks.clear() + self.active_camera_sources.clear() + + logger.info("WebRTC server stopped") + except Exception as e: + logger.error(f"Error stopping server: {e}") + + def start(self, camera_sources: List[str] = None): + """Synchronous start method for main thread""" + if self.is_running: + logger.info("Server already running") + return + + # Setup cameras if sources provided + if camera_sources: + self.setup_cameras(camera_sources) + + def run_server(): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + try: + self._loop.run_until_complete(self.start_server()) + self._loop.run_forever() + except Exception as e: + logger.error(f"Server thread error: {e}") + self.is_running = False + finally: + if self._loop.is_running(): + self._loop.stop() + self._loop.close() + + self._server_thread = threading.Thread(target=run_server, daemon=True) + self._server_thread.start() + + # Wait briefly for server to initialize + import time + for _ in range(10): + if self.is_running: + break + time.sleep(0.1) + + def stop(self): + """Synchronous stop method for main thread""" + if not self.is_running or not self._loop: + return + + try: + # Schedule stop in server thread's event loop + future = asyncio.run_coroutine_threadsafe(self.stop_server(), self._loop) + future.result(timeout=5) + except Exception as e: + logger.error(f"Error stopping server: {e}") + finally: + self._server_thread = None + self._loop = None diff --git a/mucapy/main.py b/mucapy/main.py index d3c51a2..0e5e974 100644 --- a/mucapy/main.py +++ b/mucapy/main.py @@ -95,6 +95,9 @@ class MainWindow(QMainWindow): self.hw_timer.timeout.connect(self.update_hardware_stats) self.hw_timer.start(1000) # Update every second + self.webrtc_server = None # Will hold the server instance + self.webrtc_enabled = False # WebRTC server state + # Set dark theme style style_file = getpath.resource_path("styling/mainwindow.qss") try: @@ -134,6 +137,44 @@ class MainWindow(QMainWindow): # For the CPU Styling we have another one so we set this here! self.sepstyleing = False + def start_webrtc_server(self): + """Start the WebRTC server if not already running""" + if self.webrtc_server is None: + try: + from WebServer import WebRTCVideoServer # Updated class name + + # Get the current camera sources from the detector + camera_sources = [] + if hasattr(self.detector, 'cameras'): + camera_sources = self.detector.cameras + + # Create and start the server with the same camera sources + self.webrtc_server = WebRTCVideoServer(main_window=self) + self.webrtc_server.start(camera_sources) + print("WebRTC Server started on port 1337") + except Exception as e: + QMessageBox.critical(self, "WebRTC Server Error", f"Failed to start server:\n{e}") + self.toggle_webserver_action.setChecked(False) + self.webrtc_server = None + + def stop_webrtc_server(self): + """Stop the WebRTC server""" + if self.webrtc_server is not None: + try: + self.webrtc_server.stop() + self.webrtc_server = None + print("WebRTC Server stopped") + except Exception as e: + QMessageBox.warning(self, "WebRTC Server Warning", f"Failed to stop server:\n{e}") + + def set_webrtc_enabled(self, state: bool): + self.webrtc_enabled = state + if state: + # Only start server if cameras are active + if hasattr(self.detector, 'cameras') and self.detector.cameras: + self.start_webrtc_server() + else: + self.stop_webrtc_server() def load_saved_settings(self): """Load saved settings from configuration""" @@ -241,6 +282,14 @@ class MainWindow(QMainWindow): self.toggle_alert_action.triggered.connect(self.set_alert_enabled) view_menu.addAction(self.toggle_alert_action) + # Add WebServer toggle action + self.toggle_webserver_action = QAction('Enable Web Server', self) + self.toggle_webserver_action.setCheckable(True) + self.toggle_webserver_action.setChecked(False) # Default to off + self.toggle_webserver_action.setStatusTip('Enable WebRTC server for remote viewing') + self.toggle_webserver_action.triggered.connect(self.set_webrtc_enabled) + view_menu.addAction(self.toggle_webserver_action) + # Camera menu self.camera_menu = menubar.addMenu('Cameras') @@ -408,6 +457,10 @@ class MainWindow(QMainWindow): QMessageBox.critical(self, "Error", "Failed to connect to cameras!") return + # Start WebRTC server if enabled + if self.webrtc_enabled: + self.start_webrtc_server() + # Update UI self.update_selection_labels() self.start_btn.setEnabled(False) @@ -693,6 +746,10 @@ class MainWindow(QMainWindow): self.timer.stop() self.detector.disconnect_cameras() + # Stop WebRTC server when cameras stop + if self.webrtc_enabled: + self.stop_webrtc_server() + # Update UI self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) @@ -1078,6 +1135,12 @@ class MainWindow(QMainWindow): def closeEvent(self, event): """Ensure background workers and timers are stopped to avoid crashes on exit.""" try: + try: + if self.webrtc_server is not None: + self.stop_webrtc_server() + except Exception: + pass + # Stop periodic timers try: if hasattr(self, 'timer') and self.timer is not None: diff --git a/tac.html b/tac.html new file mode 100644 index 0000000..6da6555 --- /dev/null +++ b/tac.html @@ -0,0 +1,74 @@ + + + + +WebRTC Test + + +

WebRTC Test (Camera 0)

+ + + + + diff --git a/test_webrtc.py b/test_webrtc.py new file mode 100644 index 0000000..7fbed8d --- /dev/null +++ b/test_webrtc.py @@ -0,0 +1,16 @@ +import asyncio +from mucapy.WebServer import VideoWebServer + +async def main(): + server = VideoWebServer() + await server.start_server() + print("Server started, press Ctrl+C to stop") + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + print("\nStopping server...") + await server.stop_server() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file