From fb399c3d4e8a9e091530fe46fdc5959322c1f12b Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Wed, 26 Nov 2025 10:23:44 +0100 Subject: [PATCH] scanhandler: scan local area network for any IRC Servers which may be open --- src/ScanWizard.py | 315 ++++++++++++++++++++++++++++++++++++++++++++++ src/main.py | 32 ++++- 2 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 src/ScanWizard.py diff --git a/src/ScanWizard.py b/src/ScanWizard.py new file mode 100644 index 0000000..9c96606 --- /dev/null +++ b/src/ScanWizard.py @@ -0,0 +1,315 @@ +import ipaddress +import logging +import socket +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed + +import psutil +import wx +import wx.adv as adv + +logger = logging.getLogger(__name__) + + +class ScanHandler: + """Fast local network IRC scanner with minimal network overhead.""" + + def __init__(self, timeout=0.35, max_workers=64): + self.timeout = timeout + self.max_workers = max_workers + self._stop_event = threading.Event() + self._thread = None + self.results = [] + self.total_hosts = 0 + + def detect_networks(self): + """Return private IPv4 networks discovered on local interfaces.""" + networks = [] + try: + for iface, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family != socket.AF_INET or not addr.netmask: + continue + try: + interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}") + except ValueError: + continue + if not interface.network.is_private: + continue + network = self._cap_network(interface) + label = f"{iface} : {interface.ip} through {network.with_prefixlen} range" + networks.append({"label": label, "cidr": str(network)}) + except Exception as exc: + logger.error("Failed to enumerate interfaces: %s", exc) + + if not networks: + default_net = "192.168.1.0/24" + label = f"Default guess : {default_net}" + networks.append({"label": label, "cidr": default_net}) + return networks + + def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None): + """Launch the threaded scan.""" + if self._thread and self._thread.is_alive(): + return False + try: + network = ipaddress.ip_network(network_cidr, strict=False) + except ValueError: + return False + + hosts = [str(host) for host in network.hosts()] + if not hosts: + hosts = [str(network.network_address)] + self.total_hosts = len(hosts) + self.results.clear() + self._stop_event.clear() + + def _worker(): + logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr) + scanned = 0 + try: + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = {executor.submit(self._probe_host, host, ports): host for host in hosts} + for future in as_completed(futures): + if self._stop_event.is_set(): + break + scanned += 1 + server_info = future.result() + if server_info: + self.results.append(server_info) + if result_cb: + result_cb(server_info) + if progress_cb: + progress_cb(scanned, self.total_hosts) + except Exception as exc: + logger.error("Scan failure: %s", exc) + finally: + if done_cb: + done_cb(self.results) + logger.info("IRC scan finished (%s discovered)", len(self.results)) + + self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True) + self._thread.start() + return True + + def stop_scan(self): + self._stop_event.set() + + def _probe_host(self, host, ports): + if self._stop_event.is_set(): + return None + for port in ports: + try: + with socket.create_connection((host, port), timeout=self.timeout) as sock: + sock.settimeout(0.2) + banner = "" + try: + chunk = sock.recv(256) + if chunk: + banner = chunk.decode(errors="ignore").strip() + except socket.timeout: + banner = "IRC server (silent banner)" + except OSError: + pass + return {"address": host, "port": port, "banner": banner or "IRC server detected"} + except (socket.timeout, ConnectionRefusedError, OSError): + continue + return None + + @staticmethod + def _cap_network(interface): + """Cap huge networks to /24 to keep scans lightweight.""" + if interface.network.prefixlen >= 24: + return interface.network + return ipaddress.ip_network(f"{interface.ip}/24", strict=False) + + +class ScanWizardIntroPage(adv.WizardPageSimple): + def __init__(self, parent, scan_handler): + super().__init__(parent) + self.scan_handler = scan_handler + self.networks = self.scan_handler.detect_networks() + self._build_ui() + + def _build_ui(self): + sizer = wx.BoxSizer(wx.VERTICAL) + + intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.") + intro.Wrap(420) + sizer.Add(intro, 0, wx.ALL, 5) + + sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8) + labels = [net["label"] for net in self.networks] + self.network_choice = wx.Choice(self, choices=labels) + if labels: + self.network_choice.SetSelection(0) + sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5) + + sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8) + self.port_ctrl = wx.TextCtrl(self, value="6667,6697") + sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5) + + self.SetSizer(sizer) + + def get_scan_params(self): + selection = self.network_choice.GetSelection() + if selection == wx.NOT_FOUND: + wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING) + return None + cidr = self.networks[selection]["cidr"] + raw_ports = self.port_ctrl.GetValue().split(",") + ports = [] + for raw in raw_ports: + raw = raw.strip() + if not raw: + continue + try: + port = int(raw) + if 1 <= port <= 65535: + ports.append(port) + except ValueError: + continue + if not ports: + wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING) + return None + try: + network = ipaddress.ip_network(cidr, strict=False) + except ValueError: + wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR) + return None + host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1) + return {"cidr": str(network), "ports": ports, "host_count": host_count} + + +class ScanWizardResultsPage(adv.WizardPageSimple): + def __init__(self, parent, scan_handler, main_frame): + super().__init__(parent) + self.scan_handler = scan_handler + self.main_frame = main_frame + self.discovered = [] + self._build_ui() + + def _build_ui(self): + sizer = wx.BoxSizer(wx.VERTICAL) + + self.summary = wx.StaticText(self, label="Waiting to start…") + sizer.Add(self.summary, 0, wx.ALL, 5) + + self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH) + sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5) + + self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN) + self.results_list.InsertColumn(0, "Address", width=140) + self.results_list.InsertColumn(1, "Port", width=60) + self.results_list.InsertColumn(2, "Details", width=260) + self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons) + self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons) + sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5) + + btn_row = wx.BoxSizer(wx.HORIZONTAL) + self.quick_connect_btn = wx.Button(self, label="Quick Connect") + self.quick_connect_btn.Disable() + self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect) + btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5) + + self.rescan_btn = wx.Button(self, label="Rescan") + self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan) + btn_row.Add(self.rescan_btn, 0) + sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5) + + self.SetSizer(sizer) + + def prepare_for_scan(self, params, start_callback): + self.results_list.DeleteAllItems() + self.discovered = [] + self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}…") + self.gauge.SetRange(max(params["host_count"], 1)) + self.gauge.SetValue(0) + self.quick_connect_btn.Disable() + start_callback(params) + + def on_scan_progress(self, scanned, total): + total = max(total, 1) + self.gauge.SetRange(total) + self.gauge.SetValue(min(scanned, total)) + self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked") + + def on_scan_result(self, server_info): + idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"]) + self.results_list.SetItem(idx, 1, str(server_info["port"])) + self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected")) + self.discovered.append(server_info) + self.summary.SetLabel(f"Found {len(self.discovered)} {"server" if self.discovered == 1 else "servers"}") + + def on_scan_complete(self, results): + if results: + self.summary.SetLabel(f"Scan complete : {len(results)} {"server" if len(results) == 1 else "servers"} ready.") + else: + self.summary.SetLabel("Scan complete : no IRC servers discovered.") + self._toggle_buttons() + + def on_quick_connect(self, event): + row = self.results_list.GetFirstSelected() + if row == -1: + return + server = self.results_list.GetItemText(row, 0) + port = int(self.results_list.GetItemText(row, 1)) + if self.main_frame.quick_connect(server, port): + self.GetParent().EndModal(wx.ID_OK) + + def on_rescan(self, event): + wizard = self.GetParent() + wizard.ShowPage(wizard.intro_page) + + def _toggle_buttons(self, event=None): + has_selection = self.results_list.GetFirstSelected() != -1 + if has_selection: + self.quick_connect_btn.Enable() + else: + self.quick_connect_btn.Disable() + + +class ScanWizardDialog(adv.Wizard): + """Wizard that drives the ScanHandler workflow.""" + + def __init__(self, parent): + super().__init__(parent, title="wxScan") + self.scan_handler = ScanHandler() + self.intro_page = ScanWizardIntroPage(self, self.scan_handler) + self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent) + self._chain_pages() + self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing) + self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel) + self.FitToPage(self.intro_page) + + def _chain_pages(self): + self.intro_page.SetNext(self.results_page) + self.results_page.SetPrev(self.intro_page) + + def on_page_changing(self, event): + if event.GetDirection() and event.GetPage() is self.intro_page: + params = self.intro_page.get_scan_params() + if not params: + event.Veto() + return + + def start_scan(p): + self.scan_handler.start_scan( + network_cidr=p["cidr"], + ports=p["ports"], + progress_cb=lambda s, t: wx.CallAfter(self.results_page.on_scan_progress, s, t), + result_cb=lambda info: wx.CallAfter(self.results_page.on_scan_result, info), + done_cb=lambda res: wx.CallAfter(self.results_page.on_scan_complete, res), + ) + + self.results_page.prepare_for_scan(params, start_scan) + elif not event.GetDirection() and event.GetPage() is self.results_page: + self.scan_handler.stop_scan() + + def on_cancel(self, event): + self.scan_handler.stop_scan() + event.Skip() + + def run(self): + return self.RunWizard(self.intro_page) + diff --git a/src/main.py b/src/main.py index 523aa30..d76b9a2 100644 --- a/src/main.py +++ b/src/main.py @@ -16,6 +16,7 @@ from PrivacyNoticeDialog import PrivacyNoticeDialog from IRCPanel import IRCPanel from AboutDialog import AboutDialog from NotesDialog import NotesDialog +from ScanWizard import ScanWizardDialog # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -68,7 +69,6 @@ class IRCFrame(wx.Frame): self.away = False self.timestamps = True - # Notes data - Add this self.notes_data = defaultdict(dict) # User color mapping - darker colors for white theme @@ -358,6 +358,7 @@ class IRCFrame(wx.Frame): # Tools menu tools_menu = wx.Menu() + tools_menu.Append(210, "&wxScan\tCtrl+S") tools_menu.Append(208, "&wxNotes\tCtrl+T") # Add Notes menu item tools_menu.Append(201, "&WHOIS User\tCtrl+I") tools_menu.Append(202, "Change &Nick\tCtrl+N") @@ -369,6 +370,7 @@ class IRCFrame(wx.Frame): tools_menu.Append(205, "Set &Highlights") tools_menu.Append(206, "Auto-join Channels") tools_menu.Append(207, "Command Help") + self.Bind(wx.EVT_MENU, self.on_scan_local_network, id=210) self.Bind(wx.EVT_MENU, self.on_notes, id=208) # Bind Notes menu item self.Bind(wx.EVT_MENU, self.on_menu_whois, id=201) self.Bind(wx.EVT_MENU, self.on_menu_change_nick, id=202) @@ -377,6 +379,7 @@ class IRCFrame(wx.Frame): self.Bind(wx.EVT_MENU, self.on_menu_highlights, id=205) self.Bind(wx.EVT_MENU, self.on_menu_autojoin, id=206) self.Bind(wx.EVT_MENU, self.on_menu_help, id=207) + self.Bind(wx.EVT_MENU, self.on_scan_local_network, id=210) menubar.Append(file_menu, "&File") menubar.Append(edit_menu, "&Edit") @@ -882,6 +885,33 @@ Available commands: except Exception as e: logger.error(f"Error adding channel: {e}") + def on_scan_local_network(self, event): + """Launch the local network scan wizard.""" + try: + wizard = ScanWizardDialog(self) + wizard.run() + wizard.Destroy() + except Exception as e: + logger.error(f"Error opening scan wizard: {e}") + self.log_server(f"Scan wizard failed to open: {e}", wx.Colour(255, 0, 0)) + + def quick_connect(self, server, port): + """Populate connection fields and initiate a connection if idle.""" + try: + if self.is_connected() or self.is_connecting: + wx.MessageBox("Please disconnect before using Quick Connect.", "Already connected", wx.OK | wx.ICON_INFORMATION) + return False + self.server_ctrl.SetValue(server) + self.port_ctrl.SetValue(str(port)) + self.SetStatusText(f"Quick connect ready: {server}:{port}") + wx.CallAfter(self.on_connect, None) + self.log_server(f"Quick connecting to {server}:{port}", wx.Colour(0, 0, 128)) + return True + except Exception as e: + logger.error(f"Quick connect failed: {e}") + self.log_server(f"Quick connect failed: {e}", wx.Colour(255, 0, 0)) + return False + # Menu handlers def on_menu_join(self, event): try: