scanhandler:

scan local area network for any IRC Servers which may be open
This commit is contained in:
2025-11-26 10:23:44 +01:00
parent e63d94e21c
commit fb399c3d4e
2 changed files with 346 additions and 1 deletions

315
src/ScanWizard.py Normal file
View File

@@ -0,0 +1,315 @@
import ipaddress
import logging
import socket
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil
import wx
import wx.adv as adv
logger = logging.getLogger(__name__)
class ScanHandler:
"""Fast local network IRC scanner with minimal network overhead."""
def __init__(self, timeout=0.35, max_workers=64):
self.timeout = timeout
self.max_workers = max_workers
self._stop_event = threading.Event()
self._thread = None
self.results = []
self.total_hosts = 0
def detect_networks(self):
"""Return private IPv4 networks discovered on local interfaces."""
networks = []
try:
for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family != socket.AF_INET or not addr.netmask:
continue
try:
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
except ValueError:
continue
if not interface.network.is_private:
continue
network = self._cap_network(interface)
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
networks.append({"label": label, "cidr": str(network)})
except Exception as exc:
logger.error("Failed to enumerate interfaces: %s", exc)
if not networks:
default_net = "192.168.1.0/24"
label = f"Default guess : {default_net}"
networks.append({"label": label, "cidr": default_net})
return networks
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
"""Launch the threaded scan."""
if self._thread and self._thread.is_alive():
return False
try:
network = ipaddress.ip_network(network_cidr, strict=False)
except ValueError:
return False
hosts = [str(host) for host in network.hosts()]
if not hosts:
hosts = [str(network.network_address)]
self.total_hosts = len(hosts)
self.results.clear()
self._stop_event.clear()
def _worker():
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
scanned = 0
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
for future in as_completed(futures):
if self._stop_event.is_set():
break
scanned += 1
server_info = future.result()
if server_info:
self.results.append(server_info)
if result_cb:
result_cb(server_info)
if progress_cb:
progress_cb(scanned, self.total_hosts)
except Exception as exc:
logger.error("Scan failure: %s", exc)
finally:
if done_cb:
done_cb(self.results)
logger.info("IRC scan finished (%s discovered)", len(self.results))
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
self._thread.start()
return True
def stop_scan(self):
self._stop_event.set()
def _probe_host(self, host, ports):
if self._stop_event.is_set():
return None
for port in ports:
try:
with socket.create_connection((host, port), timeout=self.timeout) as sock:
sock.settimeout(0.2)
banner = ""
try:
chunk = sock.recv(256)
if chunk:
banner = chunk.decode(errors="ignore").strip()
except socket.timeout:
banner = "IRC server (silent banner)"
except OSError:
pass
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
except (socket.timeout, ConnectionRefusedError, OSError):
continue
return None
@staticmethod
def _cap_network(interface):
"""Cap huge networks to /24 to keep scans lightweight."""
if interface.network.prefixlen >= 24:
return interface.network
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)
class ScanWizardIntroPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler):
super().__init__(parent)
self.scan_handler = scan_handler
self.networks = self.scan_handler.detect_networks()
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
intro.Wrap(420)
sizer.Add(intro, 0, wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
labels = [net["label"] for net in self.networks]
self.network_choice = wx.Choice(self, choices=labels)
if labels:
self.network_choice.SetSelection(0)
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
self.SetSizer(sizer)
def get_scan_params(self):
selection = self.network_choice.GetSelection()
if selection == wx.NOT_FOUND:
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
return None
cidr = self.networks[selection]["cidr"]
raw_ports = self.port_ctrl.GetValue().split(",")
ports = []
for raw in raw_ports:
raw = raw.strip()
if not raw:
continue
try:
port = int(raw)
if 1 <= port <= 65535:
ports.append(port)
except ValueError:
continue
if not ports:
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
return None
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError:
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
return None
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
return {"cidr": str(network), "ports": ports, "host_count": host_count}
class ScanWizardResultsPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler, main_frame):
super().__init__(parent)
self.scan_handler = scan_handler
self.main_frame = main_frame
self.discovered = []
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
self.summary = wx.StaticText(self, label="Waiting to start…")
sizer.Add(self.summary, 0, wx.ALL, 5)
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.results_list.InsertColumn(0, "Address", width=140)
self.results_list.InsertColumn(1, "Port", width=60)
self.results_list.InsertColumn(2, "Details", width=260)
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
btn_row = wx.BoxSizer(wx.HORIZONTAL)
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
self.quick_connect_btn.Disable()
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
self.rescan_btn = wx.Button(self, label="Rescan")
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
btn_row.Add(self.rescan_btn, 0)
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
self.SetSizer(sizer)
def prepare_for_scan(self, params, start_callback):
self.results_list.DeleteAllItems()
self.discovered = []
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}")
self.gauge.SetRange(max(params["host_count"], 1))
self.gauge.SetValue(0)
self.quick_connect_btn.Disable()
start_callback(params)
def on_scan_progress(self, scanned, total):
total = max(total, 1)
self.gauge.SetRange(total)
self.gauge.SetValue(min(scanned, total))
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
def on_scan_result(self, server_info):
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
self.results_list.SetItem(idx, 1, str(server_info["port"]))
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
self.discovered.append(server_info)
self.summary.SetLabel(f"Found {len(self.discovered)} {"server" if self.discovered == 1 else "servers"}")
def on_scan_complete(self, results):
if results:
self.summary.SetLabel(f"Scan complete : {len(results)} {"server" if len(results) == 1 else "servers"} ready.")
else:
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
self._toggle_buttons()
def on_quick_connect(self, event):
row = self.results_list.GetFirstSelected()
if row == -1:
return
server = self.results_list.GetItemText(row, 0)
port = int(self.results_list.GetItemText(row, 1))
if self.main_frame.quick_connect(server, port):
self.GetParent().EndModal(wx.ID_OK)
def on_rescan(self, event):
wizard = self.GetParent()
wizard.ShowPage(wizard.intro_page)
def _toggle_buttons(self, event=None):
has_selection = self.results_list.GetFirstSelected() != -1
if has_selection:
self.quick_connect_btn.Enable()
else:
self.quick_connect_btn.Disable()
class ScanWizardDialog(adv.Wizard):
"""Wizard that drives the ScanHandler workflow."""
def __init__(self, parent):
super().__init__(parent, title="wxScan")
self.scan_handler = ScanHandler()
self.intro_page = ScanWizardIntroPage(self, self.scan_handler)
self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent)
self._chain_pages()
self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel)
self.FitToPage(self.intro_page)
def _chain_pages(self):
self.intro_page.SetNext(self.results_page)
self.results_page.SetPrev(self.intro_page)
def on_page_changing(self, event):
if event.GetDirection() and event.GetPage() is self.intro_page:
params = self.intro_page.get_scan_params()
if not params:
event.Veto()
return
def start_scan(p):
self.scan_handler.start_scan(
network_cidr=p["cidr"],
ports=p["ports"],
progress_cb=lambda s, t: wx.CallAfter(self.results_page.on_scan_progress, s, t),
result_cb=lambda info: wx.CallAfter(self.results_page.on_scan_result, info),
done_cb=lambda res: wx.CallAfter(self.results_page.on_scan_complete, res),
)
self.results_page.prepare_for_scan(params, start_scan)
elif not event.GetDirection() and event.GetPage() is self.results_page:
self.scan_handler.stop_scan()
def on_cancel(self, event):
self.scan_handler.stop_scan()
event.Skip()
def run(self):
return self.RunWizard(self.intro_page)

