From 00c111da738ca0342dc868a7d5bac5493a12b5b2 Mon Sep 17 00:00:00 2001 From: rattatwinko Date: Wed, 11 Mar 2026 12:14:30 +0100 Subject: [PATCH] some cleanup, seperating classes into their own files. --- src/AboutDialog.py | 31 +-- src/CommandAutocomplete.py | 161 ++++++++++++++++ src/IRCPanel.py | 355 +---------------------------------- src/InterfaceSelectDialog.py | 103 ++++++++++ src/KaomojiPicker.py | 197 +++++++++++++++++++ src/LocalServer.py | 175 +---------------- src/LocalServerManager.py | 179 ++++++++++++++++++ src/ManageChannelsDialog.py | 104 ++++++++++ src/ScanHandler.py | 120 ++++++++++++ src/ScanWizard.py | 292 +--------------------------- src/ScanWizardIntroPage.py | 59 ++++++ src/ScanWizardResultsPage.py | 109 +++++++++++ src/main.py | 205 +------------------- 13 files changed, 1057 insertions(+), 1033 deletions(-) create mode 100644 src/CommandAutocomplete.py create mode 100644 src/InterfaceSelectDialog.py create mode 100644 src/KaomojiPicker.py create mode 100644 src/LocalServerManager.py create mode 100644 src/ManageChannelsDialog.py create mode 100644 src/ScanHandler.py create mode 100644 src/ScanWizardIntroPage.py create mode 100644 src/ScanWizardResultsPage.py diff --git a/src/AboutDialog.py b/src/AboutDialog.py index 07ef244..c8454ea 100644 --- a/src/AboutDialog.py +++ b/src/AboutDialog.py @@ -31,31 +31,20 @@ class AboutDialog(wx.Dialog): info_text = wx.StaticText(self, label="wxIRC Client") info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD) info_text.SetFont(info_font) + sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) - import base64, hashlib - - self.encoded = base64.b64encode( - hashlib.sha256(f"{self.GetId()}-{self.GetHandle()}".encode()).digest() - ).decode('utf-8') - - - version_text = wx.StaticText(self, label=f"wxIRC V:e1") + version_text = wx.StaticText(self, label="Version : 0.2.3") # COMMIT_HUNDRED:COMMIT_TEN:COMMIT_ONE e.g 23 commits -> 0.2.3 version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) version_text.SetFont(version_font) - - - rand_hash = wx.StaticText(self, label=f"{self.encoded}") - rand_hash.SetFont(version_font) - - contrubutors_text = wx.StaticText(self, label="This software may not be used for commercial purposes. \n And may not be distributed for commercial purposes.") - contrubutors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) - contrubutors_text.SetFont(contrubutors_font) - - # Add info to sizer - sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) - sizer.Add(rand_hash, 0, wx.ALL | wx.ALIGN_CENTER, 5) - sizer.Add(contrubutors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) + + contributors_text = wx.StaticText(self, label=""" +Contributors: + * rattatwinko (rattatwinko.servecounterstrike.com) + """) + contributors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) + contributors_text.SetFont(contributors_font) + sizer.Add(contributors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) # OK button ok_btn = wx.Button(self, wx.ID_OK, "OK") diff --git a/src/CommandAutocomplete.py b/src/CommandAutocomplete.py new file mode 100644 index 0000000..b57b6f8 --- /dev/null +++ b/src/CommandAutocomplete.py @@ -0,0 +1,161 @@ +import wx +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class CommandAutocomplete(wx.PopupTransientWindow): + """Popup window for IRC command autocomplete, similar to Minecraft.""" + def __init__(self, parent, commands, on_select_callback): + super().__init__(parent, wx.BORDER_SIMPLE) + self.on_select_callback = on_select_callback + self.commands = commands # List of (command, description) tuples + self.filtered_commands = commands.copy() + self.selected_index = 0 + + self._init_ui() + self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy) + + def _init_ui(self): + panel = wx.Panel(self) + main_sizer = wx.BoxSizer(wx.VERTICAL) + + # Command list + self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER) + self.list_ctrl.InsertColumn(0, "Command", width=120) + self.list_ctrl.InsertColumn(1, "Description", width=280) + + self._update_list() + + # Bind events + self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated) + self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected) + self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook) + self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus) + + main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2) + panel.SetSizer(main_sizer) + main_sizer.Fit(panel) + self.SetClientSize(panel.GetBestSize()) + + # Select first item + if self.list_ctrl.GetItemCount() > 0: + self.list_ctrl.Select(0) + self.selected_index = 0 + + def _update_list(self): + """Update the list with filtered commands.""" + self.list_ctrl.DeleteAllItems() + for cmd, desc in self.filtered_commands: + idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd) + self.list_ctrl.SetItem(idx, 1, desc) + + # Resize to fit content (max 8 items visible) + item_height = 20 + max_items = min(8, len(self.filtered_commands)) + self.list_ctrl.SetSize((410, item_height * max_items + 4)) + self.SetClientSize((410, item_height * max_items + 4)) + + def filter_commands(self, search_text): + """Filter commands based on search text.""" + search_lower = search_text.lower().strip() + if not search_lower: + self.filtered_commands = self.commands.copy() + else: + self.filtered_commands = [ + (cmd, desc) for cmd, desc in self.commands + if cmd.lower().startswith(search_lower) + ] + + self.selected_index = 0 + self._update_list() + if self.list_ctrl.GetItemCount() > 0: + self.list_ctrl.Select(0) + + def on_item_activated(self, event): + """Handle double-click or Enter on item.""" + idx = event.GetIndex() + if 0 <= idx < len(self.filtered_commands): + cmd, _ = self.filtered_commands[idx] + if self.on_select_callback: + self.on_select_callback(cmd) + self.safe_dismiss() + + def on_item_selected(self, event): + """Handle item selection.""" + self.selected_index = event.GetIndex() + + def select_next(self): + """Select next item.""" + if self.list_ctrl.GetItemCount() > 0: + self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount() + self.list_ctrl.Select(self.selected_index) + self.list_ctrl.EnsureVisible(self.selected_index) + + def select_previous(self): + """Select previous item.""" + if self.list_ctrl.GetItemCount() > 0: + self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount() + self.list_ctrl.Select(self.selected_index) + self.list_ctrl.EnsureVisible(self.selected_index) + + def get_selected_command(self): + """Get the currently selected command.""" + if 0 <= self.selected_index < len(self.filtered_commands): + return self.filtered_commands[self.selected_index][0] + return None + + def on_char_hook(self, event): + """Handle keyboard events.""" + keycode = event.GetKeyCode() + if keycode == wx.WXK_ESCAPE: + self.safe_dismiss() + elif keycode == wx.WXK_UP: + self.select_previous() + elif keycode == wx.WXK_DOWN: + self.select_next() + elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER: + cmd = self.get_selected_command() + if cmd and self.on_select_callback: + self.on_select_callback(cmd) + self.safe_dismiss() + else: + event.Skip() + + def on_kill_focus(self, event): + """Handle focus loss.""" + focused = wx.Window.FindFocus() + if focused and (focused == self.list_ctrl or self.IsDescendant(focused)): + event.Skip() + return + wx.CallLater(100, self.safe_dismiss) + event.Skip() + + def on_destroy(self, event): + event.Skip() + + def safe_dismiss(self): + """Safely dismiss the popup.""" + if not self.IsBeingDeleted() and self.IsShown(): + try: + self.Dismiss() + except Exception as e: + logger.error(f"Error dismissing command autocomplete: {e}") + + def show_at_input(self, input_ctrl): + """Show popup near the input control.""" + input_screen_pos = input_ctrl.ClientToScreen((0, 0)) + popup_size = self.GetSize() + display_size = wx.GetDisplaySize() + + # Show above the input, otherwise below + if input_screen_pos.y - popup_size.height > 0: + popup_y = input_screen_pos.y - popup_size.height - 2 + else: + popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2 + + # Keep popup on screen horizontally + popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10)) + + self.Position((popup_x, popup_y), (0, 0)) + self.Popup() diff --git a/src/IRCPanel.py b/src/IRCPanel.py index 166bb27..f2b6774 100644 --- a/src/IRCPanel.py +++ b/src/IRCPanel.py @@ -5,358 +5,13 @@ import logging from SearchDialog import SearchDialog import traceback +import KaomojiPicker as KaomojiPicker +import CommandAutocomplete as CommandAutocomplete + # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) - -class KaomojiPicker(wx.PopupTransientWindow): - def __init__(self, parent, kaomoji_groups, on_select_callback): - super().__init__(parent, wx.BORDER_SIMPLE) - self.on_select_callback = on_select_callback - self.kaomoji_groups = kaomoji_groups - - self._init_ui() - self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy) - - def _init_ui(self): - panel = wx.Panel(self) - main_sizer = wx.BoxSizer(wx.VERTICAL) - - # Scrolled content area - self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL) - self.scroll.SetScrollRate(0, 15) - self.scroll_sizer = wx.BoxSizer(wx.VERTICAL) - - # Storage for filtering - self.all_buttons = [] - self.group_headers = {} - self.group_containers = {} - - # Build kaomoji groups - self._build_kaomoji_groups() - - self.scroll.SetSizer(self.scroll_sizer) - main_sizer.Add(self.scroll, 1, wx.EXPAND) - - panel.SetSizer(main_sizer) - main_sizer.Fit(panel) - self.SetClientSize(panel.GetBestSize()) - - # Bind events - self._bind_events(panel) - - def _build_kaomoji_groups(self): - dc = wx.ClientDC(self.scroll) - dc.SetFont(self.scroll.GetFont()) - - self.scroll_sizer.AddSpacer(8) - - for group_name, kaomojis in self.kaomoji_groups.items(): - if not kaomojis: - continue - - # Group header - header = self._create_group_header(group_name) - self.group_headers[group_name] = header - self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12) - - # Wrap sizer for buttons - wrap_sizer = wx.WrapSizer(wx.HORIZONTAL) - - for kaomoji in kaomojis: - btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer) - self.all_buttons.append(btn) - wrap_sizer.Add(btn, 0, wx.ALL, 3) - - self.group_containers[group_name] = wrap_sizer - self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12) - - def _create_group_header(self, group_name): - header = wx.StaticText(self.scroll, label=group_name) - font = header.GetFont() - font.SetWeight(wx.FONTWEIGHT_BOLD) - font.SetPointSize(9) - header.SetFont(font) - return header - - def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer): - text_width, text_height = dc.GetTextExtent(kaomoji) - btn_width = min(max(text_width + 20, 55), 140) - btn_height = text_height + 14 - - btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height)) - btn.SetToolTip(f"{kaomoji} - {group_name}") - - btn._kaomoji_text = kaomoji.lower() - btn._kaomoji_group = group_name.lower() - btn._group_name = group_name - - # Bind click event - btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k)) - - return btn - - def _bind_events(self, panel): - panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook) - self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus) - - def on_kaomoji_selected(self, kaomoji): - try: - if self.on_select_callback: - self.on_select_callback(kaomoji) - except Exception as e: - logger.error(f"Error in kaomoji selection callback: {e}") - finally: - # Safely dismiss the popup - wx.CallAfter(self.safe_dismiss) - - def on_search(self, event): - try: - search_text = self.search_ctrl.GetValue().lower().strip() - - if not search_text: - self._show_all() - return - - visible_groups = set() - - for btn in self.all_buttons: - # Match kaomoji text or group name - is_match = (search_text in btn._kaomoji_text or - search_text in btn._kaomoji_group) - btn.Show(is_match) - - if is_match: - visible_groups.add(btn._group_name) - - # Show/hide group headers - for group_name, header in self.group_headers.items(): - header.Show(group_name in visible_groups) - - self.scroll.Layout() - self.scroll.FitInside() - - except Exception as e: - logger.error(f"Error in kaomoji search: {e}") - - def on_clear_search(self, event): - self.search_ctrl.SetValue("") - self._show_all() - - def _show_all(self): - for btn in self.all_buttons: - btn.Show() - for header in self.group_headers.values(): - header.Show() - self.scroll.Layout() - self.scroll.FitInside() - - def on_char_hook(self, event): - if event.GetKeyCode() == wx.WXK_ESCAPE: - self.safe_dismiss() - else: - event.Skip() - - def on_kill_focus(self, event): - focused = wx.Window.FindFocus() - if focused and (focused == self.search_ctrl or - focused.GetParent() == self.scroll or - self.scroll.IsDescendant(focused)): - event.Skip() - return - - wx.CallLater(100, self.safe_dismiss) - event.Skip() - - def on_destroy(self, event): - self.all_buttons.clear() - self.group_headers.clear() - self.group_containers.clear() - event.Skip() - - def safe_dismiss(self): - if not self.IsBeingDeleted() and self.IsShown(): - try: - self.Dismiss() - except Exception as e: - logger.error(f"Error dismissing popup: {e}") - - def show_at_button(self, button): - btn_screen_pos = button.ClientToScreen((0, 0)) - popup_size = self.GetSize() - display_size = wx.GetDisplaySize() - - # Try to show above the button, otherwise below - if btn_screen_pos.y - popup_size.height > 0: - popup_y = btn_screen_pos.y - popup_size.height - else: - popup_y = btn_screen_pos.y + button.GetSize().height - - # Keep popup on screen horizontally - popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10)) - - self.Position((popup_x, popup_y), (0, 0)) - self.Popup() - - -class CommandAutocomplete(wx.PopupTransientWindow): - """Popup window for IRC command autocomplete, similar to Minecraft.""" - def __init__(self, parent, commands, on_select_callback): - super().__init__(parent, wx.BORDER_SIMPLE) - self.on_select_callback = on_select_callback - self.commands = commands # List of (command, description) tuples - self.filtered_commands = commands.copy() - self.selected_index = 0 - - self._init_ui() - self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy) - - def _init_ui(self): - panel = wx.Panel(self) - main_sizer = wx.BoxSizer(wx.VERTICAL) - - # Command list - self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER) - self.list_ctrl.InsertColumn(0, "Command", width=120) - self.list_ctrl.InsertColumn(1, "Description", width=280) - - self._update_list() - - # Bind events - self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated) - self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected) - self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook) - self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus) - - main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2) - panel.SetSizer(main_sizer) - main_sizer.Fit(panel) - self.SetClientSize(panel.GetBestSize()) - - # Select first item - if self.list_ctrl.GetItemCount() > 0: - self.list_ctrl.Select(0) - self.selected_index = 0 - - def _update_list(self): - """Update the list with filtered commands.""" - self.list_ctrl.DeleteAllItems() - for cmd, desc in self.filtered_commands: - idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd) - self.list_ctrl.SetItem(idx, 1, desc) - - # Resize to fit content (max 8 items visible) - item_height = 20 - max_items = min(8, len(self.filtered_commands)) - self.list_ctrl.SetSize((410, item_height * max_items + 4)) - self.SetClientSize((410, item_height * max_items + 4)) - - def filter_commands(self, search_text): - """Filter commands based on search text.""" - search_lower = search_text.lower().strip() - if not search_lower: - self.filtered_commands = self.commands.copy() - else: - self.filtered_commands = [ - (cmd, desc) for cmd, desc in self.commands - if cmd.lower().startswith(search_lower) - ] - - self.selected_index = 0 - self._update_list() - if self.list_ctrl.GetItemCount() > 0: - self.list_ctrl.Select(0) - - def on_item_activated(self, event): - """Handle double-click or Enter on item.""" - idx = event.GetIndex() - if 0 <= idx < len(self.filtered_commands): - cmd, _ = self.filtered_commands[idx] - if self.on_select_callback: - self.on_select_callback(cmd) - self.safe_dismiss() - - def on_item_selected(self, event): - """Handle item selection.""" - self.selected_index = event.GetIndex() - - def select_next(self): - """Select next item.""" - if self.list_ctrl.GetItemCount() > 0: - self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount() - self.list_ctrl.Select(self.selected_index) - self.list_ctrl.EnsureVisible(self.selected_index) - - def select_previous(self): - """Select previous item.""" - if self.list_ctrl.GetItemCount() > 0: - self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount() - self.list_ctrl.Select(self.selected_index) - self.list_ctrl.EnsureVisible(self.selected_index) - - def get_selected_command(self): - """Get the currently selected command.""" - if 0 <= self.selected_index < len(self.filtered_commands): - return self.filtered_commands[self.selected_index][0] - return None - - def on_char_hook(self, event): - """Handle keyboard events.""" - keycode = event.GetKeyCode() - if keycode == wx.WXK_ESCAPE: - self.safe_dismiss() - elif keycode == wx.WXK_UP: - self.select_previous() - elif keycode == wx.WXK_DOWN: - self.select_next() - elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER: - cmd = self.get_selected_command() - if cmd and self.on_select_callback: - self.on_select_callback(cmd) - self.safe_dismiss() - else: - event.Skip() - - def on_kill_focus(self, event): - """Handle focus loss.""" - focused = wx.Window.FindFocus() - if focused and (focused == self.list_ctrl or self.IsDescendant(focused)): - event.Skip() - return - wx.CallLater(100, self.safe_dismiss) - event.Skip() - - def on_destroy(self, event): - event.Skip() - - def safe_dismiss(self): - """Safely dismiss the popup.""" - if not self.IsBeingDeleted() and self.IsShown(): - try: - self.Dismiss() - except Exception as e: - logger.error(f"Error dismissing command autocomplete: {e}") - - def show_at_input(self, input_ctrl): - """Show popup near the input control.""" - input_screen_pos = input_ctrl.ClientToScreen((0, 0)) - popup_size = self.GetSize() - display_size = wx.GetDisplaySize() - - # Show above the input, otherwise below - if input_screen_pos.y - popup_size.height > 0: - popup_y = input_screen_pos.y - popup_size.height - 2 - else: - popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2 - - # Keep popup on screen horizontally - popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10)) - - self.Position((popup_x, popup_y), (0, 0)) - self.Popup() - - class IRCPanel(wx.Panel): # IRC commands with descriptions IRC_COMMANDS = [ @@ -673,7 +328,7 @@ class IRCPanel(wx.Panel): self.current_popup.Dismiss() # Create new popup - self.current_popup = KaomojiPicker( + self.current_popup = KaomojiPicker.KaomojiPicker( self, self.KAOMOJI_GROUPS, self.on_kaomoji_insert @@ -746,7 +401,7 @@ class IRCPanel(wx.Panel): try: if not self.command_popup or self.command_popup.IsBeingDeleted(): # Create new popup - self.command_popup = CommandAutocomplete( + self.command_popup = CommandAutocomplete.CommandAutocomplete( # fuckass python self, self.IRC_COMMANDS, self.on_command_select diff --git a/src/InterfaceSelectDialog.py b/src/InterfaceSelectDialog.py new file mode 100644 index 0000000..e6a790a --- /dev/null +++ b/src/InterfaceSelectDialog.py @@ -0,0 +1,103 @@ +import wx +import socket +import logging + +logger = logging.getLogger(__name__) + +class InterfaceSelectDialog(wx.Dialog): + """Dialog that lets the user pick which local interface the server should bind to.""" + + def __init__(self, parent, current_host="127.0.0.1"): + super().__init__(parent, title="Select Network Interface", size=(420, 380)) + try: + self.SetIcon(parent.GetIcon()) + except Exception: + pass + + self.selected_host = current_host + panel = wx.Panel(self) + panel.SetBackgroundColour(parent.theme["window_bg"]) + main_sizer = wx.BoxSizer(wx.VERTICAL) + + info = wx.StaticText( + panel, + label="Choose the Network interface where your server should run:\n" + "You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n", + ) + info.Wrap(380) + main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10) + + self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN) + self.interface_list.InsertColumn(0, "Interface", width=180) + self.interface_list.InsertColumn(1, "Address", width=180) + + self.interfaces = self._gather_interfaces() + current_index = 0 + for idx, entry in enumerate[tuple[str, str]](self.interfaces): + name, address = entry + self.interface_list.InsertItem(idx, name) + self.interface_list.SetItem(idx, 1, address) + if address == current_host: + current_index = idx + self.interface_list.Select(current_index) + self.interface_list.EnsureVisible(current_index) + if self.interfaces: + self.selected_host = self.interfaces[current_index][1] + self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select) + self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate) + main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10) + + button_bar = wx.StdDialogButtonSizer() + ok_btn = wx.Button(panel, wx.ID_OK) + cancel_btn = wx.Button(panel, wx.ID_CANCEL) + button_bar.AddButton(ok_btn) + button_bar.AddButton(cancel_btn) + button_bar.Realize() + main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10) + + ok_btn.Bind(wx.EVT_BUTTON, self.on_ok) + + panel.SetSizer(main_sizer) + + def on_select(self, event): + index = event.GetIndex() + _, address = self.interfaces[index] + self.selected_host = address + + def _gather_interfaces(self): + entries = [ + ("Loopback only", "127.0.0.1"), + ("All interfaces", "0.0.0.0"), + ] + try: + import psutil + + seen = {addr for _, addr in entries} + for name, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family == socket.AF_INET and addr.address not in seen: + label = f"{name}" + entries.append((label, addr.address)) + seen.add(addr.address) + except Exception as e: + logger.warning(f"Unable to enumerate network interfaces: {e}") + + return entries + + def get_selected_host(self): + return self.selected_host + + def on_activate(self, event): + self.on_select(event) + self.EndModal(wx.ID_OK) + + def on_ok(self, event): + index = self.interface_list.GetFirstSelected() + if index == -1 and self.interfaces: + wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION) + return + if index != -1: + _, address = self.interfaces[index] + self.selected_host = address + event.Skip() + diff --git a/src/KaomojiPicker.py b/src/KaomojiPicker.py new file mode 100644 index 0000000..26241f5 --- /dev/null +++ b/src/KaomojiPicker.py @@ -0,0 +1,197 @@ +import logging +import traceback +import wx +import wx.adv + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class KaomojiPicker(wx.PopupTransientWindow): + def __init__(self, parent, kaomoji_groups, on_select_callback): + super().__init__(parent, wx.BORDER_SIMPLE) + self.on_select_callback = on_select_callback + self.kaomoji_groups = kaomoji_groups + + self._init_ui() + self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy) + + def _init_ui(self): + panel = wx.Panel(self) + main_sizer = wx.BoxSizer(wx.VERTICAL) + + # Scrolled content area + self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL) + self.scroll.SetScrollRate(0, 15) + self.scroll_sizer = wx.BoxSizer(wx.VERTICAL) + + # Storage for filtering + self.all_buttons = [] + self.group_headers = {} + self.group_containers = {} + + # Build kaomoji groups + self._build_kaomoji_groups() + + self.scroll.SetSizer(self.scroll_sizer) + main_sizer.Add(self.scroll, 1, wx.EXPAND) + + panel.SetSizer(main_sizer) + main_sizer.Fit(panel) + self.SetClientSize(panel.GetBestSize()) + + # Bind events + self._bind_events(panel) + + def _build_kaomoji_groups(self): + dc = wx.ClientDC(self.scroll) + dc.SetFont(self.scroll.GetFont()) + + self.scroll_sizer.AddSpacer(8) + + for group_name, kaomojis in self.kaomoji_groups.items(): + if not kaomojis: + continue + + # Group header + header = self._create_group_header(group_name) + self.group_headers[group_name] = header + self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12) + + # Wrap sizer for buttons + wrap_sizer = wx.WrapSizer(wx.HORIZONTAL) + + for kaomoji in kaomojis: + btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer) + self.all_buttons.append(btn) + wrap_sizer.Add(btn, 0, wx.ALL, 3) + + self.group_containers[group_name] = wrap_sizer + self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12) + + def _create_group_header(self, group_name): + header = wx.StaticText(self.scroll, label=group_name) + font = header.GetFont() + font.SetWeight(wx.FONTWEIGHT_BOLD) + font.SetPointSize(9) + header.SetFont(font) + return header + + def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer): + text_width, text_height = dc.GetTextExtent(kaomoji) + btn_width = min(max(text_width + 20, 55), 140) + btn_height = text_height + 14 + + btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height)) + btn.SetToolTip(f"{kaomoji} - {group_name}") + + btn._kaomoji_text = kaomoji.lower() + btn._kaomoji_group = group_name.lower() + btn._group_name = group_name + + # Bind click event + btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k)) + + return btn + + def _bind_events(self, panel): + panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook) + self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus) + + def on_kaomoji_selected(self, kaomoji): + try: + if self.on_select_callback: + self.on_select_callback(kaomoji) + except Exception as e: + logger.error(f"Error in kaomoji selection callback: {e}") + finally: + # Safely dismiss the popup + wx.CallAfter(self.safe_dismiss) + + def on_search(self, event): + try: + search_text = self.search_ctrl.GetValue().lower().strip() + + if not search_text: + self._show_all() + return + + visible_groups = set() + + for btn in self.all_buttons: + # Match kaomoji text or group name + is_match = (search_text in btn._kaomoji_text or + search_text in btn._kaomoji_group) + btn.Show(is_match) + + if is_match: + visible_groups.add(btn._group_name) + + # Show/hide group headers + for group_name, header in self.group_headers.items(): + header.Show(group_name in visible_groups) + + self.scroll.Layout() + self.scroll.FitInside() + + except Exception as e: + logger.error(f"Error in kaomoji search: {e}") + + def on_clear_search(self, event): + self.search_ctrl.SetValue("") + self._show_all() + + def _show_all(self): + for btn in self.all_buttons: + btn.Show() + for header in self.group_headers.values(): + header.Show() + self.scroll.Layout() + self.scroll.FitInside() + + def on_char_hook(self, event): + if event.GetKeyCode() == wx.WXK_ESCAPE: + self.safe_dismiss() + else: + event.Skip() + + def on_kill_focus(self, event): + focused = wx.Window.FindFocus() + if focused and (focused == self.search_ctrl or + focused.GetParent() == self.scroll or + self.scroll.IsDescendant(focused)): + event.Skip() + return + + wx.CallLater(100, self.safe_dismiss) + event.Skip() + + def on_destroy(self, event): + self.all_buttons.clear() + self.group_headers.clear() + self.group_containers.clear() + event.Skip() + + def safe_dismiss(self): + if not self.IsBeingDeleted() and self.IsShown(): + try: + self.Dismiss() + except Exception as e: + logger.error(f"Error dismissing popup: {e}") + + def show_at_button(self, button): + btn_screen_pos = button.ClientToScreen((0, 0)) + popup_size = self.GetSize() + display_size = wx.GetDisplaySize() + + # Try to show above the button, otherwise below + if btn_screen_pos.y - popup_size.height > 0: + popup_y = btn_screen_pos.y - popup_size.height + else: + popup_y = btn_screen_pos.y + button.GetSize().height + + # Keep popup on screen horizontally + popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10)) + + self.Position((popup_x, popup_y), (0, 0)) + self.Popup() diff --git a/src/LocalServer.py b/src/LocalServer.py index 2214c62..1984aea 100644 --- a/src/LocalServer.py +++ b/src/LocalServer.py @@ -1,18 +1,11 @@ import ipaddress import logging -import socket -import threading from typing import Callable, Iterable, List, Optional - from irc import server as irc_server logger = logging.getLogger(__name__) -def _default_log_callback(message: str, color=None, bold: bool = False): - """Fallback logger when UI callback is not available.""" - logger.info(message) - class LocalOnlyIRCServer(irc_server.IRCServer): """IRC server that only accepts connections from local/LAN addresses.""" @@ -44,172 +37,6 @@ class LocalOnlyIRCServer(irc_server.IRCServer): return False -class LocalServerManager: - """Manages the background IRC server lifecycle.""" - - DEFAULT_CHANNELS = ["#lobby"] - DEFAULT_ALLOWED_NETWORKS = [ - ipaddress.ip_network("127.0.0.0/8"), # Loopback - ipaddress.ip_network("10.0.0.0/8"), # RFC1918 - ipaddress.ip_network("172.16.0.0/12"), # RFC1918 - ipaddress.ip_network("192.168.0.0/16"), # RFC1918 - ipaddress.ip_network("169.254.0.0/16"), # Link-local - ] - - def __init__( - self, - log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback, - listen_host: str = "0.0.0.0", - listen_port: int = 6667, - ): - self.log_callback = log_callback or _default_log_callback - self.listen_host = listen_host - self.listen_port = listen_port - self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS) - self._channels = list(self.DEFAULT_CHANNELS) - - self._server: Optional[LocalOnlyIRCServer] = None - self._thread: Optional[threading.Thread] = None - self._lock = threading.RLock() - self._running = threading.Event() - self._ready = threading.Event() - self._error: Optional[Exception] = None - - # Public API --------------------------------------------------------- - def start(self, timeout: float = 5.0): - """Start the background IRC server.""" - with self._lock: - if self._running.is_set(): - raise RuntimeError("Local IRC server is already running.") - - self._running.set() - self._ready.clear() - self._error = None - - self._thread = threading.Thread( - target=self._serve_forever, - name="Local-IRC-Server", - daemon=True, - ) - self._thread.start() - - if not self._ready.wait(timeout): - self._running.clear() - raise TimeoutError("Local IRC server failed to start in time.") - - if self._error: - raise self._error - - def stop(self, timeout: float = 5.0): - """Stop the IRC server if it is running.""" - with self._lock: - if not self._running.is_set(): - return - - server = self._server - thread = self._thread - - if server: - server.shutdown() - server.server_close() - - if thread: - thread.join(timeout=timeout) - - with self._lock: - self._server = None - self._thread = None - self._running.clear() - self._ready.clear() - - def is_running(self) -> bool: - return self._running.is_set() - - def set_listen_host(self, host: str): - """Update the bind address. Local server must be stopped.""" - with self._lock: - if self._running.is_set(): - raise RuntimeError("Stop the server before changing the interface.") - self.listen_host = host - self._log(f"Local server interface set to {host}.") - - def get_channels(self) -> List[str]: - with self._lock: - return list(self._channels) - - def set_channels(self, channels: Iterable[str]): - cleaned = self._sanitize_channels(channels) - with self._lock: - self._channels = cleaned or list(self.DEFAULT_CHANNELS) - - if self.is_running(): - self._log( - "Channel list updated. Restart local server to apply changes.", - bold=True, - ) - - # Internal helpers --------------------------------------------------- - def _serve_forever(self): - try: - server = LocalOnlyIRCServer( - (self.listen_host, self.listen_port), - irc_server.IRCClient, - self.allowed_networks, - blocked_callback=lambda ip: self._log( - f"Blocked connection attempt from {ip}", bold=False - ), - ) - server.servername = socket.gethostname() or "wxirc-local" - - with self._lock: - self._server = server - seed_channels = self._channels or list(self.DEFAULT_CHANNELS) - for channel in seed_channels: - server.channels.setdefault(channel, irc_server.IRCChannel(channel)) - - self._log( - f"Local IRC server listening on {self.listen_host}:{self.listen_port}", - bold=True, - ) - self._ready.set() - server.serve_forever() - except Exception as exc: - logger.exception("Local IRC server failed: %s", exc) - self._error = exc - self._ready.set() - self._log(f"Local server error: {exc}", bold=True) - finally: - self._running.clear() - self._log("Local IRC server stopped.") - - def _sanitize_channels(self, channels: Iterable[str]) -> List[str]: - unique = [] - seen = set() - for channel in channels: - if not channel: - continue - name = channel.strip() - if not name.startswith("#"): - name = f"#{name}" - if not self._is_valid_channel(name): - continue - if name.lower() not in seen: - unique.append(name) - seen.add(name.lower()) - return unique - - @staticmethod - def _is_valid_channel(name: str) -> bool: - if len(name) < 2: - return False - allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#" - return all(ch in allowed for ch in name) - - def _log(self, message: str, color=None, bold: bool = False): - try: - self.log_callback(message, color, bold) - except Exception: - logger.info(message) if __name__ == "__main__": import argparse @@ -246,7 +73,7 @@ if __name__ == "__main__": ) # Initialize the server manager - manager = LocalServerManager( + manager = LocalServerManager( # pyright: ignore[reportUndefinedVariable] listen_host=args.host, listen_port=args.port ) diff --git a/src/LocalServerManager.py b/src/LocalServerManager.py new file mode 100644 index 0000000..298642b --- /dev/null +++ b/src/LocalServerManager.py @@ -0,0 +1,179 @@ +import ipaddress +import socket +import threading +import logging +from typing import Callable, Iterable, List, Optional +from LocalServer import LocalOnlyIRCServer +from irc import server as irc_server +logger = logging.getLogger(__name__) + +def _default_log_callback(message: str, color=None, bold: bool = False): + """Fallback logger when UI callback is not available.""" + logger.info(message) + +class LocalServerManager: + """Manages the background IRC server lifecycle.""" + + DEFAULT_CHANNELS = ["#lobby"] + DEFAULT_ALLOWED_NETWORKS = [ + ipaddress.ip_network("127.0.0.0/8"), # Loopback + ipaddress.ip_network("10.0.0.0/8"), # RFC1918 + ipaddress.ip_network("172.16.0.0/12"), # RFC1918 + ipaddress.ip_network("192.168.0.0/16"), # RFC1918 + ipaddress.ip_network("169.254.0.0/16"), # Link-local + ] + + def __init__( + self, + log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback, + listen_host: str = "0.0.0.0", + listen_port: int = 6667, + ): + self.log_callback = log_callback or _default_log_callback + self.listen_host = listen_host + self.listen_port = listen_port + self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS) + self._channels = list(self.DEFAULT_CHANNELS) + + self._server = None + self._thread = None + self._lock = threading.RLock() + self._running = threading.Event() + self._ready = threading.Event() + self._error: Optional[Exception] = None + + # Public API --------------------------------------------------------- + def start(self, timeout: float = 5.0): + """Start the background IRC server.""" + with self._lock: + if self._running.is_set(): + raise RuntimeError("Local IRC server is already running.") + + self._running.set() + self._ready.clear() + self._error = None + + self._thread = threading.Thread( + target=self._serve_forever, + name="Local-IRC-Server", + daemon=True, + ) + self._thread.start() + + if not self._ready.wait(timeout): + self._running.clear() + raise TimeoutError("Local IRC server failed to start in time.") + + if self._error: + raise self._error + + def stop(self, timeout: float = 5.0): + """Stop the IRC server if it is running.""" + with self._lock: + if not self._running.is_set(): + return + + server = self._server + thread = self._thread + + if server: + server.shutdown() + server.server_close() + + if thread: + thread.join(timeout=timeout) + + with self._lock: + self._server = None + self._thread = None + self._running.clear() + self._ready.clear() + + def is_running(self) -> bool: + return self._running.is_set() + + def set_listen_host(self, host: str): + """Update the bind address. Local server must be stopped.""" + with self._lock: + if self._running.is_set(): + raise RuntimeError("Stop the server before changing the interface.") + self.listen_host = host + self._log(f"Local server interface set to {host}.") + + def get_channels(self) -> List[str]: + with self._lock: + return list(self._channels) + + def set_channels(self, channels: Iterable[str]): + cleaned = self._sanitize_channels(channels) + with self._lock: + self._channels = cleaned or list(self.DEFAULT_CHANNELS) + + if self.is_running(): + self._log( + "Channel list updated. Restart local server to apply changes.", + bold=True, + ) + + # Internal helpers --------------------------------------------------- + def _serve_forever(self): + try: + server = LocalOnlyIRCServer( + (self.listen_host, self.listen_port), + irc_server.IRCClient, + self.allowed_networks, + blocked_callback=lambda ip: self._log( + f"Blocked connection attempt from {ip}", bold=False + ), + ) + server.servername = socket.gethostname() or "wxirc-local" + + with self._lock: + self._server = server + seed_channels = self._channels or list(self.DEFAULT_CHANNELS) + for channel in seed_channels: + server.channels.setdefault(channel, irc_server.IRCChannel(channel)) + + self._log( + f"Local IRC server listening on {self.listen_host}:{self.listen_port}", + bold=True, + ) + self._ready.set() + server.serve_forever() + except Exception as exc: + logger.exception("Local IRC server failed: %s", exc) + self._error = exc + self._ready.set() + self._log(f"Local server error: {exc}", bold=True) + finally: + self._running.clear() + self._log("Local IRC server stopped.") + + def _sanitize_channels(self, channels: Iterable[str]) -> List[str]: + unique = [] + seen = set() + for channel in channels: + if not channel: + continue + name = channel.strip() + if not name.startswith("#"): + name = f"#{name}" + if not self._is_valid_channel(name): + continue + if name.lower() not in seen: + unique.append(name) + seen.add(name.lower()) + return unique + + @staticmethod + def _is_valid_channel(name: str) -> bool: + if len(name) < 2: + return False + allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#" + return all(ch in allowed for ch in name) + + def _log(self, message: str, color=None, bold: bool = False): + try: + self.log_callback(message, color, bold) + except Exception: + logger.info(message) diff --git a/src/ManageChannelsDialog.py b/src/ManageChannelsDialog.py new file mode 100644 index 0000000..f4bffb0 --- /dev/null +++ b/src/ManageChannelsDialog.py @@ -0,0 +1,104 @@ +import wx +import wx.adv + +class ManageChannelsDialog(wx.Dialog): + """Simple dialog for curating the local server channel allowlist.""" + + def __init__(self, parent, channels): + super().__init__(parent, title="Manage Local Channels", size=(360, 420)) + try: + self.SetIcon(parent.GetIcon()) + except Exception: + pass + panel = wx.Panel(self) + panel.SetBackgroundColour(parent.theme["window_bg"]) + + main_sizer = wx.BoxSizer(wx.VERTICAL) + info = wx.StaticText( + panel, + label="Channels are shared with anyone on your LAN who joins the built-in server.", + ) + info.Wrap(320) + main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8) + + self.list_box = wx.ListBox(panel) + for channel in channels: + self.list_box.Append(channel) + main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8) + + input_sizer = wx.BoxSizer(wx.HORIZONTAL) + self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER) + self.channel_input.SetHint("#channel-name") + add_btn = wx.Button(panel, label="Add") + input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4) + input_sizer.Add(add_btn, 0) + main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8) + + remove_btn = wx.Button(panel, label="Remove Selected") + reset_btn = wx.Button(panel, label="Reset to #lobby") + btn_sizer = wx.BoxSizer(wx.HORIZONTAL) + btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4) + btn_sizer.Add(reset_btn, 1) + main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8) + + button_bar = wx.StdDialogButtonSizer() + ok_btn = wx.Button(panel, wx.ID_OK) + cancel_btn = wx.Button(panel, wx.ID_CANCEL) + button_bar.AddButton(ok_btn) + button_bar.AddButton(cancel_btn) + button_bar.Realize() + main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8) + + panel.SetSizer(main_sizer) + + # Bindings + add_btn.Bind(wx.EVT_BUTTON, self.on_add) + remove_btn.Bind(wx.EVT_BUTTON, self.on_remove) + reset_btn.Bind(wx.EVT_BUTTON, self.on_reset) + self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add) + ok_btn.Bind(wx.EVT_BUTTON, self.on_ok) + + def get_channels(self): + return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())] + + def on_add(self, event): + value = self.channel_input.GetValue().strip() + if not value: + return + if not value.startswith('#'): + value = f"#{value}" + if not self._is_valid_channel(value): + wx.MessageBox( + "Channel names may contain letters, numbers, -, and _ only.", + "Invalid Channel", + wx.OK | wx.ICON_WARNING, + ) + return + if self.list_box.FindString(value) != wx.NOT_FOUND: + wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION) + return + self.list_box.Append(value) + self.channel_input.Clear() + + def on_remove(self, event): + selection = self.list_box.GetSelection() + if selection != wx.NOT_FOUND: + self.list_box.Delete(selection) + + def on_reset(self, event): + self.list_box.Clear() + self.list_box.Append("#lobby") + + def on_ok(self, event): + if self.list_box.GetCount() == 0: + wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION) + return + event.Skip() + + @staticmethod + def _is_valid_channel(name): + if len(name) < 2: + return False + allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#" + return all(ch in allowed for ch in name) + diff --git a/src/ScanHandler.py b/src/ScanHandler.py new file mode 100644 index 0000000..61c6ea9 --- /dev/null +++ b/src/ScanHandler.py @@ -0,0 +1,120 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed +import socket +import threading +import psutil +import ipaddress +import logging + +logger = logging.getLogger(__name__) + +class ScanHandler: + """Fast local network IRC scanner with minimal network overhead.""" + + def __init__(self, timeout=0.35, max_workers=64): + self.timeout = timeout + self.max_workers = max_workers + self._stop_event = threading.Event() + self._thread = None + self.results = [] + self.total_hosts = 0 + + def detect_networks(self): + """Return private IPv4 networks discovered on local interfaces.""" + networks = [] + try: + for iface, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family != socket.AF_INET or not addr.netmask: + continue + try: + interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}") + except ValueError: + continue + if not interface.network.is_private: + continue + network = self._cap_network(interface) + label = f"{iface} : {interface.ip} through {network.with_prefixlen} range" + networks.append({"label": label, "cidr": str(network)}) + except Exception as exc: + logger.error("Failed to enumerate interfaces: %s", exc) + + if not networks: + default_net = "192.168.1.0/24" + label = f"Default guess : {default_net}" + networks.append({"label": label, "cidr": default_net}) + return networks + + def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None): + """Launch the threaded scan.""" + if self._thread and self._thread.is_alive(): + return False + try: + network = ipaddress.ip_network(network_cidr, strict=False) + except ValueError: + return False + + hosts = [str(host) for host in network.hosts()] + if not hosts: + hosts = [str(network.network_address)] + self.total_hosts = len(hosts) + self.results.clear() + self._stop_event.clear() + + def _worker(): + logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr) + scanned = 0 + try: + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = {executor.submit(self._probe_host, host, ports): host for host in hosts} + for future in as_completed(futures): + if self._stop_event.is_set(): + break + scanned += 1 + server_info = future.result() + if server_info: + self.results.append(server_info) + if result_cb: + result_cb(server_info) + if progress_cb: + progress_cb(scanned, self.total_hosts) + except Exception as exc: + logger.error("Scan failure: %s", exc) + finally: + if done_cb: + done_cb(self.results) + logger.info("IRC scan finished (%s discovered)", len(self.results)) + + self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True) + self._thread.start() + return True + + def stop_scan(self): + self._stop_event.set() + + def _probe_host(self, host, ports): + if self._stop_event.is_set(): + return None + for port in ports: + try: + with socket.create_connection((host, port), timeout=self.timeout) as sock: + sock.settimeout(0.2) + banner = "" + try: + chunk = sock.recv(256) + if chunk: + banner = chunk.decode(errors="ignore").strip() + except socket.timeout: + banner = "IRC server (silent banner)" + except OSError: + pass + return {"address": host, "port": port, "banner": banner or "IRC server detected"} + except (socket.timeout, ConnectionRefusedError, OSError): + continue + return None + + @staticmethod + def _cap_network(interface): + """Cap huge networks to /24 to keep scans lightweight.""" + if interface.network.prefixlen >= 24: + return interface.network + return ipaddress.ip_network(f"{interface.ip}/24", strict=False) diff --git a/src/ScanWizard.py b/src/ScanWizard.py index ff83008..7ef06c0 100644 --- a/src/ScanWizard.py +++ b/src/ScanWizard.py @@ -1,299 +1,19 @@ -import ipaddress import logging -import socket -import threading -from concurrent.futures import ThreadPoolExecutor, as_completed +import ScanHandler +import ScanWizardIntroPage +import ScanWizardResultsPage -import psutil import wx import wx.adv as adv logger = logging.getLogger(__name__) - -class ScanHandler: - """Fast local network IRC scanner with minimal network overhead.""" - - def __init__(self, timeout=0.35, max_workers=64): - self.timeout = timeout - self.max_workers = max_workers - self._stop_event = threading.Event() - self._thread = None - self.results = [] - self.total_hosts = 0 - - def detect_networks(self): - """Return private IPv4 networks discovered on local interfaces.""" - networks = [] - try: - for iface, addrs in psutil.net_if_addrs().items(): - for addr in addrs: - if addr.family != socket.AF_INET or not addr.netmask: - continue - try: - interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}") - except ValueError: - continue - if not interface.network.is_private: - continue - network = self._cap_network(interface) - label = f"{iface} : {interface.ip} through {network.with_prefixlen} range" - networks.append({"label": label, "cidr": str(network)}) - except Exception as exc: - logger.error("Failed to enumerate interfaces: %s", exc) - - if not networks: - default_net = "192.168.1.0/24" - label = f"Default guess : {default_net}" - networks.append({"label": label, "cidr": default_net}) - return networks - - def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None): - """Launch the threaded scan.""" - if self._thread and self._thread.is_alive(): - return False - try: - network = ipaddress.ip_network(network_cidr, strict=False) - except ValueError: - return False - - hosts = [str(host) for host in network.hosts()] - if not hosts: - hosts = [str(network.network_address)] - self.total_hosts = len(hosts) - self.results.clear() - self._stop_event.clear() - - def _worker(): - logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr) - scanned = 0 - try: - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - futures = {executor.submit(self._probe_host, host, ports): host for host in hosts} - for future in as_completed(futures): - if self._stop_event.is_set(): - break - scanned += 1 - server_info = future.result() - if server_info: - self.results.append(server_info) - if result_cb: - result_cb(server_info) - if progress_cb: - progress_cb(scanned, self.total_hosts) - except Exception as exc: - logger.error("Scan failure: %s", exc) - finally: - if done_cb: - done_cb(self.results) - logger.info("IRC scan finished (%s discovered)", len(self.results)) - - self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True) - self._thread.start() - return True - - def stop_scan(self): - self._stop_event.set() - - def _probe_host(self, host, ports): - if self._stop_event.is_set(): - return None - for port in ports: - try: - with socket.create_connection((host, port), timeout=self.timeout) as sock: - sock.settimeout(0.2) - banner = "" - try: - chunk = sock.recv(256) - if chunk: - banner = chunk.decode(errors="ignore").strip() - except socket.timeout: - banner = "IRC server (silent banner)" - except OSError: - pass - return {"address": host, "port": port, "banner": banner or "IRC server detected"} - except (socket.timeout, ConnectionRefusedError, OSError): - continue - return None - - @staticmethod - def _cap_network(interface): - """Cap huge networks to /24 to keep scans lightweight.""" - if interface.network.prefixlen >= 24: - return interface.network - return ipaddress.ip_network(f"{interface.ip}/24", strict=False) - - -class ScanWizardIntroPage(adv.WizardPageSimple): - def __init__(self, parent, scan_handler): - super().__init__(parent) - self.scan_handler = scan_handler - self.networks = self.scan_handler.detect_networks() - self._build_ui() - - def _build_ui(self): - sizer = wx.BoxSizer(wx.VERTICAL) - - intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.") - intro.Wrap(420) - sizer.Add(intro, 0, wx.ALL, 5) - - sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8) - labels = [net["label"] for net in self.networks] - self.network_choice = wx.Choice(self, choices=labels) - if labels: - self.network_choice.SetSelection(0) - sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5) - - sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8) - self.port_ctrl = wx.TextCtrl(self, value="6667,6697") - sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5) - - self.SetSizer(sizer) - - def get_scan_params(self): - selection = self.network_choice.GetSelection() - if selection == wx.NOT_FOUND: - wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING) - return None - cidr = self.networks[selection]["cidr"] - raw_ports = self.port_ctrl.GetValue().split(",") - ports = [] - for raw in raw_ports: - raw = raw.strip() - if not raw: - continue - try: - port = int(raw) - if 1 <= port <= 65535: - ports.append(port) - except ValueError: - continue - if not ports: - wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING) - return None - try: - network = ipaddress.ip_network(cidr, strict=False) - except ValueError: - wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR) - return None - host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1) - return {"cidr": str(network), "ports": ports, "host_count": host_count} - - -class ScanWizardResultsPage(adv.WizardPageSimple): - def __init__(self, parent, scan_handler, main_frame): - super().__init__(parent) - self.scan_handler = scan_handler - self.main_frame = main_frame - self.discovered = [] - self._build_ui() - - def _build_ui(self): - sizer = wx.BoxSizer(wx.VERTICAL) - - self.summary = wx.StaticText(self, label="Waiting to start…") - sizer.Add(self.summary, 0, wx.ALL, 5) - - self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH) - sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5) - - self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN) - self.results_list.InsertColumn(0, "Address", width=140) - self.results_list.InsertColumn(1, "Port", width=60) - self.results_list.InsertColumn(2, "Details", width=260) - self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons) - self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons) - sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5) - - btn_row = wx.BoxSizer(wx.HORIZONTAL) - self.quick_connect_btn = wx.Button(self, label="Quick Connect") - self.quick_connect_btn.Disable() - self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect) - btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5) - - self.rescan_btn = wx.Button(self, label="Rescan") - self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan) - btn_row.Add(self.rescan_btn, 0) - sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5) - - self.SetSizer(sizer) - - def prepare_for_scan(self, params, start_callback): - self.results_list.DeleteAllItems() - self.discovered = [] - self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}…") - self.gauge.SetRange(max(params["host_count"], 1)) - self.gauge.SetValue(0) - self.quick_connect_btn.Disable() - start_callback(params) - - def on_scan_progress(self, scanned, total): - try: - total = max(total, 1) - self.gauge.SetRange(total) - self.gauge.SetValue(min(scanned, total)) - self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked") - except RuntimeError: - # C++ SHIT - logger.debug("Scan progress update after controls destroyed; ignoring") - - def on_scan_result(self, server_info): - """Handle a single discovered server row.""" - try: - idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"]) - self.results_list.SetItem(idx, 1, str(server_info["port"])) - self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected")) - self.discovered.append(server_info) - self.summary.SetLabel( - f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}" - ) - except RuntimeError: - logger.debug("Scan result update after controls destroyed; ignoring") - - def on_scan_complete(self, results): - """Final scan completion callback.""" - try: - if results: - self.summary.SetLabel( - f"Scan complete : {len(results)} " - f"{'server' if len(results) == 1 else 'servers'} ready." - ) - else: - self.summary.SetLabel("Scan complete : no IRC servers discovered.") - self._toggle_buttons() - except RuntimeError: - logger.debug("Scan completion update after controls destroyed; ignoring") - - def on_quick_connect(self, event): - row = self.results_list.GetFirstSelected() - if row == -1: - return - server = self.results_list.GetItemText(row, 0) - port = int(self.results_list.GetItemText(row, 1)) - if self.main_frame.quick_connect(server, port): - self.GetParent().EndModal(wx.ID_OK) - - def on_rescan(self, event): - wizard = self.GetParent() - wizard.ShowPage(wizard.intro_page) - - def _toggle_buttons(self, event=None): - has_selection = self.results_list.GetFirstSelected() != -1 - if has_selection: - self.quick_connect_btn.Enable() - else: - self.quick_connect_btn.Disable() - - class ScanWizardDialog(adv.Wizard): - """Wizard that drives the ScanHandler workflow.""" - def __init__(self, parent): super().__init__(parent, title="wxScan") - self.scan_handler = ScanHandler() - self.intro_page = ScanWizardIntroPage(self, self.scan_handler) - self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent) + self.scan_handler = ScanHandler.ScanHandler() + self.intro_page = ScanWizardIntroPage.ScanWizardIntroPage(self, self.scan_handler) + self.results_page = ScanWizardResultsPage.ScanWizardResultsPage(self, self.scan_handler, parent) self._chain_pages() self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing) self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel) diff --git a/src/ScanWizardIntroPage.py b/src/ScanWizardIntroPage.py new file mode 100644 index 0000000..25c7a2e --- /dev/null +++ b/src/ScanWizardIntroPage.py @@ -0,0 +1,59 @@ +import wx +import wx.adv as adv +import ipaddress + +class ScanWizardIntroPage(adv.WizardPageSimple): + def __init__(self, parent, scan_handler): + super().__init__(parent) + self.scan_handler = scan_handler + self.networks = self.scan_handler.detect_networks() + self._build_ui() + + def _build_ui(self): + sizer = wx.BoxSizer(wx.VERTICAL) + + intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.") + intro.Wrap(420) + sizer.Add(intro, 0, wx.ALL, 5) + + sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8) + labels = [net["label"] for net in self.networks] + self.network_choice = wx.Choice(self, choices=labels) + if labels: + self.network_choice.SetSelection(0) + sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5) + + sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8) + self.port_ctrl = wx.TextCtrl(self, value="6667,6697") + sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5) + + self.SetSizer(sizer) + + def get_scan_params(self): + selection = self.network_choice.GetSelection() + if selection == wx.NOT_FOUND: + wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING) + return None + cidr = self.networks[selection]["cidr"] + raw_ports = self.port_ctrl.GetValue().split(",") + ports = [] + for raw in raw_ports: + raw = raw.strip() + if not raw: + continue + try: + port = int(raw) + if 1 <= port <= 65535: + ports.append(port) + except ValueError: + continue + if not ports: + wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING) + return None + try: + network = ipaddress.ip_network(cidr, strict=False) + except ValueError: + wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR) + return None + host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1) + return {"cidr": str(network), "ports": ports, "host_count": host_count} diff --git a/src/ScanWizardResultsPage.py b/src/ScanWizardResultsPage.py new file mode 100644 index 0000000..c1d20a9 --- /dev/null +++ b/src/ScanWizardResultsPage.py @@ -0,0 +1,109 @@ +import wx +import wx.adv as adv +import logging + +logger = logging.getLogger(__name__) + +class ScanWizardResultsPage(adv.WizardPageSimple): + def __init__(self, parent, scan_handler, main_frame): + super().__init__(parent) + self.scan_handler = scan_handler + self.main_frame = main_frame + self.discovered = [] + self._build_ui() + + def _build_ui(self): + sizer = wx.BoxSizer(wx.VERTICAL) + + self.summary = wx.StaticText(self, label="Waiting to start…") + sizer.Add(self.summary, 0, wx.ALL, 5) + + self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH) + sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5) + + self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN) + self.results_list.InsertColumn(0, "Address", width=140) + self.results_list.InsertColumn(1, "Port", width=60) + self.results_list.InsertColumn(2, "Details", width=260) + self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons) + self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons) + sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5) + + btn_row = wx.BoxSizer(wx.HORIZONTAL) + self.quick_connect_btn = wx.Button(self, label="Quick Connect") + self.quick_connect_btn.Disable() + self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect) + btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5) + + self.rescan_btn = wx.Button(self, label="Rescan") + self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan) + btn_row.Add(self.rescan_btn, 0) + sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5) + + self.SetSizer(sizer) + + def prepare_for_scan(self, params, start_callback): + self.results_list.DeleteAllItems() + self.discovered = [] + self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}…") + self.gauge.SetRange(max(params["host_count"], 1)) + self.gauge.SetValue(0) + self.quick_connect_btn.Disable() + start_callback(params) + + def on_scan_progress(self, scanned, total): + try: + total = max(total, 1) + self.gauge.SetRange(total) + self.gauge.SetValue(min(scanned, total)) + self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked") + except RuntimeError: + # C++ SHIT + logger.debug("Scan progress update after controls destroyed; ignoring") + + def on_scan_result(self, server_info): + """Handle a single discovered server row.""" + try: + idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"]) + self.results_list.SetItem(idx, 1, str(server_info["port"])) + self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected")) + self.discovered.append(server_info) + self.summary.SetLabel( + f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}" + ) + except RuntimeError: + logger.debug("Scan result update after controls destroyed; ignoring") + + def on_scan_complete(self, results): + """Final scan completion callback.""" + try: + if results: + self.summary.SetLabel( + f"Scan complete : {len(results)} " + f"{'server' if len(results) == 1 else 'servers'} ready." + ) + else: + self.summary.SetLabel("Scan complete : no IRC servers discovered.") + self._toggle_buttons() + except RuntimeError: + logger.debug("Scan completion update after controls destroyed; ignoring") + + def on_quick_connect(self, event): + row = self.results_list.GetFirstSelected() + if row == -1: + return + server = self.results_list.GetItemText(row, 0) + port = int(self.results_list.GetItemText(row, 1)) + if self.main_frame.quick_connect(server, port): + self.GetParent().EndModal(wx.ID_OK) + + def on_rescan(self, event): + wizard = self.GetParent() + wizard.ShowPage(wizard.intro_page) + + def _toggle_buttons(self, event=None): + has_selection = self.results_list.GetFirstSelected() != -1 + if has_selection: + self.quick_connect_btn.Enable() + else: + self.quick_connect_btn.Disable() diff --git a/src/main.py b/src/main.py index 9ebb56a..34adc54 100644 --- a/src/main.py +++ b/src/main.py @@ -17,10 +17,12 @@ import sys from PrivacyNoticeDialog import PrivacyNoticeDialog from IRCPanel import IRCPanel from AboutDialog import AboutDialog +from InterfaceSelectDialog import InterfaceSelectDialog +from ManageChannelsDialog import ManageChannelsDialog from NotesDialog import NotesDialog if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler from ScanWizard import ScanWizardDialog -from LocalServer import LocalServerManager +from LocalServerManager import LocalServerManager # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -43,207 +45,6 @@ class UIUpdate: self.args = args self.kwargs = kwargs - -class ManageChannelsDialog(wx.Dialog): - """Simple dialog for curating the local server channel allowlist.""" - - def __init__(self, parent, channels): - super().__init__(parent, title="Manage Local Channels", size=(360, 420)) - try: - self.SetIcon(parent.GetIcon()) - except Exception: - pass - panel = wx.Panel(self) - panel.SetBackgroundColour(parent.theme["window_bg"]) - - main_sizer = wx.BoxSizer(wx.VERTICAL) - info = wx.StaticText( - panel, - label="Channels are shared with anyone on your LAN who joins the built-in server.", - ) - info.Wrap(320) - main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8) - - self.list_box = wx.ListBox(panel) - for channel in channels: - self.list_box.Append(channel) - main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8) - - input_sizer = wx.BoxSizer(wx.HORIZONTAL) - self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER) - self.channel_input.SetHint("#channel-name") - add_btn = wx.Button(panel, label="Add") - input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4) - input_sizer.Add(add_btn, 0) - main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8) - - remove_btn = wx.Button(panel, label="Remove Selected") - reset_btn = wx.Button(panel, label="Reset to #lobby") - btn_sizer = wx.BoxSizer(wx.HORIZONTAL) - btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4) - btn_sizer.Add(reset_btn, 1) - main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8) - - button_bar = wx.StdDialogButtonSizer() - ok_btn = wx.Button(panel, wx.ID_OK) - cancel_btn = wx.Button(panel, wx.ID_CANCEL) - button_bar.AddButton(ok_btn) - button_bar.AddButton(cancel_btn) - button_bar.Realize() - main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8) - - panel.SetSizer(main_sizer) - - # Bindings - add_btn.Bind(wx.EVT_BUTTON, self.on_add) - remove_btn.Bind(wx.EVT_BUTTON, self.on_remove) - reset_btn.Bind(wx.EVT_BUTTON, self.on_reset) - self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add) - ok_btn.Bind(wx.EVT_BUTTON, self.on_ok) - - def get_channels(self): - return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())] - - def on_add(self, event): - value = self.channel_input.GetValue().strip() - if not value: - return - if not value.startswith('#'): - value = f"#{value}" - if not self._is_valid_channel(value): - wx.MessageBox( - "Channel names may contain letters, numbers, -, and _ only.", - "Invalid Channel", - wx.OK | wx.ICON_WARNING, - ) - return - if self.list_box.FindString(value) != wx.NOT_FOUND: - wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION) - return - self.list_box.Append(value) - self.channel_input.Clear() - - def on_remove(self, event): - selection = self.list_box.GetSelection() - if selection != wx.NOT_FOUND: - self.list_box.Delete(selection) - - def on_reset(self, event): - self.list_box.Clear() - self.list_box.Append("#lobby") - - def on_ok(self, event): - if self.list_box.GetCount() == 0: - wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION) - return - event.Skip() - - @staticmethod - def _is_valid_channel(name): - if len(name) < 2: - return False - allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#" - return all(ch in allowed for ch in name) - - -class InterfaceSelectDialog(wx.Dialog): - """Dialog that lets the user pick which local interface the server should bind to.""" - - def __init__(self, parent, current_host="127.0.0.1"): - super().__init__(parent, title="Select Network Interface", size=(420, 380)) - try: - self.SetIcon(parent.GetIcon()) - except Exception: - pass - - self.selected_host = current_host - panel = wx.Panel(self) - panel.SetBackgroundColour(parent.theme["window_bg"]) - main_sizer = wx.BoxSizer(wx.VERTICAL) - - info = wx.StaticText( - panel, - label="Choose the Network interface where your server should run:\n" - "You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n", - ) - info.Wrap(380) - main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10) - - self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN) - self.interface_list.InsertColumn(0, "Interface", width=180) - self.interface_list.InsertColumn(1, "Address", width=180) - - self.interfaces = self._gather_interfaces() - current_index = 0 - for idx, entry in enumerate[tuple[str, str]](self.interfaces): - name, address = entry - self.interface_list.InsertItem(idx, name) - self.interface_list.SetItem(idx, 1, address) - if address == current_host: - current_index = idx - self.interface_list.Select(current_index) - self.interface_list.EnsureVisible(current_index) - if self.interfaces: - self.selected_host = self.interfaces[current_index][1] - self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select) - self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate) - main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10) - - button_bar = wx.StdDialogButtonSizer() - ok_btn = wx.Button(panel, wx.ID_OK) - cancel_btn = wx.Button(panel, wx.ID_CANCEL) - button_bar.AddButton(ok_btn) - button_bar.AddButton(cancel_btn) - button_bar.Realize() - main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10) - - ok_btn.Bind(wx.EVT_BUTTON, self.on_ok) - - panel.SetSizer(main_sizer) - - def on_select(self, event): - index = event.GetIndex() - _, address = self.interfaces[index] - self.selected_host = address - - def _gather_interfaces(self): - entries = [ - ("Loopback only", "127.0.0.1"), - ("All interfaces", "0.0.0.0"), - ] - try: - import psutil - - seen = {addr for _, addr in entries} - for name, addrs in psutil.net_if_addrs().items(): - for addr in addrs: - if addr.family == socket.AF_INET and addr.address not in seen: - label = f"{name}" - entries.append((label, addr.address)) - seen.add(addr.address) - except Exception as e: - logger.warning(f"Unable to enumerate network interfaces: {e}") - - return entries - - def get_selected_host(self): - return self.selected_host - - def on_activate(self, event): - self.on_select(event) - self.EndModal(wx.ID_OK) - - def on_ok(self, event): - index = self.interface_list.GetFirstSelected() - if index == -1 and self.interfaces: - wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION) - return - if index != -1: - _, address = self.interfaces[index] - self.selected_host = address - event.Skip() - - class IRCFrame(wx.Frame): def __init__(self): super().__init__(None, title="wxIRC", size=(1200, 700))