View File

@@ -16,6 +16,7 @@ from PrivacyNoticeDialog import PrivacyNoticeDialog
from IRCPanel import IRCPanel
from AboutDialog import AboutDialog
from NotesDialog import NotesDialog
from ScanWizard import ScanWizardDialog
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -68,7 +69,6 @@ class IRCFrame(wx.Frame):
self.away = False
self.timestamps = True
# Notes data - Add this
self.notes_data = defaultdict(dict)
# User color mapping - darker colors for white theme
@@ -358,6 +358,7 @@ class IRCFrame(wx.Frame):
# Tools menu
tools_menu = wx.Menu()
tools_menu.Append(210, "&wxScan\tCtrl+S")
tools_menu.Append(208, "&wxNotes\tCtrl+T") # Add Notes menu item
tools_menu.Append(201, "&WHOIS User\tCtrl+I")
tools_menu.Append(202, "Change &Nick\tCtrl+N")
@@ -369,6 +370,7 @@ class IRCFrame(wx.Frame):
tools_menu.Append(205, "Set &Highlights")
tools_menu.Append(206, "Auto-join Channels")
tools_menu.Append(207, "Command Help")
self.Bind(wx.EVT_MENU, self.on_scan_local_network, id=210)
self.Bind(wx.EVT_MENU, self.on_notes, id=208) # Bind Notes menu item
self.Bind(wx.EVT_MENU, self.on_menu_whois, id=201)
self.Bind(wx.EVT_MENU, self.on_menu_change_nick, id=202)
@@ -377,6 +379,7 @@ class IRCFrame(wx.Frame):
self.Bind(wx.EVT_MENU, self.on_menu_highlights, id=205)
self.Bind(wx.EVT_MENU, self.on_menu_autojoin, id=206)
self.Bind(wx.EVT_MENU, self.on_menu_help, id=207)
self.Bind(wx.EVT_MENU, self.on_scan_local_network, id=210)
menubar.Append(file_menu, "&File")
menubar.Append(edit_menu, "&Edit")
@@ -882,6 +885,33 @@ Available commands:
except Exception as e:
logger.error(f"Error adding channel: {e}")
def on_scan_local_network(self, event):
"""Launch the local network scan wizard."""
try:
wizard = ScanWizardDialog(self)
wizard.run()
wizard.Destroy()
except Exception as e:
logger.error(f"Error opening scan wizard: {e}")
self.log_server(f"Scan wizard failed to open: {e}", wx.Colour(255, 0, 0))
def quick_connect(self, server, port):
"""Populate connection fields and initiate a connection if idle."""
try:
if self.is_connected() or self.is_connecting:
wx.MessageBox("Please disconnect before using Quick Connect.", "Already connected", wx.OK | wx.ICON_INFORMATION)
return False
self.server_ctrl.SetValue(server)
self.port_ctrl.SetValue(str(port))
self.SetStatusText(f"Quick connect ready: {server}:{port}")
wx.CallAfter(self.on_connect, None)
self.log_server(f"Quick connecting to {server}:{port}", wx.Colour(0, 0, 128))
return True
except Exception as e:
logger.error(f"Quick connect failed: {e}")
self.log_server(f"Quick connect failed: {e}", wx.Colour(255, 0, 0))
return False
# Menu handlers
def on_menu_join(self, event):
try: